From 89aeb21ef7740ab9190781a12cc6933d16f7cc43 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 12 Nov 2018 18:29:54 +0000 Subject: [PATCH] Implement the use of new pagination tokens in /messages Also use them to store and retrieve events we got from backfilling --- .../dendrite/syncapi/routing/messages.go | 147 ++++++++++++------ .../storage/output_room_events_table.go | 2 +- .../output_room_events_topology_table.go | 27 +++- .../dendrite/syncapi/storage/syncserver.go | 69 +++++++- 4 files changed, 182 insertions(+), 63 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go b/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go index 225b05ae..3db7fdfc 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go +++ b/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go @@ -17,6 +17,7 @@ package routing import ( "context" "net/http" + "sort" "strconv" "github.com/matrix-org/dendrite/clientapi/httputil" @@ -47,19 +48,17 @@ func OnIncomingMessagesRequest( queryAPI api.RoomserverQueryAPI, cfg *config.Dendrite, ) util.JSONResponse { - var from, to int var err error // Extract parameters from the request's URL. // Pagination tokens. - from, err = strconv.Atoi(req.URL.Query().Get("from")) + from, err := types.NewPaginationTokenFromString(req.URL.Query().Get("from")) if err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, - JSON: jsonerror.InvalidArgumentValue("from could not be parsed into an integer: " + err.Error()), + JSON: jsonerror.InvalidArgumentValue("Invalid from parameter: " + err.Error()), } } - fromPos := types.StreamPosition(from) // Direction to return events from. dir := req.URL.Query().Get("dir") @@ -75,25 +74,25 @@ func OnIncomingMessagesRequest( // Pagination tokens. To is optional, and its default value depends on the // direction ("b" or "f"). 
- toStr := req.URL.Query().Get("to") - var toPos types.StreamPosition - if len(toStr) > 0 { - to, err = strconv.Atoi(toStr) + var to *types.PaginationToken + var toDefault bool + if s := req.URL.Query().Get("to"); len(s) > 0 { + to, err = types.NewPaginationTokenFromString(s) if err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, - JSON: jsonerror.InvalidArgumentValue("to could not be parsed into an integer: " + err.Error()), + JSON: jsonerror.InvalidArgumentValue("Invalid to parameter: " + err.Error()), } } - toPos = types.StreamPosition(to) } else { // If "to" isn't provided, it defaults to either the earliest stream // position (if we're going backward) or to the latest one (if we're // going forward). - toPos, err = setToDefault(req.Context(), backwardOrdering, db) + to, err = setToDefault(req.Context(), db, backwardOrdering, roomID) if err != nil { return httputil.LogThenError(req, err) } + toDefault = true } // Maximum number of events to return; defaults to 10. @@ -119,7 +118,7 @@ func OnIncomingMessagesRequest( } clientEvents, start, end, err := retrieveEvents( - req.Context(), db, roomID, fromPos, toPos, toStr, limit, backwardOrdering, + req.Context(), db, roomID, from, to, toDefault, limit, backwardOrdering, federation, queryAPI, cfg, ) if err != nil { @@ -131,28 +130,32 @@ func OnIncomingMessagesRequest( Code: http.StatusOK, JSON: messageResp{ Chunk: clientEvents, - Start: start, - End: end, + Start: start.String(), + End: end.String(), }, } } // setToDefault returns the default value for the "to" query parameter of a // request to /messages if not provided. It defaults to either the earliest -// stream position (if we're going backward) or to the latest one (if we're +// topological position (if we're going backward) or to the latest one (if we're // going forward). 
// Returns an error if there was an issue with retrieving the latest position // from the database func setToDefault( - ctx context.Context, backwardOrdering bool, db *storage.SyncServerDatabase, -) (toPos types.StreamPosition, err error) { + ctx context.Context, db *storage.SyncServerDatabase, backwardOrdering bool, + roomID string, +) (to *types.PaginationToken, err error) { if backwardOrdering { - toPos = types.StreamPosition(1) + to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 1) } else { - toPos, err = db.SyncStreamPosition(ctx) + var pos types.StreamPosition + pos, err = db.MaxTopologicalPosition(ctx, roomID) if err != nil { return } + + to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, pos) } return @@ -165,15 +168,15 @@ func setToDefault( // remote homeserver. func retrieveEvents( ctx context.Context, db *storage.SyncServerDatabase, roomID string, - fromPos, toPos types.StreamPosition, toStr string, limit int, + from, to *types.PaginationToken, toDefault bool, limit int, backwardOrdering bool, federation *gomatrixserverlib.FederationClient, queryAPI api.RoomserverQueryAPI, cfg *config.Dendrite, -) (clientEvents []gomatrixserverlib.ClientEvent, start string, end string, err error) { +) (clientEvents []gomatrixserverlib.ClientEvent, start, end *types.PaginationToken, err error) { // Retrieve the events from the local database. streamEvents, err := db.GetEventsInRange( - ctx, fromPos, toPos, roomID, limit, backwardOrdering, + ctx, from, to, roomID, limit, backwardOrdering, ) if err != nil { return @@ -183,21 +186,20 @@ func retrieveEvents( isSetLargeEnough := true if len(streamEvents) < limit { if backwardOrdering { - if len(toStr) > 0 { + if !toDefault { // The condition in the SQL query is a strict "greater than" so // we need to check against to-1. 
- isSetLargeEnough = (toPos-1 == streamEvents[0].StreamPosition) + isSetLargeEnough = (to.Position-1 == streamEvents[len(streamEvents)-1].StreamPosition) } } else { - isSetLargeEnough = (fromPos-1 == streamEvents[0].StreamPosition) + isSetLargeEnough = (from.Position-1 == streamEvents[0].StreamPosition) } } - // Check if earliest event is a backward extremity, i.e. if one of its - // previous events is missing from the database. - // Get the earliest retrieved event's parents. - - backwardExtremity, err := isBackwardExtremity(ctx, &(streamEvents[0]), db) + // Check if the slice contains a backward extremity. + backwardExtremity, err := containsBackwardExtremity( + ctx, db, streamEvents, backwardOrdering, + ) if err != nil { return } @@ -205,39 +207,79 @@ func retrieveEvents( var events []gomatrixserverlib.Event // Backfill is needed if we've reached a backward extremity and need more - // events. It's only needed if the direction is backard. + // events. It's only needed if the direction is backward. if backwardExtremity && !isSetLargeEnough && backwardOrdering { var pdus []gomatrixserverlib.Event // Only ask the remote server for enough events to reach the limit. pdus, err = backfill( - ctx, roomID, streamEvents[0].EventID(), limit-len(streamEvents), + ctx, db, roomID, streamEvents[0].EventID(), limit-len(streamEvents), queryAPI, cfg, federation, ) if err != nil { return } + // Append the PDUs to the list to send back to the client. events = append(events, pdus...) } - // Append the events we retrieved locally, then convert them into client - // events. + // Append the events we previously retrieved locally. events = append(events, storage.StreamEventsToEvents(nil, streamEvents)...) + // Sort the events to ensure we send them in the right order. We currently + // do that based on the event's timestamp. + if backwardOrdering { + sort.SliceStable(events, func(i int, j int) bool { + // Backward ordering is antichronological (latest event to oldest + // one). 
+ return sortEvents(&(events[j]), &(events[i])) + }) + } else { + sort.SliceStable(events, func(i int, j int) bool { + // Forward ordering is chronological (oldest event to latest one). + return sortEvents(&(events[i]), &(events[j])) + }) + } + + // Convert all of the events into client events. clientEvents = gomatrixserverlib.ToClientEvents(events, gomatrixserverlib.FormatAll) - start = streamEvents[0].StreamPosition.String() - end = streamEvents[len(streamEvents)-1].StreamPosition.String() + // Generate pagination tokens to send to the client. + start = types.NewPaginationTokenFromTypeAndPosition( + types.PaginationTokenTypeTopology, streamEvents[0].StreamPosition, + ) + end = types.NewPaginationTokenFromTypeAndPosition( + types.PaginationTokenTypeTopology, streamEvents[len(streamEvents)-1].StreamPosition, + ) return } -// isBackwardExtremity checks if a given event is a backward extremity. It does -// so by checking the presence in the database of all of its parent events, and -// consider the event itself a backward extremity if at least one of the parent +// sortEvents is a function to give to sort.SliceStable, and compares the +// timestamp of two Matrix events. +// Returns true if the first event happened before the second one, false +// otherwise. +func sortEvents(e1 *gomatrixserverlib.Event, e2 *gomatrixserverlib.Event) bool { + t := e1.OriginServerTS().Time() + return e2.OriginServerTS().Time().After(t) +} + +// containsBackwardExtremity checks if a slice of StreamEvent contains a +// backward extremity. It does so by selecting the earliest event in the slice +// and by checking the presence in the database of all of its parent events, and +// considers the event itself a backward extremity if at least one of the parent // events doesn't exist in the database. // Returns an error if there was an issue with talking to the database. 
-func isBackwardExtremity( - ctx context.Context, ev *storage.StreamEvent, db *storage.SyncServerDatabase, +func containsBackwardExtremity( + ctx context.Context, db *storage.SyncServerDatabase, + events []storage.StreamEvent, backwardOrdering bool, ) (bool, error) { + // Select the earliest retrieved event. + var ev *storage.StreamEvent + if backwardOrdering { + ev = &(events[len(events)-1]) + } else { + ev = &(events[0]) + } + // Get the earliest retrieved event's parents. prevIDs := ev.PrevEventIDs() prevs, err := db.Events(ctx, prevIDs) if err != nil { @@ -269,14 +311,16 @@ func isBackwardExtremity( // backfill performs a backfill request over the federation on another // homeserver in the room. // See: https://matrix.org/docs/spec/server_server/unstable.html#get-matrix-federation-v1-backfill-roomid +// It also stores the PDUs retrieved from the remote homeserver's response to +// the database. // Returns with an empty string if the remote homeserver didn't return with any // event, or if there is no remote homeserver to contact. // Returns an error if there was an issue with retrieving the list of servers in // the room or sending the request. func backfill( - ctx context.Context, roomID, fromEventID string, limit int, - queryAPI api.RoomserverQueryAPI, cfg *config.Dendrite, - federation *gomatrixserverlib.FederationClient, + ctx context.Context, db *storage.SyncServerDatabase, roomID, + fromEventID string, limit int, queryAPI api.RoomserverQueryAPI, + cfg *config.Dendrite, federation *gomatrixserverlib.FederationClient, ) ([]gomatrixserverlib.Event, error) { // Query the list of servers in the room when the earlier event we know // of was sent. @@ -317,11 +361,18 @@ func backfill( return nil, err } - // TODO: Store the events in the database. The remaining question to - // make this possible is what to assign to the new events' stream - // position (negative integers? change the stream position format into a - // timestamp-based one?...) 
pdus = txn.PDUs + + // Store the events in the database, while marking them as unfit to show + // up in responses to sync requests. + for _, pdu := range pdus { + if _, err = db.WriteEvent( + ctx, &pdu, []gomatrixserverlib.Event{}, []string{}, []string{}, + nil, true, + ); err != nil { + return nil, err + } + } } return pdus, nil diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index 22eb5503..54da893e 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -66,7 +66,7 @@ const insertEventSQL = "" + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id" const selectEventsSQL = "" + - "SELECT id, event_json, exclude_from_sync FROM syncapi_output_room_events WHERE event_id = ANY($1)" + "SELECT id, event_json, exclude_from_sync, device_id, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)" const selectRecentEventsSQL = "" + "SELECT id, event_json, exclude_from_sync, device_id, transaction_id FROM syncapi_output_room_events" + diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_topology_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_topology_table.go index c698c5ad..ed78a379 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_topology_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_topology_table.go @@ -55,11 +55,16 @@ const selectPositionInTopologySQL = "" + "SELECT topological_position FROM syncapi_output_room_events_topology" + " WHERE event_id = $1" +const selectMaxPositionInTopologySQL = "" + + "SELECT MAX(topological_position) FROM syncapi_output_room_events_topology" + + " WHERE room_id = $1" + type outputRoomEventsTopologyStatements struct { - 
insertEventInTopologyStmt *sql.Stmt - selectEventIDsInRangeASCStmt *sql.Stmt - selectEventIDsInRangeDESCStmt *sql.Stmt - selectPositionInTopologyStmt *sql.Stmt + insertEventInTopologyStmt *sql.Stmt + selectEventIDsInRangeASCStmt *sql.Stmt + selectEventIDsInRangeDESCStmt *sql.Stmt + selectPositionInTopologyStmt *sql.Stmt + selectMaxPositionInTopologyStmt *sql.Stmt } func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) { @@ -79,6 +84,9 @@ func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) { if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil { return } + if s.selectMaxPositionInTopologyStmt, err = db.Prepare(selectMaxPositionInTopologySQL); err != nil { + return + } return } @@ -133,8 +141,15 @@ func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange( // selectPositionInTopology returns the position of a given event in the // topology of the room it belongs to. func (s *outputRoomEventsTopologyStatements) selectPositionInTopology( - eventID string, + ctx context.Context, eventID string, ) (pos types.StreamPosition, err error) { - err = s.selectPositionInTopologyStmt.QueryRow(eventID).Scan(&pos) + err = s.selectPositionInTopologyStmt.QueryRowContext(ctx, eventID).Scan(&pos) + return +} + +func (s *outputRoomEventsTopologyStatements) selectMaxPositionInTopology( + ctx context.Context, roomID string, +) (pos types.StreamPosition, err error) { + err = s.selectMaxPositionInTopologyStmt.QueryRowContext(ctx, roomID).Scan(&pos) return } diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 5859f96b..be0159c9 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -202,18 +202,71 @@ func (d *SyncServerDatabase) GetStateEventsForRoom( // given extremities and limit. 
func (d *SyncServerDatabase) GetEventsInRange( ctx context.Context, - from, to types.StreamPosition, + from, to *types.PaginationToken, roomID string, limit int, backwardOrdering bool, ) (events []StreamEvent, err error) { + // If the pagination token's type is types.PaginationTokenTypeTopology, the + // events must be retrieved from the rooms' topology table rather than the + // table containing the syncapi server's whole stream of events. + if from.Type == types.PaginationTokenTypeTopology { + // Determine the backward and forward limit, i.e. the upper and lower + // limits to the selection in the room's topology, from the direction. + var backwardLimit, forwardLimit types.StreamPosition + if backwardOrdering { + // Backward ordering is antichronological (latest event to oldest + // one). + backwardLimit = to.Position + forwardLimit = from.Position + } else { + // Forward ordering is chronological (oldest event to latest one). + backwardLimit = from.Position + forwardLimit = to.Position + } - if backwardOrdering { - // We need all events matching to < streamPos < from - return d.events.selectRecentEvents(ctx, nil, roomID, to, from, limit, false, false) + // Select the event IDs from the defined range. + var eIDs []string + eIDs, err = d.topology.selectEventIDsInRange( + ctx, roomID, backwardLimit, forwardLimit, !backwardOrdering, + ) + if err != nil { + return + } + + // Retrieve the events' contents using their IDs. + events, err = d.events.selectEvents(ctx, nil, eIDs) + return } - // We need all events from < streamPos < to - return d.events.selectEarlyEvents(ctx, nil, roomID, from, to, limit) + // If the pagination token's type is types.PaginationTokenTypeStream, the + // events must be retrieved from the table containing the syncapi server's + // whole stream of events. + + if backwardOrdering { + // When using backward ordering, we want the most recent events first. 
+ if events, err = d.events.selectRecentEvents( + ctx, nil, roomID, to.Position, from.Position, limit, false, false, + ); err != nil { + return + } + } else { + // When using forward ordering, we want the least recent events first. + if events, err = d.events.selectEarlyEvents( + ctx, nil, roomID, from.Position, to.Position, limit, + ); err != nil { + return + } + } + + return +} + +// MaxTopologicalPosition returns the highest topological position for a given +// room. +func (d *SyncServerDatabase) MaxTopologicalPosition( + ctx context.Context, roomID string, +) (types.StreamPosition, error) { + return d.topology.selectMaxPositionInTopology(ctx, roomID) } // SyncStreamPosition returns the latest position in the sync stream. Returns 0 if there are no events yet. @@ -338,7 +391,7 @@ func (d *SyncServerDatabase) CompleteSync( // Retrieve the backward topology position, i.e. the position of the // oldest event in the room's topology. var backwardTopologyPos types.StreamPosition - backwardTopologyPos, err = d.topology.selectPositionInTopology(recentStreamEvents[0].EventID()) + backwardTopologyPos, err = d.topology.selectPositionInTopology(ctx, recentStreamEvents[0].EventID()) if err != nil { return nil, err } @@ -483,7 +536,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( // Retrieve the backward topology position, i.e. the position of the // oldest event in the room's topology. var backwardTopologyPos types.StreamPosition - backwardTopologyPos, err = d.topology.selectPositionInTopology(recentStreamEvents[0].EventID()) + backwardTopologyPos, err = d.topology.selectPositionInTopology(ctx, recentStreamEvents[0].EventID()) if err != nil { return err }