Merge branch 'anoa/as_forward_events' into anoa/testing

This commit is contained in:
Andrew Morgan 2018-06-12 16:11:29 +01:00
commit 93b1d1f1f9
15 changed files with 810 additions and 41 deletions

View file

@ -72,7 +72,7 @@ Dendrite requires a postgres database engine, version 9.5 or later.
```
* Create databases:
```bash
for i in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi naffka; do
for i in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi appservice naffka; do
sudo -u postgres createdb -O dendrite dendrite_$i
done
```
@ -253,3 +253,14 @@ you want to support federation.
```bash
./bin/dendrite-federation-sender-server --config dendrite.yaml
```
### Run an appservice server
This sends events from the network to [application
services](https://matrix.org/docs/spec/application_service/unstable.html)
running locally. This is only required if you want to support running
application services on your homeserver.
```bash
./bin/dendrite-appservice-server --config dendrite.yaml
```

View file

@ -97,6 +97,7 @@ database:
room_server: "postgres://dendrite:itsasecret@localhost/dendrite_roomserver?sslmode=disable"
server_key: "postgres://dendrite:itsasecret@localhost/dendrite_serverkey?sslmode=disable"
federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable"
appservice: "postgres://dendrite:itsasecret@localhost/dendrite_appservice?sslmode=disable"
public_rooms_api: "postgres://dendrite:itsasecret@localhost/dendrite_publicroomsapi?sslmode=disable"
# If using naffka you need to specify a naffka database
# naffka: "postgres://dendrite:itsasecret@localhost/dendrite_naffka?sslmode=disable"

View file

@ -2,9 +2,9 @@
This component interfaces with external [Application
Services](https://matrix.org/docs/spec/application_service/unstable.html).
This includes any HTTP endpoints that Application Services call, as well as talking
to any HTTP endpoints that Application Services provide themselves.
This includes any HTTP endpoints that application services call, as well as talking
to any HTTP endpoints that application services provide themselves.
## Consumers
This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing Application Services.
This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing application services.

View file

@ -15,8 +15,13 @@
package appservice
import (
"sync"
"github.com/matrix-org/dendrite/appservice/consumers"
"github.com/matrix-org/dendrite/appservice/routing"
"github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/appservice/workers"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/transactions"
@ -35,13 +40,41 @@ func SetupAppServiceAPIComponent(
queryAPI api.RoomserverQueryAPI,
transactionsCache *transactions.Cache,
) {
// Create a connection to the appservice postgres DB
appserviceDB, err := storage.NewDatabase(string(base.Cfg.Database.AppService))
if err != nil {
logrus.WithError(err).Panicf("failed to connect to appservice db")
}
// Wrap application services in a type that relates the application service and
// a sync.Cond object that can be used to notify workers when there are new
// events to be sent out.
workerStates := make([]types.ApplicationServiceWorkerState, len(base.Cfg.Derived.ApplicationServices))
for _, appservice := range base.Cfg.Derived.ApplicationServices {
eventCount := 0
m := sync.Mutex{}
ws := types.ApplicationServiceWorkerState{
AppService: appservice,
Cond: sync.NewCond(&m),
EventsReady: &eventCount,
}
workerStates = append(workerStates, ws)
}
consumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, accountsDB, queryAPI, aliasAPI,
base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB,
queryAPI, aliasAPI, workerStates,
)
if err := consumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start app service roomserver consumer")
}
// Create application service transaction workers
if err := workers.SetupTransactionWorkers(appserviceDB, workerStates); err != nil {
logrus.WithError(err).Panicf("failed to start app service transaction workers")
}
// Set up HTTP Endpoints
routing.Setup(
base.APIMux, *base.Cfg, queryAPI, aliasAPI, accountsDB,

View file

@ -17,8 +17,9 @@ package consumers
import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
@ -29,29 +30,28 @@ import (
sarama "gopkg.in/Shopify/sarama.v1"
)
var (
appServices []config.ApplicationService
)
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer
db *accounts.Database
asDB *storage.Database
query api.RoomserverQueryAPI
alias api.RoomserverAliasAPI
serverName string
workerStates []types.ApplicationServiceWorkerState
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call
// Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
store *accounts.Database,
appserviceDB *storage.Database,
queryAPI api.RoomserverQueryAPI,
aliasAPI api.RoomserverAliasAPI,
workerStates []types.ApplicationServiceWorkerState,
) *OutputRoomEventConsumer {
appServices = cfg.Derived.ApplicationServices
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
Consumer: kafkaConsumer,
@ -60,9 +60,11 @@ func NewOutputRoomEventConsumer(
s := &OutputRoomEventConsumer{
roomServerConsumer: &consumer,
db: store,
asDB: appserviceDB,
query: queryAPI,
alias: aliasAPI,
serverName: string(cfg.Matrix.ServerName),
workerStates: workerStates,
}
consumer.ProcessMessage = s.onMessage
@ -74,9 +76,10 @@ func (s *OutputRoomEventConsumer) Start() error {
return s.roomServerConsumer.Start()
}
// onMessage is called when the sync server receives a new event from the room server output log.
// It is not safe for this function to be called from multiple goroutines, or else the
// sync stream position may race and be incorrectly calculated.
// onMessage is called when the sync server receives a new event from the room
// server output log. It is not safe for this function to be called from
// multiple goroutines, or else the sync stream position may race and be
// incorrectly calculated.
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON
var output api.OutputEvent
@ -165,13 +168,25 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
// each namespace of each registered application service, and if there is a
// match, adds the event to the queue for events to be sent to a particular
// application service.
func (s *OutputRoomEventConsumer) filterRoomserverEvents(ctx context.Context, events []gomatrixserverlib.Event) error {
func (s *OutputRoomEventConsumer) filterRoomserverEvents(
ctx context.Context,
events []gomatrixserverlib.Event,
) error {
for _, event := range events {
for _, appservice := range appServices {
for _, ws := range s.workerStates {
// Check if this event is interesting to this application service
if s.appserviceIsInterestedInEvent(ctx, event, appservice) {
// TODO: Queue this event to be sent off to the application service
fmt.Println(appservice.ID, "was interested in", event.Sender(), event.Type(), event.RoomID())
if s.appserviceIsInterestedInEvent(ctx, &event, ws.AppService) {
// Queue this event to be sent off to the application service
if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, &event); err != nil {
log.WithError(err).Warn("failed to insert incoming event into appservices database")
} else {
// Tell our worker to send out new messages by updating remaining message
// count and waking them up with a broadcast
ws.Cond.L.Lock()
*ws.EventsReady++
ws.Cond.Broadcast()
ws.Cond.L.Unlock()
}
}
}
}
@ -181,7 +196,7 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(ctx context.Context, ev
// appserviceIsInterestedInEvent returns a boolean depending on whether a given
// event falls within one of a given application service's namespaces.
func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event gomatrixserverlib.Event, appservice config.ApplicationService) bool {
func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.Event, appservice config.ApplicationService) bool {
// Check sender of the event
for _, userNamespace := range appservice.NamespaceMap["users"] {
if userNamespace.RegexpObject.MatchString(event.Sender()) {

View file

@ -0,0 +1,247 @@
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
const appserviceEventsSchema = `
-- Stores events to be sent to application services
CREATE TABLE IF NOT EXISTS appservice_events (
-- An auto-incrementing id unique to each event in the table
id BIGSERIAL NOT NULL PRIMARY KEY,
-- The ID of the application service the event will be sent to
as_id TEXT NOT NULL,
-- The ID of the event
event_id TEXT NOT NULL,
-- Unix seconds that the event was sent at from the originating server
origin_server_ts BIGINT NOT NULL,
-- The ID of the room that the event was sent in
room_id TEXT NOT NULL,
-- The type of the event (e.g. m.text)
type TEXT NOT NULL,
-- The ID of the user that sent the event
sender TEXT NOT NULL,
-- The JSON representation of the event's content. Text to avoid db JSON parsing
event_content TEXT,
-- The ID of the transaction that this event is a part of
txn_id BIGINT NOT NULL
);
CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
`
const selectEventsByApplicationServiceIDSQL = "" +
"SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id " +
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC LIMIT $2"
const countEventsByApplicationServiceIDSQL = "" +
"SELECT COUNT(event_id) FROM appservice_events WHERE as_id = $1"
const insertEventSQL = "" +
"INSERT INTO appservice_events(as_id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id) " +
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"
const updateTxnIDForEventsSQL = "" +
"UPDATE appservice_events SET txn_id = $1 WHERE as_id = $2 AND id <= $3"
const deleteEventsBeforeAndIncludingIDSQL = "" +
"DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2"
type eventsStatements struct {
selectEventsByApplicationServiceIDStmt *sql.Stmt
countEventsByApplicationServiceIDStmt *sql.Stmt
insertEventStmt *sql.Stmt
updateTxnIDForEventsStmt *sql.Stmt
deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
}
func (s *eventsStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(appserviceEventsSchema)
if err != nil {
return
}
if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil {
return
}
if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil {
return
}
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
return
}
if s.updateTxnIDForEventsStmt, err = db.Prepare(updateTxnIDForEventsSQL); err != nil {
return
}
if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil {
return
}
return
}
// selectEventsByApplicationServiceID takes in an application service ID and
// returns a slice of events that need to be sent to that application service,
// as well as an int later used to remove these same events from the database
// once successfully sent to an application service.
func (s *eventsStatements) selectEventsByApplicationServiceID(
ctx context.Context,
applicationServiceID string,
limit int,
) (
txnID, maxID int,
events []gomatrixserverlib.ApplicationServiceEvent,
err error,
) {
// Retrieve events from the database. Unsuccessfully sent events first
eventRowsCurr, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit)
if err != nil {
return 0, 0, nil, err
}
defer func() {
err = eventRowsCurr.Close()
if err != nil {
log.WithError(err).Fatalf("Appservice %s unable to select new events to send",
applicationServiceID)
}
}()
events, maxID, txnID, err = retrieveEvents(eventRowsCurr)
if err != nil {
return 0, 0, nil, err
}
return
}
func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.ApplicationServiceEvent, maxID, txnID int, err error) {
// Get current time for use in calculating event age
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
// Iterate through each row and store event contents
// If txn_id changes dramatically, we've switched from collecting old events to
// new ones. Send back those events first.
lastTxnID := -2 // Invalid transaction ID
for eventRows.Next() {
var event gomatrixserverlib.ApplicationServiceEvent
var eventContent sql.NullString
var id int
err = eventRows.Scan(
&id,
&event.EventID,
&event.OriginServerTimestamp,
&event.RoomID,
&event.Type,
&event.UserID,
&eventContent,
&txnID,
)
if err != nil {
fmt.Println("Failed:", err.Error())
return nil, 0, 0, err
}
// If txnID has changed on this event from the previous event, then we've
// reached the end of a transaction's events. Return only those events.
if lastTxnID > -2 && lastTxnID != txnID {
return
}
lastTxnID = txnID
if eventContent.Valid {
event.Content = gomatrixserverlib.RawJSON(eventContent.String)
}
if id > maxID {
maxID = id
}
// Get age of the event from original timestamp and current time
// TODO: Consider removing age as not many app services use it
event.Age = nowMilli - event.OriginServerTimestamp
// TODO: Synapse does this. It's unnecessary to send Sender and UserID as the
// same value. Do app services really require this? :)
event.Sender = event.UserID
events = append(events, event)
}
return
}
// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service
// IDs into the db.
func (s *eventsStatements) countEventsByApplicationServiceID(
ctx context.Context,
appServiceID string,
) (int, error) {
var count int
err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
if err != nil && err != sql.ErrNoRows {
return 0, err
}
return count, nil
}
// insertEvent inserts an event mapped to its corresponding application service
// IDs into the db.
func (s *eventsStatements) insertEvent(
ctx context.Context,
appServiceID string,
event *gomatrixserverlib.Event,
) (err error) {
_, err = s.insertEventStmt.ExecContext(
ctx,
appServiceID,
event.EventID(),
event.OriginServerTS(),
event.RoomID(),
event.Type(),
event.Sender(),
event.Content(),
-1, // No transaction ID yet
)
return
}
// updateTxnIDForEvents sets the transactionID for a collection of events. Done
// before sending them to an AppService. Referenced before sending to make sure
// we aren't constructing multiple transactions with the same events.
func (s *eventsStatements) updateTxnIDForEvents(
ctx context.Context,
appserviceID string,
maxID, txnID int,
) (err error) {
_, err = s.updateTxnIDForEventsStmt.ExecContext(ctx, txnID, appserviceID, maxID)
return
}
// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database.
func (s *eventsStatements) deleteEventsBeforeAndIncludingID(
ctx context.Context,
appserviceID string,
eventTableID int,
) (err error) {
_, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, appserviceID, eventTableID)
return
}

View file

@ -0,0 +1,123 @@
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"database/sql"
// Import postgres database driver
_ "github.com/lib/pq"
"github.com/matrix-org/gomatrixserverlib"
)
// Database stores events intended to be later sent to application services
type Database struct {
events eventsStatements
txnID txnStatements
db *sql.DB
}
// NewDatabase opens a new database
func NewDatabase(dataSourceName string) (*Database, error) {
var result Database
var err error
if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
return nil, err
}
if err = result.prepare(); err != nil {
return nil, err
}
return &result, nil
}
func (d *Database) prepare() error {
if err := d.events.prepare(d.db); err != nil {
return err
}
return d.txnID.prepare(d.db)
}
// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
// for a transaction worker to pull and later send to an application service.
func (d *Database) StoreEvent(
ctx context.Context,
appServiceID string,
event *gomatrixserverlib.Event,
) error {
return d.events.insertEvent(ctx, appServiceID, event)
}
// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
// be sent to an application service given its ID.
func (d *Database) GetEventsWithAppServiceID(
ctx context.Context,
appServiceID string,
limit int,
) (int, int, []gomatrixserverlib.ApplicationServiceEvent, error) {
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
}
// CountEventsWithAppServiceID returns the number of events destined for an
// application service given its ID.
func (d *Database) CountEventsWithAppServiceID(
ctx context.Context,
appServiceID string,
) (int, error) {
return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
}
// UpdateTxnIDForEvents takes in an application service ID and a
// and stores them in the DB, unless the pair already exists, in
// which case it updates them.
func (d *Database) UpdateTxnIDForEvents(
ctx context.Context,
appserviceID string,
maxID, txnID int,
) error {
return d.events.updateTxnIDForEvents(ctx, appserviceID, maxID, txnID)
}
// RemoveEventsBeforeAndIncludingID removes all events from the database that
// are less than or equal to a given maximum ID. IDs here are implemented as a
// serial, thus this should always delete events in chronological order.
func (d *Database) RemoveEventsBeforeAndIncludingID(
ctx context.Context,
appserviceID string,
eventTableID int,
) error {
return d.events.deleteEventsBeforeAndIncludingID(ctx, appserviceID, eventTableID)
}
// GetTxnIDWithAppServiceID takes in an application service ID and returns the
// last used transaction ID associated with it.
func (d *Database) GetTxnIDWithAppServiceID(
ctx context.Context,
appServiceID string,
) (int, error) {
return d.txnID.selectTxnID(ctx, appServiceID)
}
// UpsertTxnIDWithAppServiceID takes in an application service ID and a
// transaction ID and stores them in the DB, unless the pair already exists, in
// which case it updates them.
func (d *Database) UpsertTxnIDWithAppServiceID(
ctx context.Context,
appServiceID string,
txnID int,
) error {
return d.txnID.upsertTxnID(ctx, appServiceID, txnID)
}

View file

@ -0,0 +1,80 @@
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"database/sql"
)
const txnIDSchema = `
-- Keeps a count of the current transaction ID per application service
CREATE TABLE IF NOT EXISTS txn_id_counter (
-- The ID of the application service the this count belongs to
as_id TEXT NOT NULL PRIMARY KEY,
-- The last-used transaction ID
txn_id INTEGER NOT NULL
);
`
const selectTxnIDSQL = "" +
"SELECT txn_id FROM txn_id_counter WHERE as_id = $1"
const upsertTxnIDSQL = "" +
"INSERT INTO txn_id_counter(as_id, txn_id) VALUES ($1, $2) " +
"ON CONFLICT (as_id) DO UPDATE " +
"SET txn_id = $2"
type txnStatements struct {
selectTxnIDStmt *sql.Stmt
upsertTxnIDStmt *sql.Stmt
}
func (s *txnStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(txnIDSchema)
if err != nil {
return
}
if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil {
return
}
if s.upsertTxnIDStmt, err = db.Prepare(upsertTxnIDSQL); err != nil {
return
}
return
}
// selectTxnID inserts a new transactionID mapped to its corresponding
// application service ID into the db.
func (s *txnStatements) selectTxnID(
ctx context.Context,
appServiceID string,
) (txnID int, err error) {
err = s.selectTxnIDStmt.QueryRowContext(ctx, appServiceID).Scan(&txnID)
return
}
// upsertTxnID inserts or updates on existing rows a new transactionID mapped to
// its corresponding application service ID into the db.
func (s *txnStatements) upsertTxnID(
ctx context.Context,
appServiceID string,
txnID int,
) (err error) {
_, err = s.upsertTxnIDStmt.ExecContext(ctx, appServiceID, txnID)
return
}

View file

@ -12,6 +12,25 @@
package types
import (
"sync"
"github.com/matrix-org/dendrite/common/config"
)
// ApplicationServiceWorkerState is a type that couples an application service,
// a lockable condition as well as some other state variables, allowing the
// roomserver to notify appservice workers when there are events ready to send
// externally to application services.
type ApplicationServiceWorkerState struct {
AppService config.ApplicationService
Cond *sync.Cond
// Events ready to be sent
EventsReady *int
// Backoff exponent (2^x secs). Max 6, aka 64s.
Backoff int
}
const (
// AppServiceDeviceID is the AS dummy device ID
AppServiceDeviceID = "AS_Device"

View file

@ -0,0 +1,239 @@
// Copyright 2018 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package workers
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"math"
"net/http"
"time"
"github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
var (
// Maximum size of events sent in each transaction.
// Warning, if this is lowered and a number of events greater than the previous
// batch size were still to be sent, then a number of events equal to the
// difference will be ignored by the app service.
// TL;DR: Don't lower this number with any AS events still left in the database.
transactionBatchSize = 50
// Timeout for sending a single transaction to an application service.
transactionTimeout = time.Second * 60
// The current transaction ID. Increments after every successful transaction.
currentTransactionID = 0
)
// SetupTransactionWorkers spawns a separate goroutine for each application
// service. Each of these "workers" handle taking all events intended for their
// app service, batch them up into a single transaction (up to a max transaction
// size), then send that off to the AS's /transactions/{txnID} endpoint. It also
// handles exponentially backing off in case the AS isn't currently available.
func SetupTransactionWorkers(
appserviceDB *storage.Database,
workerStates []types.ApplicationServiceWorkerState,
) error {
// Create a worker that handles transmitting events to a single homeserver
for _, workerState := range workerStates {
// Don't create a worker if this AS doesn't want to receive events
if workerState.AppService.URL != "" {
go worker(appserviceDB, workerState)
}
}
return nil
}
// worker is a goroutine that sends any queued events to the application service
// it is given.
func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
log.Infof("Starting Application Service %s", ws.AppService.ID)
ctx := context.Background()
// Initialize transaction ID counter
var err error
currentTransactionID, err = db.GetTxnIDWithAppServiceID(ctx, ws.AppService.ID)
if err != nil && err != sql.ErrNoRows {
log.WithError(err).Fatalf("appservice %s worker unable to get latest transaction ID from DB",
ws.AppService.ID)
return
}
// Grab the HTTP client for sending requests to app services
client := &http.Client{
Timeout: transactionTimeout,
}
// Initial check for any leftover events to send from last time
eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID)
if err != nil {
log.WithError(err).Fatalf("appservice %s worker unable to read queued events from DB",
ws.AppService.ID)
return
}
ws.Cond.L.Lock()
*ws.EventsReady = eventCount
ws.Cond.L.Unlock()
// Loop forever and keep waiting for more events to send
for {
// Wait for more events if we've sent all the events in the database
if *ws.EventsReady <= 0 {
ws.Cond.L.Lock()
ws.Cond.Wait()
ws.Cond.L.Unlock()
}
// Batch events up into a transaction
eventsCount, txnID, maxEventID, transactionJSON, err := createTransaction(ctx, db, ws.AppService.ID)
if err != nil {
log.WithError(err).Fatalf("appservice %s worker unable to create transaction",
ws.AppService.ID)
return
}
// Send the events off to the application service
// Backoff if the application service does not respond
err = send(client, ws.AppService, txnID, transactionJSON)
if err != nil {
// Backoff
backoff(&ws, err)
continue
}
// We sent successfully, hooray!
ws.Backoff = 0
ws.Cond.L.Lock()
*ws.EventsReady -= eventsCount
ws.Cond.L.Unlock()
// Remove sent events from the DB
err = db.RemoveEventsBeforeAndIncludingID(ctx, ws.AppService.ID, maxEventID)
if err != nil {
log.WithError(err).Fatalf("unable to remove appservice events from the database for %s",
ws.AppService.ID)
return
}
// Update transactionID
currentTransactionID++
if err = db.UpsertTxnIDWithAppServiceID(ctx, ws.AppService.ID, currentTransactionID); err != nil {
log.WithError(err).Fatalf("unable to update transaction ID for %s",
ws.AppService.ID)
return
}
}
}
// backoff pauses the calling goroutine for a 2^some backoff exponent seconds
func backoff(ws *types.ApplicationServiceWorkerState, err error) {
// Calculate how long to backoff for
backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff)))
backoffSeconds := time.Second * backoffDuration
log.WithError(err).Warnf("unable to send transactions to %s, backing off for %ds",
ws.AppService.ID, backoffDuration)
ws.Backoff++
if ws.Backoff > 6 {
ws.Backoff = 6
}
// Backoff
time.Sleep(backoffSeconds)
}
// createTransaction takes in a slice of AS events, stores them in an AS
// transaction, and JSON-encodes the results.
func createTransaction(
ctx context.Context,
db *storage.Database,
appserviceID string,
) (
eventsCount, txnID, maxID int,
transactionJSON []byte,
err error,
) {
// Retrieve the latest events from the DB (will return old events if they weren't successfully sent)
txnID, maxID, events, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize)
if err != nil {
log.WithError(err).Fatalf("appservice %s worker unable to read queued events from DB",
appserviceID)
return
}
// Check if these events already have a transaction ID
if txnID == -1 {
txnID = currentTransactionID
// Mark new events with current transactionID
err := db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, currentTransactionID)
if err != nil {
return 0, 0, 0, nil, err
}
}
// Create a transaction and store the events inside
transaction := gomatrixserverlib.ApplicationServiceTransaction{
Events: events,
}
transactionJSON, err = json.Marshal(transaction)
if err != nil {
return
}
eventsCount = len(events)
return
}
// send sends events to an application service. Returns an error if an OK was not
// received back from the application service or the request timed out.
func send(
client *http.Client,
appservice config.ApplicationService,
txnID int,
transaction []byte,
) error {
// POST a transaction to our AS
address := fmt.Sprintf("%s/transactions/%d", appservice.URL, txnID)
resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction))
if err != nil {
return err
}
defer func() {
err := resp.Body.Close()
if err != nil {
log.WithError(err).Errorf("Unable to close response body from application service %s", appservice.ID)
}
}()
// Check the AS received the events correctly
if resp.StatusCode != http.StatusOK {
// TODO: Handle non-200 error codes from application services
return fmt.Errorf("Non-OK status code %d returned from AS", resp.StatusCode)
}
return nil
}

View file

@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS account_accounts (
created_ts BIGINT NOT NULL,
-- The password hash for this account. Can be NULL if this is a passwordless account.
password_hash TEXT,
-- Identifies which Application Service this account belongs to, if any.
-- Identifies which application service this account belongs to, if any.
appservice_id TEXT
-- TODO:
-- is_guest, is_admin, upgraded_ts, devices, any email reset stuff?

View file

@ -138,9 +138,9 @@ func (d *Database) UpdateDevice(
}
// RemoveDevice revokes a device by deleting the entry in the database
// matching with the given device ID and user ID localpart
// matching with the given device ID and user ID localpart.
// If the device doesn't exist, it will not return an error
// If something went wrong during the deletion, it will return the SQL error
// If something went wrong during the deletion, it will return the SQL error.
func (d *Database) RemoveDevice(
ctx context.Context, deviceID, localpart string,
) error {

View file

@ -115,7 +115,7 @@ type registerRequest struct {
InitialDisplayName *string `json:"initial_device_display_name"`
// Application Services place Type in the root of their registration
// Application services place Type in the root of their registration
// request, whereas clients place it in the authDict struct.
Type authtypes.LoginType `json:"type"`
}
@ -281,16 +281,16 @@ func validateRecaptcha(
}
// UsernameIsWithinApplicationServiceNamespace checks to see if a username falls
// within any of the namespaces of a given Application Service. If no
// Application Service is given, it will check to see if it matches any
// Application Service's namespace.
// within any of the namespaces of a given application service. If no
// application service is given, it will check to see if it matches any
// application service's namespace.
func UsernameIsWithinApplicationServiceNamespace(
cfg *config.Dendrite,
username string,
appservice *config.ApplicationService,
) bool {
if appservice != nil {
// Loop through given Application Service's namespaces and see if any match
// Loop through given application service's namespaces and see if any match
for _, namespace := range appservice.NamespaceMap["users"] {
// AS namespaces are checked for validity in config
if namespace.RegexpObject.MatchString(username) {
@ -300,7 +300,7 @@ func UsernameIsWithinApplicationServiceNamespace(
return false
}
// Loop through all known Application Service's namespaces and see if any match
// Loop through all known application service's namespaces and see if any match
for _, knownAppservice := range cfg.Derived.ApplicationServices {
for _, namespace := range knownAppservice.NamespaceMap["users"] {
// AS namespaces are checked for validity in config
@ -509,7 +509,7 @@ func handleRegistrationFlow(
sessions.AddCompletedStage(sessionID, authtypes.LoginTypeSharedSecret)
case authtypes.LoginTypeApplicationService:
// Check Application Service register user request is valid.
// Check application service register user request is valid.
// The application service's ID is returned if so.
appserviceID, err := validateApplicationService(cfg, req, r.Username)

View file

@ -108,8 +108,6 @@ func setupRegexps(cfg *Dendrite) (err error) {
}
}
fmt.Println(exclusiveUsernameStrings, exclusiveAliasStrings)
// Join the regexes together into one big regex.
// i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)"
// Later we can check if a username or alias matches any exclusive regex and
@ -167,13 +165,13 @@ func checkErrors(config *Dendrite) (err error) {
// can have the same ID or token.
if idMap[appservice.ID] {
return configErrors([]string{fmt.Sprintf(
"Application Service ID %s must be unique", appservice.ID,
"Application service ID %s must be unique", appservice.ID,
)})
}
// Check if we've already seen this token
if tokenMap[appservice.ASToken] {
return configErrors([]string{fmt.Sprintf(
"Application Service Token %s must be unique", appservice.ASToken,
"Application service Token %s must be unique", appservice.ASToken,
)})
}
@ -189,7 +187,7 @@ func checkErrors(config *Dendrite) (err error) {
// namespace, which often ends up in an application service receiving events
// it doesn't want, as an empty regex will match all events.
return configErrors([]string{fmt.Sprintf(
"Application Service namespace can only contain a single regex tuple. Check your YAML.",
"Application service namespace can only contain a single regex tuple. Check your YAML.",
)})
}
}
@ -201,7 +199,7 @@ func checkErrors(config *Dendrite) (err error) {
for _, namespace := range namespaceSlice {
if !IsValidRegex(namespace.Regex) {
return configErrors([]string{fmt.Sprintf(
"Invalid regex string for Application Service %s", appservice.ID,
"Invalid regex string for application service %s", appservice.ID,
)})
}
}

View file

@ -162,6 +162,9 @@ type Dendrite struct {
// The FederationSender database stores information used by the FederationSender
// It is only accessed by the FederationSender.
FederationSender DataSource `yaml:"federation_sender"`
// The AppServices database stores information used by the AppService component.
// It is only accessed by the AppService component.
AppService DataSource `yaml:"appservice"`
// The PublicRoomsAPI database stores information used to compute the public
// room directory. It is only accessed by the PublicRoomsAPI server.
PublicRoomsAPI DataSource `yaml:"public_rooms_api"`
@ -231,15 +234,15 @@ type Dendrite struct {
Params map[string]interface{} `json:"params"`
}
// Application Services parsed from their config files
// Application services parsed from their config files
// The paths of which were given above in the main config file
ApplicationServices []ApplicationService
// Meta-regexes compiled from all exclusive Application Service
// Meta-regexes compiled from all exclusive application service
// Regexes.
//
// When a user registers, we check that their username does not match any
// exclusive Application Service namespaces
// exclusive application service namespaces
ExclusiveApplicationServicesUsernameRegexp *regexp.Regexp
// When a user creates a room alias, we check that it isn't already
// reserved by an application service