mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-04-04 11:03:39 +00:00
Try to filter event types/state keys at the database level instead of in Go
This commit is contained in:
parent
6900e0f495
commit
45bcca2cb6
4 changed files with 47 additions and 43 deletions
|
@ -19,6 +19,7 @@ import (
|
|||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/lib/pq"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
|
@ -91,6 +92,10 @@ const bulkSelectStateEventByIDSQL = "" +
|
|||
const bulkSelectStateEventByNIDSQL = "" +
|
||||
"SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
|
||||
" WHERE event_nid = ANY($1)" +
|
||||
//" AND event_type_nid = ANY($2)" +
|
||||
//" AND event_state_key_nid = ANY($3)" +
|
||||
" AND ($2::bigint[] IS NULL OR event_type_nid = ANY($2))" +
|
||||
" AND ($3::bigint[] IS NULL OR event_state_key_nid = ANY($3))" +
|
||||
" ORDER BY event_type_nid, event_state_key_nid ASC"
|
||||
|
||||
const bulkSelectStateAtEventByIDSQL = "" +
|
||||
|
@ -249,8 +254,12 @@ func (s *eventStatements) BulkSelectStateEventByID(
|
|||
// If any of the requested events are missing from the database it returns a types.MissingEventError
|
||||
func (s *eventStatements) BulkSelectStateEventByNID(
|
||||
ctx context.Context, eventNIDs []types.EventNID,
|
||||
stateKeyTuples []types.StateKeyTuple,
|
||||
) ([]types.StateEntry, error) {
|
||||
rows, err := s.bulkSelectStateEventByNIDStmt.QueryContext(ctx, eventNIDsAsArray(eventNIDs))
|
||||
tuples := stateKeyTupleSorter(stateKeyTuples)
|
||||
sort.Sort(tuples)
|
||||
eventTypeNIDArray, eventStateKeyNIDArray := tuples.typesAndStateKeysAsArrays()
|
||||
rows, err := s.bulkSelectStateEventByNIDStmt.QueryContext(ctx, eventNIDsAsArray(eventNIDs), eventTypeNIDArray, eventStateKeyNIDArray)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -274,16 +283,6 @@ func (s *eventStatements) BulkSelectStateEventByNID(
|
|||
if err = rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if i != len(eventNIDs) {
|
||||
// If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have.
|
||||
// We don't know which ones were missing because we don't return the string IDs in the query.
|
||||
// However it should be possible debug this by replaying queries or entries from the input kafka logs.
|
||||
// If this turns out to be impossible and we do need the debug information here, it would be better
|
||||
// to do it as a separate query rather than slowing down/complicating the internal case.
|
||||
return nil, types.MissingEventError(
|
||||
fmt.Sprintf("storage: state event IDs missing from the database (%d != %d)", i, len(eventNIDs)),
|
||||
)
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -126,16 +126,16 @@ func (d *Database) StateEntriesForTuples(
|
|||
}
|
||||
lists := []types.StateEntryList{}
|
||||
for i, entry := range entries {
|
||||
entries, err := d.EventsTable.BulkSelectStateEventByNID(ctx, entry)
|
||||
entries, err := d.EventsTable.BulkSelectStateEventByNID(ctx, entry, stateKeyTuples)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("d.EventsTable.BulkSelectStateEventByNID: %w", err)
|
||||
}
|
||||
if len(stateKeyTuples) == 0 {
|
||||
lists = append(lists, types.StateEntryList{
|
||||
StateBlockNID: stateBlockNIDs[i],
|
||||
StateEntries: entries,
|
||||
})
|
||||
} else {
|
||||
//if len(stateKeyTuples) == 0 {
|
||||
lists = append(lists, types.StateEntryList{
|
||||
StateBlockNID: stateBlockNIDs[i],
|
||||
StateEntries: entries,
|
||||
})
|
||||
/*} else {
|
||||
eventTypes := map[types.EventTypeNID]struct{}{}
|
||||
stateKeys := map[types.EventStateKeyNID]struct{}{}
|
||||
for _, t := range stateKeyTuples {
|
||||
|
@ -154,7 +154,7 @@ func (d *Database) StateEntriesForTuples(
|
|||
StateBlockNID: stateBlockNIDs[i],
|
||||
StateEntries: filteredEntries,
|
||||
})
|
||||
}
|
||||
}*/
|
||||
}
|
||||
return lists, nil
|
||||
}
|
||||
|
@ -281,7 +281,7 @@ func (d *Database) StateEntries(
|
|||
}
|
||||
lists := []types.StateEntryList{}
|
||||
for i, entry := range entries {
|
||||
eventNIDs, err := d.EventsTable.BulkSelectStateEventByNID(ctx, entry)
|
||||
eventNIDs, err := d.EventsTable.BulkSelectStateEventByNID(ctx, entry, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("d.EventsTable.BulkSelectStateEventByNID: %w", err)
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
|
@ -65,8 +66,8 @@ const bulkSelectStateEventByIDSQL = "" +
|
|||
|
||||
const bulkSelectStateEventByNIDSQL = "" +
|
||||
"SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
|
||||
" WHERE event_nid IN ($1)" +
|
||||
" ORDER BY event_type_nid, event_state_key_nid ASC"
|
||||
" WHERE event_nid IN ($1)"
|
||||
// Rest of query is built by BulkSelectStateEventByNID
|
||||
|
||||
const bulkSelectStateAtEventByIDSQL = "" +
|
||||
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" +
|
||||
|
@ -241,22 +242,36 @@ func (s *eventStatements) BulkSelectStateEventByID(
|
|||
// If any of the requested events are missing from the database it returns a types.MissingEventError
|
||||
func (s *eventStatements) BulkSelectStateEventByNID(
|
||||
ctx context.Context, eventNIDs []types.EventNID,
|
||||
stateKeyTuples []types.StateKeyTuple,
|
||||
) ([]types.StateEntry, error) {
|
||||
///////////////
|
||||
iEventIDs := make([]interface{}, len(eventNIDs))
|
||||
for k, v := range eventNIDs {
|
||||
iEventIDs[k] = v
|
||||
tuples := stateKeyTupleSorter(stateKeyTuples)
|
||||
sort.Sort(tuples)
|
||||
eventTypeNIDArray, eventStateKeyNIDArray := tuples.typesAndStateKeysAsArrays()
|
||||
params := make([]interface{}, 0, len(eventNIDs)+len(eventTypeNIDArray)+len(eventStateKeyNIDArray))
|
||||
selectOrig := strings.Replace(bulkSelectStateEventByNIDSQL, "($1)", sqlutil.QueryVariadic(len(eventNIDs)), 1)
|
||||
for _, v := range eventNIDs {
|
||||
params = append(params, v)
|
||||
}
|
||||
selectOrig := strings.Replace(bulkSelectStateEventByNIDSQL, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1)
|
||||
if len(eventTypeNIDArray) > 0 {
|
||||
selectOrig += " AND event_type_nid IN " + sqlutil.QueryVariadicOffset(len(eventTypeNIDArray), len(params))
|
||||
for _, v := range eventTypeNIDArray {
|
||||
params = append(params, v)
|
||||
}
|
||||
}
|
||||
if len(eventStateKeyNIDArray) > 0 {
|
||||
selectOrig += " AND event_state_key_nid IN " + sqlutil.QueryVariadicOffset(len(eventStateKeyNIDArray), len(params))
|
||||
for _, v := range eventStateKeyNIDArray {
|
||||
params = append(params, v)
|
||||
}
|
||||
}
|
||||
selectOrig += " ORDER BY event_type_nid, event_state_key_nid ASC"
|
||||
selectStmt, err := s.db.Prepare(selectOrig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("s.db.Prepare: %w", err)
|
||||
}
|
||||
///////////////
|
||||
|
||||
rows, err := selectStmt.QueryContext(ctx, iEventIDs...)
|
||||
rows, err := selectStmt.QueryContext(ctx, params...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("selectStmt.QueryContext: %w", err)
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateEventByID: rows.close() failed")
|
||||
// We know that we will only get as many results as event IDs
|
||||
|
@ -275,16 +290,6 @@ func (s *eventStatements) BulkSelectStateEventByNID(
|
|||
return nil, err
|
||||
}
|
||||
}
|
||||
if i != len(eventNIDs) {
|
||||
// If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have.
|
||||
// We don't know which ones were missing because we don't return the string IDs in the query.
|
||||
// However it should be possible debug this by replaying queries or entries from the input kafka logs.
|
||||
// If this turns out to be impossible and we do need the debug information here, it would be better
|
||||
// to do it as a separate query rather than slowing down/complicating the internal case.
|
||||
return nil, types.MissingEventError(
|
||||
fmt.Sprintf("storage: state event IDs missing from the database (%d != %d)", i, len(eventNIDs)),
|
||||
)
|
||||
}
|
||||
return results, err
|
||||
}
|
||||
|
||||
|
|
|
@ -43,7 +43,7 @@ type Events interface {
|
|||
// bulkSelectStateEventByID lookups a list of state events by event ID.
|
||||
// If any of the requested events are missing from the database it returns a types.MissingEventError
|
||||
BulkSelectStateEventByID(ctx context.Context, eventIDs []string) ([]types.StateEntry, error)
|
||||
BulkSelectStateEventByNID(ctx context.Context, eventNIDs []types.EventNID) ([]types.StateEntry, error)
|
||||
BulkSelectStateEventByNID(ctx context.Context, eventNIDs []types.EventNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntry, error)
|
||||
// BulkSelectStateAtEventByID lookups the state at a list of events by event ID.
|
||||
// If any of the requested events are missing from the database it returns a types.MissingEventError.
|
||||
// If we do not have the state for any of the requested events it returns a types.MissingEventError.
|
||||
|
|
Loading…
Reference in a new issue