Merge branch 'master' into nats

This commit is contained in:
Neil Alexander 2021-11-24 11:00:21 +00:00
commit 843139c4f1
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
146 changed files with 1619 additions and 2031 deletions

View file

@ -0,0 +1,304 @@
package internal
import (
"context"
"crypto/ed25519"
"encoding/base64"
"sync"
"time"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/federationapi/storage"
"github.com/matrix-org/dendrite/federationapi/storage/cache"
"github.com/matrix-org/dendrite/internal/caching"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
// FederationInternalAPI is an implementation of api.FederationInternalAPI
type FederationInternalAPI struct {
db storage.Database
cfg *config.FederationAPI
statistics *statistics.Statistics
rsAPI roomserverAPI.RoomserverInternalAPI
federation *gomatrixserverlib.FederationClient
keyRing *gomatrixserverlib.KeyRing
queues *queue.OutgoingQueues
joins sync.Map // joins currently in progress
}
func NewFederationInternalAPI(
db storage.Database, cfg *config.FederationAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
federation *gomatrixserverlib.FederationClient,
statistics *statistics.Statistics,
caches *caching.Caches,
queues *queue.OutgoingQueues,
) *FederationInternalAPI {
serverKeyDB, err := cache.NewKeyDatabase(db, caches)
if err != nil {
logrus.WithError(err).Panicf("failed to set up caching wrapper for server key database")
}
keyRing := &gomatrixserverlib.KeyRing{
KeyFetchers: []gomatrixserverlib.KeyFetcher{},
KeyDatabase: serverKeyDB,
}
addDirectFetcher := func() {
keyRing.KeyFetchers = append(
keyRing.KeyFetchers,
&gomatrixserverlib.DirectKeyFetcher{
Client: federation,
},
)
}
if cfg.PreferDirectFetch {
addDirectFetcher()
} else {
defer addDirectFetcher()
}
var b64e = base64.StdEncoding.WithPadding(base64.NoPadding)
for _, ps := range cfg.KeyPerspectives {
perspective := &gomatrixserverlib.PerspectiveKeyFetcher{
PerspectiveServerName: ps.ServerName,
PerspectiveServerKeys: map[gomatrixserverlib.KeyID]ed25519.PublicKey{},
Client: federation,
}
for _, key := range ps.Keys {
rawkey, err := b64e.DecodeString(key.PublicKey)
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"server_name": ps.ServerName,
"public_key": key.PublicKey,
}).Warn("Couldn't parse perspective key")
continue
}
perspective.PerspectiveServerKeys[key.KeyID] = rawkey
}
keyRing.KeyFetchers = append(keyRing.KeyFetchers, perspective)
logrus.WithFields(logrus.Fields{
"server_name": ps.ServerName,
"num_public_keys": len(ps.Keys),
}).Info("Enabled perspective key fetcher")
}
return &FederationInternalAPI{
db: db,
cfg: cfg,
rsAPI: rsAPI,
keyRing: keyRing,
federation: federation,
statistics: statistics,
queues: queues,
}
}
func (a *FederationInternalAPI) isBlacklistedOrBackingOff(s gomatrixserverlib.ServerName) (*statistics.ServerStatistics, error) {
stats := a.statistics.ForServer(s)
until, blacklisted := stats.BackoffInfo()
if blacklisted {
return stats, &api.FederationClientError{
Blacklisted: true,
}
}
now := time.Now()
if until != nil && now.Before(*until) {
return stats, &api.FederationClientError{
RetryAfter: time.Until(*until),
}
}
return stats, nil
}
func failBlacklistableError(err error, stats *statistics.ServerStatistics) (until time.Time, blacklisted bool) {
if err == nil {
return
}
mxerr, ok := err.(gomatrix.HTTPError)
if !ok {
return stats.Failure()
}
if mxerr.Code == 401 { // invalid signature in X-Matrix header
return stats.Failure()
}
if mxerr.Code >= 500 && mxerr.Code < 600 { // internal server errors
return stats.Failure()
}
return
}
func (a *FederationInternalAPI) doRequest(
s gomatrixserverlib.ServerName, request func() (interface{}, error),
) (interface{}, error) {
stats, err := a.isBlacklistedOrBackingOff(s)
if err != nil {
return nil, err
}
res, err := request()
if err != nil {
until, blacklisted := failBlacklistableError(err, stats)
now := time.Now()
var retryAfter time.Duration
if until.After(now) {
retryAfter = time.Until(until)
}
return res, &api.FederationClientError{
Err: err.Error(),
Blacklisted: blacklisted,
RetryAfter: retryAfter,
}
}
stats.Success()
return res, nil
}
func (a *FederationInternalAPI) GetUserDevices(
ctx context.Context, s gomatrixserverlib.ServerName, userID string,
) (gomatrixserverlib.RespUserDevices, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
return a.federation.GetUserDevices(ctx, s, userID)
})
if err != nil {
return gomatrixserverlib.RespUserDevices{}, err
}
return ires.(gomatrixserverlib.RespUserDevices), nil
}
func (a *FederationInternalAPI) ClaimKeys(
ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string,
) (gomatrixserverlib.RespClaimKeys, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
return a.federation.ClaimKeys(ctx, s, oneTimeKeys)
})
if err != nil {
return gomatrixserverlib.RespClaimKeys{}, err
}
return ires.(gomatrixserverlib.RespClaimKeys), nil
}
func (a *FederationInternalAPI) QueryKeys(
ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string,
) (gomatrixserverlib.RespQueryKeys, error) {
ires, err := a.doRequest(s, func() (interface{}, error) {
return a.federation.QueryKeys(ctx, s, keys)
})
if err != nil {
return gomatrixserverlib.RespQueryKeys{}, err
}
return ires.(gomatrixserverlib.RespQueryKeys), nil
}
func (a *FederationInternalAPI) Backfill(
ctx context.Context, s gomatrixserverlib.ServerName, roomID string, limit int, eventIDs []string,
) (res gomatrixserverlib.Transaction, err error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
return a.federation.Backfill(ctx, s, roomID, limit, eventIDs)
})
if err != nil {
return gomatrixserverlib.Transaction{}, err
}
return ires.(gomatrixserverlib.Transaction), nil
}
func (a *FederationInternalAPI) LookupState(
ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion,
) (res gomatrixserverlib.RespState, err error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
return a.federation.LookupState(ctx, s, roomID, eventID, roomVersion)
})
if err != nil {
return gomatrixserverlib.RespState{}, err
}
return ires.(gomatrixserverlib.RespState), nil
}
func (a *FederationInternalAPI) LookupStateIDs(
ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string,
) (res gomatrixserverlib.RespStateIDs, err error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
return a.federation.LookupStateIDs(ctx, s, roomID, eventID)
})
if err != nil {
return gomatrixserverlib.RespStateIDs{}, err
}
return ires.(gomatrixserverlib.RespStateIDs), nil
}
func (a *FederationInternalAPI) GetEvent(
ctx context.Context, s gomatrixserverlib.ServerName, eventID string,
) (res gomatrixserverlib.Transaction, err error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
return a.federation.GetEvent(ctx, s, eventID)
})
if err != nil {
return gomatrixserverlib.Transaction{}, err
}
return ires.(gomatrixserverlib.Transaction), nil
}
func (a *FederationInternalAPI) LookupServerKeys(
ctx context.Context, s gomatrixserverlib.ServerName, keyRequests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
) ([]gomatrixserverlib.ServerKeys, error) {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
return a.federation.LookupServerKeys(ctx, s, keyRequests)
})
if err != nil {
return []gomatrixserverlib.ServerKeys{}, err
}
return ires.([]gomatrixserverlib.ServerKeys), nil
}
func (a *FederationInternalAPI) MSC2836EventRelationships(
ctx context.Context, s gomatrixserverlib.ServerName, r gomatrixserverlib.MSC2836EventRelationshipsRequest,
roomVersion gomatrixserverlib.RoomVersion,
) (res gomatrixserverlib.MSC2836EventRelationshipsResponse, err error) {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
return a.federation.MSC2836EventRelationships(ctx, s, r, roomVersion)
})
if err != nil {
return res, err
}
return ires.(gomatrixserverlib.MSC2836EventRelationshipsResponse), nil
}
func (a *FederationInternalAPI) MSC2946Spaces(
ctx context.Context, s gomatrixserverlib.ServerName, roomID string, r gomatrixserverlib.MSC2946SpacesRequest,
) (res gomatrixserverlib.MSC2946SpacesResponse, err error) {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
ires, err := a.doRequest(s, func() (interface{}, error) {
return a.federation.MSC2946Spaces(ctx, s, roomID, r)
})
if err != nil {
return res, err
}
return ires.(gomatrixserverlib.MSC2946SpacesResponse), nil
}

View file

@ -0,0 +1,248 @@
package internal
import (
"context"
"crypto/ed25519"
"fmt"
"time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
func (s *FederationInternalAPI) KeyRing() *gomatrixserverlib.KeyRing {
// Return a keyring that forces requests to be proxied through the
// below functions. That way we can enforce things like validity
// and keeping the cache up-to-date.
return s.keyRing
}
func (s *FederationInternalAPI) StoreKeys(
_ context.Context,
results map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult,
) error {
// Run in a background context - we don't want to stop this work just
// because the caller gives up waiting.
ctx := context.Background()
// Store any keys that we were given in our database.
return s.keyRing.KeyDatabase.StoreKeys(ctx, results)
}
func (s *FederationInternalAPI) FetchKeys(
_ context.Context,
requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) {
// Run in a background context - we don't want to stop this work just
// because the caller gives up waiting.
ctx := context.Background()
now := gomatrixserverlib.AsTimestamp(time.Now())
results := map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{}
origRequests := map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp{}
for k, v := range requests {
origRequests[k] = v
}
// First, check if any of these key checks are for our own keys. If
// they are then we will satisfy them directly.
s.handleLocalKeys(ctx, requests, results)
// Then consult our local database and see if we have the requested
// keys. These might come from a cache, depending on the database
// implementation used.
if err := s.handleDatabaseKeys(ctx, now, requests, results); err != nil {
return nil, err
}
// For any key requests that we still have outstanding, next try to
// fetch them directly. We'll go through each of the key fetchers to
// ask for the remaining keys
for _, fetcher := range s.keyRing.KeyFetchers {
// If there are no more keys to look up then stop.
if len(requests) == 0 {
break
}
// Ask the fetcher to look up our keys.
if err := s.handleFetcherKeys(ctx, now, fetcher, requests, results); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"fetcher_name": fetcher.FetcherName(),
}).Errorf("Failed to retrieve %d key(s)", len(requests))
continue
}
}
// Check that we've actually satisfied all of the key requests that we
// were given. We should report an error if we didn't.
for req := range origRequests {
if _, ok := results[req]; !ok {
// The results don't contain anything for this specific request, so
// we've failed to satisfy it from local keys, database keys or from
// all of the fetchers. Report an error.
logrus.Warnf("Failed to retrieve key %q for server %q", req.KeyID, req.ServerName)
}
}
// Return the keys.
return results, nil
}
func (s *FederationInternalAPI) FetcherName() string {
return fmt.Sprintf("FederationInternalAPI (wrapping %q)", s.keyRing.KeyDatabase.FetcherName())
}
// handleLocalKeys handles cases where the key request contains
// a request for our own server keys, either current or old.
func (s *FederationInternalAPI) handleLocalKeys(
_ context.Context,
requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
results map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult,
) {
for req := range requests {
if req.ServerName != s.cfg.Matrix.ServerName {
continue
}
if req.KeyID == s.cfg.Matrix.KeyID {
// We found a key request that is supposed to be for our own
// keys. Remove it from the request list so we don't hit the
// database or the fetchers for it.
delete(requests, req)
// Insert our own key into the response.
results[req] = gomatrixserverlib.PublicKeyLookupResult{
VerifyKey: gomatrixserverlib.VerifyKey{
Key: gomatrixserverlib.Base64Bytes(s.cfg.Matrix.PrivateKey.Public().(ed25519.PublicKey)),
},
ExpiredTS: gomatrixserverlib.PublicKeyNotExpired,
ValidUntilTS: gomatrixserverlib.AsTimestamp(time.Now().Add(s.cfg.Matrix.KeyValidityPeriod)),
}
} else {
// The key request doesn't match our current key. Let's see
// if it matches any of our old verify keys.
for _, oldVerifyKey := range s.cfg.Matrix.OldVerifyKeys {
if req.KeyID == oldVerifyKey.KeyID {
// We found a key request that is supposed to be an expired
// key.
delete(requests, req)
// Insert our own key into the response.
results[req] = gomatrixserverlib.PublicKeyLookupResult{
VerifyKey: gomatrixserverlib.VerifyKey{
Key: gomatrixserverlib.Base64Bytes(oldVerifyKey.PrivateKey.Public().(ed25519.PublicKey)),
},
ExpiredTS: oldVerifyKey.ExpiredAt,
ValidUntilTS: gomatrixserverlib.PublicKeyNotValid,
}
// No need to look at the other keys.
break
}
}
}
}
}
// handleDatabaseKeys handles cases where the key requests can be
// satisfied from our local database/cache.
func (s *FederationInternalAPI) handleDatabaseKeys(
ctx context.Context,
now gomatrixserverlib.Timestamp,
requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
results map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult,
) error {
// Ask the database/cache for the keys.
dbResults, err := s.keyRing.KeyDatabase.FetchKeys(ctx, requests)
if err != nil {
return err
}
// We successfully got some keys. Add them to the results.
for req, res := range dbResults {
// The key we've retrieved from the database/cache might
// have passed its validity period, but right now, it's
// the best thing we've got, and it might be sufficient to
// verify a past event.
results[req] = res
// If the key is valid right now then we can also remove it
// from the request list as we don't need to fetch it again
// in that case. If the key isn't valid right now, then by
// leaving it in the 'requests' map, we'll try to update the
// key using the fetchers in handleFetcherKeys.
if res.WasValidAt(now, true) {
delete(requests, req)
}
}
return nil
}
// handleFetcherKeys handles cases where a fetcher can satisfy
// the remaining requests.
func (s *FederationInternalAPI) handleFetcherKeys(
ctx context.Context,
_ gomatrixserverlib.Timestamp,
fetcher gomatrixserverlib.KeyFetcher,
requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
results map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult,
) error {
logrus.WithFields(logrus.Fields{
"fetcher_name": fetcher.FetcherName(),
}).Infof("Fetching %d key(s)", len(requests))
// Create a context that limits our requests to 30 seconds.
fetcherCtx, fetcherCancel := context.WithTimeout(ctx, time.Second*30)
defer fetcherCancel()
// Try to fetch the keys.
fetcherResults, err := fetcher.FetchKeys(fetcherCtx, requests)
if err != nil {
return fmt.Errorf("fetcher.FetchKeys: %w", err)
}
// Build a map of the results that we want to commit to the
// database. We do this in a separate map because otherwise we
// might end up trying to rewrite database entries.
storeResults := map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{}
// Now let's look at the results that we got from this fetcher.
for req, res := range fetcherResults {
if prev, ok := results[req]; ok {
// We've already got a previous entry for this request
// so let's see if the newly retrieved one contains a more
// up-to-date validity period.
if res.ValidUntilTS > prev.ValidUntilTS {
// This key is newer than the one we had so let's store
// it in the database.
storeResults[req] = res
}
} else {
// We didn't already have a previous entry for this request
// so store it in the database anyway for now.
storeResults[req] = res
}
// Update the results map with this new result. If nothing
// else, we can try verifying against this key.
results[req] = res
// Remove it from the request list so we won't re-fetch it.
delete(requests, req)
}
// Store the keys from our store map.
if err = s.keyRing.KeyDatabase.StoreKeys(context.Background(), storeResults); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"fetcher_name": fetcher.FetcherName(),
"database_name": s.keyRing.KeyDatabase.FetcherName(),
}).Errorf("Failed to store keys in the database")
return fmt.Errorf("server key API failed to store retrieved keys: %w", err)
}
if len(storeResults) > 0 {
logrus.WithFields(logrus.Fields{
"fetcher_name": fetcher.FetcherName(),
}).Infof("Updated %d of %d key(s) in database (%d keys remaining)", len(storeResults), len(results), len(requests))
}
return nil
}

View file

@ -0,0 +1,727 @@
package internal
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/matrix-org/dendrite/federationapi/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
// PerformLeaveRequest implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformDirectoryLookup(
ctx context.Context,
request *api.PerformDirectoryLookupRequest,
response *api.PerformDirectoryLookupResponse,
) (err error) {
dir, err := r.federation.LookupRoomAlias(
ctx,
request.ServerName,
request.RoomAlias,
)
if err != nil {
r.statistics.ForServer(request.ServerName).Failure()
return err
}
response.RoomID = dir.RoomID
response.ServerNames = dir.Servers
r.statistics.ForServer(request.ServerName).Success()
return nil
}
type federatedJoin struct {
UserID string
RoomID string
}
// PerformJoin implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformJoin(
ctx context.Context,
request *api.PerformJoinRequest,
response *api.PerformJoinResponse,
) {
// Check that a join isn't already in progress for this user/room.
j := federatedJoin{request.UserID, request.RoomID}
if _, found := r.joins.Load(j); found {
response.LastError = &gomatrix.HTTPError{
Code: 429,
Message: `{
"errcode": "M_LIMIT_EXCEEDED",
"error": "There is already a federated join to this room in progress. Please wait for it to finish."
}`, // TODO: Why do none of our error types play nicely with each other?
}
return
}
r.joins.Store(j, nil)
defer r.joins.Delete(j)
// Look up the supported room versions.
var supportedVersions []gomatrixserverlib.RoomVersion
for version := range version.SupportedRoomVersions() {
supportedVersions = append(supportedVersions, version)
}
// Deduplicate the server names we were provided but keep the ordering
// as this encodes useful information about which servers are most likely
// to respond.
seenSet := make(map[gomatrixserverlib.ServerName]bool)
var uniqueList []gomatrixserverlib.ServerName
for _, srv := range request.ServerNames {
if seenSet[srv] {
continue
}
seenSet[srv] = true
uniqueList = append(uniqueList, srv)
}
request.ServerNames = uniqueList
// Try each server that we were provided until we land on one that
// successfully completes the make-join send-join dance.
var lastErr error
for _, serverName := range request.ServerNames {
if err := r.performJoinUsingServer(
ctx,
request.RoomID,
request.UserID,
request.Content,
serverName,
supportedVersions,
); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"server_name": serverName,
"room_id": request.RoomID,
}).Warnf("Failed to join room through server")
lastErr = err
continue
}
// We're all good.
response.JoinedVia = serverName
return
}
// If we reach here then we didn't complete a join for some reason.
var httpErr gomatrix.HTTPError
if ok := errors.As(lastErr, &httpErr); ok {
httpErr.Message = string(httpErr.Contents)
// Clear the wrapped error, else serialising to JSON (in polylith mode) will fail
httpErr.WrappedError = nil
response.LastError = &httpErr
} else {
response.LastError = &gomatrix.HTTPError{
Code: 0,
WrappedError: nil,
Message: "Unknown HTTP error",
}
if lastErr != nil {
response.LastError.Message = lastErr.Error()
}
}
logrus.Errorf(
"failed to join user %q to room %q through %d server(s): last error %s",
request.UserID, request.RoomID, len(request.ServerNames), lastErr,
)
}
func (r *FederationInternalAPI) performJoinUsingServer(
ctx context.Context,
roomID, userID string,
content map[string]interface{},
serverName gomatrixserverlib.ServerName,
supportedVersions []gomatrixserverlib.RoomVersion,
) error {
// Try to perform a make_join using the information supplied in the
// request.
respMakeJoin, err := r.federation.MakeJoin(
ctx,
serverName,
roomID,
userID,
supportedVersions,
)
if err != nil {
// TODO: Check if the user was not allowed to join the room.
r.statistics.ForServer(serverName).Failure()
return fmt.Errorf("r.federation.MakeJoin: %w", err)
}
r.statistics.ForServer(serverName).Success()
// Set all the fields to be what they should be, this should be a no-op
// but it's possible that the remote server returned us something "odd"
respMakeJoin.JoinEvent.Type = gomatrixserverlib.MRoomMember
respMakeJoin.JoinEvent.Sender = userID
respMakeJoin.JoinEvent.StateKey = &userID
respMakeJoin.JoinEvent.RoomID = roomID
respMakeJoin.JoinEvent.Redacts = ""
if content == nil {
content = map[string]interface{}{}
}
content["membership"] = "join"
if err = respMakeJoin.JoinEvent.SetContent(content); err != nil {
return fmt.Errorf("respMakeJoin.JoinEvent.SetContent: %w", err)
}
if err = respMakeJoin.JoinEvent.SetUnsigned(struct{}{}); err != nil {
return fmt.Errorf("respMakeJoin.JoinEvent.SetUnsigned: %w", err)
}
// Work out if we support the room version that has been supplied in
// the make_join response.
// "If not provided, the room version is assumed to be either "1" or "2"."
// https://matrix.org/docs/spec/server_server/unstable#get-matrix-federation-v1-make-join-roomid-userid
if respMakeJoin.RoomVersion == "" {
respMakeJoin.RoomVersion = setDefaultRoomVersionFromJoinEvent(respMakeJoin.JoinEvent)
}
if _, err = respMakeJoin.RoomVersion.EventFormat(); err != nil {
return fmt.Errorf("respMakeJoin.RoomVersion.EventFormat: %w", err)
}
// Build the join event.
event, err := respMakeJoin.JoinEvent.Build(
time.Now(),
r.cfg.Matrix.ServerName,
r.cfg.Matrix.KeyID,
r.cfg.Matrix.PrivateKey,
respMakeJoin.RoomVersion,
)
if err != nil {
return fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err)
}
// No longer reuse the request context from this point forward.
// We don't want the client timing out to interrupt the join.
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(context.Background())
// Try to perform a send_join using the newly built event.
respSendJoin, err := r.federation.SendJoin(
ctx,
serverName,
event,
respMakeJoin.RoomVersion,
)
if err != nil {
r.statistics.ForServer(serverName).Failure()
cancel()
return fmt.Errorf("r.federation.SendJoin: %w", err)
}
r.statistics.ForServer(serverName).Success()
// Sanity-check the join response to ensure that it has a create
// event, that the room version is known, etc.
if err := sanityCheckAuthChain(respSendJoin.AuthEvents); err != nil {
cancel()
return fmt.Errorf("sanityCheckAuthChain: %w", err)
}
// Process the join response in a goroutine. The idea here is
// that we'll try and wait for as long as possible for the work
// to complete, but if the client does give up waiting, we'll
// still continue to process the join anyway so that we don't
// waste the effort.
go func() {
defer cancel()
// TODO: Can we expand Check here to return a list of missing auth
// events rather than failing one at a time?
respState, err := respSendJoin.Check(ctx, r.keyRing, event, federatedAuthProvider(ctx, r.federation, r.keyRing, serverName))
if err != nil {
logrus.WithFields(logrus.Fields{
"room_id": roomID,
"user_id": userID,
}).WithError(err).Error("Failed to process room join response")
return
}
// If we successfully performed a send_join above then the other
// server now thinks we're a part of the room. Send the newly
// returned state to the roomserver to update our local view.
if err = roomserverAPI.SendEventWithState(
ctx, r.rsAPI,
roomserverAPI.KindNew,
respState,
event.Headered(respMakeJoin.RoomVersion),
nil, false,
); err != nil {
logrus.WithFields(logrus.Fields{
"room_id": roomID,
"user_id": userID,
}).WithError(err).Error("Failed to send room join response to roomserver")
return
}
}()
<-ctx.Done()
return nil
}
// PerformOutboundPeekRequest implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformOutboundPeek(
ctx context.Context,
request *api.PerformOutboundPeekRequest,
response *api.PerformOutboundPeekResponse,
) error {
// Look up the supported room versions.
var supportedVersions []gomatrixserverlib.RoomVersion
for version := range version.SupportedRoomVersions() {
supportedVersions = append(supportedVersions, version)
}
// Deduplicate the server names we were provided but keep the ordering
// as this encodes useful information about which servers are most likely
// to respond.
seenSet := make(map[gomatrixserverlib.ServerName]bool)
var uniqueList []gomatrixserverlib.ServerName
for _, srv := range request.ServerNames {
if seenSet[srv] {
continue
}
seenSet[srv] = true
uniqueList = append(uniqueList, srv)
}
request.ServerNames = uniqueList
// See if there's an existing outbound peek for this room ID with
// one of the specified servers.
if peeks, err := r.db.GetOutboundPeeks(ctx, request.RoomID); err == nil {
for _, peek := range peeks {
if _, ok := seenSet[peek.ServerName]; ok {
return nil
}
}
}
// Try each server that we were provided until we land on one that
// successfully completes the peek
var lastErr error
for _, serverName := range request.ServerNames {
if err := r.performOutboundPeekUsingServer(
ctx,
request.RoomID,
serverName,
supportedVersions,
); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"server_name": serverName,
"room_id": request.RoomID,
}).Warnf("Failed to peek room through server")
lastErr = err
continue
}
// We're all good.
return nil
}
// If we reach here then we didn't complete a peek for some reason.
var httpErr gomatrix.HTTPError
if ok := errors.As(lastErr, &httpErr); ok {
httpErr.Message = string(httpErr.Contents)
// Clear the wrapped error, else serialising to JSON (in polylith mode) will fail
httpErr.WrappedError = nil
response.LastError = &httpErr
} else {
response.LastError = &gomatrix.HTTPError{
Code: 0,
WrappedError: nil,
Message: lastErr.Error(),
}
}
logrus.Errorf(
"failed to peek room %q through %d server(s): last error %s",
request.RoomID, len(request.ServerNames), lastErr,
)
return lastErr
}
func (r *FederationInternalAPI) performOutboundPeekUsingServer(
ctx context.Context,
roomID string,
serverName gomatrixserverlib.ServerName,
supportedVersions []gomatrixserverlib.RoomVersion,
) error {
// create a unique ID for this peek.
// for now we just use the room ID again. In future, if we ever
// support concurrent peeks to the same room with different filters
// then we would need to disambiguate further.
peekID := roomID
// check whether we're peeking already to try to avoid needlessly
// re-peeking on the server. we don't need a transaction for this,
// given this is a nice-to-have.
outboundPeek, err := r.db.GetOutboundPeek(ctx, serverName, roomID, peekID)
if err != nil {
return err
}
renewing := false
if outboundPeek != nil {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
if nowMilli > outboundPeek.RenewedTimestamp+outboundPeek.RenewalInterval {
logrus.Infof("stale outbound peek to %s for %s already exists; renewing", serverName, roomID)
renewing = true
} else {
logrus.Infof("live outbound peek to %s for %s already exists", serverName, roomID)
return nil
}
}
// Try to perform an outbound /peek using the information supplied in the
// request.
respPeek, err := r.federation.Peek(
ctx,
serverName,
roomID,
peekID,
supportedVersions,
)
if err != nil {
r.statistics.ForServer(serverName).Failure()
return fmt.Errorf("r.federation.Peek: %w", err)
}
r.statistics.ForServer(serverName).Success()
// Work out if we support the room version that has been supplied in
// the peek response.
if respPeek.RoomVersion == "" {
respPeek.RoomVersion = gomatrixserverlib.RoomVersionV1
}
if _, err = respPeek.RoomVersion.EventFormat(); err != nil {
return fmt.Errorf("respPeek.RoomVersion.EventFormat: %w", err)
}
// we have the peek state now so let's process regardless of whether upstream gives up
ctx = context.Background()
respState := respPeek.ToRespState()
// authenticate the state returned (check its auth events etc)
// the equivalent of CheckSendJoinResponse()
if err = sanityCheckAuthChain(respState.AuthEvents); err != nil {
return fmt.Errorf("sanityCheckAuthChain: %w", err)
}
if err = respState.Check(ctx, r.keyRing, federatedAuthProvider(ctx, r.federation, r.keyRing, serverName)); err != nil {
return fmt.Errorf("error checking state returned from peeking: %w", err)
}
// If we've got this far, the remote server is peeking.
if renewing {
if err = r.db.RenewOutboundPeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
return err
}
} else {
if err = r.db.AddOutboundPeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
return err
}
}
// logrus.Warnf("got respPeek %#v", respPeek)
// Send the newly returned state to the roomserver to update our local view.
if err = roomserverAPI.SendEventWithState(
ctx, r.rsAPI,
roomserverAPI.KindNew,
&respState,
respPeek.LatestEvent.Headered(respPeek.RoomVersion),
nil, false,
); err != nil {
return fmt.Errorf("r.producer.SendEventWithState: %w", err)
}
return nil
}
// PerformLeaveRequest implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformLeave(
ctx context.Context,
request *api.PerformLeaveRequest,
response *api.PerformLeaveResponse,
) (err error) {
// Deduplicate the server names we were provided.
util.SortAndUnique(request.ServerNames)
// Try each server that we were provided until we land on one that
// successfully completes the make-leave send-leave dance.
for _, serverName := range request.ServerNames {
// Try to perform a make_leave using the information supplied in the
// request.
respMakeLeave, err := r.federation.MakeLeave(
ctx,
serverName,
request.RoomID,
request.UserID,
)
if err != nil {
// TODO: Check if the user was not allowed to leave the room.
logrus.WithError(err).Warnf("r.federation.MakeLeave failed")
r.statistics.ForServer(serverName).Failure()
continue
}
// Set all the fields to be what they should be, this should be a no-op
// but it's possible that the remote server returned us something "odd"
respMakeLeave.LeaveEvent.Type = gomatrixserverlib.MRoomMember
respMakeLeave.LeaveEvent.Sender = request.UserID
respMakeLeave.LeaveEvent.StateKey = &request.UserID
respMakeLeave.LeaveEvent.RoomID = request.RoomID
respMakeLeave.LeaveEvent.Redacts = ""
if respMakeLeave.LeaveEvent.Content == nil {
content := map[string]interface{}{
"membership": "leave",
}
if err = respMakeLeave.LeaveEvent.SetContent(content); err != nil {
logrus.WithError(err).Warnf("respMakeLeave.LeaveEvent.SetContent failed")
continue
}
}
if err = respMakeLeave.LeaveEvent.SetUnsigned(struct{}{}); err != nil {
logrus.WithError(err).Warnf("respMakeLeave.LeaveEvent.SetUnsigned failed")
continue
}
// Work out if we support the room version that has been supplied in
// the make_leave response.
if _, err = respMakeLeave.RoomVersion.EventFormat(); err != nil {
return gomatrixserverlib.UnsupportedRoomVersionError{}
}
// Build the leave event.
event, err := respMakeLeave.LeaveEvent.Build(
time.Now(),
r.cfg.Matrix.ServerName,
r.cfg.Matrix.KeyID,
r.cfg.Matrix.PrivateKey,
respMakeLeave.RoomVersion,
)
if err != nil {
logrus.WithError(err).Warnf("respMakeLeave.LeaveEvent.Build failed")
continue
}
// Try to perform a send_leave using the newly built event.
err = r.federation.SendLeave(
ctx,
serverName,
event,
)
if err != nil {
logrus.WithError(err).Warnf("r.federation.SendLeave failed")
r.statistics.ForServer(serverName).Failure()
continue
}
r.statistics.ForServer(serverName).Success()
return nil
}
// If we reach here then we didn't complete a leave for some reason.
return fmt.Errorf(
"failed to leave room %q through %d server(s)",
request.RoomID, len(request.ServerNames),
)
}
// PerformLeaveRequest implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformInvite(
ctx context.Context,
request *api.PerformInviteRequest,
response *api.PerformInviteResponse,
) (err error) {
if request.Event.StateKey() == nil {
return errors.New("invite must be a state event")
}
_, destination, err := gomatrixserverlib.SplitID('@', *request.Event.StateKey())
if err != nil {
return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
}
logrus.WithFields(logrus.Fields{
"event_id": request.Event.EventID(),
"user_id": *request.Event.StateKey(),
"room_id": request.Event.RoomID(),
"room_version": request.RoomVersion,
"destination": destination,
}).Info("Sending invite")
inviteReq, err := gomatrixserverlib.NewInviteV2Request(request.Event, request.InviteRoomState)
if err != nil {
return fmt.Errorf("gomatrixserverlib.NewInviteV2Request: %w", err)
}
inviteRes, err := r.federation.SendInviteV2(ctx, destination, inviteReq)
if err != nil {
return fmt.Errorf("r.federation.SendInviteV2: %w", err)
}
response.Event = inviteRes.Event.Headered(request.RoomVersion)
return nil
}
// PerformServersAlive implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformServersAlive(
ctx context.Context,
request *api.PerformServersAliveRequest,
response *api.PerformServersAliveResponse,
) (err error) {
for _, srv := range request.Servers {
_ = r.db.RemoveServerFromBlacklist(srv)
r.queues.RetryServer(srv)
}
return nil
}
// PerformServersAlive implements api.FederationInternalAPI
func (r *FederationInternalAPI) PerformBroadcastEDU(
ctx context.Context,
request *api.PerformBroadcastEDURequest,
response *api.PerformBroadcastEDUResponse,
) (err error) {
destinations, err := r.db.GetAllJoinedHosts(ctx)
if err != nil {
return fmt.Errorf("r.db.GetAllJoinedHosts: %w", err)
}
if len(destinations) == 0 {
return nil
}
logrus.WithContext(ctx).Infof("Sending wake-up EDU to %d destination(s)", len(destinations))
edu := &gomatrixserverlib.EDU{
Type: "org.matrix.dendrite.wakeup",
Origin: string(r.cfg.Matrix.ServerName),
}
if err = r.queues.SendEDU(edu, r.cfg.Matrix.ServerName, destinations); err != nil {
return fmt.Errorf("r.queues.SendEDU: %w", err)
}
wakeReq := &api.PerformServersAliveRequest{
Servers: destinations,
}
wakeRes := &api.PerformServersAliveResponse{}
if err := r.PerformServersAlive(ctx, wakeReq, wakeRes); err != nil {
return fmt.Errorf("r.PerformServersAlive: %w", err)
}
return nil
}
func sanityCheckAuthChain(authChain []*gomatrixserverlib.Event) error {
// sanity check we have a create event and it has a known room version
for _, ev := range authChain {
if ev.Type() == gomatrixserverlib.MRoomCreate && ev.StateKeyEquals("") {
// make sure the room version is known
content := ev.Content()
verBody := struct {
Version string `json:"room_version"`
}{}
err := json.Unmarshal(content, &verBody)
if err != nil {
return err
}
if verBody.Version == "" {
// https://matrix.org/docs/spec/client_server/r0.6.0#m-room-create
// The version of the room. Defaults to "1" if the key does not exist.
verBody.Version = "1"
}
knownVersions := gomatrixserverlib.RoomVersions()
if _, ok := knownVersions[gomatrixserverlib.RoomVersion(verBody.Version)]; !ok {
return fmt.Errorf("auth chain m.room.create event has an unknown room version: %s", verBody.Version)
}
return nil
}
}
return fmt.Errorf("auth chain response is missing m.room.create event")
}
func setDefaultRoomVersionFromJoinEvent(joinEvent gomatrixserverlib.EventBuilder) gomatrixserverlib.RoomVersion {
// if auth events are not event references we know it must be v3+
// we have to do these shenanigans to satisfy sytest, specifically for:
// "Outbound federation rejects m.room.create events with an unknown room version"
hasEventRefs := true
authEvents, ok := joinEvent.AuthEvents.([]interface{})
if ok {
if len(authEvents) > 0 {
_, ok = authEvents[0].(string)
if ok {
// event refs are objects, not strings, so we know we must be dealing with a v3+ room.
hasEventRefs = false
}
}
}
if hasEventRefs {
return gomatrixserverlib.RoomVersionV1
}
return gomatrixserverlib.RoomVersionV4
}
// FederatedAuthProvider is an auth chain provider which fetches events from the server provided
func federatedAuthProvider(
ctx context.Context, federation *gomatrixserverlib.FederationClient,
keyRing gomatrixserverlib.JSONVerifier, server gomatrixserverlib.ServerName,
) gomatrixserverlib.AuthChainProvider {
// A list of events that we have retried, if they were not included in
// the auth events supplied in the send_join.
retries := map[string][]*gomatrixserverlib.Event{}
// Define a function which we can pass to Check to retrieve missing
// auth events inline. This greatly increases our chances of not having
// to repeat the entire set of checks just for a missing event or two.
return func(roomVersion gomatrixserverlib.RoomVersion, eventIDs []string) ([]*gomatrixserverlib.Event, error) {
returning := []*gomatrixserverlib.Event{}
// See if we have retry entries for each of the supplied event IDs.
for _, eventID := range eventIDs {
// If we've already satisfied a request for this event ID before then
// just append the results. We won't retry the request.
if retry, ok := retries[eventID]; ok {
if retry == nil {
return nil, fmt.Errorf("missingAuth: not retrying failed event ID %q", eventID)
}
returning = append(returning, retry...)
continue
}
// Make a note of the fact that we tried to do something with this
// event ID, even if we don't succeed.
retries[eventID] = nil
// Try to retrieve the event from the server that sent us the send
// join response.
tx, txerr := federation.GetEvent(ctx, server, eventID)
if txerr != nil {
return nil, fmt.Errorf("missingAuth r.federation.GetEvent: %w", txerr)
}
// For each event returned, add it to the set of return events. We
// also will populate the retries, in case someone asks for this
// event ID again.
for _, pdu := range tx.PDUs {
// Try to parse the event.
ev, everr := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
if everr != nil {
return nil, fmt.Errorf("missingAuth gomatrixserverlib.NewEventFromUntrustedJSON: %w", everr)
}
// Check the signatures of the event.
if err := ev.VerifyEventSignatures(ctx, keyRing); err != nil {
return nil, fmt.Errorf("missingAuth VerifyEventSignatures: %w", err)
}
// If the event is OK then add it to the results and the retry map.
returning = append(returning, ev)
retries[ev.EventID()] = append(retries[ev.EventID()], ev)
}
}
return returning, nil
}
}

View file

@ -0,0 +1,97 @@
package internal
import (
"context"
"fmt"
"time"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
// QueryJoinedHostServerNamesInRoom implements api.FederationInternalAPI
func (f *FederationInternalAPI) QueryJoinedHostServerNamesInRoom(
ctx context.Context,
request *api.QueryJoinedHostServerNamesInRoomRequest,
response *api.QueryJoinedHostServerNamesInRoomResponse,
) (err error) {
joinedHosts, err := f.db.GetJoinedHostsForRooms(ctx, []string{request.RoomID})
if err != nil {
return
}
response.ServerNames = joinedHosts
return
}
func (a *FederationInternalAPI) fetchServerKeysDirectly(ctx context.Context, serverName gomatrixserverlib.ServerName) (*gomatrixserverlib.ServerKeys, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
ires, err := a.doRequest(serverName, func() (interface{}, error) {
return a.federation.GetServerKeys(ctx, serverName)
})
if err != nil {
return nil, err
}
sks := ires.(gomatrixserverlib.ServerKeys)
return &sks, nil
}
func (a *FederationInternalAPI) fetchServerKeysFromCache(
ctx context.Context, req *api.QueryServerKeysRequest,
) ([]gomatrixserverlib.ServerKeys, error) {
var results []gomatrixserverlib.ServerKeys
for keyID, criteria := range req.KeyIDToCriteria {
serverKeysResponses, _ := a.db.GetNotaryKeys(ctx, req.ServerName, []gomatrixserverlib.KeyID{keyID})
if len(serverKeysResponses) == 0 {
return nil, fmt.Errorf("failed to find server key response for key ID %s", keyID)
}
// we should only get 1 result as we only gave 1 key ID
sk := serverKeysResponses[0]
util.GetLogger(ctx).Infof("fetchServerKeysFromCache: minvalid:%v keys: %+v", criteria.MinimumValidUntilTS, sk)
if criteria.MinimumValidUntilTS != 0 {
// check if it's still valid. if they have the same value that's also valid
if sk.ValidUntilTS < criteria.MinimumValidUntilTS {
return nil, fmt.Errorf(
"found server response for key ID %s but it is no longer valid, min: %v valid_until: %v",
keyID, criteria.MinimumValidUntilTS, sk.ValidUntilTS,
)
}
}
results = append(results, sk)
}
return results, nil
}
func (a *FederationInternalAPI) QueryServerKeys(
ctx context.Context, req *api.QueryServerKeysRequest, res *api.QueryServerKeysResponse,
) error {
// attempt to satisfy the entire request from the cache first
results, err := a.fetchServerKeysFromCache(ctx, req)
if err == nil {
// satisfied entirely from cache, return it
res.ServerKeys = results
return nil
}
util.GetLogger(ctx).WithField("server", req.ServerName).WithError(err).Warn("notary: failed to satisfy keys request entirely from cache, hitting direct")
serverKeys, err := a.fetchServerKeysDirectly(ctx, req.ServerName)
if err != nil {
// try to load as much as we can from the cache in a best effort basis
util.GetLogger(ctx).WithField("server", req.ServerName).WithError(err).Warn("notary: failed to ask server for keys, returning best effort keys")
serverKeysResponses, dbErr := a.db.GetNotaryKeys(ctx, req.ServerName, req.KeyIDs())
if dbErr != nil {
return fmt.Errorf("notary: server returned %s, and db returned %s", err, dbErr)
}
res.ServerKeys = serverKeysResponses
return nil
}
// cache it!
if err = a.db.UpdateNotaryKeys(context.Background(), req.ServerName, *serverKeys); err != nil {
// non-fatal, still return the response
util.GetLogger(ctx).WithError(err).Warn("failed to UpdateNotaryKeys")
}
res.ServerKeys = []gomatrixserverlib.ServerKeys{*serverKeys}
return nil
}