Fix public room directory in Pinecone demo

This commit is contained in:
Neil Alexander 2021-05-07 12:17:14 +01:00
parent aa672068ab
commit 603bf590f0
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 29 additions and 41 deletions

View file

@ -16,8 +16,6 @@ package rooms
import (
"context"
"crypto/ed25519"
"encoding/hex"
"sync"
"time"
@ -54,16 +52,11 @@ func NewPineconeRoomProvider(
}
func (p *PineconeRoomProvider) Rooms() []gomatrixserverlib.PublicRoom {
known := []ed25519.PublicKey{}
for _, k := range p.r.KnownNodes() {
known = append(known, k[:])
}
known = append(known, p.s.Sessions()...)
known := p.r.KnownNodes()
//known = append(known, p.s.Sessions()...)
list := []gomatrixserverlib.ServerName{}
for _, k := range known {
if len(k) == ed25519.PublicKeySize {
list = append(list, gomatrixserverlib.ServerName(hex.EncodeToString(k)))
}
list = append(list, gomatrixserverlib.ServerName(k.String()))
}
return bulkFetchPublicRoomsFromServers(context.Background(), p.fedClient, list)
}
@ -84,16 +77,17 @@ func bulkFetchPublicRoomsFromServers(
var wg sync.WaitGroup
wg.Add(len(homeservers))
// concurrently query for public rooms
reqctx, reqcancel := context.WithCancel(ctx)
for _, hs := range homeservers {
go func(homeserverDomain gomatrixserverlib.ServerName) {
defer wg.Done()
util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms")
util.GetLogger(reqctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms")
var fres gomatrixserverlib.RespPublicRooms
var err error
for i := 0; i < pineconeRoomAttempts; i++ {
fres, err = fedClient.GetPublicRooms(ctx, homeserverDomain, int(limit), "", false, "")
fres, err = fedClient.GetPublicRooms(reqctx, homeserverDomain, int(limit), "", false, "")
if err != nil {
util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn(
util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Warn(
"bulkFetchPublicRoomsFromServers: failed to query hs",
)
if i == pineconeRoomAttempts-1 {
@ -108,43 +102,25 @@ func bulkFetchPublicRoomsFromServers(
select {
case roomCh <- room:
case <-done:
util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms")
util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms")
return
}
}
}(hs)
}
// Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request.
// This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be
// closed.
go func() {
select {
case <-time.After(5 * time.Second):
default:
wg.Wait()
util.GetLogger(ctx).Info("Cleaning up resources")
close(roomCh)
}()
// fan-in results with timeout. We stop when we reach the limit.
FanIn:
for len(publicRooms) < int(limit) || limit == 0 {
// add a room or timeout
select {
case room, ok := <-roomCh:
if !ok {
util.GetLogger(ctx).Info("All homeservers have been queried, returning results.")
break FanIn
}
publicRooms = append(publicRooms, room)
case <-time.After(5 * time.Second): // we've waited long enough, let's tell the client what we got.
util.GetLogger(ctx).Info("Waited 5s for federated public rooms, returning early")
break FanIn
case <-ctx.Done(): // the client hung up on us, let's stop.
util.GetLogger(ctx).Info("Client hung up, returning early")
break FanIn
}
}
// tell goroutines to stop
reqcancel()
close(done)
close(roomCh)
for room := range roomCh {
publicRooms = append(publicRooms, room)
}
return publicRooms
}