mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-27 23:48:27 +00:00
Merge branch 'neilalexander/rsinputfifo' into neilalexander/rsinputfifo2
This commit is contained in:
commit
8866120ebf
1 changed files with 13 additions and 9 deletions
|
@ -5,7 +5,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type fifoQueue struct {
|
type fifoQueue struct {
|
||||||
frames []*inputTask
|
tasks []*inputTask
|
||||||
count int
|
count int
|
||||||
mutex sync.Mutex
|
mutex sync.Mutex
|
||||||
notifs chan struct{}
|
notifs chan struct{}
|
||||||
|
@ -18,40 +18,44 @@ func newFIFOQueue() *fifoQueue {
|
||||||
return q
|
return q
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *fifoQueue) push(frame *inputTask) bool {
|
func (q *fifoQueue) push(frame *inputTask) {
|
||||||
q.mutex.Lock()
|
q.mutex.Lock()
|
||||||
defer q.mutex.Unlock()
|
defer q.mutex.Unlock()
|
||||||
q.frames = append(q.frames, frame)
|
q.tasks = append(q.tasks, frame)
|
||||||
q.count++
|
q.count++
|
||||||
select {
|
select {
|
||||||
case q.notifs <- struct{}{}:
|
case q.notifs <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pop returns the first item of the queue, if there is one.
|
||||||
|
// The second return value will indicate if a task was returned.
|
||||||
|
// You must check this value, even after calling wait().
|
||||||
func (q *fifoQueue) pop() (*inputTask, bool) {
|
func (q *fifoQueue) pop() (*inputTask, bool) {
|
||||||
q.mutex.Lock()
|
q.mutex.Lock()
|
||||||
defer q.mutex.Unlock()
|
defer q.mutex.Unlock()
|
||||||
if q.count == 0 {
|
if q.count == 0 {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
frame := q.frames[0]
|
frame := q.tasks[0]
|
||||||
q.frames[0] = nil
|
q.tasks[0] = nil
|
||||||
q.frames = q.frames[1:]
|
q.tasks = q.tasks[1:]
|
||||||
q.count--
|
q.count--
|
||||||
if q.count == 0 {
|
if q.count == 0 {
|
||||||
// Force a GC of the underlying array, since it might have
|
// Force a GC of the underlying array, since it might have
|
||||||
// grown significantly if the queue was hammered for some reason
|
// grown significantly if the queue was hammered for some reason
|
||||||
q.frames = nil
|
q.tasks = nil
|
||||||
}
|
}
|
||||||
return frame, true
|
return frame, true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// wait returns a channel which can be used to detect when an
|
||||||
|
// item is waiting in the queue.
|
||||||
func (q *fifoQueue) wait() <-chan struct{} {
|
func (q *fifoQueue) wait() <-chan struct{} {
|
||||||
q.mutex.Lock()
|
q.mutex.Lock()
|
||||||
defer q.mutex.Unlock()
|
defer q.mutex.Unlock()
|
||||||
if q.count > 0 {
|
if q.count > 0 && len(q.notifs) == 0 {
|
||||||
ch := make(chan struct{})
|
ch := make(chan struct{})
|
||||||
close(ch)
|
close(ch)
|
||||||
return ch
|
return ch
|
||||||
|
|
Loading…
Reference in a new issue