From 6d6bb7513710db1009c474eff031434916feda1b Mon Sep 17 00:00:00 2001 From: Kegsay Date: Thu, 20 Aug 2020 17:03:07 +0100 Subject: [PATCH] Add FederationClient interface to federationsender (#1284) * Add FederationClient interface to federationsender - Use a shim struct in HTTP mode to keep the same API as `FederationClient`. - Use `federationsender` instead of `FederationClient` in `keyserver`. * Pointers not values * Review comments * Fix unit tests * Rejig backoff * Unbreak test * Remove debug logs * Review comments and linting --- cmd/dendrite-monolith-server/main.go | 33 +++---- federationsender/api/api.go | 24 +++++ federationsender/internal/api.go | 104 +++++++++++++++++++++- federationsender/inthttp/client.go | 95 ++++++++++++++++++++ federationsender/inthttp/server.go | 66 ++++++++++++++ federationsender/statistics/statistics.go | 11 +++ go.mod | 2 +- go.sum | 2 + keyserver/internal/device_list_update.go | 9 +- keyserver/internal/internal.go | 3 +- keyserver/keyserver.go | 4 +- 11 files changed, 326 insertions(+), 27 deletions(-) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index bf752708..e2d2de48 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -80,9 +80,6 @@ func main() { serverKeyAPI = base.ServerKeyAPIClient() } keyRing := serverKeyAPI.KeyRing() - keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation, base.KafkaProducer) - userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Global.ServerName, cfg.Derived.ApplicationServices, keyAPI) - keyAPI.SetUserAPI(userAPI) rsImpl := roomserver.NewInternalAPI( base, keyRing, federation, @@ -99,6 +96,23 @@ func main() { } } + stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer) + + fsAPI := federationsender.NewInternalAPI( + base, federation, rsAPI, stateAPI, keyRing, + ) + if base.UseHTTPAPIs { + federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI) + fsAPI = base.FederationSenderHTTPClient() + } + // The underlying roomserver implementation needs to be able to call the fedsender. + // This is different to rsAPI which can be the http client which doesn't need this dependency + rsImpl.SetFederationSenderAPI(fsAPI) + + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI, base.KafkaProducer) + userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Global.ServerName, cfg.Derived.ApplicationServices, keyAPI) + keyAPI.SetUserAPI(userAPI) + eduInputAPI := eduserver.NewInternalAPI( base, cache.New(), userAPI, ) @@ -113,19 +127,6 @@ func main() { asAPI = base.AppserviceHTTPClient() } - stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer) - - fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, stateAPI, keyRing, - ) - if base.UseHTTPAPIs { - federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI) - fsAPI = base.FederationSenderHTTPClient() - } - // The underlying roomserver implementation needs to be able to call the fedsender. - // This is different to rsAPI which can be the http client which doesn't need this dependency - rsImpl.SetFederationSenderAPI(fsAPI) - monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, diff --git a/federationsender/api/api.go b/federationsender/api/api.go index 9f9c2645..cea0010d 100644 --- a/federationsender/api/api.go +++ b/federationsender/api/api.go @@ -2,14 +2,38 @@ package api import ( "context" + "fmt" + "time" "github.com/matrix-org/dendrite/federationsender/types" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" ) +// FederationClient is a subset of gomatrixserverlib.FederationClient functions which the fedsender +// implements as proxy calls, with built-in backoff/retries/etc. Errors returned from functions in +// this interface are of type FederationClientError +type FederationClient interface { + GetUserDevices(ctx context.Context, s gomatrixserverlib.ServerName, userID string) (res gomatrixserverlib.RespUserDevices, err error) + ClaimKeys(ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string) (res gomatrixserverlib.RespClaimKeys, err error) + QueryKeys(ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string) (res gomatrixserverlib.RespQueryKeys, err error) +} + +// FederationClientError is returned from FederationClient methods in the event of a problem. +type FederationClientError struct { + Err string + RetryAfter time.Duration + Blacklisted bool +} + +func (e *FederationClientError) Error() string { + return fmt.Sprintf("%s - (retry_after=%d, blacklisted=%v)", e.Err, e.RetryAfter, e.Blacklisted) +} + // FederationSenderInternalAPI is used to query information from the federation sender. type FederationSenderInternalAPI interface { + FederationClient + // PerformDirectoryLookup looks up a remote room ID from a room alias. PerformDirectoryLookup( ctx context.Context, diff --git a/federationsender/internal/api.go b/federationsender/internal/api.go index 647e3fcb..6b5f4c34 100644 --- a/federationsender/internal/api.go +++ b/federationsender/internal/api.go @@ -1,11 +1,16 @@ package internal import ( + "context" + "time" + + "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/federationsender/queue" "github.com/matrix-org/dendrite/federationsender/statistics" "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/internal/config" - "github.com/matrix-org/dendrite/roomserver/api" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" ) @@ -14,7 +19,7 @@ type FederationSenderInternalAPI struct { db storage.Database cfg *config.FederationSender statistics *statistics.Statistics - rsAPI api.RoomserverInternalAPI + rsAPI roomserverAPI.RoomserverInternalAPI federation *gomatrixserverlib.FederationClient keyRing *gomatrixserverlib.KeyRing queues *queue.OutgoingQueues @@ -22,7 +27,7 @@ type FederationSenderInternalAPI struct { func NewFederationSenderInternalAPI( db storage.Database, cfg *config.FederationSender, - rsAPI api.RoomserverInternalAPI, + rsAPI roomserverAPI.RoomserverInternalAPI, federation *gomatrixserverlib.FederationClient, keyRing *gomatrixserverlib.KeyRing, statistics *statistics.Statistics, @@ -38,3 +43,96 @@ func NewFederationSenderInternalAPI( queues: queues, } } + +func (a *FederationSenderInternalAPI) isBlacklistedOrBackingOff(s gomatrixserverlib.ServerName) (*statistics.ServerStatistics, error) { + stats := a.statistics.ForServer(s) + until, blacklisted := stats.BackoffInfo() + if blacklisted { + return stats, &api.FederationClientError{ + Blacklisted: true, + } + } + now := time.Now() + if until != nil && now.Before(*until) { + return stats, &api.FederationClientError{ + RetryAfter: time.Until(*until), + } + } + + return stats, nil +} + +func failBlacklistableError(err error, stats *statistics.ServerStatistics) (until time.Time, blacklisted bool) { + if err == nil { + return + } + mxerr, ok := err.(gomatrix.HTTPError) + if !ok { + return stats.Failure() + } + if mxerr.Code >= 500 && mxerr.Code < 600 { + return stats.Failure() + } + return +} + +func (a *FederationSenderInternalAPI) doRequest( + s gomatrixserverlib.ServerName, request func() (interface{}, error), +) (interface{}, error) { + stats, err := a.isBlacklistedOrBackingOff(s) + if err != nil { + return nil, err + } + res, err := request() + if err != nil { + until, blacklisted := failBlacklistableError(err, stats) + now := time.Now() + var retryAfter time.Duration + if until.After(now) { + retryAfter = time.Until(until) + } + return res, &api.FederationClientError{ + Err: err.Error(), + Blacklisted: blacklisted, + RetryAfter: retryAfter, + } + } + stats.Success() + return res, nil +} + +func (a *FederationSenderInternalAPI) GetUserDevices( + ctx context.Context, s gomatrixserverlib.ServerName, userID string, +) (gomatrixserverlib.RespUserDevices, error) { + ires, err := a.doRequest(s, func() (interface{}, error) { + return a.federation.GetUserDevices(ctx, s, userID) + }) + if err != nil { + return gomatrixserverlib.RespUserDevices{}, err + } + return ires.(gomatrixserverlib.RespUserDevices), nil +} + +func (a *FederationSenderInternalAPI) ClaimKeys( + ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string, +) (gomatrixserverlib.RespClaimKeys, error) { + ires, err := a.doRequest(s, func() (interface{}, error) { + return a.federation.ClaimKeys(ctx, s, oneTimeKeys) + }) + if err != nil { + return gomatrixserverlib.RespClaimKeys{}, err + } + return ires.(gomatrixserverlib.RespClaimKeys), nil +} + +func (a *FederationSenderInternalAPI) QueryKeys( + ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string, +) (gomatrixserverlib.RespQueryKeys, error) { + ires, err := a.doRequest(s, func() (interface{}, error) { + return a.federation.QueryKeys(ctx, s, keys) + }) + if err != nil { + return gomatrixserverlib.RespQueryKeys{}, err + } + return ires.(gomatrixserverlib.RespQueryKeys), nil +} diff --git a/federationsender/inthttp/client.go b/federationsender/inthttp/client.go index 13c2c45a..79e220c3 100644 --- a/federationsender/inthttp/client.go +++ b/federationsender/inthttp/client.go @@ -8,6 +8,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/gomatrix" + "github.com/matrix-org/gomatrixserverlib" "github.com/opentracing/opentracing-go" ) @@ -21,6 +22,10 @@ const ( FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest" FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive" FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU" + + FederationSenderGetUserDevicesPath = "/federationsender/client/getUserDevices" + FederationSenderClaimKeysPath = "/federationsender/client/claimKeys" + FederationSenderQueryKeysPath = "/federationsender/client/queryKeys" ) // NewFederationSenderClient creates a FederationSenderInternalAPI implemented by talking to a HTTP POST API. @@ -133,3 +138,93 @@ func (h *httpFederationSenderInternalAPI) PerformBroadcastEDU( apiURL := h.federationSenderURL + FederationSenderPerformBroadcastEDUPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } + +type getUserDevices struct { + S gomatrixserverlib.ServerName + UserID string + Res *gomatrixserverlib.RespUserDevices + Err *api.FederationClientError +} + +func (h *httpFederationSenderInternalAPI) GetUserDevices( + ctx context.Context, s gomatrixserverlib.ServerName, userID string, +) (gomatrixserverlib.RespUserDevices, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "GetUserDevices") + defer span.Finish() + + var result gomatrixserverlib.RespUserDevices + request := getUserDevices{ + S: s, + UserID: userID, + } + var response getUserDevices + apiURL := h.federationSenderURL + FederationSenderGetUserDevicesPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) + if err != nil { + return result, err + } + if response.Err != nil { + return result, response.Err + } + return *response.Res, nil +} + +type claimKeys struct { + S gomatrixserverlib.ServerName + OneTimeKeys map[string]map[string]string + Res *gomatrixserverlib.RespClaimKeys + Err *api.FederationClientError +} + +func (h *httpFederationSenderInternalAPI) ClaimKeys( + ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string, +) (gomatrixserverlib.RespClaimKeys, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "ClaimKeys") + defer span.Finish() + + var result gomatrixserverlib.RespClaimKeys + request := claimKeys{ + S: s, + OneTimeKeys: oneTimeKeys, + } + var response claimKeys + apiURL := h.federationSenderURL + FederationSenderClaimKeysPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) + if err != nil { + return result, err + } + if response.Err != nil { + return result, response.Err + } + return *response.Res, nil +} + +type queryKeys struct { + S gomatrixserverlib.ServerName + Keys map[string][]string + Res *gomatrixserverlib.RespQueryKeys + Err *api.FederationClientError +} + +func (h *httpFederationSenderInternalAPI) QueryKeys( + ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string, +) (gomatrixserverlib.RespQueryKeys, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryKeys") + defer span.Finish() + + var result gomatrixserverlib.RespQueryKeys + request := queryKeys{ + S: s, + Keys: keys, + } + var response queryKeys + apiURL := h.federationSenderURL + FederationSenderQueryKeysPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) + if err != nil { + return result, err + } + if response.Err != nil { + return result, response.Err + } + return *response.Res, nil +} diff --git a/federationsender/inthttp/server.go b/federationsender/inthttp/server.go index f02cbd12..b1825576 100644 --- a/federationsender/inthttp/server.go +++ b/federationsender/inthttp/server.go @@ -109,4 +109,70 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route return util.JSONResponse{Code: http.StatusOK, JSON: &response} }), ) + internalAPIMux.Handle( + FederationSenderGetUserDevicesPath, + httputil.MakeInternalAPI("GetUserDevices", func(req *http.Request) util.JSONResponse { + var request getUserDevices + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + res, err := intAPI.GetUserDevices(req.Context(), request.S, request.UserID) + if err != nil { + ferr, ok := err.(*api.FederationClientError) + if ok { + request.Err = ferr + } else { + request.Err = &api.FederationClientError{ + Err: err.Error(), + } + } + } + request.Res = &res + return util.JSONResponse{Code: http.StatusOK, JSON: request} + }), + ) + internalAPIMux.Handle( + FederationSenderClaimKeysPath, + httputil.MakeInternalAPI("ClaimKeys", func(req *http.Request) util.JSONResponse { + var request claimKeys + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + res, err := intAPI.ClaimKeys(req.Context(), request.S, request.OneTimeKeys) + if err != nil { + ferr, ok := err.(*api.FederationClientError) + if ok { + request.Err = ferr + } else { + request.Err = &api.FederationClientError{ + Err: err.Error(), + } + } + } + request.Res = &res + return util.JSONResponse{Code: http.StatusOK, JSON: request} + }), + ) + internalAPIMux.Handle( + FederationSenderQueryKeysPath, + httputil.MakeInternalAPI("QueryKeys", func(req *http.Request) util.JSONResponse { + var request queryKeys + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + res, err := intAPI.QueryKeys(req.Context(), request.S, request.Keys) + if err != nil { + ferr, ok := err.(*api.FederationClientError) + if ok { + request.Err = ferr + } else { + request.Err = &api.FederationClientError{ + Err: err.Error(), + } + } + } + request.Res = &res + return util.JSONResponse{Code: http.StatusOK, JSON: request} + }), + ) } diff --git a/federationsender/statistics/statistics.go b/federationsender/statistics/statistics.go index a574ceff..03ef64e9 100644 --- a/federationsender/statistics/statistics.go +++ b/federationsender/statistics/statistics.go @@ -126,8 +126,19 @@ func (s *ServerStatistics) Failure() (time.Time, bool) { return until, false } +// BackoffInfo returns information about the current or previous backoff. +// Returns the last backoffUntil time and whether the server is currently blacklisted or not. +func (s *ServerStatistics) BackoffInfo() (*time.Time, bool) { + until, ok := s.backoffUntil.Load().(time.Time) + if ok { + return &until, s.blacklisted.Load() + } + return nil, s.blacklisted.Load() +} + // BackoffIfRequired will block for as long as the current // backoff requires, if needed. Otherwise it will do nothing. +// Returns the amount of time to backoff for and whether to give up or not. func (s *ServerStatistics) BackoffIfRequired(backingOff atomic.Bool, interrupt <-chan bool) (time.Duration, bool) { if started := s.backoffStarted.Load(); !started { return 0, false diff --git a/go.mod b/go.mod index 1f408706..3125cf58 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ require ( github.com/Shopify/sarama v1.26.1 github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/gologme/log v1.2.0 - github.com/gorilla/mux v1.7.3 + github.com/gorilla/mux v1.7.4 github.com/hashicorp/golang-lru v0.5.4 github.com/lib/pq v1.2.0 github.com/libp2p/go-libp2p v0.6.0 diff --git a/go.sum b/go.sum index 06c29523..ef4ba1c4 100644 --- a/go.sum +++ b/go.sum @@ -151,6 +151,8 @@ github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE0 github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc= +github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index dd8fb700..36918256 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -22,6 +22,7 @@ import ( "sync" "time" + fedsenderapi "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -65,7 +66,7 @@ type DeviceListUpdater struct { db DeviceListUpdaterDatabase producer KeyChangeProducer - fedClient *gomatrixserverlib.FederationClient + fedClient fedsenderapi.FederationClient workerChans []chan gomatrixserverlib.ServerName // When device lists are stale for a user, they get inserted into this map with a channel which `Update` will @@ -103,7 +104,7 @@ type KeyChangeProducer interface { // NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale. func NewDeviceListUpdater( - db DeviceListUpdaterDatabase, producer KeyChangeProducer, fedClient *gomatrixserverlib.FederationClient, + db DeviceListUpdaterDatabase, producer KeyChangeProducer, fedClient fedsenderapi.FederationClient, numWorkers int, ) *DeviceListUpdater { return &DeviceListUpdater{ @@ -304,7 +305,7 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { continue } else { scheduledRetries[serverName] = time.Now().Add(cooloffPeriod) - go inject(serverName, cooloffPeriod) // TODO: Backoff? + go inject(serverName, cooloffPeriod) continue } } @@ -312,7 +313,7 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { shouldRetry := u.processServer(serverName) if shouldRetry { scheduledRetries[serverName] = time.Now().Add(cooloffPeriod) - go inject(serverName, cooloffPeriod) // TODO: Backoff? + go inject(serverName, cooloffPeriod) } } } diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index 31fb1236..53afe0a6 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -22,6 +22,7 @@ import ( "sync" "time" + fedsenderapi "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/producers" "github.com/matrix-org/dendrite/keyserver/storage" @@ -36,7 +37,7 @@ import ( type KeyInternalAPI struct { DB storage.Database ThisServer gomatrixserverlib.ServerName - FedClient *gomatrixserverlib.FederationClient + FedClient fedsenderapi.FederationClient UserAPI userapi.UserInternalAPI Producer *producers.KeyChange Updater *DeviceListUpdater diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 04136938..2e561363 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -17,13 +17,13 @@ package keyserver import ( "github.com/Shopify/sarama" "github.com/gorilla/mux" + fedsenderapi "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/inthttp" "github.com/matrix-org/dendrite/keyserver/producers" "github.com/matrix-org/dendrite/keyserver/storage" - "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" ) @@ -36,7 +36,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { // NewInternalAPI returns a concerete implementation of the internal API. Callers // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( - cfg *config.KeyServer, fedClient *gomatrixserverlib.FederationClient, producer sarama.SyncProducer, + cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, producer sarama.SyncProducer, ) api.KeyInternalAPI { db, err := storage.NewDatabase(&cfg.Database) if err != nil {