mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-29 12:42:46 +00:00
Yggdrasil+QUIC demo, federation sender tweaks (#1177)
* Initial QUIC work * Update Yggdrasil demo * Make sure that the federation sender knows how many pending events are in the database when the worker starts * QUIC tunables * pprof * Don't spin * Set build info for Yggdrasil
This commit is contained in:
parent
9c1f38621c
commit
38caf8e5b7
13 changed files with 406 additions and 141 deletions
|
@ -52,7 +52,7 @@ type destinationQueue struct {
|
|||
transactionIDMutex sync.Mutex // protects transactionID
|
||||
transactionID gomatrixserverlib.TransactionID // last transaction ID
|
||||
transactionCount atomic.Int32 // how many events in this transaction so far
|
||||
pendingPDUs atomic.Int32 // how many PDUs are waiting to be sent
|
||||
pendingPDUs atomic.Int64 // how many PDUs are waiting to be sent
|
||||
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
|
||||
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
|
||||
wakeServerCh chan bool // interrupts idle wait
|
||||
|
@ -91,6 +91,7 @@ func (oq *destinationQueue) sendEvent(nid int64) {
|
|||
// If the destination is blacklisted then drop the event.
|
||||
return
|
||||
}
|
||||
oq.wakeQueueIfNeeded()
|
||||
// Create a transaction ID. We'll either do this if we don't have
|
||||
// one made up yet, or if we've exceeded the number of maximum
|
||||
// events allowed in a single tranaction. We'll reset the counter
|
||||
|
@ -117,10 +118,6 @@ func (oq *destinationQueue) sendEvent(nid int64) {
|
|||
// We've successfully added a PDU to the transaction so increase
|
||||
// the counter.
|
||||
oq.transactionCount.Add(1)
|
||||
// If the queue isn't running at this point then start it.
|
||||
if !oq.running.Load() {
|
||||
go oq.backgroundSend()
|
||||
}
|
||||
// Signal that we've sent a new PDU. This will cause the queue to
|
||||
// wake up if it's asleep. The return to the Add function will only
|
||||
// be 1 if the previous value was 0, e.g. nothing was waiting before.
|
||||
|
@ -137,9 +134,7 @@ func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) {
|
|||
// If the destination is blacklisted then drop the event.
|
||||
return
|
||||
}
|
||||
if !oq.running.Load() {
|
||||
go oq.backgroundSend()
|
||||
}
|
||||
oq.wakeQueueIfNeeded()
|
||||
oq.incomingEDUs <- ev
|
||||
}
|
||||
|
||||
|
@ -151,10 +146,30 @@ func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) {
|
|||
// If the destination is blacklisted then drop the event.
|
||||
return
|
||||
}
|
||||
oq.wakeQueueIfNeeded()
|
||||
oq.incomingInvites <- ev
|
||||
}
|
||||
|
||||
func (oq *destinationQueue) wakeQueueIfNeeded() {
|
||||
if !oq.running.Load() {
|
||||
// Look up how many events are pending in this queue. We need
|
||||
// to do this so that the queue thinks it has work to do.
|
||||
count, err := oq.db.GetPendingPDUCount(
|
||||
context.TODO(),
|
||||
oq.destination,
|
||||
)
|
||||
if err == nil {
|
||||
oq.pendingPDUs.Store(count)
|
||||
log.Printf("Destination queue %q has %d pending PDUs", oq.destination, count)
|
||||
} else {
|
||||
log.WithError(err).Errorf("Can't get pending PDU count for %q destination queue", oq.destination)
|
||||
}
|
||||
if count > 0 {
|
||||
oq.wakeServerCh <- true
|
||||
}
|
||||
// Then start the queue.
|
||||
go oq.backgroundSend()
|
||||
}
|
||||
oq.incomingInvites <- ev
|
||||
}
|
||||
|
||||
// backgroundSend is the worker goroutine for sending events.
|
||||
|
@ -170,46 +185,44 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
for {
|
||||
// If we have nothing to do then wait either for incoming events, or
|
||||
// until we hit an idle timeout.
|
||||
if oq.pendingPDUs.Load() == 0 && len(oq.pendingEDUs) == 0 && len(oq.pendingInvites) == 0 {
|
||||
select {
|
||||
case <-oq.wakeServerCh:
|
||||
// We were woken up because there are new PDUs waiting in the
|
||||
// database.
|
||||
case edu := <-oq.incomingEDUs:
|
||||
// EDUs are handled in-memory for now. We will try to keep
|
||||
// the ordering intact.
|
||||
// TODO: Certain EDU types need persistence, e.g. send-to-device
|
||||
oq.pendingEDUs = append(oq.pendingEDUs, edu)
|
||||
// If there are any more things waiting in the channel queue
|
||||
// then read them. This is safe because we guarantee only
|
||||
// having one goroutine per destination queue, so the channel
|
||||
// isn't being consumed anywhere else.
|
||||
for len(oq.incomingEDUs) > 0 {
|
||||
oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
|
||||
}
|
||||
case invite := <-oq.incomingInvites:
|
||||
// There's no strict ordering requirement for invites like
|
||||
// there is for transactions, so we put the invite onto the
|
||||
// front of the queue. This means that if an invite that is
|
||||
// stuck failing already, that it won't block our new invite
|
||||
// from being sent.
|
||||
oq.pendingInvites = append(
|
||||
[]*gomatrixserverlib.InviteV2Request{invite},
|
||||
oq.pendingInvites...,
|
||||
)
|
||||
// If there are any more things waiting in the channel queue
|
||||
// then read them. This is safe because we guarantee only
|
||||
// having one goroutine per destination queue, so the channel
|
||||
// isn't being consumed anywhere else.
|
||||
for len(oq.incomingInvites) > 0 {
|
||||
oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
|
||||
}
|
||||
case <-time.After(time.Second * 30):
|
||||
// The worker is idle so stop the goroutine. It'll get
|
||||
// restarted automatically the next time we have an event to
|
||||
// send.
|
||||
return
|
||||
select {
|
||||
case <-oq.wakeServerCh:
|
||||
// We were woken up because there are new PDUs waiting in the
|
||||
// database.
|
||||
case edu := <-oq.incomingEDUs:
|
||||
// EDUs are handled in-memory for now. We will try to keep
|
||||
// the ordering intact.
|
||||
// TODO: Certain EDU types need persistence, e.g. send-to-device
|
||||
oq.pendingEDUs = append(oq.pendingEDUs, edu)
|
||||
// If there are any more things waiting in the channel queue
|
||||
// then read them. This is safe because we guarantee only
|
||||
// having one goroutine per destination queue, so the channel
|
||||
// isn't being consumed anywhere else.
|
||||
for len(oq.incomingEDUs) > 0 {
|
||||
oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
|
||||
}
|
||||
case invite := <-oq.incomingInvites:
|
||||
// There's no strict ordering requirement for invites like
|
||||
// there is for transactions, so we put the invite onto the
|
||||
// front of the queue. This means that if an invite that is
|
||||
// stuck failing already, that it won't block our new invite
|
||||
// from being sent.
|
||||
oq.pendingInvites = append(
|
||||
[]*gomatrixserverlib.InviteV2Request{invite},
|
||||
oq.pendingInvites...,
|
||||
)
|
||||
// If there are any more things waiting in the channel queue
|
||||
// then read them. This is safe because we guarantee only
|
||||
// having one goroutine per destination queue, so the channel
|
||||
// isn't being consumed anywhere else.
|
||||
for len(oq.incomingInvites) > 0 {
|
||||
oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
|
||||
}
|
||||
case <-time.After(time.Second * 30):
|
||||
// The worker is idle so stop the goroutine. It'll get
|
||||
// restarted automatically the next time we have an event to
|
||||
// send.
|
||||
return
|
||||
}
|
||||
|
||||
// If we are backing off this server then wait for the
|
||||
|
@ -317,8 +330,10 @@ func (oq *destinationQueue) nextTransaction(
|
|||
// Ask the database for any pending PDUs from the next transaction.
|
||||
// maxPDUsPerTransaction is an upper limit but we probably won't
|
||||
// actually retrieve that many events.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||
defer cancel()
|
||||
txid, pdus, err := oq.db.GetNextTransactionPDUs(
|
||||
context.TODO(), // context
|
||||
ctx, // context
|
||||
oq.destination, // server name
|
||||
maxPDUsPerTransaction, // max events to retrieve
|
||||
)
|
||||
|
@ -366,7 +381,7 @@ func (oq *destinationQueue) nextTransaction(
|
|||
case nil:
|
||||
// No error was returned so the transaction looks to have
|
||||
// been successfully sent.
|
||||
oq.pendingPDUs.Sub(int32(len(t.PDUs)))
|
||||
oq.pendingPDUs.Sub(int64(len(t.PDUs)))
|
||||
// Clean up the transaction in the database.
|
||||
if err = oq.db.CleanTransactionPDUs(
|
||||
context.TODO(),
|
||||
|
|
|
@ -30,4 +30,5 @@ type Database interface {
|
|||
AssociatePDUWithDestination(ctx context.Context, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nids []int64) error
|
||||
GetNextTransactionPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (gomatrixserverlib.TransactionID, []*gomatrixserverlib.HeaderedEvent, error)
|
||||
CleanTransactionPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, transactionID gomatrixserverlib.TransactionID) error
|
||||
GetPendingPDUCount(ctx context.Context, serverName gomatrixserverlib.ServerName) (int64, error)
|
||||
}
|
||||
|
|
|
@ -59,12 +59,17 @@ const selectQueueReferenceJSONCountSQL = "" +
|
|||
"SELECT COUNT(*) FROM federationsender_queue_pdus" +
|
||||
" WHERE json_nid = $1"
|
||||
|
||||
const selectQueuePDUsCountSQL = "" +
|
||||
"SELECT COUNT(*) FROM federationsender_queue_pdus" +
|
||||
" WHERE server_name = $1"
|
||||
|
||||
type queuePDUsStatements struct {
|
||||
insertQueuePDUStmt *sql.Stmt
|
||||
deleteQueueTransactionPDUsStmt *sql.Stmt
|
||||
selectQueueNextTransactionIDStmt *sql.Stmt
|
||||
selectQueuePDUsByTransactionStmt *sql.Stmt
|
||||
selectQueueReferenceJSONCountStmt *sql.Stmt
|
||||
selectQueuePDUsCountStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) {
|
||||
|
@ -87,6 +92,9 @@ func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) {
|
|||
if s.selectQueueReferenceJSONCountStmt, err = db.Prepare(selectQueueReferenceJSONCountSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectQueuePDUsCountStmt, err = db.Prepare(selectQueuePDUsCountSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -144,6 +152,21 @@ func (s *queuePDUsStatements) selectQueueReferenceJSONCount(
|
|||
return count, err
|
||||
}
|
||||
|
||||
func (s *queuePDUsStatements) selectQueuePDUCount(
|
||||
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
|
||||
) (int64, error) {
|
||||
var count int64
|
||||
stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsCountStmt)
|
||||
err := stmt.QueryRowContext(ctx, serverName).Scan(&count)
|
||||
if err == sql.ErrNoRows {
|
||||
// It's acceptable for there to be no rows referencing a given
|
||||
// JSON NID but it's not an error condition. Just return as if
|
||||
// there's a zero count.
|
||||
return 0, nil
|
||||
}
|
||||
return count, err
|
||||
}
|
||||
|
||||
func (s *queuePDUsStatements) selectQueuePDUs(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
serverName gomatrixserverlib.ServerName,
|
||||
|
|
|
@ -255,3 +255,12 @@ func (d *Database) CleanTransactionPDUs(
|
|||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// GetPendingPDUCount returns the number of PDUs waiting to be
|
||||
// sent for a given servername.
|
||||
func (d *Database) GetPendingPDUCount(
|
||||
ctx context.Context,
|
||||
serverName gomatrixserverlib.ServerName,
|
||||
) (int64, error) {
|
||||
return d.selectQueuePDUCount(ctx, nil, serverName)
|
||||
}
|
||||
|
|
|
@ -60,12 +60,17 @@ const selectQueueReferenceJSONCountSQL = "" +
|
|||
"SELECT COUNT(*) FROM federationsender_queue_pdus" +
|
||||
" WHERE json_nid = $1"
|
||||
|
||||
const selectQueuePDUsCountSQL = "" +
|
||||
"SELECT COUNT(*) FROM federationsender_queue_pdus" +
|
||||
" WHERE server_name = $1"
|
||||
|
||||
type queuePDUsStatements struct {
|
||||
insertQueuePDUStmt *sql.Stmt
|
||||
deleteQueueTransactionPDUsStmt *sql.Stmt
|
||||
selectQueueNextTransactionIDStmt *sql.Stmt
|
||||
selectQueuePDUsByTransactionStmt *sql.Stmt
|
||||
selectQueueReferenceJSONCountStmt *sql.Stmt
|
||||
selectQueuePDUsCountStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) {
|
||||
|
@ -88,6 +93,9 @@ func (s *queuePDUsStatements) prepare(db *sql.DB) (err error) {
|
|||
if s.selectQueueReferenceJSONCountStmt, err = db.Prepare(selectQueueReferenceJSONCountSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectQueuePDUsCountStmt, err = db.Prepare(selectQueuePDUsCountSQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -142,6 +150,21 @@ func (s *queuePDUsStatements) selectQueueReferenceJSONCount(
|
|||
return count, err
|
||||
}
|
||||
|
||||
func (s *queuePDUsStatements) selectQueuePDUCount(
|
||||
ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName,
|
||||
) (int64, error) {
|
||||
var count int64
|
||||
stmt := sqlutil.TxStmt(txn, s.selectQueuePDUsCountStmt)
|
||||
err := stmt.QueryRowContext(ctx, serverName).Scan(&count)
|
||||
if err == sql.ErrNoRows {
|
||||
// It's acceptable for there to be no rows referencing a given
|
||||
// JSON NID but it's not an error condition. Just return as if
|
||||
// there's a zero count.
|
||||
return 0, nil
|
||||
}
|
||||
return count, err
|
||||
}
|
||||
|
||||
func (s *queuePDUsStatements) selectQueuePDUs(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
serverName gomatrixserverlib.ServerName,
|
||||
|
|
|
@ -261,3 +261,12 @@ func (d *Database) CleanTransactionPDUs(
|
|||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// GetPendingPDUCount returns the number of PDUs waiting to be
|
||||
// sent for a given servername.
|
||||
func (d *Database) GetPendingPDUCount(
|
||||
ctx context.Context,
|
||||
serverName gomatrixserverlib.ServerName,
|
||||
) (int64, error) {
|
||||
return d.selectQueuePDUCount(ctx, nil, serverName)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue