From 25f9fda0bb5b91577af55de256b5af0d01663e7c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Dec 2017 15:42:34 +0000 Subject: [PATCH] Add a Linearizer impl --- .../matrix-org/dendrite/common/linearizer.go | 96 +++++++++ .../dendrite/common/linearizer_test.go | 196 ++++++++++++++++++ 2 files changed, 292 insertions(+) create mode 100644 src/github.com/matrix-org/dendrite/common/linearizer.go create mode 100644 src/github.com/matrix-org/dendrite/common/linearizer_test.go diff --git a/src/github.com/matrix-org/dendrite/common/linearizer.go b/src/github.com/matrix-org/dendrite/common/linearizer.go new file mode 100644 index 00000000..53c44dd7 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/common/linearizer.go @@ -0,0 +1,96 @@ +// Copyright 2017 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import "sync" + +// Linearizer allows different goroutines to serialize execution of functions +// based on a string key. +type Linearizer struct { + // protects lastMutex + mutex sync.Mutex + // + lastMutex map[string]<-chan struct{} +} + +// NewLinearizer creates a new Linearizer +func NewLinearizer() Linearizer { + return Linearizer{ + lastMutex: make(map[string]<-chan struct{}), + } +} + +// Await schedules the callback to run once all previous callbacks for the given +// key have finished executing, returning once callback has completed. +func (l *Linearizer) Await(key string, callback func()) { + l.AwaitWithHook(key, callback, nil) +} + +// AwaitWithHook is the same as Await, but with an added hook channel gets +// closed once the callback has been scheduled. This is mainly useful for +// testing as any functions scheduled after hook has been closed are guaranteed +// to be run after this callback has finished. If hook is nil then it is +// ignored. +func (l *Linearizer) AwaitWithHook(key string, callback func(), hook chan<- struct{}) { + closeChannel := make(chan struct{}) + defer close(closeChannel) + + awaitChannel := l.getAndSetLastMutex(key, closeChannel) + + if hook != nil { + close(hook) + } + + if awaitChannel != nil { + <-awaitChannel + } + + callback() + + l.cleanupKey(key, closeChannel) +} + +// NumberOfActiveKeys returns the number of keys that have callbacks currently +// scheduled to be run (or are running). Used mostly in tests to ensure that map +// entries are cleaned up. +func (l *Linearizer) NumberOfActiveKeys() int { + return len(l.lastMutex) +} + +// getAndSetLastMutex replaces the current entry in lastMutex with the given +// channel, while locking the mutex. The existing entry is returned if it +// exists, otherwise returns nil. +func (l *Linearizer) getAndSetLastMutex(key string, closeChannel <-chan struct{}) <-chan struct{} { + l.mutex.Lock() + defer l.mutex.Unlock() + + awaitChannel := l.lastMutex[key] + l.lastMutex[key] = closeChannel + + return awaitChannel +} + +// cleanupKey deletes the entry in the lastMutex map if the value of key in the +// map is currentChannel. If they match then its safe to delete because all +// scheduled callbacks have been run for that key. +func (l *Linearizer) cleanupKey(key string, currentChannel <-chan struct{}) { + l.mutex.Lock() + defer l.mutex.Unlock() + + entry, ok := l.lastMutex[key] + if ok && entry == currentChannel { + delete(l.lastMutex, key) + } +} diff --git a/src/github.com/matrix-org/dendrite/common/linearizer_test.go b/src/github.com/matrix-org/dendrite/common/linearizer_test.go new file mode 100644 index 00000000..d1ba0ef5 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/common/linearizer_test.go @@ -0,0 +1,196 @@ +// Copyright 2017 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "sync" + "testing" +) + +func TestSimpleLinearizer(t *testing.T) { + l := NewLinearizer() + + numCalls := 0 + l.Await("foo", func() { + numCalls++ + }) + + if numCalls != 1 { + t.Fatalf("Expected function to be called once, called %d times", numCalls) + } + + activeKeys := l.NumberOfActiveKeys() + if activeKeys != 0 { + t.Fatalf("Expected no active keys, got %d active keys", activeKeys) + } +} + +func TestMultipleAfterLinearizer(t *testing.T) { + l := NewLinearizer() + + numFirstCalls := 0 + l.Await("foo", func() { + numFirstCalls++ + }) + + numSecondCalls := 0 + l.Await("foo", func() { + numSecondCalls++ + }) + + if numFirstCalls != 1 { + t.Fatalf("Expected first function to be called once, called %d times", numFirstCalls) + } + + if numSecondCalls != 1 { + t.Fatalf("Expected second function to be called once, called %d times", numSecondCalls) + } + + activeKeys := l.NumberOfActiveKeys() + if activeKeys != 0 { + t.Fatalf("Expected no active keys, got %d active keys", activeKeys) + } +} + +func TestMultipleConcurrentLinearizer(t *testing.T) { // nolint: gocyclo + l := NewLinearizer() + + waitGroup := sync.WaitGroup{} + waitGroup.Add(3) + + numFirstCalls := 0 + numSecondCalls := 0 + numThirdCalls := 0 + + startSignal := make(chan struct{}) + + setupAwait2 := make(chan struct{}) + setupAwait3 := make(chan struct{}) + + go func() { + l.AwaitWithHook("foo", func() { + <-startSignal + + numFirstCalls++ + + if numFirstCalls != 1 { + t.Fatalf("Expected first function to be called once, called %d times", numFirstCalls) + } + + if numSecondCalls != 0 { + t.Fatalf("Expected second function to not be called, called %d times", numSecondCalls) + } + + if numThirdCalls != 0 { + t.Fatalf("Expected third function to not be called, called %d times", numThirdCalls) + } + }, setupAwait2) + + t.Log("Finished waiting on w1") + waitGroup.Done() + }() + + go func() { + <-setupAwait2 + l.AwaitWithHook("foo", func() { + numSecondCalls++ + + if numFirstCalls != 1 { + t.Fatalf("Expected first function to be called once, called %d times", numFirstCalls) + } + + if numSecondCalls != 1 { + t.Fatalf("Expected second function to be called once, called %d times", numSecondCalls) + } + + if numThirdCalls != 0 { + t.Fatalf("Expected third function to not be called, called %d times", numThirdCalls) + } + }, setupAwait3) + + t.Log("Finished waiting on w2") + + waitGroup.Done() + }() + + go func() { + <-setupAwait3 + l.AwaitWithHook("foo", func() { + numThirdCalls++ + + if numFirstCalls != 1 { + t.Fatalf("Expected first function to be called once, called %d times", numFirstCalls) + } + + if numSecondCalls != 1 { + t.Fatalf("Expected second function to be called once, called %d times", numSecondCalls) + } + + if numThirdCalls != 1 { + t.Fatalf("Expected third function to be called once, called %d times", numThirdCalls) + } + }, startSignal) + + t.Log("Finished waiting on w3") + waitGroup.Done() + }() + + waitGroup.Wait() + + activeKeys := l.NumberOfActiveKeys() + if activeKeys != 0 { + t.Fatalf("Expected no active keys, got %d active keys", activeKeys) + } +} + +func TestDifferentKeysLiniearizer(t *testing.T) { + l := NewLinearizer() + + waitChan := make(chan struct{}) + + numFirstCalls := 0 + go l.Await("foo", func() { + <-waitChan + numFirstCalls++ + }) + + numSecondCalls := 0 + l.Await("bar", func() { + numSecondCalls++ + }) + + if numFirstCalls != 0 { + t.Fatalf("Expected first function to not be called, called %d times", numFirstCalls) + } + + if numSecondCalls != 1 { + t.Fatalf("Expected second function to be called once, called %d times", numSecondCalls) + } + + waitChan <- struct{}{} + + if numFirstCalls != 1 { + t.Fatalf("Expected first function to be called once, called %d times", numFirstCalls) + } + + if numSecondCalls != 1 { + t.Fatalf("Expected second function to be called once, called %d times", numSecondCalls) + } + + activeKeys := l.NumberOfActiveKeys() + if activeKeys != 0 { + t.Fatalf("Expected no active keys, got %d active keys", activeKeys) + } +}