Increase parallelism further

This commit is contained in:
Neil Alexander 2021-01-25 17:13:36 +00:00
parent 5223d02709
commit 19e8c662dc
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 9 additions and 8 deletions

View file

@ -62,7 +62,7 @@ func (w *inputWorker) start() {
select { select {
case task := <-w.input: case task := <-w.input:
hooks.Run(hooks.KindNewEventReceived, task.event.Event) hooks.Run(hooks.KindNewEventReceived, task.event.Event)
_, task.err = w.r.processRoomEvent(task.ctx, task.event) _, task.err = w.r.processRoomEvent(task.ctx, task.event, task.wg)
if task.err == nil { if task.err == nil {
hooks.Run(hooks.KindNewEventPersisted, task.event.Event) hooks.Run(hooks.KindNewEventPersisted, task.event.Event)
} }

View file

@ -20,6 +20,7 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/eventutil"
@ -62,6 +63,7 @@ var processRoomEventDuration = prometheus.NewHistogramVec(
func (r *Inputer) processRoomEvent( func (r *Inputer) processRoomEvent(
ctx context.Context, ctx context.Context,
input *api.InputRoomEvent, input *api.InputRoomEvent,
wg *sync.WaitGroup,
) (eventID string, err error) { ) (eventID string, err error) {
// Measure how long it takes to process this event. // Measure how long it takes to process this event.
started := time.Now() started := time.Now()
@ -192,9 +194,10 @@ func (r *Inputer) processRoomEvent(
switch input.Kind { switch input.Kind {
case api.KindNew: case api.KindNew:
errch := make(chan error) wg.Add(1)
go func() { go func() {
errch <- r.updateLatestEvents( defer wg.Done()
if err = r.updateLatestEvents(
ctx, // context ctx, // context
roomInfo, // room info for the room being updated roomInfo, // room info for the room being updated
stateAtEvent, // state at event (below) stateAtEvent, // state at event (below)
@ -202,12 +205,10 @@ func (r *Inputer) processRoomEvent(
input.SendAsServer, // send as server input.SendAsServer, // send as server
input.TransactionID, // transaction ID input.TransactionID, // transaction ID
input.HasState, // rewrites state? input.HasState, // rewrites state?
) ); err != nil {
close(errch) logrus.WithError(err).Error("r.updateLatestEvents failed")
}
}() }()
if err = <-errch; err != nil {
return "", fmt.Errorf("r.updateLatestEvents: %w", err)
}
case api.KindOld: case api.KindOld:
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
{ {