Deduplicate FS database, EDU persistence table (#1207)

* Deduplicate FS database, add some EDU persistence groundwork

* Extend TransactionWriter to use optional existing transaction, use that for FS SQLite database writes

* Fix build due to bad keyserver import

* Working EDU persistence

* gocyclo, unsurprisingly

* Remove unused

* Update copyright notices
This commit is contained in:
Neil Alexander 2020-07-20 16:55:20 +01:00 committed by GitHub
parent f3c482b078
commit 11a39fe3b5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 1334 additions and 698 deletions

View file

@ -22,6 +22,7 @@ import (
"time"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/federationsender/types"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix"
@ -31,8 +32,11 @@ import (
"go.uber.org/atomic"
)
const maxPDUsPerTransaction = 50
const queueIdleTimeout = time.Second * 30
const (
maxPDUsPerTransaction = 50
maxEDUsPerTransaction = 50
queueIdleTimeout = time.Second * 30
)
// destinationQueue is a queue of events for a single destination.
// It is responsible for sending the events to the destination and
@ -49,20 +53,19 @@ type destinationQueue struct {
backingOff atomic.Bool // true if we're backing off
statistics *types.ServerStatistics // statistics about this remote server
incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send
incomingEDUs chan *gomatrixserverlib.EDU // EDUs to send
transactionIDMutex sync.Mutex // protects transactionID
transactionID gomatrixserverlib.TransactionID // last transaction ID
transactionCount atomic.Int32 // how many events in this transaction so far
pendingEDUs []*gomatrixserverlib.EDU // owned by backgroundSend
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
notifyPDUs chan bool // interrupts idle wait for PDUs
notifyEDUs chan bool // interrupts idle wait for EDUs
interruptBackoff chan bool // interrupts backoff
}
// Send event adds the event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending events to that destination.
func (oq *destinationQueue) sendEvent(nid int64) {
func (oq *destinationQueue) sendEvent(receipt *shared.Receipt) {
if oq.statistics.Blacklisted() {
// If the destination is blacklisted then drop the event.
log.Infof("%s is blacklisted; dropping event", oq.destination)
@ -86,9 +89,9 @@ func (oq *destinationQueue) sendEvent(nid int64) {
context.TODO(),
oq.transactionID, // the current transaction ID
oq.destination, // the destination server name
[]int64{nid}, // NID from federationsender_queue_json table
receipt, // NIDs from federationsender_queue_json table
); err != nil {
log.WithError(err).Errorf("failed to associate PDU NID %d with destination %q", nid, oq.destination)
log.WithError(err).Errorf("failed to associate PDU receipt %q with destination %q", receipt.String(), oq.destination)
return
}
// We've successfully added a PDU to the transaction so increase
@ -107,13 +110,34 @@ func (oq *destinationQueue) sendEvent(nid int64) {
// sendEDU adds the EDU event to the pending queue for the destination.
// If the queue is empty then it starts a background goroutine to
// start sending events to that destination.
func (oq *destinationQueue) sendEDU(ev *gomatrixserverlib.EDU) {
func (oq *destinationQueue) sendEDU(receipt *shared.Receipt) {
if oq.statistics.Blacklisted() {
// If the destination is blacklisted then drop the event.
log.Infof("%s is blacklisted; dropping ephemeral event", oq.destination)
return
}
// Create a database entry that associates the given PDU NID with
// this destination queue. We'll then be able to retrieve the PDU
// later.
if err := oq.db.AssociateEDUWithDestination(
context.TODO(),
oq.destination, // the destination server name
receipt, // NIDs from federationsender_queue_json table
); err != nil {
log.WithError(err).Errorf("failed to associate EDU receipt %q with destination %q", receipt.String(), oq.destination)
return
}
// We've successfully added an EDU to the transaction so increase
// the counter.
oq.transactionCount.Add(1)
// Wake up the queue if it's asleep.
oq.wakeQueueIfNeeded()
oq.incomingEDUs <- ev
// If we're blocking on waiting PDUs then tell the queue that we
// have work to do.
select {
case oq.notifyEDUs <- true:
default:
}
}
// sendInvite adds the invite event to the pending queue for the
@ -166,6 +190,28 @@ func (oq *destinationQueue) waitForPDUs() chan bool {
return oq.notifyPDUs
}
// waitForEDUs returns a channel for pending EDUs, which will be
// used in backgroundSend select. It returns a closed channel if
// there is something pending right now, or an open channel if
// we're waiting for something.
func (oq *destinationQueue) waitForEDUs() chan bool {
pendingEDUs, err := oq.db.GetPendingEDUCount(context.TODO(), oq.destination)
if err != nil {
log.WithError(err).Errorf("Failed to get pending EDU count on queue %q", oq.destination)
}
// If there are EDUs pending right now then we'll return a closed
// channel. This will mean that the backgroundSend will not block.
if pendingEDUs > 0 {
ch := make(chan bool, 1)
close(ch)
return ch
}
// If there are no EDUs pending right now then instead we'll return
// the notify channel, so that backgroundSend can pick up normal
// notifications from sendEvent.
return oq.notifyEDUs
}
// backgroundSend is the worker goroutine for sending events.
// nolint:gocyclo
func (oq *destinationQueue) backgroundSend() {
@ -177,7 +223,7 @@ func (oq *destinationQueue) backgroundSend() {
defer oq.running.Store(false)
for {
pendingPDUs := false
pendingPDUs, pendingEDUs := false, false
// If we have nothing to do then wait either for incoming events, or
// until we hit an idle timeout.
@ -186,18 +232,10 @@ func (oq *destinationQueue) backgroundSend() {
// We were woken up because there are new PDUs waiting in the
// database.
pendingPDUs = true
case edu := <-oq.incomingEDUs:
// EDUs are handled in-memory for now. We will try to keep
// the ordering intact.
// TODO: Certain EDU types need persistence, e.g. send-to-device
oq.pendingEDUs = append(oq.pendingEDUs, edu)
// If there are any more things waiting in the channel queue
// then read them. This is safe because we guarantee only
// having one goroutine per destination queue, so the channel
// isn't being consumed anywhere else.
for len(oq.incomingEDUs) > 0 {
oq.pendingEDUs = append(oq.pendingEDUs, <-oq.incomingEDUs)
}
case <-oq.waitForEDUs():
// We were woken up because there are new PDUs waiting in the
// database.
pendingEDUs = true
case invite := <-oq.incomingInvites:
// There's no strict ordering requirement for invites like
// there is for transactions, so we put the invite onto the
@ -238,16 +276,15 @@ func (oq *destinationQueue) backgroundSend() {
}
// If we have pending PDUs or EDUs then construct a transaction.
if pendingPDUs || len(oq.pendingEDUs) > 0 {
if pendingPDUs || pendingEDUs {
// Try sending the next transaction and see what happens.
transaction, terr := oq.nextTransaction(oq.pendingEDUs)
transaction, terr := oq.nextTransaction()
if terr != nil {
// We failed to send the transaction.
if giveUp := oq.statistics.Failure(); giveUp {
// It's been suggested that we should give up because the backoff
// has exceeded a maximum allowable value. Clean up the in-memory
// buffers at this point. The PDU clean-up is already on a defer.
oq.cleanPendingEDUs()
oq.cleanPendingInvites()
log.Infof("Blacklisting %q due to errors", oq.destination)
return
@ -265,8 +302,6 @@ func (oq *destinationQueue) backgroundSend() {
// If we successfully sent the transaction then clear out
// the pending events and EDUs, and wipe our transaction ID.
oq.statistics.Success()
// Clean up the in-memory buffers.
oq.cleanPendingEDUs()
}
}
@ -294,15 +329,6 @@ func (oq *destinationQueue) backgroundSend() {
}
}
// cleanPendingEDUs cleans out the pending EDU buffer, removing
// all references so that the underlying objects can be GC'd.
func (oq *destinationQueue) cleanPendingEDUs() {
for i := 0; i < len(oq.pendingEDUs); i++ {
oq.pendingEDUs[i] = nil
}
oq.pendingEDUs = []*gomatrixserverlib.EDU{}
}
// cleanPendingInvites cleans out the pending invite buffer,
// removing all references so that the underlying objects can
// be GC'd.
@ -316,9 +342,8 @@ func (oq *destinationQueue) cleanPendingInvites() {
// nextTransaction creates a new transaction from the pending event
// queue and sends it. Returns true if a transaction was sent or
// false otherwise.
func (oq *destinationQueue) nextTransaction(
pendingEDUs []*gomatrixserverlib.EDU,
) (bool, error) {
// nolint:gocyclo
func (oq *destinationQueue) nextTransaction() (bool, error) {
// Before we do anything, we need to roll over the transaction
// ID that is being used to coalesce events into the next TX.
// Otherwise it's possible that we'll pick up an incomplete
@ -343,7 +368,7 @@ func (oq *destinationQueue) nextTransaction(
// actually retrieve that many events.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
txid, pdus, err := oq.db.GetNextTransactionPDUs(
txid, pdus, pduReceipt, err := oq.db.GetNextTransactionPDUs(
ctx, // context
oq.destination, // server name
maxPDUsPerTransaction, // max events to retrieve
@ -353,9 +378,19 @@ func (oq *destinationQueue) nextTransaction(
return false, fmt.Errorf("oq.db.GetNextTransactionPDUs: %w", err)
}
edus, eduReceipt, err := oq.db.GetNextTransactionEDUs(
ctx, // context
oq.destination, // server name
maxEDUsPerTransaction, // max events to retrieve
)
if err != nil {
log.WithError(err).Errorf("failed to get next transaction EDUs for server %q", oq.destination)
return false, fmt.Errorf("oq.db.GetNextTransactionEDUs: %w", err)
}
// If we didn't get anything from the database and there are no
// pending EDUs then there's nothing to do - stop here.
if len(pdus) == 0 && len(pendingEDUs) == 0 {
if len(pdus) == 0 && len(edus) == 0 {
return false, nil
}
@ -377,7 +412,7 @@ func (oq *destinationQueue) nextTransaction(
}
// Do the same for pending EDUS in the queue.
for _, edu := range pendingEDUs {
for _, edu := range edus {
t.EDUs = append(t.EDUs, *edu)
}
@ -393,12 +428,17 @@ func (oq *destinationQueue) nextTransaction(
switch err.(type) {
case nil:
// Clean up the transaction in the database.
if err = oq.db.CleanTransactionPDUs(
context.Background(),
t.Destination,
t.TransactionID,
); err != nil {
log.WithError(err).Errorf("failed to clean transaction %q for server %q", t.TransactionID, t.Destination)
if pduReceipt != nil {
//logrus.Infof("Cleaning PDUs %q", pduReceipt.String())
if err = oq.db.CleanPDUs(context.Background(), oq.destination, pduReceipt); err != nil {
log.WithError(err).Errorf("failed to clean PDUs %q for server %q", pduReceipt.String(), t.Destination)
}
}
if eduReceipt != nil {
//logrus.Infof("Cleaning EDUs %q", eduReceipt.String())
if err = oq.db.CleanEDUs(context.Background(), oq.destination, eduReceipt); err != nil {
log.WithError(err).Errorf("failed to clean EDUs %q for server %q", eduReceipt.String(), t.Destination)
}
}
return true, nil
case gomatrix.HTTPError:

View file

@ -61,12 +61,23 @@ func NewOutgoingQueues(
queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
}
// Look up which servers we have pending items for and then rehydrate those queues.
if serverNames, err := db.GetPendingServerNames(context.Background()); err == nil {
for _, serverName := range serverNames {
queues.getQueue(serverName).wakeQueueIfNeeded()
serverNames := map[gomatrixserverlib.ServerName]struct{}{}
if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil {
for _, serverName := range names {
serverNames[serverName] = struct{}{}
}
} else {
log.WithError(err).Error("Failed to get server names for destination queue hydration")
log.WithError(err).Error("Failed to get PDU server names for destination queue hydration")
}
if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil {
for _, serverName := range names {
serverNames[serverName] = struct{}{}
}
} else {
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
}
for serverName := range serverNames {
queues.getQueue(serverName).wakeQueueIfNeeded()
}
return queues
}
@ -91,9 +102,9 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
destination: destination,
client: oqs.client,
statistics: oqs.statistics.ForServer(destination),
incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
notifyPDUs: make(chan bool, 1),
notifyEDUs: make(chan bool, 1),
interruptBackoff: make(chan bool),
signing: oqs.signing,
}
@ -196,8 +207,18 @@ func (oqs *OutgoingQueues) SendEDU(
}).Info("Sending EDU event")
}
ephemeralJSON, err := json.Marshal(e)
if err != nil {
return fmt.Errorf("json.Marshal: %w", err)
}
nid, err := oqs.db.StoreJSON(context.TODO(), string(ephemeralJSON))
if err != nil {
return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
}
for _, destination := range destinations {
oqs.getQueue(destination).sendEDU(e)
oqs.getQueue(destination).sendEDU(nid)
}
return nil