mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-27 07:28:27 +00:00
Add sync API db tests (#3043)
Co-authored-by: kegsay <kegan@matrix.org>
This commit is contained in:
parent
f66862958d
commit
9fa39263c0
5 changed files with 160 additions and 96 deletions
|
@ -136,15 +136,6 @@ FROM room_ids,
|
||||||
) AS x
|
) AS x
|
||||||
`
|
`
|
||||||
|
|
||||||
const selectEarlyEventsSQL = "" +
|
|
||||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
|
||||||
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
|
||||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
|
||||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
|
||||||
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
|
||||||
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
|
||||||
" ORDER BY id ASC LIMIT $8"
|
|
||||||
|
|
||||||
const selectMaxEventIDSQL = "" +
|
const selectMaxEventIDSQL = "" +
|
||||||
"SELECT MAX(id) FROM syncapi_output_room_events"
|
"SELECT MAX(id) FROM syncapi_output_room_events"
|
||||||
|
|
||||||
|
@ -206,7 +197,6 @@ type outputRoomEventsStatements struct {
|
||||||
selectMaxEventIDStmt *sql.Stmt
|
selectMaxEventIDStmt *sql.Stmt
|
||||||
selectRecentEventsStmt *sql.Stmt
|
selectRecentEventsStmt *sql.Stmt
|
||||||
selectRecentEventsForSyncStmt *sql.Stmt
|
selectRecentEventsForSyncStmt *sql.Stmt
|
||||||
selectEarlyEventsStmt *sql.Stmt
|
|
||||||
selectStateInRangeFilteredStmt *sql.Stmt
|
selectStateInRangeFilteredStmt *sql.Stmt
|
||||||
selectStateInRangeStmt *sql.Stmt
|
selectStateInRangeStmt *sql.Stmt
|
||||||
updateEventJSONStmt *sql.Stmt
|
updateEventJSONStmt *sql.Stmt
|
||||||
|
@ -262,7 +252,6 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
|
||||||
{&s.selectMaxEventIDStmt, selectMaxEventIDSQL},
|
{&s.selectMaxEventIDStmt, selectMaxEventIDSQL},
|
||||||
{&s.selectRecentEventsStmt, selectRecentEventsSQL},
|
{&s.selectRecentEventsStmt, selectRecentEventsSQL},
|
||||||
{&s.selectRecentEventsForSyncStmt, selectRecentEventsForSyncSQL},
|
{&s.selectRecentEventsForSyncStmt, selectRecentEventsForSyncSQL},
|
||||||
{&s.selectEarlyEventsStmt, selectEarlyEventsSQL},
|
|
||||||
{&s.selectStateInRangeFilteredStmt, selectStateInRangeFilteredSQL},
|
{&s.selectStateInRangeFilteredStmt, selectStateInRangeFilteredSQL},
|
||||||
{&s.selectStateInRangeStmt, selectStateInRangeSQL},
|
{&s.selectStateInRangeStmt, selectStateInRangeSQL},
|
||||||
{&s.updateEventJSONStmt, updateEventJSONSQL},
|
{&s.updateEventJSONStmt, updateEventJSONSQL},
|
||||||
|
@ -530,39 +519,6 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
|
||||||
return result, rows.Err()
|
return result, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// selectEarlyEvents returns the earliest events in the given room, starting
|
|
||||||
// from a given position, up to a maximum of 'limit'.
|
|
||||||
func (s *outputRoomEventsStatements) SelectEarlyEvents(
|
|
||||||
ctx context.Context, txn *sql.Tx,
|
|
||||||
roomID string, r types.Range, eventFilter *synctypes.RoomEventFilter,
|
|
||||||
) ([]types.StreamEvent, error) {
|
|
||||||
senders, notSenders := getSendersRoomEventFilter(eventFilter)
|
|
||||||
stmt := sqlutil.TxStmt(txn, s.selectEarlyEventsStmt)
|
|
||||||
rows, err := stmt.QueryContext(
|
|
||||||
ctx, roomID, r.Low(), r.High(),
|
|
||||||
pq.StringArray(senders),
|
|
||||||
pq.StringArray(notSenders),
|
|
||||||
pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)),
|
|
||||||
pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.NotTypes)),
|
|
||||||
eventFilter.Limit,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "selectEarlyEvents: rows.close() failed")
|
|
||||||
events, err := rowsToStreamEvents(rows)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// The events need to be returned from oldest to latest, which isn't
|
|
||||||
// necessarily the way the SQL query returns them, so a sort is necessary to
|
|
||||||
// ensure the events are in the right order in the slice.
|
|
||||||
sort.SliceStable(events, func(i int, j int) bool {
|
|
||||||
return events[i].StreamPosition < events[j].StreamPosition
|
|
||||||
})
|
|
||||||
return events, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// selectEvents returns the events for the given event IDs. If an event is
|
// selectEvents returns the events for the given event IDs. If an event is
|
||||||
// missing from the database, it will be omitted.
|
// missing from the database, it will be omitted.
|
||||||
func (s *outputRoomEventsStatements) SelectEvents(
|
func (s *outputRoomEventsStatements) SelectEvents(
|
||||||
|
|
103
syncapi/storage/shared/storage_consumer_test.go
Normal file
103
syncapi/storage/shared/storage_consumer_test.go
Normal file
|
@ -0,0 +1,103 @@
|
||||||
|
package shared_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
"github.com/matrix-org/dendrite/test"
|
||||||
|
"github.com/matrix-org/dendrite/test/testrig"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newSyncDB(t *testing.T, dbType test.DBType) (storage.Database, func()) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
cfg, processCtx, closeDB := testrig.CreateConfig(t, dbType)
|
||||||
|
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
|
||||||
|
syncDB, err := storage.NewSyncServerDatasource(processCtx.Context(), cm, &cfg.SyncAPI.Database)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create sync DB: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return syncDB, closeDB
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFilterTable(t *testing.T) {
|
||||||
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||||
|
tab, closeDB := newSyncDB(t, dbType)
|
||||||
|
defer closeDB()
|
||||||
|
|
||||||
|
// initially create a filter
|
||||||
|
filter := &synctypes.Filter{}
|
||||||
|
filterID, err := tab.PutFilter(context.Background(), "alice", filter)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// create the same filter again, we should receive the existing filter
|
||||||
|
secondFilterID, err := tab.PutFilter(context.Background(), "alice", filter)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if secondFilterID != filterID {
|
||||||
|
t.Fatalf("expected second filter to be the same as the first: %s vs %s", filterID, secondFilterID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// query the filter again
|
||||||
|
targetFilter := &synctypes.Filter{}
|
||||||
|
if err = tab.GetFilter(context.Background(), targetFilter, "alice", filterID); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(filter, targetFilter) {
|
||||||
|
t.Fatalf("%#v vs %#v", filter, targetFilter)
|
||||||
|
}
|
||||||
|
|
||||||
|
// query non-existent filter
|
||||||
|
if err = tab.GetFilter(context.Background(), targetFilter, "bob", filterID); err == nil {
|
||||||
|
t.Fatalf("expected filter to not exist, but it does exist: %v", targetFilter)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIgnores(t *testing.T) {
|
||||||
|
alice := test.NewUser(t)
|
||||||
|
bob := test.NewUser(t)
|
||||||
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||||
|
syncDB, closeDB := newSyncDB(t, dbType)
|
||||||
|
defer closeDB()
|
||||||
|
|
||||||
|
tab, err := syncDB.NewDatabaseTransaction(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer tab.Rollback() // nolint: errcheck
|
||||||
|
|
||||||
|
ignoredUsers := &types.IgnoredUsers{List: map[string]interface{}{
|
||||||
|
bob.ID: "",
|
||||||
|
}}
|
||||||
|
if err = tab.UpdateIgnoresForUser(context.Background(), alice.ID, ignoredUsers); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
gotIgnoredUsers, err := tab.IgnoresForUser(context.Background(), alice.ID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify the ignored users matches those we stored
|
||||||
|
if !reflect.DeepEqual(gotIgnoredUsers, ignoredUsers) {
|
||||||
|
t.Fatalf("%#v vs %#v", gotIgnoredUsers, ignoredUsers)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bob doesn't have any ignored users, so should receive sql.ErrNoRows
|
||||||
|
if _, err = tab.IgnoresForUser(context.Background(), bob.ID); err == nil {
|
||||||
|
t.Fatalf("expected an error but got none")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
|
@ -29,7 +29,6 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||||
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
@ -82,12 +81,6 @@ const selectRecentEventsForSyncSQL = "" +
|
||||||
|
|
||||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
||||||
|
|
||||||
const selectEarlyEventsSQL = "" +
|
|
||||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
|
||||||
" WHERE room_id = $1 AND id > $2 AND id <= $3"
|
|
||||||
|
|
||||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
|
||||||
|
|
||||||
const selectMaxEventIDSQL = "" +
|
const selectMaxEventIDSQL = "" +
|
||||||
"SELECT MAX(id) FROM syncapi_output_room_events"
|
"SELECT MAX(id) FROM syncapi_output_room_events"
|
||||||
|
|
||||||
|
@ -119,7 +112,7 @@ const selectContextAfterEventSQL = "" +
|
||||||
|
|
||||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
||||||
|
|
||||||
const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE type IN ($1) AND id > $2 LIMIT $3 ORDER BY id ASC"
|
const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE id > $1 AND type IN ($2)"
|
||||||
|
|
||||||
const purgeEventsSQL = "" +
|
const purgeEventsSQL = "" +
|
||||||
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
|
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
|
||||||
|
@ -430,42 +423,6 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *outputRoomEventsStatements) SelectEarlyEvents(
|
|
||||||
ctx context.Context, txn *sql.Tx,
|
|
||||||
roomID string, r types.Range, eventFilter *synctypes.RoomEventFilter,
|
|
||||||
) ([]types.StreamEvent, error) {
|
|
||||||
stmt, params, err := prepareWithFilters(
|
|
||||||
s.db, txn, selectEarlyEventsSQL,
|
|
||||||
[]interface{}{
|
|
||||||
roomID, r.Low(), r.High(),
|
|
||||||
},
|
|
||||||
eventFilter.Senders, eventFilter.NotSenders,
|
|
||||||
eventFilter.Types, eventFilter.NotTypes,
|
|
||||||
nil, eventFilter.ContainsURL, eventFilter.Limit, FilterOrderAsc,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("s.prepareWithFilters: %w", err)
|
|
||||||
}
|
|
||||||
defer internal.CloseAndLogIfError(ctx, stmt, "SelectEarlyEvents: stmt.close() failed")
|
|
||||||
|
|
||||||
rows, err := stmt.QueryContext(ctx, params...)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer internal.CloseAndLogIfError(ctx, rows, "selectEarlyEvents: rows.close() failed")
|
|
||||||
events, err := rowsToStreamEvents(rows)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// The events need to be returned from oldest to latest, which isn't
|
|
||||||
// necessarily the way the SQL query returns them, so a sort is necessary to
|
|
||||||
// ensure the events are in the right order in the slice.
|
|
||||||
sort.SliceStable(events, func(i int, j int) bool {
|
|
||||||
return events[i].StreamPosition < events[j].StreamPosition
|
|
||||||
})
|
|
||||||
return events, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// selectEvents returns the events for the given event IDs. If an event is
|
// selectEvents returns the events for the given event IDs. If an event is
|
||||||
// missing from the database, it will be omitted.
|
// missing from the database, it will be omitted.
|
||||||
func (s *outputRoomEventsStatements) SelectEvents(
|
func (s *outputRoomEventsStatements) SelectEvents(
|
||||||
|
@ -686,18 +643,18 @@ func (s *outputRoomEventsStatements) PurgeEvents(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
|
func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
|
||||||
params := make([]interface{}, len(types))
|
params := make([]interface{}, len(types)+1)
|
||||||
|
params[0] = afterID
|
||||||
for i := range types {
|
for i := range types {
|
||||||
params[i] = types[i]
|
params[i+1] = types[i]
|
||||||
}
|
}
|
||||||
params = append(params, afterID)
|
|
||||||
params = append(params, limit)
|
|
||||||
selectSQL := strings.Replace(selectSearchSQL, "($1)", sqlutil.QueryVariadic(len(types)), 1)
|
|
||||||
|
|
||||||
stmt, err := s.db.Prepare(selectSQL)
|
selectSQL := strings.Replace(selectSearchSQL, "($2)", sqlutil.QueryVariadicOffset(len(types), 1), 1)
|
||||||
|
stmt, params, err := prepareWithFilters(s.db, txn, selectSQL, params, nil, nil, nil, nil, nil, nil, int(limit), FilterOrderAsc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
defer internal.CloseAndLogIfError(ctx, stmt, "selectEvents: stmt.close() failed")
|
defer internal.CloseAndLogIfError(ctx, stmt, "selectEvents: stmt.close() failed")
|
||||||
rows, err := sqlutil.TxStmt(txn, stmt).QueryContext(ctx, params...)
|
rows, err := sqlutil.TxStmt(txn, stmt).QueryContext(ctx, params...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -68,8 +68,6 @@ type Events interface {
|
||||||
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
|
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
|
||||||
// Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
|
// Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
|
||||||
SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, r types.Range, eventFilter *synctypes.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error)
|
SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, r types.Range, eventFilter *synctypes.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error)
|
||||||
// SelectEarlyEvents returns the earliest events in the given room.
|
|
||||||
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *synctypes.RoomEventFilter) ([]types.StreamEvent, error)
|
|
||||||
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string, filter *synctypes.RoomEventFilter, preserveOrder bool) ([]types.StreamEvent, error)
|
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string, filter *synctypes.RoomEventFilter, preserveOrder bool) ([]types.StreamEvent, error)
|
||||||
UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error
|
UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error
|
||||||
// DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely.
|
// DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely.
|
||||||
|
|
|
@ -104,3 +104,53 @@ func TestOutputRoomEventsTable(t *testing.T) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReindex(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
alice := test.NewUser(t)
|
||||||
|
room := test.NewRoom(t, alice)
|
||||||
|
|
||||||
|
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomName, map[string]interface{}{
|
||||||
|
"name": "my new room name",
|
||||||
|
}, test.WithStateKey(""))
|
||||||
|
|
||||||
|
room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomTopic, map[string]interface{}{
|
||||||
|
"topic": "my new room topic",
|
||||||
|
}, test.WithStateKey(""))
|
||||||
|
|
||||||
|
room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{
|
||||||
|
"msgbody": "my room message",
|
||||||
|
"type": "m.text",
|
||||||
|
})
|
||||||
|
|
||||||
|
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||||
|
tab, db, close := newOutputRoomEventsTable(t, dbType)
|
||||||
|
defer close()
|
||||||
|
err := sqlutil.WithTransaction(db, func(txn *sql.Tx) error {
|
||||||
|
for _, ev := range room.Events() {
|
||||||
|
_, err := tab.InsertEvent(ctx, txn, ev, nil, nil, nil, false, gomatrixserverlib.HistoryVisibilityShared)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to InsertEvent: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
events, err := tab.ReIndex(ctx, nil, 10, 0, []string{
|
||||||
|
gomatrixserverlib.MRoomName,
|
||||||
|
gomatrixserverlib.MRoomTopic,
|
||||||
|
"m.room.message"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
wantEventCount := 3
|
||||||
|
if len(events) != wantEventCount {
|
||||||
|
t.Fatalf("expected %d events, got %d", wantEventCount, len(events))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue