mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-29 12:42:46 +00:00
Handle inbound federation E2E key queries/claims (#1215)
* Handle inbound /keys/claim and /keys/query requests * Add display names to device key responses * Linting
This commit is contained in:
parent
1e71fd645e
commit
541a23f712
25 changed files with 321 additions and 35 deletions
|
@ -24,7 +24,9 @@ import (
|
|||
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/storage"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/tidwall/gjson"
|
||||
"github.com/tidwall/sjson"
|
||||
)
|
||||
|
@ -33,6 +35,7 @@ type KeyInternalAPI struct {
|
|||
DB storage.Database
|
||||
ThisServer gomatrixserverlib.ServerName
|
||||
FedClient *gomatrixserverlib.FederationClient
|
||||
UserAPI userapi.UserInternalAPI
|
||||
}
|
||||
|
||||
func (a *KeyInternalAPI) PerformUploadKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) {
|
||||
|
@ -66,11 +69,25 @@ func (a *KeyInternalAPI) PerformClaimKeys(ctx context.Context, req *api.PerformC
|
|||
Err: fmt.Sprintf("failed to ClaimKeys locally: %s", err),
|
||||
}
|
||||
}
|
||||
mergeInto(res.OneTimeKeys, keys)
|
||||
util.GetLogger(ctx).WithField("keys_claimed", len(keys)).WithField("num_users", len(local)).Info("Claimed local keys")
|
||||
for _, key := range keys {
|
||||
_, ok := res.OneTimeKeys[key.UserID]
|
||||
if !ok {
|
||||
res.OneTimeKeys[key.UserID] = make(map[string]map[string]json.RawMessage)
|
||||
}
|
||||
_, ok = res.OneTimeKeys[key.UserID][key.DeviceID]
|
||||
if !ok {
|
||||
res.OneTimeKeys[key.UserID][key.DeviceID] = make(map[string]json.RawMessage)
|
||||
}
|
||||
for keyID, keyJSON := range key.KeyJSON {
|
||||
res.OneTimeKeys[key.UserID][key.DeviceID][keyID] = keyJSON
|
||||
}
|
||||
}
|
||||
delete(domainToDeviceKeys, string(a.ThisServer))
|
||||
}
|
||||
// claim remote keys
|
||||
a.claimRemoteKeys(ctx, req.Timeout, res, domainToDeviceKeys)
|
||||
if len(domainToDeviceKeys) > 0 {
|
||||
a.claimRemoteKeys(ctx, req.Timeout, res, domainToDeviceKeys)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *KeyInternalAPI) claimRemoteKeys(
|
||||
|
@ -82,6 +99,7 @@ func (a *KeyInternalAPI) claimRemoteKeys(
|
|||
wg.Add(len(domainToDeviceKeys))
|
||||
// mutex for failures
|
||||
var failMu sync.Mutex
|
||||
util.GetLogger(ctx).WithField("num_servers", len(domainToDeviceKeys)).Info("Claiming remote keys from servers")
|
||||
|
||||
// fan out
|
||||
for d, k := range domainToDeviceKeys {
|
||||
|
@ -91,6 +109,7 @@ func (a *KeyInternalAPI) claimRemoteKeys(
|
|||
defer cancel()
|
||||
claimKeyRes, err := a.FedClient.ClaimKeys(fedCtx, gomatrixserverlib.ServerName(domain), keysToClaim)
|
||||
if err != nil {
|
||||
util.GetLogger(ctx).WithError(err).WithField("server", domain).Error("ClaimKeys failed")
|
||||
failMu.Lock()
|
||||
res.Failures[domain] = map[string]interface{}{
|
||||
"message": err.Error(),
|
||||
|
@ -108,6 +127,7 @@ func (a *KeyInternalAPI) claimRemoteKeys(
|
|||
close(resultCh)
|
||||
}()
|
||||
|
||||
keysClaimed := 0
|
||||
for result := range resultCh {
|
||||
for userID, nest := range result.OneTimeKeys {
|
||||
res.OneTimeKeys[userID] = make(map[string]map[string]json.RawMessage)
|
||||
|
@ -119,10 +139,12 @@ func (a *KeyInternalAPI) claimRemoteKeys(
|
|||
continue
|
||||
}
|
||||
res.OneTimeKeys[userID][deviceID][keyIDWithAlgo] = keyJSON
|
||||
keysClaimed++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
util.GetLogger(ctx).WithField("num_keys", keysClaimed).Info("Claimed remote keys")
|
||||
}
|
||||
|
||||
func (a *KeyInternalAPI) QueryKeys(ctx context.Context, req *api.QueryKeysRequest, res *api.QueryKeysResponse) {
|
||||
|
@ -145,13 +167,28 @@ func (a *KeyInternalAPI) QueryKeys(ctx context.Context, req *api.QueryKeysReques
|
|||
}
|
||||
return
|
||||
}
|
||||
|
||||
// pull out display names after we have the keys so we handle wildcards correctly
|
||||
var dids []string
|
||||
for _, dk := range deviceKeys {
|
||||
dids = append(dids, dk.DeviceID)
|
||||
}
|
||||
var queryRes userapi.QueryDeviceInfosResponse
|
||||
err = a.UserAPI.QueryDeviceInfos(ctx, &userapi.QueryDeviceInfosRequest{
|
||||
DeviceIDs: dids,
|
||||
}, &queryRes)
|
||||
if err != nil {
|
||||
util.GetLogger(ctx).Warnf("Failed to QueryDeviceInfos for device IDs, display names will be missing")
|
||||
}
|
||||
|
||||
if res.DeviceKeys[userID] == nil {
|
||||
res.DeviceKeys[userID] = make(map[string]json.RawMessage)
|
||||
}
|
||||
for _, dk := range deviceKeys {
|
||||
// inject an empty 'unsigned' key which should be used for display names
|
||||
// (but not via this API? unsure when they should be added)
|
||||
dk.KeyJSON, _ = sjson.SetBytes(dk.KeyJSON, "unsigned", struct{}{})
|
||||
// inject display name if known
|
||||
dk.KeyJSON, _ = sjson.SetBytes(dk.KeyJSON, "unsigned", struct {
|
||||
DisplayName string `json:"device_display_name,omitempty"`
|
||||
}{queryRes.DeviceInfo[dk.DeviceID].DisplayName})
|
||||
res.DeviceKeys[userID][dk.DeviceID] = dk.KeyJSON
|
||||
}
|
||||
} else {
|
||||
|
@ -298,19 +335,3 @@ func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.Perform
|
|||
func (a *KeyInternalAPI) emitDeviceKeyChanges(existing, new []api.DeviceKeys) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
func mergeInto(dst map[string]map[string]map[string]json.RawMessage, src []api.OneTimeKeys) {
|
||||
for _, key := range src {
|
||||
_, ok := dst[key.UserID]
|
||||
if !ok {
|
||||
dst[key.UserID] = make(map[string]map[string]json.RawMessage)
|
||||
}
|
||||
_, ok = dst[key.UserID][key.DeviceID]
|
||||
if !ok {
|
||||
dst[key.UserID][key.DeviceID] = make(map[string]json.RawMessage)
|
||||
}
|
||||
for keyID, keyJSON := range key.KeyJSON {
|
||||
dst[key.UserID][key.DeviceID][keyID] = keyJSON
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/keyserver/internal"
|
||||
"github.com/matrix-org/dendrite/keyserver/inthttp"
|
||||
"github.com/matrix-org/dendrite/keyserver/storage"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
@ -33,7 +34,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
|
|||
|
||||
// NewInternalAPI returns a concerete implementation of the internal API. Callers
|
||||
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
|
||||
func NewInternalAPI(cfg *config.Dendrite, fedClient *gomatrixserverlib.FederationClient) api.KeyInternalAPI {
|
||||
func NewInternalAPI(cfg *config.Dendrite, fedClient *gomatrixserverlib.FederationClient, userAPI userapi.UserInternalAPI) api.KeyInternalAPI {
|
||||
db, err := storage.NewDatabase(
|
||||
string(cfg.Database.E2EKey),
|
||||
cfg.DbProperties(),
|
||||
|
@ -45,5 +46,6 @@ func NewInternalAPI(cfg *config.Dendrite, fedClient *gomatrixserverlib.Federatio
|
|||
DB: db,
|
||||
ThisServer: cfg.Matrix.ServerName,
|
||||
FedClient: fedClient,
|
||||
UserAPI: userAPI,
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue