Add a Linearizer impl

This commit is contained in:
Erik Johnston 2017-12-19 15:42:34 +00:00
parent 27c335438f
commit 25f9fda0bb
2 changed files with 292 additions and 0 deletions

View file

@ -0,0 +1,96 @@
// Copyright 2017 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package common
import "sync"
// Linearizer allows different goroutines to serialize execution of functions
// based on a string key.
type Linearizer struct {
// protects lastMutex
mutex sync.Mutex
//
lastMutex map[string]<-chan struct{}
}
// NewLinearizer creates a new Linearizer
func NewLinearizer() Linearizer {
return Linearizer{
lastMutex: make(map[string]<-chan struct{}),
}
}
// Await schedules the callback to run once all previous callbacks for the given
// key have finished executing, returning once callback has completed.
func (l *Linearizer) Await(key string, callback func()) {
l.AwaitWithHook(key, callback, nil)
}
// AwaitWithHook is the same as Await, but with an added hook channel gets
// closed once the callback has been scheduled. This is mainly useful for
// testing as any functions scheduled after hook has been closed are guaranteed
// to be run after this callback has finished. If hook is nil then it is
// ignored.
func (l *Linearizer) AwaitWithHook(key string, callback func(), hook chan<- struct{}) {
closeChannel := make(chan struct{})
defer close(closeChannel)
awaitChannel := l.getAndSetLastMutex(key, closeChannel)
if hook != nil {
close(hook)
}
if awaitChannel != nil {
<-awaitChannel
}
callback()
l.cleanupKey(key, closeChannel)
}
// NumberOfActiveKeys returns the number of keys that have callbacks currently
// scheduled to be run (or are running). Used mostly in tests to ensure that map
// entries are cleaned up.
func (l *Linearizer) NumberOfActiveKeys() int {
return len(l.lastMutex)
}
// getAndSetLastMutex replaces the current entry in lastMutex with the given
// channel, while locking the mutex. The existing entry is returned if it
// exists, otherwise returns nil.
func (l *Linearizer) getAndSetLastMutex(key string, closeChannel <-chan struct{}) <-chan struct{} {
l.mutex.Lock()
defer l.mutex.Unlock()
awaitChannel := l.lastMutex[key]
l.lastMutex[key] = closeChannel
return awaitChannel
}
// cleanupKey deletes the entry in the lastMutex map if the value of key in the
// map is currentChannel. If they match then its safe to delete because all
// scheduled callbacks have been run for that key.
func (l *Linearizer) cleanupKey(key string, currentChannel <-chan struct{}) {
l.mutex.Lock()
defer l.mutex.Unlock()
entry, ok := l.lastMutex[key]
if ok && entry == currentChannel {
delete(l.lastMutex, key)
}
}

View file

@ -0,0 +1,196 @@
// Copyright 2017 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package common
import (
"sync"
"testing"
)
func TestSimpleLinearizer(t *testing.T) {
l := NewLinearizer()
numCalls := 0
l.Await("foo", func() {
numCalls++
})
if numCalls != 1 {
t.Fatalf("Expected function to be called once, called %d times", numCalls)
}
activeKeys := l.NumberOfActiveKeys()
if activeKeys != 0 {
t.Fatalf("Expected no active keys, got %d active keys", activeKeys)
}
}
func TestMultipleAfterLinearizer(t *testing.T) {
l := NewLinearizer()
numFirstCalls := 0
l.Await("foo", func() {
numFirstCalls++
})
numSecondCalls := 0
l.Await("foo", func() {
numSecondCalls++
})
if numFirstCalls != 1 {
t.Fatalf("Expected first function to be called once, called %d times", numFirstCalls)
}
if numSecondCalls != 1 {
t.Fatalf("Expected second function to be called once, called %d times", numSecondCalls)
}
activeKeys := l.NumberOfActiveKeys()
if activeKeys != 0 {
t.Fatalf("Expected no active keys, got %d active keys", activeKeys)
}
}
func TestMultipleConcurrentLinearizer(t *testing.T) { // nolint: gocyclo
l := NewLinearizer()
waitGroup := sync.WaitGroup{}
waitGroup.Add(3)
numFirstCalls := 0
numSecondCalls := 0
numThirdCalls := 0
startSignal := make(chan struct{})
setupAwait2 := make(chan struct{})
setupAwait3 := make(chan struct{})
go func() {
l.AwaitWithHook("foo", func() {
<-startSignal
numFirstCalls++
if numFirstCalls != 1 {
t.Fatalf("Expected first function to be called once, called %d times", numFirstCalls)
}
if numSecondCalls != 0 {
t.Fatalf("Expected second function to not be called, called %d times", numSecondCalls)
}
if numThirdCalls != 0 {
t.Fatalf("Expected third function to not be called, called %d times", numThirdCalls)
}
}, setupAwait2)
t.Log("Finished waiting on w1")
waitGroup.Done()
}()
go func() {
<-setupAwait2
l.AwaitWithHook("foo", func() {
numSecondCalls++
if numFirstCalls != 1 {
t.Fatalf("Expected first function to be called once, called %d times", numFirstCalls)
}
if numSecondCalls != 1 {
t.Fatalf("Expected second function to be called once, called %d times", numSecondCalls)
}
if numThirdCalls != 0 {
t.Fatalf("Expected third function to not be called, called %d times", numThirdCalls)
}
}, setupAwait3)
t.Log("Finished waiting on w2")
waitGroup.Done()
}()
go func() {
<-setupAwait3
l.AwaitWithHook("foo", func() {
numThirdCalls++
if numFirstCalls != 1 {
t.Fatalf("Expected first function to be called once, called %d times", numFirstCalls)
}
if numSecondCalls != 1 {
t.Fatalf("Expected second function to be called once, called %d times", numSecondCalls)
}
if numThirdCalls != 1 {
t.Fatalf("Expected third function to be called once, called %d times", numThirdCalls)
}
}, startSignal)
t.Log("Finished waiting on w3")
waitGroup.Done()
}()
waitGroup.Wait()
activeKeys := l.NumberOfActiveKeys()
if activeKeys != 0 {
t.Fatalf("Expected no active keys, got %d active keys", activeKeys)
}
}
func TestDifferentKeysLiniearizer(t *testing.T) {
l := NewLinearizer()
waitChan := make(chan struct{})
numFirstCalls := 0
go l.Await("foo", func() {
<-waitChan
numFirstCalls++
})
numSecondCalls := 0
l.Await("bar", func() {
numSecondCalls++
})
if numFirstCalls != 0 {
t.Fatalf("Expected first function to not be called, called %d times", numFirstCalls)
}
if numSecondCalls != 1 {
t.Fatalf("Expected second function to be called once, called %d times", numSecondCalls)
}
waitChan <- struct{}{}
if numFirstCalls != 1 {
t.Fatalf("Expected first function to be called once, called %d times", numFirstCalls)
}
if numSecondCalls != 1 {
t.Fatalf("Expected second function to be called once, called %d times", numSecondCalls)
}
activeKeys := l.NumberOfActiveKeys()
if activeKeys != 0 {
t.Fatalf("Expected no active keys, got %d active keys", activeKeys)
}
}