mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-04-16 16:53:40 +00:00
Implement the use of new pagination tokens in /messages
Also use them to store and retrieve events we got from backfilling
This commit is contained in:
parent
b205931819
commit
89aeb21ef7
4 changed files with 182 additions and 63 deletions
|
@ -17,6 +17,7 @@ package routing
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/httputil"
|
"github.com/matrix-org/dendrite/clientapi/httputil"
|
||||||
|
@ -47,19 +48,17 @@ func OnIncomingMessagesRequest(
|
||||||
queryAPI api.RoomserverQueryAPI,
|
queryAPI api.RoomserverQueryAPI,
|
||||||
cfg *config.Dendrite,
|
cfg *config.Dendrite,
|
||||||
) util.JSONResponse {
|
) util.JSONResponse {
|
||||||
var from, to int
|
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
// Extract parameters from the request's URL.
|
// Extract parameters from the request's URL.
|
||||||
// Pagination tokens.
|
// Pagination tokens.
|
||||||
from, err = strconv.Atoi(req.URL.Query().Get("from"))
|
from, err := types.NewPaginationTokenFromString(req.URL.Query().Get("from"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusBadRequest,
|
Code: http.StatusBadRequest,
|
||||||
JSON: jsonerror.InvalidArgumentValue("from could not be parsed into an integer: " + err.Error()),
|
JSON: jsonerror.InvalidArgumentValue("Invalid from parameter: " + err.Error()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fromPos := types.StreamPosition(from)
|
|
||||||
|
|
||||||
// Direction to return events from.
|
// Direction to return events from.
|
||||||
dir := req.URL.Query().Get("dir")
|
dir := req.URL.Query().Get("dir")
|
||||||
|
@ -75,25 +74,25 @@ func OnIncomingMessagesRequest(
|
||||||
|
|
||||||
// Pagination tokens. To is optional, and its default value depends on the
|
// Pagination tokens. To is optional, and its default value depends on the
|
||||||
// direction ("b" or "f").
|
// direction ("b" or "f").
|
||||||
toStr := req.URL.Query().Get("to")
|
var to *types.PaginationToken
|
||||||
var toPos types.StreamPosition
|
var toDefault bool
|
||||||
if len(toStr) > 0 {
|
if s := req.URL.Query().Get("to"); len(s) > 0 {
|
||||||
to, err = strconv.Atoi(toStr)
|
to, err = types.NewPaginationTokenFromString(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusBadRequest,
|
Code: http.StatusBadRequest,
|
||||||
JSON: jsonerror.InvalidArgumentValue("to could not be parsed into an integer: " + err.Error()),
|
JSON: jsonerror.InvalidArgumentValue("Invalid to parameter: " + err.Error()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
toPos = types.StreamPosition(to)
|
|
||||||
} else {
|
} else {
|
||||||
// If "to" isn't provided, it defaults to either the earliest stream
|
// If "to" isn't provided, it defaults to either the earliest stream
|
||||||
// position (if we're going backward) or to the latest one (if we're
|
// position (if we're going backward) or to the latest one (if we're
|
||||||
// going forward).
|
// going forward).
|
||||||
toPos, err = setToDefault(req.Context(), backwardOrdering, db)
|
to, err = setToDefault(req.Context(), db, backwardOrdering, roomID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return httputil.LogThenError(req, err)
|
return httputil.LogThenError(req, err)
|
||||||
}
|
}
|
||||||
|
toDefault = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Maximum number of events to return; defaults to 10.
|
// Maximum number of events to return; defaults to 10.
|
||||||
|
@ -119,7 +118,7 @@ func OnIncomingMessagesRequest(
|
||||||
}
|
}
|
||||||
|
|
||||||
clientEvents, start, end, err := retrieveEvents(
|
clientEvents, start, end, err := retrieveEvents(
|
||||||
req.Context(), db, roomID, fromPos, toPos, toStr, limit, backwardOrdering,
|
req.Context(), db, roomID, from, to, toDefault, limit, backwardOrdering,
|
||||||
federation, queryAPI, cfg,
|
federation, queryAPI, cfg,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -131,28 +130,32 @@ func OnIncomingMessagesRequest(
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
JSON: messageResp{
|
JSON: messageResp{
|
||||||
Chunk: clientEvents,
|
Chunk: clientEvents,
|
||||||
Start: start,
|
Start: start.String(),
|
||||||
End: end,
|
End: end.String(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// setToDefault returns the default value for the "to" query parameter of a
|
// setToDefault returns the default value for the "to" query parameter of a
|
||||||
// request to /messages if not provided. It defaults to either the earliest
|
// request to /messages if not provided. It defaults to either the earliest
|
||||||
// stream position (if we're going backward) or to the latest one (if we're
|
// topological position (if we're going backward) or to the latest one (if we're
|
||||||
// going forward).
|
// going forward).
|
||||||
// Returns an error if there was an issue with retrieving the latest position
|
// Returns an error if there was an issue with retrieving the latest position
|
||||||
// from the database
|
// from the database
|
||||||
func setToDefault(
|
func setToDefault(
|
||||||
ctx context.Context, backwardOrdering bool, db *storage.SyncServerDatabase,
|
ctx context.Context, db *storage.SyncServerDatabase, backwardOrdering bool,
|
||||||
) (toPos types.StreamPosition, err error) {
|
roomID string,
|
||||||
|
) (to *types.PaginationToken, err error) {
|
||||||
if backwardOrdering {
|
if backwardOrdering {
|
||||||
toPos = types.StreamPosition(1)
|
to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 1)
|
||||||
} else {
|
} else {
|
||||||
toPos, err = db.SyncStreamPosition(ctx)
|
var pos types.StreamPosition
|
||||||
|
pos, err = db.MaxTopologicalPosition(ctx, roomID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, pos)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -165,15 +168,15 @@ func setToDefault(
|
||||||
// remote homeserver.
|
// remote homeserver.
|
||||||
func retrieveEvents(
|
func retrieveEvents(
|
||||||
ctx context.Context, db *storage.SyncServerDatabase, roomID string,
|
ctx context.Context, db *storage.SyncServerDatabase, roomID string,
|
||||||
fromPos, toPos types.StreamPosition, toStr string, limit int,
|
from, to *types.PaginationToken, toDefault bool, limit int,
|
||||||
backwardOrdering bool,
|
backwardOrdering bool,
|
||||||
federation *gomatrixserverlib.FederationClient,
|
federation *gomatrixserverlib.FederationClient,
|
||||||
queryAPI api.RoomserverQueryAPI,
|
queryAPI api.RoomserverQueryAPI,
|
||||||
cfg *config.Dendrite,
|
cfg *config.Dendrite,
|
||||||
) (clientEvents []gomatrixserverlib.ClientEvent, start string, end string, err error) {
|
) (clientEvents []gomatrixserverlib.ClientEvent, start, end *types.PaginationToken, err error) {
|
||||||
// Retrieve the events from the local database.
|
// Retrieve the events from the local database.
|
||||||
streamEvents, err := db.GetEventsInRange(
|
streamEvents, err := db.GetEventsInRange(
|
||||||
ctx, fromPos, toPos, roomID, limit, backwardOrdering,
|
ctx, from, to, roomID, limit, backwardOrdering,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
@ -183,21 +186,20 @@ func retrieveEvents(
|
||||||
isSetLargeEnough := true
|
isSetLargeEnough := true
|
||||||
if len(streamEvents) < limit {
|
if len(streamEvents) < limit {
|
||||||
if backwardOrdering {
|
if backwardOrdering {
|
||||||
if len(toStr) > 0 {
|
if !toDefault {
|
||||||
// The condition in the SQL query is a strict "greater than" so
|
// The condition in the SQL query is a strict "greater than" so
|
||||||
// we need to check against to-1.
|
// we need to check against to-1.
|
||||||
isSetLargeEnough = (toPos-1 == streamEvents[0].StreamPosition)
|
isSetLargeEnough = (to.Position-1 == streamEvents[len(streamEvents)-1].StreamPosition)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
isSetLargeEnough = (fromPos-1 == streamEvents[0].StreamPosition)
|
isSetLargeEnough = (from.Position-1 == streamEvents[0].StreamPosition)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if earliest event is a backward extremity, i.e. if one of its
|
// Check if the slice contains a backward extremity.
|
||||||
// previous events is missing from the database.
|
backwardExtremity, err := containsBackwardExtremity(
|
||||||
// Get the earliest retrieved event's parents.
|
ctx, db, streamEvents, backwardOrdering,
|
||||||
|
)
|
||||||
backwardExtremity, err := isBackwardExtremity(ctx, &(streamEvents[0]), db)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -205,39 +207,79 @@ func retrieveEvents(
|
||||||
var events []gomatrixserverlib.Event
|
var events []gomatrixserverlib.Event
|
||||||
|
|
||||||
// Backfill is needed if we've reached a backward extremity and need more
|
// Backfill is needed if we've reached a backward extremity and need more
|
||||||
// events. It's only needed if the direction is backard.
|
// events. It's only needed if the direction is backward.
|
||||||
if backwardExtremity && !isSetLargeEnough && backwardOrdering {
|
if backwardExtremity && !isSetLargeEnough && backwardOrdering {
|
||||||
var pdus []gomatrixserverlib.Event
|
var pdus []gomatrixserverlib.Event
|
||||||
// Only ask the remote server for enough events to reach the limit.
|
// Only ask the remote server for enough events to reach the limit.
|
||||||
pdus, err = backfill(
|
pdus, err = backfill(
|
||||||
ctx, roomID, streamEvents[0].EventID(), limit-len(streamEvents),
|
ctx, db, roomID, streamEvents[0].EventID(), limit-len(streamEvents),
|
||||||
queryAPI, cfg, federation,
|
queryAPI, cfg, federation,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Append the PDUs to the list to send back to the client.
|
||||||
events = append(events, pdus...)
|
events = append(events, pdus...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Append the events we retrieved locally, then convert them into client
|
// Append the events ve previously retrieved locally.
|
||||||
// events.
|
|
||||||
events = append(events, storage.StreamEventsToEvents(nil, streamEvents)...)
|
events = append(events, storage.StreamEventsToEvents(nil, streamEvents)...)
|
||||||
|
// Sort the events to ensure we send them in the right order. We currently
|
||||||
|
// do that based on the event's timestamp.
|
||||||
|
if backwardOrdering {
|
||||||
|
sort.SliceStable(events, func(i int, j int) bool {
|
||||||
|
// Backward ordering is antichronological (latest event to oldest
|
||||||
|
// one).
|
||||||
|
return sortEvents(&(events[j]), &(events[i]))
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
sort.SliceStable(events, func(i int, j int) bool {
|
||||||
|
// Forward ordering is chronological (oldest event to latest one).
|
||||||
|
return sortEvents(&(events[i]), &(events[j]))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert all of the events into client events.
|
||||||
clientEvents = gomatrixserverlib.ToClientEvents(events, gomatrixserverlib.FormatAll)
|
clientEvents = gomatrixserverlib.ToClientEvents(events, gomatrixserverlib.FormatAll)
|
||||||
start = streamEvents[0].StreamPosition.String()
|
// Generate pagination tokens to send to the client.
|
||||||
end = streamEvents[len(streamEvents)-1].StreamPosition.String()
|
start = types.NewPaginationTokenFromTypeAndPosition(
|
||||||
|
types.PaginationTokenTypeTopology, streamEvents[0].StreamPosition,
|
||||||
|
)
|
||||||
|
end = types.NewPaginationTokenFromTypeAndPosition(
|
||||||
|
types.PaginationTokenTypeTopology, streamEvents[len(streamEvents)-1].StreamPosition,
|
||||||
|
)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// isBackwardExtremity checks if a given event is a backward extremity. It does
|
// sortEvents is a function to give to sort.SliceStable, and compares the
|
||||||
// so by checking the presence in the database of all of its parent events, and
|
// timestamp of two Matrix events.
|
||||||
// consider the event itself a backward extremity if at least one of the parent
|
// Returns true if the first event happened before the second one, false
|
||||||
|
// otherwise.
|
||||||
|
func sortEvents(e1 *gomatrixserverlib.Event, e2 *gomatrixserverlib.Event) bool {
|
||||||
|
t := e1.OriginServerTS().Time()
|
||||||
|
return e2.OriginServerTS().Time().After(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// containsBackwardExtremity checks if a slice of StreamEvent contains a
|
||||||
|
// backward extremity. It does so by selecting the earliest event in the slice
|
||||||
|
// and by checking the presence in the database of all of its parent events, and
|
||||||
|
// considers the event itself a backward extremity if at least one of the parent
|
||||||
// events doesn't exist in the database.
|
// events doesn't exist in the database.
|
||||||
// Returns an error if there was an issue with talking to the database.
|
// Returns an error if there was an issue with talking to the database.
|
||||||
func isBackwardExtremity(
|
func containsBackwardExtremity(
|
||||||
ctx context.Context, ev *storage.StreamEvent, db *storage.SyncServerDatabase,
|
ctx context.Context, db *storage.SyncServerDatabase,
|
||||||
|
events []storage.StreamEvent, backwardOrdering bool,
|
||||||
) (bool, error) {
|
) (bool, error) {
|
||||||
|
// Select the earliest retrieved event.
|
||||||
|
var ev *storage.StreamEvent
|
||||||
|
if backwardOrdering {
|
||||||
|
ev = &(events[len(events)-1])
|
||||||
|
} else {
|
||||||
|
ev = &(events[0])
|
||||||
|
}
|
||||||
|
// Get the earliest retrieved event's parents.
|
||||||
prevIDs := ev.PrevEventIDs()
|
prevIDs := ev.PrevEventIDs()
|
||||||
prevs, err := db.Events(ctx, prevIDs)
|
prevs, err := db.Events(ctx, prevIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -269,14 +311,16 @@ func isBackwardExtremity(
|
||||||
// backfill performs a backfill request over the federation on another
|
// backfill performs a backfill request over the federation on another
|
||||||
// homeserver in the room.
|
// homeserver in the room.
|
||||||
// See: https://matrix.org/docs/spec/server_server/unstable.html#get-matrix-federation-v1-backfill-roomid
|
// See: https://matrix.org/docs/spec/server_server/unstable.html#get-matrix-federation-v1-backfill-roomid
|
||||||
|
// It also stores the PDUs retrieved from the remote homeserver's response to
|
||||||
|
// the database.
|
||||||
// Returns with an empty string if the remote homeserver didn't return with any
|
// Returns with an empty string if the remote homeserver didn't return with any
|
||||||
// event, or if there is no remote homeserver to contact.
|
// event, or if there is no remote homeserver to contact.
|
||||||
// Returns an error if there was an issue with retrieving the list of servers in
|
// Returns an error if there was an issue with retrieving the list of servers in
|
||||||
// the room or sending the request.
|
// the room or sending the request.
|
||||||
func backfill(
|
func backfill(
|
||||||
ctx context.Context, roomID, fromEventID string, limit int,
|
ctx context.Context, db *storage.SyncServerDatabase, roomID,
|
||||||
queryAPI api.RoomserverQueryAPI, cfg *config.Dendrite,
|
fromEventID string, limit int, queryAPI api.RoomserverQueryAPI,
|
||||||
federation *gomatrixserverlib.FederationClient,
|
cfg *config.Dendrite, federation *gomatrixserverlib.FederationClient,
|
||||||
) ([]gomatrixserverlib.Event, error) {
|
) ([]gomatrixserverlib.Event, error) {
|
||||||
// Query the list of servers in the room when the earlier event we know
|
// Query the list of servers in the room when the earlier event we know
|
||||||
// of was sent.
|
// of was sent.
|
||||||
|
@ -317,11 +361,18 @@ func backfill(
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Store the events in the database. The remaining question to
|
|
||||||
// make this possible is what to assign to the new events' stream
|
|
||||||
// position (negative integers? change the stream position format into a
|
|
||||||
// timestamp-based one?...)
|
|
||||||
pdus = txn.PDUs
|
pdus = txn.PDUs
|
||||||
|
|
||||||
|
// Store the events in the database, while marking them as unfit to show
|
||||||
|
// up in responses to sync requests.
|
||||||
|
for _, pdu := range pdus {
|
||||||
|
if _, err = db.WriteEvent(
|
||||||
|
ctx, &pdu, []gomatrixserverlib.Event{}, []string{}, []string{},
|
||||||
|
nil, true,
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return pdus, nil
|
return pdus, nil
|
||||||
|
|
|
@ -66,7 +66,7 @@ const insertEventSQL = "" +
|
||||||
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id"
|
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id"
|
||||||
|
|
||||||
const selectEventsSQL = "" +
|
const selectEventsSQL = "" +
|
||||||
"SELECT id, event_json, exclude_from_sync FROM syncapi_output_room_events WHERE event_id = ANY($1)"
|
"SELECT id, event_json, exclude_from_sync, device_id, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)"
|
||||||
|
|
||||||
const selectRecentEventsSQL = "" +
|
const selectRecentEventsSQL = "" +
|
||||||
"SELECT id, event_json, exclude_from_sync, device_id, transaction_id FROM syncapi_output_room_events" +
|
"SELECT id, event_json, exclude_from_sync, device_id, transaction_id FROM syncapi_output_room_events" +
|
||||||
|
|
|
@ -55,11 +55,16 @@ const selectPositionInTopologySQL = "" +
|
||||||
"SELECT topological_position FROM syncapi_output_room_events_topology" +
|
"SELECT topological_position FROM syncapi_output_room_events_topology" +
|
||||||
" WHERE event_id = $1"
|
" WHERE event_id = $1"
|
||||||
|
|
||||||
|
const selectMaxPositionInTopologySQL = "" +
|
||||||
|
"SELECT MAX(topological_position) FROM syncapi_output_room_events_topology" +
|
||||||
|
" WHERE room_id = $1"
|
||||||
|
|
||||||
type outputRoomEventsTopologyStatements struct {
|
type outputRoomEventsTopologyStatements struct {
|
||||||
insertEventInTopologyStmt *sql.Stmt
|
insertEventInTopologyStmt *sql.Stmt
|
||||||
selectEventIDsInRangeASCStmt *sql.Stmt
|
selectEventIDsInRangeASCStmt *sql.Stmt
|
||||||
selectEventIDsInRangeDESCStmt *sql.Stmt
|
selectEventIDsInRangeDESCStmt *sql.Stmt
|
||||||
selectPositionInTopologyStmt *sql.Stmt
|
selectPositionInTopologyStmt *sql.Stmt
|
||||||
|
selectMaxPositionInTopologyStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) {
|
func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) {
|
||||||
|
@ -79,6 +84,9 @@ func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) {
|
||||||
if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil {
|
if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,8 +141,15 @@ func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange(
|
||||||
// selectPositionInTopology returns the position of a given event in the
|
// selectPositionInTopology returns the position of a given event in the
|
||||||
// topology of the room it belongs to.
|
// topology of the room it belongs to.
|
||||||
func (s *outputRoomEventsTopologyStatements) selectPositionInTopology(
|
func (s *outputRoomEventsTopologyStatements) selectPositionInTopology(
|
||||||
eventID string,
|
ctx context.Context, eventID string,
|
||||||
) (pos types.StreamPosition, err error) {
|
) (pos types.StreamPosition, err error) {
|
||||||
err = s.selectPositionInTopologyStmt.QueryRow(eventID).Scan(&pos)
|
err = s.selectPositionInTopologyStmt.QueryRowContext(ctx, eventID).Scan(&pos)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *outputRoomEventsTopologyStatements) selectMaxPositionInTopology(
|
||||||
|
ctx context.Context, roomID string,
|
||||||
|
) (pos types.StreamPosition, err error) {
|
||||||
|
err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -202,18 +202,71 @@ func (d *SyncServerDatabase) GetStateEventsForRoom(
|
||||||
// given extremities and limit.
|
// given extremities and limit.
|
||||||
func (d *SyncServerDatabase) GetEventsInRange(
|
func (d *SyncServerDatabase) GetEventsInRange(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
from, to types.StreamPosition,
|
from, to *types.PaginationToken,
|
||||||
roomID string, limit int,
|
roomID string, limit int,
|
||||||
backwardOrdering bool,
|
backwardOrdering bool,
|
||||||
) (events []StreamEvent, err error) {
|
) (events []StreamEvent, err error) {
|
||||||
|
// If the pagination token's type is types.PaginationTokenTypeTopology, the
|
||||||
|
// events must be retrieved from the rooms' topology table rather than the
|
||||||
|
// table contaning the syncapi server's whole stream of events.
|
||||||
|
if from.Type == types.PaginationTokenTypeTopology {
|
||||||
|
// Determine the backward and forward limit, i.e. the upper and lower
|
||||||
|
// limits to the selection in the room's topology, from the direction.
|
||||||
|
var backwardLimit, forwardLimit types.StreamPosition
|
||||||
if backwardOrdering {
|
if backwardOrdering {
|
||||||
// We need all events matching to < streamPos < from
|
// Backward ordering is antichronological (latest event to oldest
|
||||||
return d.events.selectRecentEvents(ctx, nil, roomID, to, from, limit, false, false)
|
// one).
|
||||||
|
backwardLimit = to.Position
|
||||||
|
forwardLimit = from.Position
|
||||||
|
} else {
|
||||||
|
// Forward ordering is chronological (oldest event to latest one).
|
||||||
|
backwardLimit = from.Position
|
||||||
|
forwardLimit = to.Position
|
||||||
}
|
}
|
||||||
|
|
||||||
// We need all events from < streamPos < to
|
// Select the event IDs from the defined range.
|
||||||
return d.events.selectEarlyEvents(ctx, nil, roomID, from, to, limit)
|
var eIDs []string
|
||||||
|
eIDs, err = d.topology.selectEventIDsInRange(
|
||||||
|
ctx, roomID, backwardLimit, forwardLimit, !backwardOrdering,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve the events' contents using their IDs.
|
||||||
|
events, err = d.events.selectEvents(ctx, nil, eIDs)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the pagination token's type is types.PaginationTokenTypeStream, the
|
||||||
|
// events must be retrieved from the table contaning the syncapi server's
|
||||||
|
// whole stream of events.
|
||||||
|
|
||||||
|
if backwardOrdering {
|
||||||
|
// When using backward ordering, we want the most recent events first.
|
||||||
|
if events, err = d.events.selectRecentEvents(
|
||||||
|
ctx, nil, roomID, to.Position, from.Position, limit, false, false,
|
||||||
|
); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// When using forward ordering, we want the least recent events first.
|
||||||
|
if events, err = d.events.selectEarlyEvents(
|
||||||
|
ctx, nil, roomID, from.Position, to.Position, limit,
|
||||||
|
); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// MaxTopologicalPosition returns the highest topological position for a given
|
||||||
|
// room.
|
||||||
|
func (d *SyncServerDatabase) MaxTopologicalPosition(
|
||||||
|
ctx context.Context, roomID string,
|
||||||
|
) (types.StreamPosition, error) {
|
||||||
|
return d.topology.selectMaxPositionInTopology(ctx, roomID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
|
// SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet.
|
||||||
|
@ -338,7 +391,7 @@ func (d *SyncServerDatabase) CompleteSync(
|
||||||
// Retrieve the backward topology position, i.e. the position of the
|
// Retrieve the backward topology position, i.e. the position of the
|
||||||
// oldest event in the room's topology.
|
// oldest event in the room's topology.
|
||||||
var backwardTopologyPos types.StreamPosition
|
var backwardTopologyPos types.StreamPosition
|
||||||
backwardTopologyPos, err = d.topology.selectPositionInTopology(recentStreamEvents[0].EventID())
|
backwardTopologyPos, err = d.topology.selectPositionInTopology(ctx, recentStreamEvents[0].EventID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -483,7 +536,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
|
||||||
// Retrieve the backward topology position, i.e. the position of the
|
// Retrieve the backward topology position, i.e. the position of the
|
||||||
// oldest event in the room's topology.
|
// oldest event in the room's topology.
|
||||||
var backwardTopologyPos types.StreamPosition
|
var backwardTopologyPos types.StreamPosition
|
||||||
backwardTopologyPos, err = d.topology.selectPositionInTopology(recentStreamEvents[0].EventID())
|
backwardTopologyPos, err = d.topology.selectPositionInTopology(ctx, recentStreamEvents[0].EventID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue