From 087efc0e3440e3358fef89ec619bd09cc96dd6ae Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 13 Dec 2021 12:56:42 +0000 Subject: [PATCH] some more stuff --- federationapi/api/api.go | 16 ++++++ federationapi/internal/query.go | 41 +++++++++++++++ federationapi/inthttp/client.go | 14 +++++ federationapi/inthttp/server.go | 14 +++++ roomserver/internal/input/input_events.go | 62 ++++++++++++++++++++++- 5 files changed, 145 insertions(+), 2 deletions(-) diff --git a/federationapi/api/api.go b/federationapi/api/api.go index 6309ff21..7547cf08 100644 --- a/federationapi/api/api.go +++ b/federationapi/api/api.go @@ -2,6 +2,7 @@ package api import ( "context" + "encoding/json" "fmt" "time" @@ -61,6 +62,12 @@ type FederationInternalAPI interface { response *QueryJoinedHostServerNamesInRoomResponse, ) error + QueryEventsFromFederation( + ctx context.Context, + request *QueryEventsFromFederationRequest, + response *QueryEventsFromFederationResponse, + ) error + QueryEventAuthFromFederation( ctx context.Context, request *QueryEventAuthFromFederationRequest, @@ -213,6 +220,15 @@ type QueryJoinedHostServerNamesInRoomResponse struct { ServerNames []gomatrixserverlib.ServerName `json:"server_names"` } +type QueryEventsFromFederationRequest struct { + RoomID string `json:"room_id"` + EventIDs []string `json:"event_ids"` +} + +type QueryEventsFromFederationResponse struct { + Events []json.RawMessage `json:"events"` +} + type QueryEventAuthFromFederationRequest struct { RoomID string `json:"room_id"` EventID string `json:"event_id"` diff --git a/federationapi/internal/query.go b/federationapi/internal/query.go index 794a513f..b2c1a872 100644 --- a/federationapi/internal/query.go +++ b/federationapi/internal/query.go @@ -25,6 +25,47 @@ func (f *FederationInternalAPI) QueryJoinedHostServerNamesInRoom( return } +func (f *FederationInternalAPI) QueryEventsFromFederation( + ctx context.Context, + request *api.QueryEventsFromFederationRequest, + response *api.QueryEventsFromFederationResponse, +) error { + joinedHosts, err := f.db.GetJoinedHostsForRooms(ctx, []string{request.RoomID}) + if err != nil { + return fmt.Errorf("f.db.GetJoinedHostsForRooms: %w", err) + } + + tryHost := func(serverName gomatrixserverlib.ServerName, eventID string) error { + reqctx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + ires, err := f.doRequest(serverName, func() (interface{}, error) { + return f.federation.GetEvent( + reqctx, + serverName, + eventID, + ) + }) + if err != nil { + return fmt.Errorf("f.doRequest: %w", err) + } + tx := ires.(gomatrixserverlib.Transaction) + response.Events = append(response.Events, tx.PDUs...) + return nil + } + + var lasterr error + for _, eventID := range request.EventIDs { + for _, host := range joinedHosts { + if lasterr = tryHost(host, eventID); lasterr != nil { + continue + } + break + } + } + + return lasterr +} + func (f *FederationInternalAPI) QueryEventAuthFromFederation( ctx context.Context, request *api.QueryEventAuthFromFederationRequest, diff --git a/federationapi/inthttp/client.go b/federationapi/inthttp/client.go index abca8651..0ae41ccb 100644 --- a/federationapi/inthttp/client.go +++ b/federationapi/inthttp/client.go @@ -17,6 +17,7 @@ import ( const ( FederationAPIQueryJoinedHostServerNamesInRoomPath = "/federationapi/queryJoinedHostServerNamesInRoom" FederationAPIQueryServerKeysPath = "/federationapi/queryServerKeys" + FederationAPIQueryEventsFromFederationPath = "/federationapi/queryEventsFromFederation" FederationAPIQueryEventAuthFromFederationPath = "/federationapi/queryEventAuthFromFederation" FederationAPIQueryStateIDsFromFederationPath = "/federationapi/queryStateIDsFromFederation" FederationAPIQueryStateFromFederationPath = "/federationapi/queryStateFromFederation" @@ -123,6 +124,19 @@ func (h *httpFederationInternalAPI) QueryJoinedHostServerNamesInRoom( return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } +// QueryEventAuthFromFederation implements FederationInternalAPI +func (h *httpFederationInternalAPI) QueryEventsFromFederation( + ctx context.Context, + request *api.QueryEventsFromFederationRequest, + response *api.QueryEventsFromFederationResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryEventsFromFederation") + defer span.Finish() + + apiURL := h.federationAPIURL + FederationAPIQueryEventsFromFederationPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + // QueryEventAuthFromFederation implements FederationInternalAPI func (h *httpFederationInternalAPI) QueryEventAuthFromFederation( ctx context.Context, diff --git a/federationapi/inthttp/server.go b/federationapi/inthttp/server.go index f259a065..18c1640b 100644 --- a/federationapi/inthttp/server.go +++ b/federationapi/inthttp/server.go @@ -27,6 +27,20 @@ func AddRoutes(intAPI api.FederationInternalAPI, internalAPIMux *mux.Router) { return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle( + FederationAPIQueryEventAuthFromFederationPath, + httputil.MakeInternalAPI("QueryEventsFromFederation", func(req *http.Request) util.JSONResponse { + var request api.QueryEventsFromFederationRequest + var response api.QueryEventsFromFederationResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.ErrorResponse(err) + } + if err := intAPI.QueryEventsFromFederation(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) internalAPIMux.Handle( FederationAPIQueryEventAuthFromFederationPath, httputil.MakeInternalAPI("QueryEventAuthFromFederation", func(req *http.Request) util.JSONResponse { diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index ea943ea9..6dc8dc8f 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -125,7 +125,7 @@ func (r *Inputer) processRoomEvent( // Then check if the prev events are known, which we need in order // to calculate the state before the event. - if err := r.checkForMissingPrevEvents(ctx, input); err != nil { + if err := r.checkForMissingPrevEvents(ctx, input.Event); err != nil { return "", fmt.Errorf("r.checkForMissingPrevEvents: %w", err) } @@ -299,6 +299,14 @@ func (r *Inputer) checkForMissingAuthEvents( continue } + // Check the signatures of the event. + // TODO: It really makes sense for the federation API to be doing this, + // because then it can attempt another server if one serves up an event + // with an invalid signature. For now this will do. + if err := event.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil { + return fmt.Errorf("event.VerifyEventSignatures: %w", err) + } + // Otherwise, we need to store, and that means we need to know the // auth event NIDs. Let's see if we can find those. authEventNIDs := make([]types.EventNID, 0, len(event.AuthEventIDs())) @@ -342,8 +350,58 @@ func (r *Inputer) checkForMissingAuthEvents( func (r *Inputer) checkForMissingPrevEvents( ctx context.Context, - input *api.InputRoomEvent, + event *gomatrixserverlib.HeaderedEvent, ) error { + // First of all, determine if we know of the prev events. + prevEventIDs := event.PrevEventIDs() + prevEventNIDs, err := r.DB.EventNIDs(ctx, prevEventIDs) + if err != nil { + return fmt.Errorf("r.DB.EventNIDs: %w", err) + } + missingPrevEvents := make([]string, 0, len(prevEventIDs)) + for _, eventID := range prevEventIDs { + if _, ok := prevEventNIDs[eventID]; !ok { + missingPrevEvents = append(prevEventIDs, eventID) + } + } + + // If there are no missing prev events then we're all happy and there + // is nothing more to do here. + if len(missingPrevEvents) == 0 { + return nil + } + + return fmt.Errorf("there are %d missing prev events", len(missingPrevEvents)) + + req := &fedapi.QueryEventsFromFederationRequest{ + RoomID: event.RoomID(), + EventIDs: missingPrevEvents, + } + res := &fedapi.QueryEventsFromFederationResponse{} + if err := r.FSAPI.QueryEventsFromFederation(ctx, req, res); err != nil { + return fmt.Errorf("r.FSAPI.QueryEventsFromFederation: %w", err) + } + + for _, js := range res.Events { + ev, err := gomatrixserverlib.NewEventFromUntrustedJSON(js, event.RoomVersion) + if err != nil { + return fmt.Errorf("gomatrixserverlib.NewEventFromUntrustedJSON: %w", err) + } + + if err := ev.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil { + return fmt.Errorf("event.VerifyEventSignatures: %w", err) + } + + req := &fedapi.QueryStateIDsFromFederationRequest{ + RoomID: ev.RoomID(), + EventID: ev.EventID(), + } + res := &fedapi.QueryStateIDsFromFederationResponse{} + if err := r.FSAPI.QueryStateIDsFromFederation(ctx, req, res); err != nil { + return fmt.Errorf("r.FSAPI.QueryStateIDsFromFederation: %w", err) + } + + } return nil }