Apply backpressure to consumers/synchronous requests to hopefully stop things being overwhelmed

This commit is contained in:
Neil Alexander 2022-01-24 10:37:19 +00:00
parent 5ed90caaff
commit c68037b3e8
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -77,7 +77,8 @@ func (r *Inputer) Start() error {
return return
} }
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
r.workerForRoom(roomID).Act(nil, func() { worker := r.workerForRoom(roomID)
worker.Act(worker, func() {
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil {
sentry.CaptureException(err) sentry.CaptureException(err)
@ -133,7 +134,8 @@ func (r *Inputer) InputRoomEvents(
inputRoomEvent := e inputRoomEvent := e
roomID := inputRoomEvent.Event.RoomID() roomID := inputRoomEvent.Event.RoomID()
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
r.workerForRoom(roomID).Act(nil, func() { worker := r.workerForRoom(roomID)
worker.Act(worker, func() {
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
err := r.processRoomEvent(ctx, &inputRoomEvent) err := r.processRoomEvent(ctx, &inputRoomEvent)
if err != nil { if err != nil {