Fix prev_batch tokens (#999)

This commit is contained in:
Kegsay 2020-05-01 12:41:38 +01:00 committed by GitHub
parent b28674435e
commit 17e046f18f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 83 additions and 75 deletions

View file

@ -94,9 +94,6 @@ const selectEarlyEventsSQL = "" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id ASC LIMIT $4"
const selectStreamPositionForEventIDSQL = "" +
"SELECT id FROM syncapi_output_room_events WHERE event_id = $1"
const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events"
@ -114,14 +111,13 @@ const selectStateInRangeSQL = "" +
" LIMIT $8"
type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt
selectRecentEventsForSyncStmt *sql.Stmt
selectEarlyEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
selectStreamPositionForEventIDStmt *sql.Stmt
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt
selectRecentEventsForSyncStmt *sql.Stmt
selectEarlyEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
}
func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
@ -150,18 +146,9 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
return
}
if s.selectStreamPositionForEventIDStmt, err = db.Prepare(selectStreamPositionForEventIDSQL); err != nil {
return
}
return
}
func (s *outputRoomEventsStatements) selectStreamPositionForEventID(ctx context.Context, eventID string) (types.StreamPosition, error) {
var id int64
err := s.selectStreamPositionForEventIDStmt.QueryRowContext(ctx, eventID).Scan(&id)
return types.StreamPosition(id), err
}
// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.

View file

@ -60,7 +60,7 @@ const selectEventIDsInRangeDESCSQL = "" +
" ORDER BY topological_position DESC, stream_position DESC LIMIT $6"
const selectPositionInTopologySQL = "" +
"SELECT topological_position FROM syncapi_output_room_events_topology" +
"SELECT topological_position, stream_position FROM syncapi_output_room_events_topology" +
" WHERE event_id = $1"
// Select the max topological position for the room, then sort by stream position and take the highest,
@ -163,8 +163,8 @@ func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange(
// topology of the room it belongs to.
func (s *outputRoomEventsTopologyStatements) selectPositionInTopology(
ctx context.Context, eventID string,
) (pos types.StreamPosition, err error) {
err = s.selectPositionInTopologyStmt.QueryRowContext(ctx, eventID).Scan(&pos)
) (pos, spos types.StreamPosition, err error) {
err = s.selectPositionInTopologyStmt.QueryRowContext(ctx, eventID).Scan(&pos, &spos)
return
}

View file

@ -320,12 +320,7 @@ func (d *SyncServerDatasource) EventsAtTopologicalPosition(
func (d *SyncServerDatasource) EventPositionInTopology(
ctx context.Context, eventID string,
) (depth types.StreamPosition, stream types.StreamPosition, err error) {
depth, err = d.topology.selectPositionInTopology(ctx, eventID)
if err != nil {
return
}
stream, err = d.events.selectStreamPositionForEventID(ctx, eventID)
return
return d.topology.selectPositionInTopology(ctx, eventID)
}
func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) {
@ -591,8 +586,8 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
// Retrieve the backward topology position, i.e. the position of the
// oldest event in the room's topology.
var backwardTopologyPos types.StreamPosition
backwardTopologyPos, err = d.topology.selectPositionInTopology(ctx, recentStreamEvents[0].EventID())
var backwardTopologyPos, backwardStreamPos types.StreamPosition
backwardTopologyPos, backwardStreamPos, err = d.topology.selectPositionInTopology(ctx, recentStreamEvents[0].EventID())
if backwardTopologyPos-1 <= 0 {
backwardTopologyPos = types.StreamPosition(1)
} else {
@ -605,7 +600,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeTopology, backwardTopologyPos, 0,
types.PaginationTokenTypeTopology, backwardTopologyPos, backwardStreamPos,
).String()
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true
@ -720,9 +715,9 @@ func (d *SyncServerDatasource) addInvitesToResponse(
func (d *SyncServerDatasource) getBackwardTopologyPos(
ctx context.Context,
events []types.StreamEvent,
) (pos types.StreamPosition) {
) (pos, spos types.StreamPosition) {
if len(events) > 0 {
pos, _ = d.topology.selectPositionInTopology(ctx, events[0].EventID())
pos, spos, _ = d.topology.selectPositionInTopology(ctx, events[0].EventID())
}
if pos-1 <= 0 {
pos = types.StreamPosition(1)
@ -761,14 +756,14 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
}
recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
backwardTopologyPos := d.getBackwardTopologyPos(ctx, recentStreamEvents)
backwardTopologyPos, backwardStreamPos := d.getBackwardTopologyPos(ctx, recentStreamEvents)
switch delta.membership {
case gomatrixserverlib.Join:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeTopology, backwardTopologyPos, 0,
types.PaginationTokenTypeTopology, backwardTopologyPos, backwardStreamPos,
).String()
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
@ -781,7 +776,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
// no longer in the room.
lr := types.NewLeaveResponse()
lr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeTopology, backwardTopologyPos, 0,
types.PaginationTokenTypeTopology, backwardTopologyPos, backwardStreamPos,
).String()
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true