From 8db25eaa65f9449e3a368c12ff6caaa168a8e767 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 20 Jan 2022 14:53:38 +0000 Subject: [PATCH] Add `workerForRoom` for tidiness --- roomserver/internal/input/input.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index bf816f41..837612b8 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -57,6 +57,11 @@ type Inputer struct { Queryer *query.Queryer } +func (r *Inputer) workerForRoom(roomID string) *phony.Inbox { + inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) + return inbox.(*phony.Inbox) +} + // onMessage is called when a new event arrives in the roomserver input stream. func (r *Inputer) Start() error { _, err := r.JetStream.Subscribe( @@ -71,9 +76,8 @@ func (r *Inputer) Start() error { _ = msg.Term() return } - inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() - inbox.(*phony.Inbox).Act(nil, func() { + r.workerForRoom(roomID).Act(nil, func() { defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { sentry.CaptureException(err) @@ -104,7 +108,7 @@ func (r *Inputer) InputRoomEvents( request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, ) { - if request.Asynchronous { + if false && request.Asynchronous { var err error for _, e := range request.InputRoomEvents { msg := &nats.Msg{ @@ -128,9 +132,8 @@ func (r *Inputer) InputRoomEvents( for _, e := range request.InputRoomEvents { inputRoomEvent := e roomID := inputRoomEvent.Event.RoomID() - inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() - inbox.(*phony.Inbox).Act(nil, func() { + r.workerForRoom(roomID).Act(nil, func() { defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() err := r.processRoomEvent(ctx, &inputRoomEvent) if err != nil {