Implement new pagination tokens in sync requests/responses

This commit is contained in:
Brendan Abolivier 2018-11-12 15:04:12 +00:00
parent 9399760086
commit e1ee10c7bb
No known key found for this signature in database
GPG key ID: 8EF1500759F70623
3 changed files with 38 additions and 22 deletions

View file

@ -310,6 +310,7 @@ func (d *SyncServerDatabase) CompleteSync(
// Build up a /sync response. Add joined rooms.
res := types.NewResponse(pos)
var prevBatch types.StreamPosition
for _, roomID := range roomIDs {
var stateEvents []gomatrixserverlib.Event
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID)
@ -332,11 +333,12 @@ func (d *SyncServerDatabase) CompleteSync(
recentEvents := StreamEventsToEvents(nil, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr := types.NewJoinResponse()
if prevBatch := recentStreamEvents[0].StreamPosition - 1; prevBatch > 0 {
jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String()
} else {
jr.Timeline.PrevBatch = types.StreamPosition(1).String()
if prevBatch = recentStreamEvents[0].StreamPosition - 1; prevBatch <= 0 {
prevBatch = 1
}
jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeStream, prevBatch,
).String()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true
jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
@ -466,11 +468,13 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
switch delta.membership {
case "join":
jr := types.NewJoinResponse()
if prevBatch := recentStreamEvents[0].StreamPosition - 1; prevBatch > 0 {
jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String()
} else {
jr.Timeline.PrevBatch = types.StreamPosition(1).String()
var prevBatch types.StreamPosition
if prevBatch = recentStreamEvents[0].StreamPosition - 1; prevBatch <= 0 {
prevBatch = 1
}
jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeStream, prevBatch,
).String()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
@ -481,11 +485,13 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
// TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room.
lr := types.NewLeaveResponse()
if prevBatch := recentStreamEvents[0].StreamPosition - 1; prevBatch > 0 {
lr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String()
} else {
lr.Timeline.PrevBatch = types.StreamPosition(1).String()
var prevBatch types.StreamPosition
if prevBatch = recentStreamEvents[0].StreamPosition - 1; prevBatch <= 0 {
prevBatch = 1
}
lr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeStream, prevBatch,
).String()
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)

View file

@ -16,6 +16,7 @@ package sync
import (
"context"
"fmt"
"net/http"
"strconv"
"time"
@ -30,6 +31,12 @@ import (
const defaultSyncTimeout = time.Duration(30) * time.Second
const defaultTimelineLimit = 20
var (
// ErrNotStreamToken is returned if a pagination token isn't of type
// types.PaginationTokenTypeStream
ErrNotStreamToken = fmt.Errorf("The provided pagination token has the wrong prefix (should be s)")
)
// syncRequest represents a /sync request, with sensible defaults/sanity checks applied.
type syncRequest struct {
ctx context.Context
@ -45,7 +52,7 @@ func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, e
timeout := getTimeout(req.URL.Query().Get("timeout"))
fullState := req.URL.Query().Get("full_state")
wantFullState := fullState != "" && fullState != "false"
since, err := getSyncStreamPosition(req.URL.Query().Get("since"))
since, err := getPaginationToken(req.URL.Query().Get("since"))
if err != nil {
return nil, err
}
@ -72,16 +79,19 @@ func getTimeout(timeoutMS string) time.Duration {
return time.Duration(i) * time.Millisecond
}
// getSyncStreamPosition tries to parse a 'since' token taken from the API to a
// stream position. If the string is empty then (nil, nil) is returned.
func getSyncStreamPosition(since string) (*types.StreamPosition, error) {
// getPaginationToken tries to parse a 'since' token taken from the API to a
// pagination token. If the string is empty then (nil, nil) is returned.
// Returns an error if the parsed token's type isn't types.PaginationTokenTypeStream.
func getPaginationToken(since string) (*types.StreamPosition, error) {
if since == "" {
return nil, nil
}
i, err := strconv.Atoi(since)
p, err := types.NewPaginationTokenFromString(since)
if err != nil {
return nil, err
}
token := types.StreamPosition(i)
return &token, nil
if p.Type != types.PaginationTokenTypeStream {
return nil, ErrNotStreamToken
}
return &(p.Position), nil
}

View file

@ -125,9 +125,9 @@ type Response struct {
// NewResponse creates an empty response with initialised maps.
func NewResponse(pos StreamPosition) *Response {
res := Response{}
// Make sure we send the next_batch as a string. We don't want to confuse clients by sending this
// as an integer even though (at the moment) it is.
res.NextBatch = pos.String()
// Fill next_batch with a pagination token. Since this is a response to a sync request, we can assume
// we'll always return a stream token.
res.NextBatch = NewPaginationTokenFromTypeAndPosition(PaginationTokenTypeStream, pos).String()
// Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section,
// so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.
res.Rooms.Join = make(map[string]JoinResponse)