diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 95360840..6c51a2b7 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -8,6 +8,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" ) type PDUStreamProvider struct { @@ -146,7 +147,7 @@ func (p *PDUStreamProvider) IncrementalSync( } for _, delta := range stateDeltas { - if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, req.Limit, req.Response); err != nil { + if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, req.Filter.Room.Timeline.Limit, req.Response); err != nil { req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") return newPos } @@ -179,6 +180,13 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( if err != nil { return err } + + logrus.WithFields(logrus.Fields{ + "limited": limited, + "delta.stateEvents": len(delta.StateEvents), + "recentStreamEvents": len(recentStreamEvents), + }).Info("isync addRoomDeltaToResponse removeDuplicates") + recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents) @@ -250,7 +258,44 @@ func (p *PDUStreamProvider) getResponseForCompleteSync( return } - recentStreamEvents, limited = p.filterStreamEventsAccordingToHistoryVisibility(recentStreamEvents, stateEvents, device, limited) + events := make([]string, len(recentStreamEvents)) + for i, v := range recentStreamEvents { + events[i] = string(v.HeaderedEvent.Event.JSON()) + } + + logrus.WithFields(logrus.Fields{ + "filter.Room.Timeline.Limit": filter.Room.Timeline.Limit, + "recentStreamEvents": fmt.Sprintf("%+v", events), + }).Info("getResponseForCompleteSync") + + //recentStreamEvents, limited = p.filterStreamEventsAccordingToHistoryVisibility(recentStreamEvents, stateEvents, device, limited) + + // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the + // user shouldn't see, we check the recent events and remove any prior to the join event of the user + // which is equiv to history_visibility: joined + joinEventIndex := -1 + for i := len(recentStreamEvents) - 1; i >= 0; i-- { + ev := recentStreamEvents[i] + if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) { + membership, _ := ev.Membership() + if membership == "join" { + joinEventIndex = i + if i > 0 { + // the create event happens before the first join, so we should cut it at that point instead + if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") { + joinEventIndex = i - 1 + break + } + } + break + } + } + } + if joinEventIndex != -1 { + // cut all events earlier than the join (but not the join itself) + recentStreamEvents = recentStreamEvents[joinEventIndex:] + limited = false // so clients know not to try to backpaginate + } // Retrieve the backward topology position, i.e. the position of the // oldest event in the room's topology. @@ -382,6 +427,17 @@ func (p *PDUStreamProvider) filterStreamEventsAccordingToHistoryVisibility( sliceEnd = leaveEventIndex } + events := make([]string, len(recentStreamEvents)) + for i, v := range recentStreamEvents { + events[i] = string(v.HeaderedEvent.Event.JSON()) + } + + logrus.WithFields(logrus.Fields{ + "sliceStart": sliceStart, + "sliceEnd": sliceEnd, + "before recentStreamEvents": fmt.Sprintf("%+v", events), + }).Info("cutting down the events") + return recentStreamEvents[sliceStart:sliceEnd], limited } diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index 3a1d4d4a..96d0bfa0 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -30,7 +30,7 @@ import ( ) const defaultSyncTimeout = time.Duration(0) -const DefaultTimelineLimit = 20 +const defaultTimelineLimit = 20 func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*types.SyncRequest, error) { timeout := getTimeout(req.URL.Query().Get("timeout")) @@ -76,16 +76,15 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat } limit := filter.Room.Timeline.Limit if limit == 0 { - limit = DefaultTimelineLimit + filter.Room.Timeline.Limit = defaultTimelineLimit } - // TODO: Additional query params: set_presence, filter + // TODO: Additional query params: set_presence logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{ "user_id": device.UserID, "device_id": device.ID, "since": since, "timeout": timeout, - "limit": limit, }) return &types.SyncRequest{ @@ -96,7 +95,6 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat Filter: filter, // Since: since, // Timeout: timeout, // - Limit: limit, // Rooms: make(map[string]string), // Populated by the PDU stream WantFullState: wantFullState, // }, nil diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go index 164e59a8..93ed1266 100644 --- a/syncapi/types/provider.go +++ b/syncapi/types/provider.go @@ -16,7 +16,6 @@ type SyncRequest struct { Response *Response Filter gomatrixserverlib.Filter Since StreamingToken - Limit int Timeout time.Duration WantFullState bool