From 03a989d5c960ecd3c14393bb91165971ca20f0d4 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 24 Jan 2022 11:15:20 +0000 Subject: [PATCH] Set timeouts on roomserver input tasks (need to decide what timeout makes sense) --- roomserver/internal/input/input.go | 19 +++++++++++++++---- roomserver/internal/input/input_events.go | 7 +++++++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index fbd0c79f..1bc14a3b 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -19,6 +19,7 @@ import ( "context" "encoding/json" "sync" + "time" "github.com/Arceliar/phony" "github.com/getsentry/sentry-go" @@ -42,6 +43,9 @@ var keyContentFields = map[string]string{ "m.room.member": "membership", } +// TODO: Does this value make sense? +const MaximumProcessingTime = time.Minute + type Inputer struct { DB storage.Database JetStream nats.JetStreamContext @@ -78,9 +82,11 @@ func (r *Inputer) Start() error { } roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() worker := r.workerForRoom(roomID) - worker.Act(worker, func() { + worker.Act(nil, func() { + ctx, cancel := context.WithTimeout(context.Background(), MaximumProcessingTime) + defer cancel() defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() - if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { + if err := r.processRoomEvent(ctx, &inputRoomEvent); err != nil { sentry.CaptureException(err) } else { hooks.Run(hooks.KindNewEventPersisted, inputRoomEvent.Event) @@ -99,6 +105,9 @@ func (r *Inputer) Start() error { nats.MaxDeliver(0), // Use a durable named consumer. r.Durable, + // Only process one message at a time, rather than have NATS flood us with + // more messages when we're still busy working on the last one. + nats.MaxAckPending(1), ) return err } @@ -135,9 +144,11 @@ func (r *Inputer) InputRoomEvents( roomID := inputRoomEvent.Event.RoomID() roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() worker := r.workerForRoom(roomID) - worker.Act(worker, func() { + worker.Act(nil, func() { + reqctx, cancel := context.WithTimeout(ctx, MaximumProcessingTime) + defer cancel() defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() - err := r.processRoomEvent(ctx, &inputRoomEvent) + err := r.processRoomEvent(reqctx, &inputRoomEvent) if err != nil { sentry.CaptureException(err) } else { diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 3ed83294..7c9d1f18 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -65,6 +65,13 @@ func (r *Inputer) processRoomEvent( ctx context.Context, input *api.InputRoomEvent, ) (err error) { + // Before we do anything, make sure the context hasn't expired for this pending task. + select { + case <-ctx.Done(): + return context.DeadlineExceeded + default: + } + // Measure how long it takes to process this event. started := time.Now() defer func() {