diff --git a/src/github.com/matrix-org/dendrite/appservice/appservice.go b/src/github.com/matrix-org/dendrite/appservice/appservice.go index 3d36a917..632dee00 100644 --- a/src/github.com/matrix-org/dendrite/appservice/appservice.go +++ b/src/github.com/matrix-org/dendrite/appservice/appservice.go @@ -15,9 +15,12 @@ package appservice import ( + "sync" + "github.com/matrix-org/dendrite/appservice/consumers" "github.com/matrix-org/dendrite/appservice/routing" "github.com/matrix-org/dendrite/appservice/storage" + "github.com/matrix-org/dendrite/appservice/types" "github.com/matrix-org/dendrite/appservice/workers" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/common/basecomponent" @@ -43,25 +46,29 @@ func SetupAppServiceAPIComponent( logrus.WithError(err).Panicf("failed to connect to appservice db") } - // Create a map that will keep a counter of events to be sent for each - // application service. This serves as an effective cache so that transaction - // workers do not need to query the database over and over in order to see - // whether there are events for them to send, but rather they can just check if - // their event counter is greater than zero. The counter for an application - // service is incremented when an event meant for them is inserted into the - // appservice database. - eventCounterMap := make(map[string]int) + // Wrap application services in a type that relates the application service and + // a sync.Cond object that can be used to notify workers when there are new + // events to be sent out. + workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices)) + for _, appservice := range base.Cfg.Derived.ApplicationServices { + m := sync.Mutex{} + ws := types.ApplicationServiceWorkerState{ + AppService: appservice, + Cond: sync.NewCond(&m), + } + workerStates = append(workerStates, ws) + } consumer := consumers.NewOutputRoomEventConsumer( base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB, - queryAPI, aliasAPI, eventCounterMap, + queryAPI, aliasAPI, workerStates, ) if err := consumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start app service roomserver consumer") } // Create application service transaction workers - if err := workers.SetupTransactionWorkers(base.Cfg, appserviceDB, eventCounterMap); err != nil { + if err := workers.SetupTransactionWorkers(appserviceDB, workerStates); err != nil { logrus.WithError(err).Panicf("failed to start app service transaction workers") } diff --git a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go index 20bb47eb..802f63d9 100644 --- a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go @@ -19,6 +19,7 @@ import ( "encoding/json" "github.com/matrix-org/dendrite/appservice/storage" + "github.com/matrix-org/dendrite/appservice/types" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" @@ -29,11 +30,6 @@ import ( sarama "gopkg.in/Shopify/sarama.v1" ) -var ( - appServices []config.ApplicationService - ecm map[string]int -) - // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { roomServerConsumer *common.ContinualConsumer @@ -42,6 +38,7 @@ type OutputRoomEventConsumer struct { query api.RoomserverQueryAPI alias api.RoomserverAliasAPI serverName string + workerStates []types.ApplicationServiceWorkerState } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call @@ -53,11 +50,8 @@ func NewOutputRoomEventConsumer( appserviceDB *storage.Database, queryAPI api.RoomserverQueryAPI, aliasAPI api.RoomserverAliasAPI, - eventCounterMap map[string]int, + workerStates []types.ApplicationServiceWorkerState, ) *OutputRoomEventConsumer { - appServices = cfg.Derived.ApplicationServices - ecm = eventCounterMap - consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputRoomEvent), Consumer: kafkaConsumer, @@ -70,6 +64,7 @@ func NewOutputRoomEventConsumer( query: queryAPI, alias: aliasAPI, serverName: string(cfg.Matrix.ServerName), + workerStates: workerStates, } consumer.ProcessMessage = s.onMessage @@ -178,14 +173,19 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents( events []gomatrixserverlib.Event, ) error { for _, event := range events { - for _, appservice := range appServices { + for _, ws := range s.workerStates { // Check if this event is interesting to this application service - if s.appserviceIsInterestedInEvent(ctx, event, appservice) { + if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) { // Queue this event to be sent off to the application service - if err := s.asDB.StoreEvent(ctx, appservice.ID, event); err != nil { + if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil { log.WithError(err).Warn("failed to insert incoming event into appservices database") } else { - ecm[appservice.ID]++ + // Tell our worker to send out new messages by setting dirty bit for that + // worker to true, and waking them up with a broadcast + ws.Cond.L.Lock() + ws.EventsReady = true + ws.Cond.Broadcast() + ws.Cond.L.Unlock() } } } diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go index 031342d1..58d21873 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go @@ -42,10 +42,10 @@ CREATE TABLE IF NOT EXISTS appservice_events ( -- The JSON representation of the event's content. Text to avoid db JSON parsing event_content TEXT, -- The ID of the transaction that this event is a part of - txn_id INTEGER NOT NULL + txn_id INTEGER ); -CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_event(as_id); +CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id); ` const selectEventsByApplicationServiceIDSQL = "" + diff --git a/src/github.com/matrix-org/dendrite/appservice/types/types.go b/src/github.com/matrix-org/dendrite/appservice/types/types.go new file mode 100644 index 00000000..a013e8b2 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/appservice/types/types.go @@ -0,0 +1,30 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +import ( + "sync" + + "github.com/matrix-org/dendrite/common/config" +) + +// ApplicationServiceWorkerState is a type that pairs and application service with a +// lockable condition, allowing the roomserver to notify appservice workers when +// there are events ready to send externally to application services. +type ApplicationServiceWorkerState struct { + AppService config.ApplicationService + Cond *sync.Cond + EventsReady bool +} diff --git a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go index dc736865..ba169a91 100644 --- a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go +++ b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go @@ -17,6 +17,7 @@ package workers import ( "bytes" "context" + "database/sql" "encoding/json" "fmt" "math" @@ -24,9 +25,10 @@ import ( "time" "github.com/matrix-org/dendrite/appservice/storage" + "github.com/matrix-org/dendrite/appservice/types" "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" ) var ( @@ -47,23 +49,14 @@ var ( // size), then send that off to the AS's /transactions/{txnID} endpoint. It also // handles exponentially backing off in case the AS isn't currently available. func SetupTransactionWorkers( - cfg *config.Dendrite, appserviceDB *storage.Database, - // Each worker has access to an event counter, which keeps track of the amount - // of events they still have to send off. The roomserver consumer - // (consumers/roomserver.go) increments this counter every time a new event for - // a specific application service is inserted into the database, whereas the - // counter is decremented by a certain amount when a worker sends some amount - // of events successfully to an application service. To ensure recovery in the - // event of a crash, this counter is initialized to the amount of events meant - // to be sent by a specific worker in the database, so that state is not lost. - eventCounterMap map[string]int, + workerStates []types.ApplicationServiceWorkerState, ) error { // Create a worker that handles transmitting events to a single homeserver - for _, appservice := range cfg.Derived.ApplicationServices { + for _, workerState := range workerStates { // Don't create a worker if this AS doesn't want to receive events - if appservice.URL != "" { - go worker(appserviceDB, appservice, eventCounterMap) + if workerState.AppService.URL != "" { + go worker(appserviceDB, workerState) } } return nil @@ -71,97 +64,115 @@ func SetupTransactionWorkers( // worker is a goroutine that sends any queued events to the application service // it is given. -func worker(db *storage.Database, as config.ApplicationService, ecm map[string]int) { +func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { + ctx := context.Background() + // Initialize transaction ID counter var err error - currentTransactionID, err = db.GetTxnIDWithAppServiceID(context.TODO(), as.ID) - if err != nil { - logrus.WithError(err).Warnf("appservice worker for %s unable to get latest transaction ID from DB", - as.ID) + currentTransactionID, err = db.GetTxnIDWithAppServiceID(ctx, ws.AppService.ID) + if err != nil && err != sql.ErrNoRows { + log.WithError(err).Fatalf("appservice %s worker unable to get latest transaction ID from DB", + ws.AppService.ID) + return } - // Create an HTTP client for sending requests to app services + // Grab the HTTP client for sending requests to app services client := &http.Client{ Timeout: transactionTimeout, } - // Initialize counter to amount of events currently in the database - eventCount, err := db.CountEventsWithAppServiceID(context.TODO(), as.ID) - if err != nil { - logrus.WithError(err).Warn("appservice worker unable to count queued events from DB") - } - ecm[as.ID] = eventCount - - // Initialize backoff exponent (2^x secs). Max 9, aka 512s. + // Initialize backoff exponent (2^x secs). Max 6, aka 64s. backoff := 0 + // Initial check for any leftover events to send from last time + eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID) + if err != nil { + log.WithError(err).Fatalf("appservice %s worker unable to read queued events from DB", + ws.AppService.ID) + return + } + + // Wait if there are no new events to go out + if eventCount == 0 { + ws.Cond.L.Lock() + if !ws.EventsReady { + // Wait for a broadcast about new events + ws.Cond.Wait() + } + ws.Cond.L.Unlock() + } + // Loop forever and keep waiting for more events to send for { - // Check if there are any events to send - if ecm[as.ID] > 0 { - ctx := context.TODO() + // Set EventsReady to false for some reason (we just sent events?) + ws.Cond.L.Lock() + ws.EventsReady = false + ws.Cond.L.Unlock() - eventIDs, events, err := db.GetEventsWithAppServiceID(ctx, as.ID, transactionBatchSize) - if err != nil { - logrus.WithError(err).Error("appservice worker unable to read queued events from DB") + eventIDs, events, err := db.GetEventsWithAppServiceID(ctx, ws.AppService.ID, transactionBatchSize) + if err != nil { + log.WithError(err).Errorf("appservice %s worker unable to read queued events from DB", + ws.AppService.ID) - // Wait a little bit for DB to possibly recover - time.Sleep(transactionBreakTime) - continue - } - - // Batch events up into a transaction - transactionJSON, err := createTransaction(events) - if err != nil { - logrus.WithError(err).Error("appservice worker unable to marshal events") - - // Wait a little bit before trying again - time.Sleep(transactionBreakTime) - continue - } - - // Send the events off to the application service - eventsSent, err := send(client, as, transactionJSON, len(events)) - if err != nil { - // Calculate how long to backoff for - backoffDuration := time.Duration(math.Pow(2, float64(backoff))) - backoffSeconds := time.Second * backoffDuration - logrus.WithError(err).Warnf("unable to send transactions to %s, backing off for %ds", - as.ID, backoffDuration) - - // Increment backoff count - backoff++ - if backoff > 9 { - backoff = 9 - } - - // Backoff - time.Sleep(backoffSeconds) - - continue - } - - // We sent successfully, hooray! - backoff = 0 - ecm[as.ID] -= eventsSent - - // Remove sent events from the DB - err = db.RemoveEventsBeforeAndIncludingID(ctx, eventIDs[len(eventIDs)-1]) - if err != nil { - logrus.WithError(err).Errorf("unable to remove appservice events from the database for %s", - as.ID) - } - - // Update transactionID - currentTransactionID++ - if err = db.UpsertTxnIDWithAppServiceID(context.TODO(), as.ID, currentTransactionID); err != nil { - logrus.WithError(err).Errorf("unable to update transaction ID for %s", - as.ID) - } - } else { - // If not, wait a bit and try again + // Wait a little bit for DB to possibly recover time.Sleep(transactionBreakTime) + continue } + + // Batch events up into a transaction + transactionJSON, err := createTransaction(events) + if err != nil { + log.WithError(err).Fatal("appservice %s worker unable to marshal events", + ws.AppService.ID) + + return + } + + // Send the events off to the application service + err = send(client, ws.AppService, transactionJSON, len(events)) + if err != nil { + // Calculate how long to backoff for + backoffDuration := time.Duration(math.Pow(2, float64(backoff))) + backoffSeconds := time.Second * backoffDuration + log.WithError(err).Warnf("unable to send transactions to %s, backing off for %ds", + ws.AppService.ID, backoffDuration) + + // Increment backoff count + backoff++ + if backoff > 6 { + backoff = 6 + } + + // Backoff + time.Sleep(backoffSeconds) + continue + } + + // We sent successfully, hooray! + backoff = 0 + + // Remove sent events from the DB + err = db.RemoveEventsBeforeAndIncludingID(ctx, eventIDs[len(eventIDs)-1]) + if err != nil { + log.WithError(err).Fatalf("unable to remove appservice events from the database for %s", + ws.AppService.ID) + return + } + + // Update transactionID + currentTransactionID++ + if err = db.UpsertTxnIDWithAppServiceID(ctx, ws.AppService.ID, currentTransactionID); err != nil { + log.WithError(err).Fatalf("unable to update transaction ID for %s", + ws.AppService.ID) + return + } + + ws.Cond.L.Lock() + if !ws.EventsReady { + // Wait for a broadcast about new events + ws.Cond.Wait() + } + ws.Cond.L.Unlock() } } @@ -190,22 +201,26 @@ func send( appservice config.ApplicationService, transaction []byte, count int, -) (int, error) { +) error { // POST a transaction to our AS. address := fmt.Sprintf("%s/transactions/%d", appservice.URL, currentTransactionID) resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction)) if err != nil { - return 0, err + return err } - defer resp.Body.Close() // nolint: errcheck + defer func() { + err := resp.Body.Close() + if err != nil { + log.WithError(err).Errorf("Unable to close response body from application service %s", appservice.ID) + } + }() // Check the AS received the events correctly if resp.StatusCode != http.StatusOK { - return 0, fmt.Errorf( + return fmt.Errorf( "Non-OK status code %d returned from AS", resp.StatusCode, ) } - // Return amount of sent events - return count, nil + return nil }