Don't wait on handling unsent events during startup

This commit is contained in:
Erik Johnston 2018-01-03 15:01:37 +00:00
parent 546afcc519
commit 8980a29b0d
3 changed files with 15 additions and 7 deletions

View file

@ -43,14 +43,14 @@ func (l *Linearizer) Await(key string, callback func()) {
// testing as any functions scheduled after hook has been closed are guaranteed // testing as any functions scheduled after hook has been closed are guaranteed
// to be run after this callback has finished. If hook is nil then it is // to be run after this callback has finished. If hook is nil then it is
// ignored. // ignored.
func (l *Linearizer) AwaitWithHook(key string, callback func(), hook chan<- struct{}) { func (l *Linearizer) AwaitWithHook(key string, callback func(), hook func()) {
closeChannel := make(chan struct{}) closeChannel := make(chan struct{})
defer close(closeChannel) defer close(closeChannel)
awaitChannel := l.getAndSetLastMutex(key, closeChannel) awaitChannel := l.getAndSetLastMutex(key, closeChannel)
if hook != nil { if hook != nil {
close(hook) hook()
} }
if awaitChannel != nil { if awaitChannel != nil {

View file

@ -96,7 +96,7 @@ func TestMultipleConcurrentLinearizer(t *testing.T) { // nolint: gocyclo
if numThirdCalls != 0 { if numThirdCalls != 0 {
t.Fatalf("Expected third function to not be called, called %d times", numThirdCalls) t.Fatalf("Expected third function to not be called, called %d times", numThirdCalls)
} }
}, setupAwait2) }, func() { close(setupAwait2) })
t.Log("Finished waiting on w1") t.Log("Finished waiting on w1")
waitGroup.Done() waitGroup.Done()
@ -118,7 +118,7 @@ func TestMultipleConcurrentLinearizer(t *testing.T) { // nolint: gocyclo
if numThirdCalls != 0 { if numThirdCalls != 0 {
t.Fatalf("Expected third function to not be called, called %d times", numThirdCalls) t.Fatalf("Expected third function to not be called, called %d times", numThirdCalls)
} }
}, setupAwait3) }, func() { close(setupAwait3) })
t.Log("Finished waiting on w2") t.Log("Finished waiting on w2")
@ -141,7 +141,7 @@ func TestMultipleConcurrentLinearizer(t *testing.T) { // nolint: gocyclo
if numThirdCalls != 1 { if numThirdCalls != 1 {
t.Fatalf("Expected third function to be called once, called %d times", numThirdCalls) t.Fatalf("Expected third function to be called once, called %d times", numThirdCalls)
} }
}, startSignal) }, func() { close(startSignal) })
t.Log("Finished waiting on w3") t.Log("Finished waiting on w3")
waitGroup.Done() waitGroup.Done()

View file

@ -17,6 +17,7 @@ package input
import ( import (
"bytes" "bytes"
"context" "context"
"sync"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -68,8 +69,13 @@ func (e *EventSender) Start(ctx context.Context) (err error) {
eventToRoomMap[roomNID] = append(eventToRoomMap[roomNID], entries[i]) eventToRoomMap[roomNID] = append(eventToRoomMap[roomNID], entries[i])
} }
// We only wait until we have registered all the rooms with the linearizer,
// as we don't want to block startup on handling them.
var wg sync.WaitGroup
wg.Add(len(eventToRoomMap))
for roomNID, roomEntries := range eventToRoomMap { for roomNID, roomEntries := range eventToRoomMap {
e.Linearizer.Await(roomEntries[0].Event.RoomID(), func() { e.Linearizer.AwaitWithHook(roomEntries[0].Event.RoomID(), func() {
for i := range roomEntries { for i := range roomEntries {
err = updateLatestEvents( err = updateLatestEvents(
ctx, e.DB, e.OutputWriter, roomNID, ctx, e.DB, e.OutputWriter, roomNID,
@ -82,9 +88,11 @@ func (e *EventSender) Start(ctx context.Context) (err error) {
return return
} }
} }
}) }, wg.Done)
} }
wg.Wait()
return return
} }