From a0b9613b86c0ca033057dc5b6f9ec73e0c83e68e Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 24 May 2018 11:14:47 +0100 Subject: [PATCH] Store And Send Application Service Events * Modify INSTALL.md and dendrite-config.yaml for the new appservice database * Correct all instances of casing on 'application service' to align with spec * Store incoming events that an app service is interested in in the database to be later read by transaction workers. * Retrieve these events from transaction workers, one per AS. * Minimal transaction ID data is stored as well to recover after server failure. * Send events to AS and exponentially backoff on failure. Signed-off-by: Andrew Morgan --- INSTALL.md | 13 +- dendrite-config.yaml | 1 + .../matrix-org/dendrite/appservice/README.md | 6 +- .../dendrite/appservice/appservice.go | 25 ++- .../appservice/consumers/roomserver.go | 31 ++- .../storage/appservice_events_table.go | 198 +++++++++++++++++ .../dendrite/appservice/storage/storage.go | 110 +++++++++ .../storage/txn_id_counter_table.go | 88 ++++++++ .../workers/transaction_scheduler.go | 210 ++++++++++++++++++ .../auth/storage/accounts/accounts_table.go | 2 +- .../clientapi/auth/storage/devices/storage.go | 4 +- .../dendrite/clientapi/routing/register.go | 14 +- .../dendrite/common/config/appservice.go | 10 +- .../dendrite/common/config/config.go | 9 +- 14 files changed, 689 insertions(+), 32 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go create mode 100644 src/github.com/matrix-org/dendrite/appservice/storage/storage.go create mode 100644 src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go create mode 100644 src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go diff --git a/INSTALL.md b/INSTALL.md index a7e2d835..ee3c6f1e 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -72,7 +72,7 @@ Dendrite requires a postgres database engine, version 9.5 or later. ``` * Create databases: ```bash - for i in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi naffka; do + for i in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi appservice naffka; do sudo -u postgres createdb -O dendrite dendrite_$i done ``` @@ -253,3 +253,14 @@ you want to support federation. ```bash ./bin/dendrite-federation-sender-server --config dendrite.yaml ``` + +### Run an appservice server + +This sends events from the network to [application +services](https://matrix.org/docs/spec/application_service/unstable.html) +running locally. This is only required if you want to support running +application services on your homeserver. + +```bash +./bin/dendrite-appservice-server --config dendrite.yaml +``` diff --git a/dendrite-config.yaml b/dendrite-config.yaml index ae926bab..44441787 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -97,6 +97,7 @@ database: room_server: "postgres://dendrite:itsasecret@localhost/dendrite_roomserver?sslmode=disable" server_key: "postgres://dendrite:itsasecret@localhost/dendrite_serverkey?sslmode=disable" federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable" + appservice: "postgres://dendrite:itsasecret@localhost/dendrite_appservice?sslmode=disable" public_rooms_api: "postgres://dendrite:itsasecret@localhost/dendrite_publicroomsapi?sslmode=disable" # If using naffka you need to specify a naffka database # naffka: "postgres://dendrite:itsasecret@localhost/dendrite_naffka?sslmode=disable" diff --git a/src/github.com/matrix-org/dendrite/appservice/README.md b/src/github.com/matrix-org/dendrite/appservice/README.md index 5b00386d..d7555744 100644 --- a/src/github.com/matrix-org/dendrite/appservice/README.md +++ b/src/github.com/matrix-org/dendrite/appservice/README.md @@ -2,9 +2,9 @@ This component interfaces with external [Application Services](https://matrix.org/docs/spec/application_service/unstable.html). -This includes any HTTP endpoints that Application Services call, as well as talking -to any HTTP endpoints that Application Services provide themselves. +This includes any HTTP endpoints that application services call, as well as talking +to any HTTP endpoints that application services provide themselves. ## Consumers -This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing Application Services. \ No newline at end of file +This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing application services. \ No newline at end of file diff --git a/src/github.com/matrix-org/dendrite/appservice/appservice.go b/src/github.com/matrix-org/dendrite/appservice/appservice.go index 9caf70fb..3d36a917 100644 --- a/src/github.com/matrix-org/dendrite/appservice/appservice.go +++ b/src/github.com/matrix-org/dendrite/appservice/appservice.go @@ -17,6 +17,8 @@ package appservice import ( "github.com/matrix-org/dendrite/appservice/consumers" "github.com/matrix-org/dendrite/appservice/routing" + "github.com/matrix-org/dendrite/appservice/storage" + "github.com/matrix-org/dendrite/appservice/workers" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/transactions" @@ -35,13 +37,34 @@ func SetupAppServiceAPIComponent( queryAPI api.RoomserverQueryAPI, transactionsCache *transactions.Cache, ) { + // Create a connection to the appservice postgres DB + appserviceDB, err := storage.NewDatabase(string(base.Cfg.Database.AppService)) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to appservice db") + } + + // Create a map that will keep a counter of events to be sent for each + // application service. This serves as an effective cache so that transaction + // workers do not need to query the database over and over in order to see + // whether there are events for them to send, but rather they can just check if + // their event counter is greater than zero. The counter for an application + // service is incremented when an event meant for them is inserted into the + // appservice database. + eventCounterMap := make(map[string]int) + consumer := consumers.NewOutputRoomEventConsumer( - base.Cfg, base.KafkaConsumer, accountsDB, queryAPI, aliasAPI, + base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB, + queryAPI, aliasAPI, eventCounterMap, ) if err := consumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start app service roomserver consumer") } + // Create application service transaction workers + if err := workers.SetupTransactionWorkers(base.Cfg, appserviceDB, eventCounterMap); err != nil { + logrus.WithError(err).Panicf("failed to start app service transaction workers") + } + // Set up HTTP Endpoints routing.Setup( base.APIMux, *base.Cfg, queryAPI, aliasAPI, accountsDB, diff --git a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go index 1a0404ac..20bb47eb 100644 --- a/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/appservice/consumers/roomserver.go @@ -17,8 +17,8 @@ package consumers import ( "context" "encoding/json" - "fmt" + "github.com/matrix-org/dendrite/appservice/storage" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/config" @@ -31,26 +31,32 @@ import ( var ( appServices []config.ApplicationService + ecm map[string]int ) // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { roomServerConsumer *common.ContinualConsumer db *accounts.Database + asDB *storage.Database query api.RoomserverQueryAPI alias api.RoomserverAliasAPI serverName string } -// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. +// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call +// Start() to begin consuming from room servers. func NewOutputRoomEventConsumer( cfg *config.Dendrite, kafkaConsumer sarama.Consumer, store *accounts.Database, + appserviceDB *storage.Database, queryAPI api.RoomserverQueryAPI, aliasAPI api.RoomserverAliasAPI, + eventCounterMap map[string]int, ) *OutputRoomEventConsumer { appServices = cfg.Derived.ApplicationServices + ecm = eventCounterMap consumer := common.ContinualConsumer{ Topic: string(cfg.Kafka.Topics.OutputRoomEvent), @@ -60,6 +66,7 @@ func NewOutputRoomEventConsumer( s := &OutputRoomEventConsumer{ roomServerConsumer: &consumer, db: store, + asDB: appserviceDB, query: queryAPI, alias: aliasAPI, serverName: string(cfg.Matrix.ServerName), @@ -74,9 +81,10 @@ func (s *OutputRoomEventConsumer) Start() error { return s.roomServerConsumer.Start() } -// onMessage is called when the sync server receives a new event from the room server output log. -// It is not safe for this function to be called from multiple goroutines, or else the -// sync stream position may race and be incorrectly calculated. +// onMessage is called when the sync server receives a new event from the room +// server output log. It is not safe for this function to be called from +// multiple goroutines, or else the sync stream position may race and be +// incorrectly calculated. func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // Parse out the event JSON var output api.OutputEvent @@ -165,13 +173,20 @@ func (s *OutputRoomEventConsumer) lookupStateEvents( // each namespace of each registered application service, and if there is a // match, adds the event to the queue for events to be sent to a particular // application service. -func (s *OutputRoomEventConsumer) filterRoomserverEvents(ctx context.Context, events []gomatrixserverlib.Event) error { +func (s *OutputRoomEventConsumer) filterRoomserverEvents( + ctx context.Context, + events []gomatrixserverlib.Event, +) error { for _, event := range events { for _, appservice := range appServices { // Check if this event is interesting to this application service if s.appserviceIsInterestedInEvent(ctx, event, appservice) { - // TODO: Queue this event to be sent off to the application service - fmt.Println(appservice.ID, "was interested in", event.Sender(), event.Type(), event.RoomID()) + // Queue this event to be sent off to the application service + if err := s.asDB.StoreEvent(ctx, appservice.ID, event); err != nil { + log.WithError(err).Warn("failed to insert incoming event into appservices database") + } else { + ecm[appservice.ID]++ + } } } } diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go new file mode 100644 index 00000000..98f97433 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go @@ -0,0 +1,198 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" + "errors" + "strconv" + "time" + + "github.com/lib/pq" + "github.com/matrix-org/gomatrixserverlib" +) + +const appserviceEventsSchema = ` +-- Stores events to be sent to application services +CREATE TABLE IF NOT EXISTS appservice_events ( + -- An auto-incrementing id unique to each event in the table + id SERIAL NOT NULL PRIMARY KEY, + -- The ID of the application service the event will be sent to + as_id TEXT, + -- The ID of the event + event_id TEXT, + -- Unix seconds that the event was sent at from the originating server + origin_server_ts BIGINT, + -- The ID of the room that the event was sent in + room_id TEXT, + -- The type of the event (e.g. m.text) + type TEXT, + -- The ID of the user that sent the event + sender TEXT, + -- The JSON representation of the event. Text to avoid db JSON parsing + event_json TEXT, + -- The ID of the transaction that this event is a part of + txn_id INTEGER +); +` + +const selectEventsByApplicationServiceIDSQL = "" + + "SELECT event_id, origin_server_ts, room_id, type, sender, event_json FROM appservice_events " + + "WHERE as_id = $1 ORDER BY as_id LIMIT $2" + +const countEventsByApplicationServiceIDSQL = "" + + "SELECT COUNT(event_id) FROM appservice_events WHERE as_id = $1" + +const insertEventSQL = "" + + "INSERT INTO appservice_events(as_id, event_id, origin_server_ts, room_id, type, sender, event_json, txn_id) " + + "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" + +const deleteEventsByIDSQL = "" + + "DELETE FROM appservice_events WHERE event_id = ANY($1)" + +type eventsStatements struct { + selectEventsByApplicationServiceIDStmt *sql.Stmt + countEventsByApplicationServiceIDStmt *sql.Stmt + insertEventStmt *sql.Stmt + deleteEventsByIDStmt *sql.Stmt +} + +func (s *eventsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(appserviceEventsSchema) + if err != nil { + return + } + + if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil { + return + } + if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil { + return + } + if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { + return + } + if s.deleteEventsByIDStmt, err = db.Prepare(deleteEventsByIDSQL); err != nil { + return + } + + return +} + +// selectEventsByApplicationServiceID takes in an application service ID and +// returns a slice of events that need to be sent to that application service. +func (s *eventsStatements) selectEventsByApplicationServiceID( + ctx context.Context, + applicationServiceID string, + limit int, +) ( + eventIDs []string, + events []gomatrixserverlib.ApplicationServiceEvent, + err error, +) { + eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit) + if err != nil { + return nil, nil, err + } + defer eventRows.Close() // nolint: errcheck + + // Iterate through each row and store event contents + for eventRows.Next() { + var eventID, originServerTimestamp, roomID, eventType, sender, eventContent *string + if err = eventRows.Scan( + &eventID, + &originServerTimestamp, + &roomID, + &eventType, + &sender, + &eventContent, + ); err != nil || eventID == nil || roomID == nil || eventType == nil || sender == nil || eventContent == nil { + return nil, nil, err + } + eventIDs = append(eventIDs, *eventID) + + // Get age of the event from original timestamp and current time + timestamp, err := strconv.ParseInt(*originServerTimestamp, 10, 64) + if err != nil { + return nil, nil, err + } + ageMilli := time.Now().UnixNano() / int64(time.Millisecond) + age := ageMilli - timestamp + + // Fit event content into AS event format + event := gomatrixserverlib.ApplicationServiceEvent{ + Age: age, + Content: gomatrixserverlib.RawJSON(*eventContent), + EventID: *eventID, + OriginServerTimestamp: timestamp, + RoomID: *roomID, + Sender: *sender, + Type: *eventType, + UserID: *sender, + } + events = append(events, event) + } + + return +} + +// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service +// IDs into the db. +func (s *eventsStatements) countEventsByApplicationServiceID( + ctx context.Context, + appServiceID string, +) (int, error) { + var count *int + err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count) + if err != nil { + return 0, err + } + if count == nil { + return 0, errors.New("NULL value for application service count") + } + + return *count, nil +} + +// insertEvent inserts an event mapped to its corresponding application service +// IDs into the db. +func (s *eventsStatements) insertEvent( + ctx context.Context, + appServiceID string, + event gomatrixserverlib.Event, +) (err error) { + _, err = s.insertEventStmt.ExecContext( + ctx, + appServiceID, + event.EventID(), + event.OriginServerTS(), + event.RoomID(), + event.Type(), + event.Sender(), + event.Content(), + nil, + ) + return +} + +// deleteEventsByID removes events matching given IDs from the database. +func (s *eventsStatements) deleteEventsByID( + ctx context.Context, + eventIDs []string, +) (err error) { + _, err = s.deleteEventsByIDStmt.ExecContext(ctx, pq.StringArray(eventIDs)) + return err +} diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/storage.go b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go new file mode 100644 index 00000000..0e0237f4 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go @@ -0,0 +1,110 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" + + // Import postgres database driver + _ "github.com/lib/pq" + "github.com/matrix-org/gomatrixserverlib" +) + +// Database stores events intended to be later sent to application services +type Database struct { + events eventsStatements + txnID txnStatements + db *sql.DB +} + +// NewDatabase opens a new database +func NewDatabase(dataSourceName string) (*Database, error) { + var result Database + var err error + if result.db, err = sql.Open("postgres", dataSourceName); err != nil { + return nil, err + } + if err = result.prepare(); err != nil { + return nil, err + } + return &result, nil +} + +func (d *Database) prepare() error { + if err := d.events.prepare(d.db); err != nil { + return err + } + + return d.txnID.prepare(d.db) +} + +// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database +// for a transaction worker to pull and later send to an application service. +func (d *Database) StoreEvent( + ctx context.Context, + appServiceID string, + event gomatrixserverlib.Event, +) error { + return d.events.insertEvent(ctx, appServiceID, event) +} + +// GetEventsWithAppServiceID returns a slice of events and their IDs intended to +// be sent to an application service given its ID. +func (d *Database) GetEventsWithAppServiceID( + ctx context.Context, + appServiceID string, + limit int, +) ([]string, []gomatrixserverlib.ApplicationServiceEvent, error) { + return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) +} + +// CountEventsWithAppServiceID returns the number of events destined for an +// application service given its ID. +func (d *Database) CountEventsWithAppServiceID( + ctx context.Context, + appServiceID string, +) (int, error) { + return d.events.countEventsByApplicationServiceID(ctx, appServiceID) +} + +// RemoveEventsByID removes events from the database given a slice of their +// event IDs. +func (d *Database) RemoveEventsByID( + ctx context.Context, + eventIDs []string, +) error { + return d.events.deleteEventsByID(ctx, eventIDs) +} + +// GetTxnIDWithAppServiceID takes in an application service ID and returns the +// last used transaction ID associated with it. +func (d *Database) GetTxnIDWithAppServiceID( + ctx context.Context, + appServiceID string, +) (int, error) { + return d.txnID.selectTxnID(ctx, appServiceID) +} + +// UpsertTxnIDWithAppServiceID takes in an application service ID and a +// transaction ID and stores them in the DB, unless the pair already exists, in +// which case it updates them. +func (d *Database) UpsertTxnIDWithAppServiceID( + ctx context.Context, + appServiceID string, + txnID int, +) error { + return d.txnID.upsertTxnID(ctx, appServiceID, txnID) +} diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go new file mode 100644 index 00000000..72f4c578 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go @@ -0,0 +1,88 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" +) + +const txnIDSchema = ` +-- Keeps a count of the current transaction ID per application service +CREATE TABLE IF NOT EXISTS txn_id_counter ( + -- The ID of the application service the this count belongs to + as_id TEXT NOT NULL PRIMARY KEY, + -- The last-used transaction ID + txn_id INTEGER NOT NULL +); +` + +const selectTxnIDSQL = "" + + "SELECT txn_id FROM txn_id_counter WHERE as_id = $1 LIMIT 1" + +const upsertTxnIDSQL = "" + + "INSERT INTO txn_id_counter(as_id, txn_id) VALUES ($1, $2) " + + "ON CONFLICT (as_id) DO UPDATE " + + "SET txn_id = $2" + +type txnStatements struct { + selectTxnIDStmt *sql.Stmt + upsertTxnIDStmt *sql.Stmt +} + +func (s *txnStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(txnIDSchema) + if err != nil { + return + } + + if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil { + return + } + if s.upsertTxnIDStmt, err = db.Prepare(upsertTxnIDSQL); err != nil { + return + } + + return +} + +// selectTxnID inserts a new transactionID mapped to its corresponding +// application service ID into the db. +func (s *txnStatements) selectTxnID( + ctx context.Context, + appServiceID string, +) (txnID int, err error) { + rows, err := s.selectTxnIDStmt.QueryContext(ctx, appServiceID) + if err != nil { + return + } + defer rows.Close() // nolint: errcheck + + // Scan the TxnID from the database and return + rows.Next() + err = rows.Scan(&txnID) + return +} + +// upsertTxnID inserts or updates on existing rows a new transactionID mapped to +// its corresponding application service ID into the db. +func (s *txnStatements) upsertTxnID( + ctx context.Context, + appServiceID string, + txnID int, +) (err error) { + _, err = s.upsertTxnIDStmt.ExecContext(ctx, appServiceID, txnID) + return +} diff --git a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go new file mode 100644 index 00000000..34775ca4 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go @@ -0,0 +1,210 @@ +// Copyright 2018 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package workers + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "math" + "net/http" + "time" + + "github.com/matrix-org/dendrite/appservice/storage" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" +) + +var ( + // TODO: Expose these in the config? + // Maximum size of events sent in each transaction. + transactionBatchSize = 50 + // Time to wait between checking for new events to send. + transactionBreakTime = time.Millisecond * 50 + // Timeout for sending a single transaction to an application service. + transactionTimeout = time.Second * 15 + // The current transaction ID. Increments after every successful transaction. + currentTransactionID = 0 +) + +// SetupTransactionWorkers spawns a separate goroutine for each application +// service. Each of these "workers" handle taking all events intended for their +// app service, batch them up into a single transaction (up to a max transaction +// size), then send that off to the AS's /transactions/{txnID} endpoint. It also +// handles exponentially backing off in case the AS isn't currently available. +func SetupTransactionWorkers( + cfg *config.Dendrite, + appserviceDB *storage.Database, + // Each worker has access to an event counter, which keeps track of the amount + // of events they still have to send off. The roomserver consumer + // (consumers/roomserver.go) increments this counter every time a new event for + // a specific application service is inserted into the database, whereas the + // counter is decremented by a certain amount when a worker sends some amount + // of events successfully to an application service. To ensure recovery in the + // event of a crash, this counter is initialized to the amount of events meant + // to be sent by a specific worker in the database, so that state is not lost. + eventCounterMap map[string]int, +) error { + // Create a worker that handles transmitting events to a single homeserver + for _, appservice := range cfg.Derived.ApplicationServices { + // Don't create a worker if this AS doesn't want to receive events + if appservice.URL != "" { + go worker(appserviceDB, appservice, eventCounterMap) + } + } + return nil +} + +// worker is a goroutine that +func worker(db *storage.Database, as config.ApplicationService, ecm map[string]int) { + // Initialize transaction ID counter + var err error + currentTransactionID, err = db.GetTxnIDWithAppServiceID(context.TODO(), as.ID) + if err != nil { + logrus.WithError(err).Warnf("appservice worker for %s unable to get latest transaction ID from DB", + as.ID) + } + + // Create an HTTP client for sending requests to app services + client := &http.Client{ + Timeout: transactionTimeout, + } + + // Initialize counter to amount of events currently in the database + eventCount, err := db.CountEventsWithAppServiceID(context.TODO(), as.ID) + if err != nil { + logrus.WithError(err).Warn("appservice worker unable to count queued events from DB") + } + ecm[as.ID] = eventCount + + // Initialize backoff exponent (2^x secs). Max 9, aka 512s. + backoff := 0 + + // Loop forever and keep waiting for more events to send + for { + // Check if there are any events to send + if ecm[as.ID] > 0 { + ctx := context.TODO() + + eventIDs, events, err := db.GetEventsWithAppServiceID(ctx, as.ID, transactionBatchSize) + if err != nil { + logrus.WithError(err).Error("appservice worker unable to read queued events from DB") + + // Wait a little bit for DB to possibly recover + time.Sleep(transactionBreakTime) + continue + } + + // Batch events up into a transaction + transactionJSON, err := createTransaction(events) + if err != nil { + logrus.WithError(err).Error("appservice worker unable to marshal events") + + // Wait a little bit before trying again + time.Sleep(transactionBreakTime) + continue + } + + // Send the events off to the application service + eventsSent, err := send(client, as, transactionJSON, len(events)) + if err != nil { + // Calculate how long to backoff for + backoffDuration := time.Duration(math.Pow(2, float64(backoff))) + backoffSeconds := time.Second * backoffDuration + logrus.WithError(err).Warnf("unable to send transactions to %s, backing off for %ds", + as.ID, backoffDuration) + + // Increment backoff count + backoff++ + if backoff > 9 { + backoff = 9 + } + + // Backoff + time.Sleep(backoffSeconds) + + continue + } + + // We sent successfully, hooray! + backoff = 0 + ecm[as.ID] -= eventsSent + + // Remove sent events from the DB + err = db.RemoveEventsByID(ctx, eventIDs) + if err != nil { + logrus.WithError(err).Errorf("unable to remove appservice events from the database for %s", + as.ID) + } + + // Update transactionID + currentTransactionID++ + if err = db.UpsertTxnIDWithAppServiceID(context.TODO(), as.ID, currentTransactionID); err != nil { + logrus.WithError(err).Errorf("unable to update transaction ID for %s", + as.ID) + } + } else { + // If not, wait a bit and try again + time.Sleep(transactionBreakTime) + } + } +} + +// createTransaction takes in a slice of AS events, stores them in an AS +// transaction, and JSON-encodes the results. +func createTransaction( + events []gomatrixserverlib.ApplicationServiceEvent, +) ([]byte, error) { + // Create a transactions and store the events inside + transaction := gomatrixserverlib.ApplicationServiceTransaction{ + Events: events, + } + + transactionJSON, err := json.Marshal(transaction) + if err != nil { + return nil, err + } + + return transactionJSON, nil +} + +// send sends events to an application service. Returns an error if an OK was not +// received back from the application service or the request timed out. +func send( + client *http.Client, + appservice config.ApplicationService, + transaction []byte, + count int, +) (int, error) { + // POST a transaction to our AS. + address := fmt.Sprintf("%s/transactions/%d", appservice.URL, currentTransactionID) + resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction)) + if err != nil { + return 0, err + } + defer resp.Body.Close() // nolint: errcheck + + // Check the AS received the events correctly + if resp.StatusCode != http.StatusOK { + return 0, fmt.Errorf( + "Non-OK status code %d returned from AS", resp.StatusCode, + ) + } + + // Return amount of sent events + return count, nil +} diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/accounts_table.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/accounts_table.go index 4ed54f95..e86654ec 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/accounts_table.go +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/accounts/accounts_table.go @@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS account_accounts ( created_ts BIGINT NOT NULL, -- The password hash for this account. Can be NULL if this is a passwordless account. password_hash TEXT, - -- Identifies which Application Service this account belongs to, if any. + -- Identifies which application service this account belongs to, if any. appservice_id TEXT -- TODO: -- is_guest, is_admin, upgraded_ts, devices, any email reset stuff? diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/devices/storage.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/devices/storage.go index 7683a427..7032fe7b 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/devices/storage.go +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/devices/storage.go @@ -138,9 +138,9 @@ func (d *Database) UpdateDevice( } // RemoveDevice revokes a device by deleting the entry in the database -// matching with the given device ID and user ID localpart +// matching with the given device ID and user ID localpart. // If the device doesn't exist, it will not return an error -// If something went wrong during the deletion, it will return the SQL error +// If something went wrong during the deletion, it will return the SQL error. func (d *Database) RemoveDevice( ctx context.Context, deviceID, localpart string, ) error { diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/register.go b/src/github.com/matrix-org/dendrite/clientapi/routing/register.go index cb427b71..4a9c9d3e 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/register.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/register.go @@ -115,7 +115,7 @@ type registerRequest struct { InitialDisplayName *string `json:"initial_device_display_name"` - // Application Services place Type in the root of their registration + // Application services place Type in the root of their registration // request, whereas clients place it in the authDict struct. Type authtypes.LoginType `json:"type"` } @@ -281,16 +281,16 @@ func validateRecaptcha( } // UsernameIsWithinApplicationServiceNamespace checks to see if a username falls -// within any of the namespaces of a given Application Service. If no -// Application Service is given, it will check to see if it matches any -// Application Service's namespace. +// within any of the namespaces of a given application service. If no +// application service is given, it will check to see if it matches any +// application service's namespace. func UsernameIsWithinApplicationServiceNamespace( cfg *config.Dendrite, username string, appservice *config.ApplicationService, ) bool { if appservice != nil { - // Loop through given Application Service's namespaces and see if any match + // Loop through given application service's namespaces and see if any match for _, namespace := range appservice.NamespaceMap["users"] { // AS namespaces are checked for validity in config if namespace.RegexpObject.MatchString(username) { @@ -300,7 +300,7 @@ func UsernameIsWithinApplicationServiceNamespace( return false } - // Loop through all known Application Service's namespaces and see if any match + // Loop through all known application service's namespaces and see if any match for _, knownAppservice := range cfg.Derived.ApplicationServices { for _, namespace := range knownAppservice.NamespaceMap["users"] { // AS namespaces are checked for validity in config @@ -509,7 +509,7 @@ func handleRegistrationFlow( sessions.AddCompletedStage(sessionID, authtypes.LoginTypeSharedSecret) case authtypes.LoginTypeApplicationService: - // Check Application Service register user request is valid. + // Check application service register user request is valid. // The application service's ID is returned if so. appserviceID, err := validateApplicationService(cfg, req, r.Username) diff --git a/src/github.com/matrix-org/dendrite/common/config/appservice.go b/src/github.com/matrix-org/dendrite/common/config/appservice.go index 7eb12293..0d266ae8 100644 --- a/src/github.com/matrix-org/dendrite/common/config/appservice.go +++ b/src/github.com/matrix-org/dendrite/common/config/appservice.go @@ -108,8 +108,6 @@ func setupRegexps(cfg *Dendrite) (err error) { } } - fmt.Println(exclusiveUsernameStrings, exclusiveAliasStrings) - // Join the regexes together into one big regex. // i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)" // Later we can check if a username or alias matches any exclusive regex and @@ -167,13 +165,13 @@ func checkErrors(config *Dendrite) (err error) { // can have the same ID or token. if idMap[appservice.ID] { return configErrors([]string{fmt.Sprintf( - "Application Service ID %s must be unique", appservice.ID, + "Application service ID %s must be unique", appservice.ID, )}) } // Check if we've already seen this token if tokenMap[appservice.ASToken] { return configErrors([]string{fmt.Sprintf( - "Application Service Token %s must be unique", appservice.ASToken, + "Application service Token %s must be unique", appservice.ASToken, )}) } @@ -189,7 +187,7 @@ func checkErrors(config *Dendrite) (err error) { // namespace, which often ends up in an application service receiving events // it doesn't want, as an empty regex will match all events. return configErrors([]string{fmt.Sprintf( - "Application Service namespace can only contain a single regex tuple. Check your YAML.", + "Application service namespace can only contain a single regex tuple. Check your YAML.", )}) } } @@ -201,7 +199,7 @@ func checkErrors(config *Dendrite) (err error) { for _, namespace := range namespaceSlice { if !IsValidRegex(namespace.Regex) { return configErrors([]string{fmt.Sprintf( - "Invalid regex string for Application Service %s", appservice.ID, + "Invalid regex string for application service %s", appservice.ID, )}) } } diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index 8bbac80c..bd6e361d 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -162,6 +162,9 @@ type Dendrite struct { // The FederationSender database stores information used by the FederationSender // It is only accessed by the FederationSender. FederationSender DataSource `yaml:"federation_sender"` + // The AppServices database stores information used by the AppService component. + // It is only accessed by the AppService component. + AppService DataSource `yaml:"appservice"` // The PublicRoomsAPI database stores information used to compute the public // room directory. It is only accessed by the PublicRoomsAPI server. PublicRoomsAPI DataSource `yaml:"public_rooms_api"` @@ -231,15 +234,15 @@ type Dendrite struct { Params map[string]interface{} `json:"params"` } - // Application Services parsed from their config files + // Application services parsed from their config files // The paths of which were given above in the main config file ApplicationServices []ApplicationService - // Meta-regexes compiled from all exclusive Application Service + // Meta-regexes compiled from all exclusive application service // Regexes. // // When a user registers, we check that their username does not match any - // exclusive Application Service namespaces + // exclusive application service namespaces ExclusiveApplicationServicesUsernameRegexp *regexp.Regexp // When a user creates a room alias, we check that it isn't already // reserved by an application service