From d98ec12422c8498cf710bb34d2ed31f024aa1e15 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Wed, 12 Aug 2020 13:50:54 +0100 Subject: [PATCH] Add sync mechanism to block when updating device lists (#1264) * Add sync mechanism to block when updating device lists With a timeout, mainly for sytest to fix the test "Server correctly handles incoming m.device_list_update" which is flakey because it assumes that when `/send` 200 OKs that the server has updated the device lists in prep for `/keys/query` which is not always true when using workers. * Fix UT * Add new working test --- keyserver/internal/device_list_update.go | 51 ++++++++++++++++--- keyserver/internal/device_list_update_test.go | 10 ---- sytest-whitelist | 1 + 3 files changed, 46 insertions(+), 16 deletions(-) diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index 85785b07..1c4f0b97 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -67,6 +67,11 @@ type DeviceListUpdater struct { producer KeyChangeProducer fedClient *gomatrixserverlib.FederationClient workerChans []chan gomatrixserverlib.ServerName + + // When device lists are stale for a user, they get inserted into this map with a channel which `Update` will + // block on or timeout via a select. + userIDToChan map[string]chan bool + userIDToChanMu *sync.Mutex } // DeviceListUpdaterDatabase is the subset of functionality from storage.Database required for the updater. @@ -98,12 +103,14 @@ func NewDeviceListUpdater( numWorkers int, ) *DeviceListUpdater { return &DeviceListUpdater{ - userIDToMutex: make(map[string]*sync.Mutex), - mu: &sync.Mutex{}, - db: db, - producer: producer, - fedClient: fedClient, - workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers), + userIDToMutex: make(map[string]*sync.Mutex), + mu: &sync.Mutex{}, + db: db, + producer: producer, + fedClient: fedClient, + workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers), + userIDToChan: make(map[string]chan bool), + userIDToChanMu: &sync.Mutex{}, } } @@ -137,6 +144,8 @@ func (u *DeviceListUpdater) mutex(userID string) *sync.Mutex { return u.userIDToMutex[userID] } +// Update blocks until the update has been stored in the database. It blocks primarily for satisfying sytest, +// which assumes when /send 200 OKs that the device lists have been updated. func (u *DeviceListUpdater) Update(ctx context.Context, event gomatrixserverlib.DeviceListUpdateEvent) error { isDeviceListStale, err := u.update(ctx, event) if err != nil { @@ -213,7 +222,35 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) { hash := fnv.New32a() _, _ = hash.Write([]byte(remoteServer)) index := int(hash.Sum32()) % len(u.workerChans) + + ch := u.assignChannel(userID) u.workerChans[index] <- remoteServer + select { + case <-ch: + case <-time.After(10 * time.Second): + // we don't return an error in this case as it's not a failure condition. + // we mainly block for the benefit of sytest anyway + } +} + +func (u *DeviceListUpdater) assignChannel(userID string) chan bool { + u.userIDToChanMu.Lock() + defer u.userIDToChanMu.Unlock() + if ch, ok := u.userIDToChan[userID]; ok { + return ch + } + ch := make(chan bool) + u.userIDToChan[userID] = ch + return ch +} + +func (u *DeviceListUpdater) clearChannel(userID string) { + u.userIDToChanMu.Lock() + defer u.userIDToChanMu.Unlock() + if ch, ok := u.userIDToChan[userID]; ok { + close(ch) + delete(u.userIDToChan, userID) + } } func (u *DeviceListUpdater) worker(ch chan gomatrixserverlib.ServerName) { @@ -285,6 +322,8 @@ func (u *DeviceListUpdater) processServer(serverName gomatrixserverlib.ServerNam if err != nil { logger.WithError(err).WithField("user_id", userID).Error("fetched device list but failed to store/emit it") hasFailures = true + } else { + u.clearChannel(userID) } } return hasFailures diff --git a/keyserver/internal/device_list_update_test.go b/keyserver/internal/device_list_update_test.go index b07148bb..dcb981c4 100644 --- a/keyserver/internal/device_list_update_test.go +++ b/keyserver/internal/device_list_update_test.go @@ -204,16 +204,6 @@ func TestUpdateNoPrevID(t *testing.T) { if err != nil { t.Fatalf("Update returned an error: %s", err) } - // At this point we show have this device list marked as stale and not store the keys or emitted anything - if !db.staleUsers[event.UserID] { - t.Errorf("%s not marked as stale", event.UserID) - } - if len(producer.events) > 0 { - t.Errorf("Update incorrect emitted %d device change events", len(producer.events)) - } - if len(db.storedKeys) > 0 { - t.Errorf("Update incorrect stored %d device change events", len(db.storedKeys)) - } t.Log("waiting for /users/devices to be called...") wg.Wait() // wait a bit for db to be updated... diff --git a/sytest-whitelist b/sytest-whitelist index 4d37b3ee..bbac6972 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -143,6 +143,7 @@ Device deletion propagates over federation If remote user leaves room, changes device and rejoins we see update in sync If remote user leaves room, changes device and rejoins we see update in /keys/changes If remote user leaves room we no longer receive device updates +If a device list update goes missing, the server resyncs on the next one Get left notifs in sync and /keys/changes when other user leaves Can query remote device keys using POST after notification Can add account data