Handle case where we get no events from the database and prevent end token < 1

This commit is contained in:
Brendan Abolivier 2018-11-13 17:17:45 +00:00
parent 89aeb21ef7
commit 38475d1489
No known key found for this signature in database
GPG key ID: 8EF1500759F70623
3 changed files with 125 additions and 38 deletions

View file

@ -182,49 +182,83 @@ func retrieveEvents(
return
}
// Check if we have enough events.
isSetLargeEnough := true
if len(streamEvents) < limit {
if backwardOrdering {
if !toDefault {
// The condition in the SQL query is a strict "greater than" so
// we need to check against to-1.
isSetLargeEnough = (to.Position-1 == streamEvents[len(streamEvents)-1].StreamPosition)
}
} else {
isSetLargeEnough = (from.Position-1 == streamEvents[0].StreamPosition)
}
}
// Check if the slice contains a backward extremity.
backwardExtremity, err := containsBackwardExtremity(
ctx, db, streamEvents, backwardOrdering,
)
if err != nil {
return
}
var backwardExtremity bool
var events []gomatrixserverlib.Event
// Backfill is needed if we've reached a backward extremity and need more
// events. It's only needed if the direction is backward.
if backwardExtremity && !isSetLargeEnough && backwardOrdering {
var pdus []gomatrixserverlib.Event
// Only ask the remote server for enough events to reach the limit.
pdus, err = backfill(
ctx, db, roomID, streamEvents[0].EventID(), limit-len(streamEvents),
queryAPI, cfg, federation,
// There can be two reasons for streamEvents to be empty: either we've
// reached the oldest event in the room (or the most recent one, depending
// on the ordering), or we've reached a backward extremity.
if len(streamEvents) == 0 {
var evs []storage.StreamEvent
var laterPosition types.StreamPosition
if backwardOrdering {
laterPosition = from.Position + 1
} else {
laterPosition = to.Position + 1
}
evs, err = db.EventsAtTopologicalPosition(ctx, roomID, laterPosition)
if err != nil {
return
}
backwardExtremity, err = containsBackwardExtremity(ctx, db, evs, backwardOrdering)
if err != nil {
return
}
if backwardExtremity {
events, err = backfill(ctx, db, roomID, evs[0].EventID(), limit, queryAPI, cfg, federation)
if err != nil {
return
}
} else {
return []gomatrixserverlib.ClientEvent{}, from, to, nil
}
} else {
// Check if we have enough events.
isSetLargeEnough := true
if len(streamEvents) < limit {
if backwardOrdering {
if !toDefault {
// The condition in the SQL query is a strict "greater than" so
// we need to check against to-1.
isSetLargeEnough = (to.Position-1 == streamEvents[len(streamEvents)-1].StreamPosition)
}
} else {
isSetLargeEnough = (from.Position-1 == streamEvents[0].StreamPosition)
}
}
// Check if the slice contains a backward extremity.
backwardExtremity, err = containsBackwardExtremity(
ctx, db, streamEvents, backwardOrdering,
)
if err != nil {
return
}
// Append the PDUs to the list to send back to the client.
events = append(events, pdus...)
// Backfill is needed if we've reached a backward extremity and need more
// events. It's only needed if the direction is backward.
if backwardExtremity && !isSetLargeEnough && backwardOrdering {
var pdus []gomatrixserverlib.Event
// Only ask the remote server for enough events to reach the limit.
pdus, err = backfill(
ctx, db, roomID, streamEvents[0].EventID(), limit-len(streamEvents),
queryAPI, cfg, federation,
)
if err != nil {
return
}
// Append the PDUs to the list to send back to the client.
events = append(events, pdus...)
}
// Append the events ve previously retrieved locally.
events = append(events, storage.StreamEventsToEvents(nil, streamEvents)...)
}
// Append the events ve previously retrieved locally.
events = append(events, storage.StreamEventsToEvents(nil, streamEvents)...)
// Sort the events to ensure we send them in the right order. We currently
// do that based on the event's timestamp.
if backwardOrdering {
@ -250,6 +284,10 @@ func retrieveEvents(
types.PaginationTokenTypeTopology, streamEvents[len(streamEvents)-1].StreamPosition,
)
if end.Position == types.StreamPosition(0) {
end.Position = types.StreamPosition(1)
}
return
}
@ -286,7 +324,7 @@ func containsBackwardExtremity(
return false, nil
}
// Check if we have all of the events we requested. If not, it means we've
// reached a backard extremity.
// reached a backward extremity.
var eventInDB bool
var id string
// Iterate over the IDs we used in the request.

View file

@ -59,12 +59,17 @@ const selectMaxPositionInTopologySQL = "" +
"SELECT MAX(topological_position) FROM syncapi_output_room_events_topology" +
" WHERE room_id = $1"
const selectEventIDsFromPositionSQL = "" +
"SELECT event_id FROM syncapi_output_room_events_topology" +
" WHERE room_id = $1 AND topological_position = $2"
type outputRoomEventsTopologyStatements struct {
insertEventInTopologyStmt *sql.Stmt
selectEventIDsInRangeASCStmt *sql.Stmt
selectEventIDsInRangeDESCStmt *sql.Stmt
selectPositionInTopologyStmt *sql.Stmt
selectMaxPositionInTopologyStmt *sql.Stmt
selectEventIDsFromPositionStmt *sql.Stmt
}
func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) {
@ -87,6 +92,9 @@ func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) {
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
return
}
if s.selectEventIDsFromPositionStmt, err = db.Prepare(selectEventIDsFromPositionSQL); err != nil {
return
}
return
}
@ -153,3 +161,27 @@ func (s *outputRoomEventsTopologyStatements) selectMaxPositionInTopology(
err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos)
return
}
// selectEventIDsFromPosition returns the IDs of all events that have a given
// position in the topology of a given room.
func (s *outputRoomEventsTopologyStatements) selectEventIDsFromPosition(
ctx context.Context, roomID string, pos types.StreamPosition,
) (eventIDs []string, err error) {
// Query the event IDs.
rows, err := s.selectEventIDsFromPositionStmt.QueryContext(ctx, roomID, pos)
if err == sql.ErrNoRows {
// If no event matched the request, return an empty slice.
return []string{}, nil
} else if err != nil {
return
}
// Return the IDs.
var eventID string
for rows.Next() {
if err = rows.Scan(&eventID); err != nil {
return
}
eventIDs = append(eventIDs, eventID)
}
return
}

View file

@ -269,6 +269,19 @@ func (d *SyncServerDatabase) MaxTopologicalPosition(
return d.topology.selectMaxPositionInTopology(ctx, roomID)
}
// EventsAtTopologicalPosition returns all of the events matching a given
// position in the topology of a given room.
func (d *SyncServerDatabase) EventsAtTopologicalPosition(
ctx context.Context, roomID string, pos types.StreamPosition,
) ([]StreamEvent, error) {
eIDs, err := d.topology.selectEventIDsFromPosition(ctx, roomID, pos)
if err != nil {
return nil, err
}
return d.events.selectEvents(ctx, nil, eIDs)
}
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
func (d *SyncServerDatabase) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) {
return d.syncStreamPositionTx(ctx, nil)
@ -397,6 +410,8 @@ func (d *SyncServerDatabase) CompleteSync(
}
if backwardTopologyPos-1 <= 0 {
backwardTopologyPos = types.StreamPosition(1)
} else {
backwardTopologyPos = backwardTopologyPos - 1
}
// We don't include a device here as we don't need to send down
@ -405,7 +420,7 @@ func (d *SyncServerDatabase) CompleteSync(
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeTopology, backwardTopologyPos-1,
types.PaginationTokenTypeTopology, backwardTopologyPos,
).String()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true
@ -542,6 +557,8 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
}
if backwardTopologyPos-1 <= 0 {
backwardTopologyPos = types.StreamPosition(1)
} else {
backwardTopologyPos = backwardTopologyPos - 1
}
switch delta.membership {
@ -549,7 +566,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeTopology, backwardTopologyPos-1,
types.PaginationTokenTypeTopology, backwardTopologyPos,
).String()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
@ -562,7 +579,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
// no longer in the room.
lr := types.NewLeaveResponse()
lr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
types.PaginationTokenTypeStream, backwardTopologyPos-1,
types.PaginationTokenTypeStream, backwardTopologyPos,
).String()
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true