Sync refactor — Part 1 (#1688)

* It's half-alive

* Wakeups largely working

* Other tweaks, typing works

* Fix bugs, add receipt stream

* Delete notifier, other tweaks

* Dedupe a bit, add a template for the invite stream

* Clean up, add templates for other streams

* Don't leak channels

* Bring forward some more PDU logic, clean up other places

* Add some more wakeups

* Use addRoomDeltaToResponse

* Log tweaks, typing fixed?

* Fix timed out syncs

* Don't reset next batch position on timeout

* Add account data stream/position

* End of day

* Fix complete sync for receipt, typing

* Streams package

* Clean up a bit

* Complete sync send-to-device

* Don't drop errors

* More lightweight notifications

* Fix typing positions

* Don't advance position on remove again unless needed

* Device list updates

* Advance account data position

* Use limit for incremental sync

* Limit fixes, amongst other things

* Remove some fmt.Println

* Tweaks

* Re-add notifier

* Fix invite position

* Fixes

* Notify account data without advancing PDU position in notifier

* Apply account data position

* Get initial position for account data

* Fix position update

* Fix complete sync positions

* Review comments @Kegsay

* Room consumer parameters
This commit is contained in:
Neil Alexander 2021-01-08 16:59:06 +00:00 committed by GitHub
parent 56a7839aed
commit b5a8935042
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 1452 additions and 1116 deletions

View file

@ -0,0 +1,481 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package notifier
import (
"context"
"sync"
"time"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
// Notifier will wake up sleeping requests when there is some new data.
// It does not tell requests what that data is, only the sync position which
// they can use to get at it. This is done to prevent races whereby we tell the caller
// the event, but the token has already advanced by the time they fetch it, resulting
// in missed events.
type Notifier struct {
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
roomIDToJoinedUsers map[string]userIDSet
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
roomIDToPeekingDevices map[string]peekingDeviceSet
// Protects currPos and userStreams.
streamLock *sync.Mutex
// The latest sync position
currPos types.StreamingToken
// A map of user_id => device_id => UserStream which can be used to wake a given user's /sync request.
userDeviceStreams map[string]map[string]*UserDeviceStream
// The last time we cleaned out stale entries from the userStreams map
lastCleanUpTime time.Time
}
// NewNotifier creates a new notifier set to the given sync position.
// In order for this to be of any use, the Notifier needs to be told all rooms and
// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
func NewNotifier(currPos types.StreamingToken) *Notifier {
return &Notifier{
currPos: currPos,
roomIDToJoinedUsers: make(map[string]userIDSet),
roomIDToPeekingDevices: make(map[string]peekingDeviceSet),
userDeviceStreams: make(map[string]map[string]*UserDeviceStream),
streamLock: &sync.Mutex{},
lastCleanUpTime: time.Now(),
}
}
// OnNewEvent is called when a new event is received from the room server. Must only be
// called from a single goroutine, to avoid races between updates which could set the
// current sync position incorrectly.
// Chooses which user sync streams to update by a provided *gomatrixserverlib.Event
// (based on the users in the event's room),
// a roomID directly, or a list of user IDs, prioritised by parameter ordering.
// posUpdate contains the latest position(s) for one or more types of events.
// If a position in posUpdate is 0, it means no updates are available of that type.
// Typically a consumer supplies a posUpdate with the latest sync position for the
// event type it handles, leaving other fields as 0.
func (n *Notifier) OnNewEvent(
ev *gomatrixserverlib.HeaderedEvent, roomID string, userIDs []string,
posUpdate types.StreamingToken,
) {
// update the current position then notify relevant /sync streams.
// This needs to be done PRIOR to waking up users as they will read this value.
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.removeEmptyUserStreams()
if ev != nil {
// Map this event's room_id to a list of joined users, and wake them up.
usersToNotify := n.joinedUsers(ev.RoomID())
// Map this event's room_id to a list of peeking devices, and wake them up.
peekingDevicesToNotify := n.PeekingDevices(ev.RoomID())
// If this is an invite, also add in the invitee to this list.
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
targetUserID := *ev.StateKey()
membership, err := ev.Membership()
if err != nil {
log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
"Notifier.OnNewEvent: Failed to unmarshal member event",
)
} else {
// Keep the joined user map up-to-date
switch membership {
case gomatrixserverlib.Invite:
usersToNotify = append(usersToNotify, targetUserID)
case gomatrixserverlib.Join:
// Manually append the new user's ID so they get notified
// along all members in the room
usersToNotify = append(usersToNotify, targetUserID)
n.addJoinedUser(ev.RoomID(), targetUserID)
case gomatrixserverlib.Leave:
fallthrough
case gomatrixserverlib.Ban:
n.removeJoinedUser(ev.RoomID(), targetUserID)
}
}
}
n.wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos)
} else if roomID != "" {
n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), n.currPos)
} else if len(userIDs) > 0 {
n.wakeupUsers(userIDs, nil, n.currPos)
} else {
log.WithFields(log.Fields{
"posUpdate": posUpdate.String,
}).Warn("Notifier.OnNewEvent called but caller supplied no user to wake up")
}
}
func (n *Notifier) OnNewAccountData(
userID string, posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.wakeupUsers([]string{userID}, nil, posUpdate)
}
func (n *Notifier) OnNewPeek(
roomID, userID, deviceID string,
posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.addPeekingDevice(roomID, userID, deviceID)
// we don't wake up devices here given the roomserver consumer will do this shortly afterwards
// by calling OnNewEvent.
}
func (n *Notifier) OnRetirePeek(
roomID, userID, deviceID string,
posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.removePeekingDevice(roomID, userID, deviceID)
// we don't wake up devices here given the roomserver consumer will do this shortly afterwards
// by calling OnRetireEvent.
}
func (n *Notifier) OnNewSendToDevice(
userID string, deviceIDs []string,
posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.wakeupUserDevice(userID, deviceIDs, n.currPos)
}
// OnNewReceipt updates the current position
func (n *Notifier) OnNewTyping(
roomID string,
posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
}
// OnNewReceipt updates the current position
func (n *Notifier) OnNewReceipt(
roomID string,
posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
}
func (n *Notifier) OnNewKeyChange(
posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.wakeupUsers([]string{wakeUserID}, nil, n.currPos)
}
func (n *Notifier) OnNewInvite(
posUpdate types.StreamingToken, wakeUserID string,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.wakeupUsers([]string{wakeUserID}, nil, n.currPos)
}
// GetListener returns a UserStreamListener that can be used to wait for
// updates for a user. Must be closed.
// notify for anything before sincePos
func (n *Notifier) GetListener(req types.SyncRequest) UserDeviceStreamListener {
// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
// - Incoming events wake requests for a matching room ID
// - Incoming events wake requests for a matching user ID (needed for invites)
// TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked,
// but given we don't do /events, let's pretend it doesn't exist.
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.removeEmptyUserStreams()
return n.fetchUserDeviceStream(req.Device.UserID, req.Device.ID, true).GetListener(req.Context)
}
// Load the membership states required to notify users correctly.
func (n *Notifier) Load(ctx context.Context, db storage.Database) error {
roomToUsers, err := db.AllJoinedUsersInRooms(ctx)
if err != nil {
return err
}
n.setUsersJoinedToRooms(roomToUsers)
roomToPeekingDevices, err := db.AllPeekingDevicesInRooms(ctx)
if err != nil {
return err
}
n.setPeekingDevices(roomToPeekingDevices)
return nil
}
// CurrentPosition returns the current sync position
func (n *Notifier) CurrentPosition() types.StreamingToken {
n.streamLock.Lock()
defer n.streamLock.Unlock()
return n.currPos
}
// setUsersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from
// these rooms will wake the given users /sync requests. This should be called prior to ANY calls to
// OnNewEvent (eg on startup) to prevent racing.
func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
// This is just the bulk form of addJoinedUser
for roomID, userIDs := range roomIDToUserIDs {
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
}
for _, userID := range userIDs {
n.roomIDToJoinedUsers[roomID].add(userID)
}
}
}
// setPeekingDevices marks the given devices as peeking in the given rooms, such that new events from
// these rooms will wake the given devices' /sync requests. This should be called prior to ANY calls to
// OnNewEvent (eg on startup) to prevent racing.
func (n *Notifier) setPeekingDevices(roomIDToPeekingDevices map[string][]types.PeekingDevice) {
// This is just the bulk form of addPeekingDevice
for roomID, peekingDevices := range roomIDToPeekingDevices {
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
}
for _, peekingDevice := range peekingDevices {
n.roomIDToPeekingDevices[roomID].add(peekingDevice)
}
}
}
// wakeupUsers will wake up the sync strems for all of the devices for all of the
// specified user IDs, and also the specified peekingDevices
func (n *Notifier) wakeupUsers(userIDs []string, peekingDevices []types.PeekingDevice, newPos types.StreamingToken) {
for _, userID := range userIDs {
for _, stream := range n.fetchUserStreams(userID) {
if stream == nil {
continue
}
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
}
}
for _, peekingDevice := range peekingDevices {
// TODO: don't bother waking up for devices whose users we already woke up
if stream := n.fetchUserDeviceStream(peekingDevice.UserID, peekingDevice.DeviceID, false); stream != nil {
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
}
}
}
// wakeupUserDevice will wake up the sync stream for a specific user device. Other
// device streams will be left alone.
// nolint:unused
func (n *Notifier) wakeupUserDevice(userID string, deviceIDs []string, newPos types.StreamingToken) {
for _, deviceID := range deviceIDs {
if stream := n.fetchUserDeviceStream(userID, deviceID, false); stream != nil {
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
}
}
}
// fetchUserDeviceStream retrieves a stream unique to the given device. If makeIfNotExists is true,
// a stream will be made for this device if one doesn't exist and it will be returned. This
// function does not wait for data to be available on the stream.
// NB: Callers should have locked the mutex before calling this function.
func (n *Notifier) fetchUserDeviceStream(userID, deviceID string, makeIfNotExists bool) *UserDeviceStream {
_, ok := n.userDeviceStreams[userID]
if !ok {
if !makeIfNotExists {
return nil
}
n.userDeviceStreams[userID] = map[string]*UserDeviceStream{}
}
stream, ok := n.userDeviceStreams[userID][deviceID]
if !ok {
if !makeIfNotExists {
return nil
}
// TODO: Unbounded growth of streams (1 per user)
if stream = NewUserDeviceStream(userID, deviceID, n.currPos); stream != nil {
n.userDeviceStreams[userID][deviceID] = stream
}
}
return stream
}
// fetchUserStreams retrieves all streams for the given user. If makeIfNotExists is true,
// a stream will be made for this user if one doesn't exist and it will be returned. This
// function does not wait for data to be available on the stream.
// NB: Callers should have locked the mutex before calling this function.
func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream {
user, ok := n.userDeviceStreams[userID]
if !ok {
return []*UserDeviceStream{}
}
streams := []*UserDeviceStream{}
for _, stream := range user {
streams = append(streams, stream)
}
return streams
}
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) addJoinedUser(roomID, userID string) {
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
}
n.roomIDToJoinedUsers[roomID].add(userID)
}
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) removeJoinedUser(roomID, userID string) {
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
}
n.roomIDToJoinedUsers[roomID].remove(userID)
}
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) joinedUsers(roomID string) (userIDs []string) {
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
return
}
return n.roomIDToJoinedUsers[roomID].values()
}
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) {
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
}
n.roomIDToPeekingDevices[roomID].add(types.PeekingDevice{UserID: userID, DeviceID: deviceID})
}
// Not thread-safe: must be called on the OnNewEvent goroutine only
// nolint:unused
func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) {
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
}
// XXX: is this going to work as a key?
n.roomIDToPeekingDevices[roomID].remove(types.PeekingDevice{UserID: userID, DeviceID: deviceID})
}
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice) {
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
return
}
return n.roomIDToPeekingDevices[roomID].values()
}
// removeEmptyUserStreams iterates through the user stream map and removes any
// that have been empty for a certain amount of time. This is a crude way of
// ensuring that the userStreams map doesn't grow forver.
// This should be called when the notifier gets called for whatever reason,
// the function itself is responsible for ensuring it doesn't iterate too
// often.
// NB: Callers should have locked the mutex before calling this function.
func (n *Notifier) removeEmptyUserStreams() {
// Only clean up now and again
now := time.Now()
if n.lastCleanUpTime.Add(time.Minute).After(now) {
return
}
n.lastCleanUpTime = now
deleteBefore := now.Add(-5 * time.Minute)
for user, byUser := range n.userDeviceStreams {
for device, stream := range byUser {
if stream.TimeOfLastNonEmpty().Before(deleteBefore) {
delete(n.userDeviceStreams[user], device)
}
if len(n.userDeviceStreams[user]) == 0 {
delete(n.userDeviceStreams, user)
}
}
}
}
// A string set, mainly existing for improving clarity of structs in this file.
type userIDSet map[string]bool
func (s userIDSet) add(str string) {
s[str] = true
}
func (s userIDSet) remove(str string) {
delete(s, str)
}
func (s userIDSet) values() (vals []string) {
for str := range s {
vals = append(vals, str)
}
return
}
// A set of PeekingDevices, similar to userIDSet
type peekingDeviceSet map[types.PeekingDevice]bool
func (s peekingDeviceSet) add(d types.PeekingDevice) {
s[d] = true
}
// nolint:unused
func (s peekingDeviceSet) remove(d types.PeekingDevice) {
delete(s, d)
}
func (s peekingDeviceSet) values() (vals []types.PeekingDevice) {
for d := range s {
vals = append(vals, d)
}
return
}

View file

@ -0,0 +1,374 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package notifier
import (
"context"
"encoding/json"
"fmt"
"sync"
"testing"
"time"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
var (
randomMessageEvent gomatrixserverlib.HeaderedEvent
aliceInviteBobEvent gomatrixserverlib.HeaderedEvent
bobLeaveEvent gomatrixserverlib.HeaderedEvent
syncPositionVeryOld = types.StreamingToken{PDUPosition: 5}
syncPositionBefore = types.StreamingToken{PDUPosition: 11}
syncPositionAfter = types.StreamingToken{PDUPosition: 12}
//syncPositionNewEDU = types.NewStreamToken(syncPositionAfter.PDUPosition, 1, 0, 0, nil)
syncPositionAfter2 = types.StreamingToken{PDUPosition: 13}
)
var (
roomID = "!test:localhost"
alice = "@alice:localhost"
aliceDev = "alicedevice"
bob = "@bob:localhost"
bobDev = "bobdev"
)
func init() {
var err error
err = json.Unmarshal([]byte(`{
"_room_version": "1",
"type": "m.room.message",
"content": {
"body": "Hello World",
"msgtype": "m.text"
},
"sender": "@noone:localhost",
"room_id": "`+roomID+`",
"origin": "localhost",
"origin_server_ts": 12345,
"event_id": "$randomMessageEvent:localhost"
}`), &randomMessageEvent)
if err != nil {
panic(err)
}
err = json.Unmarshal([]byte(`{
"_room_version": "1",
"type": "m.room.member",
"state_key": "`+bob+`",
"content": {
"membership": "invite"
},
"sender": "`+alice+`",
"room_id": "`+roomID+`",
"origin": "localhost",
"origin_server_ts": 12345,
"event_id": "$aliceInviteBobEvent:localhost"
}`), &aliceInviteBobEvent)
if err != nil {
panic(err)
}
err = json.Unmarshal([]byte(`{
"_room_version": "1",
"type": "m.room.member",
"state_key": "`+bob+`",
"content": {
"membership": "leave"
},
"sender": "`+bob+`",
"room_id": "`+roomID+`",
"origin": "localhost",
"origin_server_ts": 12345,
"event_id": "$bobLeaveEvent:localhost"
}`), &bobLeaveEvent)
if err != nil {
panic(err)
}
}
func mustEqualPositions(t *testing.T, got, want types.StreamingToken) {
if got.String() != want.String() {
t.Fatalf("mustEqualPositions got %s want %s", got.String(), want.String())
}
}
// Test that the current position is returned if a request is already behind.
func TestImmediateNotification(t *testing.T) {
n := NewNotifier(syncPositionBefore)
pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionVeryOld))
if err != nil {
t.Fatalf("TestImmediateNotification error: %s", err)
}
mustEqualPositions(t, pos, syncPositionBefore)
}
// Test that new events to a joined room unblocks the request.
func TestNewEventAndJoinedToRoom(t *testing.T) {
n := NewNotifier(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
var wg sync.WaitGroup
wg.Add(1)
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
if err != nil {
t.Errorf("TestNewEventAndJoinedToRoom error: %w", err)
}
mustEqualPositions(t, pos, syncPositionAfter)
wg.Done()
}()
stream := lockedFetchUserStream(n, bob, bobDev)
waitForBlocking(stream, 1)
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
wg.Wait()
}
func TestCorrectStream(t *testing.T) {
n := NewNotifier(syncPositionBefore)
stream := lockedFetchUserStream(n, bob, bobDev)
if stream.UserID != bob {
t.Fatalf("expected user %q, got %q", bob, stream.UserID)
}
if stream.DeviceID != bobDev {
t.Fatalf("expected device %q, got %q", bobDev, stream.DeviceID)
}
}
func TestCorrectStreamWakeup(t *testing.T) {
n := NewNotifier(syncPositionBefore)
awoken := make(chan string)
streamone := lockedFetchUserStream(n, alice, "one")
streamtwo := lockedFetchUserStream(n, alice, "two")
go func() {
select {
case <-streamone.signalChannel:
awoken <- "one"
case <-streamtwo.signalChannel:
awoken <- "two"
}
}()
time.Sleep(1 * time.Second)
wake := "two"
n.wakeupUserDevice(alice, []string{wake}, syncPositionAfter)
if result := <-awoken; result != wake {
t.Fatalf("expected to wake %q, got %q", wake, result)
}
}
// Test that an invite unblocks the request
func TestNewInviteEventForUser(t *testing.T) {
n := NewNotifier(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
var wg sync.WaitGroup
wg.Add(1)
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
if err != nil {
t.Errorf("TestNewInviteEventForUser error: %w", err)
}
mustEqualPositions(t, pos, syncPositionAfter)
wg.Done()
}()
stream := lockedFetchUserStream(n, bob, bobDev)
waitForBlocking(stream, 1)
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter)
wg.Wait()
}
// Test an EDU-only update wakes up the request.
// TODO: Fix this test, invites wake up with an incremented
// PDU position, not EDU position
/*
func TestEDUWakeup(t *testing.T) {
n := NewNotifier(syncPositionAfter)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
var wg sync.WaitGroup
wg.Add(1)
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionAfter))
if err != nil {
t.Errorf("TestNewInviteEventForUser error: %w", err)
}
mustEqualPositions(t, pos, syncPositionNewEDU)
wg.Done()
}()
stream := lockedFetchUserStream(n, bob, bobDev)
waitForBlocking(stream, 1)
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionNewEDU)
wg.Wait()
}
*/
// Test that all blocked requests get woken up on a new event.
func TestMultipleRequestWakeup(t *testing.T) {
n := NewNotifier(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
var wg sync.WaitGroup
wg.Add(3)
poll := func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
if err != nil {
t.Errorf("TestMultipleRequestWakeup error: %w", err)
}
mustEqualPositions(t, pos, syncPositionAfter)
wg.Done()
}
go poll()
go poll()
go poll()
stream := lockedFetchUserStream(n, bob, bobDev)
waitForBlocking(stream, 3)
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
wg.Wait()
numWaiting := stream.NumWaiting()
if numWaiting != 0 {
t.Errorf("TestMultipleRequestWakeup NumWaiting() want 0, got %d", numWaiting)
}
}
// Test that you stop getting woken up when you leave a room.
func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
// listen as bob. Make bob leave room. Make alice send event to room.
// Make sure alice gets woken up only and not bob as well.
n := NewNotifier(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
var leaveWG sync.WaitGroup
// Make bob leave the room
leaveWG.Add(1)
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
if err != nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err)
}
mustEqualPositions(t, pos, syncPositionAfter)
leaveWG.Done()
}()
bobStream := lockedFetchUserStream(n, bob, bobDev)
waitForBlocking(bobStream, 1)
n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter)
leaveWG.Wait()
// send an event into the room. Make sure alice gets it. Bob should not.
var aliceWG sync.WaitGroup
aliceStream := lockedFetchUserStream(n, alice, aliceDev)
aliceWG.Add(1)
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionAfter))
if err != nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err)
}
mustEqualPositions(t, pos, syncPositionAfter2)
aliceWG.Done()
}()
go func() {
// this should timeout with an error (but the main goroutine won't wait for the timeout explicitly)
_, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionAfter))
if err == nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil")
}
}()
waitForBlocking(aliceStream, 1)
waitForBlocking(bobStream, 1)
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter2)
aliceWG.Wait()
// it's possible that at this point alice has been informed and bob is about to be informed, so wait
// for a fraction of a second to account for this race
time.Sleep(1 * time.Millisecond)
}
func waitForEvents(n *Notifier, req types.SyncRequest) (types.StreamingToken, error) {
listener := n.GetListener(req)
defer listener.Close()
select {
case <-time.After(5 * time.Second):
return types.StreamingToken{}, fmt.Errorf(
"waitForEvents timed out waiting for %s (pos=%v)", req.Device.UserID, req.Since,
)
case <-listener.GetNotifyChannel(req.Since):
p := listener.GetSyncPosition()
return p, nil
}
}
// Wait until something is Wait()ing on the user stream.
func waitForBlocking(s *UserDeviceStream, numBlocking uint) {
for numBlocking != s.NumWaiting() {
// This is horrible but I don't want to add a signalling mechanism JUST for testing.
time.Sleep(1 * time.Microsecond)
}
}
// lockedFetchUserStream invokes Notifier.fetchUserStream, respecting Notifier.streamLock.
// A new stream is made if it doesn't exist already.
func lockedFetchUserStream(n *Notifier, userID, deviceID string) *UserDeviceStream {
n.streamLock.Lock()
defer n.streamLock.Unlock()
return n.fetchUserDeviceStream(userID, deviceID, true)
}
func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) types.SyncRequest {
return types.SyncRequest{
Device: &userapi.Device{
UserID: userID,
ID: deviceID,
},
Timeout: 1 * time.Minute,
Since: since,
WantFullState: false,
Limit: 20,
Log: util.GetLogger(context.TODO()),
Context: context.TODO(),
}
}

View file

@ -0,0 +1,162 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package notifier
import (
"context"
"runtime"
"sync"
"time"
"github.com/matrix-org/dendrite/syncapi/types"
)
// UserDeviceStream represents a communication mechanism between the /sync request goroutine
// and the underlying sync server goroutines.
// Goroutines can get a UserStreamListener to wait for updates, and can Broadcast()
// updates.
type UserDeviceStream struct {
UserID string
DeviceID string
// The lock that protects changes to this struct
lock sync.Mutex
// Closed when there is an update.
signalChannel chan struct{}
// The last sync position that there may have been an update for the user
pos types.StreamingToken
// The last time when we had some listeners waiting
timeOfLastChannel time.Time
// The number of listeners waiting
numWaiting uint
}
// UserDeviceStreamListener allows a sync request to wait for updates for a user.
type UserDeviceStreamListener struct {
userStream *UserDeviceStream
// Whether the stream has been closed
hasClosed bool
}
// NewUserDeviceStream creates a new user stream
func NewUserDeviceStream(userID, deviceID string, currPos types.StreamingToken) *UserDeviceStream {
return &UserDeviceStream{
UserID: userID,
DeviceID: deviceID,
timeOfLastChannel: time.Now(),
pos: currPos,
signalChannel: make(chan struct{}),
}
}
// GetListener returns UserStreamListener that a sync request can use to wait
// for new updates with.
// UserStreamListener must be closed
func (s *UserDeviceStream) GetListener(ctx context.Context) UserDeviceStreamListener {
s.lock.Lock()
defer s.lock.Unlock()
s.numWaiting++ // We decrement when UserStreamListener is closed
listener := UserDeviceStreamListener{
userStream: s,
}
// Lets be a bit paranoid here and check that Close() is being called
runtime.SetFinalizer(&listener, func(l *UserDeviceStreamListener) {
if !l.hasClosed {
l.Close()
}
})
return listener
}
// Broadcast a new sync position for this user.
func (s *UserDeviceStream) Broadcast(pos types.StreamingToken) {
s.lock.Lock()
defer s.lock.Unlock()
s.pos = pos
close(s.signalChannel)
s.signalChannel = make(chan struct{})
}
// NumWaiting returns the number of goroutines waiting for waiting for updates.
// Used for metrics and testing.
func (s *UserDeviceStream) NumWaiting() uint {
s.lock.Lock()
defer s.lock.Unlock()
return s.numWaiting
}
// TimeOfLastNonEmpty returns the last time that the number of waiting listeners
// was non-empty, may be time.Now() if number of waiting listeners is currently
// non-empty.
func (s *UserDeviceStream) TimeOfLastNonEmpty() time.Time {
s.lock.Lock()
defer s.lock.Unlock()
if s.numWaiting > 0 {
return time.Now()
}
return s.timeOfLastChannel
}
// GetSyncPosition returns last sync position which the UserStream was
// notified about
func (s *UserDeviceStreamListener) GetSyncPosition() types.StreamingToken {
s.userStream.lock.Lock()
defer s.userStream.lock.Unlock()
return s.userStream.pos
}
// GetNotifyChannel returns a channel that is closed when there may be an
// update for the user.
// sincePos specifies from which point we want to be notified about. If there
// has already been an update after sincePos we'll return a closed channel
// immediately.
func (s *UserDeviceStreamListener) GetNotifyChannel(sincePos types.StreamingToken) <-chan struct{} {
s.userStream.lock.Lock()
defer s.userStream.lock.Unlock()
if s.userStream.pos.IsAfter(sincePos) {
// If the listener is behind, i.e. missed a potential update, then we
// want them to wake up immediately. We do this by returning a new
// closed stream, which returns immediately when selected.
closedChannel := make(chan struct{})
close(closedChannel)
return closedChannel
}
return s.userStream.signalChannel
}
// Close cleans up resources used
func (s *UserDeviceStreamListener) Close() {
s.userStream.lock.Lock()
defer s.userStream.lock.Unlock()
if !s.hasClosed {
s.userStream.numWaiting--
s.userStream.timeOfLastChannel = time.Now()
}
s.hasClosed = true
}