diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 404bc742..a1bb63a1 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -19,7 +19,6 @@ import ( "context" "encoding/json" "sync" - "time" "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal/hooks" @@ -28,7 +27,6 @@ import ( "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" - "go.uber.org/atomic" ) type Inputer struct { @@ -38,7 +36,7 @@ type Inputer struct { ACLs *acls.ServerACLs OutputRoomEventTopic string - workers sync.Map // room ID -> *inputWorker + latestEventsMutexes sync.Map // room ID -> sync.Mutex } type inputTask struct { @@ -48,30 +46,6 @@ type inputTask struct { err error // written back by worker, only safe to read when all tasks are done } -type inputWorker struct { - r *Inputer - running atomic.Bool - input chan *inputTask -} - -// Guarded by a CAS on w.running -func (w *inputWorker) start() { - defer w.running.Store(false) - for { - select { - case task := <-w.input: - hooks.Run(hooks.KindNewEventReceived, task.event.Event) - _, task.err = w.r.processRoomEvent(task.ctx, task.event) - if task.err == nil { - hooks.Run(hooks.KindNewEventPersisted, task.event.Event) - } - task.wg.Done() - case <-time.After(time.Second * 5): - return - } - } -} - // WriteOutputEvents implements OutputRoomEventWriter func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error { messages := make([]*sarama.ProducerMessage, len(updates)) @@ -127,37 +101,21 @@ func (r *Inputer) InputRoomEvents( wg.Add(len(request.InputRoomEvents)) tasks := make([]*inputTask, len(request.InputRoomEvents)) - for i, e := range request.InputRoomEvents { - // Work out if we are running per-room workers or if we're just doing - // it on a global basis (e.g. SQLite). - roomID := "global" - if r.DB.SupportsConcurrentRoomInputs() { - roomID = e.Event.RoomID() - } - - // Look up the worker, or create it if it doesn't exist. This channel - // is buffered to reduce the chance that we'll be blocked by another - // room - the channel will be quite small as it's just pointer types. - w, _ := r.workers.LoadOrStore(roomID, &inputWorker{ - r: r, - input: make(chan *inputTask, 32), - }) - worker := w.(*inputWorker) - - // Create a task. This contains the input event and a reference to - // the wait group, so that the worker can notify us when this specific - // task has been finished. + for i := range request.InputRoomEvents { + wg.Add(1) tasks[i] = &inputTask{ ctx: context.Background(), event: &request.InputRoomEvents[i], wg: wg, } - - // Send the task to the worker. - if worker.running.CAS(false, true) { - go worker.start() - } - worker.input <- tasks[i] + go func(task *inputTask) { + hooks.Run(hooks.KindNewEventReceived, task.event.Event) + _, task.err = r.processRoomEvent(task.ctx, task.event) + if task.err == nil { + hooks.Run(hooks.KindNewEventPersisted, task.event.Event) + } + task.wg.Done() + }(tasks[i]) } // Wait for all of the workers to return results about our tasks. diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index c9264a27..0551ed52 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -19,6 +19,7 @@ package input import ( "context" "fmt" + "sync" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" @@ -55,6 +56,11 @@ func (r *Inputer) updateLatestEvents( transactionID *api.TransactionID, rewritesState bool, ) (err error) { + v, _ := r.latestEventsMutexes.LoadOrStore(roomInfo.RoomNID, &sync.Mutex{}) + mutex := v.(*sync.Mutex) + mutex.Lock() + defer mutex.Unlock() + updater, err := r.DB.GetLatestEventsForUpdate(ctx, *roomInfo) if err != nil { return fmt.Errorf("r.DB.GetLatestEventsForUpdate: %w", err)