Use federation sender for backfill/getting missing events (#1379)

* Use federation sender for backfill and getting missing events

* Fix internal URL paths

* Update go.mod/go.sum for matrix-org/gomatrixserverlib#218

* Add missing server implementations in HTTP interface
This commit is contained in:
Neil Alexander 2020-09-02 15:26:30 +01:00 committed by GitHub
parent e473320e73
commit 096191ca24
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 295 additions and 29 deletions

View file

@ -14,9 +14,12 @@ import (
// implements as proxy calls, with built-in backoff/retries/etc. Errors returned from functions in
// this interface are of type FederationClientError
type FederationClient interface {
gomatrixserverlib.BackfillClient
gomatrixserverlib.FederatedStateClient
GetUserDevices(ctx context.Context, s gomatrixserverlib.ServerName, userID string) (res gomatrixserverlib.RespUserDevices, err error)
ClaimKeys(ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string) (res gomatrixserverlib.RespClaimKeys, err error)
QueryKeys(ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string) (res gomatrixserverlib.RespQueryKeys, err error)
GetEvent(ctx context.Context, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error)
}
// FederationClientError is returned from FederationClient methods in the event of a problem.

View file

@ -136,3 +136,51 @@ func (a *FederationSenderInternalAPI) QueryKeys(
}
return ires.(gomatrixserverlib.RespQueryKeys), nil
}
func (a *FederationSenderInternalAPI) Backfill(
ctx context.Context, s gomatrixserverlib.ServerName, roomID string, limit int, eventIDs []string,
) (res gomatrixserverlib.Transaction, err error) {
ires, err := a.doRequest(s, func() (interface{}, error) {
return a.federation.Backfill(ctx, s, roomID, limit, eventIDs)
})
if err != nil {
return gomatrixserverlib.Transaction{}, err
}
return ires.(gomatrixserverlib.Transaction), nil
}
func (a *FederationSenderInternalAPI) LookupState(
ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion,
) (res gomatrixserverlib.RespState, err error) {
ires, err := a.doRequest(s, func() (interface{}, error) {
return a.federation.LookupState(ctx, s, roomID, eventID, roomVersion)
})
if err != nil {
return gomatrixserverlib.RespState{}, err
}
return ires.(gomatrixserverlib.RespState), nil
}
func (a *FederationSenderInternalAPI) LookupStateIDs(
ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string,
) (res gomatrixserverlib.RespStateIDs, err error) {
ires, err := a.doRequest(s, func() (interface{}, error) {
return a.federation.LookupStateIDs(ctx, s, roomID, eventID)
})
if err != nil {
return gomatrixserverlib.RespStateIDs{}, err
}
return ires.(gomatrixserverlib.RespStateIDs), nil
}
func (a *FederationSenderInternalAPI) GetEvent(
ctx context.Context, s gomatrixserverlib.ServerName, eventID string,
) (res gomatrixserverlib.Transaction, err error) {
ires, err := a.doRequest(s, func() (interface{}, error) {
return a.federation.GetEvent(ctx, s, eventID)
})
if err != nil {
return gomatrixserverlib.Transaction{}, err
}
return ires.(gomatrixserverlib.Transaction), nil
}

View file

@ -26,6 +26,10 @@ const (
FederationSenderGetUserDevicesPath = "/federationsender/client/getUserDevices"
FederationSenderClaimKeysPath = "/federationsender/client/claimKeys"
FederationSenderQueryKeysPath = "/federationsender/client/queryKeys"
FederationSenderBackfillPath = "/federationsender/client/backfill"
FederationSenderLookupStatePath = "/federationsender/client/lookupState"
FederationSenderLookupStateIDsPath = "/federationsender/client/lookupStateIDs"
FederationSenderGetEventPath = "/federationsender/client/getEvent"
)
// NewFederationSenderClient creates a FederationSenderInternalAPI implemented by talking to a HTTP POST API.
@ -228,3 +232,129 @@ func (h *httpFederationSenderInternalAPI) QueryKeys(
}
return *response.Res, nil
}
type backfill struct {
S gomatrixserverlib.ServerName
RoomID string
Limit int
EventIDs []string
Res *gomatrixserverlib.Transaction
Err *api.FederationClientError
}
func (h *httpFederationSenderInternalAPI) Backfill(
ctx context.Context, s gomatrixserverlib.ServerName, roomID string, limit int, eventIDs []string,
) (gomatrixserverlib.Transaction, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Backfill")
defer span.Finish()
request := backfill{
S: s,
RoomID: roomID,
Limit: limit,
EventIDs: eventIDs,
}
var response backfill
apiURL := h.federationSenderURL + FederationSenderBackfillPath
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response)
if err != nil {
return gomatrixserverlib.Transaction{}, err
}
if response.Err != nil {
return gomatrixserverlib.Transaction{}, response.Err
}
return *response.Res, nil
}
type lookupState struct {
S gomatrixserverlib.ServerName
RoomID string
EventID string
RoomVersion gomatrixserverlib.RoomVersion
Res *gomatrixserverlib.RespState
Err *api.FederationClientError
}
func (h *httpFederationSenderInternalAPI) LookupState(
ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion,
) (gomatrixserverlib.RespState, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "LookupState")
defer span.Finish()
request := lookupState{
S: s,
RoomID: roomID,
EventID: eventID,
RoomVersion: roomVersion,
}
var response lookupState
apiURL := h.federationSenderURL + FederationSenderLookupStatePath
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response)
if err != nil {
return gomatrixserverlib.RespState{}, err
}
if response.Err != nil {
return gomatrixserverlib.RespState{}, response.Err
}
return *response.Res, nil
}
type lookupStateIDs struct {
S gomatrixserverlib.ServerName
RoomID string
EventID string
Res *gomatrixserverlib.RespStateIDs
Err *api.FederationClientError
}
func (h *httpFederationSenderInternalAPI) LookupStateIDs(
ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string,
) (gomatrixserverlib.RespStateIDs, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "LookupStateIDs")
defer span.Finish()
request := lookupStateIDs{
S: s,
RoomID: roomID,
EventID: eventID,
}
var response lookupStateIDs
apiURL := h.federationSenderURL + FederationSenderLookupStateIDsPath
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response)
if err != nil {
return gomatrixserverlib.RespStateIDs{}, err
}
if response.Err != nil {
return gomatrixserverlib.RespStateIDs{}, response.Err
}
return *response.Res, nil
}
type getEvent struct {
S gomatrixserverlib.ServerName
EventID string
Res *gomatrixserverlib.Transaction
Err *api.FederationClientError
}
func (h *httpFederationSenderInternalAPI) GetEvent(
ctx context.Context, s gomatrixserverlib.ServerName, eventID string,
) (gomatrixserverlib.Transaction, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "GetEvent")
defer span.Finish()
request := getEvent{
S: s,
EventID: eventID,
}
var response getEvent
apiURL := h.federationSenderURL + FederationSenderGetEventPath
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response)
if err != nil {
return gomatrixserverlib.Transaction{}, err
}
if response.Err != nil {
return gomatrixserverlib.Transaction{}, response.Err
}
return *response.Res, nil
}

View file

@ -175,4 +175,92 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route
return util.JSONResponse{Code: http.StatusOK, JSON: request}
}),
)
internalAPIMux.Handle(
FederationSenderBackfillPath,
httputil.MakeInternalAPI("Backfill", func(req *http.Request) util.JSONResponse {
var request backfill
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
res, err := intAPI.Backfill(req.Context(), request.S, request.RoomID, request.Limit, request.EventIDs)
if err != nil {
ferr, ok := err.(*api.FederationClientError)
if ok {
request.Err = ferr
} else {
request.Err = &api.FederationClientError{
Err: err.Error(),
}
}
}
request.Res = &res
return util.JSONResponse{Code: http.StatusOK, JSON: request}
}),
)
internalAPIMux.Handle(
FederationSenderLookupStatePath,
httputil.MakeInternalAPI("LookupState", func(req *http.Request) util.JSONResponse {
var request lookupState
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
res, err := intAPI.LookupState(req.Context(), request.S, request.RoomID, request.EventID, request.RoomVersion)
if err != nil {
ferr, ok := err.(*api.FederationClientError)
if ok {
request.Err = ferr
} else {
request.Err = &api.FederationClientError{
Err: err.Error(),
}
}
}
request.Res = &res
return util.JSONResponse{Code: http.StatusOK, JSON: request}
}),
)
internalAPIMux.Handle(
FederationSenderLookupStateIDsPath,
httputil.MakeInternalAPI("LookupStateIDs", func(req *http.Request) util.JSONResponse {
var request lookupStateIDs
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
res, err := intAPI.LookupStateIDs(req.Context(), request.S, request.RoomID, request.EventID)
if err != nil {
ferr, ok := err.(*api.FederationClientError)
if ok {
request.Err = ferr
} else {
request.Err = &api.FederationClientError{
Err: err.Error(),
}
}
}
request.Res = &res
return util.JSONResponse{Code: http.StatusOK, JSON: request}
}),
)
internalAPIMux.Handle(
FederationSenderGetEventPath,
httputil.MakeInternalAPI("GetEvent", func(req *http.Request) util.JSONResponse {
var request getEvent
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
res, err := intAPI.GetEvent(req.Context(), request.S, request.EventID)
if err != nil {
ferr, ok := err.(*api.FederationClientError)
if ok {
request.Err = ferr
} else {
request.Err = &api.FederationClientError{
Err: err.Error(),
}
}
}
request.Res = &res
return util.JSONResponse{Code: http.StatusOK, JSON: request}
}),
)
}