Handle state with input event as new events (#1415)

* SendEventWithState events as new

* Use cumulative state IDs for final event

* Error wrapping in calculateAndSetState

* Handle overwriting same event type and state key

* Hacky way to spot historical events

* Don't exclude from sync

* Don't generate output events when rewriting forward extremities

* Update output event check

* Historical output events

* Define output room event type

* Notify key changes on state

* Don't send our membership event twice

* Deduplicate state entries

* Tweaks

* Remove unnecessary nolint

* Fix current state upsert in sync API

* Send auth events as outliers, state events as rewrite

* Sync API don't consume state events

* Process events actually

* Improve outlier check

* Fix local room check

* Remove extra room check, it seems to break the whole damn world

* Fix federated join check

* Fix nil pointer exception

* Better comments on DeduplicateStateEntries

* Reflow forced federated joins

* Don't force federated join for possibly even local invites

* Comment SendEventWithState better

* Rewrite room state in sync API storage

* Add TODO

* Clean up all room data when receiving create event

* Don't generate output events for rewrites, but instead notify that state is rewritten on the final new event

* Rename to PurgeRoom

* Exclude backfilled messages from /sync

* Split out rewriting state from updating state from state res

Co-authored-by: Kegan Dougal <kegan@matrix.org>
This commit is contained in:
Neil Alexander 2020-09-15 11:17:46 +01:00 committed by GitHub
parent 8dc9506210
commit 965f068d1a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 616 additions and 30 deletions

View file

@ -36,7 +36,7 @@ func CheckAuthEvents(
if err != nil {
return nil, err
}
// TODO: check for duplicate state keys here.
authStateEntries = types.DeduplicateStateEntries(authStateEntries)
// Work out which of the state events we actually need.
stateNeeded := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{event.Unwrap()})

View file

@ -86,7 +86,7 @@ func (r *Inputer) processRoomEvent(
"event_id": event.EventID(),
"type": event.Type(),
"room": event.RoomID(),
}).Info("Stored outlier")
}).Debug("Stored outlier")
return event.EventID(), nil
}
@ -107,6 +107,15 @@ func (r *Inputer) processRoomEvent(
}
}
if input.Kind == api.KindRewrite {
logrus.WithFields(logrus.Fields{
"event_id": event.EventID(),
"type": event.Type(),
"room": event.RoomID(),
}).Debug("Stored rewrite")
return event.EventID(), nil
}
if err = r.updateLatestEvents(
ctx, // context
roomInfo, // room info for the room being updated
@ -114,6 +123,7 @@ func (r *Inputer) processRoomEvent(
event, // event
input.SendAsServer, // send as server
input.TransactionID, // transaction ID
input.HasState, // rewrites state?
); err != nil {
return "", fmt.Errorf("r.updateLatestEvents: %w", err)
}
@ -167,19 +177,25 @@ func (r *Inputer) calculateAndSetState(
// Check that those state events are in the database and store the state.
var entries []types.StateEntry
if entries, err = r.DB.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil {
return err
return fmt.Errorf("r.DB.StateEntriesForEventIDs: %w", err)
}
entries = types.DeduplicateStateEntries(entries)
if stateAtEvent.BeforeStateSnapshotNID, err = r.DB.AddState(ctx, roomInfo.RoomNID, nil, entries); err != nil {
return err
return fmt.Errorf("r.DB.AddState: %w", err)
}
} else {
stateAtEvent.Overwrite = false
// We haven't been told what the state at the event is so we need to calculate it from the prev_events
if stateAtEvent.BeforeStateSnapshotNID, err = roomState.CalculateAndStoreStateBeforeEvent(ctx, event); err != nil {
return err
return fmt.Errorf("roomState.CalculateAndStoreStateBeforeEvent: %w", err)
}
}
return r.DB.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID)
err = r.DB.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID)
if err != nil {
return fmt.Errorf("r.DB.SetState: %w", err)
}
return nil
}

View file

@ -54,6 +54,7 @@ func (r *Inputer) updateLatestEvents(
event gomatrixserverlib.Event,
sendAsServer string,
transactionID *api.TransactionID,
rewritesState bool,
) (err error) {
updater, err := r.DB.GetLatestEventsForUpdate(ctx, *roomInfo)
if err != nil {
@ -71,6 +72,7 @@ func (r *Inputer) updateLatestEvents(
event: event,
sendAsServer: sendAsServer,
transactionID: transactionID,
rewritesState: rewritesState,
}
if err = u.doUpdateLatestEvents(); err != nil {
@ -93,6 +95,7 @@ type latestEventsUpdater struct {
stateAtEvent types.StateAtEvent
event gomatrixserverlib.Event
transactionID *api.TransactionID
rewritesState bool
// Which server to send this event as.
sendAsServer string
// The eventID of the event that was processed before this one.
@ -178,7 +181,8 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
return fmt.Errorf("u.api.updateMemberships: %w", err)
}
update, err := u.makeOutputNewRoomEvent()
var update *api.OutputEvent
update, err = u.makeOutputNewRoomEvent()
if err != nil {
return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err)
}
@ -305,6 +309,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
ore := api.OutputNewRoomEvent{
Event: u.event.Headered(u.roomInfo.RoomVersion),
RewritesState: u.rewritesState,
LastSentEventID: u.lastEventIDSent,
LatestEventIDs: latestEventIDs,
TransactionID: u.transactionID,
@ -337,6 +342,11 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
return nil, fmt.Errorf("failed to load add_state_events from db: %w", err)
}
}
// State is rewritten if the input room event HasState and we actually produced a delta on state events.
// Without this check, /get_missing_events which produce events with associated (but not complete) state
// will incorrectly purge the room and set it to no state. TODO: This is likely flakey, as if /gme produced
// a state conflict res which just so happens to include 2+ events we might purge the room state downstream.
ore.RewritesState = len(ore.AddsStateEventIDs) > 1
return &api.OutputEvent{
Type: api.OutputTypeNewRoomEvent,

View file

@ -183,33 +183,33 @@ func (r *Joiner) performJoinRoomByID(
return "", fmt.Errorf("eb.SetContent: %w", err)
}
// First work out if this is in response to an existing invite
// from a federated server. If it is then we avoid the situation
// where we might think we know about a room in the following
// section but don't know the latest state as all of our users
// have left.
// Force a federated join if we aren't in the room and we've been
// given some server names to try joining by.
serverInRoom, _ := helpers.IsServerCurrentlyInRoom(ctx, r.DB, r.ServerName, req.RoomIDOrAlias)
forceFederatedJoin := len(req.ServerNames) > 0 && !serverInRoom
// Force a federated join if we're dealing with a pending invite
// and we aren't in the room.
isInvitePending, inviteSender, _, err := helpers.IsInvitePending(ctx, r.DB, req.RoomIDOrAlias, req.UserID)
if err == nil && isInvitePending && !serverInRoom {
// Check if there's an invite pending.
if err == nil && isInvitePending {
_, inviterDomain, ierr := gomatrixserverlib.SplitID('@', inviteSender)
if ierr != nil {
return "", fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
}
// Check that the domain isn't ours. If it's local then we don't
// need to do anything as our own copy of the room state will be
// up-to-date.
// If we were invited by someone from another server then we can
// assume they are in the room so we can join via them.
if inviterDomain != r.Cfg.Matrix.ServerName {
// Add the server of the person who invited us to the server list,
// as they should be a fairly good bet.
req.ServerNames = append(req.ServerNames, inviterDomain)
// Perform a federated room join.
return req.RoomIDOrAlias, r.performFederatedJoinRoomByID(ctx, req)
forceFederatedJoin = true
}
}
// If we should do a forced federated join then do that.
if forceFederatedJoin {
return req.RoomIDOrAlias, r.performFederatedJoinRoomByID(ctx, req)
}
// Try to construct an actual join event from the template.
// If this succeeds then it is a sign that the room already exists
// locally on the homeserver.