From 8cd5fd48491bcd8764fc4a345822580246f76380 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 8 Dec 2021 15:29:39 +0000 Subject: [PATCH] Try to check auth events in the RS input queue --- roomserver/internal/api.go | 15 +++--- roomserver/internal/input/input.go | 2 + roomserver/internal/input/input_events.go | 64 ++++++++++++++++++++++- 3 files changed, 73 insertions(+), 8 deletions(-) diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index bda585cf..2feef38b 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -61,13 +61,6 @@ func NewRoomserverAPI( ServerName: cfg.Matrix.ServerName, ServerACLs: serverACLs, }, - Inputer: &input.Inputer{ - DB: roomserverDB, - OutputRoomEventTopic: outputRoomEventTopic, - Producer: producer, - ServerName: cfg.Matrix.ServerName, - ACLs: serverACLs, - }, // perform-er structs get initialised when we have a federation sender to use } return a @@ -86,6 +79,14 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA r.fsAPI = fsAPI r.SetKeyring(fsAPI.KeyRing()) + r.Inputer = &input.Inputer{ + DB: r.DB, + OutputRoomEventTopic: r.OutputRoomEventTopic, + Producer: r.Producer, + ServerName: r.Cfg.Matrix.ServerName, + ACLs: r.ACLs, + FSAPI: r.fsAPI, + } r.Inviter = &perform.Inviter{ DB: r.DB, Cfg: r.Cfg, diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index de40e133..72ebbcaf 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -23,6 +23,7 @@ import ( "github.com/Shopify/sarama" "github.com/getsentry/sentry-go" + fsAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/hooks" "github.com/matrix-org/dendrite/roomserver/acls" "github.com/matrix-org/dendrite/roomserver/api" @@ -45,6 +46,7 @@ type Inputer struct { Producer sarama.SyncProducer ServerName gomatrixserverlib.ServerName ACLs *acls.ServerACLs + FSAPI fsAPI.FederationInternalAPI OutputRoomEventTopic string workers sync.Map // room ID -> *inputWorker } diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index d8ce9727..0c182bb1 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -22,6 +22,7 @@ import ( "fmt" "time" + fedapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/helpers" @@ -62,7 +63,7 @@ var processRoomEventDuration = prometheus.NewHistogramVec( func (r *Inputer) processRoomEvent( ctx context.Context, input *api.InputRoomEvent, -) (eventID string, err error) { +) (string, error) { // Measure how long it takes to process this event. started := time.Now() defer func() { @@ -98,6 +99,12 @@ func (r *Inputer) processRoomEvent( } } + // First of all, check that the auth events of the event are known. + // If they aren't then we will ask the federation API for them. + if err := r.checkForMissingAuthEvents(ctx, input); err != nil { + return "", fmt.Errorf("r.checkForMissingAuthEvents: %w", err) + } + // Check that the event passes authentication checks and work out // the numeric IDs for the auth events. isRejected := false @@ -107,10 +114,17 @@ func (r *Inputer) processRoomEvent( isRejected = true } + // Then check if the prev events are known, which we need in order + // to calculate the state before the event. + if err := r.checkForMissingAuthEvents(ctx, input); err != nil { + return "", fmt.Errorf("r.checkForMissingAuthEvents: %w", err) + } + var softfail bool if input.Kind == api.KindNew { // Check that the event passes authentication checks based on the // current room state. + var err error softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs) if err != nil { logrus.WithFields(logrus.Fields{ @@ -228,6 +242,54 @@ func (r *Inputer) processRoomEvent( return event.EventID(), nil } +func (r *Inputer) checkForMissingAuthEvents( + ctx context.Context, + input *api.InputRoomEvent, +) error { + authEventIDs := input.Event.AuthEventIDs() + if len(authEventIDs) == 0 { + return nil + } + + knownAuthEventNIDs, err := r.DB.EventNIDs(ctx, authEventIDs) + if err != nil { + return fmt.Errorf("r.DB.EventNIDs: %w", err) + } + + missingAuthEventIDs := make([]string, 0, len(authEventIDs)-len(knownAuthEventNIDs)) + for _, authEventID := range authEventIDs { + if _, ok := knownAuthEventNIDs[authEventID]; !ok { + missingAuthEventIDs = append(missingAuthEventIDs, authEventID) + } + } + + if len(missingAuthEventIDs) > 0 { + req := &fedapi.QueryEventAuthFromFederationRequest{ + RoomID: input.Event.RoomID(), + EventID: input.Event.EventID(), + } + res := &fedapi.QueryEventAuthFromFederationResponse{} + if err := r.FSAPI.QueryEventAuthFromFederation(ctx, req, res); err != nil { + return fmt.Errorf("r.FSAPI.QueryEventAuthFromFederation: %w", err) + } + + authEventNIDs, rejection := helpers.CheckAuthEvents(ctx, r.DB, input.Event, input.AuthEventIDs) + if _, _, _, _, err := r.DB.StoreEvent(ctx, input.Event.Event, authEventNIDs, rejection != nil); err != nil { + return fmt.Errorf("r.DB.StoreEvent: %w", err) + } + } + + return nil +} + +func (r *Inputer) checkForMissingPrevEvents( + ctx context.Context, + input *api.InputRoomEvent, +) error { + + return nil +} + func (r *Inputer) calculateAndSetState( ctx context.Context, input *api.InputRoomEvent,