From e145c2c5064dca4ff44d2c2affdfd5cf8a940772 Mon Sep 17 00:00:00 2001
From: Neil Alexander
Date: Tue, 27 Jul 2021 13:17:36 +0100
Subject: [PATCH] Move PDU stream functions to storage package to reuse snapshot transaction

---
Review notes (not part of the commit message):
 - PDUCompleteSync/PDUIncrementalSync use a named err result and set
   succeeded, so the deferred EndTransactionWithCheck can commit and can
   report a failure to end the snapshot.
 - PDUIncrementalSync returns every error instead of logging it and
   returning nil; the stream provider already logs and returns "from".
 - Peeked rooms are handled in PDUCompleteSync again; the commented-out
   peek code (which called the removed p.queue) is dropped.
 - The blob hashes on the "index" lines were not recomputed.

 syncapi/storage/interface.go         |  17 +-
 syncapi/storage/shared/sync_pdu.go   | 321 +++++++++++++++++++++++++
 syncapi/storage/shared/syncserver.go |  23 +-
 syncapi/streams/stream_pdu.go        | 297 +----------------------
 4 files changed, 345 insertions(+), 313 deletions(-)
 create mode 100644 syncapi/storage/shared/sync_pdu.go

diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index 9cff4cad..25e64b83 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -29,6 +29,21 @@ import (
 type Database interface {
 	internal.PartitionStorer
 
+	PDUCompleteSync(
+		ctx context.Context,
+		req *types.SyncRequest,
+		joinedRoomIDs []string,
+		r types.Range,
+		stateFilter *gomatrixserverlib.StateFilter,
+		eventFilter *gomatrixserverlib.RoomEventFilter,
+	) error
+	PDUIncrementalSync(
+		ctx context.Context,
+		req *types.SyncRequest,
+		r types.Range,
+		from, to types.StreamPosition,
+	) error
+
 	MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error)
 	MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error)
 	MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error)
@@ -36,8 +51,6 @@ type Database interface {
 	MaxStreamPositionForSendToDeviceMessages(ctx context.Context) (types.StreamPosition, error)
 
 	CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
-	GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
-	GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
 	RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
 
 	RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
diff --git a/syncapi/storage/shared/sync_pdu.go b/syncapi/storage/shared/sync_pdu.go
new file mode 100644
index 00000000..c199c31c
--- /dev/null
+++ b/syncapi/storage/shared/sync_pdu.go
@@ -0,0 +1,321 @@
+package shared
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+
+	"github.com/matrix-org/dendrite/internal/sqlutil"
+	"github.com/matrix-org/dendrite/syncapi/types"
+	userapi "github.com/matrix-org/dendrite/userapi/api"
+	"github.com/matrix-org/gomatrixserverlib"
+)
+
+// PDUCompleteSync fills req.Response with a join response for every room in
+// joinedRoomIDs and a peek response for every peek of the requesting device
+// that has not been deleted. All rooms are read through one read-only
+// snapshot transaction. Joined rooms are also recorded in req.Rooms.
+func (d *Database) PDUCompleteSync(
+	ctx context.Context,
+	req *types.SyncRequest,
+	joinedRoomIDs []string,
+	r types.Range,
+	stateFilter *gomatrixserverlib.StateFilter,
+	eventFilter *gomatrixserverlib.RoomEventFilter,
+) (err error) {
+	// PeeksInRange takes no transaction, so call it before the snapshot is
+	// opened rather than while the snapshot is being held.
+	peeks, err := d.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
+	if err != nil {
+		return fmt.Errorf("d.PeeksInRange: %w", err)
+	}
+
+	txn, err := d.readOnlySnapshot(ctx)
+	if err != nil {
+		return fmt.Errorf("d.readOnlySnapshot: %w", err)
+	}
+	// err is a named result: the deferred EndTransactionWithCheck is handed
+	// &err, which only reaches the caller if err is the function's result.
+	var succeeded bool
+	defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
+
+	var jr *types.JoinResponse
+	for _, roomID := range joinedRoomIDs {
+		jr, err = d.getJoinResponseForCompleteSync(
+			ctx, txn, roomID, r, stateFilter, eventFilter, req.WantFullState, req.Device,
+		)
+		if err != nil {
+			return fmt.Errorf("d.getJoinResponseForCompleteSync: %w", err)
+		}
+
+		req.Response.Rooms.Join[roomID] = *jr
+		req.Rooms[roomID] = gomatrixserverlib.Join
+	}
+
+	for _, peek := range peeks {
+		if peek.Deleted {
+			continue
+		}
+		jr, err = d.getJoinResponseForCompleteSync(
+			ctx, txn, peek.RoomID, r, stateFilter, eventFilter, req.WantFullState, req.Device,
+		)
+		if err != nil {
+			return fmt.Errorf("d.getJoinResponseForCompleteSync: %w", err)
+		}
+
+		req.Response.Rooms.Peek[peek.RoomID] = *jr
+	}
+
+	succeeded = true
+	return nil
+}
+
+// PDUIncrementalSync fills req.Response with the rooms that changed within r,
+// reading everything through one read-only snapshot transaction. The rooms
+// reported as joined by the state delta query are recorded in req.Rooms.
+//
+// The from and to parameters are not used here; r carries the range.
+func (d *Database) PDUIncrementalSync(
+	ctx context.Context,
+	req *types.SyncRequest,
+	r types.Range,
+	from, to types.StreamPosition,
+) (err error) {
+	txn, err := d.readOnlySnapshot(ctx)
+	if err != nil {
+		return fmt.Errorf("d.readOnlySnapshot: %w", err)
+	}
+	// err is a named result: the deferred EndTransactionWithCheck is handed
+	// &err, which only reaches the caller if err is the function's result.
+	var succeeded bool
+	defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
+
+	var stateDeltas []types.StateDelta
+	var joinedRooms []string
+
+	stateFilter := req.Filter.Room.State
+	eventFilter := req.Filter.Room.Timeline
+
+	// Every failure is returned, not logged and swallowed: the caller logs it
+	// and leaves the stream position where it was, so no events are skipped.
+	if req.WantFullState {
+		stateDeltas, joinedRooms, err = d.getStateDeltasForFullStateSync(ctx, txn, req.Device, r, req.Device.UserID, &stateFilter)
+		if err != nil {
+			return fmt.Errorf("d.getStateDeltasForFullStateSync: %w", err)
+		}
+	} else {
+		stateDeltas, joinedRooms, err = d.getStateDeltas(ctx, txn, req.Device, r, req.Device.UserID, &stateFilter)
+		if err != nil {
+			return fmt.Errorf("d.getStateDeltas: %w", err)
+		}
+	}
+
+	for _, roomID := range joinedRooms {
+		req.Rooms[roomID] = gomatrixserverlib.Join
+	}
+
+	for _, delta := range stateDeltas {
+		if err = d.addRoomDeltaToResponse(ctx, txn, req.Device, r, delta, &eventFilter, req.Response); err != nil {
+			return fmt.Errorf("d.addRoomDeltaToResponse: %w", err)
+		}
+	}
+
+	succeeded = true
+	return nil
+}
+
+// addRoomDeltaToResponse adds the recent events and state changes of a single
+// room delta to the join, peek or leave section of res, chosen by the
+// membership recorded in the delta.
+func (d *Database) addRoomDeltaToResponse(
+	ctx context.Context,
+	txn *sql.Tx,
+	device *userapi.Device,
+	r types.Range,
+	delta types.StateDelta,
+	eventFilter *gomatrixserverlib.RoomEventFilter,
+	res *types.Response,
+) error {
+	if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
+		// make sure we don't leak recent events after the leave event.
+		// TODO: History visibility makes this somewhat complex to handle correctly. For example:
+		// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
+		// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
+		//       in a single /sync request
+		// This is all "okay" assuming history_visibility == "shared" which it is by default.
+		r.To = delta.MembershipPos
+	}
+	recentStreamEvents, limited, err := d.OutputEvents.SelectRecentEvents(
+		ctx, txn, delta.RoomID, r,
+		eventFilter, true, true,
+	)
+	if err != nil {
+		return err
+	}
+	recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
+	delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
+	// NOTE(review): this lookup is not passed txn, so it may not read from the
+	// same snapshot as the query above.
+	prevBatch, err := d.GetBackwardTopologyPos(ctx, recentStreamEvents)
+	if err != nil {
+		return err
+	}
+
+	// XXX: should we ever get this far if we have no recent events or state in this room?
+	// in practice we do for peeks, but possibly not joins?
+	if len(recentEvents) == 0 && len(delta.StateEvents) == 0 {
+		return nil
+	}
+
+	switch delta.Membership {
+	case gomatrixserverlib.Join:
+		jr := types.NewJoinResponse()
+		jr.Timeline.PrevBatch = &prevBatch
+		jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+		jr.Timeline.Limited = limited
+		jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
+		res.Rooms.Join[delta.RoomID] = *jr
+
+	case gomatrixserverlib.Peek:
+		jr := types.NewJoinResponse()
+		jr.Timeline.PrevBatch = &prevBatch
+		jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+		jr.Timeline.Limited = limited
+		jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
+		res.Rooms.Peek[delta.RoomID] = *jr
+
+	case gomatrixserverlib.Leave:
+		fallthrough // transitions to leave are the same as ban
+
+	case gomatrixserverlib.Ban:
+		// TODO: recentEvents may contain events that this user is not allowed to see because they are
+		// no longer in the room.
+		lr := types.NewLeaveResponse()
+		lr.Timeline.PrevBatch = &prevBatch
+		lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+		lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
+		lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
+		res.Rooms.Leave[delta.RoomID] = *lr
+	}
+
+	return nil
+}
+
+// getJoinResponseForCompleteSync builds the join response for one room of a
+// complete sync: the recent timeline events within r, the current room state
+// and a backward pagination token derived from the first returned event.
+func (d *Database) getJoinResponseForCompleteSync(
+	ctx context.Context,
+	txn *sql.Tx,
+	roomID string,
+	r types.Range,
+	stateFilter *gomatrixserverlib.StateFilter,
+	eventFilter *gomatrixserverlib.RoomEventFilter,
+	wantFullState bool,
+	device *userapi.Device,
+) (jr *types.JoinResponse, err error) {
+	// TODO: When filters are added, we may need to call this multiple times to get enough events.
+	//       See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
+	recentStreamEvents, limited, err := d.OutputEvents.SelectRecentEvents(
+		ctx, txn, roomID, r, eventFilter, true, true,
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	// Get the event IDs of the state events in the stream events we fetched, so
+	// that the current state query below can exclude them.
+	var excludingEventIDs []string
+	if !wantFullState {
+		excludingEventIDs = make([]string, 0, len(recentStreamEvents))
+		for _, event := range recentStreamEvents {
+			if event.StateKey() != nil {
+				excludingEventIDs = append(excludingEventIDs, event.EventID())
+			}
+		}
+	}
+
+	stateEvents, err := d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter, excludingEventIDs)
+	if err != nil {
+		return nil, err
+	}
+
+	// TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
+	// user shouldn't see, we check the recent events and remove any prior to the join event of the user
+	// which is equiv to history_visibility: joined
+	joinEventIndex := -1
+	for i := len(recentStreamEvents) - 1; i >= 0; i-- {
+		ev := recentStreamEvents[i]
+		if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) {
+			membership, _ := ev.Membership()
+			if membership == "join" {
+				joinEventIndex = i
+				if i > 0 {
+					// the create event happens before the first join, so we should cut it at that point instead
+					if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") {
+						joinEventIndex = i - 1
+						break
+					}
+				}
+				break
+			}
+		}
+	}
+	if joinEventIndex != -1 {
+		// cut all events earlier than the join (but not the join itself)
+		recentStreamEvents = recentStreamEvents[joinEventIndex:]
+		limited = false // so clients know not to try to backpaginate
+	}
+
+	// Retrieve the backward topology position, i.e. the position of the
+	// oldest event in the room's topology.
+	var prevBatch *types.TopologyToken
+	if len(recentStreamEvents) > 0 {
+		var backwardTopologyPos, backwardStreamPos types.StreamPosition
+		backwardTopologyPos, backwardStreamPos, err = d.PositionInTopology(ctx, recentStreamEvents[0].EventID())
+		if err != nil {
+			return nil, err
+		}
+		prevBatch = &types.TopologyToken{
+			Depth:       backwardTopologyPos,
+			PDUPosition: backwardStreamPos,
+		}
+		prevBatch.Decrement()
+	}
+
+	// We don't include a device here as we don't need to send down
+	// transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
+	// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
+	recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
+	stateEvents = removeDuplicates(stateEvents, recentEvents)
+	jr = types.NewJoinResponse()
+	jr.Timeline.PrevBatch = prevBatch
+	jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+	jr.Timeline.Limited = limited
+	jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
+	return jr, nil
+}
+
+// removeDuplicates returns stateEvents without any state event that also
+// appears in recentEvents. The order of stateEvents is not preserved.
+func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
+	for _, recentEv := range recentEvents {
+		if recentEv.StateKey() == nil {
+			continue // not a state event
+		}
+		// TODO: This is a linear scan over all the current state events in this room. This will
+		//       be slow for big rooms. We should instead sort the state events by event ID  (ORDER BY)
+		//       then do a binary search to find matching events, similar to what roomserver does.
+		for j := 0; j < len(stateEvents); j++ {
+			if stateEvents[j].EventID() == recentEv.EventID() {
+				// overwrite the element to remove with the last element then pop the last element.
+				// This is orders of magnitude faster than re-slicing, but doesn't preserve ordering
+				// (we don't care about the order of stateEvents)
+				stateEvents[j] = stateEvents[len(stateEvents)-1]
+				stateEvents = stateEvents[:len(stateEvents)-1]
+				break // there shouldn't be multiple events with the same event ID
+			}
+		}
+	}
+	return stateEvents
+}
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index b8271877..49c29257 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -661,8 +661,8 @@ func (d *Database) fetchMissingStateEvents(
 // exclusive of oldPos, inclusive of newPos, for the rooms in which
 // the user has new membership events.
 // A list of joined room IDs is also returned in case the caller needs it.
-func (d *Database) GetStateDeltas(
-	ctx context.Context, device *userapi.Device,
+func (d *Database) getStateDeltas(
+	ctx context.Context, txn *sql.Tx, device *userapi.Device,
 	r types.Range, userID string,
 	stateFilter *gomatrixserverlib.StateFilter,
 ) ([]types.StateDelta, []string, error) {
@@ -674,12 +674,6 @@ func (d *Database) GetStateDeltas(
 	//     * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
 	//     * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block.
 	// - Get all CURRENTLY joined rooms, and add them to 'joined' block.
-	txn, err := d.readOnlySnapshot(ctx)
-	if err != nil {
-		return nil, nil, fmt.Errorf("d.readOnlySnapshot: %w", err)
-	}
-	var succeeded bool
-	defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
 
 	var deltas []types.StateDelta
 
@@ -764,7 +758,6 @@ func (d *Database) GetStateDeltas(
 		})
 	}
 
-	succeeded = true
 	return deltas, joinedRoomIDs, nil
 }
 
@@ -772,18 +765,11 @@ func (d *Database) GetStateDeltas(
 // requests with full_state=true.
 // Fetches full state for all joined rooms and uses selectStateInRange to get
 // updates for other rooms.
-func (d *Database) GetStateDeltasForFullStateSync(
-	ctx context.Context, device *userapi.Device,
+func (d *Database) getStateDeltasForFullStateSync(
+	ctx context.Context, txn *sql.Tx, device *userapi.Device,
 	r types.Range, userID string,
 	stateFilter *gomatrixserverlib.StateFilter,
 ) ([]types.StateDelta, []string, error) {
-	txn, err := d.readOnlySnapshot(ctx)
-	if err != nil {
-		return nil, nil, fmt.Errorf("d.readOnlySnapshot: %w", err)
-	}
-	var succeeded bool
-	defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
-
 	// Use a reasonable initial capacity
 	deltas := make(map[string]types.StateDelta)
 
@@ -860,7 +846,6 @@ func (d *Database) GetStateDeltasForFullStateSync(
 		i++
 	}
 
-	succeeded = true
 	return result, joinedRoomIDs, nil
 }
 
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 1486ad3c..a333c1b0 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -2,54 +2,17 @@ package streams
 
 import (
 	"context"
-	"sync"
-	"time"
 
 	"github.com/matrix-org/dendrite/syncapi/types"
-	userapi "github.com/matrix-org/dendrite/userapi/api"
 	"github.com/matrix-org/gomatrixserverlib"
-	"go.uber.org/atomic"
 )
 
-// The max number of per-room goroutines to have running.
-// Too high and this will consume lots of CPU, too low and complete
-// sync responses will take longer to process.
-const PDU_STREAM_WORKERS = 256
-
-// The maximum number of tasks that can be queued in total before
-// backpressure will build up and the rests will start to block.
-const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8
-
 type PDUStreamProvider struct {
 	StreamProvider
-
-	tasks   chan func()
-	workers atomic.Int32
-}
-
-func (p *PDUStreamProvider) worker() {
-	defer p.workers.Dec()
-	for {
-		select {
-		case f := <-p.tasks:
-			f()
-		case <-time.After(time.Second * 10):
-			return
-		}
-	}
-}
-
-func (p *PDUStreamProvider) queue(f func()) {
-	if p.workers.Load() < PDU_STREAM_WORKERS {
-		p.workers.Inc()
-		go p.worker()
-	}
-	p.tasks <- f
 }
 
 func (p *PDUStreamProvider) Setup() {
 	p.StreamProvider.Setup()
-	p.tasks = make(chan func(), PDU_STREAM_QUEUESIZE)
 
 	p.latestMutex.Lock()
 	defer p.latestMutex.Unlock()
@@ -84,55 +47,13 @@ func (p *PDUStreamProvider) CompleteSync(
 		return from
 	}
 
 	stateFilter := req.Filter.Room.State
 	eventFilter := req.Filter.Room.Timeline
 
-	// Build up a /sync response. Add joined rooms.
-	var reqMutex sync.Mutex
-	var reqWaitGroup sync.WaitGroup
-	reqWaitGroup.Add(len(joinedRoomIDs))
-	for _, room := range joinedRoomIDs {
-		roomID := room
-		p.queue(func() {
-			defer reqWaitGroup.Done()
-
-			var jr *types.JoinResponse
-			jr, err = p.getJoinResponseForCompleteSync(
-				ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device,
-			)
-			if err != nil {
-				req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
-				return
-			}
-
-			reqMutex.Lock()
-			defer reqMutex.Unlock()
-			req.Response.Rooms.Join[roomID] = *jr
-			req.Rooms[roomID] = gomatrixserverlib.Join
-		})
-	}
-
-	reqWaitGroup.Wait()
-
-	// Add peeked rooms.
-	peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
-	if err != nil {
-		req.Log.WithError(err).Error("p.DB.PeeksInRange failed")
+	if err := p.DB.PDUCompleteSync(ctx, req, joinedRoomIDs, r, &stateFilter, &eventFilter); err != nil {
+		req.Log.WithError(err).Error("p.DB.PDUCompleteSync failed")
 		return from
 	}
-	for _, peek := range peeks {
-		if !peek.Deleted {
-			var jr *types.JoinResponse
-			jr, err = p.getJoinResponseForCompleteSync(
-				ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device,
-			)
-			if err != nil {
-				req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
-				return from
-			}
-			req.Response.Rooms.Peek[peek.RoomID] = *jr
-		}
-	}
 
 	return to
 }
@@ -149,218 +70,10 @@ func (p *PDUStreamProvider) IncrementalSync(
 	}
 	newPos = to
 
-	var err error
-	var stateDeltas []types.StateDelta
-	var joinedRooms []string
-
-	stateFilter := req.Filter.Room.State
-	eventFilter := req.Filter.Room.Timeline
-
-	if req.WantFullState {
-		if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
-			req.Log.WithError(err).Error("p.DB.GetStateDeltasForFullStateSync failed")
-			return
-		}
-	} else {
-		if stateDeltas, joinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
-			req.Log.WithError(err).Error("p.DB.GetStateDeltas failed")
-			return
-		}
-	}
-
-	for _, roomID := range joinedRooms {
-		req.Rooms[roomID] = gomatrixserverlib.Join
-	}
-
-	for _, delta := range stateDeltas {
-		if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil {
-			req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
-			return newPos
-		}
+	if err := p.DB.PDUIncrementalSync(ctx, req, r, from, to); err != nil {
+		req.Log.WithError(err).Error("p.DB.PDUIncrementalSync failed")
+		return from
 	}
 
 	return r.To
 }
-
-func (p *PDUStreamProvider) addRoomDeltaToResponse(
-	ctx context.Context,
-	device *userapi.Device,
-	r types.Range,
-	delta types.StateDelta,
-	eventFilter *gomatrixserverlib.RoomEventFilter,
-	res *types.Response,
-) error {
-	if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
-		// make sure we don't leak recent events after the leave event.
-		// TODO: History visibility makes this somewhat complex to handle correctly. For example:
-		// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
-		// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
-		//       in a single /sync request
-		// This is all "okay" assuming history_visibility == "shared" which it is by default.
-		r.To = delta.MembershipPos
-	}
-	recentStreamEvents, limited, err := p.DB.RecentEvents(
-		ctx, delta.RoomID, r,
-		eventFilter, true, true,
-	)
-	if err != nil {
-		return err
-	}
-	recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
-	delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
-	prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents)
-	if err != nil {
-		return err
-	}
-
-	// XXX: should we ever get this far if we have no recent events or state in this room?
-	// in practice we do for peeks, but possibly not joins?
-	if len(recentEvents) == 0 && len(delta.StateEvents) == 0 {
-		return nil
-	}
-
-	switch delta.Membership {
-	case gomatrixserverlib.Join:
-		jr := types.NewJoinResponse()
-		jr.Timeline.PrevBatch = &prevBatch
-		jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
-		jr.Timeline.Limited = limited
-		jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
-		res.Rooms.Join[delta.RoomID] = *jr
-
-	case gomatrixserverlib.Peek:
-		jr := types.NewJoinResponse()
-		jr.Timeline.PrevBatch = &prevBatch
-		jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
-		jr.Timeline.Limited = limited
-		jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
-		res.Rooms.Peek[delta.RoomID] = *jr
-
-	case gomatrixserverlib.Leave:
-		fallthrough // transitions to leave are the same as ban
-
-	case gomatrixserverlib.Ban:
-		// TODO: recentEvents may contain events that this user is not allowed to see because they are
-		// no longer in the room.
-		lr := types.NewLeaveResponse()
-		lr.Timeline.PrevBatch = &prevBatch
-		lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
-		lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
-		lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
-		res.Rooms.Leave[delta.RoomID] = *lr
-	}
-
-	return nil
-}
-
-func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
-	ctx context.Context,
-	roomID string,
-	r types.Range,
-	stateFilter *gomatrixserverlib.StateFilter,
-	eventFilter *gomatrixserverlib.RoomEventFilter,
-	wantFullState bool,
-	device *userapi.Device,
-) (jr *types.JoinResponse, err error) {
-	// TODO: When filters are added, we may need to call this multiple times to get enough events.
-	//       See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
-	recentStreamEvents, limited, err := p.DB.RecentEvents(
-		ctx, roomID, r, eventFilter, true, true,
-	)
-	if err != nil {
-		return
-	}
-
-	// Get the event IDs of the stream events we fetched. There's no point in us
-	var excludingEventIDs []string
-	if !wantFullState {
-		excludingEventIDs = make([]string, 0, len(recentStreamEvents))
-		for _, event := range recentStreamEvents {
-			if event.StateKey() != nil {
-				excludingEventIDs = append(excludingEventIDs, event.EventID())
-			}
-		}
-	}
-
-	stateEvents, err := p.DB.CurrentState(ctx, roomID, stateFilter, excludingEventIDs)
-	if err != nil {
-		return
-	}
-
-	// TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
-	// user shouldn't see, we check the recent events and remove any prior to the join event of the user
-	// which is equiv to history_visibility: joined
-	joinEventIndex := -1
-	for i := len(recentStreamEvents) - 1; i >= 0; i-- {
-		ev := recentStreamEvents[i]
-		if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) {
-			membership, _ := ev.Membership()
-			if membership == "join" {
-				joinEventIndex = i
-				if i > 0 {
-					// the create event happens before the first join, so we should cut it at that point instead
-					if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") {
-						joinEventIndex = i - 1
-						break
-					}
-				}
-				break
-			}
-		}
-	}
-	if joinEventIndex != -1 {
-		// cut all events earlier than the join (but not the join itself)
-		recentStreamEvents = recentStreamEvents[joinEventIndex:]
-		limited = false // so clients know not to try to backpaginate
-	}
-
-	// Retrieve the backward topology position, i.e. the position of the
-	// oldest event in the room's topology.
-	var prevBatch *types.TopologyToken
-	if len(recentStreamEvents) > 0 {
-		var backwardTopologyPos, backwardStreamPos types.StreamPosition
-		backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, recentStreamEvents[0].EventID())
-		if err != nil {
-			return
-		}
-		prevBatch = &types.TopologyToken{
-			Depth:       backwardTopologyPos,
-			PDUPosition: backwardStreamPos,
-		}
-		prevBatch.Decrement()
-	}
-
-	// We don't include a device here as we don't need to send down
-	// transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
-	// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
-	recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
-	stateEvents = removeDuplicates(stateEvents, recentEvents)
-	jr = types.NewJoinResponse()
-	jr.Timeline.PrevBatch = prevBatch
-	jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
-	jr.Timeline.Limited = limited
-	jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
-	return jr, nil
-}
-
-func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
-	for _, recentEv := range recentEvents {
-		if recentEv.StateKey() == nil {
-			continue // not a state event
-		}
-		// TODO: This is a linear scan over all the current state events in this room. This will
-		//       be slow for big rooms. We should instead sort the state events by event ID  (ORDER BY)
-		//       then do a binary search to find matching events, similar to what roomserver does.
-		for j := 0; j < len(stateEvents); j++ {
-			if stateEvents[j].EventID() == recentEv.EventID() {
-				// overwrite the element to remove with the last element then pop the last element.
-				// This is orders of magnitude faster than re-slicing, but doesn't preserve ordering
-				// (we don't care about the order of stateEvents)
-				stateEvents[j] = stateEvents[len(stateEvents)-1]
-				stateEvents = stateEvents[:len(stateEvents)-1]
-				break // there shouldn't be multiple events with the same event ID
-			}
-		}
-	}
-	return stateEvents
-}