mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-26 15:08:28 +00:00
Make 'Device list doesn't change if remote server is down' pass (#1268)
- As a last resort, query the DB when exhausting all possible remote query endpoints, but keep the field in `failures` so clients can detect that this is stale data. - Unblock `DeviceListUpdater.Update` on failures rather than timing out. - Use a mutex when writing directly to `res`, not just for failures.
This commit is contained in:
parent
4c4732a9c9
commit
20c8f252a7
3 changed files with 78 additions and 58 deletions
|
@ -342,10 +342,12 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it")
|
logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it")
|
||||||
hasFailures = true
|
hasFailures = true
|
||||||
} else {
|
|
||||||
u.clearChannel(userID)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for _, userID := range userIDs {
|
||||||
|
// always clear the channel to unblock Update calls regardless of success/failure
|
||||||
|
u.clearChannel(userID)
|
||||||
|
}
|
||||||
return hasFailures
|
return hasFailures
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -318,65 +318,12 @@ func (a *KeyInternalAPI) queryRemoteKeys(
|
||||||
// allows us to wait until all federation servers have been poked
|
// allows us to wait until all federation servers have been poked
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(len(domainToDeviceKeys))
|
wg.Add(len(domainToDeviceKeys))
|
||||||
// mutex for failures
|
// mutex for writing directly to res (e.g failures)
|
||||||
var failMu sync.Mutex
|
var respMu sync.Mutex
|
||||||
|
|
||||||
// fan out
|
// fan out
|
||||||
for domain, deviceKeys := range domainToDeviceKeys {
|
for domain, deviceKeys := range domainToDeviceKeys {
|
||||||
go func(serverName string, devKeys map[string][]string) {
|
go a.queryRemoteKeysOnServer(ctx, domain, deviceKeys, &wg, &respMu, timeout, resultCh, res)
|
||||||
defer wg.Done()
|
|
||||||
fedCtx, cancel := context.WithTimeout(ctx, timeout)
|
|
||||||
defer cancel()
|
|
||||||
// for users who we do not have any knowledge about, try to start doing device list updates for them
|
|
||||||
// by hitting /users/devices - otherwise fallback to /keys/query which has nicer bulk properties but
|
|
||||||
// lack a stream ID.
|
|
||||||
var userIDsForAllDevices []string
|
|
||||||
for userID, deviceIDs := range devKeys {
|
|
||||||
if len(deviceIDs) == 0 {
|
|
||||||
userIDsForAllDevices = append(userIDsForAllDevices, userID)
|
|
||||||
delete(devKeys, userID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, userID := range userIDsForAllDevices {
|
|
||||||
err := a.Updater.ManualUpdate(context.Background(), gomatrixserverlib.ServerName(serverName), userID)
|
|
||||||
if err != nil {
|
|
||||||
logrus.WithFields(logrus.Fields{
|
|
||||||
logrus.ErrorKey: err,
|
|
||||||
"user_id": userID,
|
|
||||||
"server": serverName,
|
|
||||||
}).Error("Failed to manually update device lists for user")
|
|
||||||
// try to do it via /keys/query
|
|
||||||
devKeys[userID] = []string{}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// refresh entries from DB: unlike remoteKeysFromDatabase we know we previously had no device info for this
|
|
||||||
// user so the fact that we're populating all devices here isn't a problem so long as we have devices.
|
|
||||||
err = a.populateResponseWithDeviceKeysFromDatabase(ctx, res, userID, nil)
|
|
||||||
if err != nil {
|
|
||||||
logrus.WithFields(logrus.Fields{
|
|
||||||
logrus.ErrorKey: err,
|
|
||||||
"user_id": userID,
|
|
||||||
"server": serverName,
|
|
||||||
}).Error("Failed to manually update device lists for user")
|
|
||||||
// try to do it via /keys/query
|
|
||||||
devKeys[userID] = []string{}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(devKeys) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
queryKeysResp, err := a.FedClient.QueryKeys(fedCtx, gomatrixserverlib.ServerName(serverName), devKeys)
|
|
||||||
if err != nil {
|
|
||||||
failMu.Lock()
|
|
||||||
res.Failures[serverName] = map[string]interface{}{
|
|
||||||
"message": err.Error(),
|
|
||||||
}
|
|
||||||
failMu.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
resultCh <- &queryKeysResp
|
|
||||||
}(domain, deviceKeys)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close the result channel when the goroutines have quit so the for .. range exits
|
// Close the result channel when the goroutines have quit so the for .. range exits
|
||||||
|
@ -399,6 +346,76 @@ func (a *KeyInternalAPI) queryRemoteKeys(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *KeyInternalAPI) queryRemoteKeysOnServer(
|
||||||
|
ctx context.Context, serverName string, devKeys map[string][]string, wg *sync.WaitGroup,
|
||||||
|
respMu *sync.Mutex, timeout time.Duration, resultCh chan<- *gomatrixserverlib.RespQueryKeys,
|
||||||
|
res *api.QueryKeysResponse,
|
||||||
|
) {
|
||||||
|
defer wg.Done()
|
||||||
|
fedCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||||
|
defer cancel()
|
||||||
|
// for users who we do not have any knowledge about, try to start doing device list updates for them
|
||||||
|
// by hitting /users/devices - otherwise fallback to /keys/query which has nicer bulk properties but
|
||||||
|
// lack a stream ID.
|
||||||
|
var userIDsForAllDevices []string
|
||||||
|
for userID, deviceIDs := range devKeys {
|
||||||
|
if len(deviceIDs) == 0 {
|
||||||
|
userIDsForAllDevices = append(userIDsForAllDevices, userID)
|
||||||
|
delete(devKeys, userID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, userID := range userIDsForAllDevices {
|
||||||
|
err := a.Updater.ManualUpdate(context.Background(), gomatrixserverlib.ServerName(serverName), userID)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
logrus.ErrorKey: err,
|
||||||
|
"user_id": userID,
|
||||||
|
"server": serverName,
|
||||||
|
}).Error("Failed to manually update device lists for user")
|
||||||
|
// try to do it via /keys/query
|
||||||
|
devKeys[userID] = []string{}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// refresh entries from DB: unlike remoteKeysFromDatabase we know we previously had no device info for this
|
||||||
|
// user so the fact that we're populating all devices here isn't a problem so long as we have devices.
|
||||||
|
respMu.Lock()
|
||||||
|
err = a.populateResponseWithDeviceKeysFromDatabase(ctx, res, userID, nil)
|
||||||
|
respMu.Unlock()
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
logrus.ErrorKey: err,
|
||||||
|
"user_id": userID,
|
||||||
|
"server": serverName,
|
||||||
|
}).Error("Failed to manually update device lists for user")
|
||||||
|
// try to do it via /keys/query
|
||||||
|
devKeys[userID] = []string{}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(devKeys) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
queryKeysResp, err := a.FedClient.QueryKeys(fedCtx, gomatrixserverlib.ServerName(serverName), devKeys)
|
||||||
|
if err == nil {
|
||||||
|
resultCh <- &queryKeysResp
|
||||||
|
return
|
||||||
|
}
|
||||||
|
respMu.Lock()
|
||||||
|
res.Failures[serverName] = map[string]interface{}{
|
||||||
|
"message": err.Error(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// last ditch, use the cache only. This is good for when clients hit /keys/query and the remote server
|
||||||
|
// is down, better to return something than nothing at all. Clients can know about the failure by
|
||||||
|
// inspecting the failures map though so they can know it's a cached response.
|
||||||
|
for userID, dkeys := range devKeys {
|
||||||
|
// drop the error as it's already a failure at this point
|
||||||
|
_ = a.populateResponseWithDeviceKeysFromDatabase(ctx, res, userID, dkeys)
|
||||||
|
}
|
||||||
|
respMu.Unlock()
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func (a *KeyInternalAPI) populateResponseWithDeviceKeysFromDatabase(
|
func (a *KeyInternalAPI) populateResponseWithDeviceKeysFromDatabase(
|
||||||
ctx context.Context, res *api.QueryKeysResponse, userID string, deviceIDs []string,
|
ctx context.Context, res *api.QueryKeysResponse, userID string, deviceIDs []string,
|
||||||
) error {
|
) error {
|
||||||
|
|
|
@ -148,6 +148,7 @@ Get left notifs in sync and /keys/changes when other user leaves
|
||||||
Can query remote device keys using POST after notification
|
Can query remote device keys using POST after notification
|
||||||
Server correctly resyncs when client query keys and there is no remote cache
|
Server correctly resyncs when client query keys and there is no remote cache
|
||||||
Server correctly resyncs when server leaves and rejoins a room
|
Server correctly resyncs when server leaves and rejoins a room
|
||||||
|
Device list doesn't change if remote server is down
|
||||||
Can add account data
|
Can add account data
|
||||||
Can add account data to room
|
Can add account data to room
|
||||||
Can get account data without syncing
|
Can get account data without syncing
|
||||||
|
|
Loading…
Reference in a new issue