From c1bc3df1e0f55d82a9c5e176978e2426bc775202 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 11 Jan 2021 09:42:06 +0000 Subject: [PATCH] Update onOldRoomEvent --- syncapi/consumers/roomserver.go | 15 +++++---------- syncapi/notifier/notifier.go | 15 +++++++++++++++ 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 1d47b73a..8333991a 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -207,10 +207,10 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent( ctx, ev, []*gomatrixserverlib.HeaderedEvent{}, - []string{}, // adds no state - []string{}, // removes no state - nil, // no transaction - ev.StateKey() != nil, // exclude from sync? + []string{}, // adds no state + []string{}, // removes no state + nil, // no transaction + true, // exclude from sync? ) if err != nil { // panic rather than continue with an inconsistent database @@ -221,13 +221,8 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent( return nil } - if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil { - log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos) - return err - } - s.pduStream.Advance(pduPos) - s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) + s.notifier.OnOldEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos}) return nil } diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go index d853cc0e..7804f0f2 100644 --- a/syncapi/notifier/notifier.go +++ b/syncapi/notifier/notifier.go @@ -124,6 +124,21 @@ func (n *Notifier) OnNewEvent( } } +// OnOldEvent is called when the sync API receives an "old" event, e.g. +// because it was pulled in by get_missing_events or similar. We need +// to update the stream position, but we don't bother waking up any +// clients because the event will be stored as excluded from sync anyway. +func (n *Notifier) OnOldEvent( + ev *gomatrixserverlib.HeaderedEvent, roomID string, userIDs []string, + posUpdate types.StreamingToken, +) { + n.streamLock.Lock() + defer n.streamLock.Unlock() + + n.currPos.ApplyUpdates(posUpdate) + n.removeEmptyUserStreams() +} + func (n *Notifier) OnNewAccountData( userID string, posUpdate types.StreamingToken, ) {