From 1b5460a920d271aa03c798709f2f1b3b710e4e0e Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 7 Oct 2022 13:42:35 +0100 Subject: [PATCH] Ensure we only wake up a given user once (#2775) This ensures that the sync API notifier only wakes up a given user once for a given stream position. --- syncapi/notifier/notifier.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go index db18c6b7..27f7c37b 100644 --- a/syncapi/notifier/notifier.go +++ b/syncapi/notifier/notifier.go @@ -48,6 +48,7 @@ type Notifier struct { lastCleanUpTime time.Time // This map is reused to prevent allocations and GC pressure in SharedUsers. _sharedUserMap map[string]struct{} + _wakeupUserMap map[string]struct{} } // NewNotifier creates a new notifier set to the given sync position. @@ -61,6 +62,7 @@ func NewNotifier() *Notifier { lock: &sync.RWMutex{}, lastCleanUpTime: time.Now(), _sharedUserMap: map[string]struct{}{}, + _wakeupUserMap: map[string]struct{}{}, } } @@ -408,12 +410,16 @@ func (n *Notifier) setPeekingDevices(roomIDToPeekingDevices map[string][]types.P // specified user IDs, and also the specified peekingDevices func (n *Notifier) _wakeupUsers(userIDs []string, peekingDevices []types.PeekingDevice, newPos types.StreamingToken) { for _, userID := range userIDs { + n._wakeupUserMap[userID] = struct{}{} + } + for userID := range n._wakeupUserMap { for _, stream := range n._fetchUserStreams(userID) { if stream == nil { continue } stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream } + delete(n._wakeupUserMap, userID) } for _, peekingDevice := range peekingDevices {