From 810dd7842ce1ee4ac79edfaced959959eb756f29 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 28 Dec 2020 16:12:04 -0600 Subject: [PATCH] Filter out events after the leave event --- syncapi/storage/shared/syncserver.go | 88 ++++++++++++++++++++++++---- 1 file changed, 77 insertions(+), 11 deletions(-) diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index e65ee7c6..0f8e09cd 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -937,6 +937,7 @@ func (d *Database) getJoinResponseForCompleteSync( // "Can sync a room with a message with a transaction id" - which does a complete sync to check. recentEvents := d.StreamEventsToEvents(&device, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) + jr = types.NewJoinResponse() jr.Timeline.PrevBatch = prevBatch jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) @@ -952,23 +953,88 @@ func (d *Database) getLeaveResponseForCompleteSync( filter *gomatrixserverlib.Filter, device userapi.Device, ) (lr *types.LeaveResponse, err error) { - jr, err := d.getJoinResponseForCompleteSync( - ctx, - txn, - roomID, - r, - filter, - device, + var stateEvents []*gomatrixserverlib.HeaderedEvent + stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, &filter.Room.State) + if err != nil { + return + } + + numRecentEventsPerRoom := filter.Room.Timeline.Limit + + // TODO: When filters are added, we may need to call this multiple times to get enough events. + // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 + var recentStreamEvents []types.StreamEvent + var limited bool + recentStreamEvents, limited, err = d.OutputEvents.SelectRecentEvents( + ctx, txn, roomID, r, numRecentEventsPerRoom, true, true, ) if err != nil { return } + // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the + // user shouldn't see, we check the recent events and remove any prior to the join event of the user + // which is equiv to history_visibility: joined + joinEventIndex := -1 + leaveEventIndex := -1 + for i := len(recentStreamEvents) - 1; i >= 0; i-- { + ev := recentStreamEvents[i] + if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) { + membership, _ := ev.Membership() + if membership == gomatrixserverlib.Join { + joinEventIndex = i + if i > 0 { + // the create event happens before the first join, so we should cut it at that point instead + if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") { + joinEventIndex = i - 1 + } + } + break + } else if membership == gomatrixserverlib.Leave { + leaveEventIndex = i - 1 + } + + if joinEventIndex != -1 && leaveEventIndex != -1 { + break + } + } + } + // We can assume that if the person joined, they also left and will only show the events between (inclusive) + if joinEventIndex != -1 && leaveEventIndex != -1 { + // cut all events earlier than the join (but not the join itself) + // and cut all events after the person left + recentStreamEvents = recentStreamEvents[joinEventIndex:leaveEventIndex] + limited = false // so clients know not to try to backpaginate + } + + // Retrieve the backward topology position, i.e. the position of the + // oldest event in the room's topology. + var prevBatchStr string + if len(recentStreamEvents) > 0 { + var backwardTopologyPos, backwardStreamPos types.StreamPosition + backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID()) + if err != nil { + return + } + prevBatch := types.TopologyToken{ + Depth: backwardTopologyPos, + PDUPosition: backwardStreamPos, + } + prevBatch.Decrement() + prevBatchStr = prevBatch.String() + } + + // We don't include a device here as we don't need to send down + // transaction IDs for complete syncs, but we do it anyway because Sytest demands it for: + // "Can sync a room with a message with a transaction id" - which does a complete sync to check. + recentEvents := d.StreamEventsToEvents(&device, recentStreamEvents) + stateEvents = removeDuplicates(stateEvents, recentEvents) + lr = types.NewLeaveResponse() - lr.Timeline.PrevBatch = jr.Timeline.PrevBatch - lr.Timeline.Events = jr.Timeline.Events - lr.Timeline.Limited = jr.Timeline.Limited - lr.State.Events = jr.State.Events + lr.Timeline.PrevBatch = prevBatchStr + lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + lr.Timeline.Limited = limited + lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync) return lr, nil }