From 0231199cc8a47d3300709b13a81130ffaf6db27d Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 13 Jan 2021 10:24:11 +0000 Subject: [PATCH] Tweaks --- syncapi/streams/stream_pdu.go | 82 +++++++++++++---------------------- syncapi/sync/request.go | 26 ++++++----- 2 files changed, 45 insertions(+), 63 deletions(-) diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index e597295a..5dd7c717 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -2,6 +2,7 @@ package streams import ( "context" + "fmt" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -52,7 +53,7 @@ func (p *PDUStreamProvider) CompleteSync( for _, roomID := range joinedRoomIDs { var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( - ctx, roomID, r, &req.Filter, req.Limit, req.Device, + ctx, roomID, r, &req.Filter, req.Device, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") @@ -72,7 +73,7 @@ func (p *PDUStreamProvider) CompleteSync( if !peek.Deleted { var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( - ctx, peek.RoomID, r, &req.Filter, req.Limit, req.Device, + ctx, peek.RoomID, r, &req.Filter, req.Device, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") @@ -193,22 +194,23 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( switch delta.Membership { case gomatrixserverlib.Join: jr := types.NewJoinResponse() - jr.Timeline.PrevBatch = &prevBatch jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = limited jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[delta.RoomID] = *jr + case gomatrixserverlib.Peek: jr := types.NewJoinResponse() - jr.Timeline.PrevBatch = &prevBatch jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = limited jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) res.Rooms.Peek[delta.RoomID] = *jr + case gomatrixserverlib.Leave: fallthrough // transitions to leave are the same as ban + case gomatrixserverlib.Ban: // TODO: recentEvents may contain events that this user is not allowed to see because they are // no longer in the room. @@ -223,15 +225,16 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( return nil } -func (p *PDUStreamProvider) getJoinResponseForCompleteSync( +func (p *PDUStreamProvider) getResponseForCompleteSync( ctx context.Context, roomID string, r types.Range, filter *gomatrixserverlib.Filter, - numRecentEventsPerRoom int, device *userapi.Device, -) (jr *types.JoinResponse, err error) { - var stateEvents []*gomatrixserverlib.HeaderedEvent +) ( + recentEvents, stateEvents []*gomatrixserverlib.HeaderedEvent, + prevBatch *types.TopologyToken, limited bool, err error, +) { stateEvents, err = p.DB.CurrentState(ctx, roomID, &filter.Room.State) if err != nil { return @@ -239,9 +242,8 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( // TODO: When filters are added, we may need to call this multiple times to get enough events. // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 var recentStreamEvents []types.StreamEvent - var limited bool recentStreamEvents, limited, err = p.DB.RecentEvents( - ctx, roomID, r, numRecentEventsPerRoom, true, true, + ctx, roomID, r, filter.Room.Timeline.Limit, true, true, ) if err != nil { return @@ -251,7 +253,6 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( // Retrieve the backward topology position, i.e. the position of the // oldest event in the room's topology. - var prevBatch *types.TopologyToken if len(recentStreamEvents) > 0 { var backwardTopologyPos, backwardStreamPos types.StreamPosition backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, recentStreamEvents[0].EventID()) @@ -268,8 +269,23 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( // We don't include a device here as we don't need to send down // transaction IDs for complete syncs, but we do it anyway because Sytest demands it for: // "Can sync a room with a message with a transaction id" - which does a complete sync to check. - recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) + recentEvents = p.DB.StreamEventsToEvents(device, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) + return +} + +func (p *PDUStreamProvider) getJoinResponseForCompleteSync( + ctx context.Context, + roomID string, + r types.Range, + filter *gomatrixserverlib.Filter, + device *userapi.Device, +) (jr *types.JoinResponse, err error) { + recentEvents, stateEvents, prevBatch, limited, err := p.getResponseForCompleteSync(ctx, roomID, r, filter, device) + if err != nil { + return nil, fmt.Errorf("p.getResponseForCompleteSync: %w", err) + } + jr = types.NewJoinResponse() jr.Timeline.PrevBatch = prevBatch jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) @@ -285,49 +301,11 @@ func (p *PDUStreamProvider) getLeaveResponseForCompleteSync( filter *gomatrixserverlib.Filter, device *userapi.Device, ) (lr *types.LeaveResponse, err error) { - var stateEvents []*gomatrixserverlib.HeaderedEvent - stateEvents, err = p.DB.CurrentState(ctx, roomID, &filter.Room.State) + recentEvents, stateEvents, prevBatch, limited, err := p.getResponseForCompleteSync(ctx, roomID, r, filter, device) if err != nil { - return + return nil, fmt.Errorf("p.getResponseForCompleteSync: %w", err) } - numRecentEventsPerRoom := filter.Room.Timeline.Limit - - // TODO: When filters are added, we may need to call this multiple times to get enough events. - // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 - var recentStreamEvents []types.StreamEvent - var limited bool - recentStreamEvents, limited, err = p.DB.RecentEvents( - ctx, roomID, r, numRecentEventsPerRoom, true, true, - ) - if err != nil { - return - } - - recentStreamEvents, limited = p.filterStreamEventsAccordingToHistoryVisibility(recentStreamEvents, device, limited) - - // Retrieve the backward topology position, i.e. the position of the - // oldest event in the room's topology. - var prevBatch *types.TopologyToken - if len(recentStreamEvents) > 0 { - var backwardTopologyPos, backwardStreamPos types.StreamPosition - backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, recentStreamEvents[0].EventID()) - if err != nil { - return - } - prevBatch = &types.TopologyToken{ - Depth: backwardTopologyPos, - PDUPosition: backwardStreamPos, - } - prevBatch.Decrement() - } - - // We don't include a device here as we don't need to send down - // transaction IDs for complete syncs, but we do it anyway because Sytest demands it for: - // "Can sync a room with a message with a transaction id" - which does a complete sync to check. - recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) - stateEvents = removeDuplicates(stateEvents, recentEvents) - lr = types.NewLeaveResponse() lr.Timeline.PrevBatch = prevBatch lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index 2b368b09..3a1d4d4a 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -74,6 +74,10 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat if f != nil { filter = *f } + limit := filter.Room.Timeline.Limit + if limit == 0 { + limit = DefaultTimelineLimit + } // TODO: Additional query params: set_presence, filter logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{ @@ -81,20 +85,20 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat "device_id": device.ID, "since": since, "timeout": timeout, - "limit": filter.Room.Timeline.Limit, + "limit": limit, }) return &types.SyncRequest{ - Context: req.Context(), // - Log: logger, // - Device: &device, // - Response: types.NewResponse(), // Populated by all streams - Filter: filter, // - Since: since, // - Timeout: timeout, // - Limit: filter.Room.Timeline.Limit, // - Rooms: make(map[string]string), // Populated by the PDU stream - WantFullState: wantFullState, // + Context: req.Context(), // + Log: logger, // + Device: &device, // + Response: types.NewResponse(), // Populated by all streams + Filter: filter, // + Since: since, // + Timeout: timeout, // + Limit: limit, // + Rooms: make(map[string]string), // Populated by the PDU stream + WantFullState: wantFullState, // }, nil }