Add query functions in the federation API

This commit is contained in:
Neil Alexander 2021-12-08 14:47:47 +00:00
parent 61406a6747
commit 739acc1291
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 256 additions and 0 deletions

View file

@ -60,6 +60,25 @@ type FederationInternalAPI interface {
request *QueryJoinedHostServerNamesInRoomRequest,
response *QueryJoinedHostServerNamesInRoomResponse,
) error
QueryEventAuthFromFederation(
ctx context.Context,
request *QueryEventAuthFromFederationRequest,
response *QueryEventAuthFromFederationResponse,
) error
QueryStateIDsFromFederation(
ctx context.Context,
request *QueryStateIDsFromFederationRequest,
response *QueryStateIDsFromFederationResponse,
) error
QueryStateFromFederation(
ctx context.Context,
request *QueryStateFromFederationRequest,
response *QueryStateFromFederationResponse,
) error
// Handle an instruction to make_join & send_join with a remote server.
PerformJoin(
ctx context.Context,
@ -194,6 +213,36 @@ type QueryJoinedHostServerNamesInRoomResponse struct {
ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
}
type QueryEventAuthFromFederationRequest struct {
RoomID string `json:"room_id"`
EventID string `json:"event_id"`
}
type QueryEventAuthFromFederationResponse struct {
Events []*gomatrixserverlib.Event `json:"events"`
}
type QueryStateIDsFromFederationRequest struct {
RoomID string `json:"room_id"`
EventID string `json:"event_id"`
}
type QueryStateIDsFromFederationResponse struct {
AuthEventIDs []string `json:"auth_event_ids"`
StateEventIDs []string `json:"state_event_ids"`
}
type QueryStateFromFederationRequest struct {
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
RoomID string `json:"room_id"`
EventID string `json:"event_id"`
}
type QueryStateFromFederationResponse struct {
AuthEvents []*gomatrixserverlib.Event `json:"auth_events"`
StateEvents []*gomatrixserverlib.Event `json:"state_events"`
}
type PerformBroadcastEDURequest struct {
}

View file

@ -25,6 +25,129 @@ func (f *FederationInternalAPI) QueryJoinedHostServerNamesInRoom(
return
}
func (f *FederationInternalAPI) QueryEventAuthFromFederation(
ctx context.Context,
request *api.QueryEventAuthFromFederationRequest,
response *api.QueryEventAuthFromFederationResponse,
) error {
joinedHosts, err := f.db.GetJoinedHostsForRooms(ctx, []string{request.RoomID})
if err != nil {
return fmt.Errorf("f.db.GetJoinedHostsForRooms: %w", err)
}
tryHost := func(serverName gomatrixserverlib.ServerName) error {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := f.doRequest(serverName, func() (interface{}, error) {
return f.federation.GetEventAuth(
ctx,
serverName,
request.RoomID,
request.EventID,
)
})
if err != nil {
return fmt.Errorf("f.doRequest: %w", err)
}
tx := ires.(gomatrixserverlib.RespEventAuth)
response.Events = tx.AuthEvents
return nil
}
var lasterr error
for _, host := range joinedHosts {
if lasterr = tryHost(host); lasterr != nil {
continue
}
break
}
return lasterr
}
func (f *FederationInternalAPI) QueryStateIDsFromFederation(
ctx context.Context,
request *api.QueryStateIDsFromFederationRequest,
response *api.QueryStateIDsFromFederationResponse,
) error {
joinedHosts, err := f.db.GetJoinedHostsForRooms(ctx, []string{request.RoomID})
if err != nil {
return fmt.Errorf("f.db.GetJoinedHostsForRooms: %w", err)
}
tryHost := func(serverName gomatrixserverlib.ServerName) error {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := f.doRequest(serverName, func() (interface{}, error) {
return f.federation.LookupStateIDs(
ctx,
serverName,
request.RoomID,
request.EventID,
)
})
if err != nil {
return fmt.Errorf("f.doRequest: %w", err)
}
tx := ires.(gomatrixserverlib.RespStateIDs)
response.AuthEventIDs = tx.AuthEventIDs
response.StateEventIDs = tx.StateEventIDs
return nil
}
var lasterr error
for _, host := range joinedHosts {
if lasterr = tryHost(host); lasterr != nil {
continue
}
break
}
return lasterr
}
func (f *FederationInternalAPI) QueryStateFromFederation(
ctx context.Context,
request *api.QueryStateFromFederationRequest,
response *api.QueryStateFromFederationResponse,
) error {
joinedHosts, err := f.db.GetJoinedHostsForRooms(ctx, []string{request.RoomID})
if err != nil {
return fmt.Errorf("f.db.GetJoinedHostsForRooms: %w", err)
}
tryHost := func(serverName gomatrixserverlib.ServerName) error {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := f.doRequest(serverName, func() (interface{}, error) {
return f.federation.LookupState(
ctx,
serverName,
request.RoomID,
request.EventID,
request.RoomVersion,
)
})
if err != nil {
return fmt.Errorf("f.doRequest: %w", err)
}
tx := ires.(gomatrixserverlib.RespState)
response.AuthEvents = tx.AuthEvents
response.StateEvents = tx.StateEvents
return nil
}
var lasterr error
for _, host := range joinedHosts {
if lasterr = tryHost(host); lasterr != nil {
continue
}
break
}
return lasterr
}
func (a *FederationInternalAPI) fetchServerKeysDirectly(ctx context.Context, serverName gomatrixserverlib.ServerName) (*gomatrixserverlib.ServerKeys, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()

View file

@ -17,6 +17,9 @@ import (
const (
FederationAPIQueryJoinedHostServerNamesInRoomPath = "/federationapi/queryJoinedHostServerNamesInRoom"
FederationAPIQueryServerKeysPath = "/federationapi/queryServerKeys"
FederationAPIQueryEventAuthFromFederationPath = "/federationapi/queryEventAuthFromFederation"
FederationAPIQueryStateIDsFromFederationPath = "/federationapi/queryStateIDsFromFederation"
FederationAPIQueryStateFromFederationPath = "/federationapi/queryStateFromFederation"
FederationAPIPerformDirectoryLookupRequestPath = "/federationapi/performDirectoryLookup"
FederationAPIPerformJoinRequestPath = "/federationapi/performJoinRequest"
@ -120,6 +123,45 @@ func (h *httpFederationInternalAPI) QueryJoinedHostServerNamesInRoom(
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryEventAuthFromFederation implements FederationInternalAPI
func (h *httpFederationInternalAPI) QueryEventAuthFromFederation(
ctx context.Context,
request *api.QueryEventAuthFromFederationRequest,
response *api.QueryEventAuthFromFederationResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryEventAuthFromFederation")
defer span.Finish()
apiURL := h.federationAPIURL + FederationAPIQueryEventAuthFromFederationPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryStateIDsFromFederation implements FederationInternalAPI
func (h *httpFederationInternalAPI) QueryStateIDsFromFederation(
ctx context.Context,
request *api.QueryStateIDsFromFederationRequest,
response *api.QueryStateIDsFromFederationResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryStateIDsFromFederation")
defer span.Finish()
apiURL := h.federationAPIURL + FederationAPIQueryStateIDsFromFederationPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// QueryEventAuthFromFederation implements FederationInternalAPI
func (h *httpFederationInternalAPI) QueryStateFromFederation(
ctx context.Context,
request *api.QueryStateFromFederationRequest,
response *api.QueryStateFromFederationResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryStateFromFederation")
defer span.Finish()
apiURL := h.federationAPIURL + FederationAPIQueryStateFromFederationPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// Handle an instruction to make_join & send_join with a remote server.
func (h *httpFederationInternalAPI) PerformJoin(
ctx context.Context,

View file

@ -27,6 +27,48 @@ func AddRoutes(intAPI api.FederationInternalAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(
FederationAPIQueryEventAuthFromFederationPath,
httputil.MakeInternalAPI("QueryEventAuthFromFederation", func(req *http.Request) util.JSONResponse {
var request api.QueryEventAuthFromFederationRequest
var response api.QueryEventAuthFromFederationResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := intAPI.QueryEventAuthFromFederation(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(
FederationAPIQueryStateIDsFromFederationPath,
httputil.MakeInternalAPI("QueryStateIDsFromFederation", func(req *http.Request) util.JSONResponse {
var request api.QueryStateIDsFromFederationRequest
var response api.QueryStateIDsFromFederationResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := intAPI.QueryStateIDsFromFederation(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(
FederationAPIQueryStateFromFederationPath,
httputil.MakeInternalAPI("QueryStateFromFederation", func(req *http.Request) util.JSONResponse {
var request api.QueryStateFromFederationRequest
var response api.QueryStateFromFederationResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := intAPI.QueryStateFromFederation(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(
FederationAPIPerformJoinRequestPath,
httputil.MakeInternalAPI("PerformJoinRequest", func(req *http.Request) util.JSONResponse {