From 10e1d347e2a7435c4a187e9d742c9874e170c6d0 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Fri, 7 Jan 2022 11:57:11 +0000 Subject: [PATCH] Use synchronous contexts, limit time to fetch missing events --- roomserver/internal/input/input.go | 3 +- roomserver/internal/input/input_events.go | 40 +++++++++++++--------- roomserver/internal/input/input_missing.go | 6 ++-- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 8922bc44..ea5af908 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -126,7 +126,7 @@ func (r *Inputer) InputRoomEvents( inputRoomEvent := e inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{}) inbox.(*phony.Inbox).Act(nil, func() { - err := r.processRoomEvent(context.TODO(), &inputRoomEvent) + err := r.processRoomEvent(ctx, &inputRoomEvent) if err != nil { sentry.CaptureException(err) } else { @@ -142,6 +142,7 @@ func (r *Inputer) InputRoomEvents( for i := 0; i < len(request.InputRoomEvents); i++ { select { case <-ctx.Done(): + response.ErrMsg = context.DeadlineExceeded.Error() return case err := <-responses: if err != nil { diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index db631b63..4e982afc 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -84,7 +84,7 @@ func (r *Inputer) processRoomEvent( }) if input.Origin == "" { - input.Origin = event.Origin() + // input.Origin = event.Origin() } logger.Println("XXX: Processing event") @@ -131,17 +131,18 @@ func (r *Inputer) processRoomEvent( return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err) } } + if input.Origin != "" { + serverRes.ServerNames = append([]gomatrixserverlib.ServerName{input.Origin}, serverRes.ServerNames...) + } // First of all, check that the auth events of the event are known. // If they aren't then we will ask the federation API for them. isRejected := false authEvents := gomatrixserverlib.NewAuthEvents(nil) knownEvents := map[string]*types.Event{} - logger.Println("Starting to check for missing auth events") if err = r.checkForMissingAuthEvents(ctx, logger, input.Event, &authEvents, knownEvents, serverRes.ServerNames); err != nil { return fmt.Errorf("r.checkForMissingAuthEvents: %w", err) } - logger.Println("Checked for missing auth events") // Check if the event is allowed by its auth events. If it isn't then // we consider the event to be "rejected" — it will still be persisted. @@ -172,21 +173,26 @@ func (r *Inputer) processRoomEvent( } if input.Kind != api.KindOutlier && len(missingRes.MissingPrevEventIDs) > 0 { - missingState := missingStateReq{ - origin: input.Origin, - inputer: r, - queryer: r.Queryer, - db: r.DB, - federation: r.FSAPI, - keys: r.KeyRing, - roomsMu: internal.NewMutexByRoom(), - servers: []gomatrixserverlib.ServerName{input.Origin}, - hadEvents: map[string]bool{}, - haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{}, - } - if err = missingState.processEventWithMissingState(ctx, input.Event.Unwrap(), input.Event.RoomVersion); err != nil { + if len(serverRes.ServerNames) > 0 { + missingState := missingStateReq{ + origin: input.Origin, + inputer: r, + queryer: r.Queryer, + db: r.DB, + federation: r.FSAPI, + keys: r.KeyRing, + roomsMu: internal.NewMutexByRoom(), + servers: serverRes.ServerNames, + hadEvents: map[string]bool{}, + haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{}, + } + if err = missingState.processEventWithMissingState(ctx, input.Event.Unwrap(), input.Event.RoomVersion); err != nil { + isRejected = true + rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err) + } + } else { isRejected = true - rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err) + rejectionErr = fmt.Errorf("missing prev events and no other servers to ask") } } diff --git a/roomserver/internal/input/input_missing.go b/roomserver/internal/input/input_missing.go index 9fcaa56b..79a76af9 100644 --- a/roomserver/internal/input/input_missing.go +++ b/roomserver/internal/input/input_missing.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "time" fedapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal" @@ -369,9 +370,10 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve var missingResp *gomatrixserverlib.RespMissingEvents for _, server := range t.servers { - logger.Infof("Trying server %q for missing events", server) var m gomatrixserverlib.RespMissingEvents - if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{ + rctx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + if m, err = t.federation.LookupMissingEvents(rctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{ Limit: 20, // The latest event IDs that the sender already has. These are skipped when retrieving the previous events of latest_events. EarliestEvents: latestEvents,