diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go index 87a625b8..c242e552 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go @@ -70,7 +70,7 @@ func main() { federationsender.SetupFederationSenderComponent(base, federation, query) mediaapi.SetupMediaAPIComponent(base, deviceDB) publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB) - syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query) + syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg) httpHandler := common.WrapHandlerInCORS(base.APIMux) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go index 343d3567..462187e7 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go @@ -26,10 +26,11 @@ func main() { deviceDB := base.CreateDeviceDB() accountDB := base.CreateAccountsDB() + federation := base.CreateFederationClient() _, _, query := base.CreateHTTPRoomserverAPIs() - syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query) + syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg) base.SetupAndServeHTTP(string(base.Cfg.Listen.SyncAPI)) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/query.go b/src/github.com/matrix-org/dendrite/roomserver/api/query.go index a544f8aa..a990cd0b 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/query.go @@ -230,6 +230,20 @@ type QueryBackfillResponse struct { Events []gomatrixserverlib.Event `json:"events"` } +// QueryServersInRoomAtEventRequest is a request to QueryServersInRoomAtEvent +type QueryServersInRoomAtEventRequest struct { + // ID of the room to retrieve member servers for. + RoomID string `json:"room_id"` + // ID of the event for which to retrieve member servers. + EventID string `json:"event_id"` +} + +// QueryServersInRoomAtEventResponse is a response to QueryServersInRoomAtEvent +type QueryServersInRoomAtEventResponse struct { + // Servers present in the room for these events. + Servers []gomatrixserverlib.ServerName `json:"servers"` +} + // RoomserverQueryAPI is used to query information from the room server. type RoomserverQueryAPI interface { // Query the latest events and state for a room from the room server. @@ -303,6 +317,12 @@ type RoomserverQueryAPI interface { request *QueryBackfillRequest, response *QueryBackfillResponse, ) error + + QueryServersInRoomAtEvent( + ctx context.Context, + request *QueryServersInRoomAtEventRequest, + response *QueryServersInRoomAtEventResponse, + ) error } // RoomserverQueryLatestEventsAndStatePath is the HTTP path for the QueryLatestEventsAndState API. @@ -333,7 +353,10 @@ const RoomserverQueryMissingEventsPath = "/api/roomserver/queryMissingEvents" const RoomserverQueryStateAndAuthChainPath = "/api/roomserver/queryStateAndAuthChain" // RoomserverQueryBackfillPath is the HTTP path for the QueryMissingEvents API -const RoomserverQueryBackfillPath = "/api/roomserver/QueryBackfill" +const RoomserverQueryBackfillPath = "/api/roomserver/queryBackfill" + +// RoomserverQueryServersInRoomAtEvent is the HTTP path for the QueryServersInRoomAtEvent API +const RoomserverQueryServersInRoomAtEvent = "/api/roomserver/queryServersInRoomAtEvents" // NewRoomserverQueryAPIHTTP creates a RoomserverQueryAPI implemented by talking to a HTTP POST API. // If httpClient is nil then it uses the http.DefaultClient @@ -475,6 +498,19 @@ func (h *httpRoomserverQueryAPI) QueryBackfill( span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBackfill") defer span.Finish() - apiURL := h.roomserverURL + RoomserverQueryMissingEventsPath + apiURL := h.roomserverURL + RoomserverQueryBackfillPath + return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + +// QueryServersInRoomAtEvent implements RoomServerQueryAPI +func (h *httpRoomserverQueryAPI) QueryServersInRoomAtEvent( + ctx context.Context, + request *QueryServersInRoomAtEventRequest, + response *QueryServersInRoomAtEventResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBackfill") + defer span.Finish() + + apiURL := h.roomserverURL + RoomserverQueryServersInRoomAtEvent return commonHTTP.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/query/query.go b/src/github.com/matrix-org/dendrite/roomserver/query/query.go index 39e9333c..b4ac3891 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/query/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/query/query.go @@ -643,6 +643,41 @@ func getAuthChain( return authEvents, nil } +// QueryServersInRoomAtEvent implements api.RoomserverQueryAPI +func (r *RoomserverQueryAPI) QueryServersInRoomAtEvent( + ctx context.Context, + request *api.QueryServersInRoomAtEventRequest, + response *api.QueryServersInRoomAtEventResponse, +) error { + // getMembershipsBeforeEventNID requires a NID, so retrieving the NID for + // the event is necessary. + NIDs, err := r.DB.EventNIDs(ctx, []string{request.EventID}) + if err != nil { + return err + } + + // Retrieve all "m.room.member" state events of "join" membership, which + // contains the list of users in the room before the event, therefore all + // the servers in it at that moment. + events, err := r.getMembershipsBeforeEventNID(ctx, NIDs[request.EventID], true) + if err != nil { + return err + } + + // Store the server names in a temporary map to avoid duplicates. + servers := make(map[gomatrixserverlib.ServerName]bool) + for _, event := range events { + servers[event.Origin()] = true + } + + // Populate the response. + for server := range servers { + response.Servers = append(response.Servers, server) + } + + return nil +} + // SetupHTTP adds the RoomserverQueryAPI handlers to the http.ServeMux. // nolint: gocyclo func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { @@ -786,4 +821,18 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + servMux.Handle( + api.RoomserverQueryServersInRoomAtEvent, + common.MakeInternalAPI("QueryServersInRoomAtEvent", func(req *http.Request) util.JSONResponse { + var request api.QueryServersInRoomAtEventRequest + var response api.QueryServersInRoomAtEventResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.ErrorResponse(err) + } + if err := r.QueryServersInRoomAtEvent(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) } diff --git a/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go b/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go index 68675e16..4bee3275 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go +++ b/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go @@ -15,12 +15,14 @@ package routing import ( - // "encoding/json" + "context" "net/http" "strconv" - // "github.com/matrix-org/dendrite/clientapi/httputil" + "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" @@ -36,9 +38,18 @@ type messageResp struct { const defaultMessagesLimit = 10 -func OnIncomingMessagesRequest(req *http.Request, db *storage.SyncServerDatabase, roomID string) util.JSONResponse { +// OnIncomingMessagesRequest implements the /messages endpoint from the +// client-server API. +// See: https://matrix.org/docs/spec/client_server/r0.4.0.html#get-matrix-client-r0-rooms-roomid-messages +func OnIncomingMessagesRequest( + req *http.Request, db *storage.SyncServerDatabase, roomID string, + federation *gomatrixserverlib.FederationClient, + queryAPI api.RoomserverQueryAPI, + cfg *config.Dendrite, +) util.JSONResponse { var from, to int var err error + // Extract parameters from the request's URL. // Pagination tokens. from, err = strconv.Atoi(req.URL.Query().Get("from")) @@ -58,8 +69,12 @@ func OnIncomingMessagesRequest(req *http.Request, db *storage.SyncServerDatabase JSON: jsonerror.MissingArgument("Bad or missing dir query parameter (should be either 'b' or 'f')"), } } + // A boolean is easier to handle in this case, especially since dir is sure + // to have one of the two accepted values (so dir == "f" <=> !backwardOrdering). backwardOrdering := (dir == "b") + // Pagination tokens. To is optional, and its default value depends on the + // direction ("b" or "f"). toStr := req.URL.Query().Get("to") var toPos types.StreamPosition if len(toStr) > 0 { @@ -72,13 +87,12 @@ func OnIncomingMessagesRequest(req *http.Request, db *storage.SyncServerDatabase } toPos = types.StreamPosition(to) } else { - if backwardOrdering { - toPos = types.StreamPosition(0) - } else { - toPos, err = db.SyncStreamPosition(req.Context()) - if err != nil { - return jsonerror.InternalServerError() - } + // If "to" isn't provided, it defaults to either the earliest stream + // position (if we're going backward) or to the latest one (if we're + // going forward). + toPos, err = setToDefault(req.Context(), backwardOrdering, db) + if err != nil { + return httputil.LogThenError(req, err) } } @@ -104,14 +118,68 @@ func OnIncomingMessagesRequest(req *http.Request, db *storage.SyncServerDatabase } } - streamEvents, err := db.GetEventsInRange( - req.Context(), fromPos, toPos, roomID, limit, backwardOrdering, + clientEvents, start, end, err := retrieveEvents( + req.Context(), db, roomID, fromPos, toPos, toStr, limit, backwardOrdering, + federation, queryAPI, cfg, ) if err != nil { - return jsonerror.InternalServerError() + return httputil.LogThenError(req, err) } - // Check if we don't have enough events, i.e. len(sev) < limit and the events + // Respond with the events. + return util.JSONResponse{ + Code: http.StatusOK, + JSON: messageResp{ + Chunk: clientEvents, + Start: start, + End: end, + }, + } +} + +// setToDefault returns the default value for the "to" query parameter of a +// request to /messages if not provided. It defaults to either the earliest +// stream position (if we're going backward) or to the latest one (if we're +// going forward). +// Returns an error if there was an issue with retrieving the latest position +// from the database +func setToDefault( + ctx context.Context, backwardOrdering bool, db *storage.SyncServerDatabase, +) (toPos types.StreamPosition, err error) { + if backwardOrdering { + toPos = types.StreamPosition(1) + } else { + toPos, err = db.SyncStreamPosition(ctx) + if err != nil { + return + } + } + + return +} + +// retrieveEvents retrieve events from the local database for a request on +// /messages. If there's not enough events to retrieve, it asks another +// homeserver in the room for older events. +// Returns an error if there was an issue talking to the database or with the +// remote homeserver. +func retrieveEvents( + ctx context.Context, db *storage.SyncServerDatabase, roomID string, + fromPos, toPos types.StreamPosition, toStr string, limit int, + backwardOrdering bool, + federation *gomatrixserverlib.FederationClient, + queryAPI api.RoomserverQueryAPI, + cfg *config.Dendrite, +) (clientEvents []gomatrixserverlib.ClientEvent, start string, end string, err error) { + // Retrieve the events from the local database. + streamEvents, err := db.GetEventsInRange( + ctx, fromPos, toPos, roomID, limit, backwardOrdering, + ) + if err != nil { + return + } + + // Check if we have enough events. isSetLargeEnough := true if len(streamEvents) < limit { if backwardOrdering { @@ -121,50 +189,121 @@ func OnIncomingMessagesRequest(req *http.Request, db *storage.SyncServerDatabase isSetLargeEnough = (toPos-1 == streamEvents[0].StreamPosition) } } else { - // We need all events from < streamPos < to isSetLargeEnough = (fromPos-1 == streamEvents[0].StreamPosition) } } + // Check if earliest event is a backward extremity, i.e. if one of its - // previous events is missing from the db. + // previous events is missing from the database. + // Get the earliest retrieved event's parents. prevIDs := streamEvents[0].PrevEventIDs() - prevs, err := db.Events(req.Context(), prevIDs) + prevs, err := db.Events(ctx, prevIDs) + if err != nil { + return + } + // Check if we have all of the events we requested. If not, it means we've + // reached a backard extremity. var eventInDB, isBackwardExtremity bool var id string + // Iterate over the IDs we used in the request. for _, id = range prevIDs { eventInDB = false + // Iterate over the events we got in response. for _, ev := range prevs { if ev.EventID() == id { eventInDB = true } } + // One occurrence of one the event's parents not being present in the + // database is enough to say that the event is a backward extremity. if !eventInDB { isBackwardExtremity = true break } } - if isBackwardExtremity && !isSetLargeEnough { - log.WithFields(log.Fields{ - "limit": limit, - "nb_events": len(streamEvents), - "from": fromPos.String(), - "to": toPos.String(), - "isBackwardExtremity": isBackwardExtremity, - "isSetLargeEnough": isSetLargeEnough, - }).Info("Backfilling!") - println("Backfilling!") + var events []gomatrixserverlib.Event + + // Backfill is needed if we've reached a backward extremity and need more + // events. It's only needed if the direction is backard. + if isBackwardExtremity && !isSetLargeEnough && backwardOrdering { + var pdus []gomatrixserverlib.Event + // Only ask the remote server for enough events to reach the limit. + pdus, err = backfill( + ctx, roomID, streamEvents[0].EventID(), limit-len(streamEvents), + queryAPI, cfg, federation, + ) + if err != nil { + return + } + + events = append(events, pdus...) } - events := storage.StreamEventsToEvents(nil, streamEvents) - clientEvents := gomatrixserverlib.ToClientEvents(events, gomatrixserverlib.FormatAll) + // Append the events we retrieved locally, then convert them into client + // events. + events = append(events, storage.StreamEventsToEvents(nil, streamEvents)...) + clientEvents = gomatrixserverlib.ToClientEvents(events, gomatrixserverlib.FormatAll) + start = streamEvents[0].StreamPosition.String() + end = streamEvents[len(streamEvents)-1].StreamPosition.String() - return util.JSONResponse{ - Code: http.StatusOK, - JSON: messageResp{ - Chunk: clientEvents, - Start: streamEvents[0].StreamPosition.String(), - End: streamEvents[len(streamEvents)-1].StreamPosition.String(), - }, - } + return +} + +// backfill performs a backfill request over the federation on another +// homeserver in the room. +// See: https://matrix.org/docs/spec/server_server/unstable.html#get-matrix-federation-v1-backfill-roomid +// Returns with an empty string if the remote homeserver didn't return with any +// event, or if there is no remote homeserver to contact. +// Returns an error if there was an issue with retrieving the list of servers in +// the room or sending the request. +func backfill( + ctx context.Context, roomID, fromEventID string, limit int, + queryAPI api.RoomserverQueryAPI, cfg *config.Dendrite, + federation *gomatrixserverlib.FederationClient, +) ([]gomatrixserverlib.Event, error) { + // Query the list of servers in the room when the earlier event we know + // of was sent. + var serversResponse api.QueryServersInRoomAtEventResponse + serversRequest := api.QueryServersInRoomAtEventRequest{ + RoomID: roomID, + EventID: fromEventID, + } + if err := queryAPI.QueryServersInRoomAtEvent(ctx, &serversRequest, &serversResponse); err != nil { + return nil, err + } + + // Use the first server from the response, except if that server is us. + // In that case, use the second one if the roomserver responded with + // enough servers. If not, use an empty string to prevent the backfill + // from happening as there's no server to direct the request towards. + // TODO: Be smarter at selecting the server to direct the request + // towards. + srvToBackfillFrom := serversResponse.Servers[0] + if srvToBackfillFrom == cfg.Matrix.ServerName { + if len(serversResponse.Servers) > 1 { + srvToBackfillFrom = serversResponse.Servers[1] + } else { + srvToBackfillFrom = gomatrixserverlib.ServerName("") + log.Warn("Not enough servers to backfill from") + } + } + + pdus := make([]gomatrixserverlib.Event, 0) + + // If the roomserver responded with at least one server that isn't us, + // send it a request for backfill. + if len(srvToBackfillFrom) > 0 { + txn, err := federation.Backfill( + ctx, srvToBackfillFrom, roomID, limit, []string{fromEventID}, + ) + if err != nil { + return nil, err + } + + // TODO: Store the events in the database. + pdus = txn.PDUs + } + + return pdus, nil } diff --git a/src/github.com/matrix-org/dendrite/syncapi/routing/routing.go b/src/github.com/matrix-org/dendrite/syncapi/routing/routing.go index 5a47de73..d2364a7f 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/syncapi/routing/routing.go @@ -22,15 +22,24 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/sync" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) const pathPrefixR0 = "/_matrix/client/r0" // Setup configures the given mux with sync-server listeners -func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServerDatabase, deviceDB *devices.Database) { +func Setup( + apiMux *mux.Router, srp *sync.RequestPool, + syncDB *storage.SyncServerDatabase, deviceDB *devices.Database, + federation *gomatrixserverlib.FederationClient, + queryAPI api.RoomserverQueryAPI, + cfg *config.Dendrite, +) { r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() authData := auth.Data{nil, deviceDB, nil} @@ -57,6 +66,6 @@ func Setup(apiMux *mux.Router, srp *sync.RequestPool, syncDB *storage.SyncServer r0mux.Handle("/rooms/{roomID}/messages", common.MakeAuthAPI("room_messages", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse { vars := mux.Vars(req) - return OnIncomingMessagesRequest(req, syncDB, vars["roomID"]) + return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], federation, queryAPI, cfg) })).Methods(http.MethodGet, http.MethodOptions) } diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index f0d73524..a37000c0 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -329,7 +329,7 @@ func (d *SyncServerDatabase) CompleteSync( recentEvents := StreamEventsToEvents(nil, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) jr := types.NewJoinResponse() - if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 { + if prevBatch := recentStreamEvents[0].StreamPosition - 1; prevBatch > 0 { jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String() } else { jr.Timeline.PrevBatch = types.StreamPosition(1).String() @@ -463,7 +463,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( switch delta.membership { case "join": jr := types.NewJoinResponse() - if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 { + if prevBatch := recentStreamEvents[0].StreamPosition - 1; prevBatch > 0 { jr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String() } else { jr.Timeline.PrevBatch = types.StreamPosition(1).String() @@ -478,7 +478,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( // TODO: recentEvents may contain events that this user is not allowed to see because they are // no longer in the room. lr := types.NewLeaveResponse() - if prevBatch := recentStreamEvents[0].streamPosition - 1; prevBatch > 0 { + if prevBatch := recentStreamEvents[0].StreamPosition - 1; prevBatch > 0 { lr.Timeline.PrevBatch = types.StreamPosition(prevBatch).String() } else { lr.Timeline.PrevBatch = types.StreamPosition(1).String() diff --git a/src/github.com/matrix-org/dendrite/syncapi/syncapi.go b/src/github.com/matrix-org/dendrite/syncapi/syncapi.go index 2db54c3c..abafdd4e 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/syncapi.go +++ b/src/github.com/matrix-org/dendrite/syncapi/syncapi.go @@ -21,6 +21,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/common/basecomponent" + "github.com/matrix-org/dendrite/common/config" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" @@ -29,6 +30,8 @@ import ( "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" + + "github.com/matrix-org/gomatrixserverlib" ) // SetupSyncAPIComponent sets up and registers HTTP handlers for the SyncAPI @@ -38,6 +41,8 @@ func SetupSyncAPIComponent( deviceDB *devices.Database, accountsDB *accounts.Database, queryAPI api.RoomserverQueryAPI, + federation *gomatrixserverlib.FederationClient, + cfg *config.Dendrite, ) { syncDB, err := storage.NewSyncServerDatabase(string(base.Cfg.Database.SyncAPI)) if err != nil { @@ -71,5 +76,5 @@ func SetupSyncAPIComponent( logrus.WithError(err).Panicf("failed to start client data consumer") } - routing.Setup(base.APIMux, requestPool, syncDB, deviceDB) + routing.Setup(base.APIMux, requestPool, syncDB, deviceDB, federation, queryAPI, cfg) }