Refactor StoreEvent and create a new RoomDatabase interface (#2985)

This PR changes a few things:
- It pulls out the creation of several NIDs from the `StoreEvent`
function to make the functions more reusable
- Uses more caching when using those NIDs to avoid DB round trips
This commit is contained in:
Till 2023-02-24 09:40:20 +01:00 committed by GitHub
parent e6aa0955ff
commit ad07b169b8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
31 changed files with 459 additions and 302 deletions

View file

@ -76,7 +76,7 @@ type Inputer struct {
Cfg *config.RoomServer
Base *base.BaseDendrite
ProcessContext *process.ProcessContext
DB storage.Database
DB storage.RoomDatabase
NATSClient *nats.Conn
JetStream nats.JetStreamContext
Durable nats.SubOpt

View file

@ -308,10 +308,10 @@ func (r *Inputer) processRoomEvent(
}
var softfail bool
if input.Kind == api.KindNew {
if input.Kind == api.KindNew && !isCreateEvent {
// Check that the event passes authentication checks based on the
// current room state.
softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs)
softfail, err = helpers.CheckForSoftFail(ctx, r.DB, roomInfo, headered, input.StateEventIDs)
if err != nil {
logger.WithError(err).Warn("Error authing soft-failed event")
}
@ -322,8 +322,8 @@ func (r *Inputer) processRoomEvent(
// bother doing this if the event was already rejected as it just ends up
// burning CPU time.
historyVisibility := gomatrixserverlib.HistoryVisibilityShared // Default to shared.
if input.Kind != api.KindOutlier && rejectionErr == nil && !isRejected {
historyVisibility, rejectionErr, err = r.processStateBefore(ctx, input, missingPrev)
if input.Kind != api.KindOutlier && rejectionErr == nil && !isRejected && !isCreateEvent {
historyVisibility, rejectionErr, err = r.processStateBefore(ctx, roomInfo.RoomNID, input, missingPrev)
if err != nil {
return fmt.Errorf("r.processStateBefore: %w", err)
}
@ -332,8 +332,23 @@ func (r *Inputer) processRoomEvent(
}
}
roomNID, err := r.DB.GetOrCreateRoomNID(ctx, event)
if err != nil {
return fmt.Errorf("r.DB.GetOrCreateRoomNID: %w", err)
}
eventTypeNID, err := r.DB.GetOrCreateEventTypeNID(ctx, event.Type())
if err != nil {
return fmt.Errorf("r.DB.GetOrCreateEventTypeNID: %w", err)
}
eventStateKeyNID, err := r.DB.GetOrCreateEventStateKeyNID(ctx, event.StateKey())
if err != nil {
return fmt.Errorf("r.DB.GetOrCreateEventStateKeyNID: %w", err)
}
// Store the event.
_, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
_, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, roomNID, eventTypeNID, eventStateKeyNID, authEventNIDs, isRejected)
if err != nil {
return fmt.Errorf("updater.StoreEvent: %w", err)
}
@ -474,6 +489,7 @@ func (r *Inputer) handleRemoteRoomUpgrade(ctx context.Context, event *gomatrixse
// nolint:nakedret
func (r *Inputer) processStateBefore(
ctx context.Context,
roomNID types.RoomNID,
input *api.InputRoomEvent,
missingPrev bool,
) (historyVisibility gomatrixserverlib.HistoryVisibility, rejectionErr error, err error) {
@ -489,7 +505,7 @@ func (r *Inputer) processStateBefore(
case input.HasState:
// If we're overriding the state then we need to go and retrieve
// them from the database. It's a hard error if they are missing.
stateEvents, err := r.DB.EventsFromIDs(ctx, input.StateEventIDs)
stateEvents, err := r.DB.EventsFromIDs(ctx, roomNID, input.StateEventIDs)
if err != nil {
return "", nil, fmt.Errorf("r.DB.EventsFromIDs: %w", err)
}
@ -567,6 +583,7 @@ func (r *Inputer) processStateBefore(
// we've failed to retrieve the auth chain altogether (in which case
// an error is returned) or we've successfully retrieved them all and
// they are now in the database.
// nolint: gocyclo
func (r *Inputer) fetchAuthEvents(
ctx context.Context,
logger *logrus.Entry,
@ -587,7 +604,7 @@ func (r *Inputer) fetchAuthEvents(
}
for _, authEventID := range authEventIDs {
authEvents, err := r.DB.EventsFromIDs(ctx, []string{authEventID})
authEvents, err := r.DB.EventsFromIDs(ctx, roomInfo.RoomNID, []string{authEventID})
if err != nil || len(authEvents) == 0 || authEvents[0].Event == nil {
unknown[authEventID] = struct{}{}
continue
@ -673,8 +690,23 @@ nextAuthEvent:
logger.WithError(err).Warnf("Auth event %s rejected", authEvent.EventID())
}
roomNID, err := r.DB.GetOrCreateRoomNID(ctx, authEvent)
if err != nil {
return fmt.Errorf("r.DB.GetOrCreateRoomNID: %w", err)
}
eventTypeNID, err := r.DB.GetOrCreateEventTypeNID(ctx, authEvent.Type())
if err != nil {
return fmt.Errorf("r.DB.GetOrCreateEventTypeNID: %w", err)
}
eventStateKeyNID, err := r.DB.GetOrCreateEventStateKeyNID(ctx, event.StateKey())
if err != nil {
return fmt.Errorf("r.DB.GetOrCreateEventStateKeyNID: %w", err)
}
// Finally, store the event in the database.
eventNID, _, _, _, _, err := r.DB.StoreEvent(ctx, authEvent, authEventNIDs, isRejected)
eventNID, _, _, _, err := r.DB.StoreEvent(ctx, authEvent, roomNID, eventTypeNID, eventStateKeyNID, authEventNIDs, isRejected)
if err != nil {
return fmt.Errorf("updater.StoreEvent: %w", err)
}
@ -750,7 +782,7 @@ func (r *Inputer) kickGuests(ctx context.Context, event *gomatrixserverlib.Event
return err
}
memberEvents, err := r.DB.Events(ctx, membershipNIDs)
memberEvents, err := r.DB.Events(ctx, roomInfo.RoomNID, membershipNIDs)
if err != nil {
return err
}

View file

@ -53,7 +53,7 @@ func (r *Inputer) updateMemberships(
// Load the event JSON so we can look up the "membership" key.
// TODO: Maybe add a membership key to the events table so we can load that
// key without having to load the entire event JSON?
events, err := updater.Events(ctx, eventNIDs)
events, err := updater.Events(ctx, 0, eventNIDs)
if err != nil {
return nil, err
}

View file

@ -43,7 +43,7 @@ type missingStateReq struct {
log *logrus.Entry
virtualHost gomatrixserverlib.ServerName
origin gomatrixserverlib.ServerName
db storage.Database
db storage.RoomDatabase
roomInfo *types.RoomInfo
inputer *Inputer
keys gomatrixserverlib.JSONVerifier
@ -395,7 +395,7 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, even
for _, entry := range stateEntries {
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
stateEvents, err := t.db.Events(ctx, stateEventNIDs)
stateEvents, err := t.db.Events(ctx, t.roomInfo.RoomNID, stateEventNIDs)
if err != nil {
t.log.WithError(err).Warnf("failed to load state events locally")
return nil
@ -432,7 +432,7 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, even
missingEventList = append(missingEventList, evID)
}
t.log.WithField("count", len(missingEventList)).Debugf("Fetching missing auth events")
events, err := t.db.EventsFromIDs(ctx, missingEventList)
events, err := t.db.EventsFromIDs(ctx, t.roomInfo.RoomNID, missingEventList)
if err != nil {
return nil
}
@ -702,7 +702,7 @@ func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roo
}
t.haveEventsMutex.Unlock()
events, err := t.db.EventsFromIDs(ctx, missingEventList)
events, err := t.db.EventsFromIDs(ctx, t.roomInfo.RoomNID, missingEventList)
if err != nil {
return nil, fmt.Errorf("t.db.EventsFromIDs: %w", err)
}
@ -844,7 +844,7 @@ func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixs
if localFirst {
// fetch from the roomserver
events, err := t.db.EventsFromIDs(ctx, []string{missingEventID})
events, err := t.db.EventsFromIDs(ctx, t.roomInfo.RoomNID, []string{missingEventID})
if err != nil {
t.log.Warnf("Failed to query roomserver for missing event %s: %s - falling back to remote", missingEventID, err)
} else if len(events) == 1 {