From 739acc12911840f2bdac1ec3332f22422a91bc06 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 8 Dec 2021 14:47:47 +0000 Subject: [PATCH] Add query functions in the federation API --- federationapi/api/api.go | 49 +++++++++++++ federationapi/internal/query.go | 123 ++++++++++++++++++++++++++++++++ federationapi/inthttp/client.go | 42 +++++++++++ federationapi/inthttp/server.go | 42 +++++++++++ 4 files changed, 256 insertions(+) diff --git a/federationapi/api/api.go b/federationapi/api/api.go index 5d4eb884..6309ff21 100644 --- a/federationapi/api/api.go +++ b/federationapi/api/api.go @@ -60,6 +60,25 @@ type FederationInternalAPI interface { request *QueryJoinedHostServerNamesInRoomRequest, response *QueryJoinedHostServerNamesInRoomResponse, ) error + + QueryEventAuthFromFederation( + ctx context.Context, + request *QueryEventAuthFromFederationRequest, + response *QueryEventAuthFromFederationResponse, + ) error + + QueryStateIDsFromFederation( + ctx context.Context, + request *QueryStateIDsFromFederationRequest, + response *QueryStateIDsFromFederationResponse, + ) error + + QueryStateFromFederation( + ctx context.Context, + request *QueryStateFromFederationRequest, + response *QueryStateFromFederationResponse, + ) error + // Handle an instruction to make_join & send_join with a remote server. PerformJoin( ctx context.Context, @@ -194,6 +213,36 @@ type QueryJoinedHostServerNamesInRoomResponse struct { ServerNames []gomatrixserverlib.ServerName `json:"server_names"` } +type QueryEventAuthFromFederationRequest struct { + RoomID string `json:"room_id"` + EventID string `json:"event_id"` +} + +type QueryEventAuthFromFederationResponse struct { + Events []*gomatrixserverlib.Event `json:"events"` +} + +type QueryStateIDsFromFederationRequest struct { + RoomID string `json:"room_id"` + EventID string `json:"event_id"` +} + +type QueryStateIDsFromFederationResponse struct { + AuthEventIDs []string `json:"auth_event_ids"` + StateEventIDs []string `json:"state_event_ids"` +} + +type QueryStateFromFederationRequest struct { + RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"` + RoomID string `json:"room_id"` + EventID string `json:"event_id"` +} + +type QueryStateFromFederationResponse struct { + AuthEvents []*gomatrixserverlib.Event `json:"auth_events"` + StateEvents []*gomatrixserverlib.Event `json:"state_events"` +} + type PerformBroadcastEDURequest struct { } diff --git a/federationapi/internal/query.go b/federationapi/internal/query.go index bac81333..44305b23 100644 --- a/federationapi/internal/query.go +++ b/federationapi/internal/query.go @@ -25,6 +25,129 @@ func (f *FederationInternalAPI) QueryJoinedHostServerNamesInRoom( return } +func (f *FederationInternalAPI) QueryEventAuthFromFederation( + ctx context.Context, + request *api.QueryEventAuthFromFederationRequest, + response *api.QueryEventAuthFromFederationResponse, +) error { + joinedHosts, err := f.db.GetJoinedHostsForRooms(ctx, []string{request.RoomID}) + if err != nil { + return fmt.Errorf("f.db.GetJoinedHostsForRooms: %w", err) + } + + tryHost := func(serverName gomatrixserverlib.ServerName) error { + ctx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + ires, err := f.doRequest(serverName, func() (interface{}, error) { + return f.federation.GetEventAuth( + ctx, + serverName, + request.RoomID, + request.EventID, + ) + }) + if err != nil { + return fmt.Errorf("f.doRequest: %w", err) + } + tx := ires.(gomatrixserverlib.RespEventAuth) + response.Events = tx.AuthEvents + return nil + } + + var lasterr error + for _, host := range joinedHosts { + if lasterr = tryHost(host); lasterr != nil { + continue + } + break + } + + return lasterr +} + +func (f *FederationInternalAPI) QueryStateIDsFromFederation( + ctx context.Context, + request *api.QueryStateIDsFromFederationRequest, + response *api.QueryStateIDsFromFederationResponse, +) error { + joinedHosts, err := f.db.GetJoinedHostsForRooms(ctx, []string{request.RoomID}) + if err != nil { + return fmt.Errorf("f.db.GetJoinedHostsForRooms: %w", err) + } + + tryHost := func(serverName gomatrixserverlib.ServerName) error { + ctx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + ires, err := f.doRequest(serverName, func() (interface{}, error) { + return f.federation.LookupStateIDs( + ctx, + serverName, + request.RoomID, + request.EventID, + ) + }) + if err != nil { + return fmt.Errorf("f.doRequest: %w", err) + } + tx := ires.(gomatrixserverlib.RespStateIDs) + response.AuthEventIDs = tx.AuthEventIDs + response.StateEventIDs = tx.StateEventIDs + return nil + } + + var lasterr error + for _, host := range joinedHosts { + if lasterr = tryHost(host); lasterr != nil { + continue + } + break + } + + return lasterr +} + +func (f *FederationInternalAPI) QueryStateFromFederation( + ctx context.Context, + request *api.QueryStateFromFederationRequest, + response *api.QueryStateFromFederationResponse, +) error { + joinedHosts, err := f.db.GetJoinedHostsForRooms(ctx, []string{request.RoomID}) + if err != nil { + return fmt.Errorf("f.db.GetJoinedHostsForRooms: %w", err) + } + + tryHost := func(serverName gomatrixserverlib.ServerName) error { + ctx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + ires, err := f.doRequest(serverName, func() (interface{}, error) { + return f.federation.LookupState( + ctx, + serverName, + request.RoomID, + request.EventID, + request.RoomVersion, + ) + }) + if err != nil { + return fmt.Errorf("f.doRequest: %w", err) + } + tx := ires.(gomatrixserverlib.RespState) + response.AuthEvents = tx.AuthEvents + response.StateEvents = tx.StateEvents + return nil + } + + var lasterr error + for _, host := range joinedHosts { + if lasterr = tryHost(host); lasterr != nil { + continue + } + break + } + + return lasterr +} + func (a *FederationInternalAPI) fetchServerKeysDirectly(ctx context.Context, serverName gomatrixserverlib.ServerName) (*gomatrixserverlib.ServerKeys, error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() diff --git a/federationapi/inthttp/client.go b/federationapi/inthttp/client.go index af6b801b..abca8651 100644 --- a/federationapi/inthttp/client.go +++ b/federationapi/inthttp/client.go @@ -17,6 +17,9 @@ import ( const ( FederationAPIQueryJoinedHostServerNamesInRoomPath = "/federationapi/queryJoinedHostServerNamesInRoom" FederationAPIQueryServerKeysPath = "/federationapi/queryServerKeys" + FederationAPIQueryEventAuthFromFederationPath = "/federationapi/queryEventAuthFromFederation" + FederationAPIQueryStateIDsFromFederationPath = "/federationapi/queryStateIDsFromFederation" + FederationAPIQueryStateFromFederationPath = "/federationapi/queryStateFromFederation" FederationAPIPerformDirectoryLookupRequestPath = "/federationapi/performDirectoryLookup" FederationAPIPerformJoinRequestPath = "/federationapi/performJoinRequest" @@ -120,6 +123,45 @@ func (h *httpFederationInternalAPI) QueryJoinedHostServerNamesInRoom( return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } +// QueryEventAuthFromFederation implements FederationInternalAPI +func (h *httpFederationInternalAPI) QueryEventAuthFromFederation( + ctx context.Context, + request *api.QueryEventAuthFromFederationRequest, + response *api.QueryEventAuthFromFederationResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryEventAuthFromFederation") + defer span.Finish() + + apiURL := h.federationAPIURL + FederationAPIQueryEventAuthFromFederationPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + +// QueryStateIDsFromFederation implements FederationInternalAPI +func (h *httpFederationInternalAPI) QueryStateIDsFromFederation( + ctx context.Context, + request *api.QueryStateIDsFromFederationRequest, + response *api.QueryStateIDsFromFederationResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryStateIDsFromFederation") + defer span.Finish() + + apiURL := h.federationAPIURL + FederationAPIQueryStateIDsFromFederationPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + +// QueryEventAuthFromFederation implements FederationInternalAPI +func (h *httpFederationInternalAPI) QueryStateFromFederation( + ctx context.Context, + request *api.QueryStateFromFederationRequest, + response *api.QueryStateFromFederationResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryStateFromFederation") + defer span.Finish() + + apiURL := h.federationAPIURL + FederationAPIQueryStateFromFederationPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + // Handle an instruction to make_join & send_join with a remote server. func (h *httpFederationInternalAPI) PerformJoin( ctx context.Context, diff --git a/federationapi/inthttp/server.go b/federationapi/inthttp/server.go index 7133eddd..f259a065 100644 --- a/federationapi/inthttp/server.go +++ b/federationapi/inthttp/server.go @@ -27,6 +27,48 @@ func AddRoutes(intAPI api.FederationInternalAPI, internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle( + FederationAPIQueryEventAuthFromFederationPath, + httputil.MakeInternalAPI("QueryEventAuthFromFederation", func(req *http.Request) util.JSONResponse { + var request api.QueryEventAuthFromFederationRequest + var response api.QueryEventAuthFromFederationResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.ErrorResponse(err) + } + if err := intAPI.QueryEventAuthFromFederation(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) + internalAPIMux.Handle( + FederationAPIQueryStateIDsFromFederationPath, + httputil.MakeInternalAPI("QueryStateIDsFromFederation", func(req *http.Request) util.JSONResponse { + var request api.QueryStateIDsFromFederationRequest + var response api.QueryStateIDsFromFederationResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.ErrorResponse(err) + } + if err := intAPI.QueryStateIDsFromFederation(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) + internalAPIMux.Handle( + FederationAPIQueryStateFromFederationPath, + httputil.MakeInternalAPI("QueryStateFromFederation", func(req *http.Request) util.JSONResponse { + var request api.QueryStateFromFederationRequest + var response api.QueryStateFromFederationResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.ErrorResponse(err) + } + if err := intAPI.QueryStateFromFederation(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) internalAPIMux.Handle( FederationAPIPerformJoinRequestPath, httputil.MakeInternalAPI("PerformJoinRequest", func(req *http.Request) util.JSONResponse {