diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 1eab6780..c58b0825 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -59,7 +59,7 @@ func (r *Inputer) Start() error { // later, possibly with an error response to the inputter if synchronous. func(msg *nats.Msg) { roomID := msg.Header.Get("room_id") - defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() var inputRoomEvent api.InputRoomEvent if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { _ = msg.Term() @@ -67,6 +67,7 @@ func (r *Inputer) Start() error { } inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) inbox.(*phony.Inbox).Act(nil, func() { + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { sentry.CaptureException(err) } else { @@ -111,15 +112,17 @@ func (r *Inputer) InputRoomEvents( if _, err = r.JetStream.PublishMsg(msg); err != nil { return } - roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() } } else { responses := make(chan error, len(request.InputRoomEvents)) defer close(responses) for _, e := range request.InputRoomEvents { inputRoomEvent := e - inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{}) + roomID := inputRoomEvent.Event.RoomID() + inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() inbox.(*phony.Inbox).Act(nil, func() { + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() err := r.processRoomEvent(context.TODO(), &inputRoomEvent) if err != nil { sentry.CaptureException(err)