Merge branch 'master' into add-nats-support

This commit is contained in:
Neil Alexander 2021-11-02 17:36:22 +00:00
commit 73d6964fb4
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
205 changed files with 5074 additions and 1217 deletions

View file

@ -20,6 +20,7 @@ import (
"fmt"
"github.com/Shopify/sarama"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/internal"
@ -29,7 +30,7 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"
)
// KeyChangeConsumer consumes events that originate in key server.
@ -81,10 +82,21 @@ func (t *KeyChangeConsumer) Start() error {
func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
var m api.DeviceMessage
if err := json.Unmarshal(msg.Value, &m); err != nil {
log.WithError(err).Errorf("failed to read device message from key change topic")
logrus.WithError(err).Errorf("failed to read device message from key change topic")
return nil
}
logger := log.WithField("user_id", m.UserID)
switch m.Type {
case api.TypeCrossSigningUpdate:
return t.onCrossSigningMessage(m)
case api.TypeDeviceKeyUpdate:
fallthrough
default:
return t.onDeviceKeyMessage(m)
}
}
func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
logger := logrus.WithField("user_id", m.UserID)
// only send key change events which originated from us
_, originServerName, err := gomatrixserverlib.SplitID('@', m.UserID)
@ -130,7 +142,51 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return err
}
log.Infof("Sending device list update message to %q", destinations)
logrus.Infof("Sending device list update message to %q", destinations)
return t.queues.SendEDU(edu, t.serverName, destinations)
}
func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
output := m.CrossSigningKeyUpdate
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil {
logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure")
return nil
}
if host != gomatrixserverlib.ServerName(t.serverName) {
// Ignore any messages that didn't originate locally, otherwise we'll
// end up parroting information we received from other servers.
return nil
}
logger := logrus.WithField("user_id", output.UserID)
var queryRes roomserverAPI.QueryRoomsForUserResponse
err = t.rsAPI.QueryRoomsForUser(context.Background(), &roomserverAPI.QueryRoomsForUserRequest{
UserID: output.UserID,
WantMembership: "join",
}, &queryRes)
if err != nil {
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined rooms for user")
return nil
}
// send this key change to all servers who share rooms with this user.
destinations, err := t.db.GetJoinedHostsForRooms(context.Background(), queryRes.RoomIDs)
if err != nil {
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined hosts for rooms user is in")
return nil
}
// Pack the EDU and marshal it
edu := &gomatrixserverlib.EDU{
Type: eduserverAPI.MSigningKeyUpdate,
Origin: string(t.serverName),
}
if edu.Content, err = json.Marshal(output); err != nil {
logger.WithError(err).Error("fedsender key change consumer: failed to marshal output, dropping")
return nil
}
logger.Infof("Sending cross-signing update message to %q", destinations)
return t.queues.SendEDU(edu, t.serverName, destinations)
}

View file

@ -409,7 +409,7 @@ func (r *FederationSenderInternalAPI) performOutboundPeekUsingServer(
return fmt.Errorf("sanityCheckAuthChain: %w", err)
}
if err = respState.Check(ctx, r.keyRing, federatedAuthProvider(ctx, r.federation, r.keyRing, serverName)); err != nil {
return fmt.Errorf("Error checking state returned from peeking: %w", err)
return fmt.Errorf("error checking state returned from peeking: %w", err)
}
// If we've got this far, the remote server is peeking.
@ -523,7 +523,7 @@ func (r *FederationSenderInternalAPI) PerformLeave(
// If we reach here then we didn't complete a leave for some reason.
return fmt.Errorf(
"Failed to leave room %q through %d server(s)",
"failed to leave room %q through %d server(s)",
request.RoomID, len(request.ServerNames),
)
}
@ -713,14 +713,8 @@ func federatedAuthProvider(
}
// Check the signatures of the event.
if res, err := gomatrixserverlib.VerifyEventSignatures(ctx, []*gomatrixserverlib.Event{ev}, keyRing); err != nil {
if err := ev.VerifyEventSignatures(ctx, keyRing); err != nil {
return nil, fmt.Errorf("missingAuth VerifyEventSignatures: %w", err)
} else {
for _, err := range res {
if err != nil {
return nil, fmt.Errorf("missingAuth VerifyEventSignatures: %w", err)
}
}
}
// If the event is OK then add it to the results and the retry map.

View file

@ -29,7 +29,6 @@ import (
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
)
@ -72,7 +71,7 @@ type destinationQueue struct {
// start sending events to that destination.
func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, receipt *shared.Receipt) {
if event == nil {
log.Errorf("attempt to send nil PDU with destination %q", oq.destination)
logrus.Errorf("attempt to send nil PDU with destination %q", oq.destination)
return
}
// Create a database entry that associates the given PDU NID with
@ -84,7 +83,7 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
oq.destination, // the destination server name
receipt, // NIDs from federationsender_queue_json table
); err != nil {
log.WithError(err).Errorf("failed to associate PDU %q with destination %q", event.EventID(), oq.destination)
logrus.WithError(err).Errorf("failed to associate PDU %q with destination %q", event.EventID(), oq.destination)
return
}
// Check if the destination is blacklisted. If it isn't then wake
@ -116,7 +115,7 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re
// start sending events to that destination.
func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *shared.Receipt) {
if event == nil {
log.Errorf("attempt to send nil EDU with destination %q", oq.destination)
logrus.Errorf("attempt to send nil EDU with destination %q", oq.destination)
return
}
// Create a database entry that associates the given PDU NID with
@ -127,7 +126,7 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
oq.destination, // the destination server name
receipt, // NIDs from federationsender_queue_json table
); err != nil {
log.WithError(err).Errorf("failed to associate EDU with destination %q", oq.destination)
logrus.WithError(err).Errorf("failed to associate EDU with destination %q", oq.destination)
return
}
// Check if the destination is blacklisted. If it isn't then wake
@ -281,7 +280,7 @@ func (oq *destinationQueue) backgroundSend() {
// It's been suggested that we should give up because the backoff
// has exceeded a maximum allowable value. Clean up the in-memory
// buffers at this point. The PDU clean-up is already on a defer.
log.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination)
logrus.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination)
oq.pendingMutex.Lock()
for i := range oq.pendingPDUs {
oq.pendingPDUs[i] = nil
@ -298,7 +297,7 @@ func (oq *destinationQueue) backgroundSend() {
// We haven't backed off yet, so wait for the suggested amount of
// time.
duration := time.Until(*until)
log.Warnf("Backing off %q for %s", oq.destination, duration)
logrus.Warnf("Backing off %q for %s", oq.destination, duration)
oq.backingOff.Store(true)
destinationQueueBackingOff.Inc()
select {
@ -421,13 +420,13 @@ func (oq *destinationQueue) nextTransaction(
if pduReceipts != nil {
//logrus.Infof("Cleaning PDUs %q", pduReceipt.String())
if err = oq.db.CleanPDUs(context.Background(), oq.destination, pduReceipts); err != nil {
log.WithError(err).Errorf("Failed to clean PDUs for server %q", t.Destination)
logrus.WithError(err).Errorf("Failed to clean PDUs for server %q", t.Destination)
}
}
if eduReceipts != nil {
//logrus.Infof("Cleaning EDUs %q", eduReceipt.String())
if err = oq.db.CleanEDUs(context.Background(), oq.destination, eduReceipts); err != nil {
log.WithError(err).Errorf("Failed to clean EDUs for server %q", t.Destination)
logrus.WithError(err).Errorf("Failed to clean EDUs for server %q", t.Destination)
}
}
// Reset the transaction ID.
@ -440,9 +439,9 @@ func (oq *destinationQueue) nextTransaction(
// will retry again, subject to backoff.
return false, 0, 0, err
default:
log.WithFields(log.Fields{
"destination": oq.destination,
log.ErrorKey: err,
logrus.WithFields(logrus.Fields{
"destination": oq.destination,
logrus.ErrorKey: err,
}).Debugf("Failed to send transaction %q", t.TransactionID)
return false, 0, 0, err
}

View file

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !wasm
// +build !wasm
package storage