Mirror of https://github.com/hoernschen/dendrite.git, synced 2025-07-31 21:32:46 +00:00
Federation sender refactor (#1621)
* Refactor federation sender, again
* Clean up better
* Missing operators
* Try to get overflowed events from database
* Fix queries
* Log less
* Comments
* nil PDUs/EDUs shouldn't happen but guard against them for safety
* Tweak logging
* Fix transaction coalescing
* Update comments
* Check nils more
* Remove channels as they add extra complexity and possibly will deadlock
* Don't hold lock while sending transaction
* Less spam about sleeping queues
* Comments
* Bug-fixing
* Don't try to rehydrate twice
* Don't queue in memory for blacklisted destinations
* Don't queue in memory for blacklisted destinations
* Fix a couple of bugs
* Check for duplicates when pulling things out of the database
* Durable transactions, some more refactoring
* Revert "Durable transactions, some more refactoring"
  This reverts commit 5daf924eaaefec5e4f7c12c16ca24e898de4adbb.
* Fix deadlock
This commit is contained in:
parent e1d32e2ff1
commit f64c8822bc
9 changed files with 294 additions and 267 deletions
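The core of the refactor: each destination queue now keeps at most maxPDUsInMemory/maxEDUsInMemory events in memory and sets an overflowed flag once that cap is reached, relying on the database (where events are persisted anyway) to supply the rest later. A minimal, self-contained sketch of that idea in Go follows; the queue type and its fields are illustrative stand-ins, not Dendrite's actual API.

package main

import (
    "fmt"
    "sync"
)

const maxInMemory = 128 // mirrors maxPDUsInMemory in the diff below

// queue is a toy stand-in for destinationQueue: events beyond the cap only
// set the overflowed flag here, whereas the real code has already persisted
// them and will rehydrate them from the database later.
type queue struct {
    mu         sync.Mutex
    pending    []string
    overflowed bool
}

func (q *queue) add(event string) {
    q.mu.Lock()
    defer q.mu.Unlock()
    if len(q.pending) < maxInMemory {
        q.pending = append(q.pending, event)
        return
    }
    // Memory cap reached: remember that the database holds more work.
    q.overflowed = true
}

func main() {
    q := &queue{}
    for i := 0; i < maxInMemory+1; i++ {
        q.add(fmt.Sprintf("event-%d", i))
    }
    fmt.Println(len(q.pending), q.overflowed) // prints: 128 true
}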
@@ -35,6 +35,8 @@ import (

const (
    maxPDUsPerTransaction = 50
    maxEDUsPerTransaction = 50
    maxPDUsInMemory       = 128
    maxEDUsInMemory       = 128
    queueIdleTimeout      = time.Second * 30
)
@@ -51,54 +53,56 @@ type destinationQueue struct {
    destination gomatrixserverlib.ServerName // destination of requests
    running atomic.Bool // is the queue worker running?
    backingOff atomic.Bool // true if we're backing off
    overflowed atomic.Bool // the queues exceed maxPDUsInMemory/maxEDUsInMemory, so we should consult the database for more
    statistics *statistics.ServerStatistics // statistics about this remote server
    transactionIDMutex sync.Mutex // protects transactionID
    transactionID gomatrixserverlib.TransactionID // last transaction ID
    transactionCount atomic.Int32 // how many events in this transaction so far
    notifyPDUs chan bool // interrupts idle wait for PDUs
    notifyEDUs chan bool // interrupts idle wait for EDUs
    transactionID gomatrixserverlib.TransactionID // last transaction ID if retrying, or "" if last txn was successful
    notify chan struct{} // interrupts idle wait for pending PDUs/EDUs
    pendingPDUs []*queuedPDU // PDUs waiting to be sent
    pendingEDUs []*queuedEDU // EDUs waiting to be sent
    pendingMutex sync.RWMutex // protects pendingPDUs and pendingEDUs
    interruptBackoff chan bool // interrupts backoff
}

// sendEvent adds the event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending events to that destination.
func (oq *destinationQueue) sendEvent(receipt *shared.Receipt) {
    // Create a transaction ID. We'll either do this if we don't have
    // one made up yet, or if we've exceeded the number of maximum
    // events allowed in a single transaction. We'll reset the counter
    // when we do.
    oq.transactionIDMutex.Lock()
    if oq.transactionID == "" || oq.transactionCount.Load() >= maxPDUsPerTransaction {
        now := gomatrixserverlib.AsTimestamp(time.Now())
        oq.transactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
        oq.transactionCount.Store(0)
func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, receipt *shared.Receipt) {
    if event == nil {
        log.Errorf("attempt to send nil PDU with destination %q", oq.destination)
        return
    }
    oq.transactionIDMutex.Unlock()
    // Create a database entry that associates the given PDU NID with
    // this destination queue. We'll then be able to retrieve the PDU
    // later.
    if err := oq.db.AssociatePDUWithDestination(
        context.TODO(),
        oq.transactionID, // the current transaction ID
        oq.destination,   // the destination server name
        receipt,          // NIDs from federationsender_queue_json table
        "",             // TODO: remove this, as we don't need to persist the transaction ID
        oq.destination, // the destination server name
        receipt,        // NIDs from federationsender_queue_json table
    ); err != nil {
        log.WithError(err).Errorf("failed to associate PDU receipt %q with destination %q", receipt.String(), oq.destination)
        log.WithError(err).Errorf("failed to associate PDU %q with destination %q", event.EventID(), oq.destination)
        return
    }
    // We've successfully added a PDU to the transaction so increase
    // the counter.
    oq.transactionCount.Add(1)
    // Check if the destination is blacklisted. If it isn't then wake
    // up the queue.
    if !oq.statistics.Blacklisted() {
        // If there's room in memory to hold the event then add it to the
        // list.
        oq.pendingMutex.Lock()
        if len(oq.pendingPDUs) < maxPDUsInMemory {
            oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{
                pdu:     event,
                receipt: receipt,
            })
        } else {
            oq.overflowed.Store(true)
        }
        oq.pendingMutex.Unlock()
        // Wake up the queue if it's asleep.
        oq.wakeQueueIfNeeded()
        // If we're blocking on waiting PDUs then tell the queue that we
        // have work to do.
        select {
        case oq.notifyPDUs <- true:
        case oq.notify <- struct{}{}:
        default:
        }
    }
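The two notifyPDUs/notifyEDUs channels above are replaced by a single notify channel with a buffer of one, written to with a non-blocking select. Any number of wakeups collapses into at most one pending notification, so producers never block and the worker simply drains whatever it finds when it wakes. A standalone sketch of that pattern, using only the standard library (this is not Dendrite code):

package main

import (
    "fmt"
    "time"
)

func main() {
    // Buffered with capacity 1, like the notify channel in the diff:
    // many producers collapse into at most one pending wakeup.
    notify := make(chan struct{}, 1)

    wake := func() {
        select {
        case notify <- struct{}{}: // queued a wakeup
        default: // a wakeup is already pending; nothing to do
        }
    }

    // Simulate several events arriving before the worker looks.
    for i := 0; i < 5; i++ {
        wake()
    }

    select {
    case <-notify:
        fmt.Println("worker woke once and will drain everything it finds")
    case <-time.After(time.Second):
        fmt.Println("no work")
    }
}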
@@ -107,7 +111,11 @@ func (oq *destinationQueue) sendEvent(receipt *shared.Receipt) {
// sendEDU adds the EDU event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending events to that destination.
func (oq *destinationQueue) sendEDU(receipt *shared.Receipt) {
func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *shared.Receipt) {
    if event == nil {
        log.Errorf("attempt to send nil EDU with destination %q", oq.destination)
        return
    }
    // Create a database entry that associates the given EDU NID with
    // this destination queue. We'll then be able to retrieve the EDU
    // later.
@@ -116,21 +124,28 @@ func (oq *destinationQueue) sendEDU(receipt *shared.Receipt) {
        oq.destination, // the destination server name
        receipt,        // NIDs from federationsender_queue_json table
    ); err != nil {
        log.WithError(err).Errorf("failed to associate EDU receipt %q with destination %q", receipt.String(), oq.destination)
        log.WithError(err).Errorf("failed to associate EDU with destination %q", oq.destination)
        return
    }
    // We've successfully added an EDU to the transaction so increase
    // the counter.
    oq.transactionCount.Add(1)
    // Check if the destination is blacklisted. If it isn't then wake
    // up the queue.
    if !oq.statistics.Blacklisted() {
        // If there's room in memory to hold the event then add it to the
        // list.
        oq.pendingMutex.Lock()
        if len(oq.pendingEDUs) < maxEDUsInMemory {
            oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{
                edu:     event,
                receipt: receipt,
            })
        } else {
            oq.overflowed.Store(true)
        }
        oq.pendingMutex.Unlock()
        // Wake up the queue if it's asleep.
        oq.wakeQueueIfNeeded()
        // If we're blocking on waiting EDUs then tell the queue that we
        // have work to do.
        select {
        case oq.notifyEDUs <- true:
        case oq.notify <- struct{}{}:
        default:
        }
    }
@@ -152,48 +167,71 @@ func (oq *destinationQueue) wakeQueueIfNeeded() {
    }
}

// waitForPDUs returns a channel for pending PDUs, which will be
// used in backgroundSend select. It returns a closed channel if
// there is something pending right now, or an open channel if
// we're waiting for something.
func (oq *destinationQueue) waitForPDUs() chan bool {
    pendingPDUs, err := oq.db.GetPendingPDUCount(context.TODO(), oq.destination)
    if err != nil {
        log.WithError(err).Errorf("Failed to get pending PDU count on queue %q", oq.destination)
    }
    // If there are PDUs pending right now then we'll return a closed
    // channel. This will mean that the backgroundSend will not block.
    if pendingPDUs > 0 {
        ch := make(chan bool, 1)
        close(ch)
        return ch
    }
    // If there are no PDUs pending right now then instead we'll return
    // the notify channel, so that backgroundSend can pick up normal
    // notifications from sendEvent.
    return oq.notifyPDUs
}
// getPendingFromDatabase will look at the database and see if
// there are any persisted events that haven't been sent to this
// destination yet. If so, they will be queued up.
// nolint:gocyclo
func (oq *destinationQueue) getPendingFromDatabase() {
    // Check to see if there's anything to do for this server
    // in the database.
    retrieved := false
    ctx := context.Background()
    oq.pendingMutex.Lock()
    defer oq.pendingMutex.Unlock()

// waitForEDUs returns a channel for pending EDUs, which will be
// used in backgroundSend select. It returns a closed channel if
// there is something pending right now, or an open channel if
// we're waiting for something.
func (oq *destinationQueue) waitForEDUs() chan bool {
    pendingEDUs, err := oq.db.GetPendingEDUCount(context.TODO(), oq.destination)
    if err != nil {
        log.WithError(err).Errorf("Failed to get pending EDU count on queue %q", oq.destination)
    // Take a note of all of the PDUs and EDUs that we already
    // have cached. We will index them based on the receipt,
    // which ultimately just contains the index of the PDU/EDU
    // in the database.
    gotPDUs := map[string]struct{}{}
    gotEDUs := map[string]struct{}{}
    for _, pdu := range oq.pendingPDUs {
        gotPDUs[pdu.receipt.String()] = struct{}{}
    }
    // If there are EDUs pending right now then we'll return a closed
    // channel. This will mean that the backgroundSend will not block.
    if pendingEDUs > 0 {
        ch := make(chan bool, 1)
        close(ch)
        return ch
    for _, edu := range oq.pendingEDUs {
        gotEDUs[edu.receipt.String()] = struct{}{}
    }

    if pduCapacity := maxPDUsInMemory - len(oq.pendingPDUs); pduCapacity > 0 {
        // We have room in memory for some PDUs - let's request no more than that.
        if pdus, err := oq.db.GetPendingPDUs(ctx, oq.destination, pduCapacity); err == nil {
            for receipt, pdu := range pdus {
                if _, ok := gotPDUs[receipt.String()]; ok {
                    continue
                }
                oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{receipt, pdu})
                retrieved = true
            }
        } else {
            logrus.WithError(err).Errorf("Failed to get pending PDUs for %q", oq.destination)
        }
    }
    if eduCapacity := maxEDUsInMemory - len(oq.pendingEDUs); eduCapacity > 0 {
        // We have room in memory for some EDUs - let's request no more than that.
        if edus, err := oq.db.GetPendingEDUs(ctx, oq.destination, eduCapacity); err == nil {
            for receipt, edu := range edus {
                if _, ok := gotEDUs[receipt.String()]; ok {
                    continue
                }
                oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{receipt, edu})
                retrieved = true
            }
        } else {
            logrus.WithError(err).Errorf("Failed to get pending EDUs for %q", oq.destination)
        }
    }
    // If we've retrieved all of the events from the database with room to spare
    // in memory then we'll no longer consider this queue to be overflowed.
    if len(oq.pendingPDUs) < maxPDUsInMemory && len(oq.pendingEDUs) < maxEDUsInMemory {
        oq.overflowed.Store(false)
    }
    // If we've retrieved some events then notify the destination queue goroutine.
    if retrieved {
        select {
        case oq.notify <- struct{}{}:
        default:
        }
    }
    // If there are no EDUs pending right now then instead we'll return
    // the notify channel, so that backgroundSend can pick up normal
    // notifications from sendEvent.
    return oq.notifyEDUs
}

// backgroundSend is the worker goroutine for sending events.
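getPendingFromDatabase above only asks the database for as many rows as there is spare capacity in memory, and skips anything already cached by keying on the receipt string, which is how the refactor avoids queueing duplicates after a rehydrate. A toy version of that merge step, with plain strings standing in for queued PDUs/EDUs (the function and names are illustrative, not Dendrite's API):

package main

import "fmt"

// mergePending skips anything already held in memory (keyed by its receipt
// string) and stops once the in-memory capacity is reached, mirroring the
// dedup-and-capacity checks in getPendingFromDatabase.
func mergePending(inMemory, fromDB []string, capacity int) []string {
    seen := make(map[string]struct{}, len(inMemory))
    for _, r := range inMemory {
        seen[r] = struct{}{}
    }
    merged := inMemory
    for _, r := range fromDB {
        if len(merged) >= capacity {
            break // no room left in memory; the queue stays overflowed
        }
        if _, ok := seen[r]; ok {
            continue // already queued in memory, don't take it twice
        }
        merged = append(merged, r)
    }
    return merged
}

func main() {
    mem := []string{"receipt-1", "receipt-2"}
    db := []string{"receipt-2", "receipt-3", "receipt-4"}
    fmt.Println(mergePending(mem, db, 3)) // [receipt-1 receipt-2 receipt-3]
}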
@@ -206,25 +244,28 @@ func (oq *destinationQueue) backgroundSend() {
    }
    defer oq.running.Store(false)

    // Mark the queue as overflowed, so we will consult the database
    // to see if there's anything new to send.
    oq.overflowed.Store(true)

    for {
        pendingPDUs, pendingEDUs := false, false
        // If we are overflowing memory and have sent things out to the
        // database then we can look up what those things are.
        if oq.overflowed.Load() {
            oq.getPendingFromDatabase()
        }

        // If we have nothing to do then wait either for incoming events, or
        // until we hit an idle timeout.
        select {
        case <-oq.waitForPDUs():
            // We were woken up because there are new PDUs waiting in the
            // database.
            pendingPDUs = true
        case <-oq.waitForEDUs():
            // We were woken up because there are new EDUs waiting in the
            // database.
            pendingEDUs = true
        case <-oq.notify:
            // There's work to do, either because getPendingFromDatabase
            // told us there is, or because a new event has come in via
            // sendEvent/sendEDU.
        case <-time.After(queueIdleTimeout):
            // The worker is idle so stop the goroutine. It'll get
            // restarted automatically the next time we have an event to
            // send.
            log.Tracef("Queue %q has been idle for %s, going to sleep", oq.destination, queueIdleTimeout)
            return
        }
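backgroundSend is a per-destination worker that exits after queueIdleTimeout of inactivity and is restarted on demand by wakeQueueIfNeeded. A stripped-down sketch of that start-on-demand, stop-when-idle pattern; it assumes Go 1.19+ for sync/atomic.Bool (the real code may use a different atomic package) and the names are illustrative:

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// worker starts its goroutine lazily: the CompareAndSwap guard ensures at
// most one goroutine runs per queue, and the goroutine exits once idle.
type worker struct {
    running atomic.Bool
    notify  chan struct{}
}

func (w *worker) wakeIfNeeded() {
    if w.running.CompareAndSwap(false, true) {
        go w.run()
    }
    select {
    case w.notify <- struct{}{}:
    default:
    }
}

func (w *worker) run() {
    defer w.running.Store(false)
    for {
        select {
        case <-w.notify:
            fmt.Println("processing pending work")
        case <-time.After(100 * time.Millisecond): // stands in for queueIdleTimeout
            fmt.Println("idle, going to sleep")
            return
        }
    }
}

func main() {
    w := &worker{notify: make(chan struct{}, 1)}
    w.wakeIfNeeded()
    time.Sleep(300 * time.Millisecond) // let it process, then idle out
}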
@@ -237,6 +278,16 @@ func (oq *destinationQueue) backgroundSend() {
            // has exceeded a maximum allowable value. Clean up the in-memory
            // buffers at this point. The PDU clean-up is already on a defer.
            log.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination)
            oq.pendingMutex.Lock()
            for i := range oq.pendingPDUs {
                oq.pendingPDUs[i] = nil
            }
            for i := range oq.pendingEDUs {
                oq.pendingEDUs[i] = nil
            }
            oq.pendingPDUs = nil
            oq.pendingEDUs = nil
            oq.pendingMutex.Unlock()
            return
        }
        if until != nil && until.After(time.Now()) {
@@ -250,18 +301,41 @@ func (oq *destinationQueue) backgroundSend() {
            }
        }

        // Work out which PDUs/EDUs to include in the next transaction.
        oq.pendingMutex.RLock()
        pduCount := len(oq.pendingPDUs)
        eduCount := len(oq.pendingEDUs)
        if pduCount > maxPDUsPerTransaction {
            pduCount = maxPDUsPerTransaction
        }
        if eduCount > maxEDUsPerTransaction {
            eduCount = maxEDUsPerTransaction
        }
        toSendPDUs := oq.pendingPDUs[:pduCount]
        toSendEDUs := oq.pendingEDUs[:eduCount]
        oq.pendingMutex.RUnlock()

        // If we have pending PDUs or EDUs then construct a transaction.
        if pendingPDUs || pendingEDUs {
            // Try sending the next transaction and see what happens.
            transaction, terr := oq.nextTransaction()
            if terr != nil {
                // We failed to send the transaction. Mark it as a failure.
                oq.statistics.Failure()
            } else if transaction {
                // If we successfully sent the transaction then clear out
                // the pending events and EDUs, and wipe our transaction ID.
                oq.statistics.Success()
        // Try sending the next transaction and see what happens.
        transaction, pc, ec, terr := oq.nextTransaction(toSendPDUs, toSendEDUs)
        if terr != nil {
            // We failed to send the transaction. Mark it as a failure.
            oq.statistics.Failure()

        } else if transaction {
            // If we successfully sent the transaction then clear out
            // the pending events and EDUs, and wipe our transaction ID.
            oq.statistics.Success()
            oq.pendingMutex.Lock()
            for i := range oq.pendingPDUs[:pc] {
                oq.pendingPDUs[i] = nil
            }
            for i := range oq.pendingEDUs[:ec] {
                oq.pendingEDUs[i] = nil
            }
            oq.pendingPDUs = oq.pendingPDUs[pc:]
            oq.pendingEDUs = oq.pendingEDUs[ec:]
            oq.pendingMutex.Unlock()
        }
    }
}
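The loop above takes at most maxPDUsPerTransaction/maxEDUsPerTransaction entries off the front of the in-memory queues and, only after a successful send, nils out and re-slices exactly the entries that were sent, so a failed transaction leaves everything queued for the retry. A rough sketch of that take-batch, trim-on-success shape with plain strings (illustrative only):

package main

import "fmt"

const maxPerTransaction = 50

// takeBatch peeks at up to maxPerTransaction items from the front of the
// queue without removing them; trim drops exactly the items that were sent.
func takeBatch(pending []string) []string {
    n := len(pending)
    if n > maxPerTransaction {
        n = maxPerTransaction
    }
    return pending[:n]
}

func trim(pending []string, sent int) []string {
    for i := range pending[:sent] {
        pending[i] = "" // drop references before re-slicing
    }
    return pending[sent:]
}

func main() {
    pending := []string{"a", "b", "c"}
    batch := takeBatch(pending)
    fmt.Println("sending", batch)
    pending = trim(pending, len(batch)) // only on success
    fmt.Println("left over:", pending)  // []
}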
@@ -270,16 +344,20 @@ func (oq *destinationQueue) backgroundSend() {
// queue and sends it. Returns true if a transaction was sent or
// false otherwise.
// nolint:gocyclo
func (oq *destinationQueue) nextTransaction() (bool, error) {
    // Before we do anything, we need to roll over the transaction
    // ID that is being used to coalesce events into the next TX.
    // Otherwise it's possible that we'll pick up an incomplete
    // transaction and end up nuking the rest of the events at the
    // cleanup stage.
func (oq *destinationQueue) nextTransaction(
    pdus []*queuedPDU,
    edus []*queuedEDU,
) (bool, int, int, error) {
    // If there's no projected transaction ID then generate one. If
    // the transaction succeeds then we'll set it back to "" so that
    // we generate a new one next time. If it fails, we'll preserve
    // it so that we retry with the same transaction ID.
    oq.transactionIDMutex.Lock()
    oq.transactionID = ""
    if oq.transactionID == "" {
        now := gomatrixserverlib.AsTimestamp(time.Now())
        oq.transactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
    }
    oq.transactionIDMutex.Unlock()
    oq.transactionCount.Store(0)

    // Create the transaction.
    t := gomatrixserverlib.Transaction{
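The comment above spells out the new transaction ID rule: mint an ID only when there isn't one, keep it across failures so that retries reuse it, and clear it after a successful send so the next transaction gets a fresh ID; the receiving server can then de-duplicate retries of the same ID. A small sketch of that rule in isolation (the sender type and sendOnce callback are hypothetical, not Dendrite's API):

package main

import (
    "fmt"
    "time"
)

type sender struct {
    txnID string // kept across failed attempts
    succ  int    // count of successful transactions
}

func (s *sender) send(sendOnce func(txnID string) error) error {
    if s.txnID == "" {
        s.txnID = fmt.Sprintf("%d-%d", time.Now().UnixMilli(), s.succ)
    }
    if err := sendOnce(s.txnID); err != nil {
        return err // keep s.txnID so the retry reuses it
    }
    s.succ++
    s.txnID = "" // success: the next transaction gets a new ID
    return nil
}

func main() {
    s := &sender{}
    attempts := 0
    flaky := func(txnID string) error {
        attempts++
        fmt.Println("attempt", attempts, "with txn", txnID)
        if attempts < 2 {
            return fmt.Errorf("temporary failure")
        }
        return nil
    }
    _ = s.send(flaky) // fails, ID retained
    _ = s.send(flaky) // retried with the same ID, succeeds
}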
@@ -289,58 +367,36 @@ func (oq *destinationQueue) nextTransaction() (bool, error) {
    t.Origin = oq.origin
    t.Destination = oq.destination
    t.OriginServerTS = gomatrixserverlib.AsTimestamp(time.Now())

    // Ask the database for any pending PDUs from the next transaction.
    // maxPDUsPerTransaction is an upper limit but we probably won't
    // actually retrieve that many events.
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    defer cancel()
    txid, pdus, pduReceipt, err := oq.db.GetNextTransactionPDUs(
        ctx,                   // context
        oq.destination,        // server name
        maxPDUsPerTransaction, // max events to retrieve
    )
    if err != nil {
        log.WithError(err).Errorf("failed to get next transaction PDUs for server %q", oq.destination)
        return false, fmt.Errorf("oq.db.GetNextTransactionPDUs: %w", err)
    }

    edus, eduReceipt, err := oq.db.GetNextTransactionEDUs(
        ctx,                   // context
        oq.destination,        // server name
        maxEDUsPerTransaction, // max events to retrieve
    )
    if err != nil {
        log.WithError(err).Errorf("failed to get next transaction EDUs for server %q", oq.destination)
        return false, fmt.Errorf("oq.db.GetNextTransactionEDUs: %w", err)
    }
    t.TransactionID = oq.transactionID

    // If we didn't get anything from the database and there are no
    // pending EDUs then there's nothing to do - stop here.
    if len(pdus) == 0 && len(edus) == 0 {
        return false, nil
        return false, 0, 0, nil
    }

    // Pick out the transaction ID from the database. If we didn't
    // get a transaction ID (i.e. because there are no PDUs but only
    // EDUs) then generate a transaction ID.
    t.TransactionID = txid
    if t.TransactionID == "" {
        now := gomatrixserverlib.AsTimestamp(time.Now())
        t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d-%d", now, oq.statistics.SuccessCount()))
    }
    var pduReceipts []*shared.Receipt
    var eduReceipts []*shared.Receipt

    // Go through PDUs that we retrieved from the database, if any,
    // and add them into the transaction.
    for _, pdu := range pdus {
        if pdu == nil || pdu.pdu == nil {
            continue
        }
        // Append the JSON of the event, since this is a json.RawMessage type in the
        // gomatrixserverlib.Transaction struct
        t.PDUs = append(t.PDUs, (*pdu).JSON())
        t.PDUs = append(t.PDUs, pdu.pdu.JSON())
        pduReceipts = append(pduReceipts, pdu.receipt)
    }

    // Do the same for pending EDUS in the queue.
    for _, edu := range edus {
        t.EDUs = append(t.EDUs, *edu)
        if edu == nil || edu.edu == nil {
            continue
        }
        t.EDUs = append(t.EDUs, *edu.edu)
        eduReceipts = append(eduReceipts, edu.receipt)
    }

    logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
@@ -349,34 +405,38 @@ func (oq *destinationQueue) nextTransaction() (bool, error) {
    // TODO: we should check for 500-ish fails vs 400-ish here,
    // since we shouldn't queue things indefinitely in response
    // to a 400-ish error
    ctx, cancel = context.WithTimeout(context.Background(), time.Minute*5)
    ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
    defer cancel()
    _, err = oq.client.SendTransaction(ctx, t)
    _, err := oq.client.SendTransaction(ctx, t)
    switch err.(type) {
    case nil:
        // Clean up the transaction in the database.
        if pduReceipt != nil {
        if pduReceipts != nil {
            //logrus.Infof("Cleaning PDUs %q", pduReceipt.String())
            if err = oq.db.CleanPDUs(context.Background(), oq.destination, pduReceipt); err != nil {
                log.WithError(err).Errorf("failed to clean PDUs %q for server %q", pduReceipt.String(), t.Destination)
            if err = oq.db.CleanPDUs(context.Background(), oq.destination, pduReceipts); err != nil {
                log.WithError(err).Errorf("Failed to clean PDUs for server %q", t.Destination)
            }
        }
        if eduReceipt != nil {
        if eduReceipts != nil {
            //logrus.Infof("Cleaning EDUs %q", eduReceipt.String())
            if err = oq.db.CleanEDUs(context.Background(), oq.destination, eduReceipt); err != nil {
                log.WithError(err).Errorf("failed to clean EDUs %q for server %q", eduReceipt.String(), t.Destination)
            if err = oq.db.CleanEDUs(context.Background(), oq.destination, eduReceipts); err != nil {
                log.WithError(err).Errorf("Failed to clean EDUs for server %q", t.Destination)
            }
        }
        return true, nil
        // Reset the transaction ID.
        oq.transactionIDMutex.Lock()
        oq.transactionID = ""
        oq.transactionIDMutex.Unlock()
        return true, len(t.PDUs), len(t.EDUs), nil
    case gomatrix.HTTPError:
        // Report that we failed to send the transaction and we
        // will retry again, subject to backoff.
        return false, err
        return false, 0, 0, err
    default:
        log.WithFields(log.Fields{
            "destination": oq.destination,
            log.ErrorKey:  err,
        }).Info("problem sending transaction")
        return false, err
        }).Infof("Failed to send transaction %q", t.TransactionID)
        return false, 0, 0, err
    }
}

@@ -24,6 +24,7 @@ import (

    "github.com/matrix-org/dendrite/federationsender/statistics"
    "github.com/matrix-org/dendrite/federationsender/storage"
    "github.com/matrix-org/dendrite/federationsender/storage/shared"
    "github.com/matrix-org/dendrite/roomserver/api"
    "github.com/matrix-org/gomatrixserverlib"
    log "github.com/sirupsen/logrus"
@@ -83,8 +84,8 @@ func NewOutgoingQueues(
            log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
        }
        for serverName := range serverNames {
            if !queues.getQueue(serverName).statistics.Blacklisted() {
                queues.getQueue(serverName).wakeQueueIfNeeded()
            if queue := queues.getQueue(serverName); !queue.statistics.Blacklisted() {
                queue.wakeQueueIfNeeded()
            }
        }
    })
@@ -100,6 +101,16 @@ type SigningInfo struct {
    PrivateKey ed25519.PrivateKey
}

type queuedPDU struct {
    receipt *shared.Receipt
    pdu     *gomatrixserverlib.HeaderedEvent
}

type queuedEDU struct {
    receipt *shared.Receipt
    edu     *gomatrixserverlib.EDU
}

func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue {
    oqs.queuesMutex.Lock()
    defer oqs.queuesMutex.Unlock()
@@ -112,8 +123,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue {
        destination: destination,
        client: oqs.client,
        statistics: oqs.statistics.ForServer(destination),
        notifyPDUs: make(chan bool, 1),
        notifyEDUs: make(chan bool, 1),
        notify: make(chan struct{}, 1),
        interruptBackoff: make(chan bool),
        signing: oqs.signing,
    }
@@ -188,7 +198,7 @@ func (oqs *OutgoingQueues) SendEvent(
    }

    for destination := range destmap {
        oqs.getQueue(destination).sendEvent(nid)
        oqs.getQueue(destination).sendEvent(ev, nid)
    }

    return nil
@@ -258,7 +268,7 @@ func (oqs *OutgoingQueues) SendEDU(
    }

    for destination := range destmap {
        oqs.getQueue(destination).sendEDU(nid)
        oqs.getQueue(destination).sendEDU(e, nid)
    }

    return nil