From 9ddb8749c1076ecad9928ef6640752fa4cfd03c1 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 24 Jan 2022 11:59:28 +0000 Subject: [PATCH] Use work queue policy, deliver all on restart --- roomserver/internal/input/input.go | 6 +++--- setup/jetstream/streams.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 1bc14a3b..27809e54 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -105,9 +105,9 @@ func (r *Inputer) Start() error { nats.MaxDeliver(0), // Use a durable named consumer. r.Durable, - // Only process one message at a time, rather than have NATS flood us with - // more messages when we're still busy working on the last one. - nats.MaxAckPending(1), + // If we've missed things in the stream, e.g. we restarted, then replay + // all of the queued messages that were waiting for us. + nats.DeliverAll(), ) return err } diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 0fd31082..5810a2a9 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -24,7 +24,7 @@ var ( var streams = []*nats.StreamConfig{ { Name: InputRoomEvent, - Retention: nats.InterestPolicy, + Retention: nats.WorkQueuePolicy, Storage: nats.FileStorage, }, {