mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-29 12:42:46 +00:00
Add ability to disable federation (#1604)
* Allow disabling federation * Don't start federation queues if disabled * Fix for Go 1.13
This commit is contained in:
parent
b4c3692dcc
commit
bdf6490375
6 changed files with 88 additions and 23 deletions
|
@ -59,8 +59,8 @@ func NewInternalAPI(
|
|||
consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
|
||||
|
||||
queues := queue.NewOutgoingQueues(
|
||||
federationSenderDB, cfg.Matrix.ServerName, federation,
|
||||
rsAPI, stats,
|
||||
federationSenderDB, cfg.Matrix.DisableFederation,
|
||||
cfg.Matrix.ServerName, federation, rsAPI, stats,
|
||||
&queue.SigningInfo{
|
||||
KeyID: cfg.Matrix.KeyID,
|
||||
PrivateKey: cfg.Matrix.PrivateKey,
|
||||
|
|
|
@ -34,6 +34,7 @@ import (
|
|||
// matrix servers
|
||||
type OutgoingQueues struct {
|
||||
db storage.Database
|
||||
disabled bool
|
||||
rsAPI api.RoomserverInternalAPI
|
||||
origin gomatrixserverlib.ServerName
|
||||
client *gomatrixserverlib.FederationClient
|
||||
|
@ -46,6 +47,7 @@ type OutgoingQueues struct {
|
|||
// NewOutgoingQueues makes a new OutgoingQueues
|
||||
func NewOutgoingQueues(
|
||||
db storage.Database,
|
||||
disabled bool,
|
||||
origin gomatrixserverlib.ServerName,
|
||||
client *gomatrixserverlib.FederationClient,
|
||||
rsAPI api.RoomserverInternalAPI,
|
||||
|
@ -53,6 +55,7 @@ func NewOutgoingQueues(
|
|||
signing *SigningInfo,
|
||||
) *OutgoingQueues {
|
||||
queues := &OutgoingQueues{
|
||||
disabled: disabled,
|
||||
db: db,
|
||||
rsAPI: rsAPI,
|
||||
origin: origin,
|
||||
|
@ -62,28 +65,30 @@ func NewOutgoingQueues(
|
|||
queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
|
||||
}
|
||||
// Look up which servers we have pending items for and then rehydrate those queues.
|
||||
time.AfterFunc(time.Second*5, func() {
|
||||
serverNames := map[gomatrixserverlib.ServerName]struct{}{}
|
||||
if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil {
|
||||
for _, serverName := range names {
|
||||
serverNames[serverName] = struct{}{}
|
||||
if !disabled {
|
||||
time.AfterFunc(time.Second*5, func() {
|
||||
serverNames := map[gomatrixserverlib.ServerName]struct{}{}
|
||||
if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil {
|
||||
for _, serverName := range names {
|
||||
serverNames[serverName] = struct{}{}
|
||||
}
|
||||
} else {
|
||||
log.WithError(err).Error("Failed to get PDU server names for destination queue hydration")
|
||||
}
|
||||
} else {
|
||||
log.WithError(err).Error("Failed to get PDU server names for destination queue hydration")
|
||||
}
|
||||
if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil {
|
||||
for _, serverName := range names {
|
||||
serverNames[serverName] = struct{}{}
|
||||
if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil {
|
||||
for _, serverName := range names {
|
||||
serverNames[serverName] = struct{}{}
|
||||
}
|
||||
} else {
|
||||
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
|
||||
}
|
||||
} else {
|
||||
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
|
||||
}
|
||||
for serverName := range serverNames {
|
||||
if !queues.getQueue(serverName).statistics.Blacklisted() {
|
||||
queues.getQueue(serverName).wakeQueueIfNeeded()
|
||||
for serverName := range serverNames {
|
||||
if !queues.getQueue(serverName).statistics.Blacklisted() {
|
||||
queues.getQueue(serverName).wakeQueueIfNeeded()
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
return queues
|
||||
}
|
||||
|
||||
|
@ -122,6 +127,9 @@ func (oqs *OutgoingQueues) SendEvent(
|
|||
ev *gomatrixserverlib.HeaderedEvent, origin gomatrixserverlib.ServerName,
|
||||
destinations []gomatrixserverlib.ServerName,
|
||||
) error {
|
||||
if oqs.disabled {
|
||||
return fmt.Errorf("federation is disabled")
|
||||
}
|
||||
if origin != oqs.origin {
|
||||
// TODO: Support virtual hosting; gh issue #577.
|
||||
return fmt.Errorf(
|
||||
|
@ -181,6 +189,9 @@ func (oqs *OutgoingQueues) SendEDU(
|
|||
e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName,
|
||||
destinations []gomatrixserverlib.ServerName,
|
||||
) error {
|
||||
if oqs.disabled {
|
||||
return fmt.Errorf("federation is disabled")
|
||||
}
|
||||
if origin != oqs.origin {
|
||||
// TODO: Support virtual hosting; gh issue #577.
|
||||
return fmt.Errorf(
|
||||
|
@ -243,6 +254,9 @@ func (oqs *OutgoingQueues) SendEDU(
|
|||
|
||||
// RetryServer attempts to resend events to the given server if we had given up.
|
||||
func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) {
|
||||
if oqs.disabled {
|
||||
return
|
||||
}
|
||||
q := oqs.getQueue(srv)
|
||||
if q == nil {
|
||||
return
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue