Preparations for removing BaseDendrite (#3016)

Preparations to actually remove/replace `BaseDendrite`.
Quite a few changes:
- SyncAPI accepts an `fulltext.Indexer` interface (fulltext is removed
from `BaseDendrite`)
- Caches are removed from `BaseDendrite`
- Introduces a `Router` struct (likely to change)
  - also fixes #2903
- Introduces a `sqlutil.ConnectionManager`, which should remove
`base.DatabaseConnection` later on
- probably more
This commit is contained in:
Till 2023-03-17 12:09:45 +01:00 committed by GitHub
parent d88f71ab71
commit 5579121c6f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
85 changed files with 722 additions and 470 deletions

View file

@ -50,7 +50,7 @@ type OutputClientDataConsumer struct {
stream streams.StreamProvider
notifier *notifier.Notifier
serverName gomatrixserverlib.ServerName
fts *fulltext.Search
fts fulltext.Indexer
cfg *config.SyncAPI
}

View file

@ -51,7 +51,7 @@ type OutputRoomEventConsumer struct {
pduStream streams.StreamProvider
inviteStream streams.StreamProvider
notifier *notifier.Notifier
fts *fulltext.Search
fts fulltext.Indexer
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.

View file

@ -43,7 +43,7 @@ func Setup(
rsAPI api.SyncRoomserverAPI,
cfg *config.SyncAPI,
lazyLoadCache caching.LazyLoadCache,
fts *fulltext.Search,
fts fulltext.Indexer,
) {
v1unstablemux := csMux.PathPrefix("/{apiversion:(?:v1|unstable)}/").Subrouter()
v3mux := csMux.PathPrefix("/{apiversion:(?:r0|v3)}/").Subrouter()

View file

@ -37,7 +37,7 @@ import (
)
// nolint:gocyclo
func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts *fulltext.Search, from *string) util.JSONResponse {
func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts fulltext.Indexer, from *string) util.JSONResponse {
start := time.Now()
var (
searchReq SearchRequest

View file

@ -16,12 +16,12 @@
package postgres
import (
"context"
"database/sql"
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
@ -36,10 +36,10 @@ type SyncServerDatasource struct {
}
// NewDatabase creates a new sync server database
func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
func NewDatabase(ctx context.Context, cm sqlutil.Connections, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
var d SyncServerDatasource
var err error
if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil {
if d.db, d.writer, err = cm.Connection(dbProperties); err != nil {
return nil, err
}
accountData, err := NewPostgresAccountDataTable(d.db)
@ -111,7 +111,7 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
Up: deltas.UpSetHistoryVisibility, // Requires current_room_state and output_room_events to be created.
},
)
err = m.Up(base.Context())
err = m.Up(ctx)
if err != nil {
return nil, err
}

View file

@ -20,7 +20,6 @@ import (
"database/sql"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
@ -37,13 +36,14 @@ type SyncServerDatasource struct {
// NewDatabase creates a new sync server database
// nolint: gocyclo
func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
func NewDatabase(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
var d SyncServerDatasource
var err error
if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()); err != nil {
if d.db, d.writer, err = conMan.Connection(dbProperties); err != nil {
return nil, err
}
if err = d.prepare(base.Context()); err != nil {
if err = d.prepare(ctx); err != nil {
return nil, err
}
return &d, nil

View file

@ -18,21 +18,22 @@
package storage
import (
"context"
"fmt"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/postgres"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3"
)
// NewSyncServerDatasource opens a database connection.
func NewSyncServerDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) {
func NewSyncServerDatasource(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (Database, error) {
switch {
case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewDatabase(base, dbProperties)
return sqlite3.NewDatabase(ctx, conMan, dbProperties)
case dbProperties.ConnectionString.IsPostgres():
return postgres.NewDatabase(base, dbProperties)
return postgres.NewDatabase(ctx, conMan, dbProperties)
default:
return nil, fmt.Errorf("unexpected database type")
}

View file

@ -9,27 +9,27 @@ import (
"reflect"
"testing"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
"github.com/matrix-org/gomatrixserverlib"
"github.com/stretchr/testify/assert"
)
var ctx = context.Background()
func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func(), func()) {
func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
connStr, close := test.PrepareDBConnectionString(t, dbType)
base, closeBase := testrig.CreateBaseDendrite(t, dbType)
db, err := storage.NewSyncServerDatasource(base, &config.DatabaseOptions{
cm := sqlutil.NewConnectionManager()
db, err := storage.NewSyncServerDatasource(context.Background(), cm, &config.DatabaseOptions{
ConnectionString: config.DataSource(connStr),
})
if err != nil {
t.Fatalf("NewSyncServerDatasource returned %s", err)
}
return db, close, closeBase
return db, close
}
func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (positions []types.StreamPosition) {
@ -55,9 +55,8 @@ func TestWriteEvents(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
alice := test.NewUser(t)
r := test.NewRoom(t, alice)
db, close, closeBase := MustCreateDatabase(t, dbType)
db, close := MustCreateDatabase(t, dbType)
defer close()
defer closeBase()
MustWriteEvents(t, db, r.Events())
})
}
@ -76,9 +75,8 @@ func WithSnapshot(t *testing.T, db storage.Database, f func(snapshot storage.Dat
// These tests assert basic functionality of RecentEvents for PDUs
func TestRecentEventsPDU(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
db, close, closeBase := MustCreateDatabase(t, dbType)
db, close := MustCreateDatabase(t, dbType)
defer close()
defer closeBase()
alice := test.NewUser(t)
// dummy room to make sure SQL queries are filtering on room ID
MustWriteEvents(t, db, test.NewRoom(t, alice).Events())
@ -191,9 +189,8 @@ func TestRecentEventsPDU(t *testing.T) {
// The purpose of this test is to ensure that backfill does indeed go backwards, using a topology token
func TestGetEventsInRangeWithTopologyToken(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
db, close, closeBase := MustCreateDatabase(t, dbType)
db, close := MustCreateDatabase(t, dbType)
defer close()
defer closeBase()
alice := test.NewUser(t)
r := test.NewRoom(t, alice)
for i := 0; i < 10; i++ {
@ -276,9 +273,8 @@ func TestStreamToTopologicalPosition(t *testing.T) {
}
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
db, close, closeBase := MustCreateDatabase(t, dbType)
db, close := MustCreateDatabase(t, dbType)
defer close()
defer closeBase()
txn, err := db.NewDatabaseTransaction(ctx)
if err != nil {
@ -514,9 +510,8 @@ func TestSendToDeviceBehaviour(t *testing.T) {
bob := test.NewUser(t)
deviceID := "one"
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
db, close, closeBase := MustCreateDatabase(t, dbType)
db, close := MustCreateDatabase(t, dbType)
defer close()
defer closeBase()
// At this point there should be no messages. We haven't sent anything
// yet.
@ -899,9 +894,8 @@ func TestRoomSummary(t *testing.T) {
}
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
db, close, closeBase := MustCreateDatabase(t, dbType)
db, close := MustCreateDatabase(t, dbType)
defer close()
defer closeBase()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@ -939,11 +933,8 @@ func TestRecentEvents(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
filter := gomatrixserverlib.DefaultRoomEventFilter()
db, close, closeBase := MustCreateDatabase(t, dbType)
t.Cleanup(func() {
close()
closeBase()
})
db, close := MustCreateDatabase(t, dbType)
t.Cleanup(close)
MustWriteEvents(t, db, room1.Events())
MustWriteEvents(t, db, room2.Events())

View file

@ -15,18 +15,19 @@
package storage
import (
"context"
"fmt"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3"
)
// NewPublicRoomsServerDatabase opens a database connection.
func NewSyncServerDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) {
func NewSyncServerDatasource(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (Database, error) {
switch {
case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewDatabase(base, dbProperties)
return sqlite3.NewDatabase(ctx, conMan, dbProperties)
case dbProperties.ConnectionString.IsPostgres():
return nil, fmt.Errorf("can't use Postgres implementation")
default:

View file

@ -17,6 +17,7 @@ package syncapi
import (
"context"
"github.com/matrix-org/dendrite/internal/fulltext"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal/caching"
@ -41,24 +42,34 @@ func AddPublicRoutes(
base *base.BaseDendrite,
userAPI userapi.SyncUserAPI,
rsAPI api.SyncRoomserverAPI,
caches caching.LazyLoadCache,
) {
cfg := &base.Cfg.SyncAPI
js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database)
syncDB, err := storage.NewSyncServerDatasource(base.Context(), base.ConnectionManager, &cfg.Database)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to sync db")
}
eduCache := caching.NewTypingCache()
notifier := notifier.NewNotifier()
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, eduCache, base.Caches, notifier)
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, eduCache, caches, notifier)
notifier.SetCurrentPosition(streams.Latest(context.Background()))
if err = notifier.Load(context.Background(), syncDB); err != nil {
logrus.WithError(err).Panicf("failed to load notifier ")
}
var fts *fulltext.Search
if cfg.Fulltext.Enabled {
fts, err = fulltext.New(base.ProcessContext.Context(), cfg.Fulltext)
if err != nil {
logrus.WithError(err).Panicf("failed to create full text")
}
base.ProcessContext.ComponentStarted()
}
federationPresenceProducer := &producers.FederationAPIPresenceProducer{
Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
JetStream: js,
@ -86,7 +97,7 @@ func AddPublicRoutes(
roomConsumer := consumers.NewOutputRoomEventConsumer(
base.ProcessContext, cfg, js, syncDB, notifier, streams.PDUStreamProvider,
streams.InviteStreamProvider, rsAPI, base.Fulltext,
streams.InviteStreamProvider, rsAPI, fts,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
@ -94,7 +105,7 @@ func AddPublicRoutes(
clientConsumer := consumers.NewOutputClientDataConsumer(
base.ProcessContext, cfg, js, natsClient, syncDB, notifier,
streams.AccountDataStreamProvider, base.Fulltext,
streams.AccountDataStreamProvider, fts,
)
if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer")
@ -129,7 +140,7 @@ func AddPublicRoutes(
}
routing.Setup(
base.PublicClientAPIMux, requestPool, syncDB, userAPI,
rsAPI, cfg, base.Caches, base.Fulltext,
base.Routers.Client, requestPool, syncDB, userAPI,
rsAPI, cfg, caches, fts,
)
}

View file

@ -10,6 +10,7 @@ import (
"testing"
"time"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/tidwall/gjson"
@ -114,12 +115,13 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
}
base, close := testrig.CreateBaseDendrite(t, dbType)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
defer close()
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
msgs := toNATSMsgs(t, base, room.Events()...)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}})
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
testrig.MustPublishMsgs(t, jsctx, msgs...)
testCases := []struct {
@ -162,7 +164,7 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) {
for _, tc := range testCases {
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, tc.req)
base.Routers.Client.ServeHTTP(w, tc.req)
if w.Code != tc.wantCode {
t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
}
@ -218,12 +220,13 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
// m.room.history_visibility
msgs := toNATSMsgs(t, base, room.Events()...)
sinceTokens := make([]string, len(msgs))
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}})
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches)
for i, msg := range msgs {
testrig.MustPublishMsgs(t, jsctx, msg)
time.Sleep(100 * time.Millisecond)
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
})))
@ -253,7 +256,7 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) {
t.Logf("waited for events to be consumed; syncing with %v", sinceTokens)
for i, since := range sinceTokens {
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
"since": since,
@ -302,9 +305,10 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) {
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{})
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"timeout": "0",
"set_presence": "online",
@ -417,10 +421,10 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
// Use the actual internal roomserver API
rsAPI := roomserver.NewInternalAPI(base)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
for _, tc := range testCases {
testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType)
@ -444,7 +448,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// There is only one event, we expect only to be able to see this, if the room is world_readable
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
"access_token": bobDev.AccessToken,
"dir": "b",
"filter": `{"lazy_load_members":true}`, // check that lazy loading doesn't break history visibility
@ -484,7 +488,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
// Verify the messages after/before invite are visible or not
w = httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
"access_token": bobDev.AccessToken,
"dir": "b",
})))
@ -717,10 +721,11 @@ func TestGetMembership(t *testing.T) {
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
// Use an actual roomserver for this
rsAPI := roomserver.NewInternalAPI(base)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@ -748,7 +753,7 @@ func TestGetMembership(t *testing.T) {
}
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, tc.request(t, room))
base.Routers.Client.ServeHTTP(w, tc.request(t, room))
if w.Code != 200 && tc.wantOK {
t.Logf("%s", w.Body.String())
t.Fatalf("got HTTP %d want %d", w.Code, 200)
@ -786,8 +791,8 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{})
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches)
producer := producers.SyncAPIProducer{
TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
@ -885,7 +890,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
// Execute a /sync request, recording the response
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": alice.AccessToken,
"since": tc.since,
})))
@ -1003,10 +1008,11 @@ func testContext(t *testing.T, dbType test.DBType) {
defer baseClose()
// Use an actual roomserver for this
rsAPI := roomserver.NewInternalAPI(base)
caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(base, caches)
rsAPI.SetFederationAPI(nil, nil)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI)
AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches)
room := test.NewRoom(t, user)
@ -1049,7 +1055,7 @@ func testContext(t *testing.T, dbType test.DBType) {
params[k] = v
}
}
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", requestPath, test.WithQueryParams(params)))
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", requestPath, test.WithQueryParams(params)))
if tc.wantError && w.Code == 200 {
t.Fatalf("Expected an error, but got none")
@ -1139,7 +1145,7 @@ func TestUpdateRelations(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
base, shutdownBase := testrig.CreateBaseDendrite(t, dbType)
t.Cleanup(shutdownBase)
db, err := storage.NewSyncServerDatasource(base, &base.Cfg.SyncAPI.Database)
db, err := storage.NewSyncServerDatasource(base.Context(), base.ConnectionManager, &base.Cfg.SyncAPI.Database)
if err != nil {
t.Fatal(err)
}
@ -1178,7 +1184,7 @@ func syncUntil(t *testing.T,
go func() {
for {
w := httptest.NewRecorder()
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{
"access_token": accessToken,
"timeout": "1000",
})))