diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index e2764fbd..380cf10b 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -310,6 +310,7 @@ func (d *SyncServerDatabase) CompleteSync( // Build up a /sync response. Add joined rooms. res := types.NewResponse(pos) + var prevBatch types.StreamPosition for _, roomID := range roomIDs { var stateEvents []gomatrixserverlib.Event stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID) @@ -332,11 +333,12 @@ func (d *SyncServerDatabase) CompleteSync( recentEvents := StreamEventsToEvents(nil, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) jr := types.NewJoinResponse() - if prevBatch := recentStreamEvents[0].StreamPosition - 1; prevBatch > 0 { - jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String() - } else { - jr.Timeline.PrevBatch = types.StreamPosition(1).String() + if prevBatch = recentStreamEvents[0].StreamPosition - 1; prevBatch <= 0 { + prevBatch = 1 } + jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( + types.PaginationTokenTypeStream, prevBatch, + ).String() jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = true jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) @@ -466,11 +468,13 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( switch delta.membership { case "join": jr := types.NewJoinResponse() - if prevBatch := recentStreamEvents[0].StreamPosition - 1; prevBatch > 0 { - jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String() - } else { - jr.Timeline.PrevBatch = types.StreamPosition(1).String() + var prevBatch types.StreamPosition + if prevBatch = recentStreamEvents[0].StreamPosition - 1; prevBatch <= 0 { + prevBatch = 1 } + jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( + types.PaginationTokenTypeStream, prevBatch, + ).String() jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) @@ -481,11 +485,13 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( // TODO: recentEvents may contain events that this user is not allowed to see because they are // no longer in the room. lr := types.NewLeaveResponse() - if prevBatch := recentStreamEvents[0].StreamPosition - 1; prevBatch > 0 { - lr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String() - } else { - lr.Timeline.PrevBatch = types.StreamPosition(1).String() + var prevBatch types.StreamPosition + if prevBatch = recentStreamEvents[0].StreamPosition - 1; prevBatch <= 0 { + prevBatch = 1 } + lr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( + types.PaginationTokenTypeStream, prevBatch, + ).String() lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) diff --git a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go index 3c1befdd..8fa02645 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/sync/request.go +++ b/src/github.com/matrix-org/dendrite/syncapi/sync/request.go @@ -16,6 +16,7 @@ package sync import ( "context" + "fmt" "net/http" "strconv" "time" @@ -30,6 +31,12 @@ import ( const defaultSyncTimeout = time.Duration(30) * time.Second const defaultTimelineLimit = 20 +var ( + // ErrNotStreamToken is returned if a pagination token isn't of type + // types.PaginationTokenTypeStream + ErrNotStreamToken = fmt.Errorf("The provided pagination token has the wrong prefix (should be s)") +) + // syncRequest represents a /sync request, with sensible defaults/sanity checks applied. type syncRequest struct { ctx context.Context @@ -45,7 +52,7 @@ func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, e timeout := getTimeout(req.URL.Query().Get("timeout")) fullState := req.URL.Query().Get("full_state") wantFullState := fullState != "" && fullState != "false" - since, err := getSyncStreamPosition(req.URL.Query().Get("since")) + since, err := getPaginationToken(req.URL.Query().Get("since")) if err != nil { return nil, err } @@ -72,16 +79,19 @@ func getTimeout(timeoutMS string) time.Duration { return time.Duration(i) * time.Millisecond } -// getSyncStreamPosition tries to parse a 'since' token taken from the API to a -// stream position. If the string is empty then (nil, nil) is returned. -func getSyncStreamPosition(since string) (*types.StreamPosition, error) { +// getPaginationToken tries to parse a 'since' token taken from the API to a +// pagination token. If the string is empty then (nil, nil) is returned. +// Returns an error if the parsed token's type isn't types.PaginationTokenTypeStream. +func getPaginationToken(since string) (*types.StreamPosition, error) { if since == "" { return nil, nil } - i, err := strconv.Atoi(since) + p, err := types.NewPaginationTokenFromString(since) if err != nil { return nil, err } - token := types.StreamPosition(i) - return &token, nil + if p.Type != types.PaginationTokenTypeStream { + return nil, ErrNotStreamToken + } + return &(p.Position), nil } diff --git a/src/github.com/matrix-org/dendrite/syncapi/types/types.go b/src/github.com/matrix-org/dendrite/syncapi/types/types.go index ec15ee8e..f1c00133 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/types/types.go +++ b/src/github.com/matrix-org/dendrite/syncapi/types/types.go @@ -125,9 +125,9 @@ type Response struct { // NewResponse creates an empty response with initialised maps. func NewResponse(pos StreamPosition) *Response { res := Response{} - // Make sure we send the next_batch as a string. We don't want to confuse clients by sending this - // as an integer even though (at the moment) it is. - res.NextBatch = pos.String() + // Fill next_batch with a pagination token. Since this is a response to a sync request, we can assume + // we'll always return a stream token. + res.NextBatch = NewPaginationTokenFromTypeAndPosition(PaginationTokenTypeStream, pos).String() // Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section, // so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors. res.Rooms.Join = make(map[string]JoinResponse)