diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 9c750923..291b68e1 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -734,7 +734,7 @@ func (t *txnReq) processEventWithMissingState( // newly resolved state. This marks the "oldest" point in the backfill and // sets the baseline state for any new events after this. err = api.SendEventWithState( - context.Background(), + gmectx, t.rsAPI, api.KindOld, resolvedState, @@ -754,7 +754,7 @@ func (t *txnReq) processEventWithMissingState( headeredNewEvents[i] = newEvent.Headered(roomVersion) } if err = api.SendEvents( - context.Background(), + gmectx, t.rsAPI, api.KindOld, append(headeredNewEvents, e.Headered(roomVersion)), diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index d135a9f4..4d6da229 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -64,6 +64,11 @@ func (w *inputWorker) start() { if !ok { continue } + select { + case <-task.ctx.Done(): + continue + default: + } hooks.Run(hooks.KindNewEventReceived, task.event.Event) _, task.err = w.r.processRoomEvent(task.ctx, task.event) if task.err == nil { @@ -122,7 +127,7 @@ func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) er // InputRoomEvents implements api.RoomserverInternalAPI func (r *Inputer) InputRoomEvents( - _ context.Context, + ctx context.Context, request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, ) { @@ -154,7 +159,7 @@ func (r *Inputer) InputRoomEvents( // the wait group, so that the worker can notify us when this specific // task has been finished. tasks[i] = &inputTask{ - ctx: context.Background(), + ctx: ctx, event: &request.InputRoomEvents[i], wg: wg, } @@ -166,19 +171,17 @@ func (r *Inputer) InputRoomEvents( worker.input.push(tasks[i]) } - /* - // Wait for all of the workers to return results about our tasks. - wg.Wait() + // Wait for all of the workers to return results about our tasks. + wg.Wait() - // If any of the tasks returned an error, we should probably report - // that back to the caller. - for _, task := range tasks { - if task.err != nil { - response.ErrMsg = task.err.Error() - _, rejected := task.err.(*gomatrixserverlib.NotAllowed) - response.NotAllowed = rejected - return - } + // If any of the tasks returned an error, we should probably report + // that back to the caller. + for _, task := range tasks { + if task.err != nil { + response.ErrMsg = task.err.Error() + _, rejected := task.err.(*gomatrixserverlib.NotAllowed) + response.NotAllowed = rejected + return } - */ + } }