mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-27 07:28:27 +00:00
Try to check auth events in the RS input queue
This commit is contained in:
parent
739acc1291
commit
8cd5fd4849
3 changed files with 73 additions and 8 deletions
|
@ -61,13 +61,6 @@ func NewRoomserverAPI(
|
||||||
ServerName: cfg.Matrix.ServerName,
|
ServerName: cfg.Matrix.ServerName,
|
||||||
ServerACLs: serverACLs,
|
ServerACLs: serverACLs,
|
||||||
},
|
},
|
||||||
Inputer: &input.Inputer{
|
|
||||||
DB: roomserverDB,
|
|
||||||
OutputRoomEventTopic: outputRoomEventTopic,
|
|
||||||
Producer: producer,
|
|
||||||
ServerName: cfg.Matrix.ServerName,
|
|
||||||
ACLs: serverACLs,
|
|
||||||
},
|
|
||||||
// perform-er structs get initialised when we have a federation sender to use
|
// perform-er structs get initialised when we have a federation sender to use
|
||||||
}
|
}
|
||||||
return a
|
return a
|
||||||
|
@ -86,6 +79,14 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
|
||||||
r.fsAPI = fsAPI
|
r.fsAPI = fsAPI
|
||||||
r.SetKeyring(fsAPI.KeyRing())
|
r.SetKeyring(fsAPI.KeyRing())
|
||||||
|
|
||||||
|
r.Inputer = &input.Inputer{
|
||||||
|
DB: r.DB,
|
||||||
|
OutputRoomEventTopic: r.OutputRoomEventTopic,
|
||||||
|
Producer: r.Producer,
|
||||||
|
ServerName: r.Cfg.Matrix.ServerName,
|
||||||
|
ACLs: r.ACLs,
|
||||||
|
FSAPI: r.fsAPI,
|
||||||
|
}
|
||||||
r.Inviter = &perform.Inviter{
|
r.Inviter = &perform.Inviter{
|
||||||
DB: r.DB,
|
DB: r.DB,
|
||||||
Cfg: r.Cfg,
|
Cfg: r.Cfg,
|
||||||
|
|
|
@ -23,6 +23,7 @@ import (
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
"github.com/getsentry/sentry-go"
|
"github.com/getsentry/sentry-go"
|
||||||
|
fsAPI "github.com/matrix-org/dendrite/federationapi/api"
|
||||||
"github.com/matrix-org/dendrite/internal/hooks"
|
"github.com/matrix-org/dendrite/internal/hooks"
|
||||||
"github.com/matrix-org/dendrite/roomserver/acls"
|
"github.com/matrix-org/dendrite/roomserver/acls"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
|
@ -45,6 +46,7 @@ type Inputer struct {
|
||||||
Producer sarama.SyncProducer
|
Producer sarama.SyncProducer
|
||||||
ServerName gomatrixserverlib.ServerName
|
ServerName gomatrixserverlib.ServerName
|
||||||
ACLs *acls.ServerACLs
|
ACLs *acls.ServerACLs
|
||||||
|
FSAPI fsAPI.FederationInternalAPI
|
||||||
OutputRoomEventTopic string
|
OutputRoomEventTopic string
|
||||||
workers sync.Map // room ID -> *inputWorker
|
workers sync.Map // room ID -> *inputWorker
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
fedapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
|
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
|
||||||
|
@ -62,7 +63,7 @@ var processRoomEventDuration = prometheus.NewHistogramVec(
|
||||||
func (r *Inputer) processRoomEvent(
|
func (r *Inputer) processRoomEvent(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
input *api.InputRoomEvent,
|
input *api.InputRoomEvent,
|
||||||
) (eventID string, err error) {
|
) (string, error) {
|
||||||
// Measure how long it takes to process this event.
|
// Measure how long it takes to process this event.
|
||||||
started := time.Now()
|
started := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -98,6 +99,12 @@ func (r *Inputer) processRoomEvent(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// First of all, check that the auth events of the event are known.
|
||||||
|
// If they aren't then we will ask the federation API for them.
|
||||||
|
if err := r.checkForMissingAuthEvents(ctx, input); err != nil {
|
||||||
|
return "", fmt.Errorf("r.checkForMissingAuthEvents: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Check that the event passes authentication checks and work out
|
// Check that the event passes authentication checks and work out
|
||||||
// the numeric IDs for the auth events.
|
// the numeric IDs for the auth events.
|
||||||
isRejected := false
|
isRejected := false
|
||||||
|
@ -107,10 +114,17 @@ func (r *Inputer) processRoomEvent(
|
||||||
isRejected = true
|
isRejected = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Then check if the prev events are known, which we need in order
|
||||||
|
// to calculate the state before the event.
|
||||||
|
if err := r.checkForMissingAuthEvents(ctx, input); err != nil {
|
||||||
|
return "", fmt.Errorf("r.checkForMissingAuthEvents: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
var softfail bool
|
var softfail bool
|
||||||
if input.Kind == api.KindNew {
|
if input.Kind == api.KindNew {
|
||||||
// Check that the event passes authentication checks based on the
|
// Check that the event passes authentication checks based on the
|
||||||
// current room state.
|
// current room state.
|
||||||
|
var err error
|
||||||
softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs)
|
softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithFields(logrus.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
|
@ -228,6 +242,54 @@ func (r *Inputer) processRoomEvent(
|
||||||
return event.EventID(), nil
|
return event.EventID(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Inputer) checkForMissingAuthEvents(
|
||||||
|
ctx context.Context,
|
||||||
|
input *api.InputRoomEvent,
|
||||||
|
) error {
|
||||||
|
authEventIDs := input.Event.AuthEventIDs()
|
||||||
|
if len(authEventIDs) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
knownAuthEventNIDs, err := r.DB.EventNIDs(ctx, authEventIDs)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("r.DB.EventNIDs: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
missingAuthEventIDs := make([]string, 0, len(authEventIDs)-len(knownAuthEventNIDs))
|
||||||
|
for _, authEventID := range authEventIDs {
|
||||||
|
if _, ok := knownAuthEventNIDs[authEventID]; !ok {
|
||||||
|
missingAuthEventIDs = append(missingAuthEventIDs, authEventID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(missingAuthEventIDs) > 0 {
|
||||||
|
req := &fedapi.QueryEventAuthFromFederationRequest{
|
||||||
|
RoomID: input.Event.RoomID(),
|
||||||
|
EventID: input.Event.EventID(),
|
||||||
|
}
|
||||||
|
res := &fedapi.QueryEventAuthFromFederationResponse{}
|
||||||
|
if err := r.FSAPI.QueryEventAuthFromFederation(ctx, req, res); err != nil {
|
||||||
|
return fmt.Errorf("r.FSAPI.QueryEventAuthFromFederation: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
authEventNIDs, rejection := helpers.CheckAuthEvents(ctx, r.DB, input.Event, input.AuthEventIDs)
|
||||||
|
if _, _, _, _, err := r.DB.StoreEvent(ctx, input.Event.Event, authEventNIDs, rejection != nil); err != nil {
|
||||||
|
return fmt.Errorf("r.DB.StoreEvent: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Inputer) checkForMissingPrevEvents(
|
||||||
|
ctx context.Context,
|
||||||
|
input *api.InputRoomEvent,
|
||||||
|
) error {
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Inputer) calculateAndSetState(
|
func (r *Inputer) calculateAndSetState(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
input *api.InputRoomEvent,
|
input *api.InputRoomEvent,
|
||||||
|
|
Loading…
Reference in a new issue