Yggdrasil demo initial public room directory (#1181)

* Don't return null to public directory request

* Initial support for finding public rooms in Yggdrasil demo (incomplete)

* Increase QUIC idle time to 15 minutes
This commit is contained in:
Neil Alexander 2020-07-03 14:28:43 +01:00 committed by GitHub
parent 3797c38ec8
commit 3a28ddfb7a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 165 additions and 17 deletions

View file

@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/embed"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms"
"github.com/matrix-org/dendrite/currentstateserver"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
@ -131,6 +132,9 @@ func main() {
UserAPI: userAPI,
StateAPI: stateAPI,
//ServerKeyAPI: serverKeyAPI,
ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider(
ygg, fsAPI, federation,
),
}
monolith.AddAllPublicRoutes(base.PublicAPIMux)

View file

@ -31,6 +31,7 @@ import (
"github.com/lucas-clemente/quic-go"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/convert"
"github.com/matrix-org/gomatrixserverlib"
yggdrasiladmin "github.com/yggdrasil-network/yggdrasil-go/src/admin"
yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config"
@ -140,7 +141,7 @@ func Setup(instanceName, instancePeer, storageDirectory string) (*Node, error) {
MaxIncomingStreams: 0,
MaxIncomingUniStreams: 0,
KeepAlive: true,
MaxIdleTimeout: time.Second * 120,
MaxIdleTimeout: time.Second * 900,
HandshakeTimeout: time.Second * 30,
}
@ -183,3 +184,29 @@ func (n *Node) SigningPrivateKey() ed25519.PrivateKey {
func (n *Node) PeerCount() int {
return len(n.core.GetPeers()) - 1
}
func (n *Node) KnownNodes() []gomatrixserverlib.ServerName {
nodemap := map[string]struct{}{}
/*
for _, peer := range n.core.GetSwitchPeers() {
nodemap[hex.EncodeToString(peer.SigningKey[:])] = struct{}{}
}
*/
n.sessions.Range(func(_, v interface{}) bool {
session, ok := v.(quic.Session)
if !ok {
return true
}
if len(session.ConnectionState().PeerCertificates) != 1 {
return true
}
subjectName := session.ConnectionState().PeerCertificates[0].Subject.CommonName
nodemap[subjectName] = struct{}{}
return true
})
var nodes []gomatrixserverlib.ServerName
for node := range nodemap {
nodes = append(nodes, gomatrixserverlib.ServerName(node))
}
return nodes
}

View file

@ -20,6 +20,7 @@ import (
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/hex"
"encoding/pem"
"errors"
@ -127,6 +128,9 @@ func (n *Node) generateTLSConfig() *tls.Config {
panic(err)
}
template := x509.Certificate{
Subject: pkix.Name{
CommonName: n.DerivedServerName(),
},
SerialNumber: big.NewInt(1),
NotAfter: time.Now().Add(time.Hour * 24 * 365),
DNSNames: []string{n.DerivedSessionName()},

View file

@ -0,0 +1,120 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package yggrooms
import (
"context"
"sync"
"time"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
type YggdrasilRoomProvider struct {
node *yggconn.Node
fedSender api.FederationSenderInternalAPI
fedClient *gomatrixserverlib.FederationClient
}
func NewYggdrasilRoomProvider(
node *yggconn.Node, fedSender api.FederationSenderInternalAPI, fedClient *gomatrixserverlib.FederationClient,
) *YggdrasilRoomProvider {
p := &YggdrasilRoomProvider{
node: node,
fedSender: fedSender,
fedClient: fedClient,
}
return p
}
func (p *YggdrasilRoomProvider) Rooms() []gomatrixserverlib.PublicRoom {
return bulkFetchPublicRoomsFromServers(context.Background(), p.fedClient, p.node.KnownNodes())
}
// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers.
// Returns a list of public rooms.
func bulkFetchPublicRoomsFromServers(
ctx context.Context, fedClient *gomatrixserverlib.FederationClient,
homeservers []gomatrixserverlib.ServerName,
) (publicRooms []gomatrixserverlib.PublicRoom) {
limit := 200
// follow pipeline semantics, see https://blog.golang.org/pipelines for more info.
// goroutines send rooms to this channel
roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit))
// signalling channel to tell goroutines to stop sending rooms and quit
done := make(chan bool)
// signalling to say when we can close the room channel
var wg sync.WaitGroup
wg.Add(len(homeservers))
// concurrently query for public rooms
for _, hs := range homeservers {
go func(homeserverDomain gomatrixserverlib.ServerName) {
defer wg.Done()
util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms")
fres, err := fedClient.GetPublicRooms(ctx, homeserverDomain, int(limit), "", false, "")
if err != nil {
util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn(
"bulkFetchPublicRoomsFromServers: failed to query hs",
)
return
}
for _, room := range fres.Chunk {
// atomically send a room or stop
select {
case roomCh <- room:
case <-done:
util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms")
return
}
}
}(hs)
}
// Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request.
// This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be
// closed.
go func() {
wg.Wait()
util.GetLogger(ctx).Info("Cleaning up resources")
close(roomCh)
}()
// fan-in results with timeout. We stop when we reach the limit.
FanIn:
for len(publicRooms) < int(limit) || limit == 0 {
// add a room or timeout
select {
case room, ok := <-roomCh:
if !ok {
util.GetLogger(ctx).Info("All homeservers have been queried, returning results.")
break FanIn
}
publicRooms = append(publicRooms, room)
case <-time.After(5 * time.Second): // we've waited long enough, let's tell the client what we got.
util.GetLogger(ctx).Info("Waited 5s for federated public rooms, returning early")
break FanIn
case <-ctx.Done(): // the client hung up on us, let's stop.
util.GetLogger(ctx).Info("Client hung up, returning early")
break FanIn
}
}
// tell goroutines to stop
close(done)
return publicRooms
}