diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 1bc14a3b..27809e54 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -105,9 +105,9 @@ func (r *Inputer) Start() error { nats.MaxDeliver(0), // Use a durable named consumer. r.Durable, - // Only process one message at a time, rather than have NATS flood us with - // more messages when we're still busy working on the last one. - nats.MaxAckPending(1), + // If we've missed things in the stream, e.g. we restarted, then replay + // all of the queued messages that were waiting for us. + nats.DeliverAll(), ) return err } diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index 0fd31082..5810a2a9 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -24,7 +24,7 @@ var ( var streams = []*nats.StreamConfig{ { Name: InputRoomEvent, - Retention: nats.InterestPolicy, + Retention: nats.WorkQueuePolicy, Storage: nats.FileStorage, }, {