Persistent federation sender blacklist (#1214)

* Initial persistence of blacklists

* Move statistics folder

* Make MaxFederationRetries configurable

* Set lower failure thresholds for Yggdrasil demos

* Still write events into database for blacklisted hosts (they can be tidied up later)

* Review comments
This commit is contained in:
Neil Alexander 2020-07-22 17:01:29 +01:00 committed by GitHub
parent 470933789b
commit 1e71fd645e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 343 additions and 51 deletions

View file

@ -21,9 +21,9 @@ import (
"sync"
"time"
"github.com/matrix-org/dendrite/federationsender/statistics"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
@ -51,7 +51,7 @@ type destinationQueue struct {
destination gomatrixserverlib.ServerName // destination of requests
running atomic.Bool // is the queue worker running?
backingOff atomic.Bool // true if we're backing off
statistics *types.ServerStatistics // statistics about this remote server
statistics *statistics.ServerStatistics // statistics about this remote server
incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send
transactionIDMutex sync.Mutex // protects transactionID
transactionID gomatrixserverlib.TransactionID // last transaction ID
@ -66,11 +66,6 @@ type destinationQueue struct {
// If the queue is empty then it starts a background goroutine to
// start sending events to that destination.
func (oq *destinationQueue) sendEvent(receipt *shared.Receipt) {
if oq.statistics.Blacklisted() {
// If the destination is blacklisted then drop the event.
log.Infof("%s is blacklisted; dropping event", oq.destination)
return
}
// Create a transaction ID. We'll either do this if we don't have
// one made up yet, or if we've exceeded the number of maximum
// events allowed in a single tranaction. We'll reset the counter
@ -97,13 +92,17 @@ func (oq *destinationQueue) sendEvent(receipt *shared.Receipt) {
// We've successfully added a PDU to the transaction so increase
// the counter.
oq.transactionCount.Add(1)
// Wake up the queue if it's asleep.
oq.wakeQueueIfNeeded()
// If we're blocking on waiting PDUs then tell the queue that we
// have work to do.
select {
case oq.notifyPDUs <- true:
default:
// Check if the destination is blacklisted. If it isn't then wake
// up the queue.
if !oq.statistics.Blacklisted() {
// Wake up the queue if it's asleep.
oq.wakeQueueIfNeeded()
// If we're blocking on waiting PDUs then tell the queue that we
// have work to do.
select {
case oq.notifyPDUs <- true:
default:
}
}
}
@ -111,11 +110,6 @@ func (oq *destinationQueue) sendEvent(receipt *shared.Receipt) {
// If the queue is empty then it starts a background goroutine to
// start sending events to that destination.
func (oq *destinationQueue) sendEDU(receipt *shared.Receipt) {
if oq.statistics.Blacklisted() {
// If the destination is blacklisted then drop the event.
log.Infof("%s is blacklisted; dropping ephemeral event", oq.destination)
return
}
// Create a database entry that associates the given PDU NID with
// this destination queue. We'll then be able to retrieve the PDU
// later.
@ -130,13 +124,17 @@ func (oq *destinationQueue) sendEDU(receipt *shared.Receipt) {
// We've successfully added an EDU to the transaction so increase
// the counter.
oq.transactionCount.Add(1)
// Wake up the queue if it's asleep.
oq.wakeQueueIfNeeded()
// If we're blocking on waiting PDUs then tell the queue that we
// have work to do.
select {
case oq.notifyEDUs <- true:
default:
// Check if the destination is blacklisted. If it isn't then wake
// up the queue.
if !oq.statistics.Blacklisted() {
// Wake up the queue if it's asleep.
oq.wakeQueueIfNeeded()
// If we're blocking on waiting EDUs then tell the queue that we
// have work to do.
select {
case oq.notifyEDUs <- true:
default:
}
}
}

View file

@ -21,8 +21,8 @@ import (
"fmt"
"sync"
"github.com/matrix-org/dendrite/federationsender/statistics"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -36,7 +36,7 @@ type OutgoingQueues struct {
rsAPI api.RoomserverInternalAPI
origin gomatrixserverlib.ServerName
client *gomatrixserverlib.FederationClient
statistics *types.Statistics
statistics *statistics.Statistics
signing *SigningInfo
queuesMutex sync.Mutex // protects the below
queues map[gomatrixserverlib.ServerName]*destinationQueue
@ -48,7 +48,7 @@ func NewOutgoingQueues(
origin gomatrixserverlib.ServerName,
client *gomatrixserverlib.FederationClient,
rsAPI api.RoomserverInternalAPI,
statistics *types.Statistics,
statistics *statistics.Statistics,
signing *SigningInfo,
) *OutgoingQueues {
queues := &OutgoingQueues{
@ -77,7 +77,9 @@ func NewOutgoingQueues(
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
}
for serverName := range serverNames {
queues.getQueue(serverName).wakeQueueIfNeeded()
if !queues.getQueue(serverName).statistics.Blacklisted() {
queues.getQueue(serverName).wakeQueueIfNeeded()
}
}
return queues
}