Update Pinecone P2P demo

This commit is contained in:
Neil Alexander 2022-03-30 15:01:22 +01:00
parent 49dc49b232
commit 8213b2ba30
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
13 changed files with 157 additions and 14 deletions

View file

@ -1,3 +1,17 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package users
import (
@ -11,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
clienthttputil "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/defaults"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -64,9 +79,12 @@ func (p *PineconeUserProvider) FederatedUserProfiles(w http.ResponseWriter, r *h
}
func (p *PineconeUserProvider) QuerySearchProfiles(ctx context.Context, req *userapi.QuerySearchProfilesRequest, res *userapi.QuerySearchProfilesResponse) error {
list := map[string]struct{}{}
list := map[gomatrixserverlib.ServerName]struct{}{}
for k := range defaults.DefaultServerNames {
list[k] = struct{}{}
}
for _, k := range p.r.Peers() {
list[k.PublicKey] = struct{}{}
list[gomatrixserverlib.ServerName(k.PublicKey)] = struct{}{}
}
res.Profiles = bulkFetchUserDirectoriesFromServers(context.Background(), req, p.fedClient, list)
return nil
@ -77,7 +95,7 @@ func (p *PineconeUserProvider) QuerySearchProfiles(ctx context.Context, req *use
func bulkFetchUserDirectoriesFromServers(
ctx context.Context, req *userapi.QuerySearchProfilesRequest,
fedClient *gomatrixserverlib.FederationClient,
homeservers map[string]struct{},
homeservers map[gomatrixserverlib.ServerName]struct{},
) (profiles []authtypes.Profile) {
jsonBody, err := json.Marshal(req)
if err != nil {
@ -96,7 +114,7 @@ func bulkFetchUserDirectoriesFromServers(
// concurrently query for public rooms
reqctx, reqcancel := context.WithTimeout(ctx, time.Second*5)
for hs := range homeservers {
go func(homeserverDomain string) {
go func(homeserverDomain gomatrixserverlib.ServerName) {
defer wg.Done()
util.GetLogger(reqctx).WithField("hs", homeserverDomain).Info("Querying HS for users")
@ -115,7 +133,7 @@ func bulkFetchUserDirectoriesFromServers(
return
}
for _, profile := range res.Profiles {
profile.ServerName = homeserverDomain
profile.ServerName = string(homeserverDomain)
// atomically send a room or stop
select {
case profileCh <- profile: