mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-31 13:22:46 +00:00
Initial Store & Forward Implementation (#2917)
This adds store & forward relays into dendrite for p2p. A few things have changed: - new relay api serves new http endpoints for s&f federation - updated outbound federation queueing which will attempt to forward using s&f if appropriate - database entries to track s&f relays for other nodes
This commit is contained in:
parent
48fa869fa3
commit
5b73592f5a
77 changed files with 7646 additions and 1373 deletions
|
@ -29,7 +29,7 @@ import (
|
|||
fedapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/federationapi/statistics"
|
||||
"github.com/matrix-org/dendrite/federationapi/storage"
|
||||
"github.com/matrix-org/dendrite/federationapi/storage/shared"
|
||||
"github.com/matrix-org/dendrite/federationapi/storage/shared/receipt"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
)
|
||||
|
@ -70,7 +70,7 @@ type destinationQueue struct {
|
|||
// Send event adds the event to the pending queue for the destination.
|
||||
// If the queue is empty then it starts a background goroutine to
|
||||
// start sending events to that destination.
|
||||
func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, receipt *shared.Receipt) {
|
||||
func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, dbReceipt *receipt.Receipt) {
|
||||
if event == nil {
|
||||
logrus.Errorf("attempt to send nil PDU with destination %q", oq.destination)
|
||||
return
|
||||
|
@ -84,8 +84,8 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
|
|||
oq.pendingMutex.Lock()
|
||||
if len(oq.pendingPDUs) < maxPDUsInMemory {
|
||||
oq.pendingPDUs = append(oq.pendingPDUs, &queuedPDU{
|
||||
pdu: event,
|
||||
receipt: receipt,
|
||||
pdu: event,
|
||||
dbReceipt: dbReceipt,
|
||||
})
|
||||
} else {
|
||||
oq.overflowed.Store(true)
|
||||
|
@ -101,7 +101,7 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
|
|||
// sendEDU adds the EDU event to the pending queue for the destination.
|
||||
// If the queue is empty then it starts a background goroutine to
|
||||
// start sending events to that destination.
|
||||
func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *shared.Receipt) {
|
||||
func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, dbReceipt *receipt.Receipt) {
|
||||
if event == nil {
|
||||
logrus.Errorf("attempt to send nil EDU with destination %q", oq.destination)
|
||||
return
|
||||
|
@ -115,8 +115,8 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
|
|||
oq.pendingMutex.Lock()
|
||||
if len(oq.pendingEDUs) < maxEDUsInMemory {
|
||||
oq.pendingEDUs = append(oq.pendingEDUs, &queuedEDU{
|
||||
edu: event,
|
||||
receipt: receipt,
|
||||
edu: event,
|
||||
dbReceipt: dbReceipt,
|
||||
})
|
||||
} else {
|
||||
oq.overflowed.Store(true)
|
||||
|
@ -210,10 +210,10 @@ func (oq *destinationQueue) getPendingFromDatabase() {
|
|||
gotPDUs := map[string]struct{}{}
|
||||
gotEDUs := map[string]struct{}{}
|
||||
for _, pdu := range oq.pendingPDUs {
|
||||
gotPDUs[pdu.receipt.String()] = struct{}{}
|
||||
gotPDUs[pdu.dbReceipt.String()] = struct{}{}
|
||||
}
|
||||
for _, edu := range oq.pendingEDUs {
|
||||
gotEDUs[edu.receipt.String()] = struct{}{}
|
||||
gotEDUs[edu.dbReceipt.String()] = struct{}{}
|
||||
}
|
||||
|
||||
overflowed := false
|
||||
|
@ -371,7 +371,7 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
|
||||
// If we have pending PDUs or EDUs then construct a transaction.
|
||||
// Try sending the next transaction and see what happens.
|
||||
terr := oq.nextTransaction(toSendPDUs, toSendEDUs)
|
||||
terr, sendMethod := oq.nextTransaction(toSendPDUs, toSendEDUs)
|
||||
if terr != nil {
|
||||
// We failed to send the transaction. Mark it as a failure.
|
||||
_, blacklisted := oq.statistics.Failure()
|
||||
|
@ -388,18 +388,19 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
return
|
||||
}
|
||||
} else {
|
||||
oq.handleTransactionSuccess(pduCount, eduCount)
|
||||
oq.handleTransactionSuccess(pduCount, eduCount, sendMethod)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// nextTransaction creates a new transaction from the pending event
|
||||
// queue and sends it.
|
||||
// Returns an error if the transaction wasn't sent.
|
||||
// Returns an error if the transaction wasn't sent. And whether the success
|
||||
// was to a relay server or not.
|
||||
func (oq *destinationQueue) nextTransaction(
|
||||
pdus []*queuedPDU,
|
||||
edus []*queuedEDU,
|
||||
) error {
|
||||
) (err error, sendMethod statistics.SendMethod) {
|
||||
// Create the transaction.
|
||||
t, pduReceipts, eduReceipts := oq.createTransaction(pdus, edus)
|
||||
logrus.WithField("server_name", oq.destination).Debugf("Sending transaction %q containing %d PDUs, %d EDUs", t.TransactionID, len(t.PDUs), len(t.EDUs))
|
||||
|
@ -407,7 +408,37 @@ func (oq *destinationQueue) nextTransaction(
|
|||
// Try to send the transaction to the destination server.
|
||||
ctx, cancel := context.WithTimeout(oq.process.Context(), time.Minute*5)
|
||||
defer cancel()
|
||||
_, err := oq.client.SendTransaction(ctx, t)
|
||||
|
||||
relayServers := oq.statistics.KnownRelayServers()
|
||||
if oq.statistics.AssumedOffline() && len(relayServers) > 0 {
|
||||
sendMethod = statistics.SendViaRelay
|
||||
relaySuccess := false
|
||||
logrus.Infof("Sending to relay servers: %v", relayServers)
|
||||
// TODO : how to pass through actual userID here?!?!?!?!
|
||||
userID, userErr := gomatrixserverlib.NewUserID("@user:"+string(oq.destination), false)
|
||||
if userErr != nil {
|
||||
return userErr, sendMethod
|
||||
}
|
||||
|
||||
// Attempt sending to each known relay server.
|
||||
for _, relayServer := range relayServers {
|
||||
_, relayErr := oq.client.P2PSendTransactionToRelay(ctx, *userID, t, relayServer)
|
||||
if relayErr != nil {
|
||||
err = relayErr
|
||||
} else {
|
||||
// If sending to one of the relay servers succeeds, consider the send successful.
|
||||
relaySuccess = true
|
||||
}
|
||||
}
|
||||
|
||||
// Clear the error if sending to any of the relay servers succeeded.
|
||||
if relaySuccess {
|
||||
err = nil
|
||||
}
|
||||
} else {
|
||||
sendMethod = statistics.SendDirect
|
||||
_, err = oq.client.SendTransaction(ctx, t)
|
||||
}
|
||||
switch errResponse := err.(type) {
|
||||
case nil:
|
||||
// Clean up the transaction in the database.
|
||||
|
@ -427,7 +458,7 @@ func (oq *destinationQueue) nextTransaction(
|
|||
oq.transactionIDMutex.Lock()
|
||||
oq.transactionID = ""
|
||||
oq.transactionIDMutex.Unlock()
|
||||
return nil
|
||||
return nil, sendMethod
|
||||
case gomatrix.HTTPError:
|
||||
// Report that we failed to send the transaction and we
|
||||
// will retry again, subject to backoff.
|
||||
|
@ -437,13 +468,13 @@ func (oq *destinationQueue) nextTransaction(
|
|||
// to a 400-ish error
|
||||
code := errResponse.Code
|
||||
logrus.Debug("Transaction failed with HTTP", code)
|
||||
return err
|
||||
return err, sendMethod
|
||||
default:
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"destination": oq.destination,
|
||||
logrus.ErrorKey: err,
|
||||
}).Debugf("Failed to send transaction %q", t.TransactionID)
|
||||
return err
|
||||
return err, sendMethod
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -453,7 +484,7 @@ func (oq *destinationQueue) nextTransaction(
|
|||
func (oq *destinationQueue) createTransaction(
|
||||
pdus []*queuedPDU,
|
||||
edus []*queuedEDU,
|
||||
) (gomatrixserverlib.Transaction, []*shared.Receipt, []*shared.Receipt) {
|
||||
) (gomatrixserverlib.Transaction, []*receipt.Receipt, []*receipt.Receipt) {
|
||||
// If there's no projected transaction ID then generate one. If
|
||||
// the transaction succeeds then we'll set it back to "" so that
|
||||
// we generate a new one next time. If it fails, we'll preserve
|
||||
|
@ -474,8 +505,8 @@ func (oq *destinationQueue) createTransaction(
|
|||
t.OriginServerTS = gomatrixserverlib.AsTimestamp(time.Now())
|
||||
t.TransactionID = oq.transactionID
|
||||
|
||||
var pduReceipts []*shared.Receipt
|
||||
var eduReceipts []*shared.Receipt
|
||||
var pduReceipts []*receipt.Receipt
|
||||
var eduReceipts []*receipt.Receipt
|
||||
|
||||
// Go through PDUs that we retrieved from the database, if any,
|
||||
// and add them into the transaction.
|
||||
|
@ -487,7 +518,7 @@ func (oq *destinationQueue) createTransaction(
|
|||
// Append the JSON of the event, since this is a json.RawMessage type in the
|
||||
// gomatrixserverlib.Transaction struct
|
||||
t.PDUs = append(t.PDUs, pdu.pdu.JSON())
|
||||
pduReceipts = append(pduReceipts, pdu.receipt)
|
||||
pduReceipts = append(pduReceipts, pdu.dbReceipt)
|
||||
}
|
||||
|
||||
// Do the same for pending EDUS in the queue.
|
||||
|
@ -497,7 +528,7 @@ func (oq *destinationQueue) createTransaction(
|
|||
continue
|
||||
}
|
||||
t.EDUs = append(t.EDUs, *edu.edu)
|
||||
eduReceipts = append(eduReceipts, edu.receipt)
|
||||
eduReceipts = append(eduReceipts, edu.dbReceipt)
|
||||
}
|
||||
|
||||
return t, pduReceipts, eduReceipts
|
||||
|
@ -530,10 +561,11 @@ func (oq *destinationQueue) blacklistDestination() {
|
|||
|
||||
// handleTransactionSuccess updates the cached event queues as well as the success and
|
||||
// backoff information for this server.
|
||||
func (oq *destinationQueue) handleTransactionSuccess(pduCount int, eduCount int) {
|
||||
func (oq *destinationQueue) handleTransactionSuccess(pduCount int, eduCount int, sendMethod statistics.SendMethod) {
|
||||
// If we successfully sent the transaction then clear out
|
||||
// the pending events and EDUs, and wipe our transaction ID.
|
||||
oq.statistics.Success()
|
||||
|
||||
oq.statistics.Success(sendMethod)
|
||||
oq.pendingMutex.Lock()
|
||||
defer oq.pendingMutex.Unlock()
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ import (
|
|||
fedapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/federationapi/statistics"
|
||||
"github.com/matrix-org/dendrite/federationapi/storage"
|
||||
"github.com/matrix-org/dendrite/federationapi/storage/shared"
|
||||
"github.com/matrix-org/dendrite/federationapi/storage/shared/receipt"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
)
|
||||
|
@ -138,13 +138,13 @@ func NewOutgoingQueues(
|
|||
}
|
||||
|
||||
type queuedPDU struct {
|
||||
receipt *shared.Receipt
|
||||
pdu *gomatrixserverlib.HeaderedEvent
|
||||
dbReceipt *receipt.Receipt
|
||||
pdu *gomatrixserverlib.HeaderedEvent
|
||||
}
|
||||
|
||||
type queuedEDU struct {
|
||||
receipt *shared.Receipt
|
||||
edu *gomatrixserverlib.EDU
|
||||
dbReceipt *receipt.Receipt
|
||||
edu *gomatrixserverlib.EDU
|
||||
}
|
||||
|
||||
func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue {
|
||||
|
@ -374,24 +374,13 @@ func (oqs *OutgoingQueues) SendEDU(
|
|||
return nil
|
||||
}
|
||||
|
||||
// IsServerBlacklisted returns whether or not the provided server is currently
|
||||
// blacklisted.
|
||||
func (oqs *OutgoingQueues) IsServerBlacklisted(srv gomatrixserverlib.ServerName) bool {
|
||||
return oqs.statistics.ForServer(srv).Blacklisted()
|
||||
}
|
||||
|
||||
// RetryServer attempts to resend events to the given server if we had given up.
|
||||
func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) {
|
||||
func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName, wasBlacklisted bool) {
|
||||
if oqs.disabled {
|
||||
return
|
||||
}
|
||||
|
||||
serverStatistics := oqs.statistics.ForServer(srv)
|
||||
forceWakeup := serverStatistics.Blacklisted()
|
||||
serverStatistics.RemoveBlacklist()
|
||||
serverStatistics.ClearBackoff()
|
||||
|
||||
if queue := oqs.getQueue(srv); queue != nil {
|
||||
queue.wakeQueueIfEventsPending(forceWakeup)
|
||||
queue.wakeQueueIfEventsPending(wasBlacklisted)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -26,13 +25,11 @@ import (
|
|||
"gotest.tools/v3/poll"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/federationapi/statistics"
|
||||
"github.com/matrix-org/dendrite/federationapi/storage"
|
||||
"github.com/matrix-org/dendrite/federationapi/storage/shared"
|
||||
rsapi "github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
|
@ -57,7 +54,7 @@ func mustCreateFederationDatabase(t *testing.T, dbType test.DBType, realDatabase
|
|||
}
|
||||
} else {
|
||||
// Fake Database
|
||||
db := createDatabase()
|
||||
db := test.NewInMemoryFederationDatabase()
|
||||
b := struct {
|
||||
ProcessContext *process.ProcessContext
|
||||
}{ProcessContext: process.NewProcessContext()}
|
||||
|
@ -65,220 +62,6 @@ func mustCreateFederationDatabase(t *testing.T, dbType test.DBType, realDatabase
|
|||
}
|
||||
}
|
||||
|
||||
func createDatabase() storage.Database {
|
||||
return &fakeDatabase{
|
||||
pendingPDUServers: make(map[gomatrixserverlib.ServerName]struct{}),
|
||||
pendingEDUServers: make(map[gomatrixserverlib.ServerName]struct{}),
|
||||
blacklistedServers: make(map[gomatrixserverlib.ServerName]struct{}),
|
||||
pendingPDUs: make(map[*shared.Receipt]*gomatrixserverlib.HeaderedEvent),
|
||||
pendingEDUs: make(map[*shared.Receipt]*gomatrixserverlib.EDU),
|
||||
associatedPDUs: make(map[gomatrixserverlib.ServerName]map[*shared.Receipt]struct{}),
|
||||
associatedEDUs: make(map[gomatrixserverlib.ServerName]map[*shared.Receipt]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
type fakeDatabase struct {
|
||||
storage.Database
|
||||
dbMutex sync.Mutex
|
||||
pendingPDUServers map[gomatrixserverlib.ServerName]struct{}
|
||||
pendingEDUServers map[gomatrixserverlib.ServerName]struct{}
|
||||
blacklistedServers map[gomatrixserverlib.ServerName]struct{}
|
||||
pendingPDUs map[*shared.Receipt]*gomatrixserverlib.HeaderedEvent
|
||||
pendingEDUs map[*shared.Receipt]*gomatrixserverlib.EDU
|
||||
associatedPDUs map[gomatrixserverlib.ServerName]map[*shared.Receipt]struct{}
|
||||
associatedEDUs map[gomatrixserverlib.ServerName]map[*shared.Receipt]struct{}
|
||||
}
|
||||
|
||||
var nidMutex sync.Mutex
|
||||
var nid = int64(0)
|
||||
|
||||
func (d *fakeDatabase) StoreJSON(ctx context.Context, js string) (*shared.Receipt, error) {
|
||||
d.dbMutex.Lock()
|
||||
defer d.dbMutex.Unlock()
|
||||
|
||||
var event gomatrixserverlib.HeaderedEvent
|
||||
if err := json.Unmarshal([]byte(js), &event); err == nil {
|
||||
nidMutex.Lock()
|
||||
defer nidMutex.Unlock()
|
||||
nid++
|
||||
receipt := shared.NewReceipt(nid)
|
||||
d.pendingPDUs[&receipt] = &event
|
||||
return &receipt, nil
|
||||
}
|
||||
|
||||
var edu gomatrixserverlib.EDU
|
||||
if err := json.Unmarshal([]byte(js), &edu); err == nil {
|
||||
nidMutex.Lock()
|
||||
defer nidMutex.Unlock()
|
||||
nid++
|
||||
receipt := shared.NewReceipt(nid)
|
||||
d.pendingEDUs[&receipt] = &edu
|
||||
return &receipt, nil
|
||||
}
|
||||
|
||||
return nil, errors.New("Failed to determine type of json to store")
|
||||
}
|
||||
|
||||
func (d *fakeDatabase) GetPendingPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (pdus map[*shared.Receipt]*gomatrixserverlib.HeaderedEvent, err error) {
|
||||
d.dbMutex.Lock()
|
||||
defer d.dbMutex.Unlock()
|
||||
|
||||
pduCount := 0
|
||||
pdus = make(map[*shared.Receipt]*gomatrixserverlib.HeaderedEvent)
|
||||
if receipts, ok := d.associatedPDUs[serverName]; ok {
|
||||
for receipt := range receipts {
|
||||
if event, ok := d.pendingPDUs[receipt]; ok {
|
||||
pdus[receipt] = event
|
||||
pduCount++
|
||||
if pduCount == limit {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return pdus, nil
|
||||
}
|
||||
|
||||
func (d *fakeDatabase) GetPendingEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (edus map[*shared.Receipt]*gomatrixserverlib.EDU, err error) {
|
||||
d.dbMutex.Lock()
|
||||
defer d.dbMutex.Unlock()
|
||||
|
||||
eduCount := 0
|
||||
edus = make(map[*shared.Receipt]*gomatrixserverlib.EDU)
|
||||
if receipts, ok := d.associatedEDUs[serverName]; ok {
|
||||
for receipt := range receipts {
|
||||
if event, ok := d.pendingEDUs[receipt]; ok {
|
||||
edus[receipt] = event
|
||||
eduCount++
|
||||
if eduCount == limit {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return edus, nil
|
||||
}
|
||||
|
||||
func (d *fakeDatabase) AssociatePDUWithDestinations(ctx context.Context, destinations map[gomatrixserverlib.ServerName]struct{}, receipt *shared.Receipt) error {
|
||||
d.dbMutex.Lock()
|
||||
defer d.dbMutex.Unlock()
|
||||
|
||||
if _, ok := d.pendingPDUs[receipt]; ok {
|
||||
for destination := range destinations {
|
||||
if _, ok := d.associatedPDUs[destination]; !ok {
|
||||
d.associatedPDUs[destination] = make(map[*shared.Receipt]struct{})
|
||||
}
|
||||
d.associatedPDUs[destination][receipt] = struct{}{}
|
||||
}
|
||||
|
||||
return nil
|
||||
} else {
|
||||
return errors.New("PDU doesn't exist")
|
||||
}
|
||||
}
|
||||
|
||||
func (d *fakeDatabase) AssociateEDUWithDestinations(ctx context.Context, destinations map[gomatrixserverlib.ServerName]struct{}, receipt *shared.Receipt, eduType string, expireEDUTypes map[string]time.Duration) error {
|
||||
d.dbMutex.Lock()
|
||||
defer d.dbMutex.Unlock()
|
||||
|
||||
if _, ok := d.pendingEDUs[receipt]; ok {
|
||||
for destination := range destinations {
|
||||
if _, ok := d.associatedEDUs[destination]; !ok {
|
||||
d.associatedEDUs[destination] = make(map[*shared.Receipt]struct{})
|
||||
}
|
||||
d.associatedEDUs[destination][receipt] = struct{}{}
|
||||
}
|
||||
|
||||
return nil
|
||||
} else {
|
||||
return errors.New("EDU doesn't exist")
|
||||
}
|
||||
}
|
||||
|
||||
func (d *fakeDatabase) CleanPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error {
|
||||
d.dbMutex.Lock()
|
||||
defer d.dbMutex.Unlock()
|
||||
|
||||
if pdus, ok := d.associatedPDUs[serverName]; ok {
|
||||
for _, receipt := range receipts {
|
||||
delete(pdus, receipt)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *fakeDatabase) CleanEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error {
|
||||
d.dbMutex.Lock()
|
||||
defer d.dbMutex.Unlock()
|
||||
|
||||
if edus, ok := d.associatedEDUs[serverName]; ok {
|
||||
for _, receipt := range receipts {
|
||||
delete(edus, receipt)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *fakeDatabase) GetPendingPDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error) {
|
||||
d.dbMutex.Lock()
|
||||
defer d.dbMutex.Unlock()
|
||||
|
||||
servers := []gomatrixserverlib.ServerName{}
|
||||
for server := range d.pendingPDUServers {
|
||||
servers = append(servers, server)
|
||||
}
|
||||
return servers, nil
|
||||
}
|
||||
|
||||
func (d *fakeDatabase) GetPendingEDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error) {
|
||||
d.dbMutex.Lock()
|
||||
defer d.dbMutex.Unlock()
|
||||
|
||||
servers := []gomatrixserverlib.ServerName{}
|
||||
for server := range d.pendingEDUServers {
|
||||
servers = append(servers, server)
|
||||
}
|
||||
return servers, nil
|
||||
}
|
||||
|
||||
func (d *fakeDatabase) AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error {
|
||||
d.dbMutex.Lock()
|
||||
defer d.dbMutex.Unlock()
|
||||
|
||||
d.blacklistedServers[serverName] = struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *fakeDatabase) RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error {
|
||||
d.dbMutex.Lock()
|
||||
defer d.dbMutex.Unlock()
|
||||
|
||||
delete(d.blacklistedServers, serverName)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *fakeDatabase) RemoveAllServersFromBlacklist() error {
|
||||
d.dbMutex.Lock()
|
||||
defer d.dbMutex.Unlock()
|
||||
|
||||
d.blacklistedServers = make(map[gomatrixserverlib.ServerName]struct{})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *fakeDatabase) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) {
|
||||
d.dbMutex.Lock()
|
||||
defer d.dbMutex.Unlock()
|
||||
|
||||
isBlacklisted := false
|
||||
if _, ok := d.blacklistedServers[serverName]; ok {
|
||||
isBlacklisted = true
|
||||
}
|
||||
|
||||
return isBlacklisted, nil
|
||||
}
|
||||
|
||||
type stubFederationRoomServerAPI struct {
|
||||
rsapi.FederationRoomserverAPI
|
||||
}
|
||||
|
@ -290,8 +73,10 @@ func (r *stubFederationRoomServerAPI) QueryServerBannedFromRoom(ctx context.Cont
|
|||
|
||||
type stubFederationClient struct {
|
||||
api.FederationClient
|
||||
shouldTxSucceed bool
|
||||
txCount atomic.Uint32
|
||||
shouldTxSucceed bool
|
||||
shouldTxRelaySucceed bool
|
||||
txCount atomic.Uint32
|
||||
txRelayCount atomic.Uint32
|
||||
}
|
||||
|
||||
func (f *stubFederationClient) SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error) {
|
||||
|
@ -304,6 +89,16 @@ func (f *stubFederationClient) SendTransaction(ctx context.Context, t gomatrixse
|
|||
return gomatrixserverlib.RespSend{}, result
|
||||
}
|
||||
|
||||
func (f *stubFederationClient) P2PSendTransactionToRelay(ctx context.Context, u gomatrixserverlib.UserID, t gomatrixserverlib.Transaction, forwardingServer gomatrixserverlib.ServerName) (res gomatrixserverlib.EmptyResp, err error) {
|
||||
var result error
|
||||
if !f.shouldTxRelaySucceed {
|
||||
result = fmt.Errorf("relay transaction failed")
|
||||
}
|
||||
|
||||
f.txRelayCount.Add(1)
|
||||
return gomatrixserverlib.EmptyResp{}, result
|
||||
}
|
||||
|
||||
func mustCreatePDU(t *testing.T) *gomatrixserverlib.HeaderedEvent {
|
||||
t.Helper()
|
||||
content := `{"type":"m.room.message"}`
|
||||
|
@ -319,15 +114,18 @@ func mustCreateEDU(t *testing.T) *gomatrixserverlib.EDU {
|
|||
return &gomatrixserverlib.EDU{Type: gomatrixserverlib.MTyping}
|
||||
}
|
||||
|
||||
func testSetup(failuresUntilBlacklist uint32, shouldTxSucceed bool, t *testing.T, dbType test.DBType, realDatabase bool) (storage.Database, *stubFederationClient, *OutgoingQueues, *process.ProcessContext, func()) {
|
||||
func testSetup(failuresUntilBlacklist uint32, failuresUntilAssumedOffline uint32, shouldTxSucceed bool, shouldTxRelaySucceed bool, t *testing.T, dbType test.DBType, realDatabase bool) (storage.Database, *stubFederationClient, *OutgoingQueues, *process.ProcessContext, func()) {
|
||||
db, processContext, close := mustCreateFederationDatabase(t, dbType, realDatabase)
|
||||
|
||||
fc := &stubFederationClient{
|
||||
shouldTxSucceed: shouldTxSucceed,
|
||||
txCount: *atomic.NewUint32(0),
|
||||
shouldTxSucceed: shouldTxSucceed,
|
||||
shouldTxRelaySucceed: shouldTxRelaySucceed,
|
||||
txCount: *atomic.NewUint32(0),
|
||||
txRelayCount: *atomic.NewUint32(0),
|
||||
}
|
||||
rs := &stubFederationRoomServerAPI{}
|
||||
stats := statistics.NewStatistics(db, failuresUntilBlacklist)
|
||||
|
||||
stats := statistics.NewStatistics(db, failuresUntilBlacklist, failuresUntilAssumedOffline)
|
||||
signingInfo := []*gomatrixserverlib.SigningIdentity{
|
||||
{
|
||||
KeyID: "ed21019:auto",
|
||||
|
@ -344,7 +142,7 @@ func TestSendPDUOnSuccessRemovedFromDB(t *testing.T) {
|
|||
t.Parallel()
|
||||
failuresUntilBlacklist := uint32(16)
|
||||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, test.DBTypeSQLite, false)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
|
@ -373,7 +171,7 @@ func TestSendEDUOnSuccessRemovedFromDB(t *testing.T) {
|
|||
t.Parallel()
|
||||
failuresUntilBlacklist := uint32(16)
|
||||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, test.DBTypeSQLite, false)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
|
@ -402,7 +200,7 @@ func TestSendPDUOnFailStoredInDB(t *testing.T) {
|
|||
t.Parallel()
|
||||
failuresUntilBlacklist := uint32(16)
|
||||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
|
@ -432,7 +230,7 @@ func TestSendEDUOnFailStoredInDB(t *testing.T) {
|
|||
t.Parallel()
|
||||
failuresUntilBlacklist := uint32(16)
|
||||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
|
@ -462,7 +260,7 @@ func TestSendPDUAgainDoesntInterruptBackoff(t *testing.T) {
|
|||
t.Parallel()
|
||||
failuresUntilBlacklist := uint32(16)
|
||||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
|
@ -513,7 +311,7 @@ func TestSendEDUAgainDoesntInterruptBackoff(t *testing.T) {
|
|||
t.Parallel()
|
||||
failuresUntilBlacklist := uint32(16)
|
||||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
|
@ -564,7 +362,7 @@ func TestSendPDUMultipleFailuresBlacklisted(t *testing.T) {
|
|||
t.Parallel()
|
||||
failuresUntilBlacklist := uint32(2)
|
||||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
|
@ -596,7 +394,7 @@ func TestSendEDUMultipleFailuresBlacklisted(t *testing.T) {
|
|||
t.Parallel()
|
||||
failuresUntilBlacklist := uint32(2)
|
||||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
|
@ -628,7 +426,7 @@ func TestSendPDUBlacklistedWithPriorExternalFailure(t *testing.T) {
|
|||
t.Parallel()
|
||||
failuresUntilBlacklist := uint32(2)
|
||||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
|
@ -662,7 +460,7 @@ func TestSendEDUBlacklistedWithPriorExternalFailure(t *testing.T) {
|
|||
t.Parallel()
|
||||
failuresUntilBlacklist := uint32(2)
|
||||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
|
@ -696,7 +494,7 @@ func TestRetryServerSendsPDUSuccessfully(t *testing.T) {
|
|||
t.Parallel()
|
||||
failuresUntilBlacklist := uint32(1)
|
||||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
|
@ -730,8 +528,8 @@ func TestRetryServerSendsPDUSuccessfully(t *testing.T) {
|
|||
poll.WaitOn(t, checkBlacklisted, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
|
||||
|
||||
fc.shouldTxSucceed = true
|
||||
db.RemoveServerFromBlacklist(destination)
|
||||
queues.RetryServer(destination)
|
||||
wasBlacklisted := dest.statistics.MarkServerAlive()
|
||||
queues.RetryServer(destination, wasBlacklisted)
|
||||
checkRetry := func(log poll.LogT) poll.Result {
|
||||
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
|
||||
assert.NoError(t, dbErr)
|
||||
|
@ -747,7 +545,7 @@ func TestRetryServerSendsEDUSuccessfully(t *testing.T) {
|
|||
t.Parallel()
|
||||
failuresUntilBlacklist := uint32(1)
|
||||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
|
@ -781,8 +579,8 @@ func TestRetryServerSendsEDUSuccessfully(t *testing.T) {
|
|||
poll.WaitOn(t, checkBlacklisted, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
|
||||
|
||||
fc.shouldTxSucceed = true
|
||||
db.RemoveServerFromBlacklist(destination)
|
||||
queues.RetryServer(destination)
|
||||
wasBlacklisted := dest.statistics.MarkServerAlive()
|
||||
queues.RetryServer(destination, wasBlacklisted)
|
||||
checkRetry := func(log poll.LogT) poll.Result {
|
||||
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
|
||||
assert.NoError(t, dbErr)
|
||||
|
@ -801,7 +599,7 @@ func TestSendPDUBatches(t *testing.T) {
|
|||
|
||||
// test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||
// db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, dbType, true)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, test.DBTypeSQLite, false)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
|
@ -845,7 +643,7 @@ func TestSendEDUBatches(t *testing.T) {
|
|||
|
||||
// test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||
// db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, dbType, true)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, test.DBTypeSQLite, false)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
|
@ -889,7 +687,7 @@ func TestSendPDUAndEDUBatches(t *testing.T) {
|
|||
|
||||
// test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||
// db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, dbType, true)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, test.DBTypeSQLite, false)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
|
@ -940,7 +738,7 @@ func TestExternalFailureBackoffDoesntStartQueue(t *testing.T) {
|
|||
t.Parallel()
|
||||
failuresUntilBlacklist := uint32(16)
|
||||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, test.DBTypeSQLite, false)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
|
@ -978,7 +776,7 @@ func TestQueueInteractsWithRealDatabasePDUAndEDU(t *testing.T) {
|
|||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
destinations := map[gomatrixserverlib.ServerName]struct{}{destination: {}}
|
||||
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, dbType, true)
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, dbType, true)
|
||||
// NOTE : These defers aren't called if go test is killed so the dbs may not get cleaned up.
|
||||
defer close()
|
||||
defer func() {
|
||||
|
@ -1023,8 +821,8 @@ func TestQueueInteractsWithRealDatabasePDUAndEDU(t *testing.T) {
|
|||
poll.WaitOn(t, checkBlacklisted, poll.WithTimeout(10*time.Second), poll.WithDelay(100*time.Millisecond))
|
||||
|
||||
fc.shouldTxSucceed = true
|
||||
db.RemoveServerFromBlacklist(destination)
|
||||
queues.RetryServer(destination)
|
||||
wasBlacklisted := dest.statistics.MarkServerAlive()
|
||||
queues.RetryServer(destination, wasBlacklisted)
|
||||
checkRetry := func(log poll.LogT) poll.Result {
|
||||
pduData, dbErrPDU := db.GetPendingPDUs(pc.Context(), destination, 200)
|
||||
assert.NoError(t, dbErrPDU)
|
||||
|
@ -1038,3 +836,147 @@ func TestQueueInteractsWithRealDatabasePDUAndEDU(t *testing.T) {
|
|||
poll.WaitOn(t, checkRetry, poll.WithTimeout(10*time.Second), poll.WithDelay(100*time.Millisecond))
|
||||
})
|
||||
}
|
||||
|
||||
func TestSendPDUMultipleFailuresAssumedOffline(t *testing.T) {
|
||||
t.Parallel()
|
||||
failuresUntilBlacklist := uint32(7)
|
||||
failuresUntilAssumedOffline := uint32(2)
|
||||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
<-pc.WaitForShutdown()
|
||||
}()
|
||||
|
||||
ev := mustCreatePDU(t)
|
||||
err := queues.SendEvent(ev, "localhost", []gomatrixserverlib.ServerName{destination})
|
||||
assert.NoError(t, err)
|
||||
|
||||
check := func(log poll.LogT) poll.Result {
|
||||
if fc.txCount.Load() == failuresUntilAssumedOffline {
|
||||
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
|
||||
assert.NoError(t, dbErr)
|
||||
if len(data) == 1 {
|
||||
if val, _ := db.IsServerAssumedOffline(context.Background(), destination); val {
|
||||
return poll.Success()
|
||||
}
|
||||
return poll.Continue("waiting for server to be assumed offline")
|
||||
}
|
||||
return poll.Continue("waiting for event to be added to database. Currently present PDU: %d", len(data))
|
||||
}
|
||||
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
|
||||
}
|
||||
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
|
||||
}
|
||||
|
||||
func TestSendEDUMultipleFailuresAssumedOffline(t *testing.T) {
|
||||
t.Parallel()
|
||||
failuresUntilBlacklist := uint32(7)
|
||||
failuresUntilAssumedOffline := uint32(2)
|
||||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, false, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
<-pc.WaitForShutdown()
|
||||
}()
|
||||
|
||||
ev := mustCreateEDU(t)
|
||||
err := queues.SendEDU(ev, "localhost", []gomatrixserverlib.ServerName{destination})
|
||||
assert.NoError(t, err)
|
||||
|
||||
check := func(log poll.LogT) poll.Result {
|
||||
if fc.txCount.Load() == failuresUntilAssumedOffline {
|
||||
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
|
||||
assert.NoError(t, dbErr)
|
||||
if len(data) == 1 {
|
||||
if val, _ := db.IsServerAssumedOffline(context.Background(), destination); val {
|
||||
return poll.Success()
|
||||
}
|
||||
return poll.Continue("waiting for server to be assumed offline")
|
||||
}
|
||||
return poll.Continue("waiting for event to be added to database. Currently present EDU: %d", len(data))
|
||||
}
|
||||
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
|
||||
}
|
||||
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
|
||||
}
|
||||
|
||||
func TestSendPDUOnRelaySuccessRemovedFromDB(t *testing.T) {
|
||||
t.Parallel()
|
||||
failuresUntilBlacklist := uint32(16)
|
||||
failuresUntilAssumedOffline := uint32(1)
|
||||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, true, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
<-pc.WaitForShutdown()
|
||||
}()
|
||||
|
||||
relayServers := []gomatrixserverlib.ServerName{"relayserver"}
|
||||
queues.statistics.ForServer(destination).AddRelayServers(relayServers)
|
||||
|
||||
ev := mustCreatePDU(t)
|
||||
err := queues.SendEvent(ev, "localhost", []gomatrixserverlib.ServerName{destination})
|
||||
assert.NoError(t, err)
|
||||
|
||||
check := func(log poll.LogT) poll.Result {
|
||||
if fc.txCount.Load() == 1 {
|
||||
if fc.txRelayCount.Load() == 1 {
|
||||
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
|
||||
assert.NoError(t, dbErr)
|
||||
if len(data) == 0 {
|
||||
return poll.Success()
|
||||
}
|
||||
return poll.Continue("waiting for event to be removed from database. Currently present PDU: %d", len(data))
|
||||
}
|
||||
return poll.Continue("waiting for more relay send attempts before checking database. Currently %d", fc.txRelayCount.Load())
|
||||
}
|
||||
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
|
||||
}
|
||||
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
|
||||
|
||||
assumedOffline, _ := db.IsServerAssumedOffline(context.Background(), destination)
|
||||
assert.Equal(t, true, assumedOffline)
|
||||
}
|
||||
|
||||
func TestSendEDUOnRelaySuccessRemovedFromDB(t *testing.T) {
|
||||
t.Parallel()
|
||||
failuresUntilBlacklist := uint32(16)
|
||||
failuresUntilAssumedOffline := uint32(1)
|
||||
destination := gomatrixserverlib.ServerName("remotehost")
|
||||
db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, true, t, test.DBTypeSQLite, false)
|
||||
defer close()
|
||||
defer func() {
|
||||
pc.ShutdownDendrite()
|
||||
<-pc.WaitForShutdown()
|
||||
}()
|
||||
|
||||
relayServers := []gomatrixserverlib.ServerName{"relayserver"}
|
||||
queues.statistics.ForServer(destination).AddRelayServers(relayServers)
|
||||
|
||||
ev := mustCreateEDU(t)
|
||||
err := queues.SendEDU(ev, "localhost", []gomatrixserverlib.ServerName{destination})
|
||||
assert.NoError(t, err)
|
||||
|
||||
check := func(log poll.LogT) poll.Result {
|
||||
if fc.txCount.Load() == 1 {
|
||||
if fc.txRelayCount.Load() == 1 {
|
||||
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
|
||||
assert.NoError(t, dbErr)
|
||||
if len(data) == 0 {
|
||||
return poll.Success()
|
||||
}
|
||||
return poll.Continue("waiting for event to be removed from database. Currently present EDU: %d", len(data))
|
||||
}
|
||||
return poll.Continue("waiting for more relay send attempts before checking database. Currently %d", fc.txRelayCount.Load())
|
||||
}
|
||||
return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
|
||||
}
|
||||
poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
|
||||
|
||||
assumedOffline, _ := db.IsServerAssumedOffline(context.Background(), destination)
|
||||
assert.Equal(t, true, assumedOffline)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue