mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-08-02 06:12:45 +00:00
Roomserver/federation input refactor (#2104)
* Put federation client functions into their own file
* Look for missing auth events in RS input
* Remove retrieveMissingAuthEvents from federation API
* Logging
* Sorta transplanted the code over
* Use event origin failing all else
* Don't get stuck on mutexes:
* Add verifier
* Don't mark state events with zero snapshot NID as not existing
* Check missing state if not an outlier before storing the event
* Reject instead of soft-fail, don't copy roominfo so much
* Use synchronous contexts, limit time to fetch missing events
* Clean up some commented out bits
* Simplify `/send` endpoint significantly
* Submit async
* Report errors on sending to RS input
* Set max payload in NATS to 16MB
* Tweak metrics
* Add `workerForRoom` for tidiness
* Try skipping unmarshalling errors for RespMissingEvents
* Track missing prev events separately to avoid calculating state when not possible
* Tweak logic around checking missing state
* Care about state when checking missing prev events
* Don't check missing state for create events
* Try that again
* Handle create events better
* Send create room events as new
* Use given event kind when sending auth/state events
* Revert "Use given event kind when sending auth/state events"
This reverts commit 089d64d271.
* Only search for missing prev events or state for new events
* Tweaks
* We only have missing prev if we don't supply state
* Room version tweaks
* Allow async inputs again
* Apply backpressure to consumers/synchronous requests to hopefully stop things being overwhelmed
* Set timeouts on roomserver input tasks (need to decide what timeout makes sense)
* Use work queue policy, deliver all on restart
* Reduce chance of duplicates being sent by NATS
* Limit the number of servers we attempt to reduce backpressure
* Some review comment fixes
* Tidy up a couple things
* Don't limit servers, randomise order using map
* Some context refactoring
* Update gmsl
* Don't resend create events
* Set stateIDs length correctly or else the roomserver thinks there are missing events when there aren't
* Exclude our own servername
* Try backing off servers
* Make excluding self behaviour optional
* Exclude self from g_m_e
* Update sytest-whitelist
* Update consumers for the roomserver output stream
* Remember to send outliers for state returned from /gme
* Make full HTTP tests less upsetti
* Remove 'If a device list update goes missing, the server resyncs on the next one' from the sytest blacklist
* Remove debugging test
* Fix blacklist again, remove unnecessary duplicate context
* Clearer contexts, don't use background in case there's something happening there
* Don't queue up events more than once in memory
* Correctly identify create events when checking for state
* Fill in gaps again in /gme code
* Remove `AuthEventIDs` from `InputRoomEvent`
* Remove stray field
Co-authored-by: Kegan Dougal <kegan@matrix.org>
This commit is contained in:
parent
5b4999afa9
commit
a763cbb0e1
46 changed files with 1549 additions and 1285 deletions
|
@ -22,6 +22,8 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
fedapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
|
||||
|
@ -37,6 +39,9 @@ func init() {
|
|||
prometheus.MustRegister(processRoomEventDuration)
|
||||
}
|
||||
|
||||
// TODO: Does this value make sense?
|
||||
const MaximumProcessingTime = time.Minute * 2
|
||||
|
||||
var processRoomEventDuration = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: "dendrite",
|
||||
|
@ -60,9 +65,25 @@ var processRoomEventDuration = prometheus.NewHistogramVec(
|
|||
// TODO: Break up function - we should probably do transaction ID checks before calling this.
|
||||
// nolint:gocyclo
|
||||
func (r *Inputer) processRoomEvent(
|
||||
ctx context.Context,
|
||||
inctx context.Context,
|
||||
input *api.InputRoomEvent,
|
||||
) (err error) {
|
||||
select {
|
||||
case <-inctx.Done():
|
||||
// Before we do anything, make sure the context hasn't expired for this pending task.
|
||||
// If it has then we'll give up straight away — it's probably a synchronous input
|
||||
// request and the caller has already given up, but the inbox task was still queued.
|
||||
return context.DeadlineExceeded
|
||||
default:
|
||||
}
|
||||
|
||||
// Wrap the context with a time limit. We'll allow no more than MaximumProcessingTime for
|
||||
// everything that we need to do for this event, or it's possible that we could end up wedging
|
||||
// the roomserver for a very long time.
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel := context.WithTimeout(inctx, MaximumProcessingTime)
|
||||
defer cancel()
|
||||
|
||||
// Measure how long it takes to process this event.
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
|
@ -75,6 +96,11 @@ func (r *Inputer) processRoomEvent(
|
|||
// Parse and validate the event JSON
|
||||
headered := input.Event
|
||||
event := headered.Unwrap()
|
||||
logger := util.GetLogger(ctx).WithFields(logrus.Fields{
|
||||
"event_id": event.EventID(),
|
||||
"room_id": event.RoomID(),
|
||||
"type": event.Type(),
|
||||
})
|
||||
|
||||
// if we have already got this event then do not process it again, if the input kind is an outlier.
|
||||
// Outliers contain no extra information which may warrant a re-processing.
|
||||
|
@ -87,24 +113,67 @@ func (r *Inputer) processRoomEvent(
|
|||
switch idFormat {
|
||||
case gomatrixserverlib.EventIDFormatV1:
|
||||
if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) {
|
||||
util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring")
|
||||
logger.Debugf("Already processed event; ignoring")
|
||||
return nil
|
||||
}
|
||||
default:
|
||||
util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring")
|
||||
logger.Debugf("Already processed event; ignoring")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check that the event passes authentication checks and work out
|
||||
// the numeric IDs for the auth events.
|
||||
missingRes := &api.QueryMissingAuthPrevEventsResponse{}
|
||||
serverRes := &fedapi.QueryJoinedHostServerNamesInRoomResponse{}
|
||||
if event.Type() != gomatrixserverlib.MRoomCreate || !event.StateKeyEquals("") {
|
||||
missingReq := &api.QueryMissingAuthPrevEventsRequest{
|
||||
RoomID: event.RoomID(),
|
||||
AuthEventIDs: event.AuthEventIDs(),
|
||||
PrevEventIDs: event.PrevEventIDs(),
|
||||
}
|
||||
if err = r.Queryer.QueryMissingAuthPrevEvents(ctx, missingReq, missingRes); err != nil {
|
||||
return fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err)
|
||||
}
|
||||
}
|
||||
if len(missingRes.MissingAuthEventIDs) > 0 || len(missingRes.MissingPrevEventIDs) > 0 {
|
||||
serverReq := &fedapi.QueryJoinedHostServerNamesInRoomRequest{
|
||||
RoomID: event.RoomID(),
|
||||
ExcludeSelf: true,
|
||||
}
|
||||
if err = r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil {
|
||||
return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
|
||||
}
|
||||
}
|
||||
if input.Origin != "" {
|
||||
serverRes.ServerNames = append(serverRes.ServerNames, input.Origin)
|
||||
}
|
||||
|
||||
// First of all, check that the auth events of the event are known.
|
||||
// If they aren't then we will ask the federation API for them.
|
||||
isRejected := false
|
||||
authEventNIDs, rejectionErr := helpers.CheckAuthEvents(ctx, r.DB, headered, input.AuthEventIDs)
|
||||
if rejectionErr != nil {
|
||||
logrus.WithError(rejectionErr).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("helpers.CheckAuthEvents failed for event, rejecting event")
|
||||
authEvents := gomatrixserverlib.NewAuthEvents(nil)
|
||||
knownEvents := map[string]*types.Event{}
|
||||
if err = r.fetchAuthEvents(ctx, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
|
||||
return fmt.Errorf("r.checkForMissingAuthEvents: %w", err)
|
||||
}
|
||||
|
||||
// Check if the event is allowed by its auth events. If it isn't then
|
||||
// we consider the event to be "rejected" — it will still be persisted.
|
||||
var rejectionErr error
|
||||
if rejectionErr = gomatrixserverlib.Allowed(event, &authEvents); rejectionErr != nil {
|
||||
isRejected = true
|
||||
logger.WithError(rejectionErr).Warnf("Event %s rejected", event.EventID())
|
||||
}
|
||||
|
||||
// Accumulate the auth event NIDs.
|
||||
authEventIDs := event.AuthEventIDs()
|
||||
authEventNIDs := make([]types.EventNID, 0, len(authEventIDs))
|
||||
for _, authEventID := range authEventIDs {
|
||||
if _, ok := knownEvents[authEventID]; !ok {
|
||||
return fmt.Errorf("missing auth event %s", authEventID)
|
||||
}
|
||||
authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID)
|
||||
}
|
||||
|
||||
var softfail bool
|
||||
|
@ -113,11 +182,50 @@ func (r *Inputer) processRoomEvent(
|
|||
// current room state.
|
||||
softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs)
|
||||
if err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"event_id": event.EventID(),
|
||||
"type": event.Type(),
|
||||
"room": event.RoomID(),
|
||||
}).WithError(err).Info("Error authing soft-failed event")
|
||||
logger.WithError(err).Info("Error authing soft-failed event")
|
||||
}
|
||||
}
|
||||
|
||||
// At this point we are checking whether we know all of the prev events, and
|
||||
// if we know the state before the prev events. This is necessary before we
|
||||
// try to do `calculateAndSetState` on the event later, otherwise it will fail
|
||||
// with missing event NIDs. If there's anything missing then we'll go and fetch
|
||||
// the prev events and state from the federation. Note that we only do this if
|
||||
// we weren't already told what the state before the event should be — if the
|
||||
// HasState option was set and a state set was provided (as is the case in a
|
||||
// typical federated room join) then we won't bother trying to fetch prev events
|
||||
// because we may not be allowed to see them and we have no choice but to trust
|
||||
// the state event IDs provided to us in the join instead.
|
||||
missingPrev := !input.HasState && len(missingRes.MissingPrevEventIDs) > 0
|
||||
if missingPrev && input.Kind == api.KindNew {
|
||||
// Don't do this for KindOld events, otherwise old events that we fetch
|
||||
// to satisfy missing prev events/state will end up recursively calling
|
||||
// processRoomEvent.
|
||||
if len(serverRes.ServerNames) > 0 {
|
||||
missingState := missingStateReq{
|
||||
origin: input.Origin,
|
||||
inputer: r,
|
||||
queryer: r.Queryer,
|
||||
db: r.DB,
|
||||
federation: r.FSAPI,
|
||||
keys: r.KeyRing,
|
||||
roomsMu: internal.NewMutexByRoom(),
|
||||
servers: map[gomatrixserverlib.ServerName]struct{}{},
|
||||
hadEvents: map[string]bool{},
|
||||
haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{},
|
||||
}
|
||||
for _, serverName := range serverRes.ServerNames {
|
||||
missingState.servers[serverName] = struct{}{}
|
||||
}
|
||||
if err = missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil {
|
||||
isRejected = true
|
||||
rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err)
|
||||
} else {
|
||||
missingPrev = false
|
||||
}
|
||||
} else {
|
||||
isRejected = true
|
||||
rejectionErr = fmt.Errorf("missing prev events and no other servers to ask")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -140,12 +248,7 @@ func (r *Inputer) processRoomEvent(
|
|||
// doesn't have any associated state to store and we don't need to
|
||||
// notify anyone about it.
|
||||
if input.Kind == api.KindOutlier {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"event_id": event.EventID(),
|
||||
"type": event.Type(),
|
||||
"room": event.RoomID(),
|
||||
"sender": event.Sender(),
|
||||
}).Debug("Stored outlier")
|
||||
logger.Debug("Stored outlier")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -157,24 +260,18 @@ func (r *Inputer) processRoomEvent(
|
|||
return fmt.Errorf("r.DB.RoomInfo missing for room %s", event.RoomID())
|
||||
}
|
||||
|
||||
if stateAtEvent.BeforeStateSnapshotNID == 0 {
|
||||
if !missingPrev && stateAtEvent.BeforeStateSnapshotNID == 0 {
|
||||
// We haven't calculated a state for this event yet.
|
||||
// Lets calculate one.
|
||||
err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected)
|
||||
if err != nil && input.Kind != api.KindOld {
|
||||
err = r.calculateAndSetState(ctx, input, roomInfo, &stateAtEvent, event, isRejected)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.calculateAndSetState: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// We stop here if the event is rejected: We've stored it but won't update forward extremities or notify anyone about it.
|
||||
if isRejected || softfail {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"event_id": event.EventID(),
|
||||
"type": event.Type(),
|
||||
"room": event.RoomID(),
|
||||
"soft_fail": softfail,
|
||||
"sender": event.Sender(),
|
||||
}).Debug("Stored rejected event")
|
||||
logger.WithError(rejectionErr).WithField("soft_fail", softfail).Debug("Stored rejected event")
|
||||
return rejectionErr
|
||||
}
|
||||
|
||||
|
@ -228,10 +325,127 @@ func (r *Inputer) processRoomEvent(
|
|||
return nil
|
||||
}
|
||||
|
||||
// fetchAuthEvents will check to see if any of the
|
||||
// auth events specified by the given event are unknown. If they are
|
||||
// then we will go off and request them from the federation and then
|
||||
// store them in the database. By the time this function ends, either
|
||||
// we've failed to retrieve the auth chain altogether (in which case
|
||||
// an error is returned) or we've successfully retrieved them all and
|
||||
// they are now in the database.
|
||||
func (r *Inputer) fetchAuthEvents(
|
||||
ctx context.Context,
|
||||
logger *logrus.Entry,
|
||||
event *gomatrixserverlib.HeaderedEvent,
|
||||
auth *gomatrixserverlib.AuthEvents,
|
||||
known map[string]*types.Event,
|
||||
servers []gomatrixserverlib.ServerName,
|
||||
) error {
|
||||
unknown := map[string]struct{}{}
|
||||
authEventIDs := event.AuthEventIDs()
|
||||
if len(authEventIDs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, authEventID := range authEventIDs {
|
||||
authEvents, err := r.DB.EventsFromIDs(ctx, []string{authEventID})
|
||||
if err != nil || len(authEvents) == 0 || authEvents[0].Event == nil {
|
||||
unknown[authEventID] = struct{}{}
|
||||
continue
|
||||
}
|
||||
ev := authEvents[0]
|
||||
known[authEventID] = &ev // don't take the pointer of the iterated event
|
||||
if err = auth.AddEvent(ev.Event); err != nil {
|
||||
return fmt.Errorf("auth.AddEvent: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// If there are no missing auth events then there is nothing more
|
||||
// to do — we've loaded everything that we need.
|
||||
if len(unknown) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error
|
||||
var res gomatrixserverlib.RespEventAuth
|
||||
var found bool
|
||||
for _, serverName := range servers {
|
||||
// Request the entire auth chain for the event in question. This should
|
||||
// contain all of the auth events — including ones that we already know —
|
||||
// so we'll need to filter through those in the next section.
|
||||
res, err = r.FSAPI.GetEventAuth(ctx, serverName, event.RoomVersion, event.RoomID(), event.EventID())
|
||||
if err != nil {
|
||||
logger.WithError(err).Warnf("Failed to get event auth from federation for %q: %s", event.EventID(), err)
|
||||
continue
|
||||
}
|
||||
found = true
|
||||
break
|
||||
}
|
||||
if !found {
|
||||
return fmt.Errorf("no servers provided event auth for event ID %q, tried servers %v", event.EventID(), servers)
|
||||
}
|
||||
|
||||
for _, authEvent := range gomatrixserverlib.ReverseTopologicalOrdering(
|
||||
res.AuthEvents,
|
||||
gomatrixserverlib.TopologicalOrderByAuthEvents,
|
||||
) {
|
||||
// If we already know about this event from the database then we don't
|
||||
// need to store it again or do anything further with it, so just skip
|
||||
// over it rather than wasting cycles.
|
||||
if ev, ok := known[authEvent.EventID()]; ok && ev != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check the signatures of the event.
|
||||
// TODO: It really makes sense for the federation API to be doing this,
|
||||
// because then it can attempt another server if one serves up an event
|
||||
// with an invalid signature. For now this will do.
|
||||
if err := authEvent.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil {
|
||||
return fmt.Errorf("event.VerifyEventSignatures: %w", err)
|
||||
}
|
||||
|
||||
// In order to store the new auth event, we need to know its auth chain
|
||||
// as NIDs for the `auth_event_nids` column. Let's see if we can find those.
|
||||
authEventNIDs := make([]types.EventNID, 0, len(authEvent.AuthEventIDs()))
|
||||
for _, eventID := range authEvent.AuthEventIDs() {
|
||||
knownEvent, ok := known[eventID]
|
||||
if !ok {
|
||||
return fmt.Errorf("missing auth event %s for %s", eventID, authEvent.EventID())
|
||||
}
|
||||
authEventNIDs = append(authEventNIDs, knownEvent.EventNID)
|
||||
}
|
||||
|
||||
// Let's take a note of the fact that we now know about this event.
|
||||
if err := auth.AddEvent(authEvent); err != nil {
|
||||
return fmt.Errorf("auth.AddEvent: %w", err)
|
||||
}
|
||||
|
||||
// Check if the auth event should be rejected.
|
||||
isRejected := false
|
||||
if err := gomatrixserverlib.Allowed(authEvent, auth); err != nil {
|
||||
isRejected = true
|
||||
logger.WithError(err).Warnf("Auth event %s rejected", authEvent.EventID())
|
||||
}
|
||||
|
||||
// Finally, store the event in the database.
|
||||
eventNID, _, _, _, _, err := r.DB.StoreEvent(ctx, authEvent, authEventNIDs, isRejected)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.StoreEvent: %w", err)
|
||||
}
|
||||
|
||||
// Now we know about this event, it was stored and the signatures were OK.
|
||||
known[authEvent.EventID()] = &types.Event{
|
||||
EventNID: eventNID,
|
||||
Event: authEvent,
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Inputer) calculateAndSetState(
|
||||
ctx context.Context,
|
||||
input *api.InputRoomEvent,
|
||||
roomInfo types.RoomInfo,
|
||||
roomInfo *types.RoomInfo,
|
||||
stateAtEvent *types.StateAtEvent,
|
||||
event *gomatrixserverlib.Event,
|
||||
isRejected bool,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue