diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 80d6d2c5..f411de8e 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -57,12 +57,14 @@ func (r *Inputer) Start() error { r.InputRoomEventTopic, func(msg *nats.Msg) { _ = msg.InProgress() + roomID := msg.Header.Get("room_id") + defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() var inputRoomEvent api.InputRoomEvent if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { _ = msg.Nak() return } - inbox, _ := r.workers.LoadOrStore(msg.Header.Get("room_id"), &phony.Inbox{}) + inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) inbox.(*phony.Inbox).Act(nil, func() { if _, err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { sentry.CaptureException(err) @@ -90,7 +92,8 @@ func (r *Inputer) InputRoomEvents( Subject: r.InputRoomEventTopic, Header: nats.Header{}, } - msg.Header.Set("room_id", e.Event.RoomID()) + roomID := e.Event.RoomID() + msg.Header.Set("room_id", roomID) msg.Data, err = json.Marshal(e) if err != nil { response.ErrMsg = err.Error() @@ -100,6 +103,7 @@ func (r *Inputer) InputRoomEvents( response.ErrMsg = err.Error() return } + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() } }