Max ack wait increase, other tweaks

This commit is contained in:
Neil Alexander 2022-01-24 13:21:35 +00:00
parent 2a911aa892
commit bcda9729e4
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -45,7 +45,7 @@ var keyContentFields = map[string]string{
} }
// TODO: Does this value make sense? // TODO: Does this value make sense?
const MaximumProcessingTime = time.Minute const MaximumProcessingTime = time.Minute * 2
type Inputer struct { type Inputer struct {
DB storage.Database DB storage.Database
@ -92,9 +92,10 @@ func (r *Inputer) Start() error {
return return
} }
_ = msg.InProgress()
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
worker := r.workerForRoom(roomID) r.workerForRoom(roomID).Act(nil, func() {
worker.Act(nil, func() { _ = msg.InProgress() // resets the acknowledgement wait timer
defer eventIDsQueued.Delete(eventID) defer eventIDsQueued.Delete(eventID)
ctx, cancel := context.WithTimeout(context.Background(), MaximumProcessingTime) ctx, cancel := context.WithTimeout(context.Background(), MaximumProcessingTime)
defer cancel() defer cancel()
@ -112,15 +113,14 @@ func (r *Inputer) Start() error {
// sure that we only acknowledge when we're happy we've done everything we // sure that we only acknowledge when we're happy we've done everything we
// can. This ensures we retry things when it makes sense to do so. // can. This ensures we retry things when it makes sense to do so.
nats.ManualAck(), nats.ManualAck(),
// NATS will try to redeliver things to us automatically if we don't ack
// or nak them within a certain amount of time. This stops that from
// happening, so we don't end up doing a lot of unnecessary duplicate work.
nats.MaxDeliver(0),
// Use a durable named consumer. // Use a durable named consumer.
r.Durable, r.Durable,
// If we've missed things in the stream, e.g. we restarted, then replay // If we've missed things in the stream, e.g. we restarted, then replay
// all of the queued messages that were waiting for us. // all of the queued messages that were waiting for us.
nats.DeliverAll(), nats.DeliverAll(),
// Ensure that NATS doesn't try to resend us something that wasn't done
// within the period of time that we might still be processing it.
nats.AckWait(MaximumProcessingTime+(time.Second*10)),
) )
return err return err
} }