From 8e5ad6e554e0d993fb9a0e3c5b7d562bdcf0f223 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 28 Jun 2021 10:51:22 +0100 Subject: [PATCH] Try to increase roomserver parallelisation --- roomserver/internal/input/input.go | 110 +++++------------- .../internal/input/input_latest_events.go | 3 + 2 files changed, 31 insertions(+), 82 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 82ece230..6997cffc 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -19,17 +19,16 @@ import ( "context" "encoding/json" "sync" - "time" "github.com/Shopify/sarama" "github.com/getsentry/sentry-go" + "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/hooks" "github.com/matrix-org/dendrite/roomserver/acls" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" - "go.uber.org/atomic" ) type Inputer struct { @@ -38,41 +37,7 @@ type Inputer struct { ServerName gomatrixserverlib.ServerName ACLs *acls.ServerACLs OutputRoomEventTopic string - - workers sync.Map // room ID -> *inputWorker -} - -type inputTask struct { - ctx context.Context - event *api.InputRoomEvent - wg *sync.WaitGroup - err error // written back by worker, only safe to read when all tasks are done -} - -type inputWorker struct { - r *Inputer - running atomic.Bool - input chan *inputTask -} - -// Guarded by a CAS on w.running -func (w *inputWorker) start() { - defer w.running.Store(false) - for { - select { - case task := <-w.input: - hooks.Run(hooks.KindNewEventReceived, task.event.Event) - _, task.err = w.r.processRoomEvent(task.ctx, task.event) - if task.err == nil { - hooks.Run(hooks.KindNewEventPersisted, task.event.Event) - } else { - sentry.CaptureException(task.err) - } - task.wg.Done() - case <-time.After(time.Second * 5): - return - } - } + roomMutexes internal.MutexByRoom } // WriteOutputEvents implements OutputRoomEventWriter @@ -123,57 +88,38 @@ func (r *Inputer) InputRoomEvents( request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, ) { - // Create a wait group. Each task that we dispatch will call Done on - // this wait group so that we know when all of our events have been - // processed. + ctx := context.Background() + wg := &sync.WaitGroup{} + errs := make(chan error) wg.Add(len(request.InputRoomEvents)) - tasks := make([]*inputTask, len(request.InputRoomEvents)) + go func() { + wg.Wait() + close(errs) + }() - for i, e := range request.InputRoomEvents { - // Work out if we are running per-room workers or if we're just doing - // it on a global basis (e.g. SQLite). - roomID := "global" - if r.DB.SupportsConcurrentRoomInputs() { - roomID = e.Event.RoomID() - } - - // Look up the worker, or create it if it doesn't exist. This channel - // is buffered to reduce the chance that we'll be blocked by another - // room - the channel will be quite small as it's just pointer types. - w, _ := r.workers.LoadOrStore(roomID, &inputWorker{ - r: r, - input: make(chan *inputTask, 32), - }) - worker := w.(*inputWorker) - - // Create a task. This contains the input event and a reference to - // the wait group, so that the worker can notify us when this specific - // task has been finished. - tasks[i] = &inputTask{ - ctx: context.Background(), - event: &request.InputRoomEvents[i], - wg: wg, - } - - // Send the task to the worker. - if worker.running.CAS(false, true) { - go worker.start() - } - worker.input <- tasks[i] + for _, e := range request.InputRoomEvents { + go func(e *api.InputRoomEvent) { + defer wg.Done() + hooks.Run(hooks.KindNewEventReceived, e.Event) + _, err := r.processRoomEvent(ctx, e) + if err == nil { + hooks.Run(hooks.KindNewEventPersisted, e.Event) + } else { + sentry.CaptureException(err) + select { + case errs <- err: + default: + } + } + }(&e) } - // Wait for all of the workers to return results about our tasks. - wg.Wait() - - // If any of the tasks returned an error, we should probably report - // that back to the caller. - for _, task := range tasks { - if task.err != nil { - response.ErrMsg = task.err.Error() - _, rejected := task.err.(*gomatrixserverlib.NotAllowed) + for err := range errs { + if err != nil { + response.ErrMsg = err.Error() + _, rejected := err.(*gomatrixserverlib.NotAllowed) response.NotAllowed = rejected - return } } } diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index c9264a27..b60d93a7 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -288,6 +288,9 @@ func (u *latestEventsUpdater) calculateLatest( newEvent *gomatrixserverlib.Event, newStateAndRef types.StateAtEventAndReference, ) (bool, error) { + u.api.roomMutexes.Lock(u.event.EventID()) + defer u.api.roomMutexes.Unlock(u.event.EventID()) + // First of all, get a list of all of the events in our current // set of forward extremities. existingRefs := make(map[string]*types.StateAtEventAndReference)