From af841cdcc54abe6985a8fffd5a28d10b71097b36 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Tue, 12 Jun 2018 15:38:45 +0100 Subject: [PATCH] Remove tight send loop. Fix events not being deleted --- .../storage/appservice_events_table.go | 15 ++++---- .../workers/transaction_scheduler.go | 35 ++++++++----------- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go index 93b28f5e..89b5283f 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go @@ -125,15 +125,18 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( applicationServiceID) } }() - events, _, maxID, err = retrieveEvents(eventRowsCurr) + events, maxID, txnID, err = retrieveEvents(eventRowsCurr) if err != nil { return 0, 0, nil, err } - return -1, maxID, events, err + return } -func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.ApplicationServiceEvent, txnID, maxID int, err error) { +func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.ApplicationServiceEvent, maxID, txnID int, err error) { + // Get current time for use in calculating event age + nowMilli := time.Now().UnixNano() / int64(time.Millisecond) + // Iterate through each row and store event contents // If txn_id changes dramatically, we've switched from collecting old events to // new ones. Send back those events first. @@ -172,8 +175,8 @@ func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.Application } // Get age of the event from original timestamp and current time - ageMilli := time.Now().UnixNano() / int64(time.Millisecond) - event.Age = ageMilli - event.OriginServerTimestamp + // TODO: Consider removing age as not many app services use it + event.Age = nowMilli - event.OriginServerTimestamp // TODO: Synapse does this. It's unnecessary to send Sender and UserID as the // same value. Do app services really require this? :) @@ -216,7 +219,7 @@ func (s *eventsStatements) insertEvent( event.Type(), event.Sender(), event.Content(), - -1, + -1, // No transaction ID yet ) return } diff --git a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go index c6bb0e86..47bb78d9 100644 --- a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go +++ b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go @@ -104,7 +104,7 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { } // Batch events up into a transaction - eventsCount, maxEventID, transactionID, transactionJSON, err := createTransaction(ctx, db, ws.AppService.ID) + eventsCount, txnID, maxEventID, transactionJSON, err := createTransaction(ctx, db, ws.AppService.ID) if err != nil { log.WithError(err).Fatalf("appservice %s worker unable to create transaction", ws.AppService.ID) @@ -114,14 +114,11 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { // Send the events off to the application service // Backoff if the application service does not respond - for { - err = send(client, ws.AppService, transactionID, transactionJSON) - if err != nil { - // Backoff - backoff(&ws, err) - continue - } - break + err = send(client, ws.AppService, txnID, transactionJSON) + if err != nil { + // Backoff + backoff(&ws, err) + continue } // We sent successfully, hooray! @@ -174,12 +171,10 @@ func createTransaction( db *storage.Database, appserviceID string, ) ( - eventsCount, maxID, txnID int, + eventsCount, txnID, maxID int, transactionJSON []byte, err error, ) { - transactionID := currentTransactionID - // Retrieve the latest events from the DB (will return old events if they weren't successfully sent) txnID, maxID, events, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize) if err != nil { @@ -189,12 +184,11 @@ func createTransaction( return } - // Check if these are old events we are resending. If so, reuse old transactionID - if txnID != -1 { - transactionID = txnID - } else { + // Check if these events already have a transaction ID + if txnID == -1 { + txnID = currentTransactionID // Mark new events with current transactionID - err := db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, transactionID) + err := db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, currentTransactionID) if err != nil { return 0, 0, 0, nil, err } @@ -210,7 +204,8 @@ func createTransaction( return } - return len(events), maxID, transactionID, transactionJSON, nil + eventsCount = len(events) + return } // send sends events to an application service. Returns an error if an OK was not @@ -218,11 +213,11 @@ func createTransaction( func send( client *http.Client, appservice config.ApplicationService, - transactionID int, + txnID int, transaction []byte, ) error { // POST a transaction to our AS - address := fmt.Sprintf("%s/transactions/%d", appservice.URL, transactionID) + address := fmt.Sprintf("%s/transactions/%d", appservice.URL, txnID) resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction)) if err != nil { return err