From 8980a29b0d2e7a5eef6c974a1e5d87f0c51aa3ed Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Jan 2018 15:01:37 +0000 Subject: [PATCH] Don't wait on handling unsent events during startup --- .../matrix-org/dendrite/common/linearizer.go | 4 ++-- .../matrix-org/dendrite/common/linearizer_test.go | 6 +++--- .../dendrite/roomserver/input/latest_events.go | 12 ++++++++++-- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/common/linearizer.go b/src/github.com/matrix-org/dendrite/common/linearizer.go index 53c44dd7..22b405cc 100644 --- a/src/github.com/matrix-org/dendrite/common/linearizer.go +++ b/src/github.com/matrix-org/dendrite/common/linearizer.go @@ -43,14 +43,14 @@ func (l *Linearizer) Await(key string, callback func()) { // testing as any functions scheduled after hook has been closed are guaranteed // to be run after this callback has finished. If hook is nil then it is // ignored. -func (l *Linearizer) AwaitWithHook(key string, callback func(), hook chan<- struct{}) { +func (l *Linearizer) AwaitWithHook(key string, callback func(), hook func()) { closeChannel := make(chan struct{}) defer close(closeChannel) awaitChannel := l.getAndSetLastMutex(key, closeChannel) if hook != nil { - close(hook) + hook() } if awaitChannel != nil { diff --git a/src/github.com/matrix-org/dendrite/common/linearizer_test.go b/src/github.com/matrix-org/dendrite/common/linearizer_test.go index d1ba0ef5..904ea8fd 100644 --- a/src/github.com/matrix-org/dendrite/common/linearizer_test.go +++ b/src/github.com/matrix-org/dendrite/common/linearizer_test.go @@ -96,7 +96,7 @@ func TestMultipleConcurrentLinearizer(t *testing.T) { // nolint: gocyclo if numThirdCalls != 0 { t.Fatalf("Expected third function to not be called, called %d times", numThirdCalls) } - }, setupAwait2) + }, func() { close(setupAwait2) }) t.Log("Finished waiting on w1") waitGroup.Done() @@ -118,7 +118,7 @@ func TestMultipleConcurrentLinearizer(t *testing.T) { // nolint: gocyclo if numThirdCalls != 0 { t.Fatalf("Expected third function to not be called, called %d times", numThirdCalls) } - }, setupAwait3) + }, func() { close(setupAwait3) }) t.Log("Finished waiting on w2") @@ -141,7 +141,7 @@ func TestMultipleConcurrentLinearizer(t *testing.T) { // nolint: gocyclo if numThirdCalls != 1 { t.Fatalf("Expected third function to be called once, called %d times", numThirdCalls) } - }, startSignal) + }, func() { close(startSignal) }) t.Log("Finished waiting on w3") waitGroup.Done() diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go index dab3a3b3..e43e5d5c 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go @@ -17,6 +17,7 @@ package input import ( "bytes" "context" + "sync" "github.com/pkg/errors" @@ -68,8 +69,13 @@ func (e *EventSender) Start(ctx context.Context) (err error) { eventToRoomMap[roomNID] = append(eventToRoomMap[roomNID], entries[i]) } + // We only wait until we have registered all the rooms with the linearizer, + // as we don't want to block startup on handling them. + var wg sync.WaitGroup + wg.Add(len(eventToRoomMap)) + for roomNID, roomEntries := range eventToRoomMap { - e.Linearizer.Await(roomEntries[0].Event.RoomID(), func() { + e.Linearizer.AwaitWithHook(roomEntries[0].Event.RoomID(), func() { for i := range roomEntries { err = updateLatestEvents( ctx, e.DB, e.OutputWriter, roomNID, @@ -82,9 +88,11 @@ func (e *EventSender) Start(ctx context.Context) (err error) { return } } - }) + }, wg.Done) } + wg.Wait() + return }