Use a FIFO queue instead of a channel to reduce backpressure

This commit is contained in:
Neil Alexander 2021-06-28 11:12:58 +01:00
parent a6f7e83596
commit b63f699f1b
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 69 additions and 6 deletions

View file

@ -38,8 +38,7 @@ type Inputer struct {
ServerName gomatrixserverlib.ServerName ServerName gomatrixserverlib.ServerName
ACLs *acls.ServerACLs ACLs *acls.ServerACLs
OutputRoomEventTopic string OutputRoomEventTopic string
workers sync.Map // room ID -> *inputWorker
workers sync.Map // room ID -> *inputWorker
} }
type inputTask struct { type inputTask struct {
@ -52,7 +51,7 @@ type inputTask struct {
type inputWorker struct { type inputWorker struct {
r *Inputer r *Inputer
running atomic.Bool running atomic.Bool
input chan *inputTask input *fifoQueue
} }
// Guarded by a CAS on w.running // Guarded by a CAS on w.running
@ -60,7 +59,11 @@ func (w *inputWorker) start() {
defer w.running.Store(false) defer w.running.Store(false)
for { for {
select { select {
case task := <-w.input: case <-w.input.wait():
task, ok := w.input.pop()
if !ok {
continue
}
hooks.Run(hooks.KindNewEventReceived, task.event.Event) hooks.Run(hooks.KindNewEventReceived, task.event.Event)
_, task.err = w.r.processRoomEvent(task.ctx, task.event) _, task.err = w.r.processRoomEvent(task.ctx, task.event)
if task.err == nil { if task.err == nil {
@ -143,7 +146,7 @@ func (r *Inputer) InputRoomEvents(
// room - the channel will be quite small as it's just pointer types. // room - the channel will be quite small as it's just pointer types.
w, _ := r.workers.LoadOrStore(roomID, &inputWorker{ w, _ := r.workers.LoadOrStore(roomID, &inputWorker{
r: r, r: r,
input: make(chan *inputTask, 32), input: newFIFOQueue(),
}) })
worker := w.(*inputWorker) worker := w.(*inputWorker)
@ -160,7 +163,7 @@ func (r *Inputer) InputRoomEvents(
if worker.running.CAS(false, true) { if worker.running.CAS(false, true) {
go worker.start() go worker.start()
} }
worker.input <- tasks[i] worker.input.push(tasks[i])
} }
// Wait for all of the workers to return results about our tasks. // Wait for all of the workers to return results about our tasks.

View file

@ -0,0 +1,60 @@
package input
import (
"sync"
)
type fifoQueue struct {
frames []*inputTask
count int
mutex sync.Mutex
notifs chan struct{}
}
func newFIFOQueue() *fifoQueue {
q := &fifoQueue{
notifs: make(chan struct{}),
}
return q
}
func (q *fifoQueue) push(frame *inputTask) bool {
q.mutex.Lock()
defer q.mutex.Unlock()
q.frames = append(q.frames, frame)
q.count++
select {
case q.notifs <- struct{}{}:
default:
}
return true
}
func (q *fifoQueue) pop() (*inputTask, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.count == 0 {
return nil, false
}
frame := q.frames[0]
q.frames[0] = nil
q.frames = q.frames[1:]
q.count--
if q.count == 0 {
// Force a GC of the underlying array, since it might have
// grown significantly if the queue was hammered for some reason
q.frames = nil
}
return frame, true
}
func (q *fifoQueue) wait() <-chan struct{} {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.count > 0 {
ch := make(chan struct{})
close(ch)
return ch
}
return q.notifs
}