Change to event-based AS sending

This commit is contained in:
Andrew Morgan 2018-06-11 12:03:17 +01:00
parent 1f67fd9b89
commit 7c31687c36
5 changed files with 172 additions and 120 deletions

View file

@ -15,9 +15,12 @@
package appservice package appservice
import ( import (
"sync"
"github.com/matrix-org/dendrite/appservice/consumers" "github.com/matrix-org/dendrite/appservice/consumers"
"github.com/matrix-org/dendrite/appservice/routing" "github.com/matrix-org/dendrite/appservice/routing"
"github.com/matrix-org/dendrite/appservice/storage" "github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/appservice/workers" "github.com/matrix-org/dendrite/appservice/workers"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
@ -43,25 +46,29 @@ func SetupAppServiceAPIComponent(
logrus.WithError(err).Panicf("failed to connect to appservice db") logrus.WithError(err).Panicf("failed to connect to appservice db")
} }
// Create a map that will keep a counter of events to be sent for each // Wrap application services in a type that relates the application service and
// application service. This serves as an effective cache so that transaction // a sync.Cond object that can be used to notify workers when there are new
// workers do not need to query the database over and over in order to see // events to be sent out.
// whether there are events for them to send, but rather they can just check if workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices))
// their event counter is greater than zero. The counter for an application for _, appservice := range base.Cfg.Derived.ApplicationServices {
// service is incremented when an event meant for them is inserted into the m := sync.Mutex{}
// appservice database. ws := types.ApplicationServiceWorkerState{
eventCounterMap := make(map[string]int) AppService: appservice,
Cond: sync.NewCond(&m),
}
workerStates = append(workerStates, ws)
}
consumer := consumers.NewOutputRoomEventConsumer( consumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB, base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB,
queryAPI, aliasAPI, eventCounterMap, queryAPI, aliasAPI, workerStates,
) )
if err := consumer.Start(); err != nil { if err := consumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start app service roomserver consumer") logrus.WithError(err).Panicf("failed to start app service roomserver consumer")
} }
// Create application service transaction workers // Create application service transaction workers
if err := workers.SetupTransactionWorkers(base.Cfg, appserviceDB, eventCounterMap); err != nil { if err := workers.SetupTransactionWorkers(appserviceDB, workerStates); err != nil {
logrus.WithError(err).Panicf("failed to start app service transaction workers") logrus.WithError(err).Panicf("failed to start app service transaction workers")
} }

View file

@ -19,6 +19,7 @@ import (
"encoding/json" "encoding/json"
"github.com/matrix-org/dendrite/appservice/storage" "github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
@ -29,11 +30,6 @@ import (
sarama "gopkg.in/Shopify/sarama.v1" sarama "gopkg.in/Shopify/sarama.v1"
) )
var (
appServices []config.ApplicationService
ecm map[string]int
)
// OutputRoomEventConsumer consumes events that originated in the room server. // OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct { type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer roomServerConsumer *common.ContinualConsumer
@ -42,6 +38,7 @@ type OutputRoomEventConsumer struct {
query api.RoomserverQueryAPI query api.RoomserverQueryAPI
alias api.RoomserverAliasAPI alias api.RoomserverAliasAPI
serverName string serverName string
workerStates []types.ApplicationServiceWorkerState
} }
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call
@ -53,11 +50,8 @@ func NewOutputRoomEventConsumer(
appserviceDB *storage.Database, appserviceDB *storage.Database,
queryAPI api.RoomserverQueryAPI, queryAPI api.RoomserverQueryAPI,
aliasAPI api.RoomserverAliasAPI, aliasAPI api.RoomserverAliasAPI,
eventCounterMap map[string]int, workerStates []types.ApplicationServiceWorkerState,
) *OutputRoomEventConsumer { ) *OutputRoomEventConsumer {
appServices = cfg.Derived.ApplicationServices
ecm = eventCounterMap
consumer := common.ContinualConsumer{ consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent), Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
Consumer: kafkaConsumer, Consumer: kafkaConsumer,
@ -70,6 +64,7 @@ func NewOutputRoomEventConsumer(
query: queryAPI, query: queryAPI,
alias: aliasAPI, alias: aliasAPI,
serverName: string(cfg.Matrix.ServerName), serverName: string(cfg.Matrix.ServerName),
workerStates: workerStates,
} }
consumer.ProcessMessage = s.onMessage consumer.ProcessMessage = s.onMessage
@ -178,14 +173,19 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
events []gomatrixserverlib.Event, events []gomatrixserverlib.Event,
) error { ) error {
for _, event := range events { for _, event := range events {
for _, appservice := range appServices { for _, ws := range s.workerStates {
// Check if this event is interesting to this application service // Check if this event is interesting to this application service
if s.appserviceIsInterestedInEvent(ctx, event, appservice) { if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) {
// Queue this event to be sent off to the application service // Queue this event to be sent off to the application service
if err := s.asDB.StoreEvent(ctx, appservice.ID, event); err != nil { if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil {
log.WithError(err).Warn("failed to insert incoming event into appservices database") log.WithError(err).Warn("failed to insert incoming event into appservices database")
} else { } else {
ecm[appservice.ID]++ // Tell our worker to send out new messages by setting dirty bit for that
// worker to true, and waking them up with a broadcast
ws.Cond.L.Lock()
ws.EventsReady = true
ws.Cond.Broadcast()
ws.Cond.L.Unlock()
} }
} }
} }

View file

@ -42,10 +42,10 @@ CREATE TABLE IF NOT EXISTS appservice_events (
-- The JSON representation of the event's content. Text to avoid db JSON parsing -- The JSON representation of the event's content. Text to avoid db JSON parsing
event_content TEXT, event_content TEXT,
-- The ID of the transaction that this event is a part of -- The ID of the transaction that this event is a part of
txn_id INTEGER NOT NULL txn_id INTEGER
); );
CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_event(as_id); CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
` `
const selectEventsByApplicationServiceIDSQL = "" + const selectEventsByApplicationServiceIDSQL = "" +

View file

@ -0,0 +1,30 @@
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"sync"
"github.com/matrix-org/dendrite/common/config"
)
// ApplicationServiceWorkerState is a type that pairs and application service with a
// lockable condition, allowing the roomserver to notify appservice workers when
// there are events ready to send externally to application services.
type ApplicationServiceWorkerState struct {
AppService config.ApplicationService
Cond *sync.Cond
EventsReady bool
}

View file

@ -17,6 +17,7 @@ package workers
import ( import (
"bytes" "bytes"
"context" "context"
"database/sql"
"encoding/json" "encoding/json"
"fmt" "fmt"
"math" "math"
@ -24,9 +25,10 @@ import (
"time" "time"
"github.com/matrix-org/dendrite/appservice/storage" "github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
var ( var (
@ -47,23 +49,14 @@ var (
// size), then send that off to the AS's /transactions/{txnID} endpoint. It also // size), then send that off to the AS's /transactions/{txnID} endpoint. It also
// handles exponentially backing off in case the AS isn't currently available. // handles exponentially backing off in case the AS isn't currently available.
func SetupTransactionWorkers( func SetupTransactionWorkers(
cfg *config.Dendrite,
appserviceDB *storage.Database, appserviceDB *storage.Database,
// Each worker has access to an event counter, which keeps track of the amount workerStates []types.ApplicationServiceWorkerState,
// of events they still have to send off. The roomserver consumer
// (consumers/roomserver.go) increments this counter every time a new event for
// a specific application service is inserted into the database, whereas the
// counter is decremented by a certain amount when a worker sends some amount
// of events successfully to an application service. To ensure recovery in the
// event of a crash, this counter is initialized to the amount of events meant
// to be sent by a specific worker in the database, so that state is not lost.
eventCounterMap map[string]int,
) error { ) error {
// Create a worker that handles transmitting events to a single homeserver // Create a worker that handles transmitting events to a single homeserver
for _, appservice := range cfg.Derived.ApplicationServices { for _, workerState := range workerStates {
// Don't create a worker if this AS doesn't want to receive events // Don't create a worker if this AS doesn't want to receive events
if appservice.URL != "" { if workerState.AppService.URL != "" {
go worker(appserviceDB, appservice, eventCounterMap) go worker(appserviceDB, workerState)
} }
} }
return nil return nil
@ -71,97 +64,115 @@ func SetupTransactionWorkers(
// worker is a goroutine that sends any queued events to the application service // worker is a goroutine that sends any queued events to the application service
// it is given. // it is given.
func worker(db *storage.Database, as config.ApplicationService, ecm map[string]int) { func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
ctx := context.Background()
// Initialize transaction ID counter // Initialize transaction ID counter
var err error var err error
currentTransactionID, err = db.GetTxnIDWithAppServiceID(context.TODO(), as.ID) currentTransactionID, err = db.GetTxnIDWithAppServiceID(ctx, ws.AppService.ID)
if err != nil { if err != nil && err != sql.ErrNoRows {
logrus.WithError(err).Warnf("appservice worker for %s unable to get latest transaction ID from DB", log.WithError(err).Fatalf("appservice %s worker unable to get latest transaction ID from DB",
as.ID) ws.AppService.ID)
return
} }
// Create an HTTP client for sending requests to app services // Grab the HTTP client for sending requests to app services
client := &http.Client{ client := &http.Client{
Timeout: transactionTimeout, Timeout: transactionTimeout,
} }
// Initialize counter to amount of events currently in the database // Initialize backoff exponent (2^x secs). Max 6, aka 64s.
eventCount, err := db.CountEventsWithAppServiceID(context.TODO(), as.ID)
if err != nil {
logrus.WithError(err).Warn("appservice worker unable to count queued events from DB")
}
ecm[as.ID] = eventCount
// Initialize backoff exponent (2^x secs). Max 9, aka 512s.
backoff := 0 backoff := 0
// Initial check for any leftover events to send from last time
eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID)
if err != nil {
log.WithError(err).Fatalf("appservice %s worker unable to read queued events from DB",
ws.AppService.ID)
return
}
// Wait if there are no new events to go out
if eventCount == 0 {
ws.Cond.L.Lock()
if !ws.EventsReady {
// Wait for a broadcast about new events
ws.Cond.Wait()
}
ws.Cond.L.Unlock()
}
// Loop forever and keep waiting for more events to send // Loop forever and keep waiting for more events to send
for { for {
// Check if there are any events to send // Set EventsReady to false for some reason (we just sent events?)
if ecm[as.ID] > 0 { ws.Cond.L.Lock()
ctx := context.TODO() ws.EventsReady = false
ws.Cond.L.Unlock()
eventIDs, events, err := db.GetEventsWithAppServiceID(ctx, as.ID, transactionBatchSize) eventIDs, events, err := db.GetEventsWithAppServiceID(ctx, ws.AppService.ID, transactionBatchSize)
if err != nil { if err != nil {
logrus.WithError(err).Error("appservice worker unable to read queued events from DB") log.WithError(err).Errorf("appservice %s worker unable to read queued events from DB",
ws.AppService.ID)
// Wait a little bit for DB to possibly recover // Wait a little bit for DB to possibly recover
time.Sleep(transactionBreakTime)
continue
}
// Batch events up into a transaction
transactionJSON, err := createTransaction(events)
if err != nil {
logrus.WithError(err).Error("appservice worker unable to marshal events")
// Wait a little bit before trying again
time.Sleep(transactionBreakTime)
continue
}
// Send the events off to the application service
eventsSent, err := send(client, as, transactionJSON, len(events))
if err != nil {
// Calculate how long to backoff for
backoffDuration := time.Duration(math.Pow(2, float64(backoff)))
backoffSeconds := time.Second * backoffDuration
logrus.WithError(err).Warnf("unable to send transactions to %s, backing off for %ds",
as.ID, backoffDuration)
// Increment backoff count
backoff++
if backoff > 9 {
backoff = 9
}
// Backoff
time.Sleep(backoffSeconds)
continue
}
// We sent successfully, hooray!
backoff = 0
ecm[as.ID] -= eventsSent
// Remove sent events from the DB
err = db.RemoveEventsBeforeAndIncludingID(ctx, eventIDs[len(eventIDs)-1])
if err != nil {
logrus.WithError(err).Errorf("unable to remove appservice events from the database for %s",
as.ID)
}
// Update transactionID
currentTransactionID++
if err = db.UpsertTxnIDWithAppServiceID(context.TODO(), as.ID, currentTransactionID); err != nil {
logrus.WithError(err).Errorf("unable to update transaction ID for %s",
as.ID)
}
} else {
// If not, wait a bit and try again
time.Sleep(transactionBreakTime) time.Sleep(transactionBreakTime)
continue
} }
// Batch events up into a transaction
transactionJSON, err := createTransaction(events)
if err != nil {
log.WithError(err).Fatal("appservice %s worker unable to marshal events",
ws.AppService.ID)
return
}
// Send the events off to the application service
err = send(client, ws.AppService, transactionJSON, len(events))
if err != nil {
// Calculate how long to backoff for
backoffDuration := time.Duration(math.Pow(2, float64(backoff)))
backoffSeconds := time.Second * backoffDuration
log.WithError(err).Warnf("unable to send transactions to %s, backing off for %ds",
ws.AppService.ID, backoffDuration)
// Increment backoff count
backoff++
if backoff > 6 {
backoff = 6
}
// Backoff
time.Sleep(backoffSeconds)
continue
}
// We sent successfully, hooray!
backoff = 0
// Remove sent events from the DB
err = db.RemoveEventsBeforeAndIncludingID(ctx, eventIDs[len(eventIDs)-1])
if err != nil {
log.WithError(err).Fatalf("unable to remove appservice events from the database for %s",
ws.AppService.ID)
return
}
// Update transactionID
currentTransactionID++
if err = db.UpsertTxnIDWithAppServiceID(ctx, ws.AppService.ID, currentTransactionID); err != nil {
log.WithError(err).Fatalf("unable to update transaction ID for %s",
ws.AppService.ID)
return
}
ws.Cond.L.Lock()
if !ws.EventsReady {
// Wait for a broadcast about new events
ws.Cond.Wait()
}
ws.Cond.L.Unlock()
} }
} }
@ -190,22 +201,26 @@ func send(
appservice config.ApplicationService, appservice config.ApplicationService,
transaction []byte, transaction []byte,
count int, count int,
) (int, error) { ) error {
// POST a transaction to our AS. // POST a transaction to our AS.
address := fmt.Sprintf("%s/transactions/%d", appservice.URL, currentTransactionID) address := fmt.Sprintf("%s/transactions/%d", appservice.URL, currentTransactionID)
resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction)) resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction))
if err != nil { if err != nil {
return 0, err return err
} }
defer resp.Body.Close() // nolint: errcheck defer func() {
err := resp.Body.Close()
if err != nil {
log.WithError(err).Errorf("Unable to close response body from application service %s", appservice.ID)
}
}()
// Check the AS received the events correctly // Check the AS received the events correctly
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf( return fmt.Errorf(
"Non-OK status code %d returned from AS", resp.StatusCode, "Non-OK status code %d returned from AS", resp.StatusCode,
) )
} }
// Return amount of sent events return nil
return count, nil
} }