Use GME context for roomserver queuing

This commit is contained in:
Neil Alexander 2021-06-29 14:13:44 +01:00
parent efb9588a36
commit 957b3787c2
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 20 additions and 17 deletions

View file

@ -734,7 +734,7 @@ func (t *txnReq) processEventWithMissingState(
// newly resolved state. This marks the "oldest" point in the backfill and // newly resolved state. This marks the "oldest" point in the backfill and
// sets the baseline state for any new events after this. // sets the baseline state for any new events after this.
err = api.SendEventWithState( err = api.SendEventWithState(
context.Background(), gmectx,
t.rsAPI, t.rsAPI,
api.KindOld, api.KindOld,
resolvedState, resolvedState,
@ -754,7 +754,7 @@ func (t *txnReq) processEventWithMissingState(
headeredNewEvents[i] = newEvent.Headered(roomVersion) headeredNewEvents[i] = newEvent.Headered(roomVersion)
} }
if err = api.SendEvents( if err = api.SendEvents(
context.Background(), gmectx,
t.rsAPI, t.rsAPI,
api.KindOld, api.KindOld,
append(headeredNewEvents, e.Headered(roomVersion)), append(headeredNewEvents, e.Headered(roomVersion)),

View file

@ -64,6 +64,11 @@ func (w *inputWorker) start() {
if !ok { if !ok {
continue continue
} }
select {
case <-task.ctx.Done():
continue
default:
}
hooks.Run(hooks.KindNewEventReceived, task.event.Event) hooks.Run(hooks.KindNewEventReceived, task.event.Event)
_, task.err = w.r.processRoomEvent(task.ctx, task.event) _, task.err = w.r.processRoomEvent(task.ctx, task.event)
if task.err == nil { if task.err == nil {
@ -122,7 +127,7 @@ func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) er
// InputRoomEvents implements api.RoomserverInternalAPI // InputRoomEvents implements api.RoomserverInternalAPI
func (r *Inputer) InputRoomEvents( func (r *Inputer) InputRoomEvents(
_ context.Context, ctx context.Context,
request *api.InputRoomEventsRequest, request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse, response *api.InputRoomEventsResponse,
) { ) {
@ -154,7 +159,7 @@ func (r *Inputer) InputRoomEvents(
// the wait group, so that the worker can notify us when this specific // the wait group, so that the worker can notify us when this specific
// task has been finished. // task has been finished.
tasks[i] = &inputTask{ tasks[i] = &inputTask{
ctx: context.Background(), ctx: ctx,
event: &request.InputRoomEvents[i], event: &request.InputRoomEvents[i],
wg: wg, wg: wg,
} }
@ -166,7 +171,6 @@ func (r *Inputer) InputRoomEvents(
worker.input.push(tasks[i]) worker.input.push(tasks[i])
} }
/*
// Wait for all of the workers to return results about our tasks. // Wait for all of the workers to return results about our tasks.
wg.Wait() wg.Wait()
@ -180,5 +184,4 @@ func (r *Inputer) InputRoomEvents(
return return
} }
} }
*/
} }