diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go index 82fb59ab..93b28f5e 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go @@ -50,13 +50,9 @@ CREATE TABLE IF NOT EXISTS appservice_events ( CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id); ` -const selectPastEventsByApplicationServiceIDSQL = "" + +const selectEventsByApplicationServiceIDSQL = "" + "SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id " + - "FROM appservice_events WHERE as_id = $1 AND txn_id > -1 LIMIT $2" - -const selectCurrEventsByApplicationServiceIDSQL = "" + - "SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id " + - "FROM appservice_events WHERE as_id = $1 AND txn_id = -1 LIMIT $2" + "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC LIMIT $2" const countEventsByApplicationServiceIDSQL = "" + "SELECT COUNT(event_id) FROM appservice_events WHERE as_id = $1" @@ -72,12 +68,11 @@ const deleteEventsBeforeAndIncludingIDSQL = "" + "DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2" type eventsStatements struct { - selectPastEventsByApplicationServiceIDStmt *sql.Stmt - selectCurrEventsByApplicationServiceIDStmt *sql.Stmt - countEventsByApplicationServiceIDStmt *sql.Stmt - insertEventStmt *sql.Stmt - updateTxnIDForEventsStmt *sql.Stmt - deleteEventsBeforeAndIncludingIDStmt *sql.Stmt + selectEventsByApplicationServiceIDStmt *sql.Stmt + countEventsByApplicationServiceIDStmt *sql.Stmt + insertEventStmt *sql.Stmt + updateTxnIDForEventsStmt *sql.Stmt + deleteEventsBeforeAndIncludingIDStmt *sql.Stmt } func (s *eventsStatements) prepare(db *sql.DB) (err error) { @@ -86,10 +81,7 @@ func (s *eventsStatements) prepare(db *sql.DB) (err error) { return } - if s.selectPastEventsByApplicationServiceIDStmt, err = db.Prepare(selectPastEventsByApplicationServiceIDSQL); err != nil { - return - } - if s.selectCurrEventsByApplicationServiceIDStmt, err = db.Prepare(selectCurrEventsByApplicationServiceIDSQL); err != nil { + if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil { return } if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil { @@ -121,28 +113,8 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( events []gomatrixserverlib.ApplicationServiceEvent, err error, ) { - // First check to see if there are any events part of an old transaction - eventRowsPast, err := s.selectPastEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit) - if err != nil { - return 0, 0, nil, err - } - defer func() { - err = eventRowsPast.Close() - if err != nil { - log.WithError(err).Fatalf("Appservice %s unable to select past events to send", - applicationServiceID) - } - }() - events, txnID, maxID, err = retrieveEvents(eventRowsPast) - if err != nil { - return 0, 0, nil, err - } - if len(events) > 0 { - return - } - - // Else, if there are old events with existing transaction IDs, grab a batch of new events - eventRowsCurr, err := s.selectCurrEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit) + // Retrieve events from the database. Unsuccessfully sent events first + eventRowsCurr, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit) if err != nil { return 0, 0, nil, err } @@ -163,6 +135,9 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.ApplicationServiceEvent, txnID, maxID int, err error) { // Iterate through each row and store event contents + // If txn_id changes dramatically, we've switched from collecting old events to + // new ones. Send back those events first. + lastTxnID := -2 // Invalid transaction ID for eventRows.Next() { var event gomatrixserverlib.ApplicationServiceEvent var eventContent sql.NullString @@ -181,6 +156,14 @@ func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.Application fmt.Println("Failed:", err.Error()) return nil, 0, 0, err } + + // If txnID has changed on this event from the previous event, then we've + // reached the end of a transaction's events. Return only those events. + if lastTxnID > -2 && lastTxnID != txnID { + return + } + lastTxnID = txnID + if eventContent.Valid { event.Content = gomatrixserverlib.RawJSON(eventContent.String) } @@ -192,7 +175,8 @@ func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.Application ageMilli := time.Now().UnixNano() / int64(time.Millisecond) event.Age = ageMilli - event.OriginServerTimestamp - // TODO: Synapse does this. Do app services really require this? :) + // TODO: Synapse does this. It's unnecessary to send Sender and UserID as the + // same value. Do app services really require this? :) event.Sender = event.UserID events = append(events, event) diff --git a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go index d73bb56a..c6bb0e86 100644 --- a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go +++ b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go @@ -39,7 +39,7 @@ var ( // TL;DR: Don't lower this number with any AS events still left in the database. transactionBatchSize = 50 // Timeout for sending a single transaction to an application service. - transactionTimeout = time.Second * 15 + transactionTimeout = time.Second * 60 // The current transaction ID. Increments after every successful transaction. currentTransactionID = 0 ) @@ -98,7 +98,6 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { for { // Wait for more events if we've sent all the events in the database if *ws.EventsReady <= 0 { - fmt.Println("Waiting") ws.Cond.L.Lock() ws.Cond.Wait() ws.Cond.L.Unlock() @@ -114,11 +113,15 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { } // Send the events off to the application service - err = send(client, ws.AppService, transactionID, transactionJSON) - if err != nil { - // Backoff - backoff(err, &ws) - continue + // Backoff if the application service does not respond + for { + err = send(client, ws.AppService, transactionID, transactionJSON) + if err != nil { + // Backoff + backoff(&ws, err) + continue + } + break } // We sent successfully, hooray! @@ -147,7 +150,7 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { } // backoff pauses the calling goroutine for a 2^some backoff exponent seconds -func backoff(err error, ws *types.ApplicationServiceWorkerState) { +func backoff(ws *types.ApplicationServiceWorkerState, err error) { // Calculate how long to backoff for backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff))) backoffSeconds := time.Second * backoffDuration @@ -233,9 +236,8 @@ func send( // Check the AS received the events correctly if resp.StatusCode != http.StatusOK { - return fmt.Errorf( - "Non-OK status code %d returned from AS", resp.StatusCode, - ) + // TODO: Handle non-200 error codes from application services + return fmt.Errorf("Non-OK status code %d returned from AS", resp.StatusCode) } return nil