Add typing notifications to /sync responses - fixes #635 (#718)

This PR adds a new consumer for typing notifications in syncapi. It also brings changes to syncserver.go and some related files so EDUs can better fit in /sync responses.

Fixes #635.
Fixes #574.
This commit is contained in:
Alex Chen 2019-07-12 22:59:53 +08:00 committed by GitHub
parent f8463063ac
commit 29841bed6b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 712 additions and 252 deletions

View file

@ -26,7 +26,7 @@ import (
)
// Notifier will wake up sleeping requests when there is some new data.
// It does not tell requests what that data is, only the stream position which
// It does not tell requests what that data is, only the sync position which
// they can use to get at it. This is done to prevent races whereby we tell the caller
// the event, but the token has already advanced by the time they fetch it, resulting
// in missed events.
@ -35,18 +35,18 @@ type Notifier struct {
roomIDToJoinedUsers map[string]userIDSet
// Protects currPos and userStreams.
streamLock *sync.Mutex
// The latest sync stream position
currPos types.StreamPosition
// The latest sync position
currPos types.SyncPosition
// A map of user_id => UserStream which can be used to wake a given user's /sync request.
userStreams map[string]*UserStream
// The last time we cleaned out stale entries from the userStreams map
lastCleanUpTime time.Time
}
// NewNotifier creates a new notifier set to the given stream position.
// NewNotifier creates a new notifier set to the given sync position.
// In order for this to be of any use, the Notifier needs to be told all rooms and
// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
func NewNotifier(pos types.StreamPosition) *Notifier {
func NewNotifier(pos types.SyncPosition) *Notifier {
return &Notifier{
currPos: pos,
roomIDToJoinedUsers: make(map[string]userIDSet),
@ -58,20 +58,30 @@ func NewNotifier(pos types.StreamPosition) *Notifier {
// OnNewEvent is called when a new event is received from the room server. Must only be
// called from a single goroutine, to avoid races between updates which could set the
// current position in the stream incorrectly.
// Can be called either with a *gomatrixserverlib.Event, or with an user ID
func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos types.StreamPosition) {
// current sync position incorrectly.
// Chooses which user sync streams to update by a provided *gomatrixserverlib.Event
// (based on the users in the event's room),
// a roomID directly, or a list of user IDs, prioritised by parameter ordering.
// posUpdate contains the latest position(s) for one or more types of events.
// If a position in posUpdate is 0, it means no updates are available of that type.
// Typically a consumer supplies a posUpdate with the latest sync position for the
// event type it handles, leaving other fields as 0.
func (n *Notifier) OnNewEvent(
ev *gomatrixserverlib.Event, roomID string, userIDs []string,
posUpdate types.SyncPosition,
) {
// update the current position then notify relevant /sync streams.
// This needs to be done PRIOR to waking up users as they will read this value.
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos = pos
latestPos := n.currPos.WithUpdates(posUpdate)
n.currPos = latestPos
n.removeEmptyUserStreams()
if ev != nil {
// Map this event's room_id to a list of joined users, and wake them up.
userIDs := n.joinedUsers(ev.RoomID())
usersToNotify := n.joinedUsers(ev.RoomID())
// If this is an invite, also add in the invitee to this list.
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
targetUserID := *ev.StateKey()
@ -84,11 +94,11 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty
// Keep the joined user map up-to-date
switch membership {
case "invite":
userIDs = append(userIDs, targetUserID)
usersToNotify = append(usersToNotify, targetUserID)
case "join":
// Manually append the new user's ID so they get notified
// along all members in the room
userIDs = append(userIDs, targetUserID)
usersToNotify = append(usersToNotify, targetUserID)
n.addJoinedUser(ev.RoomID(), targetUserID)
case "leave":
fallthrough
@ -98,11 +108,15 @@ func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.Event, userID string, pos ty
}
}
for _, toNotifyUserID := range userIDs {
n.wakeupUser(toNotifyUserID, pos)
}
} else if len(userID) > 0 {
n.wakeupUser(userID, pos)
n.wakeupUsers(usersToNotify, latestPos)
} else if roomID != "" {
n.wakeupUsers(n.joinedUsers(roomID), latestPos)
} else if len(userIDs) > 0 {
n.wakeupUsers(userIDs, latestPos)
} else {
log.WithFields(log.Fields{
"posUpdate": posUpdate.String,
}).Warn("Notifier.OnNewEvent called but caller supplied no user to wake up")
}
}
@ -127,7 +141,7 @@ func (n *Notifier) GetListener(req syncRequest) UserStreamListener {
}
// Load the membership states required to notify users correctly.
func (n *Notifier) Load(ctx context.Context, db *storage.SyncServerDatabase) error {
func (n *Notifier) Load(ctx context.Context, db *storage.SyncServerDatasource) error {
roomToUsers, err := db.AllJoinedUsersInRooms(ctx)
if err != nil {
return err
@ -136,8 +150,11 @@ func (n *Notifier) Load(ctx context.Context, db *storage.SyncServerDatabase) err
return nil
}
// CurrentPosition returns the current stream position
func (n *Notifier) CurrentPosition() types.StreamPosition {
// CurrentPosition returns the current sync position
func (n *Notifier) CurrentPosition() types.SyncPosition {
n.streamLock.Lock()
defer n.streamLock.Unlock()
return n.currPos
}
@ -156,12 +173,13 @@ func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
}
}
func (n *Notifier) wakeupUser(userID string, newPos types.StreamPosition) {
stream := n.fetchUserStream(userID, false)
if stream == nil {
return
func (n *Notifier) wakeupUsers(userIDs []string, newPos types.SyncPosition) {
for _, userID := range userIDs {
stream := n.fetchUserStream(userID, false)
if stream != nil {
stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
}
}
stream.Broadcast(newPos) // wakeup all goroutines Wait()ing on this stream
}
// fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true,

View file

@ -32,19 +32,40 @@ var (
randomMessageEvent gomatrixserverlib.Event
aliceInviteBobEvent gomatrixserverlib.Event
bobLeaveEvent gomatrixserverlib.Event
syncPositionVeryOld types.SyncPosition
syncPositionBefore types.SyncPosition
syncPositionAfter types.SyncPosition
syncPositionNewEDU types.SyncPosition
syncPositionAfter2 types.SyncPosition
)
var (
streamPositionVeryOld = types.StreamPosition(5)
streamPositionBefore = types.StreamPosition(11)
streamPositionAfter = types.StreamPosition(12)
streamPositionAfter2 = types.StreamPosition(13)
roomID = "!test:localhost"
alice = "@alice:localhost"
bob = "@bob:localhost"
roomID = "!test:localhost"
alice = "@alice:localhost"
bob = "@bob:localhost"
)
func init() {
baseSyncPos := types.SyncPosition{
PDUPosition: 0,
TypingPosition: 0,
}
syncPositionVeryOld = baseSyncPos
syncPositionVeryOld.PDUPosition = 5
syncPositionBefore = baseSyncPos
syncPositionBefore.PDUPosition = 11
syncPositionAfter = baseSyncPos
syncPositionAfter.PDUPosition = 12
syncPositionNewEDU = syncPositionAfter
syncPositionNewEDU.TypingPosition = 1
syncPositionAfter2 = baseSyncPos
syncPositionAfter2.PDUPosition = 13
var err error
randomMessageEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{
"type": "m.room.message",
@ -92,19 +113,19 @@ func init() {
// Test that the current position is returned if a request is already behind.
func TestImmediateNotification(t *testing.T) {
n := NewNotifier(streamPositionBefore)
pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionVeryOld))
n := NewNotifier(syncPositionBefore)
pos, err := waitForEvents(n, newTestSyncRequest(alice, syncPositionVeryOld))
if err != nil {
t.Fatalf("TestImmediateNotification error: %s", err)
}
if pos != streamPositionBefore {
t.Fatalf("TestImmediateNotification want %d, got %d", streamPositionBefore, pos)
if pos != syncPositionBefore {
t.Fatalf("TestImmediateNotification want %d, got %d", syncPositionBefore, pos)
}
}
// Test that new events to a joined room unblocks the request.
func TestNewEventAndJoinedToRoom(t *testing.T) {
n := NewNotifier(streamPositionBefore)
n := NewNotifier(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@ -112,12 +133,12 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
if err != nil {
t.Errorf("TestNewEventAndJoinedToRoom error: %s", err)
}
if pos != streamPositionAfter {
t.Errorf("TestNewEventAndJoinedToRoom want %d, got %d", streamPositionAfter, pos)
if pos != syncPositionAfter {
t.Errorf("TestNewEventAndJoinedToRoom want %d, got %d", syncPositionAfter, pos)
}
wg.Done()
}()
@ -125,14 +146,14 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
stream := n.fetchUserStream(bob, true)
waitForBlocking(stream, 1)
n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter)
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
wg.Wait()
}
// Test that an invite unblocks the request
func TestNewInviteEventForUser(t *testing.T) {
n := NewNotifier(streamPositionBefore)
n := NewNotifier(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@ -140,12 +161,12 @@ func TestNewInviteEventForUser(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
if err != nil {
t.Errorf("TestNewInviteEventForUser error: %s", err)
}
if pos != streamPositionAfter {
t.Errorf("TestNewInviteEventForUser want %d, got %d", streamPositionAfter, pos)
if pos != syncPositionAfter {
t.Errorf("TestNewInviteEventForUser want %d, got %d", syncPositionAfter, pos)
}
wg.Done()
}()
@ -153,14 +174,42 @@ func TestNewInviteEventForUser(t *testing.T) {
stream := n.fetchUserStream(bob, true)
waitForBlocking(stream, 1)
n.OnNewEvent(&aliceInviteBobEvent, "", streamPositionAfter)
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter)
wg.Wait()
}
// Test an EDU-only update wakes up the request.
func TestEDUWakeup(t *testing.T) {
n := NewNotifier(syncPositionAfter)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
var wg sync.WaitGroup
wg.Add(1)
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionAfter))
if err != nil {
t.Errorf("TestNewInviteEventForUser error: %s", err)
}
if pos != syncPositionNewEDU {
t.Errorf("TestNewInviteEventForUser want %d, got %d", syncPositionNewEDU, pos)
}
wg.Done()
}()
stream := n.fetchUserStream(bob, true)
waitForBlocking(stream, 1)
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionNewEDU)
wg.Wait()
}
// Test that all blocked requests get woken up on a new event.
func TestMultipleRequestWakeup(t *testing.T) {
n := NewNotifier(streamPositionBefore)
n := NewNotifier(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@ -168,12 +217,12 @@ func TestMultipleRequestWakeup(t *testing.T) {
var wg sync.WaitGroup
wg.Add(3)
poll := func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
if err != nil {
t.Errorf("TestMultipleRequestWakeup error: %s", err)
}
if pos != streamPositionAfter {
t.Errorf("TestMultipleRequestWakeup want %d, got %d", streamPositionAfter, pos)
if pos != syncPositionAfter {
t.Errorf("TestMultipleRequestWakeup want %d, got %d", syncPositionAfter, pos)
}
wg.Done()
}
@ -184,7 +233,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
stream := n.fetchUserStream(bob, true)
waitForBlocking(stream, 3)
n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter)
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
wg.Wait()
@ -198,7 +247,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
// listen as bob. Make bob leave room. Make alice send event to room.
// Make sure alice gets woken up only and not bob as well.
n := NewNotifier(streamPositionBefore)
n := NewNotifier(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@ -208,18 +257,18 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
// Make bob leave the room
leaveWG.Add(1)
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
if err != nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
}
if pos != streamPositionAfter {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter, pos)
if pos != syncPositionAfter {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", syncPositionAfter, pos)
}
leaveWG.Done()
}()
bobStream := n.fetchUserStream(bob, true)
waitForBlocking(bobStream, 1)
n.OnNewEvent(&bobLeaveEvent, "", streamPositionAfter)
n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter)
leaveWG.Wait()
// send an event into the room. Make sure alice gets it. Bob should not.
@ -227,19 +276,19 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
aliceStream := n.fetchUserStream(alice, true)
aliceWG.Add(1)
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionAfter))
pos, err := waitForEvents(n, newTestSyncRequest(alice, syncPositionAfter))
if err != nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
}
if pos != streamPositionAfter2 {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter2, pos)
if pos != syncPositionAfter2 {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", syncPositionAfter2, pos)
}
aliceWG.Done()
}()
go func() {
// this should timeout with an error (but the main goroutine won't wait for the timeout explicitly)
_, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionAfter))
_, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionAfter))
if err == nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil")
}
@ -248,7 +297,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
waitForBlocking(aliceStream, 1)
waitForBlocking(bobStream, 1)
n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter2)
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter2)
aliceWG.Wait()
// it's possible that at this point alice has been informed and bob is about to be informed, so wait
@ -256,18 +305,17 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
time.Sleep(1 * time.Millisecond)
}
// same as Notifier.WaitForEvents but with a timeout.
func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
func waitForEvents(n *Notifier, req syncRequest) (types.SyncPosition, error) {
listener := n.GetListener(req)
defer listener.Close()
select {
case <-time.After(5 * time.Second):
return types.StreamPosition(0), fmt.Errorf(
return types.SyncPosition{}, fmt.Errorf(
"waitForEvents timed out waiting for %s (pos=%d)", req.device.UserID, req.since,
)
case <-listener.GetNotifyChannel(*req.since):
p := listener.GetStreamPosition()
p := listener.GetSyncPosition()
return p, nil
}
}
@ -280,7 +328,7 @@ func waitForBlocking(s *UserStream, numBlocking uint) {
}
}
func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest {
func newTestSyncRequest(userID string, since types.SyncPosition) syncRequest {
return syncRequest{
device: authtypes.Device{UserID: userID},
timeout: 1 * time.Minute,

View file

@ -16,8 +16,10 @@ package sync
import (
"context"
"errors"
"net/http"
"strconv"
"strings"
"time"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
@ -36,7 +38,7 @@ type syncRequest struct {
device authtypes.Device
limit int
timeout time.Duration
since *types.StreamPosition // nil means that no since token was supplied
since *types.SyncPosition // nil means that no since token was supplied
wantFullState bool
log *log.Entry
}
@ -73,15 +75,41 @@ func getTimeout(timeoutMS string) time.Duration {
}
// getSyncStreamPosition tries to parse a 'since' token taken from the API to a
// stream position. If the string is empty then (nil, nil) is returned.
func getSyncStreamPosition(since string) (*types.StreamPosition, error) {
// types.SyncPosition. If the string is empty then (nil, nil) is returned.
// There are two forms of tokens: The full length form containing all PDU and EDU
// positions separated by "_", and the short form containing only the PDU
// position. Short form can be used for, e.g., `prev_batch` tokens.
func getSyncStreamPosition(since string) (*types.SyncPosition, error) {
if since == "" {
return nil, nil
}
i, err := strconv.Atoi(since)
if err != nil {
return nil, err
posStrings := strings.Split(since, "_")
if len(posStrings) != 2 && len(posStrings) != 1 {
// A token can either be full length or short (PDU-only).
return nil, errors.New("malformed batch token")
}
positions := make([]int64, len(posStrings))
for i, posString := range posStrings {
pos, err := strconv.ParseInt(posString, 10, 64)
if err != nil {
return nil, err
}
positions[i] = pos
}
if len(positions) == 2 {
// Full length token; construct SyncPosition with every entry in
// `positions`. These entries must have the same order with the fields
// in struct SyncPosition, so we disable the govet check below.
return &types.SyncPosition{ //nolint:govet
positions[0], positions[1],
}, nil
} else {
// Token with PDU position only
return &types.SyncPosition{
PDUPosition: positions[0],
}, nil
}
token := types.StreamPosition(i)
return &token, nil
}

View file

@ -31,13 +31,13 @@ import (
// RequestPool manages HTTP long-poll connections for /sync
type RequestPool struct {
db *storage.SyncServerDatabase
db *storage.SyncServerDatasource
accountDB *accounts.Database
notifier *Notifier
}
// NewRequestPool makes a new RequestPool
func NewRequestPool(db *storage.SyncServerDatabase, n *Notifier, adb *accounts.Database) *RequestPool {
func NewRequestPool(db *storage.SyncServerDatasource, n *Notifier, adb *accounts.Database) *RequestPool {
return &RequestPool{db, adb, n}
}
@ -92,11 +92,13 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
// respond with, so we skip the return an go back to waiting for content to
// be sent down or the request timing out.
var hasTimedOut bool
sincePos := *syncReq.since
for {
select {
// Wait for notifier to wake us up
case <-userStreamListener.GetNotifyChannel(currPos):
currPos = userStreamListener.GetStreamPosition()
case <-userStreamListener.GetNotifyChannel(sincePos):
currPos = userStreamListener.GetSyncPosition()
sincePos = currPos
// Or for timeout to expire
case <-timer.C:
// We just need to ensure we get out of the select after reaching the
@ -128,24 +130,24 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
}
}
func (rp *RequestPool) currentSyncForUser(req syncRequest, currentPos types.StreamPosition) (res *types.Response, err error) {
func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.SyncPosition) (res *types.Response, err error) {
// TODO: handle ignored users
if req.since == nil {
res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit)
} else {
res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, currentPos, req.limit)
res, err = rp.db.IncrementalSync(req.ctx, req.device, *req.since, latestPos, req.limit)
}
if err != nil {
return
}
res, err = rp.appendAccountData(res, req.device.UserID, req, currentPos)
res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition)
return
}
func (rp *RequestPool) appendAccountData(
data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
data *types.Response, userID string, req syncRequest, currentPos int64,
) (*types.Response, error) {
// TODO: Account data doesn't have a sync position of its own, meaning that
// account data might be sent multiple time to the client if multiple account
@ -179,7 +181,7 @@ func (rp *RequestPool) appendAccountData(
}
// Sync is not initial, get all account data since the latest sync
dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, *req.since, currentPos)
dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, req.since.PDUPosition, currentPos)
if err != nil {
return nil, err
}

View file

@ -34,8 +34,8 @@ type UserStream struct {
lock sync.Mutex
// Closed when there is an update.
signalChannel chan struct{}
// The last stream position that there may have been an update for the suser
pos types.StreamPosition
// The last sync position that there may have been an update for the user
pos types.SyncPosition
// The last time when we had some listeners waiting
timeOfLastChannel time.Time
// The number of listeners waiting
@ -51,7 +51,7 @@ type UserStreamListener struct {
}
// NewUserStream creates a new user stream
func NewUserStream(userID string, currPos types.StreamPosition) *UserStream {
func NewUserStream(userID string, currPos types.SyncPosition) *UserStream {
return &UserStream{
UserID: userID,
timeOfLastChannel: time.Now(),
@ -84,8 +84,8 @@ func (s *UserStream) GetListener(ctx context.Context) UserStreamListener {
return listener
}
// Broadcast a new stream position for this user.
func (s *UserStream) Broadcast(pos types.StreamPosition) {
// Broadcast a new sync position for this user.
func (s *UserStream) Broadcast(pos types.SyncPosition) {
s.lock.Lock()
defer s.lock.Unlock()
@ -118,9 +118,9 @@ func (s *UserStream) TimeOfLastNonEmpty() time.Time {
return s.timeOfLastChannel
}
// GetStreamPosition returns last stream position which the UserStream was
// GetStreamPosition returns last sync position which the UserStream was
// notified about
func (s *UserStreamListener) GetStreamPosition() types.StreamPosition {
func (s *UserStreamListener) GetSyncPosition() types.SyncPosition {
s.userStream.lock.Lock()
defer s.userStream.lock.Unlock()
@ -132,11 +132,11 @@ func (s *UserStreamListener) GetStreamPosition() types.StreamPosition {
// sincePos specifies from which point we want to be notified about. If there
// has already been an update after sincePos we'll return a closed channel
// immediately.
func (s *UserStreamListener) GetNotifyChannel(sincePos types.StreamPosition) <-chan struct{} {
func (s *UserStreamListener) GetNotifyChannel(sincePos types.SyncPosition) <-chan struct{} {
s.userStream.lock.Lock()
defer s.userStream.lock.Unlock()
if sincePos < s.userStream.pos {
if s.userStream.pos.IsAfter(sincePos) {
// If the listener is behind, i.e. missed a potential update, then we
// want them to wake up immediately. We do this by returning a new
// closed stream, which returns immediately when selected.