From abd16ff4a0fe6e67fdddf31edd61d1ced797c7b8 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Wed, 26 Aug 2020 12:03:09 +0100 Subject: [PATCH] Modify DeviceListUpdater to retry requests according to RetryAfter (#1342) * Modify DeviceListUpdater to retry requests according to RetryAfter * Reduce wait time for sytest test pollution --- keyserver/internal/device_list_update.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index 36918256..3fbf31f1 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -310,24 +310,25 @@ func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { } } lastProcessed[serverName] = time.Now() - shouldRetry := u.processServer(serverName) + waitTime, shouldRetry := u.processServer(serverName) if shouldRetry { - scheduledRetries[serverName] = time.Now().Add(cooloffPeriod) - go inject(serverName, cooloffPeriod) + scheduledRetries[serverName] = time.Now().Add(waitTime) + go inject(serverName, waitTime) } } } -func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) bool { +func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerName) (time.Duration, bool) { requestTimeout := time.Minute // max amount of time we want to spend on each request ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) defer cancel() logger := util.GetLogger(ctx).WithField("server_name", serverName) + waitTime := 2 * time.Second // fetch stale device lists userIDs, err := u.db.StaleDeviceLists(ctx, []gomatrixserverlib.ServerName{serverName}) if err != nil { logger.WithError(err).Error("failed to load stale device lists") - return true + return waitTime, true } hasFailures := false for _, userID := range userIDs { @@ -339,6 +340,10 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam res, err := u.fedClient.GetUserDevices(ctx, serverName, userID) if err != nil { logger.WithError(err).WithField("user_id", userID).Error("failed to query device keys for user") + fcerr, ok := err.(*fedsenderapi.FederationClientError) + if ok && fcerr.RetryAfter > 0 { + waitTime = fcerr.RetryAfter + } hasFailures = true continue } @@ -352,7 +357,7 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam // always clear the channel to unblock Update calls regardless of success/failure u.clearChannel(userID) } - return hasFailures + return waitTime, hasFailures } func (u *DeviceListUpdater) updateDeviceList(res *gomatrixserverlib.RespUserDevices) error {