diff --git a/src/github.com/matrix-org/dendrite/appservice/types/types.go b/src/github.com/matrix-org/dendrite/appservice/types/types.go index a013e8b2..ac83b264 100644 --- a/src/github.com/matrix-org/dendrite/appservice/types/types.go +++ b/src/github.com/matrix-org/dendrite/appservice/types/types.go @@ -20,11 +20,14 @@ import ( "github.com/matrix-org/dendrite/common/config" ) -// ApplicationServiceWorkerState is a type that pairs and application service with a -// lockable condition, allowing the roomserver to notify appservice workers when -// there are events ready to send externally to application services. +// ApplicationServiceWorkerState is a type that couples an application service, +// a lockable condition as well as some other state variables, allowing the +// roomserver to notify appservice workers when there are events ready to send +// externally to application services. type ApplicationServiceWorkerState struct { AppService config.ApplicationService Cond *sync.Cond EventsReady bool + // Backoff exponent (2^x secs). Max 6, aka 64s. + Backoff int } diff --git a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go index ba169a91..671824a1 100644 --- a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go +++ b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go @@ -81,9 +81,6 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { Timeout: transactionTimeout, } - // Initialize backoff exponent (2^x secs). Max 6, aka 64s. - backoff := 0 - // Initial check for any leftover events to send from last time eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID) if err != nil { @@ -94,12 +91,7 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { // Wait if there are no new events to go out if eventCount == 0 { - ws.Cond.L.Lock() - if !ws.EventsReady { - // Wait for a broadcast about new events - ws.Cond.Wait() - } - ws.Cond.L.Unlock() + waitForEvents(&ws) } // Loop forever and keep waiting for more events to send @@ -122,34 +114,22 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { // Batch events up into a transaction transactionJSON, err := createTransaction(events) if err != nil { - log.WithError(err).Fatal("appservice %s worker unable to marshal events", + log.WithError(err).Fatalf("appservice %s worker unable to marshal events", ws.AppService.ID) return } // Send the events off to the application service - err = send(client, ws.AppService, transactionJSON, len(events)) + err = send(client, ws.AppService, transactionJSON) if err != nil { - // Calculate how long to backoff for - backoffDuration := time.Duration(math.Pow(2, float64(backoff))) - backoffSeconds := time.Second * backoffDuration - log.WithError(err).Warnf("unable to send transactions to %s, backing off for %ds", - ws.AppService.ID, backoffDuration) - - // Increment backoff count - backoff++ - if backoff > 6 { - backoff = 6 - } - // Backoff - time.Sleep(backoffSeconds) + backoff(err, &ws) continue } // We sent successfully, hooray! - backoff = 0 + ws.Backoff = 0 // Remove sent events from the DB err = db.RemoveEventsBeforeAndIncludingID(ctx, eventIDs[len(eventIDs)-1]) @@ -166,16 +146,37 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { ws.AppService.ID) return } - - ws.Cond.L.Lock() - if !ws.EventsReady { - // Wait for a broadcast about new events - ws.Cond.Wait() - } - ws.Cond.L.Unlock() + waitForEvents(&ws) } } +// waitForEvents pauses the calling goroutine while it waits for a broadcast message +func waitForEvents(ws *types.ApplicationServiceWorkerState) { + ws.Cond.L.Lock() + if !ws.EventsReady { + // Wait for a broadcast about new events + ws.Cond.Wait() + } + ws.Cond.L.Unlock() +} + +// backoff pauses the calling goroutine for a 2^some backoff exponent seconds +func backoff(err error, ws *types.ApplicationServiceWorkerState) { + // Calculate how long to backoff for + backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff))) + backoffSeconds := time.Second * backoffDuration + log.WithError(err).Warnf("unable to send transactions to %s, backing off for %ds", + ws.AppService.ID, backoffDuration) + + ws.Backoff++ + if ws.Backoff > 6 { + ws.Backoff = 6 + } + + // Backoff + time.Sleep(backoffSeconds) +} + // createTransaction takes in a slice of AS events, stores them in an AS // transaction, and JSON-encodes the results. func createTransaction( @@ -200,7 +201,6 @@ func send( client *http.Client, appservice config.ApplicationService, transaction []byte, - count int, ) error { // POST a transaction to our AS. address := fmt.Sprintf("%s/transactions/%d", appservice.URL, currentTransactionID)