Remove BaseDendrite (#3023)

Removes `BaseDendrite` to, hopefully, make testing and composing of
components easier in the future.
This commit is contained in:
Till 2023-03-22 09:21:32 +01:00 committed by GitHub
parent ec6879e5ae
commit 5e85a00cb3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
68 changed files with 1186 additions and 1002 deletions

View file

@ -22,7 +22,7 @@ var ctx = context.Background()
func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
connStr, close := test.PrepareDBConnectionString(t, dbType)
cm := sqlutil.NewConnectionManager()
cm := sqlutil.NewConnectionManager(nil, config.DatabaseOptions{})
db, err := storage.NewSyncServerDatasource(context.Background(), cm, &config.DatabaseOptions{
ConnectionString: config.DataSource(connStr),
})

View file

@ -18,12 +18,15 @@ import (
"context"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api"
@ -39,16 +42,19 @@ import (
// AddPublicRoutes sets up and registers HTTP handlers for the SyncAPI
// component.
func AddPublicRoutes(
base *base.BaseDendrite,
processContext *process.ProcessContext,
routers httputil.Routers,
dendriteCfg *config.Dendrite,
cm sqlutil.Connections,
natsInstance *jetstream.NATSInstance,
userAPI userapi.SyncUserAPI,
rsAPI api.SyncRoomserverAPI,
caches caching.LazyLoadCache,
enableMetrics bool,
) {
cfg := &base.Cfg.SyncAPI
js, natsClient := natsInstance.Prepare(processContext, &dendriteCfg.Global.JetStream)
js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
syncDB, err := storage.NewSyncServerDatasource(base.Context(), base.ConnectionManager, &cfg.Database)
syncDB, err := storage.NewSyncServerDatasource(processContext.Context(), cm, &dendriteCfg.SyncAPI.Database)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to sync db")
}
@ -62,32 +68,32 @@ func AddPublicRoutes(
}
var fts *fulltext.Search
if cfg.Fulltext.Enabled {
fts, err = fulltext.New(base.ProcessContext.Context(), cfg.Fulltext)
if dendriteCfg.SyncAPI.Fulltext.Enabled {
fts, err = fulltext.New(processContext.Context(), dendriteCfg.SyncAPI.Fulltext)
if err != nil {
logrus.WithError(err).Panicf("failed to create full text")
}
base.ProcessContext.ComponentStarted()
processContext.ComponentStarted()
}
federationPresenceProducer := &producers.FederationAPIPresenceProducer{
Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
Topic: dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputPresenceEvent),
JetStream: js,
}
presenceConsumer := consumers.NewPresenceConsumer(
base.ProcessContext, cfg, js, natsClient, syncDB,
processContext, &dendriteCfg.SyncAPI, js, natsClient, syncDB,
notifier, streams.PresenceStreamProvider,
userAPI,
)
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, rsAPI, streams, notifier, federationPresenceProducer, presenceConsumer, base.EnableMetrics)
requestPool := sync.NewRequestPool(syncDB, &dendriteCfg.SyncAPI, userAPI, rsAPI, streams, notifier, federationPresenceProducer, presenceConsumer, enableMetrics)
if err = presenceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start presence consumer")
}
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
base.ProcessContext, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
processContext, &dendriteCfg.SyncAPI, dendriteCfg.Global.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
js, rsAPI, syncDB, notifier,
streams.DeviceListStreamProvider,
)
@ -96,7 +102,7 @@ func AddPublicRoutes(
}
roomConsumer := consumers.NewOutputRoomEventConsumer(
base.ProcessContext, cfg, js, syncDB, notifier, streams.PDUStreamProvider,
processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.PDUStreamProvider,
streams.InviteStreamProvider, rsAPI, fts,
)
if err = roomConsumer.Start(); err != nil {
@ -104,7 +110,7 @@ func AddPublicRoutes(
}
clientConsumer := consumers.NewOutputClientDataConsumer(
base.ProcessContext, cfg, js, natsClient, syncDB, notifier,
processContext, &dendriteCfg.SyncAPI, js, natsClient, syncDB, notifier,
streams.AccountDataStreamProvider, fts,
)
if err = clientConsumer.Start(); err != nil {
@ -112,35 +118,35 @@ func AddPublicRoutes(
}
notificationConsumer := consumers.NewOutputNotificationDataConsumer(
base.ProcessContext, cfg, js, syncDB, notifier, streams.NotificationDataStreamProvider,
processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.NotificationDataStreamProvider,
)
if err = notificationConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start notification data consumer")
}
typingConsumer := consumers.NewOutputTypingEventConsumer(
base.ProcessContext, cfg, js, eduCache, notifier, streams.TypingStreamProvider,
processContext, &dendriteCfg.SyncAPI, js, eduCache, notifier, streams.TypingStreamProvider,
)
if err = typingConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start typing consumer")
}
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
base.ProcessContext, cfg, js, syncDB, userAPI, notifier, streams.SendToDeviceStreamProvider,
processContext, &dendriteCfg.SyncAPI, js, syncDB, userAPI, notifier, streams.SendToDeviceStreamProvider,
)
if err = sendToDeviceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
}
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
base.ProcessContext, cfg, js, syncDB, notifier, streams.ReceiptStreamProvider,
processContext, &dendriteCfg.SyncAPI, js, syncDB, notifier, streams.ReceiptStreamProvider,
)
if err = receiptConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start receipts consumer")
}
routing.Setup(
base.Routers.Client, requestPool, syncDB, userAPI,
rsAPI, cfg, caches, fts,
routers.Client, requestPool, syncDB, userAPI,
rsAPI, &dendriteCfg.SyncAPI, caches, fts,
)
}

View file

@ -11,6 +11,9 @@ import (
"time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/tidwall/gjson"
@ -22,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/roomserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/test"
@ -114,14 +116,17 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
base, close := testrig.CreateBaseDendrite(t, dbType)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
routers := httputil.NewRouters()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
natsInstance := jetstream.NATSInstance{}
defer close()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
msgs := toNATSMsgs(t, base, room.Events()...)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
msgs := toNATSMsgs(t, cfg, room.Events()...)
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, caching.DisableMetrics)
testrig.MustPublishMsgs(t, jsctx, msgs...)
testCases := []struct {
@ -156,7 +161,7 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
},
}
syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool {
syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
return gjson.Get(syncBody, path).Exists()
@ -164,7 +169,7 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
for _, tc := range testCases {
w := httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, tc.req)
routers.Client.ServeHTTP(w, tc.req)
if w.Code != tc.wantCode {
t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
}
@ -207,26 +212,29 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
base, close := testrig.CreateBaseDendrite(t, dbType)
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
routers := httputil.NewRouters()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close()
natsInstance := jetstream.NATSInstance{}
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
// order is:
// m.room.create
// m.room.member
// m.room.power_levels
// m.room.join_rules
// m.room.history_visibility
msgs := toNATSMsgs(t, base, room.Events()...)
msgs := toNATSMsgs(t, cfg, room.Events()...)
sinceTokens := make([]string, len(msgs))
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches, caching.DisableMetrics)
for i, msg := range msgs {
testrig.MustPublishMsgs(t, jsctx, msg)
time.Sleep(100 * time.Millisecond)
w := httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
})))
@ -256,7 +264,7 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
t.Logf("waited for events to be consumed; syncing with %v", sinceTokens)
for i, since := range sinceTokens {
w := httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
"since": since,
@ -298,17 +306,20 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
base, close := testrig.CreateBaseDendrite(t, dbType)
base.Cfg.Global.Presence.EnableOutbound = true
base.Cfg.Global.Presence.EnableInbound = true
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
routers := httputil.NewRouters()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
cfg.Global.Presence.EnableOutbound = true
cfg.Global.Presence.EnableInbound = true
defer close()
natsInstance := jetstream.NATSInstance{}
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, caching.DisableMetrics)
w := httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
"set_presence": "online",
@ -414,17 +425,20 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
userType = "real user"
}
base, close := testrig.CreateBaseDendrite(t, dbType)
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
routers := httputil.NewRouters()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close()
natsInstance := jetstream.NATSInstance{}
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
// Use the actual internal roomserver API
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, caching.DisableMetrics)
for _, tc := range testCases {
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
@ -439,7 +453,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
syncUntil(t, base, aliceDev.AccessToken, false,
syncUntil(t, routers, aliceDev.AccessToken, false,
func(syncBody string) bool {
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, beforeJoinBody)
return gjson.Get(syncBody, path).Exists()
@ -448,7 +462,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// There is only one event, we expect only to be able to see this, if the room is world_readable
w := httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
"access_token": bobDev.AccessToken,
"dir": "b",
"filter": `{"lazy_load_members":true}`, // check that lazy loading doesn't break history visibility
@ -479,7 +493,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
syncUntil(t, base, aliceDev.AccessToken, false,
syncUntil(t, routers, aliceDev.AccessToken, false,
func(syncBody string) bool {
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, afterJoinBody)
return gjson.Get(syncBody, path).Exists()
@ -488,7 +502,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// Verify the messages after/before invite are visible or not
w = httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
"access_token": bobDev.AccessToken,
"dir": "b",
})))
@ -714,18 +728,20 @@ func TestGetMembership(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, close := testrig.CreateBaseDendrite(t, dbType)
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
routers := httputil.NewRouters()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
natsInstance := jetstream.NATSInstance{}
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
// Use an actual roomserver for this
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches, caching.DisableMetrics)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@ -745,7 +761,7 @@ func TestGetMembership(t *testing.T) {
if tc.useSleep {
time.Sleep(time.Millisecond * 100)
} else {
syncUntil(t, base, aliceDev.AccessToken, false, func(syncBody string) bool {
syncUntil(t, routers, aliceDev.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID())
return gjson.Get(syncBody, path).Exists()
@ -753,7 +769,7 @@ func TestGetMembership(t *testing.T) {
}
w := httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, tc.request(t, room))
routers.Client.ServeHTTP(w, tc.request(t, room))
if w.Code != 200 && tc.wantOK {
t.Logf("%s", w.Body.String())
t.Fatalf("got HTTP %d want %d", w.Code, 200)
@ -786,16 +802,19 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
base, baseClose := testrig.CreateBaseDendrite(t, dbType)
defer baseClose()
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
routers := httputil.NewRouters()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close()
natsInstance := jetstream.NATSInstance{}
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches, caching.DisableMetrics)
producer := producers.SyncAPIProducer{
TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
TopicSendToDeviceEvent: cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
JetStream: jsctx,
}
@ -881,7 +900,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
}
}
syncUntil(t, base, alice.AccessToken,
syncUntil(t, routers, alice.AccessToken,
len(tc.want) == 0,
func(body string) bool {
return gjson.Get(body, fmt.Sprintf(`to_device.events.#(content.dummy=="message %d")`, msgCounter)).Exists()
@ -890,7 +909,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
// Execute a /sync request, recording the response
w := httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"since": tc.since,
})))
@ -1004,15 +1023,18 @@ func testContext(t *testing.T, dbType test.DBType) {
AccountType: userapi.AccountTypeUser,
}
base, baseClose := testrig.CreateBaseDendrite(t, dbType)
defer baseClose()
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
routers := httputil.NewRouters()
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
defer close()
// Use an actual roomserver for this
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches)
AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches, caching.DisableMetrics)
room := test.NewRoom(t, user)
@ -1025,10 +1047,10 @@ func testContext(t *testing.T, dbType test.DBType) {
t.Fatalf("failed to send events: %v", err)
}
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
jsctx, _ := natsInstance.Prepare(processCtx, &cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &cfg.Global.JetStream)
syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool {
syncUntil(t, routers, alice.AccessToken, false, func(syncBody string) bool {
// wait for the last sent eventID to come down sync
path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, thirdMsg.EventID())
return gjson.Get(syncBody, path).Exists()
@ -1055,7 +1077,7 @@ func testContext(t *testing.T, dbType test.DBType) {
params[k] = v
}
}
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", requestPath, test.WithQueryParams(params)))
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", requestPath, test.WithQueryParams(params)))
if tc.wantError && w.Code == 200 {
t.Fatalf("Expected an error, but got none")
@ -1143,9 +1165,10 @@ func TestUpdateRelations(t *testing.T) {
room := test.NewRoom(t, alice)
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, shutdownBase := testrig.CreateBaseDendrite(t, dbType)
t.Cleanup(shutdownBase)
db, err := storage.NewSyncServerDatasource(base.Context(), base.ConnectionManager, &base.Cfg.SyncAPI.Database)
cfg, processCtx, close := testrig.CreateConfig(t, dbType)
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
t.Cleanup(close)
db, err := storage.NewSyncServerDatasource(processCtx.Context(), cm, &cfg.SyncAPI.Database)
if err != nil {
t.Fatal(err)
}
@ -1167,10 +1190,11 @@ func TestUpdateRelations(t *testing.T) {
}
func syncUntil(t *testing.T,
base *base.BaseDendrite, accessToken string,
routers httputil.Routers, accessToken string,
skip bool,
checkFunc func(syncBody string) bool,
) {
t.Helper()
if checkFunc == nil {
t.Fatalf("No checkFunc defined")
}
@ -1184,7 +1208,7 @@ func syncUntil(t *testing.T,
go func() {
for {
w := httptest.NewRecorder()
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": accessToken,
"timeout": "1000",
})))
@ -1202,14 +1226,14 @@ func syncUntil(t *testing.T,
}
}
func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
func toNATSMsgs(t *testing.T, cfg *config.Dendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg {
result := make([]*nats.Msg, len(input))
for i, ev := range input {
var addsStateIDs []string
if ev.StateKey() != nil {
addsStateIDs = append(addsStateIDs, ev.EventID())
}
result[i] = testrig.NewOutputEventMsg(t, base, ev.RoomID(), api.OutputEvent{
result[i] = testrig.NewOutputEventMsg(t, cfg, ev.RoomID(), api.OutputEvent{
Type: rsapi.OutputTypeNewRoomEvent,
NewRoomEvent: &rsapi.OutputNewRoomEvent{
Event: ev,