Send same transaction if last send attempt failed

This commit is contained in:
Andrew Morgan 2018-06-11 20:48:20 +01:00
parent bc2ea24445
commit d5865fa67d
6 changed files with 168 additions and 83 deletions

View file

@ -51,10 +51,13 @@ func SetupAppServiceAPIComponent(
// events to be sent out.
workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices))
for _, appservice := range base.Cfg.Derived.ApplicationServices {
eventCount := 0
m := sync.Mutex{}
ws := types.ApplicationServiceWorkerState{
AppService: appservice,
Cond: sync.NewCond(&m),
AppService: appservice,
Cond: sync.NewCond(&m),
EventsReady: &eventCount,
}
workerStates = append(workerStates, ws)
}

View file

@ -175,15 +175,15 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
for _, event := range events {
for _, ws := range s.workerStates {
// Check if this event is interesting to this application service
if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) {
if s.appserviceIsInterestedInEvent(ctx, &event, ws.AppService) {
// Queue this event to be sent off to the application service
if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil {
if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil {
log.WithError(err).Warn("failed to insert incoming event into appservices database")
} else {
// Tell our worker to send out new messages by setting dirty bit for that
// worker to true, and waking them up with a broadcast
// Tell our worker to send out new messages by updating remaining message
// count and waking them up with a broadcast
ws.Cond.L.Lock()
ws.EventsReady = true
*ws.EventsReady++
ws.Cond.Broadcast()
ws.Cond.L.Unlock()
}
@ -196,7 +196,7 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
// appserviceIsInterestedInEvent returns a boolean depending on whether a given
// event falls within one of a given application service's namespaces.
func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event gomatrixserverlib.Event, appservice config.ApplicationService) bool {
func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.Event, appservice config.ApplicationService) bool {
// Check sender of the event
for _, userNamespace := range appservice.NamespaceMap["users"] {
if userNamespace.RegexpObject.MatchString(event.Sender()) {

View file

@ -17,6 +17,7 @@ package storage
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/matrix-org/gomatrixserverlib"
@ -43,15 +44,19 @@ CREATE TABLE IF NOT EXISTS appservice_events (
-- The JSON representation of the event's content. Text to avoid db JSON parsing
event_content TEXT,
-- The ID of the transaction that this event is a part of
txn_id INTEGER
txn_id BIGINT NOT NULL
);
CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
`
const selectEventsByApplicationServiceIDSQL = "" +
"SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content, COUNT(id) OVER() AS full_count " +
"FROM appservice_events WHERE as_id = $1 ORDER BY id ASC LIMIT $2"
const selectPastEventsByApplicationServiceIDSQL = "" +
"SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id " +
"FROM appservice_events WHERE as_id = $1 AND txn_id > -1 LIMIT $2"
const selectCurrEventsByApplicationServiceIDSQL = "" +
"SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id " +
"FROM appservice_events WHERE as_id = $1 AND txn_id = -1 LIMIT $2"
const countEventsByApplicationServiceIDSQL = "" +
"SELECT COUNT(event_id) FROM appservice_events WHERE as_id = $1"
@ -60,14 +65,19 @@ const insertEventSQL = "" +
"INSERT INTO appservice_events(as_id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id) " +
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"
const updateTxnIDForEventsSQL = "" +
"UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
const deleteEventsBeforeAndIncludingIDSQL = "" +
"DELETE FROM appservice_events WHERE id <= $1"
"DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2"
type eventsStatements struct {
selectEventsByApplicationServiceIDStmt *sql.Stmt
countEventsByApplicationServiceIDStmt *sql.Stmt
insertEventStmt *sql.Stmt
deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
selectPastEventsByApplicationServiceIDStmt *sql.Stmt
selectCurrEventsByApplicationServiceIDStmt *sql.Stmt
countEventsByApplicationServiceIDStmt *sql.Stmt
insertEventStmt *sql.Stmt
updateTxnIDForEventsStmt *sql.Stmt
deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
}
func (s *eventsStatements) prepare(db *sql.DB) (err error) {
@ -76,7 +86,10 @@ func (s *eventsStatements) prepare(db *sql.DB) (err error) {
return
}
if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil {
if s.selectPastEventsByApplicationServiceIDStmt, err = db.Prepare(selectPastEventsByApplicationServiceIDSQL); err != nil {
return
}
if s.selectCurrEventsByApplicationServiceIDStmt, err = db.Prepare(selectCurrEventsByApplicationServiceIDSQL); err != nil {
return
}
if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil {
@ -85,6 +98,9 @@ func (s *eventsStatements) prepare(db *sql.DB) (err error) {
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
return
}
if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil {
return
}
if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil {
return
}
@ -95,30 +111,57 @@ func (s *eventsStatements) prepare(db *sql.DB) (err error) {
// selectEventsByApplicationServiceID takes in an application service ID and
// returns a slice of events that need to be sent to that application service,
// as well as an int later used to remove these same events from the database
// once successfully sent to an application service. The total event count is
// used by a worker to determine if more events need to be pulled from the DB
// later.
// once successfully sent to an application service.
func (s *eventsStatements) selectEventsByApplicationServiceID(
ctx context.Context,
applicationServiceID string,
limit int,
) (
maxID, totalEvents int,
txnID, maxID int,
events []gomatrixserverlib.ApplicationServiceEvent,
err error,
) {
eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit)
// First check to see if there are any events part of an old transaction
eventRowsPast, err := s.selectPastEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit)
if err != nil {
return 0, 0, nil, err
}
defer func() {
err = eventRows.Close()
err = eventRowsPast.Close()
if err != nil {
log.WithError(err).Fatalf("Appservice %s unable to select past events to send",
applicationServiceID)
}
}()
events, txnID, maxID, err = retrieveEvents(eventRowsPast)
if err != nil {
return 0, 0, nil, err
}
if len(events) > 0 {
return
}
// Else, if there are old events with existing transaction IDs, grab a batch of new events
eventRowsCurr, err := s.selectCurrEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit)
if err != nil {
return 0, 0, nil, err
}
defer func() {
err = eventRowsCurr.Close()
if err != nil {
log.WithError(err).Fatalf("Appservice %s unable to select new events to send",
applicationServiceID)
}
}()
events, _, maxID, err = retrieveEvents(eventRowsCurr)
if err != nil {
return 0, 0, nil, err
}
return -1, maxID, events, err
}
func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.ApplicationServiceEvent, txnID, maxID int, err error) {
// Iterate through each row and store event contents
for eventRows.Next() {
var event gomatrixserverlib.ApplicationServiceEvent
@ -132,10 +175,11 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
&event.Type,
&event.UserID,
&eventContent,
&totalEvents,
&txnID,
)
if err != nil {
return 0, 0, nil, err
fmt.Println("Failed:", err.Error())
return nil, 0, 0, err
}
if eventContent.Valid {
event.Content = gomatrixserverlib.RawJSON(eventContent.String)
@ -177,7 +221,7 @@ func (s *eventsStatements) countEventsByApplicationServiceID(
func (s *eventsStatements) insertEvent(
ctx context.Context,
appServiceID string,
event gomatrixserverlib.Event,
event *gomatrixserverlib.Event,
) (err error) {
_, err = s.insertEventStmt.ExecContext(
ctx,
@ -188,16 +232,29 @@ func (s *eventsStatements) insertEvent(
event.Type(),
event.Sender(),
event.Content(),
nil,
-1,
)
return
}
// updateTxnIDForEvents sets the transactionID for a collection of events. Done
// before sending them to an AppService. Referenced before sending to make sure
// we aren't constructing multiple transactions with the same events.
func (s *eventsStatements) updateTxnIDForEvents(
ctx context.Context,
appserviceID string,
maxID, txnID int,
) (err error) {
_, err = s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID)
return
}
// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database.
func (s *eventsStatements) deleteEventsBeforeAndIncludingID(
ctx context.Context,
appserviceID string,
eventTableID int,
) (err error) {
_, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, eventTableID)
return err
_, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID)
return
}

View file

@ -56,7 +56,7 @@ func (d *Database) prepare() error {
func (d *Database) StoreEvent(
ctx context.Context,
appServiceID string,
event gomatrixserverlib.Event,
event *gomatrixserverlib.Event,
) error {
return d.events.insertEvent(ctx, appServiceID, event)
}
@ -80,14 +80,26 @@ func (d *Database) CountEventsWithAppServiceID(
return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
}
// UpdateTxnIDForEvents takes in an application service ID and a
// and stores them in the DB, unless the pair already exists, in
// which case it updates them.
func (d *Database) UpdateTxnIDForEvents(
ctx context.Context,
appserviceID string,
maxID, txnID int,
) error {
return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
}
// RemoveEventsBeforeAndIncludingID removes all events from the database that
// are less than or equal to a given maximum ID. IDs here are implemented as a
// serial, thus this should always delete events in chronological order.
func (d *Database) RemoveEventsBeforeAndIncludingID(
ctx context.Context,
appserviceID string,
eventTableID int,
) error {
return d.events.deleteEventsBeforeAndIncludingID(ctx, eventTableID)
return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
}
// GetTxnIDWithAppServiceID takes in an application service ID and returns the

View file

@ -23,9 +23,10 @@ import (
// roomserver to notify appservice workers when there are events ready to send
// externally to application services.
type ApplicationServiceWorkerState struct {
AppService config.ApplicationService
Cond *sync.Cond
EventsReady bool
AppService config.ApplicationService
Cond *sync.Cond
// Events ready to be sent
EventsReady *int
// Backoff exponent (2^x secs). Max 6, aka 64s.
Backoff int
}

View file

@ -32,11 +32,12 @@ import (
)
var (
// TODO: Expose these in the config?
// Maximum size of events sent in each transaction.
// Warning, if this is lowered and a number of events greater than the previous
// batch size were still to be sent, then a number of events equal to the
// difference will be ignored by the app service.
// TL;DR: Don't lower this number with any AS events still left in the database.
transactionBatchSize = 50
// Time to wait between checking for new events to send.
transactionBreakTime = time.Millisecond * 50
// Timeout for sending a single transaction to an application service.
transactionTimeout = time.Second * 15
// The current transaction ID. Increments after every successful transaction.
@ -65,6 +66,7 @@ func SetupTransactionWorkers(
// worker is a goroutine that sends any queued events to the application service
// it is given.
func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
log.Infof("Starting Application Service %s", ws.AppService.ID)
ctx := context.Background()
// Initialize transaction ID counter
@ -88,40 +90,31 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
ws.AppService.ID)
return
}
// Wait if there are no new events to go out
if eventCount == 0 {
waitForEvents(&ws)
}
ws.Cond.L.Lock()
*ws.EventsReady = eventCount
ws.Cond.L.Unlock()
// Loop forever and keep waiting for more events to send
for {
// Set EventsReady to false for some reason (we just sent events?)
ws.Cond.L.Lock()
ws.EventsReady = false
ws.Cond.L.Unlock()
maxID, totalEvents, events, err := db.GetEventsWithAppServiceID(ctx, ws.AppService.ID, transactionBatchSize)
if err != nil {
log.WithError(err).Errorf("appservice %s worker unable to read queued events from DB",
ws.AppService.ID)
// Wait a little bit for DB to possibly recover
time.Sleep(transactionBreakTime)
continue
// Wait for more events if we've sent all the events in the database
if *ws.EventsReady <= 0 {
fmt.Println("Waiting")
ws.Cond.L.Lock()
ws.Cond.Wait()
ws.Cond.L.Unlock()
}
// Batch events up into a transaction
transactionJSON, err := createTransaction(events)
eventsCount, maxEventID, transactionID, transactionJSON, err := createTransaction(ctx, db, ws.AppService.ID)
if err != nil {
log.WithError(err).Fatalf("appservice %s worker unable to marshal events",
log.WithError(err).Fatalf("appservice %s worker unable to create transaction",
ws.AppService.ID)
return
}
// Send the events off to the application service
err = send(client, ws.AppService, transactionJSON)
err = send(client, ws.AppService, transactionID, transactionJSON)
if err != nil {
// Backoff
backoff(err, &ws)
@ -131,8 +124,12 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
// We sent successfully, hooray!
ws.Backoff = 0
ws.Cond.L.Lock()
*ws.EventsReady -= eventsCount
ws.Cond.L.Unlock()
// Remove sent events from the DB
err = db.RemoveEventsBeforeAndIncludingID(ctx, maxID)
err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID)
if err != nil {
log.WithError(err).Fatalf("unable to remove appservice events from the database for %s",
ws.AppService.ID)
@ -146,29 +143,15 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
ws.AppService.ID)
return
}
// Only wait for more events once we've sent all the events in the database
if totalEvents <= transactionBatchSize {
waitForEvents(&ws)
}
}
}
// waitForEvents pauses the calling goroutine while it waits for a broadcast message
func waitForEvents(ws *types.ApplicationServiceWorkerState) {
ws.Cond.L.Lock()
if !ws.EventsReady {
// Wait for a broadcast about new events
ws.Cond.Wait()
}
ws.Cond.L.Unlock()
}
// backoff pauses the calling goroutine for a 2^some backoff exponent seconds
func backoff(err error, ws *types.ApplicationServiceWorkerState) {
// Calculate how long to backoff for
backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff)))
backoffSeconds := time.Second * backoffDuration
log.WithError(err).Warnf("unable to send transactions to %s, backing off for %ds",
ws.AppService.ID, backoffDuration)
@ -184,19 +167,47 @@ func backoff(err error, ws *types.ApplicationServiceWorkerState) {
// createTransaction takes in a slice of AS events, stores them in an AS
// transaction, and JSON-encodes the results.
func createTransaction(
events []gomatrixserverlib.ApplicationServiceEvent,
) ([]byte, error) {
// Create a transactions and store the events inside
ctx context.Context,
db *storage.Database,
appserviceID string,
) (
eventsCount, maxID, txnID int,
transactionJSON []byte,
err error,
) {
transactionID := currentTransactionID
// Retrieve the latest events from the DB (will return old events if they weren't successfully sent)
txnID, maxID, events, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize)
if err != nil {
log.WithError(err).Fatalf("appservice %s worker unable to read queued events from DB",
appserviceID)
return
}
// Check if these are old events we are resending. If so, reuse old transactionID
if txnID != -1 {
transactionID = txnID
} else {
// Mark new events with current transactionID
err := db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, transactionID)
if err != nil {
return 0, 0, 0, nil, err
}
}
// Create a transaction and store the events inside
transaction := gomatrixserverlib.ApplicationServiceTransaction{
Events: events,
}
transactionJSON, err := json.Marshal(transaction)
transactionJSON, err = json.Marshal(transaction)
if err != nil {
return nil, err
return
}
return transactionJSON, nil
return len(events), maxID, transactionID, transactionJSON, nil
}
// send sends events to an application service. Returns an error if an OK was not
@ -204,10 +215,11 @@ func createTransaction(
func send(
client *http.Client,
appservice config.ApplicationService,
transactionID int,
transaction []byte,
) error {
// POST a transaction to our AS.
address := fmt.Sprintf("%s/transactions/%d", appservice.URL, currentTransactionID)
// POST a transaction to our AS
address := fmt.Sprintf("%s/transactions/%d", appservice.URL, transactionID)
resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction))
if err != nil {
return err