mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-29 12:42:46 +00:00
Add FederationClient interface to federationsender (#1284)
* Add FederationClient interface to federationsender - Use a shim struct in HTTP mode to keep the same API as `FederationClient`. - Use `federationsender` instead of `FederationClient` in `keyserver`. * Pointers not values * Review comments * Fix unit tests * Rejig backoff * Unbreak test * Remove debug logs * Review comments and linting
This commit is contained in:
parent
068a3d3c9f
commit
6d6bb75137
11 changed files with 326 additions and 27 deletions
|
@ -1,11 +1,16 @@
|
|||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/federationsender/api"
|
||||
"github.com/matrix-org/dendrite/federationsender/queue"
|
||||
"github.com/matrix-org/dendrite/federationsender/statistics"
|
||||
"github.com/matrix-org/dendrite/federationsender/storage"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrix"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
|
@ -14,7 +19,7 @@ type FederationSenderInternalAPI struct {
|
|||
db storage.Database
|
||||
cfg *config.FederationSender
|
||||
statistics *statistics.Statistics
|
||||
rsAPI api.RoomserverInternalAPI
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI
|
||||
federation *gomatrixserverlib.FederationClient
|
||||
keyRing *gomatrixserverlib.KeyRing
|
||||
queues *queue.OutgoingQueues
|
||||
|
@ -22,7 +27,7 @@ type FederationSenderInternalAPI struct {
|
|||
|
||||
func NewFederationSenderInternalAPI(
|
||||
db storage.Database, cfg *config.FederationSender,
|
||||
rsAPI api.RoomserverInternalAPI,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
keyRing *gomatrixserverlib.KeyRing,
|
||||
statistics *statistics.Statistics,
|
||||
|
@ -38,3 +43,96 @@ func NewFederationSenderInternalAPI(
|
|||
queues: queues,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *FederationSenderInternalAPI) isBlacklistedOrBackingOff(s gomatrixserverlib.ServerName) (*statistics.ServerStatistics, error) {
|
||||
stats := a.statistics.ForServer(s)
|
||||
until, blacklisted := stats.BackoffInfo()
|
||||
if blacklisted {
|
||||
return stats, &api.FederationClientError{
|
||||
Blacklisted: true,
|
||||
}
|
||||
}
|
||||
now := time.Now()
|
||||
if until != nil && now.Before(*until) {
|
||||
return stats, &api.FederationClientError{
|
||||
RetryAfter: time.Until(*until),
|
||||
}
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
func failBlacklistableError(err error, stats *statistics.ServerStatistics) (until time.Time, blacklisted bool) {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
mxerr, ok := err.(gomatrix.HTTPError)
|
||||
if !ok {
|
||||
return stats.Failure()
|
||||
}
|
||||
if mxerr.Code >= 500 && mxerr.Code < 600 {
|
||||
return stats.Failure()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (a *FederationSenderInternalAPI) doRequest(
|
||||
s gomatrixserverlib.ServerName, request func() (interface{}, error),
|
||||
) (interface{}, error) {
|
||||
stats, err := a.isBlacklistedOrBackingOff(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res, err := request()
|
||||
if err != nil {
|
||||
until, blacklisted := failBlacklistableError(err, stats)
|
||||
now := time.Now()
|
||||
var retryAfter time.Duration
|
||||
if until.After(now) {
|
||||
retryAfter = time.Until(until)
|
||||
}
|
||||
return res, &api.FederationClientError{
|
||||
Err: err.Error(),
|
||||
Blacklisted: blacklisted,
|
||||
RetryAfter: retryAfter,
|
||||
}
|
||||
}
|
||||
stats.Success()
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (a *FederationSenderInternalAPI) GetUserDevices(
|
||||
ctx context.Context, s gomatrixserverlib.ServerName, userID string,
|
||||
) (gomatrixserverlib.RespUserDevices, error) {
|
||||
ires, err := a.doRequest(s, func() (interface{}, error) {
|
||||
return a.federation.GetUserDevices(ctx, s, userID)
|
||||
})
|
||||
if err != nil {
|
||||
return gomatrixserverlib.RespUserDevices{}, err
|
||||
}
|
||||
return ires.(gomatrixserverlib.RespUserDevices), nil
|
||||
}
|
||||
|
||||
func (a *FederationSenderInternalAPI) ClaimKeys(
|
||||
ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string,
|
||||
) (gomatrixserverlib.RespClaimKeys, error) {
|
||||
ires, err := a.doRequest(s, func() (interface{}, error) {
|
||||
return a.federation.ClaimKeys(ctx, s, oneTimeKeys)
|
||||
})
|
||||
if err != nil {
|
||||
return gomatrixserverlib.RespClaimKeys{}, err
|
||||
}
|
||||
return ires.(gomatrixserverlib.RespClaimKeys), nil
|
||||
}
|
||||
|
||||
func (a *FederationSenderInternalAPI) QueryKeys(
|
||||
ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string,
|
||||
) (gomatrixserverlib.RespQueryKeys, error) {
|
||||
ires, err := a.doRequest(s, func() (interface{}, error) {
|
||||
return a.federation.QueryKeys(ctx, s, keys)
|
||||
})
|
||||
if err != nil {
|
||||
return gomatrixserverlib.RespQueryKeys{}, err
|
||||
}
|
||||
return ires.(gomatrixserverlib.RespQueryKeys), nil
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue