Slowly begin to unpick storage functions

This commit is contained in:
Kegan Dougal 2020-09-07 17:45:01 +01:00
parent c992f4f1f4
commit 2e1374057b
10 changed files with 99 additions and 195 deletions

View file

@ -32,7 +32,7 @@ func CheckAuthEvents(
authEventIDs []string,
) ([]types.EventNID, error) {
// Grab the numeric IDs for the supplied auth state events from the database.
authStateEntries, err := db.StateEntriesForEventIDs(ctx, authEventIDs)
authStateEntries, err := db.StateAtEventIDs(ctx, authEventIDs)
if err != nil {
return nil, err
}
@ -130,7 +130,7 @@ func loadAuthEvents(
ctx context.Context,
db storage.Database,
needed gomatrixserverlib.StateNeeded,
state []types.StateEntry,
state []types.StateAtEvent,
) (result authEvents, err error) {
// Look up the numeric IDs for the state keys needed for auth.
var neededStateKeys []string
@ -141,7 +141,7 @@ func loadAuthEvents(
}
// Load the events we need.
result.state = state
result.state = StateEntries(state)
var eventNIDs []types.EventNID
keyTuplesNeeded := stateKeyTuplesNeeded(result.stateKeyNIDMap, needed)
for _, keyTuple := range keyTuplesNeeded {

View file

@ -377,3 +377,11 @@ func QueryLatestEventsAndState(
return nil
}
func StateEntries(input []types.StateAtEvent) []types.StateEntry {
output := make([]types.StateEntry, len(input))
for i := range input {
output[i] = input[i].StateEntry
}
return output
}

View file

@ -165,12 +165,12 @@ func (r *Inputer) calculateAndSetState(
// We've been told what the state at the event is so we don't need to calculate it.
// Check that those state events are in the database and store the state.
var entries []types.StateEntry
if entries, err = r.DB.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil {
var entries []types.StateAtEvent
if entries, err = r.DB.StateAtEventIDs(ctx, input.StateEventIDs); err != nil {
return err
}
if stateAtEvent.BeforeStateSnapshotNID, err = r.DB.AddState(ctx, roomInfo.RoomNID, nil, entries); err != nil {
if stateAtEvent.BeforeStateSnapshotNID, err = r.DB.AddState(ctx, roomInfo.RoomNID, nil, helpers.StateEntries(entries)); err != nil {
return err
}
} else {

View file

@ -125,12 +125,12 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to find state IDs for event which passed auth checks")
continue
}
var entries []types.StateEntry
if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs); err != nil {
var entries []types.StateAtEvent
if entries, err = r.DB.StateAtEventIDs(ctx, stateIDs); err != nil {
// attempt to fetch the missing events
r.fetchAndStoreMissingEvents(ctx, info.RoomVersion, requester, stateIDs)
// try again
entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs)
entries, err = r.DB.StateAtEventIDs(ctx, stateIDs)
if err != nil {
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to get state entries for event")
return err
@ -138,7 +138,7 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform
}
var beforeStateSnapshotNID types.StateSnapshotNID
if beforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, entries); err != nil {
if beforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, helpers.StateEntries(entries)); err != nil {
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to persist state entries to get snapshot nid")
return err
}

View file

@ -86,15 +86,15 @@ func (v StateResolution) LoadStateAtSnapshot(
func (v StateResolution) LoadStateAtEvent(
ctx context.Context, eventID string,
) ([]types.StateEntry, error) {
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
stateAtEvents, err := v.db.StateAtEventIDs(ctx, []string{eventID})
if err != nil {
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID failed for event %s : %s", eventID, err)
}
if snapshotNID == 0 {
if len(stateAtEvents) == 0 || stateAtEvents[0].BeforeStateSnapshotNID == 0 {
return nil, fmt.Errorf("LoadStateAtEvent.SnapshotNIDFromEventID(%s) returned 0 NID, was this event stored?", eventID)
}
stateEntries, err := v.LoadStateAtSnapshot(ctx, snapshotNID)
stateEntries, err := v.LoadStateAtSnapshot(ctx, stateAtEvents[0].BeforeStateSnapshotNID)
if err != nil {
return nil, err
}

View file

@ -24,11 +24,57 @@ import (
"github.com/matrix-org/gomatrixserverlib"
)
type Database interface {
// Do we support processing input events for more than one room at a time?
SupportsConcurrentRoomInputs() bool
// RoomInfo returns room information for the given room ID, or nil if there is no room.
RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error)
// Lookup maps IDs to NIDs
type Lookup interface {
// Look up the numeric IDs for a list of string event types.
// Returns a map from string event type to numeric ID for the event type.
EventTypeNIDs(ctx context.Context, eventTypes []string) (map[string]types.EventTypeNID, error)
// Look up the numeric IDs for a list of string event state keys.
// Returns a map from string state key to numeric ID for the state key.
EventStateKeyNIDs(ctx context.Context, eventStateKeys []string) (map[string]types.EventStateKeyNID, error)
// Look up the numeric IDs for a list of events.
// Returns an error if there was a problem talking to the database.
EventNIDs(ctx context.Context, eventIDs []string) (map[string]types.EventNID, error)
}
// ReverseLookup maps NIDs to IDs
type ReverseLookup interface {
// Lookup the event IDs for a batch of event numeric IDs.
// Returns an error if the retrieval went wrong.
EventIDs(ctx context.Context, eventNIDs []types.EventNID) (map[types.EventNID]string, error)
// Look up the string event state keys for a list of numeric event state keys
// Returns an error if there was a problem talking to the database.
EventStateKeys(ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]string, error)
}
// Alias contains room alias manipulations
type Alias interface {
// Save a given room alias with the room ID it refers to.
// Returns an error if there was a problem talking to the database.
SetRoomAlias(ctx context.Context, alias string, roomID string, creatorUserID string) error
// Look up the room ID a given alias refers to.
// Returns an error if there was a problem talking to the database.
GetRoomIDForAlias(ctx context.Context, alias string) (string, error)
// Look up all aliases referring to a given room ID.
// Returns an error if there was a problem talking to the database.
GetAliasesForRoomID(ctx context.Context, roomID string) ([]string, error)
// Get the user ID of the creator of an alias.
// Returns an error if there was a problem talking to the database.
GetCreatorIDForAlias(ctx context.Context, alias string) (string, error)
// Remove a given room alias.
// Returns an error if there was a problem talking to the database.
RemoveRoomAlias(ctx context.Context, alias string) error
}
// PublishedRooms contains room publishing manipulations
type PublishedRooms interface {
// Publish or unpublish a room from the room directory.
PublishRoom(ctx context.Context, roomID string, publish bool) error
// Returns a list of room IDs for rooms which are published.
GetPublishedRooms(ctx context.Context) ([]string, error)
}
type State interface {
// Store the room state at an event in the database
AddState(
ctx context.Context,
@ -41,12 +87,6 @@ type Database interface {
// The length of []types.StateAtEvent is guaranteed to equal the length of eventIDs if no error is returned.
// Returns a types.MissingEventError if the room state for the event IDs aren't in the database
StateAtEventIDs(ctx context.Context, eventIDs []string) ([]types.StateAtEvent, error)
// Look up the numeric IDs for a list of string event types.
// Returns a map from string event type to numeric ID for the event type.
EventTypeNIDs(ctx context.Context, eventTypes []string) (map[string]types.EventTypeNID, error)
// Look up the numeric IDs for a list of string event state keys.
// Returns a map from string state key to numeric ID for the state key.
EventStateKeyNIDs(ctx context.Context, eventStateKeys []string) (map[string]types.EventStateKeyNID, error)
// Look up the numeric state data IDs for each numeric state snapshot ID
// The returned slice is sorted by numeric state snapshot ID.
StateBlockNIDs(ctx context.Context, stateNIDs []types.StateSnapshotNID) ([]types.StateBlockNIDList, error)
@ -62,30 +102,36 @@ type Database interface {
stateBlockNIDs []types.StateBlockNID,
stateKeyTuples []types.StateKeyTuple,
) ([]types.StateEntryList, error)
// Set the state at an event. FIXME TODO: "at"
SetState(ctx context.Context, eventNID types.EventNID, stateNID types.StateSnapshotNID) error
}
type Events interface {
// EventsFromIDs looks up the Events for a list of event IDs. Does not error if event was
// not found.
// Returns an error if the retrieval went wrong.
EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error)
// Look up the Events for a list of numeric event IDs.
// Returns a sorted list of events.
Events(ctx context.Context, eventNIDs []types.EventNID) ([]types.Event, error)
// Look up snapshot NID for an event ID string
SnapshotNIDFromEventID(ctx context.Context, eventID string) (types.StateSnapshotNID, error)
// Stores a matrix room event in the database. Returns the room NID, the state snapshot and the redacted event ID if any, or an error.
StoreEvent(
ctx context.Context, event gomatrixserverlib.Event, txnAndSessionID *api.TransactionID, authEventNIDs []types.EventNID,
) (types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error)
// Look up the state entries for a list of string event IDs
// Returns an error if the there is an error talking to the database
// Returns a types.MissingEventError if the event IDs aren't in the database.
StateEntriesForEventIDs(ctx context.Context, eventIDs []string) ([]types.StateEntry, error)
// Look up the string event state keys for a list of numeric event state keys
// Returns an error if there was a problem talking to the database.
EventStateKeys(ctx context.Context, eventStateKeyNIDs []types.EventStateKeyNID) (map[types.EventStateKeyNID]string, error)
// Look up the numeric IDs for a list of events.
// Returns an error if there was a problem talking to the database.
EventNIDs(ctx context.Context, eventIDs []string) (map[string]types.EventNID, error)
// Set the state at an event. FIXME TODO: "at"
SetState(ctx context.Context, eventNID types.EventNID, stateNID types.StateSnapshotNID) error
// Lookup the event IDs for a batch of event numeric IDs.
// Returns an error if the retrieval went wrong.
EventIDs(ctx context.Context, eventNIDs []types.EventNID) (map[types.EventNID]string, error)
}
// Database contains all database functions including helpers
type Database interface {
Lookup
ReverseLookup
Alias
PublishedRooms
State
Events
// Do we support processing input events for more than one room at a time?
SupportsConcurrentRoomInputs() bool
// RoomInfo returns room information for the given room ID, or nil if there is no room.
RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error)
// Look up the latest events in a room in preparation for an update.
// The RoomRecentEventsUpdater must have Commit or Rollback called on it if this doesn't return an error.
// Returns the latest events in the room and the last eventID sent to the log along with an updater.
@ -103,21 +149,6 @@ type Database interface {
// numeric state key IDs for the user IDs who sent them along with the event IDs for the invites.
// Returns an error if there was a problem talking to the database.
GetInvitesForUser(ctx context.Context, roomNID types.RoomNID, targetUserNID types.EventStateKeyNID) (senderUserIDs []types.EventStateKeyNID, eventIDs []string, err error)
// Save a given room alias with the room ID it refers to.
// Returns an error if there was a problem talking to the database.
SetRoomAlias(ctx context.Context, alias string, roomID string, creatorUserID string) error
// Look up the room ID a given alias refers to.
// Returns an error if there was a problem talking to the database.
GetRoomIDForAlias(ctx context.Context, alias string) (string, error)
// Look up all aliases referring to a given room ID.
// Returns an error if there was a problem talking to the database.
GetAliasesForRoomID(ctx context.Context, roomID string) ([]string, error)
// Get the user ID of the creator of an alias.
// Returns an error if there was a problem talking to the database.
GetCreatorIDForAlias(ctx context.Context, alias string) (string, error)
// Remove a given room alias.
// Returns an error if there was a problem talking to the database.
RemoveRoomAlias(ctx context.Context, alias string) error
// Build a membership updater for the target user in a room.
MembershipUpdater(ctx context.Context, roomID, targetUserID string, targetLocal bool, roomVersion gomatrixserverlib.RoomVersion) (*shared.MembershipUpdater, error)
// Lookup the membership of a given user in a given room.
@ -131,14 +162,6 @@ type Database interface {
// joinOnly is set to true.
// Returns an error if there was a problem talking to the database.
GetMembershipEventNIDsForRoom(ctx context.Context, roomNID types.RoomNID, joinOnly bool, localOnly bool) ([]types.EventNID, error)
// EventsFromIDs looks up the Events for a list of event IDs. Does not error if event was
// not found.
// Returns an error if the retrieval went wrong.
EventsFromIDs(ctx context.Context, eventIDs []string) ([]types.Event, error)
// Publish or unpublish a room from the room directory.
PublishRoom(ctx context.Context, roomID string, publish bool) error
// Returns a list of room IDs for rooms which are published.
GetPublishedRooms(ctx context.Context) ([]string, error)
// TODO: factor out - from currentstateserver

View file

@ -79,17 +79,10 @@ const insertEventSQL = "" +
const selectEventSQL = "" +
"SELECT event_nid, state_snapshot_nid FROM roomserver_events WHERE event_id = $1"
// Bulk lookup of events by string ID.
// Sort by the numeric IDs for event type and state key.
// This means we can use binary search to lookup entries by type and state key.
const bulkSelectStateEventByIDSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
" WHERE event_id = ANY($1)" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
const bulkSelectStateAtEventByIDSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid FROM roomserver_events" +
" WHERE event_id = ANY($1)"
" WHERE event_id = ANY($1)" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
const updateEventStateSQL = "" +
"UPDATE roomserver_events SET state_snapshot_nid = $2 WHERE event_nid = $1"
@ -125,7 +118,6 @@ const selectRoomNIDForEventNIDSQL = "" +
type eventStatements struct {
insertEventStmt *sql.Stmt
selectEventStmt *sql.Stmt
bulkSelectStateEventByIDStmt *sql.Stmt
bulkSelectStateAtEventByIDStmt *sql.Stmt
updateEventStateStmt *sql.Stmt
selectEventSentToOutputStmt *sql.Stmt
@ -149,7 +141,6 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
return s, shared.StatementList{
{&s.insertEventStmt, insertEventSQL},
{&s.selectEventStmt, selectEventSQL},
{&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL},
{&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL},
{&s.updateEventStateStmt, updateEventStateSQL},
{&s.updateEventSentToOutputStmt, updateEventSentToOutputSQL},
@ -193,48 +184,6 @@ func (s *eventStatements) SelectEvent(
return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err
}
// bulkSelectStateEventByID lookups a list of state events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError
func (s *eventStatements) BulkSelectStateEventByID(
ctx context.Context, eventIDs []string,
) ([]types.StateEntry, error) {
rows, err := s.bulkSelectStateEventByIDStmt.QueryContext(ctx, pq.StringArray(eventIDs))
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateEventByID: rows.close() failed")
// We know that we will only get as many results as event IDs
// because of the unique constraint on event IDs.
// So we can allocate an array of the correct size now.
// We might get fewer results than IDs so we adjust the length of the slice before returning it.
results := make([]types.StateEntry, len(eventIDs))
i := 0
for ; rows.Next(); i++ {
result := &results[i]
if err = rows.Scan(
&result.EventTypeNID,
&result.EventStateKeyNID,
&result.EventNID,
); err != nil {
return nil, err
}
}
if err = rows.Err(); err != nil {
return nil, err
}
if i != len(eventIDs) {
// If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have.
// We don't know which ones were missing because we don't return the string IDs in the query.
// However it should be possible debug this by replaying queries or entries from the input kafka logs.
// If this turns out to be impossible and we do need the debug information here, it would be better
// to do it as a separate query rather than slowing down/complicating the internal case.
return nil, types.MissingEventError(
fmt.Sprintf("storage: state event IDs missing from the database (%d != %d)", i, len(eventIDs)),
)
}
return results, nil
}
// bulkSelectStateAtEventByID lookups the state at a list of events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError.
// If we do not have the state for any of the requested events it returns a types.MissingEventError.

View file

@ -106,12 +106,6 @@ func (d *Database) EventStateKeyNIDs(
return result, nil
}
func (d *Database) StateEntriesForEventIDs(
ctx context.Context, eventIDs []string,
) ([]types.StateEntry, error) {
return d.EventsTable.BulkSelectStateEventByID(ctx, eventIDs)
}
func (d *Database) StateEntriesForTuples(
ctx context.Context,
stateBlockNIDs []types.StateBlockNID,
@ -173,13 +167,6 @@ func (d *Database) StateAtEventIDs(
return d.EventsTable.BulkSelectStateAtEventByID(ctx, eventIDs)
}
func (d *Database) SnapshotNIDFromEventID(
ctx context.Context, eventID string,
) (types.StateSnapshotNID, error) {
_, stateNID, err := d.EventsTable.SelectEvent(ctx, nil, eventID)
return stateNID, err
}
func (d *Database) EventIDs(
ctx context.Context, eventNIDs []types.EventNID,
) (map[types.EventNID]string, error) {

View file

@ -54,17 +54,10 @@ const insertEventSQL = `
const selectEventSQL = "" +
"SELECT event_nid, state_snapshot_nid FROM roomserver_events WHERE event_id = $1"
// Bulk lookup of events by string ID.
// Sort by the numeric IDs for event type and state key.
// This means we can use binary search to lookup entries by type and state key.
const bulkSelectStateEventByIDSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
" WHERE event_id IN ($1)" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
const bulkSelectStateAtEventByIDSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid FROM roomserver_events" +
" WHERE event_id IN ($1)"
" WHERE event_id IN ($1)" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
const updateEventStateSQL = "" +
"UPDATE roomserver_events SET state_snapshot_nid = $1 WHERE event_nid = $2"
@ -101,7 +94,6 @@ type eventStatements struct {
db *sql.DB
insertEventStmt *sql.Stmt
selectEventStmt *sql.Stmt
bulkSelectStateEventByIDStmt *sql.Stmt
bulkSelectStateAtEventByIDStmt *sql.Stmt
updateEventStateStmt *sql.Stmt
selectEventSentToOutputStmt *sql.Stmt
@ -126,7 +118,6 @@ func NewSqliteEventsTable(db *sql.DB) (tables.Events, error) {
return s, shared.StatementList{
{&s.insertEventStmt, insertEventSQL},
{&s.selectEventStmt, selectEventSQL},
{&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL},
{&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL},
{&s.updateEventStateStmt, updateEventStateSQL},
{&s.updateEventSentToOutputStmt, updateEventSentToOutputSQL},
@ -179,57 +170,6 @@ func (s *eventStatements) SelectEvent(
return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err
}
// bulkSelectStateEventByID lookups a list of state events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError
func (s *eventStatements) BulkSelectStateEventByID(
ctx context.Context, eventIDs []string,
) ([]types.StateEntry, error) {
///////////////
iEventIDs := make([]interface{}, len(eventIDs))
for k, v := range eventIDs {
iEventIDs[k] = v
}
selectOrig := strings.Replace(bulkSelectStateEventByIDSQL, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1)
selectStmt, err := s.db.Prepare(selectOrig)
if err != nil {
return nil, err
}
///////////////
rows, err := selectStmt.QueryContext(ctx, iEventIDs...)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateEventByID: rows.close() failed")
// We know that we will only get as many results as event IDs
// because of the unique constraint on event IDs.
// So we can allocate an array of the correct size now.
// We might get fewer results than IDs so we adjust the length of the slice before returning it.
results := make([]types.StateEntry, len(eventIDs))
i := 0
for ; rows.Next(); i++ {
result := &results[i]
if err = rows.Scan(
&result.EventTypeNID,
&result.EventStateKeyNID,
&result.EventNID,
); err != nil {
return nil, err
}
}
if i != len(eventIDs) {
// If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have.
// We don't know which ones were missing because we don't return the string IDs in the query.
// However it should be possible debug this by replaying queries or entries from the input kafka logs.
// If this turns out to be impossible and we do need the debug information here, it would be better
// to do it as a separate query rather than slowing down/complicating the internal case.
return nil, types.MissingEventError(
fmt.Sprintf("storage: state event IDs missing from the database (%d != %d)", i, len(eventIDs)),
)
}
return results, err
}
// bulkSelectStateAtEventByID lookups the state at a list of events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError.
// If we do not have the state for any of the requested events it returns a types.MissingEventError.

View file

@ -36,9 +36,6 @@ type EventStateKeys interface {
type Events interface {
InsertEvent(c context.Context, txn *sql.Tx, i types.RoomNID, j types.EventTypeNID, k types.EventStateKeyNID, eventID string, referenceSHA256 []byte, authEventNIDs []types.EventNID, depth int64) (types.EventNID, types.StateSnapshotNID, error)
SelectEvent(ctx context.Context, txn *sql.Tx, eventID string) (types.EventNID, types.StateSnapshotNID, error)
// bulkSelectStateEventByID lookups a list of state events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError
BulkSelectStateEventByID(ctx context.Context, eventIDs []string) ([]types.StateEntry, error)
// BulkSelectStateAtEventByID lookups the state at a list of events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError.
// If we do not have the state for any of the requested events it returns a types.MissingEventError.