mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-29 12:42:46 +00:00
BREAKING: Make eduserver/appservice use userapi (#1138)
* BREAKING: Make eduserver/appservice use userapi This is a breaking change because this PR restructures how the AS API tracks its position in Kafka streams. Previously, it used the account DB to store partition offsets. However, this is also being used by `clientapi` for the same purpose, which is bad (each component needs to store offsets independently or else you might lose messages across restarts). This PR changes this behaviour to now store partition offsets in the `appservice` database. This means that: - Upon restart, the `appservice` component will attempt to replay all room events from the beginning of time. - An additional table will be created in the appservice database, which in and of itself is backwards compatible. * Return ErrorConflict
This commit is contained in:
parent
83391da0e0
commit
e15a8042a1
19 changed files with 207 additions and 53 deletions
|
@ -16,7 +16,6 @@ package appservice
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -29,12 +28,10 @@ import (
|
|||
"github.com/matrix-org/dendrite/appservice/storage"
|
||||
"github.com/matrix-org/dendrite/appservice/types"
|
||||
"github.com/matrix-org/dendrite/appservice/workers"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/matrix-org/dendrite/internal/setup"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
|
@ -47,8 +44,7 @@ func AddInternalRoutes(router *mux.Router, queryAPI appserviceAPI.AppServiceQuer
|
|||
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
|
||||
func NewInternalAPI(
|
||||
base *setup.BaseDendrite,
|
||||
accountsDB accounts.Database,
|
||||
deviceDB devices.Database,
|
||||
userAPI userapi.UserInternalAPI,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
) appserviceAPI.AppServiceQueryAPI {
|
||||
// Create a connection to the appservice postgres DB
|
||||
|
@ -70,7 +66,7 @@ func NewInternalAPI(
|
|||
workerStates[i] = ws
|
||||
|
||||
// Create bot account for this AS if it doesn't already exist
|
||||
if err = generateAppServiceAccount(accountsDB, deviceDB, appservice); err != nil {
|
||||
if err = generateAppServiceAccount(userAPI, appservice); err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"appservice": appservice.ID,
|
||||
}).WithError(err).Panicf("failed to generate bot account for appservice")
|
||||
|
@ -90,7 +86,7 @@ func NewInternalAPI(
|
|||
// We can't add ASes at runtime so this is safe to do.
|
||||
if len(workerStates) > 0 {
|
||||
consumer := consumers.NewOutputRoomEventConsumer(
|
||||
base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB,
|
||||
base.Cfg, base.KafkaConsumer, appserviceDB,
|
||||
rsAPI, workerStates,
|
||||
)
|
||||
if err := consumer.Start(); err != nil {
|
||||
|
@ -109,22 +105,24 @@ func NewInternalAPI(
|
|||
// `sender_localpart` field of each application service if it doesn't
|
||||
// exist already
|
||||
func generateAppServiceAccount(
|
||||
accountsDB accounts.Database,
|
||||
deviceDB devices.Database,
|
||||
userAPI userapi.UserInternalAPI,
|
||||
as config.ApplicationService,
|
||||
) error {
|
||||
ctx := context.Background()
|
||||
|
||||
// Create an account for the application service
|
||||
_, err := accountsDB.CreateAccount(ctx, as.SenderLocalpart, "", as.ID)
|
||||
var accRes userapi.PerformAccountCreationResponse
|
||||
err := userAPI.PerformAccountCreation(context.Background(), &userapi.PerformAccountCreationRequest{
|
||||
Localpart: as.SenderLocalpart,
|
||||
AppServiceID: as.ID,
|
||||
OnConflict: userapi.ConflictUpdate,
|
||||
}, &accRes)
|
||||
if err != nil {
|
||||
if errors.Is(err, sqlutil.ErrUserExists) { // This account already exists
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Create a dummy device with a dummy token for the application service
|
||||
_, err = deviceDB.CreateDevice(ctx, as.SenderLocalpart, nil, as.ASToken, &as.SenderLocalpart)
|
||||
var devRes userapi.PerformDeviceCreationResponse
|
||||
err = userAPI.PerformDeviceCreation(context.Background(), &userapi.PerformDeviceCreationRequest{
|
||||
Localpart: as.SenderLocalpart,
|
||||
AccessToken: as.ASToken,
|
||||
DeviceID: &as.SenderLocalpart,
|
||||
DeviceDisplayName: &as.SenderLocalpart,
|
||||
}, &devRes)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/appservice/storage"
|
||||
"github.com/matrix-org/dendrite/appservice/types"
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
|
@ -33,7 +32,6 @@ import (
|
|||
// OutputRoomEventConsumer consumes events that originated in the room server.
|
||||
type OutputRoomEventConsumer struct {
|
||||
roomServerConsumer *internal.ContinualConsumer
|
||||
db accounts.Database
|
||||
asDB storage.Database
|
||||
rsAPI api.RoomserverInternalAPI
|
||||
serverName string
|
||||
|
@ -45,7 +43,6 @@ type OutputRoomEventConsumer struct {
|
|||
func NewOutputRoomEventConsumer(
|
||||
cfg *config.Dendrite,
|
||||
kafkaConsumer sarama.Consumer,
|
||||
store accounts.Database,
|
||||
appserviceDB storage.Database,
|
||||
rsAPI api.RoomserverInternalAPI,
|
||||
workerStates []types.ApplicationServiceWorkerState,
|
||||
|
@ -53,11 +50,10 @@ func NewOutputRoomEventConsumer(
|
|||
consumer := internal.ContinualConsumer{
|
||||
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
|
||||
Consumer: kafkaConsumer,
|
||||
PartitionStore: store,
|
||||
PartitionStore: appserviceDB,
|
||||
}
|
||||
s := &OutputRoomEventConsumer{
|
||||
roomServerConsumer: &consumer,
|
||||
db: store,
|
||||
asDB: appserviceDB,
|
||||
rsAPI: rsAPI,
|
||||
serverName: string(cfg.Matrix.ServerName),
|
||||
|
|
|
@ -17,10 +17,12 @@ package storage
|
|||
import (
|
||||
"context"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
type Database interface {
|
||||
internal.PartitionStorer
|
||||
StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.HeaderedEvent) error
|
||||
GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error)
|
||||
CountEventsWithAppServiceID(ctx context.Context, appServiceID string) (int, error)
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
|
||||
// Database stores events intended to be later sent to application services
|
||||
type Database struct {
|
||||
sqlutil.PartitionOffsetStatements
|
||||
events eventsStatements
|
||||
txnID txnStatements
|
||||
db *sql.DB
|
||||
|
@ -42,6 +43,9 @@ func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties) (*Dat
|
|||
if err = result.prepare(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = result.PartitionOffsetStatements.Prepare(result.db, "appservice"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
|
||||
// Database stores events intended to be later sent to application services
|
||||
type Database struct {
|
||||
sqlutil.PartitionOffsetStatements
|
||||
events eventsStatements
|
||||
txnID txnStatements
|
||||
db *sql.DB
|
||||
|
@ -46,6 +47,9 @@ func NewDatabase(dataSourceName string) (*Database, error) {
|
|||
if err = result.prepare(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = result.PartitionOffsetStatements.Prepare(result.db, "appservice"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue