From 4a3c9555b16f2d81e2be9657dcc2c6ef21a8803b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Tue, 13 Nov 2018 18:07:27 +0000 Subject: [PATCH] Refactor by implementing the messagesReq structure --- .../dendrite/syncapi/routing/messages.go | 311 ++++++++++-------- 1 file changed, 180 insertions(+), 131 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go b/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go index ee88cb80..9ed1ca9e 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go +++ b/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go @@ -31,6 +31,20 @@ import ( log "github.com/sirupsen/logrus" ) +type messagesReq struct { + ctx context.Context + db *storage.SyncServerDatabase + queryAPI api.RoomserverQueryAPI + federation *gomatrixserverlib.FederationClient + cfg *config.Dendrite + roomID string + from *types.PaginationToken + to *types.PaginationToken + wasToProvided bool + limit int + backwardOrdering bool +} + type messageResp struct { Start string `json:"start"` End string `json:"end"` @@ -75,7 +89,7 @@ func OnIncomingMessagesRequest( // Pagination tokens. To is optional, and its default value depends on the // direction ("b" or "f"). var to *types.PaginationToken - var toDefault bool + wasToProvided := true if s := req.URL.Query().Get("to"); len(s) > 0 { to, err = types.NewPaginationTokenFromString(s) if err != nil { @@ -92,7 +106,7 @@ func OnIncomingMessagesRequest( if err != nil { return httputil.LogThenError(req, err) } - toDefault = true + wasToProvided = false } // Maximum number of events to return; defaults to 10. @@ -117,10 +131,21 @@ func OnIncomingMessagesRequest( } } - clientEvents, start, end, err := retrieveEvents( - req.Context(), db, roomID, from, to, toDefault, limit, backwardOrdering, - federation, queryAPI, cfg, - ) + mReq := messagesReq{ + ctx: req.Context(), + db: db, + queryAPI: queryAPI, + federation: federation, + cfg: cfg, + roomID: roomID, + from: from, + to: to, + wasToProvided: wasToProvided, + limit: limit, + backwardOrdering: backwardOrdering, + } + + clientEvents, start, end, err := mReq.retrieveEvents() if err != nil { return httputil.LogThenError(req, err) } @@ -136,132 +161,43 @@ func OnIncomingMessagesRequest( } } -// setToDefault returns the default value for the "to" query parameter of a -// request to /messages if not provided. It defaults to either the earliest -// topological position (if we're going backward) or to the latest one (if we're -// going forward). -// Returns an error if there was an issue with retrieving the latest position -// from the database -func setToDefault( - ctx context.Context, db *storage.SyncServerDatabase, backwardOrdering bool, - roomID string, -) (to *types.PaginationToken, err error) { - if backwardOrdering { - to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 1) - } else { - var pos types.StreamPosition - pos, err = db.MaxTopologicalPosition(ctx, roomID) - if err != nil { - return - } - - to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, pos) - } - - return -} - // retrieveEvents retrieve events from the local database for a request on // /messages. If there's not enough events to retrieve, it asks another // homeserver in the room for older events. // Returns an error if there was an issue talking to the database or with the // remote homeserver. -func retrieveEvents( - ctx context.Context, db *storage.SyncServerDatabase, roomID string, - from, to *types.PaginationToken, toDefault bool, limit int, - backwardOrdering bool, - federation *gomatrixserverlib.FederationClient, - queryAPI api.RoomserverQueryAPI, - cfg *config.Dendrite, -) (clientEvents []gomatrixserverlib.ClientEvent, start, end *types.PaginationToken, err error) { +func (r *messagesReq) retrieveEvents() ( + clientEvents []gomatrixserverlib.ClientEvent, start, + end *types.PaginationToken, err error, +) { // Retrieve the events from the local database. - streamEvents, err := db.GetEventsInRange( - ctx, from, to, roomID, limit, backwardOrdering, + streamEvents, err := r.db.GetEventsInRange( + r.ctx, r.from, r.to, r.roomID, r.limit, r.backwardOrdering, ) if err != nil { return } - var backwardExtremity bool var events []gomatrixserverlib.Event // There can be two reasons for streamEvents to be empty: either we've // reached the oldest event in the room (or the most recent one, depending // on the ordering), or we've reached a backward extremity. if len(streamEvents) == 0 { - var evs []storage.StreamEvent - var laterPosition types.StreamPosition - if backwardOrdering { - laterPosition = from.Position + 1 - } else { - laterPosition = to.Position + 1 - } - - evs, err = db.EventsAtTopologicalPosition(ctx, roomID, laterPosition) - if err != nil { + if events, err = r.handleEmptyEventsSlice(); err != nil { return - } - - backwardExtremity, err = containsBackwardExtremity(ctx, db, evs, backwardOrdering) - if err != nil { - return - } - - if backwardExtremity { - events, err = backfill(ctx, db, roomID, evs[0].EventID(), limit, queryAPI, cfg, federation) - if err != nil { - return - } - } else { - return []gomatrixserverlib.ClientEvent{}, from, to, nil + } else if len(events) == 0 { + return []gomatrixserverlib.ClientEvent{}, r.from, r.to, nil } } else { - // Check if we have enough events. - isSetLargeEnough := true - if len(streamEvents) < limit { - if backwardOrdering { - if !toDefault { - // The condition in the SQL query is a strict "greater than" so - // we need to check against to-1. - isSetLargeEnough = (to.Position-1 == streamEvents[len(streamEvents)-1].StreamPosition) - } - } else { - isSetLargeEnough = (from.Position-1 == streamEvents[0].StreamPosition) - } - } - - // Check if the slice contains a backward extremity. - backwardExtremity, err = containsBackwardExtremity( - ctx, db, streamEvents, backwardOrdering, - ) - if err != nil { + if events, err = r.handleNonEmptyEventsSlice(streamEvents); err != nil { return } - - // Backfill is needed if we've reached a backward extremity and need more - // events. It's only needed if the direction is backward. - if backwardExtremity && !isSetLargeEnough && backwardOrdering { - var pdus []gomatrixserverlib.Event - // Only ask the remote server for enough events to reach the limit. - pdus, err = backfill( - ctx, db, roomID, streamEvents[0].EventID(), limit-len(streamEvents), - queryAPI, cfg, federation, - ) - if err != nil { - return - } - - // Append the PDUs to the list to send back to the client. - events = append(events, pdus...) - } - - // Append the events ve previously retrieved locally. - events = append(events, storage.StreamEventsToEvents(nil, streamEvents)...) } // Sort the events to ensure we send them in the right order. We currently // do that based on the event's timestamp. - if backwardOrdering { + if r.backwardOrdering { sort.SliceStable(events, func(i int, j int) bool { // Backward ordering is antichronological (latest event to oldest // one). @@ -291,13 +227,99 @@ func retrieveEvents( return } -// sortEvents is a function to give to sort.SliceStable, and compares the -// timestamp of two Matrix events. -// Returns true if the first event happened before the second one, false -// otherwise. -func sortEvents(e1 *gomatrixserverlib.Event, e2 *gomatrixserverlib.Event) bool { - t := e1.OriginServerTS().Time() - return e2.OriginServerTS().Time().After(t) +// handleEmptyEventsSlice handles the case where the initial request to the +// database returned an empty slice of events. It does so by checking whether +// the set is empty because we've reached a backward extremity, and if that is +// the case, by retrieving as much events as requested by backfilling from +// another homeserver. +// Returns an error if there was an issue talking with the database or +// backfilling. +func (r *messagesReq) handleEmptyEventsSlice() ( + events []gomatrixserverlib.Event, err error, +) { + var evs []storage.StreamEvent + // Determine what could be the oldest position of interest in the room's + // topology for this. + var laterPosition types.StreamPosition + if r.backwardOrdering { + laterPosition = r.from.Position + 1 + } else { + laterPosition = r.to.Position + 1 + } + + // Retrieve events at that position. + evs, err = r.db.EventsAtTopologicalPosition(r.ctx, r.roomID, laterPosition) + if err != nil { + return + } + + // Check if one of these events is a backward extremity. + backwardExtremity, err := r.containsBackwardExtremity(evs) + if err != nil { + return + } + + // If so, retrieve as much events as requested through backfilling. + if backwardExtremity { + events, err = r.backfill(evs[0].EventID(), r.limit) + if err != nil { + return + } + } else { + // If not, it means the slice was empty because we reached the limit of + // the room's topology, so return an empty slice. + events = []gomatrixserverlib.Event{} + } + + return +} + +// handleNonEmptyEventsSlice handles the case where the initial request to the +// database returned a non-empty slice of events. It does so by checking whether +// events are missing from the expected result, and retrieve missing events +// through backfilling if needed. +// Returns an error if there was an issue while backfilling. +func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []storage.StreamEvent) ( + events []gomatrixserverlib.Event, err error, +) { + // Check if we have enough events. + isSetLargeEnough := true + if len(streamEvents) < r.limit { + if r.backwardOrdering { + if r.wasToProvided { + // The condition in the SQL query is a strict "greater than" so + // we need to check against to-1. + isSetLargeEnough = (r.to.Position-1 == streamEvents[len(streamEvents)-1].StreamPosition) + } + } else { + isSetLargeEnough = (r.from.Position-1 == streamEvents[0].StreamPosition) + } + } + + // Check if the slice contains a backward extremity. + backwardExtremity, err := r.containsBackwardExtremity(streamEvents) + if err != nil { + return + } + + // Backfill is needed if we've reached a backward extremity and need more + // events. It's only needed if the direction is backward. + if backwardExtremity && !isSetLargeEnough && r.backwardOrdering { + var pdus []gomatrixserverlib.Event + // Only ask the remote server for enough events to reach the limit. + pdus, err = r.backfill(streamEvents[0].EventID(), r.limit-len(streamEvents)) + if err != nil { + return + } + + // Append the PDUs to the list to send back to the client. + events = append(events, pdus...) + } + + // Append the events ve previously retrieved locally. + events = append(events, storage.StreamEventsToEvents(nil, streamEvents)...) + + return } // containsBackwardExtremity checks if a slice of StreamEvent contains a @@ -306,20 +328,17 @@ func sortEvents(e1 *gomatrixserverlib.Event, e2 *gomatrixserverlib.Event) bool { // considers the event itself a backward extremity if at least one of the parent // events doesn't exist in the database. // Returns an error if there was an issue with talking to the database. -func containsBackwardExtremity( - ctx context.Context, db *storage.SyncServerDatabase, - events []storage.StreamEvent, backwardOrdering bool, -) (bool, error) { +func (r *messagesReq) containsBackwardExtremity(events []storage.StreamEvent) (bool, error) { // Select the earliest retrieved event. var ev *storage.StreamEvent - if backwardOrdering { + if r.backwardOrdering { ev = &(events[len(events)-1]) } else { ev = &(events[0]) } // Get the earliest retrieved event's parents. prevIDs := ev.PrevEventIDs() - prevs, err := db.Events(ctx, prevIDs) + prevs, err := r.db.Events(r.ctx, prevIDs) if err != nil { return false, nil } @@ -355,19 +374,15 @@ func containsBackwardExtremity( // event, or if there is no remote homeserver to contact. // Returns an error if there was an issue with retrieving the list of servers in // the room or sending the request. -func backfill( - ctx context.Context, db *storage.SyncServerDatabase, roomID, - fromEventID string, limit int, queryAPI api.RoomserverQueryAPI, - cfg *config.Dendrite, federation *gomatrixserverlib.FederationClient, -) ([]gomatrixserverlib.Event, error) { +func (r *messagesReq) backfill(fromEventID string, limit int) ([]gomatrixserverlib.Event, error) { // Query the list of servers in the room when the earlier event we know // of was sent. var serversResponse api.QueryServersInRoomAtEventResponse serversRequest := api.QueryServersInRoomAtEventRequest{ - RoomID: roomID, + RoomID: r.roomID, EventID: fromEventID, } - if err := queryAPI.QueryServersInRoomAtEvent(ctx, &serversRequest, &serversResponse); err != nil { + if err := r.queryAPI.QueryServersInRoomAtEvent(r.ctx, &serversRequest, &serversResponse); err != nil { return nil, err } @@ -378,7 +393,7 @@ func backfill( // TODO: Be smarter at selecting the server to direct the request // towards. srvToBackfillFrom := serversResponse.Servers[0] - if srvToBackfillFrom == cfg.Matrix.ServerName { + if srvToBackfillFrom == r.cfg.Matrix.ServerName { if len(serversResponse.Servers) > 1 { srvToBackfillFrom = serversResponse.Servers[1] } else { @@ -392,8 +407,8 @@ func backfill( // If the roomserver responded with at least one server that isn't us, // send it a request for backfill. if len(srvToBackfillFrom) > 0 { - txn, err := federation.Backfill( - ctx, srvToBackfillFrom, roomID, limit, []string{fromEventID}, + txn, err := r.federation.Backfill( + r.ctx, srvToBackfillFrom, r.roomID, limit, []string{fromEventID}, ) if err != nil { return nil, err @@ -404,8 +419,8 @@ func backfill( // Store the events in the database, while marking them as unfit to show // up in responses to sync requests. for _, pdu := range pdus { - if _, err = db.WriteEvent( - ctx, &pdu, []gomatrixserverlib.Event{}, []string{}, []string{}, + if _, err = r.db.WriteEvent( + r.ctx, &pdu, []gomatrixserverlib.Event{}, []string{}, []string{}, nil, true, ); err != nil { return nil, err @@ -415,3 +430,37 @@ func backfill( return pdus, nil } + +// setToDefault returns the default value for the "to" query parameter of a +// request to /messages if not provided. It defaults to either the earliest +// topological position (if we're going backward) or to the latest one (if we're +// going forward). +// Returns an error if there was an issue with retrieving the latest position +// from the database +func setToDefault( + ctx context.Context, db *storage.SyncServerDatabase, backwardOrdering bool, + roomID string, +) (to *types.PaginationToken, err error) { + if backwardOrdering { + to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 1) + } else { + var pos types.StreamPosition + pos, err = db.MaxTopologicalPosition(ctx, roomID) + if err != nil { + return + } + + to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, pos) + } + + return +} + +// sortEvents is a function to give to sort.SliceStable, and compares the +// timestamp of two Matrix events. +// Returns true if the first event happened before the second one, false +// otherwise. +func sortEvents(e1 *gomatrixserverlib.Event, e2 *gomatrixserverlib.Event) bool { + t := e1.OriginServerTS().Time() + return e2.OriginServerTS().Time().After(t) +}