Try to capture RS input backpressure metric

This commit is contained in:
Neil Alexander 2021-07-02 09:33:33 +01:00
parent 2647f6e9c5
commit a9ddbfaed4
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -28,6 +28,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.uber.org/atomic" "go.uber.org/atomic"
) )
@ -52,6 +53,7 @@ type inputWorker struct {
r *Inputer r *Inputer
running atomic.Bool running atomic.Bool
input *fifoQueue input *fifoQueue
roomID string
} }
// Guarded by a CAS on w.running // Guarded by a CAS on w.running
@ -64,6 +66,9 @@ func (w *inputWorker) start() {
if !ok { if !ok {
continue continue
} }
roomserverInputBackpressure.With(prometheus.Labels{
"room_id": task.event.Event.RoomID(),
}).Observe(float64(w.input.count))
hooks.Run(hooks.KindNewEventReceived, task.event.Event) hooks.Run(hooks.KindNewEventReceived, task.event.Event)
_, task.err = w.r.processRoomEvent(task.ctx, task.event) _, task.err = w.r.processRoomEvent(task.ctx, task.event)
if task.err == nil { if task.err == nil {
@ -120,6 +125,20 @@ func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) er
return errs return errs
} }
func init() {
prometheus.MustRegister(processRoomEventDuration)
}
var roomserverInputBackpressure = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: "dendrite",
Subsystem: "roomserver",
Name: "input_backpressure",
Help: "How many events are queued for input for a given room",
},
[]string{"room_id"},
)
// InputRoomEvents implements api.RoomserverInternalAPI // InputRoomEvents implements api.RoomserverInternalAPI
func (r *Inputer) InputRoomEvents( func (r *Inputer) InputRoomEvents(
_ context.Context, _ context.Context,
@ -164,6 +183,9 @@ func (r *Inputer) InputRoomEvents(
go worker.start() go worker.start()
} }
worker.input.push(tasks[i]) worker.input.push(tasks[i])
roomserverInputBackpressure.With(prometheus.Labels{
"room_id": roomID,
}).Observe(float64(worker.input.count))
} }
// Wait for all of the workers to return results about our tasks. // Wait for all of the workers to return results about our tasks.