mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-26 06:58:27 +00:00
Tweak metrics
This commit is contained in:
parent
5e9f6eb909
commit
42a250da15
1 changed files with 6 additions and 3 deletions
|
@ -59,7 +59,7 @@ func (r *Inputer) Start() error {
|
||||||
// later, possibly with an error response to the inputter if synchronous.
|
// later, possibly with an error response to the inputter if synchronous.
|
||||||
func(msg *nats.Msg) {
|
func(msg *nats.Msg) {
|
||||||
roomID := msg.Header.Get("room_id")
|
roomID := msg.Header.Get("room_id")
|
||||||
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
|
||||||
var inputRoomEvent api.InputRoomEvent
|
var inputRoomEvent api.InputRoomEvent
|
||||||
if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
|
if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
|
||||||
_ = msg.Term()
|
_ = msg.Term()
|
||||||
|
@ -67,6 +67,7 @@ func (r *Inputer) Start() error {
|
||||||
}
|
}
|
||||||
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
|
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
|
||||||
inbox.(*phony.Inbox).Act(nil, func() {
|
inbox.(*phony.Inbox).Act(nil, func() {
|
||||||
|
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
||||||
if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil {
|
if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
} else {
|
} else {
|
||||||
|
@ -111,15 +112,17 @@ func (r *Inputer) InputRoomEvents(
|
||||||
if _, err = r.JetStream.PublishMsg(msg); err != nil {
|
if _, err = r.JetStream.PublishMsg(msg); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
responses := make(chan error, len(request.InputRoomEvents))
|
responses := make(chan error, len(request.InputRoomEvents))
|
||||||
defer close(responses)
|
defer close(responses)
|
||||||
for _, e := range request.InputRoomEvents {
|
for _, e := range request.InputRoomEvents {
|
||||||
inputRoomEvent := e
|
inputRoomEvent := e
|
||||||
inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{})
|
roomID := inputRoomEvent.Event.RoomID()
|
||||||
|
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
|
||||||
|
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
|
||||||
inbox.(*phony.Inbox).Act(nil, func() {
|
inbox.(*phony.Inbox).Act(nil, func() {
|
||||||
|
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
||||||
err := r.processRoomEvent(context.TODO(), &inputRoomEvent)
|
err := r.processRoomEvent(context.TODO(), &inputRoomEvent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
|
|
Loading…
Reference in a new issue