mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-31 13:22:46 +00:00
bugfix: E2EE device keys could sometimes not be sent to remote servers (#2466)
* Fix flakey sytest 'Local device key changes get to remote servers' * Debug logs * Remove internal/test and use /test only Remove a lot of ancient code too. * Use FederationRoomserverAPI in more places * Use more interfaces in federationapi; begin adding regression test * Linting * Add regression test * Unbreak tests * ALL THE LOGS * Fix a race condition which could cause events to not be sent to servers If a new room event which rewrites state arrives, we remove all joined hosts then re-calculate them. This wasn't done in a transaction so for a brief period we would have no joined hosts. During this interim, key change events which arrive would not be sent to destination servers. This would sporadically fail on sytest. * Unbreak new tests * Linting
This commit is contained in:
parent
cd82460513
commit
6de29c1cd2
48 changed files with 566 additions and 618 deletions
|
@ -39,7 +39,7 @@ type KeyChangeConsumer struct {
|
|||
db storage.Database
|
||||
queues *queue.OutgoingQueues
|
||||
serverName gomatrixserverlib.ServerName
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI
|
||||
rsAPI roomserverAPI.FederationRoomserverAPI
|
||||
topic string
|
||||
}
|
||||
|
||||
|
@ -50,7 +50,7 @@ func NewKeyChangeConsumer(
|
|||
js nats.JetStreamContext,
|
||||
queues *queue.OutgoingQueues,
|
||||
store storage.Database,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
rsAPI roomserverAPI.FederationRoomserverAPI,
|
||||
) *KeyChangeConsumer {
|
||||
return &KeyChangeConsumer{
|
||||
ctx: process.Context(),
|
||||
|
@ -120,6 +120,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
|
|||
logger.WithError(err).Error("failed to calculate joined rooms for user")
|
||||
return true
|
||||
}
|
||||
logrus.Infof("DEBUG: %v joined rooms for user %v", queryRes.RoomIDs, m.UserID)
|
||||
// send this key change to all servers who share rooms with this user.
|
||||
destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true)
|
||||
if err != nil {
|
||||
|
@ -128,6 +129,9 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
|
|||
}
|
||||
|
||||
if len(destinations) == 0 {
|
||||
logger.WithField("num_rooms", len(queryRes.RoomIDs)).Debug("user is in no federated rooms")
|
||||
destinations, err = t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, false)
|
||||
logrus.Infof("GetJoinedHostsForRooms exclude self=false -> %v %v", destinations, err)
|
||||
return true
|
||||
}
|
||||
// Pack the EDU and marshal it
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/sirupsen/logrus"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/matrix-org/dendrite/federationapi/queue"
|
||||
|
@ -36,7 +37,7 @@ import (
|
|||
type OutputRoomEventConsumer struct {
|
||||
ctx context.Context
|
||||
cfg *config.FederationAPI
|
||||
rsAPI api.RoomserverInternalAPI
|
||||
rsAPI api.FederationRoomserverAPI
|
||||
jetstream nats.JetStreamContext
|
||||
durable string
|
||||
db storage.Database
|
||||
|
@ -51,7 +52,7 @@ func NewOutputRoomEventConsumer(
|
|||
js nats.JetStreamContext,
|
||||
queues *queue.OutgoingQueues,
|
||||
store storage.Database,
|
||||
rsAPI api.RoomserverInternalAPI,
|
||||
rsAPI api.FederationRoomserverAPI,
|
||||
) *OutputRoomEventConsumer {
|
||||
return &OutputRoomEventConsumer{
|
||||
ctx: process.Context(),
|
||||
|
@ -89,15 +90,7 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg)
|
|||
switch output.Type {
|
||||
case api.OutputTypeNewRoomEvent:
|
||||
ev := output.NewRoomEvent.Event
|
||||
|
||||
if output.NewRoomEvent.RewritesState {
|
||||
if err := s.db.PurgeRoomState(s.ctx, ev.RoomID()); err != nil {
|
||||
log.WithError(err).Errorf("roomserver output log: purge room state failure")
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.processMessage(*output.NewRoomEvent); err != nil {
|
||||
if err := s.processMessage(*output.NewRoomEvent, output.NewRoomEvent.RewritesState); err != nil {
|
||||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
|
@ -145,7 +138,7 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee
|
|||
|
||||
// processMessage updates the list of currently joined hosts in the room
|
||||
// and then sends the event to the hosts that were joined before the event.
|
||||
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
|
||||
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent, rewritesState bool) error {
|
||||
addsStateEvents, missingEventIDs := ore.NeededStateEventIDs()
|
||||
|
||||
// Ask the roomserver and add in the rest of the results into the set.
|
||||
|
@ -164,7 +157,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
|
|||
addsStateEvents = append(addsStateEvents, eventsRes.Events...)
|
||||
}
|
||||
|
||||
addsJoinedHosts, err := joinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(addsStateEvents))
|
||||
addsJoinedHosts, err := JoinedHostsFromEvents(gomatrixserverlib.UnwrapEventHeaders(addsStateEvents))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -173,13 +166,13 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
|
|||
// expressed as a delta against the current state.
|
||||
// TODO(#290): handle EventIDMismatchError and recover the current state by
|
||||
// talking to the roomserver
|
||||
logrus.Infof("room %s adds joined hosts: %v removes %v", ore.Event.RoomID(), addsJoinedHosts, ore.RemovesStateEventIDs)
|
||||
oldJoinedHosts, err := s.db.UpdateRoom(
|
||||
s.ctx,
|
||||
ore.Event.RoomID(),
|
||||
ore.LastSentEventID,
|
||||
ore.Event.EventID(),
|
||||
addsJoinedHosts,
|
||||
ore.RemovesStateEventIDs,
|
||||
rewritesState, // if we're re-writing state, nuke all joined hosts before adding
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -238,7 +231,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
|
|||
return nil, err
|
||||
}
|
||||
|
||||
combinedAddsJoinedHosts, err := joinedHostsFromEvents(combinedAddsEvents)
|
||||
combinedAddsJoinedHosts, err := JoinedHostsFromEvents(combinedAddsEvents)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -284,10 +277,10 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
|
|||
return result, nil
|
||||
}
|
||||
|
||||
// joinedHostsFromEvents turns a list of state events into a list of joined hosts.
|
||||
// JoinedHostsFromEvents turns a list of state events into a list of joined hosts.
|
||||
// This errors if one of the events was invalid.
|
||||
// It should be impossible for an invalid event to get this far in the pipeline.
|
||||
func joinedHostsFromEvents(evs []*gomatrixserverlib.Event) ([]types.JoinedHost, error) {
|
||||
func JoinedHostsFromEvents(evs []*gomatrixserverlib.Event) ([]types.JoinedHost, error) {
|
||||
var joinedHosts []types.JoinedHost
|
||||
for _, ev := range evs {
|
||||
if ev.Type() != "m.room.member" || ev.StateKey() == nil {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue