Cross-signing fixes, notifications via sync, federation (#1974)

* Initial work on signing key update EDUs

* Fix build

* Produce/consume EDUs

* Producer logging

* Only produce key change notifications for local users

* Better naming

* Try to notify sync

* Enable feature

* Use key change topic

* Don't bother verifying signatures, validate key lengths if we can, notifier fixes

* Copyright notices

* Remove tests from whitelist until matrix-org/sytest#1117

* Some review comment fixes

* Update to matrix-org/gomatrixserverlib@f9416ac

* Remove unneeded parameter
This commit is contained in:
Neil Alexander 2021-08-17 13:44:30 +01:00 committed by GitHub
parent 8a4b90b7dd
commit ff21675c5b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 556 additions and 254 deletions

View file

@ -29,6 +29,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)
@ -104,13 +105,23 @@ func (s *OutputKeyChangeEventConsumer) updateOffset(msg *sarama.ConsumerMessage)
func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
defer s.updateOffset(msg)
var output api.DeviceMessage
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Error("syncapi: failed to unmarshal key change event from key server")
sentry.CaptureException(err)
return err
var m api.DeviceMessage
if err := json.Unmarshal(msg.Value, &m); err != nil {
logrus.WithError(err).Errorf("failed to read device message from key change topic")
return nil
}
switch m.Type {
case api.TypeCrossSigningUpdate:
return s.onCrossSigningMessage(m, msg.Offset, msg.Partition)
case api.TypeDeviceKeyUpdate:
fallthrough
default:
return s.onDeviceKeyMessage(m, msg.Offset, msg.Partition)
}
}
func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, offset int64, partition int32) error {
output := m.DeviceKeys
// work out who we need to notify about the new key
var queryRes roomserverAPI.QuerySharedUsersResponse
err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{
@ -124,8 +135,35 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
// make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1
posUpdate := types.LogPosition{
Offset: msg.Offset,
Partition: msg.Partition,
Offset: offset,
Partition: partition,
}
s.stream.Advance(posUpdate)
for userID := range queryRes.UserIDsToCount {
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
}
return nil
}
func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, offset int64, partition int32) error {
output := m.CrossSigningKeyUpdate
// work out who we need to notify about the new key
var queryRes roomserverAPI.QuerySharedUsersResponse
err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{
UserID: output.UserID,
}, &queryRes)
if err != nil {
log.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
sentry.CaptureException(err)
return err
}
// make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1
posUpdate := types.LogPosition{
Offset: offset,
Partition: partition,
}
s.stream.Advance(posUpdate)