Add QueryKeyChanges (#1228)

Hook some things up to call it as well.
This commit is contained in:
Kegsay 2020-07-28 18:25:16 +01:00 committed by GitHub
parent adf7b59294
commit 9a5fb489c5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 119 additions and 22 deletions

View file

@ -26,16 +26,17 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
)
// OutputKeyChangeEventConsumer consumes events that originated in the key server.
type OutputKeyChangeEventConsumer struct {
keyChangeConsumer *internal.ContinualConsumer
db storage.Database
serverName gomatrixserverlib.ServerName // our server name
currentStateAPI currentstateAPI.CurrentStateInternalAPI
// keyAPI api.KeyInternalAPI
keyChangeConsumer *internal.ContinualConsumer
db storage.Database
serverName gomatrixserverlib.ServerName // our server name
currentStateAPI currentstateAPI.CurrentStateInternalAPI
keyAPI api.KeyInternalAPI
partitionToOffset map[int32]int64
partitionToOffsetMu sync.Mutex
}
@ -46,6 +47,7 @@ func NewOutputKeyChangeEventConsumer(
serverName gomatrixserverlib.ServerName,
topic string,
kafkaConsumer sarama.Consumer,
keyAPI api.KeyInternalAPI,
currentStateAPI currentstateAPI.CurrentStateInternalAPI,
store storage.Database,
) *OutputKeyChangeEventConsumer {
@ -60,6 +62,7 @@ func NewOutputKeyChangeEventConsumer(
keyChangeConsumer: &consumer,
db: store,
serverName: serverName,
keyAPI: keyAPI,
currentStateAPI: currentStateAPI,
partitionToOffset: make(map[int32]int64),
partitionToOffsetMu: sync.Mutex{},
@ -115,21 +118,44 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
// be already filled in with join/leave information.
func (s *OutputKeyChangeEventConsumer) Catchup(
ctx context.Context, userID string, res *types.Response, tok types.StreamingToken,
) (hasNew bool, err error) {
) (newTok *types.StreamingToken, hasNew bool, err error) {
// Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
newlyJoinedRooms := joinedRooms(res, userID)
newlyLeftRooms := leftRooms(res)
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
changed, left, err := s.trackChangedUsers(ctx, userID, newlyJoinedRooms, newlyLeftRooms)
if err != nil {
return false, err
return nil, false, err
}
res.DeviceLists.Changed = changed
res.DeviceLists.Left = left
hasNew = len(changed) > 0 || len(left) > 0
}
// TODO: now also track users who we already share rooms with but who have updated their devices between the two tokens
// now also track users who we already share rooms with but who have updated their devices between the two tokens
// TODO: Extract partition/offset from sync token
var partition int32
var offset int64
var queryRes api.QueryKeyChangesResponse
s.keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{
Partition: partition,
Offset: offset,
}, &queryRes)
if queryRes.Error != nil {
// don't fail the catchup because we may have got useful information by tracking membership
util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
} else {
// TODO: Make a new streaming token using the new offset
userSet := make(map[string]bool)
for _, userID := range res.DeviceLists.Changed {
userSet[userID] = true
}
for _, userID := range queryRes.UserIDs {
if !userSet[userID] {
res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID)
}
}
}
return
}