From 45bcca2cb634d4df6c04ac6c29f580d40273a6fe Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 16 Apr 2021 12:52:46 +0100 Subject: [PATCH] Try to filter event types/state keys at the database level instead of in Go --- roomserver/storage/postgres/events_table.go | 21 +++++---- roomserver/storage/shared/storage.go | 18 ++++---- roomserver/storage/sqlite3/events_table.go | 49 ++++++++++++--------- roomserver/storage/tables/interface.go | 2 +- 4 files changed, 47 insertions(+), 43 deletions(-) diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go index 02c3446e..cc1422e0 100644 --- a/roomserver/storage/postgres/events_table.go +++ b/roomserver/storage/postgres/events_table.go @@ -19,6 +19,7 @@ import ( "context" "database/sql" "fmt" + "sort" "github.com/lib/pq" "github.com/matrix-org/dendrite/internal" @@ -91,6 +92,10 @@ const bulkSelectStateEventByIDSQL = "" + const bulkSelectStateEventByNIDSQL = "" + "SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" + " WHERE event_nid = ANY($1)" + + //" AND event_type_nid = ANY($2)" + + //" AND event_state_key_nid = ANY($3)" + + " AND ($2::bigint[] IS NULL OR event_type_nid = ANY($2))" + + " AND ($3::bigint[] IS NULL OR event_state_key_nid = ANY($3))" + " ORDER BY event_type_nid, event_state_key_nid ASC" const bulkSelectStateAtEventByIDSQL = "" + @@ -249,8 +254,12 @@ func (s *eventStatements) BulkSelectStateEventByID( // If any of the requested events are missing from the database it returns a types.MissingEventError func (s *eventStatements) BulkSelectStateEventByNID( ctx context.Context, eventNIDs []types.EventNID, + stateKeyTuples []types.StateKeyTuple, ) ([]types.StateEntry, error) { - rows, err := s.bulkSelectStateEventByNIDStmt.QueryContext(ctx, eventNIDsAsArray(eventNIDs)) + tuples := stateKeyTupleSorter(stateKeyTuples) + sort.Sort(tuples) + eventTypeNIDArray, eventStateKeyNIDArray := tuples.typesAndStateKeysAsArrays() + rows, err := s.bulkSelectStateEventByNIDStmt.QueryContext(ctx, eventNIDsAsArray(eventNIDs), eventTypeNIDArray, eventStateKeyNIDArray) if err != nil { return nil, err } @@ -274,16 +283,6 @@ func (s *eventStatements) BulkSelectStateEventByNID( if err = rows.Err(); err != nil { return nil, err } - if i != len(eventNIDs) { - // If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have. - // We don't know which ones were missing because we don't return the string IDs in the query. - // However it should be possible debug this by replaying queries or entries from the input kafka logs. - // If this turns out to be impossible and we do need the debug information here, it would be better - // to do it as a separate query rather than slowing down/complicating the internal case. - return nil, types.MissingEventError( - fmt.Sprintf("storage: state event IDs missing from the database (%d != %d)", i, len(eventNIDs)), - ) - } return results, nil } diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 14c3d1a4..03c3f43b 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -126,16 +126,16 @@ func (d *Database) StateEntriesForTuples( } lists := []types.StateEntryList{} for i, entry := range entries { - entries, err := d.EventsTable.BulkSelectStateEventByNID(ctx, entry) + entries, err := d.EventsTable.BulkSelectStateEventByNID(ctx, entry, stateKeyTuples) if err != nil { return nil, fmt.Errorf("d.EventsTable.BulkSelectStateEventByNID: %w", err) } - if len(stateKeyTuples) == 0 { - lists = append(lists, types.StateEntryList{ - StateBlockNID: stateBlockNIDs[i], - StateEntries: entries, - }) - } else { + //if len(stateKeyTuples) == 0 { + lists = append(lists, types.StateEntryList{ + StateBlockNID: stateBlockNIDs[i], + StateEntries: entries, + }) + /*} else { eventTypes := map[types.EventTypeNID]struct{}{} stateKeys := map[types.EventStateKeyNID]struct{}{} for _, t := range stateKeyTuples { @@ -154,7 +154,7 @@ func (d *Database) StateEntriesForTuples( StateBlockNID: stateBlockNIDs[i], StateEntries: filteredEntries, }) - } + }*/ } return lists, nil } @@ -281,7 +281,7 @@ func (d *Database) StateEntries( } lists := []types.StateEntryList{} for i, entry := range entries { - eventNIDs, err := d.EventsTable.BulkSelectStateEventByNID(ctx, entry) + eventNIDs, err := d.EventsTable.BulkSelectStateEventByNID(ctx, entry, nil) if err != nil { return nil, fmt.Errorf("d.EventsTable.BulkSelectStateEventByNID: %w", err) } diff --git a/roomserver/storage/sqlite3/events_table.go b/roomserver/storage/sqlite3/events_table.go index 52dbe600..420a4845 100644 --- a/roomserver/storage/sqlite3/events_table.go +++ b/roomserver/storage/sqlite3/events_table.go @@ -20,6 +20,7 @@ import ( "database/sql" "encoding/json" "fmt" + "sort" "strings" "github.com/matrix-org/dendrite/internal" @@ -65,8 +66,8 @@ const bulkSelectStateEventByIDSQL = "" + const bulkSelectStateEventByNIDSQL = "" + "SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" + - " WHERE event_nid IN ($1)" + - " ORDER BY event_type_nid, event_state_key_nid ASC" + " WHERE event_nid IN ($1)" + // Rest of query is built by BulkSelectStateEventByNID const bulkSelectStateAtEventByIDSQL = "" + "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" + @@ -241,22 +242,36 @@ func (s *eventStatements) BulkSelectStateEventByID( // If any of the requested events are missing from the database it returns a types.MissingEventError func (s *eventStatements) BulkSelectStateEventByNID( ctx context.Context, eventNIDs []types.EventNID, + stateKeyTuples []types.StateKeyTuple, ) ([]types.StateEntry, error) { - /////////////// - iEventIDs := make([]interface{}, len(eventNIDs)) - for k, v := range eventNIDs { - iEventIDs[k] = v + tuples := stateKeyTupleSorter(stateKeyTuples) + sort.Sort(tuples) + eventTypeNIDArray, eventStateKeyNIDArray := tuples.typesAndStateKeysAsArrays() + params := make([]interface{}, 0, len(eventNIDs)+len(eventTypeNIDArray)+len(eventStateKeyNIDArray)) + selectOrig := strings.Replace(bulkSelectStateEventByNIDSQL, "($1)", sqlutil.QueryVariadic(len(eventNIDs)), 1) + for _, v := range eventNIDs { + params = append(params, v) } - selectOrig := strings.Replace(bulkSelectStateEventByNIDSQL, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1) + if len(eventTypeNIDArray) > 0 { + selectOrig += " AND event_type_nid IN " + sqlutil.QueryVariadicOffset(len(eventTypeNIDArray), len(params)) + for _, v := range eventTypeNIDArray { + params = append(params, v) + } + } + if len(eventStateKeyNIDArray) > 0 { + selectOrig += " AND event_state_key_nid IN " + sqlutil.QueryVariadicOffset(len(eventStateKeyNIDArray), len(params)) + for _, v := range eventStateKeyNIDArray { + params = append(params, v) + } + } + selectOrig += " ORDER BY event_type_nid, event_state_key_nid ASC" selectStmt, err := s.db.Prepare(selectOrig) if err != nil { - return nil, err + return nil, fmt.Errorf("s.db.Prepare: %w", err) } - /////////////// - - rows, err := selectStmt.QueryContext(ctx, iEventIDs...) + rows, err := selectStmt.QueryContext(ctx, params...) if err != nil { - return nil, err + return nil, fmt.Errorf("selectStmt.QueryContext: %w", err) } defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateEventByID: rows.close() failed") // We know that we will only get as many results as event IDs @@ -275,16 +290,6 @@ func (s *eventStatements) BulkSelectStateEventByNID( return nil, err } } - if i != len(eventNIDs) { - // If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have. - // We don't know which ones were missing because we don't return the string IDs in the query. - // However it should be possible debug this by replaying queries or entries from the input kafka logs. - // If this turns out to be impossible and we do need the debug information here, it would be better - // to do it as a separate query rather than slowing down/complicating the internal case. - return nil, types.MissingEventError( - fmt.Sprintf("storage: state event IDs missing from the database (%d != %d)", i, len(eventNIDs)), - ) - } return results, err } diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index 62d481e1..dd486873 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -43,7 +43,7 @@ type Events interface { // bulkSelectStateEventByID lookups a list of state events by event ID. // If any of the requested events are missing from the database it returns a types.MissingEventError BulkSelectStateEventByID(ctx context.Context, eventIDs []string) ([]types.StateEntry, error) - BulkSelectStateEventByNID(ctx context.Context, eventNIDs []types.EventNID) ([]types.StateEntry, error) + BulkSelectStateEventByNID(ctx context.Context, eventNIDs []types.EventNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntry, error) // BulkSelectStateAtEventByID lookups the state at a list of events by event ID. // If any of the requested events are missing from the database it returns a types.MissingEventError. // If we do not have the state for any of the requested events it returns a types.MissingEventError.