diff --git a/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go b/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go index 3db7fdfc..ee88cb80 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go +++ b/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go @@ -182,49 +182,83 @@ func retrieveEvents( return } - // Check if we have enough events. - isSetLargeEnough := true - if len(streamEvents) < limit { - if backwardOrdering { - if !toDefault { - // The condition in the SQL query is a strict "greater than" so - // we need to check against to-1. - isSetLargeEnough = (to.Position-1 == streamEvents[len(streamEvents)-1].StreamPosition) - } - } else { - isSetLargeEnough = (from.Position-1 == streamEvents[0].StreamPosition) - } - } - - // Check if the slice contains a backward extremity. - backwardExtremity, err := containsBackwardExtremity( - ctx, db, streamEvents, backwardOrdering, - ) - if err != nil { - return - } - + var backwardExtremity bool var events []gomatrixserverlib.Event - // Backfill is needed if we've reached a backward extremity and need more - // events. It's only needed if the direction is backward. - if backwardExtremity && !isSetLargeEnough && backwardOrdering { - var pdus []gomatrixserverlib.Event - // Only ask the remote server for enough events to reach the limit. - pdus, err = backfill( - ctx, db, roomID, streamEvents[0].EventID(), limit-len(streamEvents), - queryAPI, cfg, federation, + // There can be two reasons for streamEvents to be empty: either we've + // reached the oldest event in the room (or the most recent one, depending + // on the ordering), or we've reached a backward extremity. + if len(streamEvents) == 0 { + var evs []storage.StreamEvent + var laterPosition types.StreamPosition + if backwardOrdering { + laterPosition = from.Position + 1 + } else { + laterPosition = to.Position + 1 + } + + evs, err = db.EventsAtTopologicalPosition(ctx, roomID, laterPosition) + if err != nil { + return + } + + backwardExtremity, err = containsBackwardExtremity(ctx, db, evs, backwardOrdering) + if err != nil { + return + } + + if backwardExtremity { + events, err = backfill(ctx, db, roomID, evs[0].EventID(), limit, queryAPI, cfg, federation) + if err != nil { + return + } + } else { + return []gomatrixserverlib.ClientEvent{}, from, to, nil + } + } else { + // Check if we have enough events. + isSetLargeEnough := true + if len(streamEvents) < limit { + if backwardOrdering { + if !toDefault { + // The condition in the SQL query is a strict "greater than" so + // we need to check against to-1. + isSetLargeEnough = (to.Position-1 == streamEvents[len(streamEvents)-1].StreamPosition) + } + } else { + isSetLargeEnough = (from.Position-1 == streamEvents[0].StreamPosition) + } + } + + // Check if the slice contains a backward extremity. + backwardExtremity, err = containsBackwardExtremity( + ctx, db, streamEvents, backwardOrdering, ) if err != nil { return } - // Append the PDUs to the list to send back to the client. - events = append(events, pdus...) + // Backfill is needed if we've reached a backward extremity and need more + // events. It's only needed if the direction is backward. + if backwardExtremity && !isSetLargeEnough && backwardOrdering { + var pdus []gomatrixserverlib.Event + // Only ask the remote server for enough events to reach the limit. + pdus, err = backfill( + ctx, db, roomID, streamEvents[0].EventID(), limit-len(streamEvents), + queryAPI, cfg, federation, + ) + if err != nil { + return + } + + // Append the PDUs to the list to send back to the client. + events = append(events, pdus...) + } + + // Append the events ve previously retrieved locally. + events = append(events, storage.StreamEventsToEvents(nil, streamEvents)...) } - // Append the events ve previously retrieved locally. - events = append(events, storage.StreamEventsToEvents(nil, streamEvents)...) // Sort the events to ensure we send them in the right order. We currently // do that based on the event's timestamp. if backwardOrdering { @@ -250,6 +284,10 @@ func retrieveEvents( types.PaginationTokenTypeTopology, streamEvents[len(streamEvents)-1].StreamPosition, ) + if end.Position == types.StreamPosition(0) { + end.Position = types.StreamPosition(1) + } + return } @@ -286,7 +324,7 @@ func containsBackwardExtremity( return false, nil } // Check if we have all of the events we requested. If not, it means we've - // reached a backard extremity. + // reached a backward extremity. var eventInDB bool var id string // Iterate over the IDs we used in the request. diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_topology_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_topology_table.go index ed78a379..9d394606 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_topology_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_topology_table.go @@ -59,12 +59,17 @@ const selectMaxPositionInTopologySQL = "" + "SELECT MAX(topological_position) FROM syncapi_output_room_events_topology" + " WHERE room_id = $1" +const selectEventIDsFromPositionSQL = "" + + "SELECT event_id FROM syncapi_output_room_events_topology" + + " WHERE room_id = $1 AND topological_position = $2" + type outputRoomEventsTopologyStatements struct { insertEventInTopologyStmt *sql.Stmt selectEventIDsInRangeASCStmt *sql.Stmt selectEventIDsInRangeDESCStmt *sql.Stmt selectPositionInTopologyStmt *sql.Stmt selectMaxPositionInTopologyStmt *sql.Stmt + selectEventIDsFromPositionStmt *sql.Stmt } func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) { @@ -87,6 +92,9 @@ func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) { if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil { return } + if s.selectEventIDsFromPositionStmt, err = db.Prepare(selectEventIDsFromPositionSQL); err != nil { + return + } return } @@ -153,3 +161,27 @@ func (s *outputRoomEventsTopologyStatements) selectMaxPositionInTopology( err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos) return } + +// selectEventIDsFromPosition returns the IDs of all events that have a given +// position in the topology of a given room. +func (s *outputRoomEventsTopologyStatements) selectEventIDsFromPosition( + ctx context.Context, roomID string, pos types.StreamPosition, +) (eventIDs []string, err error) { + // Query the event IDs. + rows, err := s.selectEventIDsFromPositionStmt.QueryContext(ctx, roomID, pos) + if err == sql.ErrNoRows { + // If no event matched the request, return an empty slice. + return []string{}, nil + } else if err != nil { + return + } + // Return the IDs. + var eventID string + for rows.Next() { + if err = rows.Scan(&eventID); err != nil { + return + } + eventIDs = append(eventIDs, eventID) + } + return +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index be0159c9..6453365a 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -269,6 +269,19 @@ func (d *SyncServerDatabase) MaxTopologicalPosition( return d.topology.selectMaxPositionInTopology(ctx, roomID) } +// EventsAtTopologicalPosition returns all of the events matching a given +// position in the topology of a given room. +func (d *SyncServerDatabase) EventsAtTopologicalPosition( + ctx context.Context, roomID string, pos types.StreamPosition, +) ([]StreamEvent, error) { + eIDs, err := d.topology.selectEventIDsFromPosition(ctx, roomID, pos) + if err != nil { + return nil, err + } + + return d.events.selectEvents(ctx, nil, eIDs) +} + // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. func (d *SyncServerDatabase) SyncStreamPosition(ctx context.Context) (types.StreamPosition, error) { return d.syncStreamPositionTx(ctx, nil) @@ -397,6 +410,8 @@ func (d *SyncServerDatabase) CompleteSync( } if backwardTopologyPos-1 <= 0 { backwardTopologyPos = types.StreamPosition(1) + } else { + backwardTopologyPos = backwardTopologyPos - 1 } // We don't include a device here as we don't need to send down @@ -405,7 +420,7 @@ func (d *SyncServerDatabase) CompleteSync( stateEvents = removeDuplicates(stateEvents, recentEvents) jr := types.NewJoinResponse() jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( - types.PaginationTokenTypeTopology, backwardTopologyPos-1, + types.PaginationTokenTypeTopology, backwardTopologyPos, ).String() jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = true @@ -542,6 +557,8 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( } if backwardTopologyPos-1 <= 0 { backwardTopologyPos = types.StreamPosition(1) + } else { + backwardTopologyPos = backwardTopologyPos - 1 } switch delta.membership { @@ -549,7 +566,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( jr := types.NewJoinResponse() jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( - types.PaginationTokenTypeTopology, backwardTopologyPos-1, + types.PaginationTokenTypeTopology, backwardTopologyPos, ).String() jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true @@ -562,7 +579,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( // no longer in the room. lr := types.NewLeaveResponse() lr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( - types.PaginationTokenTypeStream, backwardTopologyPos-1, + types.PaginationTokenTypeStream, backwardTopologyPos, ).String() lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true