From bcda9729e45befe858602437066a7756e78b056d Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 24 Jan 2022 13:21:35 +0000 Subject: [PATCH] Max ack wait increase, other tweaks --- roomserver/internal/input/input.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 07c6ed7e..6090e14d 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -45,7 +45,7 @@ var keyContentFields = map[string]string{ } // TODO: Does this value make sense? -const MaximumProcessingTime = time.Minute +const MaximumProcessingTime = time.Minute * 2 type Inputer struct { DB storage.Database @@ -92,9 +92,10 @@ func (r *Inputer) Start() error { return } + _ = msg.InProgress() roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() - worker := r.workerForRoom(roomID) - worker.Act(nil, func() { + r.workerForRoom(roomID).Act(nil, func() { + _ = msg.InProgress() // resets the acknowledgement wait timer defer eventIDsQueued.Delete(eventID) ctx, cancel := context.WithTimeout(context.Background(), MaximumProcessingTime) defer cancel() @@ -112,15 +113,14 @@ func (r *Inputer) Start() error { // sure that we only acknowledge when we're happy we've done everything we // can. This ensures we retry things when it makes sense to do so. nats.ManualAck(), - // NATS will try to redeliver things to us automatically if we don't ack - // or nak them within a certain amount of time. This stops that from - // happening, so we don't end up doing a lot of unnecessary duplicate work. - nats.MaxDeliver(0), // Use a durable named consumer. r.Durable, // If we've missed things in the stream, e.g. we restarted, then replay // all of the queued messages that were waiting for us. nats.DeliverAll(), + // Ensure that NATS doesn't try to resend us something that wasn't done + // within the period of time that we might still be processing it. + nats.AckWait(MaximumProcessingTime+(time.Second*10)), ) return err }