mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-08-01 13:52:46 +00:00
Refactor StoreEvent
, add MaybeRedactEvent
, create an EventDatabase
(#2989)
This PR changes the following: - `StoreEvent` now only stores an event (and possibly prev event), instead of also doing redactions - Adds a `MaybeRedactEvent` (pulled out from `StoreEvent`), which should be called after storing events - a few other things
This commit is contained in:
parent
1aa70b0f56
commit
6c20f8f742
34 changed files with 488 additions and 420 deletions
|
@ -24,9 +24,10 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
|
||||
"github.com/tidwall/gjson"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
|
@ -274,8 +275,10 @@ func (r *Inputer) processRoomEvent(
|
|||
|
||||
// Check if the event is allowed by its auth events. If it isn't then
|
||||
// we consider the event to be "rejected" — it will still be persisted.
|
||||
redactAllowed := true
|
||||
if err = gomatrixserverlib.Allowed(event, &authEvents); err != nil {
|
||||
isRejected = true
|
||||
redactAllowed = false
|
||||
rejectionErr = err
|
||||
logger.WithError(rejectionErr).Warnf("Event %s not allowed by auth events", event.EventID())
|
||||
}
|
||||
|
@ -323,7 +326,7 @@ func (r *Inputer) processRoomEvent(
|
|||
// burning CPU time.
|
||||
historyVisibility := gomatrixserverlib.HistoryVisibilityShared // Default to shared.
|
||||
if input.Kind != api.KindOutlier && rejectionErr == nil && !isRejected && !isCreateEvent {
|
||||
historyVisibility, rejectionErr, err = r.processStateBefore(ctx, roomInfo.RoomNID, input, missingPrev)
|
||||
historyVisibility, rejectionErr, err = r.processStateBefore(ctx, roomInfo, input, missingPrev)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.processStateBefore: %w", err)
|
||||
}
|
||||
|
@ -332,9 +335,11 @@ func (r *Inputer) processRoomEvent(
|
|||
}
|
||||
}
|
||||
|
||||
roomNID, err := r.DB.GetOrCreateRoomNID(ctx, event)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.GetOrCreateRoomNID: %w", err)
|
||||
if roomInfo == nil {
|
||||
roomInfo, err = r.DB.GetOrCreateRoomInfo(ctx, event)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.GetOrCreateRoomInfo: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
eventTypeNID, err := r.DB.GetOrCreateEventTypeNID(ctx, event.Type())
|
||||
|
@ -348,15 +353,24 @@ func (r *Inputer) processRoomEvent(
|
|||
}
|
||||
|
||||
// Store the event.
|
||||
_, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, roomNID, eventTypeNID, eventStateKeyNID, authEventNIDs, isRejected)
|
||||
eventNID, stateAtEvent, err := r.DB.StoreEvent(ctx, event, roomInfo, eventTypeNID, eventStateKeyNID, authEventNIDs, isRejected)
|
||||
if err != nil {
|
||||
return fmt.Errorf("updater.StoreEvent: %w", err)
|
||||
}
|
||||
|
||||
// if storing this event results in it being redacted then do so.
|
||||
if !isRejected && redactedEventID == event.EventID() {
|
||||
if err = eventutil.RedactEvent(redactionEvent, event); err != nil {
|
||||
return fmt.Errorf("eventutil.RedactEvent: %w", rerr)
|
||||
var (
|
||||
redactedEventID string
|
||||
redactionEvent *gomatrixserverlib.Event
|
||||
redactedEvent *gomatrixserverlib.Event
|
||||
)
|
||||
if !isRejected && !isCreateEvent {
|
||||
redactionEvent, redactedEvent, err = r.DB.MaybeRedactEvent(ctx, roomInfo, eventNID, event, redactAllowed)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if redactedEvent != nil {
|
||||
redactedEventID = redactedEvent.EventID()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -489,7 +503,7 @@ func (r *Inputer) handleRemoteRoomUpgrade(ctx context.Context, event *gomatrixse
|
|||
// nolint:nakedret
|
||||
func (r *Inputer) processStateBefore(
|
||||
ctx context.Context,
|
||||
roomNID types.RoomNID,
|
||||
roomInfo *types.RoomInfo,
|
||||
input *api.InputRoomEvent,
|
||||
missingPrev bool,
|
||||
) (historyVisibility gomatrixserverlib.HistoryVisibility, rejectionErr error, err error) {
|
||||
|
@ -505,7 +519,7 @@ func (r *Inputer) processStateBefore(
|
|||
case input.HasState:
|
||||
// If we're overriding the state then we need to go and retrieve
|
||||
// them from the database. It's a hard error if they are missing.
|
||||
stateEvents, err := r.DB.EventsFromIDs(ctx, roomNID, input.StateEventIDs)
|
||||
stateEvents, err := r.DB.EventsFromIDs(ctx, roomInfo, input.StateEventIDs)
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("r.DB.EventsFromIDs: %w", err)
|
||||
}
|
||||
|
@ -604,7 +618,7 @@ func (r *Inputer) fetchAuthEvents(
|
|||
}
|
||||
|
||||
for _, authEventID := range authEventIDs {
|
||||
authEvents, err := r.DB.EventsFromIDs(ctx, roomInfo.RoomNID, []string{authEventID})
|
||||
authEvents, err := r.DB.EventsFromIDs(ctx, roomInfo, []string{authEventID})
|
||||
if err != nil || len(authEvents) == 0 || authEvents[0].Event == nil {
|
||||
unknown[authEventID] = struct{}{}
|
||||
continue
|
||||
|
@ -690,9 +704,11 @@ nextAuthEvent:
|
|||
logger.WithError(err).Warnf("Auth event %s rejected", authEvent.EventID())
|
||||
}
|
||||
|
||||
roomNID, err := r.DB.GetOrCreateRoomNID(ctx, authEvent)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.GetOrCreateRoomNID: %w", err)
|
||||
if roomInfo == nil {
|
||||
roomInfo, err = r.DB.GetOrCreateRoomInfo(ctx, authEvent)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.GetOrCreateRoomInfo: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
eventTypeNID, err := r.DB.GetOrCreateEventTypeNID(ctx, authEvent.Type())
|
||||
|
@ -706,7 +722,7 @@ nextAuthEvent:
|
|||
}
|
||||
|
||||
// Finally, store the event in the database.
|
||||
eventNID, _, _, _, err := r.DB.StoreEvent(ctx, authEvent, roomNID, eventTypeNID, eventStateKeyNID, authEventNIDs, isRejected)
|
||||
eventNID, _, err := r.DB.StoreEvent(ctx, authEvent, roomInfo, eventTypeNID, eventStateKeyNID, authEventNIDs, isRejected)
|
||||
if err != nil {
|
||||
return fmt.Errorf("updater.StoreEvent: %w", err)
|
||||
}
|
||||
|
@ -782,7 +798,7 @@ func (r *Inputer) kickGuests(ctx context.Context, event *gomatrixserverlib.Event
|
|||
return err
|
||||
}
|
||||
|
||||
memberEvents, err := r.DB.Events(ctx, roomInfo.RoomNID, membershipNIDs)
|
||||
memberEvents, err := r.DB.Events(ctx, roomInfo, membershipNIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ func (r *Inputer) updateMemberships(
|
|||
// Load the event JSON so we can look up the "membership" key.
|
||||
// TODO: Maybe add a membership key to the events table so we can load that
|
||||
// key without having to load the entire event JSON?
|
||||
events, err := updater.Events(ctx, 0, eventNIDs)
|
||||
events, err := updater.Events(ctx, nil, eventNIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -395,7 +395,7 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, even
|
|||
for _, entry := range stateEntries {
|
||||
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
||||
}
|
||||
stateEvents, err := t.db.Events(ctx, t.roomInfo.RoomNID, stateEventNIDs)
|
||||
stateEvents, err := t.db.Events(ctx, t.roomInfo, stateEventNIDs)
|
||||
if err != nil {
|
||||
t.log.WithError(err).Warnf("failed to load state events locally")
|
||||
return nil
|
||||
|
@ -432,7 +432,7 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, even
|
|||
missingEventList = append(missingEventList, evID)
|
||||
}
|
||||
t.log.WithField("count", len(missingEventList)).Debugf("Fetching missing auth events")
|
||||
events, err := t.db.EventsFromIDs(ctx, t.roomInfo.RoomNID, missingEventList)
|
||||
events, err := t.db.EventsFromIDs(ctx, t.roomInfo, missingEventList)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -702,7 +702,7 @@ func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roo
|
|||
}
|
||||
t.haveEventsMutex.Unlock()
|
||||
|
||||
events, err := t.db.EventsFromIDs(ctx, t.roomInfo.RoomNID, missingEventList)
|
||||
events, err := t.db.EventsFromIDs(ctx, t.roomInfo, missingEventList)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("t.db.EventsFromIDs: %w", err)
|
||||
}
|
||||
|
@ -844,7 +844,7 @@ func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixs
|
|||
|
||||
if localFirst {
|
||||
// fetch from the roomserver
|
||||
events, err := t.db.EventsFromIDs(ctx, t.roomInfo.RoomNID, []string{missingEventID})
|
||||
events, err := t.db.EventsFromIDs(ctx, t.roomInfo, []string{missingEventID})
|
||||
if err != nil {
|
||||
t.log.Warnf("Failed to query roomserver for missing event %s: %s - falling back to remote", missingEventID, err)
|
||||
} else if len(events) == 1 {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue