Merge branch 'master' into 1323-archived-rooms-sync-left-rooms

This commit is contained in:
Neil Alexander 2021-01-13 10:06:21 +00:00
commit 2322d8f027
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
40 changed files with 1644 additions and 1389 deletions

View file

@ -22,8 +22,8 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
)
@ -32,15 +32,17 @@ import (
type OutputClientDataConsumer struct {
clientAPIConsumer *internal.ContinualConsumer
db storage.Database
notifier *sync.Notifier
stream types.StreamProvider
notifier *notifier.Notifier
}
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
func NewOutputClientDataConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputClientDataConsumer {
consumer := internal.ContinualConsumer{
@ -52,7 +54,8 @@ func NewOutputClientDataConsumer(
s := &OutputClientDataConsumer{
clientAPIConsumer: &consumer,
db: store,
notifier: n,
notifier: notifier,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
@ -81,7 +84,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
"room_id": output.RoomID,
}).Info("received data from client API server")
pduPos, err := s.db.UpsertAccountData(
streamPos, err := s.db.UpsertAccountData(
context.TODO(), string(msg.Key), output.RoomID, output.Type,
)
if err != nil {
@ -92,7 +95,8 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Panicf("could not save account data")
}
s.notifier.OnNewEvent(nil, "", []string{string(msg.Key)}, types.StreamingToken{PDUPosition: pduPos})
s.stream.Advance(streamPos)
s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos})
return nil
}

View file

@ -18,14 +18,13 @@ import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
)
@ -33,7 +32,8 @@ import (
type OutputReceiptEventConsumer struct {
receiptConsumer *internal.ContinualConsumer
db storage.Database
notifier *sync.Notifier
stream types.StreamProvider
notifier *notifier.Notifier
}
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
@ -41,8 +41,9 @@ type OutputReceiptEventConsumer struct {
func NewOutputReceiptEventConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputReceiptEventConsumer {
consumer := internal.ContinualConsumer{
@ -55,7 +56,8 @@ func NewOutputReceiptEventConsumer(
s := &OutputReceiptEventConsumer{
receiptConsumer: &consumer,
db: store,
notifier: n,
notifier: notifier,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
@ -87,7 +89,8 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
if err != nil {
return err
}
// update stream position
s.stream.Advance(streamPos)
s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
return nil

View file

@ -22,8 +22,8 @@ import (
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -35,7 +35,8 @@ type OutputSendToDeviceEventConsumer struct {
sendToDeviceConsumer *internal.ContinualConsumer
db storage.Database
serverName gomatrixserverlib.ServerName // our server name
notifier *sync.Notifier
stream types.StreamProvider
notifier *notifier.Notifier
}
// NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer.
@ -43,8 +44,9 @@ type OutputSendToDeviceEventConsumer struct {
func NewOutputSendToDeviceEventConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputSendToDeviceEventConsumer {
consumer := internal.ContinualConsumer{
@ -58,7 +60,8 @@ func NewOutputSendToDeviceEventConsumer(
sendToDeviceConsumer: &consumer,
db: store,
serverName: cfg.Matrix.ServerName,
notifier: n,
notifier: notifier,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
@ -102,6 +105,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
return err
}
s.stream.Advance(streamPos)
s.notifier.OnNewSendToDevice(
output.UserID,
[]string{output.DeviceID},

View file

@ -19,10 +19,11 @@ import (
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
log "github.com/sirupsen/logrus"
)
@ -30,8 +31,9 @@ import (
// OutputTypingEventConsumer consumes events that originated in the EDU server.
type OutputTypingEventConsumer struct {
typingConsumer *internal.ContinualConsumer
db storage.Database
notifier *sync.Notifier
eduCache *cache.EDUCache
stream types.StreamProvider
notifier *notifier.Notifier
}
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
@ -39,8 +41,10 @@ type OutputTypingEventConsumer struct {
func NewOutputTypingEventConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store storage.Database,
eduCache *cache.EDUCache,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputTypingEventConsumer {
consumer := internal.ContinualConsumer{
@ -52,8 +56,9 @@ func NewOutputTypingEventConsumer(
s := &OutputTypingEventConsumer{
typingConsumer: &consumer,
db: store,
notifier: n,
eduCache: eduCache,
notifier: notifier,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
@ -63,10 +68,10 @@ func NewOutputTypingEventConsumer(
// Start consuming from EDU api
func (s *OutputTypingEventConsumer) Start() error {
s.db.SetTypingTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: types.StreamPosition(latestSyncPosition)})
s.eduCache.SetTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
pos := types.StreamPosition(latestSyncPosition)
s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: pos})
})
return s.typingConsumer.Start()
}
@ -87,11 +92,17 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
var typingPos types.StreamPosition
typingEvent := output.Event
if typingEvent.Typing {
typingPos = s.db.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime)
typingPos = types.StreamPosition(
s.eduCache.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime),
)
} else {
typingPos = s.db.RemoveTypingUser(typingEvent.UserID, typingEvent.RoomID)
typingPos = types.StreamPosition(
s.eduCache.RemoveUser(typingEvent.UserID, typingEvent.RoomID),
)
}
s.stream.Advance(typingPos)
s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos})
return nil
}

View file

@ -23,8 +23,8 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
syncapi "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
@ -34,12 +34,13 @@ import (
type OutputKeyChangeEventConsumer struct {
keyChangeConsumer *internal.ContinualConsumer
db storage.Database
notifier *notifier.Notifier
stream types.PartitionedStreamProvider
serverName gomatrixserverlib.ServerName // our server name
rsAPI roomserverAPI.RoomserverInternalAPI
keyAPI api.KeyInternalAPI
partitionToOffset map[int32]int64
partitionToOffsetMu sync.Mutex
notifier *syncapi.Notifier
}
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
@ -48,10 +49,11 @@ func NewOutputKeyChangeEventConsumer(
serverName gomatrixserverlib.ServerName,
topic string,
kafkaConsumer sarama.Consumer,
n *syncapi.Notifier,
keyAPI api.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
store storage.Database,
notifier *notifier.Notifier,
stream types.PartitionedStreamProvider,
) *OutputKeyChangeEventConsumer {
consumer := internal.ContinualConsumer{
@ -69,7 +71,8 @@ func NewOutputKeyChangeEventConsumer(
rsAPI: rsAPI,
partitionToOffset: make(map[int32]int64),
partitionToOffsetMu: sync.Mutex{},
notifier: n,
notifier: notifier,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
@ -114,14 +117,15 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
}
// make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1
posUpdate := types.StreamingToken{
DeviceListPosition: types.LogPosition{
Offset: msg.Offset,
Partition: msg.Partition,
},
posUpdate := types.LogPosition{
Offset: msg.Offset,
Partition: msg.Partition,
}
s.stream.Advance(posUpdate)
for userID := range queryRes.UserIDsToCount {
s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID)
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
}
return nil
}

View file

@ -23,8 +23,8 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
@ -32,19 +32,23 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
cfg *config.SyncAPI
rsAPI api.RoomserverInternalAPI
rsConsumer *internal.ContinualConsumer
db storage.Database
notifier *sync.Notifier
cfg *config.SyncAPI
rsAPI api.RoomserverInternalAPI
rsConsumer *internal.ContinualConsumer
db storage.Database
pduStream types.StreamProvider
inviteStream types.StreamProvider
notifier *notifier.Notifier
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store storage.Database,
notifier *notifier.Notifier,
pduStream types.StreamProvider,
inviteStream types.StreamProvider,
rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
@ -55,11 +59,13 @@ func NewOutputRoomEventConsumer(
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
cfg: cfg,
rsConsumer: &consumer,
db: store,
notifier: n,
rsAPI: rsAPI,
cfg: cfg,
rsConsumer: &consumer,
db: store,
notifier: notifier,
pduStream: pduStream,
inviteStream: inviteStream,
rsAPI: rsAPI,
}
consumer.ProcessMessage = s.onMessage
@ -180,7 +186,8 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return err
}
s.notifier.OnNewEvent(ev, "", nil, types.StreamingToken{PDUPosition: pduPos})
s.pduStream.Advance(pduPos)
s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
return nil
}
@ -219,7 +226,8 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
return err
}
s.notifier.OnNewEvent(ev, "", nil, types.StreamingToken{PDUPosition: pduPos})
s.pduStream.Advance(pduPos)
s.notifier.OnNewEvent(ev, ev.RoomID(), nil, types.StreamingToken{PDUPosition: pduPos})
return nil
}
@ -274,7 +282,10 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
}).Panicf("roomserver output log: write invite failure")
return nil
}
s.inviteStream.Advance(pduPos)
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey())
return nil
}
@ -290,9 +301,11 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
}).Panicf("roomserver output log: remove invite failure")
return nil
}
// Notify any active sync requests that the invite has been retired.
// Invites share the same stream counter as PDUs
s.inviteStream.Advance(pduPos)
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID)
return nil
}
@ -307,12 +320,13 @@ func (s *OutputRoomEventConsumer) onNewPeek(
}).Panicf("roomserver output log: write peek failure")
return nil
}
// tell the notifier about the new peek so it knows to wake up new devices
s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID)
// we need to wake up the users who might need to now be peeking into this room,
// so we send in a dummy event to trigger a wakeup
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
// tell the notifier about the new peek so it knows to wake up new devices
// TODO: This only works because the peeks table is reusing the same
// index as PDUs, but we should fix this
s.pduStream.Advance(sp)
s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
return nil
}
@ -327,12 +341,13 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
}).Panicf("roomserver output log: write peek failure")
return nil
}
// tell the notifier about the new peek so it knows to wake up new devices
s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID)
// we need to wake up the users who might need to now be peeking into this room,
// so we send in a dummy event to trigger a wakeup
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.StreamingToken{PDUPosition: sp})
// tell the notifier about the new peek so it knows to wake up new devices
// TODO: This only works because the peeks table is reusing the same
// index as PDUs, but we should fix this
s.pduStream.Advance(sp)
s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
return nil
}

View file

@ -49,8 +49,8 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID,
// nolint:gocyclo
func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
userID string, res *types.Response, from, to types.StreamingToken,
) (hasNew bool, err error) {
userID string, res *types.Response, from, to types.LogPosition,
) (newPos types.LogPosition, hasNew bool, err error) {
// Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
newlyJoinedRooms := joinedRooms(res, userID)
@ -58,7 +58,7 @@ func DeviceListCatchup(
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
changed, left, err := TrackChangedUsers(ctx, rsAPI, userID, newlyJoinedRooms, newlyLeftRooms)
if err != nil {
return false, err
return to, false, err
}
res.DeviceLists.Changed = changed
res.DeviceLists.Left = left
@ -73,13 +73,13 @@ func DeviceListCatchup(
offset = sarama.OffsetOldest
// Extract partition/offset from sync token
// TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
if !from.DeviceListPosition.IsEmpty() {
partition = from.DeviceListPosition.Partition
offset = from.DeviceListPosition.Offset
if !from.IsEmpty() {
partition = from.Partition
offset = from.Offset
}
var toOffset int64
toOffset = sarama.OffsetNewest
if toLog := to.DeviceListPosition; toLog.Partition == partition && toLog.Offset > 0 {
if toLog := to; toLog.Partition == partition && toLog.Offset > 0 {
toOffset = toLog.Offset
}
var queryRes api.QueryKeyChangesResponse
@ -91,7 +91,7 @@ func DeviceListCatchup(
if queryRes.Error != nil {
// don't fail the catchup because we may have got useful information by tracking membership
util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
return hasNew, nil
return to, hasNew, nil
}
// QueryKeyChanges gets ALL users who have changed keys, we want the ones who share rooms with the user.
var sharedUsersMap map[string]int
@ -128,13 +128,12 @@ func DeviceListCatchup(
}
}
// set the new token
to.DeviceListPosition = types.LogPosition{
to = types.LogPosition{
Partition: queryRes.Partition,
Offset: queryRes.Offset,
}
res.NextBatch.ApplyUpdates(to)
return hasNew, nil
return to, hasNew, nil
}
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.

View file

@ -16,12 +16,10 @@ import (
var (
syncingUser = "@alice:localhost"
emptyToken = types.StreamingToken{}
newestToken = types.StreamingToken{
DeviceListPosition: types.LogPosition{
Offset: sarama.OffsetNewest,
Partition: 0,
},
emptyToken = types.LogPosition{}
newestToken = types.LogPosition{
Offset: sarama.OffsetNewest,
Partition: 0,
}
)
@ -180,7 +178,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
"!another:room": {syncingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@ -203,7 +201,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
"!another:room": {syncingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@ -226,7 +224,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
"!another:room": {syncingUser, existingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@ -248,7 +246,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
"!another:room": {syncingUser, existingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@ -307,7 +305,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
roomID: {syncingUser, existingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@ -335,7 +333,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
"!another:room": {syncingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
_, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@ -420,7 +418,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
"!another:room": {syncingUser},
},
}
hasNew, err := DeviceListCatchup(
_, hasNew, err := DeviceListCatchup(
context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken,
)
if err != nil {

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package sync
package notifier
import (
"context"
@ -48,9 +48,9 @@ type Notifier struct {
// NewNotifier creates a new notifier set to the given sync position.
// In order for this to be of any use, the Notifier needs to be told all rooms and
// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
func NewNotifier(pos types.StreamingToken) *Notifier {
func NewNotifier(currPos types.StreamingToken) *Notifier {
return &Notifier{
currPos: pos,
currPos: currPos,
roomIDToJoinedUsers: make(map[string]userIDSet),
roomIDToPeekingDevices: make(map[string]peekingDeviceSet),
userDeviceStreams: make(map[string]map[string]*UserDeviceStream),
@ -124,12 +124,24 @@ func (n *Notifier) OnNewEvent(
}
}
func (n *Notifier) OnNewPeek(
roomID, userID, deviceID string,
func (n *Notifier) OnNewAccountData(
userID string, posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.wakeupUsers([]string{userID}, nil, posUpdate)
}
func (n *Notifier) OnNewPeek(
roomID, userID, deviceID string,
posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.addPeekingDevice(roomID, userID, deviceID)
// we don't wake up devices here given the roomserver consumer will do this shortly afterwards
@ -138,10 +150,12 @@ func (n *Notifier) OnNewPeek(
func (n *Notifier) OnRetirePeek(
roomID, userID, deviceID string,
posUpdate types.StreamingToken,
) {
n.streamLock.Lock()
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
n.removePeekingDevice(roomID, userID, deviceID)
// we don't wake up devices here given the roomserver consumer will do this shortly afterwards
@ -206,7 +220,7 @@ func (n *Notifier) OnNewInvite(
// GetListener returns a UserStreamListener that can be used to wait for
// updates for a user. Must be closed.
// notify for anything before sincePos
func (n *Notifier) GetListener(req syncRequest) UserDeviceStreamListener {
func (n *Notifier) GetListener(req types.SyncRequest) UserDeviceStreamListener {
// Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
// - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
// - Incoming events wake requests for a matching room ID
@ -220,7 +234,7 @@ func (n *Notifier) GetListener(req syncRequest) UserDeviceStreamListener {
n.removeEmptyUserStreams()
return n.fetchUserDeviceStream(req.device.UserID, req.device.ID, true).GetListener(req.ctx)
return n.fetchUserDeviceStream(req.Device.UserID, req.Device.ID, true).GetListener(req.Context)
}
// Load the membership states required to notify users correctly.

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package sync
package notifier
import (
"context"
@ -326,16 +326,16 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
time.Sleep(1 * time.Millisecond)
}
func waitForEvents(n *Notifier, req syncRequest) (types.StreamingToken, error) {
func waitForEvents(n *Notifier, req types.SyncRequest) (types.StreamingToken, error) {
listener := n.GetListener(req)
defer listener.Close()
select {
case <-time.After(5 * time.Second):
return types.StreamingToken{}, fmt.Errorf(
"waitForEvents timed out waiting for %s (pos=%v)", req.device.UserID, req.since,
"waitForEvents timed out waiting for %s (pos=%v)", req.Device.UserID, req.Since,
)
case <-listener.GetNotifyChannel(req.since):
case <-listener.GetNotifyChannel(req.Since):
p := listener.GetSyncPosition()
return p, nil
}
@ -358,17 +358,17 @@ func lockedFetchUserStream(n *Notifier, userID, deviceID string) *UserDeviceStre
return n.fetchUserDeviceStream(userID, deviceID, true)
}
func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) syncRequest {
return syncRequest{
device: userapi.Device{
func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) types.SyncRequest {
return types.SyncRequest{
Device: &userapi.Device{
UserID: userID,
ID: deviceID,
},
timeout: 1 * time.Minute,
since: since,
wantFullState: false,
limit: gomatrixserverlib.DefaultRoomEventFilter().Limit,
log: util.GetLogger(context.TODO()),
ctx: context.TODO(),
Timeout: 1 * time.Minute,
Since: since,
WantFullState: false,
Limit: 20,
Log: util.GetLogger(context.TODO()),
Context: context.TODO(),
}
}

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package sync
package notifier
import (
"context"

View file

@ -16,11 +16,9 @@ package storage
import (
"context"
"time"
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
@ -30,6 +28,26 @@ import (
type Database interface {
internal.PartitionStorer
MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForAccountData(ctx context.Context) (types.StreamPosition, error)
CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
RecentEvents(ctx context.Context, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
GetBackwardTopologyPos(ctx context.Context, events []types.StreamEvent) (types.TopologyToken, error)
PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error)
PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error)
// AllPeekingDevicesInRooms returns a map of room ID to a list of all peeking devices.
@ -56,18 +74,6 @@ type Database interface {
// Returns an empty slice if no state events could be found for this room.
// Returns an error if there was an issue with the retrieval.
GetStateEventsForRoom(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) (stateEvents []*gomatrixserverlib.HeaderedEvent, err error)
// SyncPosition returns the latest positions for syncing.
SyncPosition(ctx context.Context) (types.StreamingToken, error)
// IncrementalSync returns all the data needed in order to create an incremental
// sync response for the given user. Events returned will include any client
// transaction IDs associated with the given device. These transaction IDs come
// from when the device sent the event via an API that included a transaction
// ID. A response object must be provided for IncrementaSync to populate - it
// will not create one.
IncrementalSync(ctx context.Context, res *types.Response, device userapi.Device, fromPos, toPos types.StreamingToken, filter *gomatrixserverlib.Filter, wantFullState bool) (*types.Response, error)
// CompleteSync returns a complete /sync API response for the given user. A response object
// must be provided for CompleteSync to populate - it will not create one.
CompleteSync(ctx context.Context, res *types.Response, device userapi.Device, filter *gomatrixserverlib.Filter) (*types.Response, error)
// GetAccountDataInRange returns all account data for a given user inserted or
// updated between two given positions
// Returns a map following the format data[roomID] = []dataTypes
@ -97,15 +103,6 @@ type Database interface {
// DeletePeek deletes all peeks for a given room by a given user
// Returns an error if there was a problem communicating with the database.
DeletePeeks(ctx context.Context, RoomID, UserID string) (types.StreamPosition, error)
// SetTypingTimeoutCallback sets a callback function that is called right after
// a user is removed from the typing user list due to timeout.
SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn)
// AddTypingUser adds a typing user to the typing cache.
// Returns the newly calculated sync position for typing notifications.
AddTypingUser(userID, roomID string, expireTime *time.Time) types.StreamPosition
// RemoveTypingUser removes a typing user from the typing cache.
// Returns the newly calculated sync position for typing notifications.
RemoveTypingUser(userID, roomID string) types.StreamPosition
// GetEventsInStreamingRange retrieves all of the events on a given ordering using the given extremities and limit.
GetEventsInStreamingRange(ctx context.Context, from, to *types.StreamingToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error)
// GetEventsInTopologicalRange retrieves all of the events on a given ordering using the given extremities and limit.
@ -120,8 +117,6 @@ type Database interface {
// matches the streamevent.transactionID device then the transaction ID gets
// added to the unsigned section of the output event.
StreamEventsToEvents(device *userapi.Device, in []types.StreamEvent) []*gomatrixserverlib.HeaderedEvent
// AddSendToDevice increases the EDU position in the cache and returns the stream position.
AddSendToDevice() types.StreamPosition
// SendToDeviceUpdatesForSync returns a list of send-to-device updates. It returns three lists:
// - "events": a list of send-to-device events that should be included in the sync
// - "changes": a list of send-to-device events that should be updated in the database by

View file

@ -96,7 +96,7 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room
}
func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) {
lastPos := types.StreamPosition(0)
lastPos := streamPos
rows, err := r.selectRoomReceipts.QueryContext(ctx, pq.Array(roomIDs), streamPos)
if err != nil {
return 0, nil, fmt.Errorf("unable to query room receipts: %w", err)

View file

@ -20,7 +20,6 @@ import (
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
@ -106,7 +105,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
Filter: filter,
SendToDevice: sendToDevice,
Receipts: receipts,
EDUCache: cache.New(),
}
return &d, nil
}

File diff suppressed because it is too large Load diff

View file

@ -101,7 +101,7 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room
// SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp
func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) {
selectSQL := strings.Replace(selectRoomReceipts, "($2)", sqlutil.QueryVariadicOffset(len(roomIDs), 1), 1)
lastPos := types.StreamPosition(0)
lastPos := streamPos
params := make([]interface{}, len(roomIDs)+1)
params[0] = streamPos
for k, v := range roomIDs {

View file

@ -21,7 +21,6 @@ import (
// Import the sqlite3 package
_ "github.com/mattn/go-sqlite3"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/storage/shared"
@ -119,7 +118,6 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
Filter: filter,
SendToDevice: sendToDevice,
Receipts: receipts,
EDUCache: cache.New(),
}
return nil
}

View file

@ -1,5 +1,7 @@
package storage_test
// TODO: Fix these tests
/*
import (
"context"
"crypto/ed25519"
@ -746,3 +748,4 @@ func reversed(in []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.Header
}
return out
}
*/

View file

@ -0,0 +1,132 @@
package streams
import (
"context"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
)
type AccountDataStreamProvider struct {
StreamProvider
userAPI userapi.UserInternalAPI
}
func (p *AccountDataStreamProvider) Setup() {
p.StreamProvider.Setup()
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
id, err := p.DB.MaxStreamPositionForAccountData(context.Background())
if err != nil {
panic(err)
}
p.latest = id
}
func (p *AccountDataStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
dataReq := &userapi.QueryAccountDataRequest{
UserID: req.Device.UserID,
}
dataRes := &userapi.QueryAccountDataResponse{}
if err := p.userAPI.QueryAccountData(ctx, dataReq, dataRes); err != nil {
req.Log.WithError(err).Error("p.userAPI.QueryAccountData failed")
return p.LatestPosition(ctx)
}
for datatype, databody := range dataRes.GlobalAccountData {
req.Response.AccountData.Events = append(
req.Response.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: datatype,
Content: gomatrixserverlib.RawJSON(databody),
},
)
}
for r, j := range req.Response.Rooms.Join {
for datatype, databody := range dataRes.RoomAccountData[r] {
j.AccountData.Events = append(
j.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: datatype,
Content: gomatrixserverlib.RawJSON(databody),
},
)
req.Response.Rooms.Join[r] = j
}
}
return p.LatestPosition(ctx)
}
func (p *AccountDataStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
r := types.Range{
From: from,
To: to,
}
accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
dataTypes, err := p.DB.GetAccountDataInRange(
ctx, req.Device.UserID, r, &accountDataFilter,
)
if err != nil {
req.Log.WithError(err).Error("p.DB.GetAccountDataInRange failed")
return from
}
if len(dataTypes) == 0 {
// TODO: this fixes the sytest but is it the right thing to do?
dataTypes[""] = []string{"m.push_rules"}
}
// Iterate over the rooms
for roomID, dataTypes := range dataTypes {
// Request the missing data from the database
for _, dataType := range dataTypes {
dataReq := userapi.QueryAccountDataRequest{
UserID: req.Device.UserID,
RoomID: roomID,
DataType: dataType,
}
dataRes := userapi.QueryAccountDataResponse{}
err = p.userAPI.QueryAccountData(ctx, &dataReq, &dataRes)
if err != nil {
req.Log.WithError(err).Error("p.userAPI.QueryAccountData failed")
continue
}
if roomID == "" {
if globalData, ok := dataRes.GlobalAccountData[dataType]; ok {
req.Response.AccountData.Events = append(
req.Response.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: dataType,
Content: gomatrixserverlib.RawJSON(globalData),
},
)
}
} else {
if roomData, ok := dataRes.RoomAccountData[roomID][dataType]; ok {
joinData := req.Response.Rooms.Join[roomID]
joinData.AccountData.Events = append(
joinData.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: dataType,
Content: gomatrixserverlib.RawJSON(roomData),
},
)
req.Response.Rooms.Join[roomID] = joinData
}
}
}
}
return to
}

View file

@ -0,0 +1,43 @@
package streams
import (
"context"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/types"
)
type DeviceListStreamProvider struct {
PartitionedStreamProvider
rsAPI api.RoomserverInternalAPI
keyAPI keyapi.KeyInternalAPI
}
func (p *DeviceListStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.LogPosition {
return p.IncrementalSync(ctx, req, types.LogPosition{}, p.LatestPosition(ctx))
}
func (p *DeviceListStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.LogPosition,
) types.LogPosition {
var err error
to, _, err = internal.DeviceListCatchup(context.Background(), p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to)
if err != nil {
req.Log.WithError(err).Error("internal.DeviceListCatchup failed")
return from
}
err = internal.DeviceOTKCounts(req.Context, p.keyAPI, req.Device.UserID, req.Device.ID, req.Response)
if err != nil {
req.Log.WithError(err).Error("internal.DeviceListCatchup failed")
return from
}
return to
}

View file

@ -0,0 +1,64 @@
package streams
import (
"context"
"github.com/matrix-org/dendrite/syncapi/types"
)
type InviteStreamProvider struct {
StreamProvider
}
func (p *InviteStreamProvider) Setup() {
p.StreamProvider.Setup()
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
id, err := p.DB.MaxStreamPositionForInvites(context.Background())
if err != nil {
panic(err)
}
p.latest = id
}
func (p *InviteStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
}
func (p *InviteStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
r := types.Range{
From: from,
To: to,
}
invites, retiredInvites, err := p.DB.InviteEventsInRange(
ctx, req.Device.UserID, r,
)
if err != nil {
req.Log.WithError(err).Error("p.DB.InviteEventsInRange failed")
return from
}
for roomID, inviteEvent := range invites {
ir := types.NewInviteResponse(inviteEvent)
req.Response.Rooms.Invite[roomID] = *ir
}
for roomID := range retiredInvites {
if _, ok := req.Response.Rooms.Join[roomID]; !ok {
lr := types.NewLeaveResponse()
req.Response.Rooms.Leave[roomID] = *lr
}
}
return to
}

View file

@ -0,0 +1,440 @@
package streams
import (
"context"
"fmt"
"strconv"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
type PDUStreamProvider struct {
StreamProvider
}
func (p *PDUStreamProvider) Setup() {
p.StreamProvider.Setup()
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
id, err := p.DB.MaxStreamPositionForPDUs(context.Background())
if err != nil {
panic(err)
}
p.latest = id
}
func (p *PDUStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
from := types.StreamPosition(0)
to := p.LatestPosition(ctx)
// Get the current sync position which we will base the sync response on.
// For complete syncs, we want to start at the most recent events and work
// backwards, so that we show the most recent events in the room.
r := types.Range{
From: to,
To: 0,
Backwards: true,
}
// Extract room state and recent events for all rooms the user is joined to.
joinedRoomIDs, err := p.DB.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join)
if err != nil {
req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed")
return from
}
// Build up a /sync response. Add joined rooms.
for _, roomID := range joinedRoomIDs {
var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync(
ctx, roomID, r, &req.Filter, req.Limit, req.Device,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
return from
}
req.Response.Rooms.Join[roomID] = *jr
req.Rooms[roomID] = gomatrixserverlib.Join
}
// Add peeked rooms.
peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
if err != nil {
req.Log.WithError(err).Error("p.DB.PeeksInRange failed")
return from
}
for _, peek := range peeks {
if !peek.Deleted {
var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync(
ctx, peek.RoomID, r, &req.Filter, req.Limit, req.Device,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
return from
}
req.Response.Rooms.Peek[peek.RoomID] = *jr
}
}
if req.Filter.Room.IncludeLeave {
var leaveRoomIDs []string
// Extract room state and recent events for all rooms the user has left
leaveRoomIDs, err := p.DB.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Leave)
if err != nil {
req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed")
return from
}
// Build up a /sync response. Add leave rooms.
for _, roomID := range leaveRoomIDs {
var lr *types.LeaveResponse
lr, err = p.getLeaveResponseForCompleteSync(
ctx, roomID, r, &req.Filter, req.Device,
)
if err != nil {
req.Log.WithError(err).Error("p.getLeaveResponseForCompleteSync failed")
return from
}
req.Response.Rooms.Leave[roomID] = *lr
}
}
return to
}
// nolint:gocyclo
func (p *PDUStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) (newPos types.StreamPosition) {
r := types.Range{
From: from,
To: to,
Backwards: from > to,
}
newPos = to
var err error
var stateDeltas []types.StateDelta
var joinedRooms []string
stateFilter := req.Filter.Room.State
if req.WantFullState {
if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
req.Log.WithError(err).Error("p.DB.GetStateDeltasForFullStateSync failed")
return
}
} else {
if stateDeltas, joinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
req.Log.WithError(err).Error("p.DB.GetStateDeltas failed")
return
}
}
for _, roomID := range joinedRooms {
req.Rooms[roomID] = gomatrixserverlib.Join
}
for _, delta := range stateDeltas {
if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, req.Limit, req.Response); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
return newPos
}
}
return r.To
}
func (p *PDUStreamProvider) addRoomDeltaToResponse(
ctx context.Context,
device *userapi.Device,
r types.Range,
delta types.StateDelta,
numRecentEventsPerRoom int,
res *types.Response,
) error {
if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
// make sure we don't leak recent events after the leave event.
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
// in a single /sync request
// This is all "okay" assuming history_visibility == "shared" which it is by default.
r.To = delta.MembershipPos
}
recentStreamEvents, limited, err := p.DB.RecentEvents(
ctx, delta.RoomID, r,
numRecentEventsPerRoom, true, true,
)
if err != nil {
return err
}
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents)
if err != nil {
return err
}
// XXX: should we ever get this far if we have no recent events or state in this room?
// in practice we do for peeks, but possibly not joins?
if len(recentEvents) == 0 && len(delta.StateEvents) == 0 {
return nil
}
switch delta.Membership {
case gomatrixserverlib.Join:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.RoomID] = *jr
case gomatrixserverlib.Peek:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Peek[delta.RoomID] = *jr
case gomatrixserverlib.Leave:
fallthrough // transitions to leave are the same as ban
case gomatrixserverlib.Ban:
// TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room.
lr := types.NewLeaveResponse()
lr.Timeline.PrevBatch = &prevBatch
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.RoomID] = *lr
}
return nil
}
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
ctx context.Context,
roomID string,
r types.Range,
filter *gomatrixserverlib.Filter,
numRecentEventsPerRoom int,
device *userapi.Device,
) (jr *types.JoinResponse, err error) {
var stateEvents []*gomatrixserverlib.HeaderedEvent
stateEvents, err = p.DB.CurrentState(ctx, roomID, &filter.Room.State)
if err != nil {
return
}
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []types.StreamEvent
var limited bool
recentStreamEvents, limited, err = p.DB.RecentEvents(
ctx, roomID, r, numRecentEventsPerRoom, true, true,
)
if err != nil {
return
}
recentStreamEvents, limited = p.filterStreamEventsAccordingToHistoryVisibility(recentStreamEvents, device, limited)
// Retrieve the backward topology position, i.e. the position of the
// oldest event in the room's topology.
var prevBatch *types.TopologyToken
if len(recentStreamEvents) > 0 {
var backwardTopologyPos, backwardStreamPos types.StreamPosition
backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, recentStreamEvents[0].EventID())
if err != nil {
return
}
prevBatch = &types.TopologyToken{
Depth: backwardTopologyPos,
PDUPosition: backwardStreamPos,
}
prevBatch.Decrement()
}
// We don't include a device here as we don't need to send down
// transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr = types.NewJoinResponse()
jr.Timeline.PrevBatch = prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
return jr, nil
}
func (p *PDUStreamProvider) getLeaveResponseForCompleteSync(
ctx context.Context,
roomID string,
r types.Range,
filter *gomatrixserverlib.Filter,
device *userapi.Device,
) (lr *types.LeaveResponse, err error) {
var stateEvents []*gomatrixserverlib.HeaderedEvent
stateEvents, err = p.DB.CurrentState(ctx, roomID, &filter.Room.State)
if err != nil {
return
}
numRecentEventsPerRoom := filter.Room.Timeline.Limit
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []types.StreamEvent
var limited bool
recentStreamEvents, limited, err = p.DB.RecentEvents(
ctx, roomID, r, numRecentEventsPerRoom, true, true,
)
if err != nil {
return
}
recentStreamEvents, limited = p.filterStreamEventsAccordingToHistoryVisibility(recentStreamEvents, device, limited)
// Retrieve the backward topology position, i.e. the position of the
// oldest event in the room's topology.
var prevBatch *types.TopologyToken
if len(recentStreamEvents) > 0 {
var backwardTopologyPos, backwardStreamPos types.StreamPosition
backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, recentStreamEvents[0].EventID())
if err != nil {
return
}
prevBatch = &types.TopologyToken{
Depth: backwardTopologyPos,
PDUPosition: backwardStreamPos,
}
prevBatch.Decrement()
}
// We don't include a device here as we don't need to send down
// transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
lr = types.NewLeaveResponse()
lr.Timeline.PrevBatch = prevBatch
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = limited
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
return lr, nil
}
// nolint:gocyclo
func (p *PDUStreamProvider) filterStreamEventsAccordingToHistoryVisibility(
recentStreamEvents []types.StreamEvent,
device *userapi.Device,
limited bool,
) ([]types.StreamEvent, bool) {
// TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
// user shouldn't see, we check the recent events and remove any prior to the join event of the user
// which is equiv to history_visibility: joined
joinEventIndex := -1
leaveEventIndex := -1
for i := len(recentStreamEvents) - 1; i >= 0; i-- {
ev := recentStreamEvents[i]
if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) {
membership, _ := ev.Membership()
if membership == gomatrixserverlib.Join {
joinEventIndex = i
if i > 0 {
// the create event happens before the first join, so we should cut it at that point instead
if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") {
joinEventIndex = i - 1
}
}
break
} else if membership == gomatrixserverlib.Leave {
leaveEventIndex = i
}
if joinEventIndex != -1 && leaveEventIndex != -1 {
break
}
}
}
// Default at the start of the array
sliceStart := 0
// If there is a joinEvent, then cut all events earlier the join (exclude the join itself too)
if joinEventIndex != -1 {
sliceStart = joinEventIndex + 1
limited = false // so clients know not to try to backpaginate
}
// Default to spanning the rest of the array
sliceEnd := len(recentStreamEvents)
// If there is a leaveEvent, then cut all events after the person left (exclude the leave event too)
if leaveEventIndex != -1 {
sliceEnd = leaveEventIndex
}
type somematrixevent struct {
event_id, sender, eventType, origin_server_ts, content string
}
events := make([]somematrixevent, len(recentStreamEvents))
for i, v := range recentStreamEvents {
events[i] = somematrixevent{
event_id: v.HeaderedEvent.Event.EventID(),
sender: v.HeaderedEvent.Event.Sender(),
eventType: v.HeaderedEvent.Event.Type(),
origin_server_ts: strconv.FormatUint(uint64(v.HeaderedEvent.Event.OriginServerTS()), 10),
content: string(v.HeaderedEvent.Event.Content()),
}
}
logrus.WithFields(logrus.Fields{
"sliceStart": sliceStart,
"sliceEnd": sliceEnd,
"recentStreamEvents": fmt.Sprintf("%+v", events),
}).Info("cutting down the events")
outEvents := recentStreamEvents[sliceStart:sliceEnd]
logrus.WithFields(logrus.Fields{
"recentStreamEvents": fmt.Sprintf("%+v", events[sliceStart:sliceEnd]),
}).Info("cutting down the events after")
return outEvents, limited
}
func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
for _, recentEv := range recentEvents {
if recentEv.StateKey() == nil {
continue // not a state event
}
// TODO: This is a linear scan over all the current state events in this room. This will
// be slow for big rooms. We should instead sort the state events by event ID (ORDER BY)
// then do a binary search to find matching events, similar to what roomserver does.
for j := 0; j < len(stateEvents); j++ {
if stateEvents[j].EventID() == recentEv.EventID() {
// overwrite the element to remove with the last element then pop the last element.
// This is orders of magnitude faster than re-slicing, but doesn't preserve ordering
// (we don't care about the order of stateEvents)
stateEvents[j] = stateEvents[len(stateEvents)-1]
stateEvents = stateEvents[:len(stateEvents)-1]
break // there shouldn't be multiple events with the same event ID
}
}
}
return stateEvents
}

View file

@ -0,0 +1,91 @@
package streams
import (
"context"
"encoding/json"
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
type ReceiptStreamProvider struct {
StreamProvider
}
func (p *ReceiptStreamProvider) Setup() {
p.StreamProvider.Setup()
id, err := p.DB.MaxStreamPositionForReceipts(context.Background())
if err != nil {
panic(err)
}
p.latest = id
}
func (p *ReceiptStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
}
func (p *ReceiptStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
var joinedRooms []string
for roomID, membership := range req.Rooms {
if membership == gomatrixserverlib.Join {
joinedRooms = append(joinedRooms, roomID)
}
}
lastPos, receipts, err := p.DB.RoomReceiptsAfter(ctx, joinedRooms, from)
if err != nil {
req.Log.WithError(err).Error("p.DB.RoomReceiptsAfter failed")
return from
}
if len(receipts) == 0 || lastPos == 0 {
return to
}
// Group receipts by room, so we can create one ClientEvent for every room
receiptsByRoom := make(map[string][]eduAPI.OutputReceiptEvent)
for _, receipt := range receipts {
receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt)
}
for roomID, receipts := range receiptsByRoom {
jr := req.Response.Rooms.Join[roomID]
var ok bool
ev := gomatrixserverlib.ClientEvent{
Type: gomatrixserverlib.MReceipt,
RoomID: roomID,
}
content := make(map[string]eduAPI.ReceiptMRead)
for _, receipt := range receipts {
var read eduAPI.ReceiptMRead
if read, ok = content[receipt.EventID]; !ok {
read = eduAPI.ReceiptMRead{
User: make(map[string]eduAPI.ReceiptTS),
}
}
read.User[receipt.UserID] = eduAPI.ReceiptTS{TS: receipt.Timestamp}
content[receipt.EventID] = read
}
ev.Content, err = json.Marshal(content)
if err != nil {
req.Log.WithError(err).Error("json.Marshal failed")
return from
}
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
req.Response.Rooms.Join[roomID] = jr
}
return lastPos
}

View file

@ -0,0 +1,51 @@
package streams
import (
"context"
"github.com/matrix-org/dendrite/syncapi/types"
)
type SendToDeviceStreamProvider struct {
StreamProvider
}
func (p *SendToDeviceStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
}
func (p *SendToDeviceStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
// See if we have any new tasks to do for the send-to-device messaging.
lastPos, events, updates, deletions, err := p.DB.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, req.Since)
if err != nil {
req.Log.WithError(err).Error("p.DB.SendToDeviceUpdatesForSync failed")
return from
}
// Before we return the sync response, make sure that we take action on
// any send-to-device database updates or deletions that we need to do.
// Then add the updates into the sync response.
if len(updates) > 0 || len(deletions) > 0 {
// Handle the updates and deletions in the database.
err = p.DB.CleanSendToDeviceUpdates(context.Background(), updates, deletions, req.Since)
if err != nil {
req.Log.WithError(err).Error("p.DB.CleanSendToDeviceUpdates failed")
return from
}
}
if len(events) > 0 {
// Add the updates into the sync response.
for _, event := range events {
req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent)
}
}
return lastPos
}

View file

@ -0,0 +1,57 @@
package streams
import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
type TypingStreamProvider struct {
StreamProvider
EDUCache *cache.EDUCache
}
func (p *TypingStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
}
func (p *TypingStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
var err error
for roomID, membership := range req.Rooms {
if membership != gomatrixserverlib.Join {
continue
}
jr := req.Response.Rooms.Join[roomID]
if users, updated := p.EDUCache.GetTypingUsersIfUpdatedAfter(
roomID, int64(from),
); updated {
ev := gomatrixserverlib.ClientEvent{
Type: gomatrixserverlib.MTyping,
}
ev.Content, err = json.Marshal(map[string]interface{}{
"user_ids": users,
})
if err != nil {
req.Log.WithError(err).Error("json.Marshal failed")
return from
}
jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
req.Response.Rooms.Join[roomID] = jr
}
}
return to
}

View file

@ -0,0 +1,78 @@
package streams
import (
"context"
"github.com/matrix-org/dendrite/eduserver/cache"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
)
type Streams struct {
PDUStreamProvider types.StreamProvider
TypingStreamProvider types.StreamProvider
ReceiptStreamProvider types.StreamProvider
InviteStreamProvider types.StreamProvider
SendToDeviceStreamProvider types.StreamProvider
AccountDataStreamProvider types.StreamProvider
DeviceListStreamProvider types.PartitionedStreamProvider
}
func NewSyncStreamProviders(
d storage.Database, userAPI userapi.UserInternalAPI,
rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI,
eduCache *cache.EDUCache,
) *Streams {
streams := &Streams{
PDUStreamProvider: &PDUStreamProvider{
StreamProvider: StreamProvider{DB: d},
},
TypingStreamProvider: &TypingStreamProvider{
StreamProvider: StreamProvider{DB: d},
EDUCache: eduCache,
},
ReceiptStreamProvider: &ReceiptStreamProvider{
StreamProvider: StreamProvider{DB: d},
},
InviteStreamProvider: &InviteStreamProvider{
StreamProvider: StreamProvider{DB: d},
},
SendToDeviceStreamProvider: &SendToDeviceStreamProvider{
StreamProvider: StreamProvider{DB: d},
},
AccountDataStreamProvider: &AccountDataStreamProvider{
StreamProvider: StreamProvider{DB: d},
userAPI: userAPI,
},
DeviceListStreamProvider: &DeviceListStreamProvider{
PartitionedStreamProvider: PartitionedStreamProvider{DB: d},
rsAPI: rsAPI,
keyAPI: keyAPI,
},
}
streams.PDUStreamProvider.Setup()
streams.TypingStreamProvider.Setup()
streams.ReceiptStreamProvider.Setup()
streams.InviteStreamProvider.Setup()
streams.SendToDeviceStreamProvider.Setup()
streams.AccountDataStreamProvider.Setup()
streams.DeviceListStreamProvider.Setup()
return streams
}
func (s *Streams) Latest(ctx context.Context) types.StreamingToken {
return types.StreamingToken{
PDUPosition: s.PDUStreamProvider.LatestPosition(ctx),
TypingPosition: s.TypingStreamProvider.LatestPosition(ctx),
ReceiptPosition: s.PDUStreamProvider.LatestPosition(ctx),
InvitePosition: s.InviteStreamProvider.LatestPosition(ctx),
SendToDevicePosition: s.SendToDeviceStreamProvider.LatestPosition(ctx),
AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx),
DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx),
}
}

View file

@ -0,0 +1,38 @@
package streams
import (
"context"
"sync"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
)
type PartitionedStreamProvider struct {
DB storage.Database
latest types.LogPosition
latestMutex sync.RWMutex
}
func (p *PartitionedStreamProvider) Setup() {
}
func (p *PartitionedStreamProvider) Advance(
latest types.LogPosition,
) {
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
if latest.IsAfter(&p.latest) {
p.latest = latest
}
}
func (p *PartitionedStreamProvider) LatestPosition(
ctx context.Context,
) types.LogPosition {
p.latestMutex.RLock()
defer p.latestMutex.RUnlock()
return p.latest
}

View file

@ -0,0 +1,38 @@
package streams
import (
"context"
"sync"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
)
type StreamProvider struct {
DB storage.Database
latest types.StreamPosition
latestMutex sync.RWMutex
}
func (p *StreamProvider) Setup() {
}
func (p *StreamProvider) Advance(
latest types.StreamPosition,
) {
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
if latest > p.latest {
p.latest = latest
}
}
func (p *StreamProvider) LatestPosition(
ctx context.Context,
) types.StreamPosition {
p.latestMutex.RLock()
defer p.latestMutex.RUnlock()
return p.latest
}

View file

@ -15,8 +15,8 @@
package sync
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"
@ -26,24 +26,13 @@ import (
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"
)
const defaultSyncTimeout = time.Duration(0)
const defaultIncludeLeave = false
const DefaultTimelineLimit = 20
// syncRequest represents a /sync request, with sensible defaults/sanity checks applied.
type syncRequest struct {
ctx context.Context
device userapi.Device
filter *gomatrixserverlib.Filter
timeout time.Duration
since types.StreamingToken // nil means that no since token was supplied
wantFullState bool
log *log.Entry
}
func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*syncRequest, error) {
func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*types.SyncRequest, error) {
timeout := getTimeout(req.URL.Query().Get("timeout"))
fullState := req.URL.Query().Get("full_state")
wantFullState := fullState != "" && fullState != "false"
@ -52,7 +41,7 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
var err error
since, err = types.NewStreamTokenFromString(sinceStr)
if err != nil {
return nil, err
return nil, fmt.Errorf("types.NewStreamTokenFromString: %w", err)
}
}
@ -62,30 +51,50 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
if filterQuery != "" {
if filterQuery[0] == '{' {
// attempt to parse the timeline limit at least
json.Unmarshal([]byte(filterQuery), &f)
if err := json.Unmarshal([]byte(filterQuery), &f); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("json.Unmarshal failed")
return nil, fmt.Errorf("json.Unmarshal: %w", err)
}
} else {
// attempt to load the filter ID
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed")
return nil, err
return nil, fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
}
f, err = syncDB.GetFilter(req.Context(), localpart, filterQuery)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("syncDB.GetFilter failed")
return nil, err
return nil, fmt.Errorf("syncDB.GetFilter: %w", err)
}
}
}
filter := gomatrixserverlib.DefaultFilter()
if f != nil {
filter = *f
}
// TODO: Additional query params: set_presence, filter
return &syncRequest{
ctx: req.Context(),
device: device,
timeout: timeout,
since: since,
wantFullState: wantFullState,
filter: f,
log: util.GetLogger(req.Context()),
logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{
"user_id": device.UserID,
"device_id": device.ID,
"since": since,
"timeout": timeout,
"limit": filter.Room.Timeline.Limit,
})
return &types.SyncRequest{
Context: req.Context(), //
Log: logger, //
Device: &device, //
Response: types.NewResponse(), // Populated by all streams
Filter: filter, //
Since: since, //
Timeout: timeout, //
Limit: filter.Room.Timeline.Limit, //
Rooms: make(map[string]string), // Populated by the PDU stream
WantFullState: wantFullState, //
}, nil
}

View file

@ -17,8 +17,6 @@
package sync
import (
"context"
"fmt"
"net"
"net/http"
"strings"
@ -30,13 +28,13 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)
// RequestPool manages HTTP long-poll connections for /sync
@ -44,19 +42,30 @@ type RequestPool struct {
db storage.Database
cfg *config.SyncAPI
userAPI userapi.UserInternalAPI
Notifier *Notifier
keyAPI keyapi.KeyInternalAPI
rsAPI roomserverAPI.RoomserverInternalAPI
lastseen sync.Map
streams *streams.Streams
Notifier *notifier.Notifier
}
// NewRequestPool makes a new RequestPool
func NewRequestPool(
db storage.Database, cfg *config.SyncAPI, n *Notifier,
db storage.Database, cfg *config.SyncAPI,
userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
streams *streams.Streams, notifier *notifier.Notifier,
) *RequestPool {
rp := &RequestPool{db, cfg, userAPI, n, keyAPI, rsAPI, sync.Map{}}
rp := &RequestPool{
db: db,
cfg: cfg,
userAPI: userAPI,
keyAPI: keyAPI,
rsAPI: rsAPI,
lastseen: sync.Map{},
streams: streams,
Notifier: notifier,
}
go rp.cleanLastSeen()
return rp
}
@ -128,8 +137,6 @@ var waitingSyncRequests = prometheus.NewGauge(
// called in a dedicated goroutine for this request. This function will block the goroutine
// until a response is ready, or it times out.
func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.Device) util.JSONResponse {
var syncData *types.Response
// Extract values from request
syncReq, err := newSyncRequest(req, *device, rp.db)
if err != nil {
@ -139,88 +146,108 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
}
}
logger := util.GetLogger(req.Context()).WithFields(log.Fields{
"user_id": device.UserID,
"device_id": device.ID,
"since": syncReq.since,
"timeout": syncReq.timeout,
"filter": syncReq.filter,
})
activeSyncRequests.Inc()
defer activeSyncRequests.Dec()
rp.updateLastSeen(req, device)
currPos := rp.Notifier.CurrentPosition()
if rp.shouldReturnImmediately(syncReq) {
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil {
logger.WithError(err).Error("rp.currentSyncForUser failed")
return jsonerror.InternalServerError()
}
logger.WithField("next", syncData.NextBatch).Info("Responding immediately")
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncData,
}
}
waitingSyncRequests.Inc()
defer waitingSyncRequests.Dec()
// Otherwise, we wait for the notifier to tell us if something *may* have
// happened. We loop in case it turns out that nothing did happen.
currentPos := rp.Notifier.CurrentPosition()
timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above
defer timer.Stop()
if !rp.shouldReturnImmediately(syncReq) {
timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above
defer timer.Stop()
userStreamListener := rp.Notifier.GetListener(*syncReq)
defer userStreamListener.Close()
userStreamListener := rp.Notifier.GetListener(*syncReq)
defer userStreamListener.Close()
// We need the loop in case userStreamListener wakes up even if there isn't
// anything to send down. In this case, we'll jump out of the select but
// don't want to send anything back until we get some actual content to
// respond with, so we skip the return an go back to waiting for content to
// be sent down or the request timing out.
var hasTimedOut bool
sincePos := syncReq.since
for {
select {
// Wait for notifier to wake us up
case <-userStreamListener.GetNotifyChannel(sincePos):
currPos = userStreamListener.GetSyncPosition()
// Or for timeout to expire
case <-timer.C:
// We just need to ensure we get out of the select after reaching the
// timeout, but there's nothing specific we want to do in this case
// apart from that, so we do nothing except stating we're timing out
// and need to respond.
hasTimedOut = true
// Or for the request to be cancelled
case <-req.Context().Done():
logger.WithError(err).Error("request cancelled")
return jsonerror.InternalServerError()
}
// Note that we don't time out during calculation of sync
// response. This ensures that we don't waste the hard work
// of calculating the sync only to get timed out before we
// can respond
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
if err != nil {
logger.WithError(err).Error("rp.currentSyncForUser failed")
return jsonerror.InternalServerError()
}
if !syncData.IsEmpty() || hasTimedOut {
logger.WithField("next", syncData.NextBatch).WithField("timed_out", hasTimedOut).Info("Responding")
giveup := func() util.JSONResponse {
syncReq.Response.NextBatch = syncReq.Since
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncData,
JSON: syncReq.Response,
}
}
select {
case <-syncReq.Context.Done(): // Caller gave up
return giveup()
case <-timer.C: // Timeout reached
return giveup()
case <-userStreamListener.GetNotifyChannel(syncReq.Since):
syncReq.Log.Debugln("Responding to sync after wake-up")
currentPos.ApplyUpdates(userStreamListener.GetSyncPosition())
}
} else {
syncReq.Log.Debugln("Responding to sync immediately")
}
if syncReq.Since.IsEmpty() {
// Complete sync
syncReq.Response.NextBatch = types.StreamingToken{
PDUPosition: rp.streams.PDUStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
TypingPosition: rp.streams.TypingStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
ReceiptPosition: rp.streams.ReceiptStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
InvitePosition: rp.streams.InviteStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
}
} else {
// Incremental sync
syncReq.Response.NextBatch = types.StreamingToken{
PDUPosition: rp.streams.PDUStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.PDUPosition, currentPos.PDUPosition,
),
TypingPosition: rp.streams.TypingStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.TypingPosition, currentPos.TypingPosition,
),
ReceiptPosition: rp.streams.ReceiptStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition,
),
InvitePosition: rp.streams.InviteStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.InvitePosition, currentPos.InvitePosition,
),
SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition,
),
AccountDataPosition: rp.streams.AccountDataStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition,
),
DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
),
}
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: syncReq.Response,
}
}
@ -247,21 +274,18 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use
JSON: jsonerror.InvalidArgumentValue("bad 'to' value"),
}
}
// work out room joins/leaves
var f gomatrixserverlib.Filter
f.Room.Timeline.Limit = 10
res, err := rp.db.IncrementalSync(
req.Context(), types.NewResponse(), *device, fromToken, toToken, &f, false,
)
syncReq, err := newSyncRequest(req, *device, rp.db)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("Failed to IncrementalSync")
util.GetLogger(req.Context()).WithError(err).Error("newSyncRequest failed")
return jsonerror.InternalServerError()
}
res, err = rp.appendDeviceLists(res, device.UserID, fromToken, toToken)
rp.streams.PDUStreamProvider.IncrementalSync(req.Context(), syncReq, fromToken.PDUPosition, toToken.PDUPosition)
_, _, err = internal.DeviceListCatchup(
req.Context(), rp.keyAPI, rp.rsAPI, syncReq.Device.UserID,
syncReq.Response, fromToken.DeviceListPosition, toToken.DeviceListPosition,
)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("Failed to appendDeviceLists info")
util.GetLogger(req.Context()).WithError(err).Error("Failed to DeviceListCatchup info")
return jsonerror.InternalServerError()
}
return util.JSONResponse{
@ -270,199 +294,18 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use
Changed []string `json:"changed"`
Left []string `json:"left"`
}{
Changed: res.DeviceLists.Changed,
Left: res.DeviceLists.Left,
Changed: syncReq.Response.DeviceLists.Changed,
Left: syncReq.Response.DeviceLists.Left,
},
}
}
// nolint:gocyclo
func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (*types.Response, error) {
res := types.NewResponse()
// See if we have any new tasks to do for the send-to-device messaging.
lastPos, events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, req.since)
if err != nil {
return nil, fmt.Errorf("rp.db.SendToDeviceUpdatesForSync: %w", err)
}
// TODO: handle ignored users
if req.since.IsEmpty() {
res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.filter)
if err != nil {
return res, fmt.Errorf("rp.db.CompleteSync: %w", err)
}
} else {
res, err = rp.db.IncrementalSync(req.ctx, res, req.device, req.since, latestPos, req.filter, req.wantFullState)
if err != nil {
return res, fmt.Errorf("rp.db.IncrementalSync: %w", err)
}
}
accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition, &accountDataFilter)
if err != nil {
return res, fmt.Errorf("rp.appendAccountData: %w", err)
}
res, err = rp.appendDeviceLists(res, req.device.UserID, req.since, latestPos)
if err != nil {
return res, fmt.Errorf("rp.appendDeviceLists: %w", err)
}
err = internal.DeviceOTKCounts(req.ctx, rp.keyAPI, req.device.UserID, req.device.ID, res)
if err != nil {
return res, fmt.Errorf("internal.DeviceOTKCounts: %w", err)
}
// Before we return the sync response, make sure that we take action on
// any send-to-device database updates or deletions that we need to do.
// Then add the updates into the sync response.
if len(updates) > 0 || len(deletions) > 0 {
// Handle the updates and deletions in the database.
err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, req.since)
if err != nil {
return res, fmt.Errorf("rp.db.CleanSendToDeviceUpdates: %w", err)
}
}
if len(events) > 0 {
// Add the updates into the sync response.
for _, event := range events {
res.ToDevice.Events = append(res.ToDevice.Events, event.SendToDeviceEvent)
}
}
res.NextBatch.SendToDevicePosition = lastPos
return res, err
}
func (rp *RequestPool) appendDeviceLists(
data *types.Response, userID string, since, to types.StreamingToken,
) (*types.Response, error) {
_, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, userID, data, since, to)
if err != nil {
return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err)
}
return data, nil
}
// nolint:gocyclo
func (rp *RequestPool) appendAccountData(
data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
accountDataFilter *gomatrixserverlib.EventFilter,
) (*types.Response, error) {
// TODO: Account data doesn't have a sync position of its own, meaning that
// account data might be sent multiple time to the client if multiple account
// data keys were set between two message. This isn't a huge issue since the
// duplicate data doesn't represent a huge quantity of data, but an optimisation
// here would be making sure each data is sent only once to the client.
if req.since.IsEmpty() {
// If this is the initial sync, we don't need to check if a data has
// already been sent. Instead, we send the whole batch.
dataReq := &userapi.QueryAccountDataRequest{
UserID: userID,
}
dataRes := &userapi.QueryAccountDataResponse{}
if err := rp.userAPI.QueryAccountData(req.ctx, dataReq, dataRes); err != nil {
return nil, err
}
for datatype, databody := range dataRes.GlobalAccountData {
data.AccountData.Events = append(
data.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: datatype,
Content: gomatrixserverlib.RawJSON(databody),
},
)
}
for r, j := range data.Rooms.Join {
for datatype, databody := range dataRes.RoomAccountData[r] {
j.AccountData.Events = append(
j.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: datatype,
Content: gomatrixserverlib.RawJSON(databody),
},
)
data.Rooms.Join[r] = j
}
}
return data, nil
}
r := types.Range{
From: req.since.PDUPosition,
To: currentPos,
}
// If both positions are the same, it means that the data was saved after the
// latest room event. In that case, we need to decrement the old position as
// results are exclusive of Low.
if r.Low() == r.High() {
r.From--
}
// Sync is not initial, get all account data since the latest sync
dataTypes, err := rp.db.GetAccountDataInRange(
req.ctx, userID, r, accountDataFilter,
)
if err != nil {
return nil, fmt.Errorf("rp.db.GetAccountDataInRange: %w", err)
}
if len(dataTypes) == 0 {
// TODO: this fixes the sytest but is it the right thing to do?
dataTypes[""] = []string{"m.push_rules"}
}
// Iterate over the rooms
for roomID, dataTypes := range dataTypes {
// Request the missing data from the database
for _, dataType := range dataTypes {
dataReq := userapi.QueryAccountDataRequest{
UserID: userID,
RoomID: roomID,
DataType: dataType,
}
dataRes := userapi.QueryAccountDataResponse{}
err = rp.userAPI.QueryAccountData(req.ctx, &dataReq, &dataRes)
if err != nil {
continue
}
if roomID == "" {
if globalData, ok := dataRes.GlobalAccountData[dataType]; ok {
data.AccountData.Events = append(
data.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: dataType,
Content: gomatrixserverlib.RawJSON(globalData),
},
)
}
} else {
if roomData, ok := dataRes.RoomAccountData[roomID][dataType]; ok {
joinData := data.Rooms.Join[roomID]
joinData.AccountData.Events = append(
joinData.AccountData.Events,
gomatrixserverlib.ClientEvent{
Type: dataType,
Content: gomatrixserverlib.RawJSON(roomData),
},
)
data.Rooms.Join[roomID] = joinData
}
}
}
}
return data, nil
}
// shouldReturnImmediately returns whether the /sync request is an initial sync,
// or timeout=0, or full_state=true, in any of the cases the request should
// return immediately.
func (rp *RequestPool) shouldReturnImmediately(syncReq *syncRequest) bool {
if syncReq.since.IsEmpty() || syncReq.timeout == 0 || syncReq.wantFullState {
func (rp *RequestPool) shouldReturnImmediately(syncReq *types.SyncRequest) bool {
if syncReq.Since.IsEmpty() || syncReq.Timeout == 0 || syncReq.WantFullState {
return true
}
waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.device.UserID, syncReq.device.ID)
return werr == nil && waiting
return false
}

View file

@ -20,6 +20,7 @@ import (
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/eduserver/cache"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
@ -28,8 +29,10 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/syncapi/consumers"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/routing"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/sync"
)
@ -50,57 +53,54 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to connect to sync db")
}
pos, err := syncDB.SyncPosition(context.Background())
if err != nil {
logrus.WithError(err).Panicf("failed to get sync position")
eduCache := cache.New()
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache)
notifier := notifier.NewNotifier(streams.Latest(context.Background()))
if err = notifier.Load(context.Background(), syncDB); err != nil {
logrus.WithError(err).Panicf("failed to load notifier ")
}
notifier := sync.NewNotifier(pos)
err = notifier.Load(context.Background(), syncDB)
if err != nil {
logrus.WithError(err).Panicf("failed to start notifier")
}
requestPool := sync.NewRequestPool(syncDB, cfg, notifier, userAPI, keyAPI, rsAPI)
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
consumer, notifier, keyAPI, rsAPI, syncDB,
consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider,
)
if err = keyChangeConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start key change consumer")
}
roomConsumer := consumers.NewOutputRoomEventConsumer(
cfg, consumer, notifier, syncDB, rsAPI,
cfg, consumer, syncDB, notifier, streams.PDUStreamProvider,
streams.InviteStreamProvider, rsAPI,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
}
clientConsumer := consumers.NewOutputClientDataConsumer(
cfg, consumer, notifier, syncDB,
cfg, consumer, syncDB, notifier, streams.AccountDataStreamProvider,
)
if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer")
}
typingConsumer := consumers.NewOutputTypingEventConsumer(
cfg, consumer, notifier, syncDB,
cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider,
)
if err = typingConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start typing consumer")
}
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
cfg, consumer, notifier, syncDB,
cfg, consumer, syncDB, notifier, streams.SendToDeviceStreamProvider,
)
if err = sendToDeviceConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
}
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
cfg, consumer, notifier, syncDB,
cfg, consumer, syncDB, notifier, streams.ReceiptStreamProvider,
)
if err = receiptConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start receipts consumer")

53
syncapi/types/provider.go Normal file
View file

@ -0,0 +1,53 @@
package types
import (
"context"
"time"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
type SyncRequest struct {
Context context.Context
Log *logrus.Entry
Device *userapi.Device
Response *Response
Filter gomatrixserverlib.Filter
Since StreamingToken
Limit int
Timeout time.Duration
WantFullState bool
// Updated by the PDU stream.
Rooms map[string]string
}
type StreamProvider interface {
Setup()
// Advance will update the latest position of the stream based on
// an update and will wake callers waiting on StreamNotifyAfter.
Advance(latest StreamPosition)
// CompleteSync will update the response to include all updates as needed
// for a complete sync. It will always return immediately.
CompleteSync(ctx context.Context, req *SyncRequest) StreamPosition
// IncrementalSync will update the response to include all updates between
// the from and to sync positions. It will always return immediately,
// making no changes if the range contains no updates.
IncrementalSync(ctx context.Context, req *SyncRequest, from, to StreamPosition) StreamPosition
// LatestPosition returns the latest stream position for this stream.
LatestPosition(ctx context.Context) StreamPosition
}
type PartitionedStreamProvider interface {
Setup()
Advance(latest LogPosition)
CompleteSync(ctx context.Context, req *SyncRequest) LogPosition
IncrementalSync(ctx context.Context, req *SyncRequest, from, to LogPosition) LogPosition
LatestPosition(ctx context.Context) LogPosition
}

View file

@ -35,6 +35,15 @@ var (
ErrInvalidSyncTokenLen = fmt.Errorf("Sync token has an invalid length")
)
type StateDelta struct {
RoomID string
StateEvents []*gomatrixserverlib.HeaderedEvent
Membership string
// The PDU stream position of the latest membership event for this user, if applicable.
// Can be 0 if there is no membership event in this delta.
MembershipPos StreamPosition
}
// StreamPosition represents the offset in the sync stream a client is at.
type StreamPosition int64
@ -114,6 +123,7 @@ type StreamingToken struct {
ReceiptPosition StreamPosition
SendToDevicePosition StreamPosition
InvitePosition StreamPosition
AccountDataPosition StreamPosition
DeviceListPosition LogPosition
}
@ -130,10 +140,10 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) {
func (t StreamingToken) String() string {
posStr := fmt.Sprintf(
"s%d_%d_%d_%d_%d",
"s%d_%d_%d_%d_%d_%d",
t.PDUPosition, t.TypingPosition,
t.ReceiptPosition, t.SendToDevicePosition,
t.InvitePosition,
t.InvitePosition, t.AccountDataPosition,
)
if dl := t.DeviceListPosition; !dl.IsEmpty() {
posStr += fmt.Sprintf(".dl-%d-%d", dl.Partition, dl.Offset)
@ -154,6 +164,8 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
return true
case t.InvitePosition > other.InvitePosition:
return true
case t.AccountDataPosition > other.AccountDataPosition:
return true
case t.DeviceListPosition.IsAfter(&other.DeviceListPosition):
return true
}
@ -161,7 +173,7 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
}
func (t *StreamingToken) IsEmpty() bool {
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition == 0 && t.DeviceListPosition.IsEmpty()
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition == 0 && t.DeviceListPosition.IsEmpty()
}
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
@ -178,22 +190,25 @@ func (t *StreamingToken) WithUpdates(other StreamingToken) StreamingToken {
// streaming token contains any positions that are not 0, they are considered updates
// and will overwrite the value in the token.
func (t *StreamingToken) ApplyUpdates(other StreamingToken) {
if other.PDUPosition > 0 {
if other.PDUPosition > t.PDUPosition {
t.PDUPosition = other.PDUPosition
}
if other.TypingPosition > 0 {
if other.TypingPosition > t.TypingPosition {
t.TypingPosition = other.TypingPosition
}
if other.ReceiptPosition > 0 {
if other.ReceiptPosition > t.ReceiptPosition {
t.ReceiptPosition = other.ReceiptPosition
}
if other.SendToDevicePosition > 0 {
if other.SendToDevicePosition > t.SendToDevicePosition {
t.SendToDevicePosition = other.SendToDevicePosition
}
if other.InvitePosition > 0 {
if other.InvitePosition > t.InvitePosition {
t.InvitePosition = other.InvitePosition
}
if other.DeviceListPosition.Offset > 0 {
if other.AccountDataPosition > t.AccountDataPosition {
t.AccountDataPosition = other.AccountDataPosition
}
if other.DeviceListPosition.IsAfter(&t.DeviceListPosition) {
t.DeviceListPosition = other.DeviceListPosition
}
}
@ -286,7 +301,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
}
categories := strings.Split(tok[1:], ".")
parts := strings.Split(categories[0], "_")
var positions [5]StreamPosition
var positions [6]StreamPosition
for i, p := range parts {
if i > len(positions) {
break
@ -304,6 +319,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
ReceiptPosition: positions[2],
SendToDevicePosition: positions[3],
InvitePosition: positions[4],
AccountDataPosition: positions[5],
}
// dl-0-1234
// $log_name-$partition-$offset

View file

@ -10,10 +10,10 @@ import (
func TestNewSyncTokenWithLogs(t *testing.T) {
tests := map[string]*StreamingToken{
"s4_0_0_0_0": {
"s4_0_0_0_0_0": {
PDUPosition: 4,
},
"s4_0_0_0_0.dl-0-123": {
"s4_0_0_0_0_0.dl-0-123": {
PDUPosition: 4,
DeviceListPosition: LogPosition{
Partition: 0,
@ -42,10 +42,10 @@ func TestNewSyncTokenWithLogs(t *testing.T) {
func TestSyncTokens(t *testing.T) {
shouldPass := map[string]string{
"s4_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, LogPosition{}}.String(),
"s3_1_0_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, 0, LogPosition{1, 2}}.String(),
"s3_1_2_3_5": StreamingToken{3, 1, 2, 3, 5, LogPosition{}}.String(),
"t3_1": TopologyToken{3, 1}.String(),
"s4_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, LogPosition{}}.String(),
"s3_1_0_0_0_0.dl-1-2": StreamingToken{3, 1, 0, 0, 0, 0, LogPosition{1, 2}}.String(),
"s3_1_2_3_5_0": StreamingToken{3, 1, 2, 3, 5, 0, LogPosition{}}.String(),
"t3_1": TopologyToken{3, 1}.String(),
}
for a, b := range shouldPass {