Reduce cyclomatic complexity

This commit is contained in:
Andrew Morgan 2018-06-11 13:25:03 +01:00
parent 7c31687c36
commit 1fac994d61
2 changed files with 39 additions and 36 deletions

View file

@ -20,11 +20,14 @@ import (
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
) )
// ApplicationServiceWorkerState is a type that pairs and application service with a // ApplicationServiceWorkerState is a type that couples an application service,
// lockable condition, allowing the roomserver to notify appservice workers when // a lockable condition as well as some other state variables, allowing the
// there are events ready to send externally to application services. // roomserver to notify appservice workers when there are events ready to send
// externally to application services.
type ApplicationServiceWorkerState struct { type ApplicationServiceWorkerState struct {
AppService config.ApplicationService AppService config.ApplicationService
Cond *sync.Cond Cond *sync.Cond
EventsReady bool EventsReady bool
// Backoff exponent (2^x secs). Max 6, aka 64s.
Backoff int
} }

View file

@ -81,9 +81,6 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
Timeout: transactionTimeout, Timeout: transactionTimeout,
} }
// Initialize backoff exponent (2^x secs). Max 6, aka 64s.
backoff := 0
// Initial check for any leftover events to send from last time // Initial check for any leftover events to send from last time
eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID) eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID)
if err != nil { if err != nil {
@ -94,12 +91,7 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
// Wait if there are no new events to go out // Wait if there are no new events to go out
if eventCount == 0 { if eventCount == 0 {
ws.Cond.L.Lock() waitForEvents(&ws)
if !ws.EventsReady {
// Wait for a broadcast about new events
ws.Cond.Wait()
}
ws.Cond.L.Unlock()
} }
// Loop forever and keep waiting for more events to send // Loop forever and keep waiting for more events to send
@ -122,34 +114,22 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
// Batch events up into a transaction // Batch events up into a transaction
transactionJSON, err := createTransaction(events) transactionJSON, err := createTransaction(events)
if err != nil { if err != nil {
log.WithError(err).Fatal("appservice %s worker unable to marshal events", log.WithError(err).Fatalf("appservice %s worker unable to marshal events",
ws.AppService.ID) ws.AppService.ID)
return return
} }
// Send the events off to the application service // Send the events off to the application service
err = send(client, ws.AppService, transactionJSON, len(events)) err = send(client, ws.AppService, transactionJSON)
if err != nil { if err != nil {
// Calculate how long to backoff for
backoffDuration := time.Duration(math.Pow(2, float64(backoff)))
backoffSeconds := time.Second * backoffDuration
log.WithError(err).Warnf("unable to send transactions to %s, backing off for %ds",
ws.AppService.ID, backoffDuration)
// Increment backoff count
backoff++
if backoff > 6 {
backoff = 6
}
// Backoff // Backoff
time.Sleep(backoffSeconds) backoff(err, &ws)
continue continue
} }
// We sent successfully, hooray! // We sent successfully, hooray!
backoff = 0 ws.Backoff = 0
// Remove sent events from the DB // Remove sent events from the DB
err = db.RemoveEventsBeforeAndIncludingID(ctx, eventIDs[len(eventIDs)-1]) err = db.RemoveEventsBeforeAndIncludingID(ctx, eventIDs[len(eventIDs)-1])
@ -166,7 +146,12 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
ws.AppService.ID) ws.AppService.ID)
return return
} }
waitForEvents(&ws)
}
}
// waitForEvents pauses the calling goroutine while it waits for a broadcast message
func waitForEvents(ws *types.ApplicationServiceWorkerState) {
ws.Cond.L.Lock() ws.Cond.L.Lock()
if !ws.EventsReady { if !ws.EventsReady {
// Wait for a broadcast about new events // Wait for a broadcast about new events
@ -174,6 +159,22 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
} }
ws.Cond.L.Unlock() ws.Cond.L.Unlock()
} }
// backoff pauses the calling goroutine for a 2^some backoff exponent seconds
func backoff(err error, ws *types.ApplicationServiceWorkerState) {
// Calculate how long to backoff for
backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff)))
backoffSeconds := time.Second * backoffDuration
log.WithError(err).Warnf("unable to send transactions to %s, backing off for %ds",
ws.AppService.ID, backoffDuration)
ws.Backoff++
if ws.Backoff > 6 {
ws.Backoff = 6
}
// Backoff
time.Sleep(backoffSeconds)
} }
// createTransaction takes in a slice of AS events, stores them in an AS // createTransaction takes in a slice of AS events, stores them in an AS
@ -200,7 +201,6 @@ func send(
client *http.Client, client *http.Client,
appservice config.ApplicationService, appservice config.ApplicationService,
transaction []byte, transaction []byte,
count int,
) error { ) error {
// POST a transaction to our AS. // POST a transaction to our AS.
address := fmt.Sprintf("%s/transactions/%d", appservice.URL, currentTransactionID) address := fmt.Sprintf("%s/transactions/%d", appservice.URL, currentTransactionID)