Merge branch 'master' into neilalexander/fsconcurrency

This commit is contained in:
Neil Alexander 2021-07-02 10:10:42 +01:00
commit 55075a0cb5
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -28,6 +28,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
)
@ -64,6 +65,9 @@ func (w *inputWorker) start() {
if !ok {
continue
}
roomserverInputBackpressure.With(prometheus.Labels{
"room_id": task.event.Event.RoomID(),
}).Dec()
hooks.Run(hooks.KindNewEventReceived, task.event.Event)
_, task.err = w.r.processRoomEvent(task.ctx, task.event)
if task.err == nil {
@ -120,6 +124,20 @@ func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) er
return errs
}
func init() {
prometheus.MustRegister(roomserverInputBackpressure)
}
var roomserverInputBackpressure = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dendrite",
Subsystem: "roomserver",
Name: "input_backpressure",
Help: "How many events are queued for input for a given room",
},
[]string{"room_id"},
)
// InputRoomEvents implements api.RoomserverInternalAPI
func (r *Inputer) InputRoomEvents(
_ context.Context,
@ -164,6 +182,9 @@ func (r *Inputer) InputRoomEvents(
go worker.start()
}
worker.input.push(tasks[i])
roomserverInputBackpressure.With(prometheus.Labels{
"room_id": roomID,
}).Inc()
}
// Wait for all of the workers to return results about our tasks.