mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-08-02 06:12:45 +00:00
Merge federationapi
, federationsender
, signingkeyserver
components (#2055)
* Initial federation sender -> federation API refactoring * Move base into own package, avoids import cycle * Fix build errors * Fix tests * Add signing key server tables * Try to fold signing key server into federation API * Fix dendritejs builds * Update embedded interfaces * Fix panic, fix lint error * Update configs, docker * Rename some things * Reuse same keyring on the implementing side * Fix federation tests, `NewBaseDendrite` can accept freeform options * Fix build * Update create_db, configs * Name tables back * Don't rename federationsender consumer for now
This commit is contained in:
parent
6e93531e94
commit
ec716793eb
136 changed files with 1211 additions and 1786 deletions
247
federationapi/storage/shared/storage.go
Normal file
247
federationapi/storage/shared/storage.go
Normal file
|
@ -0,0 +1,247 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package shared
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/federationapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/federationapi/types"
|
||||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
type Database struct {
|
||||
DB *sql.DB
|
||||
Cache caching.FederationCache
|
||||
Writer sqlutil.Writer
|
||||
FederationQueuePDUs tables.FederationQueuePDUs
|
||||
FederationQueueEDUs tables.FederationQueueEDUs
|
||||
FederationQueueJSON tables.FederationQueueJSON
|
||||
FederationJoinedHosts tables.FederationJoinedHosts
|
||||
FederationBlacklist tables.FederationBlacklist
|
||||
FederationOutboundPeeks tables.FederationOutboundPeeks
|
||||
FederationInboundPeeks tables.FederationInboundPeeks
|
||||
NotaryServerKeysJSON tables.FederationNotaryServerKeysJSON
|
||||
NotaryServerKeysMetadata tables.FederationNotaryServerKeysMetadata
|
||||
ServerSigningKeys tables.FederationServerSigningKeys
|
||||
}
|
||||
|
||||
// An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs.
|
||||
// We don't actually export the NIDs but we need the caller to be able
|
||||
// to pass them back so that we can clean up if the transaction sends
|
||||
// successfully.
|
||||
type Receipt struct {
|
||||
nid int64
|
||||
}
|
||||
|
||||
func (r *Receipt) String() string {
|
||||
return fmt.Sprintf("%d", r.nid)
|
||||
}
|
||||
|
||||
// UpdateRoom updates the joined hosts for a room and returns what the joined
|
||||
// hosts were before the update, or nil if this was a duplicate message.
|
||||
// This is called when we receive a message from kafka, so we pass in
|
||||
// oldEventID and newEventID to check that we haven't missed any messages or
|
||||
// this isn't a duplicate message.
|
||||
func (d *Database) UpdateRoom(
|
||||
ctx context.Context,
|
||||
roomID, oldEventID, newEventID string,
|
||||
addHosts []types.JoinedHost,
|
||||
removeHosts []string,
|
||||
) (joinedHosts []types.JoinedHost, err error) {
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
joinedHosts, err = d.FederationJoinedHosts.SelectJoinedHostsWithTx(ctx, txn, roomID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, add := range addHosts {
|
||||
err = d.FederationJoinedHosts.InsertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err = d.FederationJoinedHosts.DeleteJoinedHosts(ctx, txn, removeHosts); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// GetJoinedHosts returns the currently joined hosts for room,
|
||||
// as known to federationserver.
|
||||
// Returns an error if something goes wrong.
|
||||
func (d *Database) GetJoinedHosts(
|
||||
ctx context.Context, roomID string,
|
||||
) ([]types.JoinedHost, error) {
|
||||
return d.FederationJoinedHosts.SelectJoinedHosts(ctx, roomID)
|
||||
}
|
||||
|
||||
// GetAllJoinedHosts returns the currently joined hosts for
|
||||
// all rooms known to the federation sender.
|
||||
// Returns an error if something goes wrong.
|
||||
func (d *Database) GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) {
|
||||
return d.FederationJoinedHosts.SelectAllJoinedHosts(ctx)
|
||||
}
|
||||
|
||||
func (d *Database) GetJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error) {
|
||||
return d.FederationJoinedHosts.SelectJoinedHostsForRooms(ctx, roomIDs)
|
||||
}
|
||||
|
||||
// StoreJSON adds a JSON blob into the queue JSON table and returns
|
||||
// a NID. The NID will then be used when inserting the per-destination
|
||||
// metadata entries.
|
||||
func (d *Database) StoreJSON(
|
||||
ctx context.Context, js string,
|
||||
) (*Receipt, error) {
|
||||
var nid int64
|
||||
var err error
|
||||
_ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
nid, err = d.FederationQueueJSON.InsertQueueJSON(ctx, txn, js)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("d.insertQueueJSON: %w", err)
|
||||
}
|
||||
return &Receipt{
|
||||
nid: nid,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (d *Database) PurgeRoomState(
|
||||
ctx context.Context, roomID string,
|
||||
) error {
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
// If the event is a create event then we'll delete all of the existing
|
||||
// data for the room. The only reason that a create event would be replayed
|
||||
// to us in this way is if we're about to receive the entire room state.
|
||||
if err := d.FederationJoinedHosts.DeleteJoinedHostsForRoom(ctx, txn, roomID); err != nil {
|
||||
return fmt.Errorf("d.FederationJoinedHosts.DeleteJoinedHosts: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (d *Database) AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error {
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
return d.FederationBlacklist.InsertBlacklist(context.TODO(), txn, serverName)
|
||||
})
|
||||
}
|
||||
|
||||
func (d *Database) RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error {
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
return d.FederationBlacklist.DeleteBlacklist(context.TODO(), txn, serverName)
|
||||
})
|
||||
}
|
||||
|
||||
func (d *Database) RemoveAllServersFromBlacklist() error {
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
return d.FederationBlacklist.DeleteAllBlacklist(context.TODO(), txn)
|
||||
})
|
||||
}
|
||||
|
||||
func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) {
|
||||
return d.FederationBlacklist.SelectBlacklist(context.TODO(), nil, serverName)
|
||||
}
|
||||
|
||||
func (d *Database) AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
return d.FederationOutboundPeeks.InsertOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
|
||||
})
|
||||
}
|
||||
|
||||
func (d *Database) RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
return d.FederationOutboundPeeks.RenewOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
|
||||
})
|
||||
}
|
||||
|
||||
func (d *Database) GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error) {
|
||||
return d.FederationOutboundPeeks.SelectOutboundPeek(ctx, nil, serverName, roomID, peekID)
|
||||
}
|
||||
|
||||
func (d *Database) GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) {
|
||||
return d.FederationOutboundPeeks.SelectOutboundPeeks(ctx, nil, roomID)
|
||||
}
|
||||
|
||||
func (d *Database) AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
return d.FederationInboundPeeks.InsertInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
|
||||
})
|
||||
}
|
||||
|
||||
func (d *Database) RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
return d.FederationInboundPeeks.RenewInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
|
||||
})
|
||||
}
|
||||
|
||||
func (d *Database) GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error) {
|
||||
return d.FederationInboundPeeks.SelectInboundPeek(ctx, nil, serverName, roomID, peekID)
|
||||
}
|
||||
|
||||
func (d *Database) GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error) {
|
||||
return d.FederationInboundPeeks.SelectInboundPeeks(ctx, nil, roomID)
|
||||
}
|
||||
|
||||
func (d *Database) UpdateNotaryKeys(ctx context.Context, serverName gomatrixserverlib.ServerName, serverKeys gomatrixserverlib.ServerKeys) error {
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
validUntil := serverKeys.ValidUntilTS
|
||||
// Servers MUST use the lesser of this field and 7 days into the future when determining if a key is valid.
|
||||
// This is to avoid a situation where an attacker publishes a key which is valid for a significant amount of
|
||||
// time without a way for the homeserver owner to revoke it.
|
||||
// https://spec.matrix.org/unstable/server-server-api/#querying-keys-through-another-server
|
||||
weekIntoFuture := time.Now().Add(7 * 24 * time.Hour)
|
||||
if weekIntoFuture.Before(validUntil.Time()) {
|
||||
validUntil = gomatrixserverlib.AsTimestamp(weekIntoFuture)
|
||||
}
|
||||
notaryID, err := d.NotaryServerKeysJSON.InsertJSONResponse(ctx, txn, serverKeys, serverName, validUntil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// update the metadata for the keys
|
||||
for keyID := range serverKeys.OldVerifyKeys {
|
||||
_, err = d.NotaryServerKeysMetadata.UpsertKey(ctx, txn, serverName, keyID, notaryID, validUntil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for keyID := range serverKeys.VerifyKeys {
|
||||
_, err = d.NotaryServerKeysMetadata.UpsertKey(ctx, txn, serverName, keyID, notaryID, validUntil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// clean up old responses
|
||||
return d.NotaryServerKeysMetadata.DeleteOldJSONResponses(ctx, txn)
|
||||
})
|
||||
}
|
||||
|
||||
func (d *Database) GetNotaryKeys(
|
||||
ctx context.Context, serverName gomatrixserverlib.ServerName, optKeyIDs []gomatrixserverlib.KeyID,
|
||||
) (sks []gomatrixserverlib.ServerKeys, err error) {
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
sks, err = d.NotaryServerKeysMetadata.SelectKeys(ctx, txn, serverName, optKeyIDs)
|
||||
return err
|
||||
})
|
||||
return sks, err
|
||||
}
|
151
federationapi/storage/shared/storage_edus.go
Normal file
151
federationapi/storage/shared/storage_edus.go
Normal file
|
@ -0,0 +1,151 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package shared
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// AssociateEDUWithDestination creates an association that the
|
||||
// destination queues will use to determine which JSON blobs to send
|
||||
// to which servers.
|
||||
func (d *Database) AssociateEDUWithDestination(
|
||||
ctx context.Context,
|
||||
serverName gomatrixserverlib.ServerName,
|
||||
receipt *Receipt,
|
||||
) error {
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
if err := d.FederationQueueEDUs.InsertQueueEDU(
|
||||
ctx, // context
|
||||
txn, // SQL transaction
|
||||
"", // TODO: EDU type for coalescing
|
||||
serverName, // destination server name
|
||||
receipt.nid, // NID from the federationapi_queue_json table
|
||||
); err != nil {
|
||||
return fmt.Errorf("InsertQueueEDU: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// GetNextTransactionEDUs retrieves events from the database for
|
||||
// the next pending transaction, up to the limit specified.
|
||||
func (d *Database) GetPendingEDUs(
|
||||
ctx context.Context,
|
||||
serverName gomatrixserverlib.ServerName,
|
||||
limit int,
|
||||
) (
|
||||
edus map[*Receipt]*gomatrixserverlib.EDU,
|
||||
err error,
|
||||
) {
|
||||
edus = make(map[*Receipt]*gomatrixserverlib.EDU)
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
nids, err := d.FederationQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit)
|
||||
if err != nil {
|
||||
return fmt.Errorf("SelectQueueEDUs: %w", err)
|
||||
}
|
||||
|
||||
retrieve := make([]int64, 0, len(nids))
|
||||
for _, nid := range nids {
|
||||
if edu, ok := d.Cache.GetFederationQueuedEDU(nid); ok {
|
||||
edus[&Receipt{nid}] = edu
|
||||
} else {
|
||||
retrieve = append(retrieve, nid)
|
||||
}
|
||||
}
|
||||
|
||||
blobs, err := d.FederationQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
|
||||
if err != nil {
|
||||
return fmt.Errorf("SelectQueueJSON: %w", err)
|
||||
}
|
||||
|
||||
for nid, blob := range blobs {
|
||||
var event gomatrixserverlib.EDU
|
||||
if err := json.Unmarshal(blob, &event); err != nil {
|
||||
return fmt.Errorf("json.Unmarshal: %w", err)
|
||||
}
|
||||
edus[&Receipt{nid}] = &event
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// CleanEDUs cleans up all specified EDUs. This is done when a
|
||||
// transaction was sent successfully.
|
||||
func (d *Database) CleanEDUs(
|
||||
ctx context.Context,
|
||||
serverName gomatrixserverlib.ServerName,
|
||||
receipts []*Receipt,
|
||||
) error {
|
||||
if len(receipts) == 0 {
|
||||
return errors.New("expected receipt")
|
||||
}
|
||||
|
||||
nids := make([]int64, len(receipts))
|
||||
for i := range receipts {
|
||||
nids[i] = receipts[i].nid
|
||||
}
|
||||
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
if err := d.FederationQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, nids); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var deleteNIDs []int64
|
||||
for _, nid := range nids {
|
||||
count, err := d.FederationQueueEDUs.SelectQueueEDUReferenceJSONCount(ctx, txn, nid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("SelectQueueEDUReferenceJSONCount: %w", err)
|
||||
}
|
||||
if count == 0 {
|
||||
deleteNIDs = append(deleteNIDs, nid)
|
||||
d.Cache.EvictFederationQueuedEDU(nid)
|
||||
}
|
||||
}
|
||||
|
||||
if len(deleteNIDs) > 0 {
|
||||
if err := d.FederationQueueJSON.DeleteQueueJSON(ctx, txn, deleteNIDs); err != nil {
|
||||
return fmt.Errorf("DeleteQueueJSON: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// GetPendingEDUCount returns the number of EDUs waiting to be
|
||||
// sent for a given servername.
|
||||
func (d *Database) GetPendingEDUCount(
|
||||
ctx context.Context,
|
||||
serverName gomatrixserverlib.ServerName,
|
||||
) (int64, error) {
|
||||
return d.FederationQueueEDUs.SelectQueueEDUCount(ctx, nil, serverName)
|
||||
}
|
||||
|
||||
// GetPendingServerNames returns the server names that have EDUs
|
||||
// waiting to be sent.
|
||||
func (d *Database) GetPendingEDUServerNames(
|
||||
ctx context.Context,
|
||||
) ([]gomatrixserverlib.ServerName, error) {
|
||||
return d.FederationQueueEDUs.SelectQueueEDUServerNames(ctx, nil)
|
||||
}
|
59
federationapi/storage/shared/storage_keys.go
Normal file
59
federationapi/storage/shared/storage_keys.go
Normal file
|
@ -0,0 +1,59 @@
|
|||
// Copyright 2017-2018 New Vector Ltd
|
||||
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package shared
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// FetcherName implements KeyFetcher
|
||||
func (d Database) FetcherName() string {
|
||||
return "FederationAPIKeyDatabase"
|
||||
}
|
||||
|
||||
// FetchKeys implements gomatrixserverlib.KeyDatabase
|
||||
func (d *Database) FetchKeys(
|
||||
ctx context.Context,
|
||||
requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
|
||||
) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) {
|
||||
return d.ServerSigningKeys.BulkSelectServerKeys(ctx, nil, requests)
|
||||
}
|
||||
|
||||
// StoreKeys implements gomatrixserverlib.KeyDatabase
|
||||
func (d *Database) StoreKeys(
|
||||
ctx context.Context,
|
||||
keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult,
|
||||
) error {
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
var lastErr error
|
||||
for request, keys := range keyMap {
|
||||
if err := d.ServerSigningKeys.UpsertServerKeys(ctx, txn, request, keys); err != nil {
|
||||
// Rather than returning immediately on error we try to insert the
|
||||
// remaining keys.
|
||||
// Since we are inserting the keys outside of a transaction it is
|
||||
// possible for some of the inserts to succeed even though some
|
||||
// of the inserts have failed.
|
||||
// Ensuring that we always insert all the keys we can means that
|
||||
// this behaviour won't depend on the iteration order of the map.
|
||||
lastErr = err
|
||||
}
|
||||
}
|
||||
return lastErr
|
||||
})
|
||||
}
|
159
federationapi/storage/shared/storage_pdus.go
Normal file
159
federationapi/storage/shared/storage_pdus.go
Normal file
|
@ -0,0 +1,159 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package shared
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// AssociatePDUWithDestination creates an association that the
|
||||
// destination queues will use to determine which JSON blobs to send
|
||||
// to which servers.
|
||||
func (d *Database) AssociatePDUWithDestination(
|
||||
ctx context.Context,
|
||||
transactionID gomatrixserverlib.TransactionID,
|
||||
serverName gomatrixserverlib.ServerName,
|
||||
receipt *Receipt,
|
||||
) error {
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
if err := d.FederationQueuePDUs.InsertQueuePDU(
|
||||
ctx, // context
|
||||
txn, // SQL transaction
|
||||
transactionID, // transaction ID
|
||||
serverName, // destination server name
|
||||
receipt.nid, // NID from the federationapi_queue_json table
|
||||
); err != nil {
|
||||
return fmt.Errorf("InsertQueuePDU: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// GetNextTransactionPDUs retrieves events from the database for
|
||||
// the next pending transaction, up to the limit specified.
|
||||
func (d *Database) GetPendingPDUs(
|
||||
ctx context.Context,
|
||||
serverName gomatrixserverlib.ServerName,
|
||||
limit int,
|
||||
) (
|
||||
events map[*Receipt]*gomatrixserverlib.HeaderedEvent,
|
||||
err error,
|
||||
) {
|
||||
// Strictly speaking this doesn't need to be using the writer
|
||||
// since we are only performing selects, but since we don't have
|
||||
// a guarantee of transactional isolation, it's actually useful
|
||||
// to know in SQLite mode that nothing else is trying to modify
|
||||
// the database.
|
||||
events = make(map[*Receipt]*gomatrixserverlib.HeaderedEvent)
|
||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
nids, err := d.FederationQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, limit)
|
||||
if err != nil {
|
||||
return fmt.Errorf("SelectQueuePDUs: %w", err)
|
||||
}
|
||||
|
||||
retrieve := make([]int64, 0, len(nids))
|
||||
for _, nid := range nids {
|
||||
if event, ok := d.Cache.GetFederationQueuedPDU(nid); ok {
|
||||
events[&Receipt{nid}] = event
|
||||
} else {
|
||||
retrieve = append(retrieve, nid)
|
||||
}
|
||||
}
|
||||
|
||||
blobs, err := d.FederationQueueJSON.SelectQueueJSON(ctx, txn, retrieve)
|
||||
if err != nil {
|
||||
return fmt.Errorf("SelectQueueJSON: %w", err)
|
||||
}
|
||||
|
||||
for nid, blob := range blobs {
|
||||
var event gomatrixserverlib.HeaderedEvent
|
||||
if err := json.Unmarshal(blob, &event); err != nil {
|
||||
return fmt.Errorf("json.Unmarshal: %w", err)
|
||||
}
|
||||
events[&Receipt{nid}] = &event
|
||||
d.Cache.StoreFederationQueuedPDU(nid, &event)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// CleanTransactionPDUs cleans up all associated events for a
|
||||
// given transaction. This is done when the transaction was sent
|
||||
// successfully.
|
||||
func (d *Database) CleanPDUs(
|
||||
ctx context.Context,
|
||||
serverName gomatrixserverlib.ServerName,
|
||||
receipts []*Receipt,
|
||||
) error {
|
||||
if len(receipts) == 0 {
|
||||
return errors.New("expected receipt")
|
||||
}
|
||||
|
||||
nids := make([]int64, len(receipts))
|
||||
for i := range receipts {
|
||||
nids[i] = receipts[i].nid
|
||||
}
|
||||
|
||||
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
if err := d.FederationQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, nids); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var deleteNIDs []int64
|
||||
for _, nid := range nids {
|
||||
count, err := d.FederationQueuePDUs.SelectQueuePDUReferenceJSONCount(ctx, txn, nid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("SelectQueuePDUReferenceJSONCount: %w", err)
|
||||
}
|
||||
if count == 0 {
|
||||
deleteNIDs = append(deleteNIDs, nid)
|
||||
d.Cache.EvictFederationQueuedPDU(nid)
|
||||
}
|
||||
}
|
||||
|
||||
if len(deleteNIDs) > 0 {
|
||||
if err := d.FederationQueueJSON.DeleteQueueJSON(ctx, txn, deleteNIDs); err != nil {
|
||||
return fmt.Errorf("DeleteQueueJSON: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// GetPendingPDUCount returns the number of PDUs waiting to be
|
||||
// sent for a given servername.
|
||||
func (d *Database) GetPendingPDUCount(
|
||||
ctx context.Context,
|
||||
serverName gomatrixserverlib.ServerName,
|
||||
) (int64, error) {
|
||||
return d.FederationQueuePDUs.SelectQueuePDUCount(ctx, nil, serverName)
|
||||
}
|
||||
|
||||
// GetPendingServerNames returns the server names that have PDUs
|
||||
// waiting to be sent.
|
||||
func (d *Database) GetPendingPDUServerNames(
|
||||
ctx context.Context,
|
||||
) ([]gomatrixserverlib.ServerName, error) {
|
||||
return d.FederationQueuePDUs.SelectQueuePDUServerNames(ctx, nil)
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue