Allow RS input API to time out without leaking goroutines

Neil Alexander 2021-06-29 14:36:38 +01:00
parent 7e86f81909
commit 7c9f6b5872
GPG key ID: A02A2019A2BB0944


@@ -44,8 +44,7 @@ type Inputer struct {
 type inputTask struct {
 	ctx   context.Context
 	event *api.InputRoomEvent
-	wg    *sync.WaitGroup
-	err   error // written back by worker, only safe to read when all tasks are done
+	err   chan error
 }
 
 type inputWorker struct {
@@ -70,13 +69,13 @@ func (w *inputWorker) start() {
 			default:
 			}
 			hooks.Run(hooks.KindNewEventReceived, task.event.Event)
-			_, task.err = w.r.processRoomEvent(task.ctx, task.event)
-			if task.err == nil {
+			_, err := w.r.processRoomEvent(task.ctx, task.event)
+			if err == nil {
 				hooks.Run(hooks.KindNewEventPersisted, task.event.Event)
 			} else {
-				sentry.CaptureException(task.err)
+				sentry.CaptureException(err)
 			}
-			task.wg.Done()
+			task.err <- err
 		case <-time.After(time.Second * 5):
 			return
 		}
@@ -134,9 +133,11 @@ func (r *Inputer) InputRoomEvents(
 	// Create a wait group. Each task that we dispatch will call Done on
 	// this wait group so that we know when all of our events have been
 	// processed.
-	wg := &sync.WaitGroup{}
-	wg.Add(len(request.InputRoomEvents))
-	tasks := make([]*inputTask, len(request.InputRoomEvents))
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	count := len(request.InputRoomEvents)
+	wait := make(chan error, count)
+	tasks := make([]*inputTask, count)
 
 	for i, e := range request.InputRoomEvents {
 		// Work out if we are running per-room workers or if we're just doing
@@ -161,7 +162,7 @@ func (r *Inputer) InputRoomEvents(
 		tasks[i] = &inputTask{
 			ctx:   ctx,
 			event: &request.InputRoomEvents[i],
-			wg:    wg,
+			err:   wait,
 		}
 
 		// Send the task to the worker.
@@ -171,15 +172,23 @@ func (r *Inputer) InputRoomEvents(
 		worker.input.push(tasks[i])
 	}
 
-	// Wait for all of the workers to return results about our tasks.
-	wg.Wait()
+	// Wait for the request context to close and then
+	go func() {
+		<-ctx.Done()
+		close(wait)
+	}()
 
+	// Wait for all of the workers to return results about our tasks.
 	// If any of the tasks returned an error, we should probably report
 	// that back to the caller.
-	for _, task := range tasks {
-		if task.err != nil {
-			response.ErrMsg = task.err.Error()
-			_, rejected := task.err.(*gomatrixserverlib.NotAllowed)
+	for err := range wait {
+		count--
+		if count == 0 {
+			cancel()
+		}
+		if err != nil {
+			response.ErrMsg = err.Error()
+			_, rejected := err.(*gomatrixserverlib.NotAllowed)
 			response.NotAllowed = rejected
 			return
 		}
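
For context, the change above swaps a sync.WaitGroup for a buffered error channel plus a goroutine that closes the channel once the request context ends, so a caller whose request times out stops waiting instead of blocking (and leaking) forever. The following is a minimal, self-contained sketch of the same idea; the names (processEvent, inputEvents) are illustrative, and the wait loop selects on ctx.Done() directly instead of closing the channel, so it is a simplification of the pattern rather than the Dendrite code itself.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// processEvent stands in for the per-event work that a roomserver input
// worker would do.
func processEvent(ctx context.Context, ev string) error {
	select {
	case <-time.After(10 * time.Millisecond): // simulated work
		if ev == "bad" {
			return errors.New("event not allowed")
		}
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// inputEvents fans the events out to goroutines, then waits for either one
// result per event or for the request context to close, whichever comes first.
func inputEvents(ctx context.Context, events []string) error {
	count := len(events)
	// Buffered to count so every worker can deliver its result and exit,
	// even if the caller has already stopped waiting.
	wait := make(chan error, count)

	for _, ev := range events {
		ev := ev
		go func() { wait <- processEvent(ctx, ev) }()
	}

	for count > 0 {
		select {
		case err := <-wait:
			count--
			if err != nil {
				return err
			}
		case <-ctx.Done():
			// The request timed out or was cancelled: stop waiting.
			// Outstanding workers drain into the buffered channel and
			// exit, so no goroutine is leaked.
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(inputEvents(ctx, []string{"a", "b", "c"})) // <nil>
}

The important detail in both the diff and the sketch is that the channel buffer is sized to the number of tasks: a worker that finishes after the caller has already given up can still deliver its result and exit, rather than blocking on the send forever.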