diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index acffde5d..1eab6780 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -54,8 +54,10 @@ type Inputer struct { func (r *Inputer) Start() error { _, err := r.JetStream.Subscribe( r.InputRoomEventTopic, + // We specifically don't use jetstream.WithJetStreamMessage here because we + // queue the task off to a room-specific queue and the ACK needs to be sent + // later, possibly with an error response to the inputter if synchronous. func(msg *nats.Msg) { - _ = msg.InProgress() roomID := msg.Header.Get("room_id") defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() var inputRoomEvent api.InputRoomEvent @@ -65,17 +67,22 @@ func (r *Inputer) Start() error { } inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) inbox.(*phony.Inbox).Act(nil, func() { - _ = msg.InProgress() if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { sentry.CaptureException(err) - _ = msg.Respond([]byte(err.Error())) } else { hooks.Run(hooks.KindNewEventPersisted, inputRoomEvent.Event) } _ = msg.Ack() }) }, + // NATS wants to acknowledge automatically by default when the message is + // read from the stream, but we want to override that behaviour by making + // sure that we only acknowledge when we're happy we've done everything we + // can. This ensures we retry things when it makes sense to do so. nats.ManualAck(), + // NATS will try to redeliver things to us automatically if we don't ack + // or nak them within a certain amount of time. This stops that from + // happening, so we don't end up doing a lot of unnecessary duplicate work. nats.MaxDeliver(0), ) return err