mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-26 15:08:28 +00:00
Store And Send Application Service Events
* Modify INSTALL.md and dendrite-config.yaml for the new appservice database * Correct all instances of casing on 'application service' to align with spec * Store incoming events that an app service is interested in in the database to be later read by transaction workers. * Retrieve these events from transaction workers, one per AS. * Minimal transaction ID data is stored as well to recover after server failure. * Send events to AS and exponentially backoff on failure. Signed-off-by: Andrew Morgan <andrewm@matrix.org>
This commit is contained in:
parent
5f576148ac
commit
a0b9613b86
14 changed files with 689 additions and 32 deletions
13
INSTALL.md
13
INSTALL.md
|
@ -72,7 +72,7 @@ Dendrite requires a postgres database engine, version 9.5 or later.
|
|||
```
|
||||
* Create databases:
|
||||
```bash
|
||||
for i in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi naffka; do
|
||||
for i in account device mediaapi syncapi roomserver serverkey federationsender publicroomsapi appservice naffka; do
|
||||
sudo -u postgres createdb -O dendrite dendrite_$i
|
||||
done
|
||||
```
|
||||
|
@ -253,3 +253,14 @@ you want to support federation.
|
|||
```bash
|
||||
./bin/dendrite-federation-sender-server --config dendrite.yaml
|
||||
```
|
||||
|
||||
### Run an appservice server
|
||||
|
||||
This sends events from the network to [application
|
||||
services](https://matrix.org/docs/spec/application_service/unstable.html)
|
||||
running locally. This is only required if you want to support running
|
||||
application services on your homeserver.
|
||||
|
||||
```bash
|
||||
./bin/dendrite-appservice-server --config dendrite.yaml
|
||||
```
|
||||
|
|
|
@ -97,6 +97,7 @@ database:
|
|||
room_server: "postgres://dendrite:itsasecret@localhost/dendrite_roomserver?sslmode=disable"
|
||||
server_key: "postgres://dendrite:itsasecret@localhost/dendrite_serverkey?sslmode=disable"
|
||||
federation_sender: "postgres://dendrite:itsasecret@localhost/dendrite_federationsender?sslmode=disable"
|
||||
appservice: "postgres://dendrite:itsasecret@localhost/dendrite_appservice?sslmode=disable"
|
||||
public_rooms_api: "postgres://dendrite:itsasecret@localhost/dendrite_publicroomsapi?sslmode=disable"
|
||||
# If using naffka you need to specify a naffka database
|
||||
# naffka: "postgres://dendrite:itsasecret@localhost/dendrite_naffka?sslmode=disable"
|
||||
|
|
|
@ -2,9 +2,9 @@
|
|||
|
||||
This component interfaces with external [Application
|
||||
Services](https://matrix.org/docs/spec/application_service/unstable.html).
|
||||
This includes any HTTP endpoints that Application Services call, as well as talking
|
||||
to any HTTP endpoints that Application Services provide themselves.
|
||||
This includes any HTTP endpoints that application services call, as well as talking
|
||||
to any HTTP endpoints that application services provide themselves.
|
||||
|
||||
## Consumers
|
||||
|
||||
This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing Application Services.
|
||||
This component consumes and filters events from the Roomserver Kafka stream, passing on any necessary events to subscribing application services.
|
|
@ -17,6 +17,8 @@ package appservice
|
|||
import (
|
||||
"github.com/matrix-org/dendrite/appservice/consumers"
|
||||
"github.com/matrix-org/dendrite/appservice/routing"
|
||||
"github.com/matrix-org/dendrite/appservice/storage"
|
||||
"github.com/matrix-org/dendrite/appservice/workers"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/common/basecomponent"
|
||||
"github.com/matrix-org/dendrite/common/transactions"
|
||||
|
@ -35,13 +37,34 @@ func SetupAppServiceAPIComponent(
|
|||
queryAPI api.RoomserverQueryAPI,
|
||||
transactionsCache *transactions.Cache,
|
||||
) {
|
||||
// Create a connection to the appservice postgres DB
|
||||
appserviceDB, err := storage.NewDatabase(string(base.Cfg.Database.AppService))
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panicf("failed to connect to appservice db")
|
||||
}
|
||||
|
||||
// Create a map that will keep a counter of events to be sent for each
|
||||
// application service. This serves as an effective cache so that transaction
|
||||
// workers do not need to query the database over and over in order to see
|
||||
// whether there are events for them to send, but rather they can just check if
|
||||
// their event counter is greater than zero. The counter for an application
|
||||
// service is incremented when an event meant for them is inserted into the
|
||||
// appservice database.
|
||||
eventCounterMap := make(map[string]int)
|
||||
|
||||
consumer := consumers.NewOutputRoomEventConsumer(
|
||||
base.Cfg, base.KafkaConsumer, accountsDB, queryAPI, aliasAPI,
|
||||
base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB,
|
||||
queryAPI, aliasAPI, eventCounterMap,
|
||||
)
|
||||
if err := consumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start app service roomserver consumer")
|
||||
}
|
||||
|
||||
// Create application service transaction workers
|
||||
if err := workers.SetupTransactionWorkers(base.Cfg, appserviceDB, eventCounterMap); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start app service transaction workers")
|
||||
}
|
||||
|
||||
// Set up HTTP Endpoints
|
||||
routing.Setup(
|
||||
base.APIMux, *base.Cfg, queryAPI, aliasAPI, accountsDB,
|
||||
|
|
|
@ -17,8 +17,8 @@ package consumers
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/appservice/storage"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
|
@ -31,26 +31,32 @@ import (
|
|||
|
||||
var (
|
||||
appServices []config.ApplicationService
|
||||
ecm map[string]int
|
||||
)
|
||||
|
||||
// OutputRoomEventConsumer consumes events that originated in the room server.
|
||||
type OutputRoomEventConsumer struct {
|
||||
roomServerConsumer *common.ContinualConsumer
|
||||
db *accounts.Database
|
||||
asDB *storage.Database
|
||||
query api.RoomserverQueryAPI
|
||||
alias api.RoomserverAliasAPI
|
||||
serverName string
|
||||
}
|
||||
|
||||
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
|
||||
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call
|
||||
// Start() to begin consuming from room servers.
|
||||
func NewOutputRoomEventConsumer(
|
||||
cfg *config.Dendrite,
|
||||
kafkaConsumer sarama.Consumer,
|
||||
store *accounts.Database,
|
||||
appserviceDB *storage.Database,
|
||||
queryAPI api.RoomserverQueryAPI,
|
||||
aliasAPI api.RoomserverAliasAPI,
|
||||
eventCounterMap map[string]int,
|
||||
) *OutputRoomEventConsumer {
|
||||
appServices = cfg.Derived.ApplicationServices
|
||||
ecm = eventCounterMap
|
||||
|
||||
consumer := common.ContinualConsumer{
|
||||
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
||||
|
@ -60,6 +66,7 @@ func NewOutputRoomEventConsumer(
|
|||
s := &OutputRoomEventConsumer{
|
||||
roomServerConsumer: &consumer,
|
||||
db: store,
|
||||
asDB: appserviceDB,
|
||||
query: queryAPI,
|
||||
alias: aliasAPI,
|
||||
serverName: string(cfg.Matrix.ServerName),
|
||||
|
@ -74,9 +81,10 @@ func (s *OutputRoomEventConsumer) Start() error {
|
|||
return s.roomServerConsumer.Start()
|
||||
}
|
||||
|
||||
// onMessage is called when the sync server receives a new event from the room server output log.
|
||||
// It is not safe for this function to be called from multiple goroutines, or else the
|
||||
// sync stream position may race and be incorrectly calculated.
|
||||
// onMessage is called when the sync server receives a new event from the room
|
||||
// server output log. It is not safe for this function to be called from
|
||||
// multiple goroutines, or else the sync stream position may race and be
|
||||
// incorrectly calculated.
|
||||
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||
// Parse out the event JSON
|
||||
var output api.OutputEvent
|
||||
|
@ -165,13 +173,20 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
|
|||
// each namespace of each registered application service, and if there is a
|
||||
// match, adds the event to the queue for events to be sent to a particular
|
||||
// application service.
|
||||
func (s *OutputRoomEventConsumer) filterRoomserverEvents(ctx context.Context, events []gomatrixserverlib.Event) error {
|
||||
func (s *OutputRoomEventConsumer) filterRoomserverEvents(
|
||||
ctx context.Context,
|
||||
events []gomatrixserverlib.Event,
|
||||
) error {
|
||||
for _, event := range events {
|
||||
for _, appservice := range appServices {
|
||||
// Check if this event is interesting to this application service
|
||||
if s.appserviceIsInterestedInEvent(ctx, event, appservice) {
|
||||
// TODO: Queue this event to be sent off to the application service
|
||||
fmt.Println(appservice.ID, "was interested in", event.Sender(), event.Type(), event.RoomID())
|
||||
// Queue this event to be sent off to the application service
|
||||
if err := s.asDB.StoreEvent(ctx, appservice.ID, event); err != nil {
|
||||
log.WithError(err).Warn("failed to insert incoming event into appservices database")
|
||||
} else {
|
||||
ecm[appservice.ID]++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,198 @@
|
|||
// Copyright 2018 New Vector Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const appserviceEventsSchema = `
|
||||
-- Stores events to be sent to application services
|
||||
CREATE TABLE IF NOT EXISTS appservice_events (
|
||||
-- An auto-incrementing id unique to each event in the table
|
||||
id SERIAL NOT NULL PRIMARY KEY,
|
||||
-- The ID of the application service the event will be sent to
|
||||
as_id TEXT,
|
||||
-- The ID of the event
|
||||
event_id TEXT,
|
||||
-- Unix seconds that the event was sent at from the originating server
|
||||
origin_server_ts BIGINT,
|
||||
-- The ID of the room that the event was sent in
|
||||
room_id TEXT,
|
||||
-- The type of the event (e.g. m.text)
|
||||
type TEXT,
|
||||
-- The ID of the user that sent the event
|
||||
sender TEXT,
|
||||
-- The JSON representation of the event. Text to avoid db JSON parsing
|
||||
event_json TEXT,
|
||||
-- The ID of the transaction that this event is a part of
|
||||
txn_id INTEGER
|
||||
);
|
||||
`
|
||||
|
||||
const selectEventsByApplicationServiceIDSQL = "" +
|
||||
"SELECT event_id, origin_server_ts, room_id, type, sender, event_json FROM appservice_events " +
|
||||
"WHERE as_id = $1 ORDER BY as_id LIMIT $2"
|
||||
|
||||
const countEventsByApplicationServiceIDSQL = "" +
|
||||
"SELECT COUNT(event_id) FROM appservice_events WHERE as_id = $1"
|
||||
|
||||
const insertEventSQL = "" +
|
||||
"INSERT INTO appservice_events(as_id, event_id, origin_server_ts, room_id, type, sender, event_json, txn_id) " +
|
||||
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"
|
||||
|
||||
const deleteEventsByIDSQL = "" +
|
||||
"DELETE FROM appservice_events WHERE event_id = ANY($1)"
|
||||
|
||||
type eventsStatements struct {
|
||||
selectEventsByApplicationServiceIDStmt *sql.Stmt
|
||||
countEventsByApplicationServiceIDStmt *sql.Stmt
|
||||
insertEventStmt *sql.Stmt
|
||||
deleteEventsByIDStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *eventsStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(appserviceEventsSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.deleteEventsByIDStmt, err = db.Prepare(deleteEventsByIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// selectEventsByApplicationServiceID takes in an application service ID and
|
||||
// returns a slice of events that need to be sent to that application service.
|
||||
func (s *eventsStatements) selectEventsByApplicationServiceID(
|
||||
ctx context.Context,
|
||||
applicationServiceID string,
|
||||
limit int,
|
||||
) (
|
||||
eventIDs []string,
|
||||
events []gomatrixserverlib.ApplicationServiceEvent,
|
||||
err error,
|
||||
) {
|
||||
eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer eventRows.Close() // nolint: errcheck
|
||||
|
||||
// Iterate through each row and store event contents
|
||||
for eventRows.Next() {
|
||||
var eventID, originServerTimestamp, roomID, eventType, sender, eventContent *string
|
||||
if err = eventRows.Scan(
|
||||
&eventID,
|
||||
&originServerTimestamp,
|
||||
&roomID,
|
||||
&eventType,
|
||||
&sender,
|
||||
&eventContent,
|
||||
); err != nil || eventID == nil || roomID == nil || eventType == nil || sender == nil || eventContent == nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
eventIDs = append(eventIDs, *eventID)
|
||||
|
||||
// Get age of the event from original timestamp and current time
|
||||
timestamp, err := strconv.ParseInt(*originServerTimestamp, 10, 64)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
ageMilli := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
age := ageMilli - timestamp
|
||||
|
||||
// Fit event content into AS event format
|
||||
event := gomatrixserverlib.ApplicationServiceEvent{
|
||||
Age: age,
|
||||
Content: gomatrixserverlib.RawJSON(*eventContent),
|
||||
EventID: *eventID,
|
||||
OriginServerTimestamp: timestamp,
|
||||
RoomID: *roomID,
|
||||
Sender: *sender,
|
||||
Type: *eventType,
|
||||
UserID: *sender,
|
||||
}
|
||||
events = append(events, event)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// countEventsByApplicationServiceID inserts an event mapped to its corresponding application service
|
||||
// IDs into the db.
|
||||
func (s *eventsStatements) countEventsByApplicationServiceID(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
) (int, error) {
|
||||
var count *int
|
||||
err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if count == nil {
|
||||
return 0, errors.New("NULL value for application service count")
|
||||
}
|
||||
|
||||
return *count, nil
|
||||
}
|
||||
|
||||
// insertEvent inserts an event mapped to its corresponding application service
|
||||
// IDs into the db.
|
||||
func (s *eventsStatements) insertEvent(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
event gomatrixserverlib.Event,
|
||||
) (err error) {
|
||||
_, err = s.insertEventStmt.ExecContext(
|
||||
ctx,
|
||||
appServiceID,
|
||||
event.EventID(),
|
||||
event.OriginServerTS(),
|
||||
event.RoomID(),
|
||||
event.Type(),
|
||||
event.Sender(),
|
||||
event.Content(),
|
||||
nil,
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
// deleteEventsByID removes events matching given IDs from the database.
|
||||
func (s *eventsStatements) deleteEventsByID(
|
||||
ctx context.Context,
|
||||
eventIDs []string,
|
||||
) (err error) {
|
||||
_, err = s.deleteEventsByIDStmt.ExecContext(ctx, pq.StringArray(eventIDs))
|
||||
return err
|
||||
}
|
110
src/github.com/matrix-org/dendrite/appservice/storage/storage.go
Normal file
110
src/github.com/matrix-org/dendrite/appservice/storage/storage.go
Normal file
|
@ -0,0 +1,110 @@
|
|||
// Copyright 2018 New Vector Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
// Import postgres database driver
|
||||
_ "github.com/lib/pq"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// Database stores events intended to be later sent to application services
|
||||
type Database struct {
|
||||
events eventsStatements
|
||||
txnID txnStatements
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
// NewDatabase opens a new database
|
||||
func NewDatabase(dataSourceName string) (*Database, error) {
|
||||
var result Database
|
||||
var err error
|
||||
if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = result.prepare(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (d *Database) prepare() error {
|
||||
if err := d.events.prepare(d.db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return d.txnID.prepare(d.db)
|
||||
}
|
||||
|
||||
// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
|
||||
// for a transaction worker to pull and later send to an application service.
|
||||
func (d *Database) StoreEvent(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
event gomatrixserverlib.Event,
|
||||
) error {
|
||||
return d.events.insertEvent(ctx, appServiceID, event)
|
||||
}
|
||||
|
||||
// GetEventsWithAppServiceID returns a slice of events and their IDs intended to
|
||||
// be sent to an application service given its ID.
|
||||
func (d *Database) GetEventsWithAppServiceID(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
limit int,
|
||||
) ([]string, []gomatrixserverlib.ApplicationServiceEvent, error) {
|
||||
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
|
||||
}
|
||||
|
||||
// CountEventsWithAppServiceID returns the number of events destined for an
|
||||
// application service given its ID.
|
||||
func (d *Database) CountEventsWithAppServiceID(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
) (int, error) {
|
||||
return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
|
||||
}
|
||||
|
||||
// RemoveEventsByID removes events from the database given a slice of their
|
||||
// event IDs.
|
||||
func (d *Database) RemoveEventsByID(
|
||||
ctx context.Context,
|
||||
eventIDs []string,
|
||||
) error {
|
||||
return d.events.deleteEventsByID(ctx, eventIDs)
|
||||
}
|
||||
|
||||
// GetTxnIDWithAppServiceID takes in an application service ID and returns the
|
||||
// last used transaction ID associated with it.
|
||||
func (d *Database) GetTxnIDWithAppServiceID(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
) (int, error) {
|
||||
return d.txnID.selectTxnID(ctx, appServiceID)
|
||||
}
|
||||
|
||||
// UpsertTxnIDWithAppServiceID takes in an application service ID and a
|
||||
// transaction ID and stores them in the DB, unless the pair already exists, in
|
||||
// which case it updates them.
|
||||
func (d *Database) UpsertTxnIDWithAppServiceID(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
txnID int,
|
||||
) error {
|
||||
return d.txnID.upsertTxnID(ctx, appServiceID, txnID)
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
// Copyright 2018 New Vector Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
const txnIDSchema = `
|
||||
-- Keeps a count of the current transaction ID per application service
|
||||
CREATE TABLE IF NOT EXISTS txn_id_counter (
|
||||
-- The ID of the application service the this count belongs to
|
||||
as_id TEXT NOT NULL PRIMARY KEY,
|
||||
-- The last-used transaction ID
|
||||
txn_id INTEGER NOT NULL
|
||||
);
|
||||
`
|
||||
|
||||
const selectTxnIDSQL = "" +
|
||||
"SELECT txn_id FROM txn_id_counter WHERE as_id = $1 LIMIT 1"
|
||||
|
||||
const upsertTxnIDSQL = "" +
|
||||
"INSERT INTO txn_id_counter(as_id, txn_id) VALUES ($1, $2) " +
|
||||
"ON CONFLICT (as_id) DO UPDATE " +
|
||||
"SET txn_id = $2"
|
||||
|
||||
type txnStatements struct {
|
||||
selectTxnIDStmt *sql.Stmt
|
||||
upsertTxnIDStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *txnStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(txnIDSchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if s.selectTxnIDStmt, err = db.Prepare(selectTxnIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.upsertTxnIDStmt, err = db.Prepare(upsertTxnIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// selectTxnID inserts a new transactionID mapped to its corresponding
|
||||
// application service ID into the db.
|
||||
func (s *txnStatements) selectTxnID(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
) (txnID int, err error) {
|
||||
rows, err := s.selectTxnIDStmt.QueryContext(ctx, appServiceID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
|
||||
// Scan the TxnID from the database and return
|
||||
rows.Next()
|
||||
err = rows.Scan(&txnID)
|
||||
return
|
||||
}
|
||||
|
||||
// upsertTxnID inserts or updates on existing rows a new transactionID mapped to
|
||||
// its corresponding application service ID into the db.
|
||||
func (s *txnStatements) upsertTxnID(
|
||||
ctx context.Context,
|
||||
appServiceID string,
|
||||
txnID int,
|
||||
) (err error) {
|
||||
_, err = s.upsertTxnIDStmt.ExecContext(ctx, appServiceID, txnID)
|
||||
return
|
||||
}
|
|
@ -0,0 +1,210 @@
|
|||
// Copyright 2018 Vector Creations Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package workers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/appservice/storage"
|
||||
"github.com/matrix-org/dendrite/common/config"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var (
|
||||
// TODO: Expose these in the config?
|
||||
// Maximum size of events sent in each transaction.
|
||||
transactionBatchSize = 50
|
||||
// Time to wait between checking for new events to send.
|
||||
transactionBreakTime = time.Millisecond * 50
|
||||
// Timeout for sending a single transaction to an application service.
|
||||
transactionTimeout = time.Second * 15
|
||||
// The current transaction ID. Increments after every successful transaction.
|
||||
currentTransactionID = 0
|
||||
)
|
||||
|
||||
// SetupTransactionWorkers spawns a separate goroutine for each application
|
||||
// service. Each of these "workers" handle taking all events intended for their
|
||||
// app service, batch them up into a single transaction (up to a max transaction
|
||||
// size), then send that off to the AS's /transactions/{txnID} endpoint. It also
|
||||
// handles exponentially backing off in case the AS isn't currently available.
|
||||
func SetupTransactionWorkers(
|
||||
cfg *config.Dendrite,
|
||||
appserviceDB *storage.Database,
|
||||
// Each worker has access to an event counter, which keeps track of the amount
|
||||
// of events they still have to send off. The roomserver consumer
|
||||
// (consumers/roomserver.go) increments this counter every time a new event for
|
||||
// a specific application service is inserted into the database, whereas the
|
||||
// counter is decremented by a certain amount when a worker sends some amount
|
||||
// of events successfully to an application service. To ensure recovery in the
|
||||
// event of a crash, this counter is initialized to the amount of events meant
|
||||
// to be sent by a specific worker in the database, so that state is not lost.
|
||||
eventCounterMap map[string]int,
|
||||
) error {
|
||||
// Create a worker that handles transmitting events to a single homeserver
|
||||
for _, appservice := range cfg.Derived.ApplicationServices {
|
||||
// Don't create a worker if this AS doesn't want to receive events
|
||||
if appservice.URL != "" {
|
||||
go worker(appserviceDB, appservice, eventCounterMap)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// worker is a goroutine that
|
||||
func worker(db *storage.Database, as config.ApplicationService, ecm map[string]int) {
|
||||
// Initialize transaction ID counter
|
||||
var err error
|
||||
currentTransactionID, err = db.GetTxnIDWithAppServiceID(context.TODO(), as.ID)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Warnf("appservice worker for %s unable to get latest transaction ID from DB",
|
||||
as.ID)
|
||||
}
|
||||
|
||||
// Create an HTTP client for sending requests to app services
|
||||
client := &http.Client{
|
||||
Timeout: transactionTimeout,
|
||||
}
|
||||
|
||||
// Initialize counter to amount of events currently in the database
|
||||
eventCount, err := db.CountEventsWithAppServiceID(context.TODO(), as.ID)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Warn("appservice worker unable to count queued events from DB")
|
||||
}
|
||||
ecm[as.ID] = eventCount
|
||||
|
||||
// Initialize backoff exponent (2^x secs). Max 9, aka 512s.
|
||||
backoff := 0
|
||||
|
||||
// Loop forever and keep waiting for more events to send
|
||||
for {
|
||||
// Check if there are any events to send
|
||||
if ecm[as.ID] > 0 {
|
||||
ctx := context.TODO()
|
||||
|
||||
eventIDs, events, err := db.GetEventsWithAppServiceID(ctx, as.ID, transactionBatchSize)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("appservice worker unable to read queued events from DB")
|
||||
|
||||
// Wait a little bit for DB to possibly recover
|
||||
time.Sleep(transactionBreakTime)
|
||||
continue
|
||||
}
|
||||
|
||||
// Batch events up into a transaction
|
||||
transactionJSON, err := createTransaction(events)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("appservice worker unable to marshal events")
|
||||
|
||||
// Wait a little bit before trying again
|
||||
time.Sleep(transactionBreakTime)
|
||||
continue
|
||||
}
|
||||
|
||||
// Send the events off to the application service
|
||||
eventsSent, err := send(client, as, transactionJSON, len(events))
|
||||
if err != nil {
|
||||
// Calculate how long to backoff for
|
||||
backoffDuration := time.Duration(math.Pow(2, float64(backoff)))
|
||||
backoffSeconds := time.Second * backoffDuration
|
||||
logrus.WithError(err).Warnf("unable to send transactions to %s, backing off for %ds",
|
||||
as.ID, backoffDuration)
|
||||
|
||||
// Increment backoff count
|
||||
backoff++
|
||||
if backoff > 9 {
|
||||
backoff = 9
|
||||
}
|
||||
|
||||
// Backoff
|
||||
time.Sleep(backoffSeconds)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// We sent successfully, hooray!
|
||||
backoff = 0
|
||||
ecm[as.ID] -= eventsSent
|
||||
|
||||
// Remove sent events from the DB
|
||||
err = db.RemoveEventsByID(ctx, eventIDs)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Errorf("unable to remove appservice events from the database for %s",
|
||||
as.ID)
|
||||
}
|
||||
|
||||
// Update transactionID
|
||||
currentTransactionID++
|
||||
if err = db.UpsertTxnIDWithAppServiceID(context.TODO(), as.ID, currentTransactionID); err != nil {
|
||||
logrus.WithError(err).Errorf("unable to update transaction ID for %s",
|
||||
as.ID)
|
||||
}
|
||||
} else {
|
||||
// If not, wait a bit and try again
|
||||
time.Sleep(transactionBreakTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// createTransaction takes in a slice of AS events, stores them in an AS
|
||||
// transaction, and JSON-encodes the results.
|
||||
func createTransaction(
|
||||
events []gomatrixserverlib.ApplicationServiceEvent,
|
||||
) ([]byte, error) {
|
||||
// Create a transactions and store the events inside
|
||||
transaction := gomatrixserverlib.ApplicationServiceTransaction{
|
||||
Events: events,
|
||||
}
|
||||
|
||||
transactionJSON, err := json.Marshal(transaction)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return transactionJSON, nil
|
||||
}
|
||||
|
||||
// send sends events to an application service. Returns an error if an OK was not
|
||||
// received back from the application service or the request timed out.
|
||||
func send(
|
||||
client *http.Client,
|
||||
appservice config.ApplicationService,
|
||||
transaction []byte,
|
||||
count int,
|
||||
) (int, error) {
|
||||
// POST a transaction to our AS.
|
||||
address := fmt.Sprintf("%s/transactions/%d", appservice.URL, currentTransactionID)
|
||||
resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer resp.Body.Close() // nolint: errcheck
|
||||
|
||||
// Check the AS received the events correctly
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return 0, fmt.Errorf(
|
||||
"Non-OK status code %d returned from AS", resp.StatusCode,
|
||||
)
|
||||
}
|
||||
|
||||
// Return amount of sent events
|
||||
return count, nil
|
||||
}
|
|
@ -35,7 +35,7 @@ CREATE TABLE IF NOT EXISTS account_accounts (
|
|||
created_ts BIGINT NOT NULL,
|
||||
-- The password hash for this account. Can be NULL if this is a passwordless account.
|
||||
password_hash TEXT,
|
||||
-- Identifies which Application Service this account belongs to, if any.
|
||||
-- Identifies which application service this account belongs to, if any.
|
||||
appservice_id TEXT
|
||||
-- TODO:
|
||||
-- is_guest, is_admin, upgraded_ts, devices, any email reset stuff?
|
||||
|
|
|
@ -138,9 +138,9 @@ func (d *Database) UpdateDevice(
|
|||
}
|
||||
|
||||
// RemoveDevice revokes a device by deleting the entry in the database
|
||||
// matching with the given device ID and user ID localpart
|
||||
// matching with the given device ID and user ID localpart.
|
||||
// If the device doesn't exist, it will not return an error
|
||||
// If something went wrong during the deletion, it will return the SQL error
|
||||
// If something went wrong during the deletion, it will return the SQL error.
|
||||
func (d *Database) RemoveDevice(
|
||||
ctx context.Context, deviceID, localpart string,
|
||||
) error {
|
||||
|
|
|
@ -115,7 +115,7 @@ type registerRequest struct {
|
|||
|
||||
InitialDisplayName *string `json:"initial_device_display_name"`
|
||||
|
||||
// Application Services place Type in the root of their registration
|
||||
// Application services place Type in the root of their registration
|
||||
// request, whereas clients place it in the authDict struct.
|
||||
Type authtypes.LoginType `json:"type"`
|
||||
}
|
||||
|
@ -281,16 +281,16 @@ func validateRecaptcha(
|
|||
}
|
||||
|
||||
// UsernameIsWithinApplicationServiceNamespace checks to see if a username falls
|
||||
// within any of the namespaces of a given Application Service. If no
|
||||
// Application Service is given, it will check to see if it matches any
|
||||
// Application Service's namespace.
|
||||
// within any of the namespaces of a given application service. If no
|
||||
// application service is given, it will check to see if it matches any
|
||||
// application service's namespace.
|
||||
func UsernameIsWithinApplicationServiceNamespace(
|
||||
cfg *config.Dendrite,
|
||||
username string,
|
||||
appservice *config.ApplicationService,
|
||||
) bool {
|
||||
if appservice != nil {
|
||||
// Loop through given Application Service's namespaces and see if any match
|
||||
// Loop through given application service's namespaces and see if any match
|
||||
for _, namespace := range appservice.NamespaceMap["users"] {
|
||||
// AS namespaces are checked for validity in config
|
||||
if namespace.RegexpObject.MatchString(username) {
|
||||
|
@ -300,7 +300,7 @@ func UsernameIsWithinApplicationServiceNamespace(
|
|||
return false
|
||||
}
|
||||
|
||||
// Loop through all known Application Service's namespaces and see if any match
|
||||
// Loop through all known application service's namespaces and see if any match
|
||||
for _, knownAppservice := range cfg.Derived.ApplicationServices {
|
||||
for _, namespace := range knownAppservice.NamespaceMap["users"] {
|
||||
// AS namespaces are checked for validity in config
|
||||
|
@ -509,7 +509,7 @@ func handleRegistrationFlow(
|
|||
sessions.AddCompletedStage(sessionID, authtypes.LoginTypeSharedSecret)
|
||||
|
||||
case authtypes.LoginTypeApplicationService:
|
||||
// Check Application Service register user request is valid.
|
||||
// Check application service register user request is valid.
|
||||
// The application service's ID is returned if so.
|
||||
appserviceID, err := validateApplicationService(cfg, req, r.Username)
|
||||
|
||||
|
|
|
@ -108,8 +108,6 @@ func setupRegexps(cfg *Dendrite) (err error) {
|
|||
}
|
||||
}
|
||||
|
||||
fmt.Println(exclusiveUsernameStrings, exclusiveAliasStrings)
|
||||
|
||||
// Join the regexes together into one big regex.
|
||||
// i.e. "app1.*", "app2.*" -> "(app1.*)|(app2.*)"
|
||||
// Later we can check if a username or alias matches any exclusive regex and
|
||||
|
@ -167,13 +165,13 @@ func checkErrors(config *Dendrite) (err error) {
|
|||
// can have the same ID or token.
|
||||
if idMap[appservice.ID] {
|
||||
return configErrors([]string{fmt.Sprintf(
|
||||
"Application Service ID %s must be unique", appservice.ID,
|
||||
"Application service ID %s must be unique", appservice.ID,
|
||||
)})
|
||||
}
|
||||
// Check if we've already seen this token
|
||||
if tokenMap[appservice.ASToken] {
|
||||
return configErrors([]string{fmt.Sprintf(
|
||||
"Application Service Token %s must be unique", appservice.ASToken,
|
||||
"Application service Token %s must be unique", appservice.ASToken,
|
||||
)})
|
||||
}
|
||||
|
||||
|
@ -189,7 +187,7 @@ func checkErrors(config *Dendrite) (err error) {
|
|||
// namespace, which often ends up in an application service receiving events
|
||||
// it doesn't want, as an empty regex will match all events.
|
||||
return configErrors([]string{fmt.Sprintf(
|
||||
"Application Service namespace can only contain a single regex tuple. Check your YAML.",
|
||||
"Application service namespace can only contain a single regex tuple. Check your YAML.",
|
||||
)})
|
||||
}
|
||||
}
|
||||
|
@ -201,7 +199,7 @@ func checkErrors(config *Dendrite) (err error) {
|
|||
for _, namespace := range namespaceSlice {
|
||||
if !IsValidRegex(namespace.Regex) {
|
||||
return configErrors([]string{fmt.Sprintf(
|
||||
"Invalid regex string for Application Service %s", appservice.ID,
|
||||
"Invalid regex string for application service %s", appservice.ID,
|
||||
)})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -162,6 +162,9 @@ type Dendrite struct {
|
|||
// The FederationSender database stores information used by the FederationSender
|
||||
// It is only accessed by the FederationSender.
|
||||
FederationSender DataSource `yaml:"federation_sender"`
|
||||
// The AppServices database stores information used by the AppService component.
|
||||
// It is only accessed by the AppService component.
|
||||
AppService DataSource `yaml:"appservice"`
|
||||
// The PublicRoomsAPI database stores information used to compute the public
|
||||
// room directory. It is only accessed by the PublicRoomsAPI server.
|
||||
PublicRoomsAPI DataSource `yaml:"public_rooms_api"`
|
||||
|
@ -231,15 +234,15 @@ type Dendrite struct {
|
|||
Params map[string]interface{} `json:"params"`
|
||||
}
|
||||
|
||||
// Application Services parsed from their config files
|
||||
// Application services parsed from their config files
|
||||
// The paths of which were given above in the main config file
|
||||
ApplicationServices []ApplicationService
|
||||
|
||||
// Meta-regexes compiled from all exclusive Application Service
|
||||
// Meta-regexes compiled from all exclusive application service
|
||||
// Regexes.
|
||||
//
|
||||
// When a user registers, we check that their username does not match any
|
||||
// exclusive Application Service namespaces
|
||||
// exclusive application service namespaces
|
||||
ExclusiveApplicationServicesUsernameRegexp *regexp.Regexp
|
||||
// When a user creates a room alias, we check that it isn't already
|
||||
// reserved by an application service
|
||||
|
|
Loading…
Reference in a new issue