2017-06-28 15:10:17 +00:00
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package queue
import (
2017-09-13 10:03:41 +00:00
"context"
2020-03-27 16:28:22 +00:00
"encoding/json"
2017-06-28 15:10:17 +00:00
"fmt"
2020-07-01 10:46:38 +00:00
"sync"
2017-06-28 15:10:17 +00:00
"time"
2020-07-22 16:01:29 +00:00
"github.com/matrix-org/dendrite/federationsender/statistics"
2020-07-01 10:46:38 +00:00
"github.com/matrix-org/dendrite/federationsender/storage"
2020-07-20 15:55:20 +00:00
"github.com/matrix-org/dendrite/federationsender/storage/shared"
2020-06-10 15:54:43 +00:00
"github.com/matrix-org/dendrite/roomserver/api"
2021-01-26 12:56:20 +00:00
"github.com/matrix-org/dendrite/setup/process"
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 11:42:06 +00:00
"github.com/matrix-org/gomatrix"
2017-06-28 15:10:17 +00:00
"github.com/matrix-org/gomatrixserverlib"
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 11:42:06 +00:00
"github.com/sirupsen/logrus"
2017-11-16 10:12:02 +00:00
log "github.com/sirupsen/logrus"
2020-04-03 13:29:06 +00:00
"go.uber.org/atomic"
2017-06-28 15:10:17 +00:00
)
2020-07-20 15:55:20 +00:00
const (
maxPDUsPerTransaction = 50
maxEDUsPerTransaction = 50
2020-12-09 10:03:22 +00:00
maxPDUsInMemory = 128
maxEDUsInMemory = 128
2020-07-20 15:55:20 +00:00
queueIdleTimeout = time . Second * 30
)
2020-07-01 10:46:38 +00:00
2017-06-28 15:10:17 +00:00
// destinationQueue is a queue of events for a single destination.
// It is responsible for sending the events to the destination and
// ensures that only one request is in flight to a given destination
// at a time.
type destinationQueue struct {
2021-02-17 15:16:35 +00:00
queues * OutgoingQueues
2020-07-01 10:46:38 +00:00
db storage . Database
2021-01-26 12:56:20 +00:00
process * process . ProcessContext
2020-06-10 15:54:43 +00:00
signing * SigningInfo
rsAPI api . RoomserverInternalAPI
2020-08-17 10:40:49 +00:00
client * gomatrixserverlib . FederationClient // federation client
origin gomatrixserverlib . ServerName // origin of requests
destination gomatrixserverlib . ServerName // destination of requests
running atomic . Bool // is the queue worker running?
backingOff atomic . Bool // true if we're backing off
2020-12-09 10:03:22 +00:00
overflowed atomic . Bool // the queues exceed maxPDUsInMemory/maxEDUsInMemory, so we should consult the database for more
2020-08-17 10:40:49 +00:00
statistics * statistics . ServerStatistics // statistics about this remote server
transactionIDMutex sync . Mutex // protects transactionID
2020-12-09 10:03:22 +00:00
transactionID gomatrixserverlib . TransactionID // last transaction ID if retrying, or "" if last txn was successful
notify chan struct { } // interrupts idle wait pending PDUs/EDUs
pendingPDUs [ ] * queuedPDU // PDUs waiting to be sent
pendingEDUs [ ] * queuedEDU // EDUs waiting to be sent
pendingMutex sync . RWMutex // protects pendingPDUs and pendingEDUs
2020-08-17 10:40:49 +00:00
interruptBackoff chan bool // interrupts backoff
2017-06-28 15:10:17 +00:00
}
// Send event adds the event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending events to that destination.
2020-12-09 10:03:22 +00:00
func ( oq * destinationQueue ) sendEvent ( event * gomatrixserverlib . HeaderedEvent , receipt * shared . Receipt ) {
if event == nil {
log . Errorf ( "attempt to send nil PDU with destination %q" , oq . destination )
return
2020-07-01 10:46:38 +00:00
}
// Create a database entry that associates the given PDU NID with
// this destination queue. We'll then be able to retrieve the PDU
// later.
if err := oq . db . AssociatePDUWithDestination (
context . TODO ( ) ,
2020-12-09 10:03:22 +00:00
"" , // TODO: remove this, as we don't need to persist the transaction ID
oq . destination , // the destination server name
receipt , // NIDs from federationsender_queue_json table
2020-07-01 10:46:38 +00:00
) ; err != nil {
2020-12-09 10:03:22 +00:00
log . WithError ( err ) . Errorf ( "failed to associate PDU %q with destination %q" , event . EventID ( ) , oq . destination )
2020-07-01 10:46:38 +00:00
return
}
2020-07-22 16:01:29 +00:00
// Check if the destination is blacklisted. If it isn't then wake
// up the queue.
if ! oq . statistics . Blacklisted ( ) {
2020-12-09 10:03:22 +00:00
// If there's room in memory to hold the event then add it to the
// list.
oq . pendingMutex . Lock ( )
if len ( oq . pendingPDUs ) < maxPDUsInMemory {
oq . pendingPDUs = append ( oq . pendingPDUs , & queuedPDU {
pdu : event ,
receipt : receipt ,
} )
} else {
oq . overflowed . Store ( true )
}
oq . pendingMutex . Unlock ( )
2020-07-22 16:01:29 +00:00
// Wake up the queue if it's asleep.
oq . wakeQueueIfNeeded ( )
select {
2020-12-09 10:03:22 +00:00
case oq . notify <- struct { } { } :
2020-07-22 16:01:29 +00:00
default :
}
2020-07-01 10:46:38 +00:00
}
2017-06-28 15:10:17 +00:00
}
2018-08-10 15:26:57 +00:00
// sendEDU adds the EDU event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
2020-04-03 13:29:06 +00:00
// start sending events to that destination.
2020-12-09 10:03:22 +00:00
func ( oq * destinationQueue ) sendEDU ( event * gomatrixserverlib . EDU , receipt * shared . Receipt ) {
if event == nil {
log . Errorf ( "attempt to send nil EDU with destination %q" , oq . destination )
return
}
2020-07-20 15:55:20 +00:00
// Create a database entry that associates the given PDU NID with
// this destination queue. We'll then be able to retrieve the PDU
// later.
if err := oq . db . AssociateEDUWithDestination (
context . TODO ( ) ,
oq . destination , // the destination server name
receipt , // NIDs from federationsender_queue_json table
) ; err != nil {
2020-12-09 10:03:22 +00:00
log . WithError ( err ) . Errorf ( "failed to associate EDU with destination %q" , oq . destination )
2020-07-20 15:55:20 +00:00
return
}
2020-07-22 16:01:29 +00:00
// Check if the destination is blacklisted. If it isn't then wake
// up the queue.
if ! oq . statistics . Blacklisted ( ) {
2020-12-09 10:03:22 +00:00
// If there's room in memory to hold the event then add it to the
// list.
oq . pendingMutex . Lock ( )
if len ( oq . pendingEDUs ) < maxEDUsInMemory {
oq . pendingEDUs = append ( oq . pendingEDUs , & queuedEDU {
edu : event ,
receipt : receipt ,
} )
} else {
oq . overflowed . Store ( true )
}
oq . pendingMutex . Unlock ( )
2020-07-22 16:01:29 +00:00
// Wake up the queue if it's asleep.
oq . wakeQueueIfNeeded ( )
select {
2020-12-09 10:03:22 +00:00
case oq . notify <- struct { } { } :
2020-07-22 16:01:29 +00:00
default :
}
2020-07-20 15:55:20 +00:00
}
2020-04-03 13:29:06 +00:00
}
2020-07-03 15:31:56 +00:00
// wakeQueueIfNeeded will wake up the destination queue if it is
// not already running. If it is running but it is backing off
// then we will interrupt the backoff, causing any federation
// requests to retry.
2020-07-02 16:43:07 +00:00
func ( oq * destinationQueue ) wakeQueueIfNeeded ( ) {
2020-07-03 15:31:56 +00:00
// If we are backing off then interrupt the backoff.
if oq . backingOff . CAS ( true , false ) {
oq . interruptBackoff <- true
}
// If we aren't running then wake up the queue.
2020-04-03 13:29:06 +00:00
if ! oq . running . Load ( ) {
2020-07-09 14:39:35 +00:00
// Start the queue.
2018-08-10 15:26:57 +00:00
go oq . backgroundSend ( )
}
}
2020-12-09 10:03:22 +00:00
// getPendingFromDatabase will look at the database and see if
// there are any persisted events that haven't been sent to this
// destination yet. If so, they will be queued up.
func ( oq * destinationQueue ) getPendingFromDatabase ( ) {
// Check to see if there's anything to do for this server
// in the database.
retrieved := false
ctx := context . Background ( )
oq . pendingMutex . Lock ( )
defer oq . pendingMutex . Unlock ( )
// Take a note of all of the PDUs and EDUs that we already
// have cached. We will index them based on the receipt,
// which ultimately just contains the index of the PDU/EDU
// in the database.
gotPDUs := map [ string ] struct { } { }
gotEDUs := map [ string ] struct { } { }
for _ , pdu := range oq . pendingPDUs {
gotPDUs [ pdu . receipt . String ( ) ] = struct { } { }
2020-07-09 14:39:35 +00:00
}
2020-12-09 10:03:22 +00:00
for _ , edu := range oq . pendingEDUs {
gotEDUs [ edu . receipt . String ( ) ] = struct { } { }
2020-07-09 14:39:35 +00:00
}
2020-12-09 10:03:22 +00:00
if pduCapacity := maxPDUsInMemory - len ( oq . pendingPDUs ) ; pduCapacity > 0 {
// We have room in memory for some PDUs - let's request no more than that.
if pdus , err := oq . db . GetPendingPDUs ( ctx , oq . destination , pduCapacity ) ; err == nil {
for receipt , pdu := range pdus {
if _ , ok := gotPDUs [ receipt . String ( ) ] ; ok {
continue
}
oq . pendingPDUs = append ( oq . pendingPDUs , & queuedPDU { receipt , pdu } )
retrieved = true
}
} else {
logrus . WithError ( err ) . Errorf ( "Failed to get pending PDUs for %q" , oq . destination )
}
2020-07-20 15:55:20 +00:00
}
2020-12-09 10:03:22 +00:00
if eduCapacity := maxEDUsInMemory - len ( oq . pendingEDUs ) ; eduCapacity > 0 {
// We have room in memory for some EDUs - let's request no more than that.
if edus , err := oq . db . GetPendingEDUs ( ctx , oq . destination , eduCapacity ) ; err == nil {
for receipt , edu := range edus {
if _ , ok := gotEDUs [ receipt . String ( ) ] ; ok {
continue
}
oq . pendingEDUs = append ( oq . pendingEDUs , & queuedEDU { receipt , edu } )
retrieved = true
}
} else {
logrus . WithError ( err ) . Errorf ( "Failed to get pending EDUs for %q" , oq . destination )
}
}
// If we've retrieved all of the events from the database with room to spare
// in memory then we'll no longer consider this queue to be overflowed.
if len ( oq . pendingPDUs ) < maxPDUsInMemory && len ( oq . pendingEDUs ) < maxEDUsInMemory {
oq . overflowed . Store ( false )
}
// If we've retrieved some events then notify the destination queue goroutine.
if retrieved {
select {
case oq . notify <- struct { } { } :
default :
}
2020-07-20 15:55:20 +00:00
}
}
2020-04-03 13:29:06 +00:00
// backgroundSend is the worker goroutine for sending events.
2017-06-28 15:10:17 +00:00
func ( oq * destinationQueue ) backgroundSend ( ) {
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 11:42:06 +00:00
// Check if a worker is already running, and if it isn't, then
// mark it as started.
if ! oq . running . CAS ( false , true ) {
return
}
2020-12-16 15:02:39 +00:00
destinationQueueRunning . Inc ( )
defer destinationQueueRunning . Dec ( )
2021-02-17 15:16:35 +00:00
defer oq . queues . clearQueue ( oq )
2020-04-03 13:29:06 +00:00
defer oq . running . Store ( false )
2020-12-09 10:03:22 +00:00
// Mark the queue as overflowed, so we will consult the database
// to see if there's anything new to send.
oq . overflowed . Store ( true )
2017-06-28 15:10:17 +00:00
for {
2020-12-09 10:03:22 +00:00
// If we are overflowing memory and have sent things out to the
// database then we can look up what those things are.
if oq . overflowed . Load ( ) {
oq . getPendingFromDatabase ( )
}
2020-07-09 14:39:35 +00:00
2020-07-01 10:46:38 +00:00
// If we have nothing to do then wait either for incoming events, or
// until we hit an idle timeout.
2020-07-02 16:43:07 +00:00
select {
2020-12-09 10:03:22 +00:00
case <- oq . notify :
// There's work to do, either because getPendingFromDatabase
// told us there is, or because a new event has come in via
// sendEvent/sendEDU.
2020-07-09 14:39:35 +00:00
case <- time . After ( queueIdleTimeout ) :
2020-07-02 16:43:07 +00:00
// The worker is idle so stop the goroutine. It'll get
// restarted automatically the next time we have an event to
// send.
return
2017-06-28 15:10:17 +00:00
}
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 11:42:06 +00:00
// If we are backing off this server then wait for the
2020-06-01 17:34:08 +00:00
// backoff duration to complete first, or until explicitly
// told to retry.
2020-09-21 12:30:37 +00:00
until , blacklisted := oq . statistics . BackoffInfo ( )
if blacklisted {
2020-08-07 17:50:29 +00:00
// It's been suggested that we should give up because the backoff
// has exceeded a maximum allowable value. Clean up the in-memory
// buffers at this point. The PDU clean-up is already on a defer.
log . Warnf ( "Blacklisting %q due to exceeding backoff threshold" , oq . destination )
2020-12-09 10:03:22 +00:00
oq . pendingMutex . Lock ( )
for i := range oq . pendingPDUs {
oq . pendingPDUs [ i ] = nil
}
for i := range oq . pendingEDUs {
oq . pendingEDUs [ i ] = nil
}
oq . pendingPDUs = nil
oq . pendingEDUs = nil
oq . pendingMutex . Unlock ( )
2020-08-07 17:50:29 +00:00
return
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 11:42:06 +00:00
}
2020-09-22 13:53:36 +00:00
if until != nil && until . After ( time . Now ( ) ) {
2020-09-21 12:30:37 +00:00
// We haven't backed off yet, so wait for the suggested amount of
// time.
duration := time . Until ( * until )
log . Warnf ( "Backing off %q for %s" , oq . destination , duration )
2020-12-16 15:02:39 +00:00
oq . backingOff . Store ( true )
destinationQueueBackingOff . Inc ( )
2020-09-21 12:30:37 +00:00
select {
case <- time . After ( duration ) :
case <- oq . interruptBackoff :
}
2020-12-16 15:02:39 +00:00
destinationQueueBackingOff . Dec ( )
oq . backingOff . Store ( false )
2020-09-21 12:30:37 +00:00
}
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 11:42:06 +00:00
2020-12-09 10:03:22 +00:00
// Work out which PDUs/EDUs to include in the next transaction.
oq . pendingMutex . RLock ( )
pduCount := len ( oq . pendingPDUs )
eduCount := len ( oq . pendingEDUs )
if pduCount > maxPDUsPerTransaction {
pduCount = maxPDUsPerTransaction
}
if eduCount > maxEDUsPerTransaction {
eduCount = maxEDUsPerTransaction
}
toSendPDUs := oq . pendingPDUs [ : pduCount ]
toSendEDUs := oq . pendingEDUs [ : eduCount ]
oq . pendingMutex . RUnlock ( )
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 11:42:06 +00:00
// If we have pending PDUs or EDUs then construct a transaction.
2020-12-09 10:03:22 +00:00
// Try sending the next transaction and see what happens.
transaction , pc , ec , terr := oq . nextTransaction ( toSendPDUs , toSendEDUs )
if terr != nil {
// We failed to send the transaction. Mark it as a failure.
oq . statistics . Failure ( )
} else if transaction {
// If we successfully sent the transaction then clear out
// the pending events and EDUs, and wipe our transaction ID.
oq . statistics . Success ( )
oq . pendingMutex . Lock ( )
for i := range oq . pendingPDUs [ : pc ] {
oq . pendingPDUs [ i ] = nil
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 11:42:06 +00:00
}
2020-12-09 10:03:22 +00:00
for i := range oq . pendingEDUs [ : ec ] {
oq . pendingEDUs [ i ] = nil
}
oq . pendingPDUs = oq . pendingPDUs [ pc : ]
oq . pendingEDUs = oq . pendingEDUs [ ec : ]
oq . pendingMutex . Unlock ( )
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 11:42:06 +00:00
}
2020-07-01 10:46:38 +00:00
}
}
2020-04-03 13:29:06 +00:00
// nextTransaction creates a new transaction from the pending event
// queue and sends it. Returns true if a transaction was sent or
// false otherwise.
2020-12-09 10:03:22 +00:00
func ( oq * destinationQueue ) nextTransaction (
pdus [ ] * queuedPDU ,
edus [ ] * queuedEDU ,
) ( bool , int , int , error ) {
// If there's no projected transaction ID then generate one. If
// the transaction succeeds then we'll set it back to "" so that
// we generate a new one next time. If it fails, we'll preserve
// it so that we retry with the same transaction ID.
2020-07-01 10:46:38 +00:00
oq . transactionIDMutex . Lock ( )
2020-12-09 10:03:22 +00:00
if oq . transactionID == "" {
now := gomatrixserverlib . AsTimestamp ( time . Now ( ) )
oq . transactionID = gomatrixserverlib . TransactionID ( fmt . Sprintf ( "%d-%d" , now , oq . statistics . SuccessCount ( ) ) )
}
2020-07-01 10:46:38 +00:00
oq . transactionIDMutex . Unlock ( )
// Create the transaction.
2020-02-28 14:54:51 +00:00
t := gomatrixserverlib . Transaction {
2020-03-27 16:28:22 +00:00
PDUs : [ ] json . RawMessage { } ,
2020-02-28 14:54:51 +00:00
EDUs : [ ] gomatrixserverlib . EDU { } ,
}
2017-06-28 15:10:17 +00:00
t . Origin = oq . origin
t . Destination = oq . destination
2020-07-01 10:46:38 +00:00
t . OriginServerTS = gomatrixserverlib . AsTimestamp ( time . Now ( ) )
2020-12-09 10:03:22 +00:00
t . TransactionID = oq . transactionID
2020-07-20 15:55:20 +00:00
2020-07-01 10:46:38 +00:00
// If we didn't get anything from the database and there are no
// pending EDUs then there's nothing to do - stop here.
2020-07-20 15:55:20 +00:00
if len ( pdus ) == 0 && len ( edus ) == 0 {
2020-12-09 10:03:22 +00:00
return false , 0 , 0 , nil
2020-07-01 10:46:38 +00:00
}
2020-12-09 10:03:22 +00:00
var pduReceipts [ ] * shared . Receipt
var eduReceipts [ ] * shared . Receipt
2018-08-10 15:26:57 +00:00
2020-07-01 10:46:38 +00:00
// Go through PDUs that we retrieved from the database, if any,
// and add them into the transaction.
for _ , pdu := range pdus {
2020-12-09 10:03:22 +00:00
if pdu == nil || pdu . pdu == nil {
continue
}
2020-03-27 16:28:22 +00:00
// Append the JSON of the event, since this is a json.RawMessage type in the
// gomatrixserverlib.Transaction struct
2020-12-09 10:03:22 +00:00
t . PDUs = append ( t . PDUs , pdu . pdu . JSON ( ) )
pduReceipts = append ( pduReceipts , pdu . receipt )
2017-06-28 15:10:17 +00:00
}
2018-08-10 15:26:57 +00:00
2020-07-01 10:46:38 +00:00
// Do the same for pending EDUS in the queue.
2020-07-20 15:55:20 +00:00
for _ , edu := range edus {
2020-12-09 10:03:22 +00:00
if edu == nil || edu . edu == nil {
continue
}
t . EDUs = append ( t . EDUs , * edu . edu )
eduReceipts = append ( eduReceipts , edu . receipt )
2018-08-10 15:26:57 +00:00
}
2020-08-07 14:00:23 +00:00
logrus . WithField ( "server_name" , oq . destination ) . Debugf ( "Sending transaction %q containing %d PDUs, %d EDUs" , t . TransactionID , len ( t . PDUs ) , len ( t . EDUs ) )
2020-04-03 13:29:06 +00:00
2020-07-01 10:46:38 +00:00
// Try to send the transaction to the destination server.
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 11:42:06 +00:00
// TODO: we should check for 500-ish fails vs 400-ish here,
// since we shouldn't queue things indefinitely in response
// to a 400-ish error
2021-01-26 12:56:20 +00:00
ctx , cancel := context . WithTimeout ( oq . process . Context ( ) , time . Minute * 5 )
2020-07-08 13:52:48 +00:00
defer cancel ( )
2020-12-09 10:03:22 +00:00
_ , err := oq . client . SendTransaction ( ctx , t )
2021-07-27 13:26:37 +00:00
switch e := err . ( type ) {
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 11:42:06 +00:00
case nil :
2020-07-01 10:46:38 +00:00
// Clean up the transaction in the database.
2020-12-09 10:03:22 +00:00
if pduReceipts != nil {
2020-07-20 15:55:20 +00:00
//logrus.Infof("Cleaning PDUs %q", pduReceipt.String())
2020-12-09 10:03:22 +00:00
if err = oq . db . CleanPDUs ( context . Background ( ) , oq . destination , pduReceipts ) ; err != nil {
log . WithError ( err ) . Errorf ( "Failed to clean PDUs for server %q" , t . Destination )
2020-07-20 15:55:20 +00:00
}
}
2020-12-09 10:03:22 +00:00
if eduReceipts != nil {
2020-07-20 15:55:20 +00:00
//logrus.Infof("Cleaning EDUs %q", eduReceipt.String())
2020-12-09 10:03:22 +00:00
if err = oq . db . CleanEDUs ( context . Background ( ) , oq . destination , eduReceipts ) ; err != nil {
log . WithError ( err ) . Errorf ( "Failed to clean EDUs for server %q" , t . Destination )
2020-07-20 15:55:20 +00:00
}
2020-07-01 10:46:38 +00:00
}
2020-12-09 10:03:22 +00:00
// Reset the transaction ID.
oq . transactionIDMutex . Lock ( )
oq . transactionID = ""
oq . transactionIDMutex . Unlock ( )
return true , len ( t . PDUs ) , len ( t . EDUs ) , nil
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 11:42:06 +00:00
case gomatrix . HTTPError :
2020-07-07 15:53:10 +00:00
// Report that we failed to send the transaction and we
// will retry again, subject to backoff.
2021-07-27 13:26:37 +00:00
log . WithError ( err ) . WithField ( "code" , e . Code ) . WithField ( "message" , e . Message ) . WithField ( "content" , string ( e . Contents ) ) . Error ( "gomatrix.HTTPError when sending transaction" )
2020-12-09 10:03:22 +00:00
return false , 0 , 0 , err
Improve federation sender performance, implement backoff and blacklisting, fix up invites a bit (#1007)
* Improve federation sender performance and behaviour, add backoff
* Tweaks
* Tweaks
* Tweaks
* Take copies of events before passing to destination queues
* Don't accidentally drop queued messages
* Don't take copies again
* Tidy up a bit
* Break out statistics (tracked component-wide), report success and failures from Perform actions
* Fix comment, use atomic add
* Improve logic a bit, don't block on wakeup, move idle check
* Don't retry sucessful invites, don't dispatch sendEvent, sendInvite etc
* Dedupe destinations, fix other bug hopefully
* Dispatch sends again
* Federation sender to ignore invites that are destined locally
* Loopback invite events
* Remodel a bit with channels
* Linter
* Only loopback invite event if we know the room
* We should tell other resident servers about the invite if we know about the room
* Correct invite signing
* Fix invite loopback
* Check HTTP response codes, push new invites to front of queue
* Review comments
2020-05-07 11:42:06 +00:00
default :
2020-04-03 13:29:06 +00:00
log . WithFields ( log . Fields {
"destination" : oq . destination ,
log . ErrorKey : err ,
2021-02-04 12:25:31 +00:00
} ) . Debugf ( "Failed to send transaction %q" , t . TransactionID )
2020-12-09 10:03:22 +00:00
return false , 0 , 0 , err
2020-04-03 13:29:06 +00:00
}
}