From 19e8c662dc858eb6905fad7093f2e73ba8bddad2 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 25 Jan 2021 17:13:36 +0000 Subject: [PATCH] Increase parallelism further --- roomserver/internal/input/input.go | 2 +- roomserver/internal/input/input_events.go | 15 ++++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index ccca3ef3..b4c19a40 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -62,7 +62,7 @@ func (w *inputWorker) start() { select { case task := <-w.input: hooks.Run(hooks.KindNewEventReceived, task.event.Event) - _, task.err = w.r.processRoomEvent(task.ctx, task.event) + _, task.err = w.r.processRoomEvent(task.ctx, task.event, task.wg) if task.err == nil { hooks.Run(hooks.KindNewEventPersisted, task.event.Event) } diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index ff60ec64..334de102 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "fmt" + "sync" "time" "github.com/matrix-org/dendrite/internal/eventutil" @@ -62,6 +63,7 @@ var processRoomEventDuration = prometheus.NewHistogramVec( func (r *Inputer) processRoomEvent( ctx context.Context, input *api.InputRoomEvent, + wg *sync.WaitGroup, ) (eventID string, err error) { // Measure how long it takes to process this event. started := time.Now() @@ -192,9 +194,10 @@ func (r *Inputer) processRoomEvent( switch input.Kind { case api.KindNew: - errch := make(chan error) + wg.Add(1) go func() { - errch <- r.updateLatestEvents( + defer wg.Done() + if err = r.updateLatestEvents( ctx, // context roomInfo, // room info for the room being updated stateAtEvent, // state at event (below) @@ -202,12 +205,10 @@ func (r *Inputer) processRoomEvent( input.SendAsServer, // send as server input.TransactionID, // transaction ID input.HasState, // rewrites state? - ) - close(errch) + ); err != nil { + logrus.WithError(err).Error("r.updateLatestEvents failed") + } }() - if err = <-errch; err != nil { - return "", fmt.Errorf("r.updateLatestEvents: %w", err) - } case api.KindOld: err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ {