Pinecone P2P demo (#1856)

* Pinecone demo

* Enable multicast, fix HTTP routing

* Fix multicast import

* Fix build

* Update Pinecone demo

* Fix the keys

* Tweaks

* Pinecone room directory support (early)

* Fix gobind-pinecone

* Add pinecone listener

* Fix public key value

* Use AuthenticatedConnect for dial

* Fix gobind-pinecone

* Stop panics

* Give fsAPI to keyserver

* Pinecone demo fixes

* Update gobind build scripts

* Account creation

* Tweaks

* Setup tweaks

* API tweaks

* API tweaks

* API tweaks

* Port mutex

* Re-enable multicast

* Add ReadCopy

* Update quic-go, fixes

* Shutdowns fixed for iOS

* Update build script

* Add WebSocket support

* Bug fixes

* Netconn context

* Fix WebSocket connectivity

* Fixes to gobind API

* Strip frameworks

* Configurability updates

* Update go.mod

* Update go.mod/go.sum

* Update go.mod/go.sum

* Update go.mod/go.sum

* Try to stay connected tto static peer

* Update gobind-pinecone

* Update go.mod/go.sum

* Test uTP+TLS

* Use HTTP/2

* Don't use HTTP/2

* Update go.mod/go.sum

* Attempt to reconnect to the static peer if it drops

* Stay connected to static peers more stickily

* Retry room directory lookups if they fail

* NewQUIC -> NewSessions

* Storage updates

* Don't return immediately when there's nothing to sync

* Updates

* Try to reconnect to static peer more

* Update go.mod/go.sum

* Require Go 1.14

* Update go.mod/go.sum

* Update go.mod/go.sum
This commit is contained in:
Neil Alexander 2021-05-06 12:00:42 +01:00 committed by GitHub
parent 464b908bd0
commit 1002e87b60
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 1264 additions and 25 deletions

View file

@ -0,0 +1,91 @@
package conn
import (
"fmt"
"net"
"net/http"
"strings"
"github.com/gorilla/websocket"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/gomatrixserverlib"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
)
func ConnectToPeer(pRouter *pineconeRouter.Router, peer string) error {
var parent net.Conn
if strings.HasPrefix(peer, "ws://") || strings.HasPrefix(peer, "wss://") {
c, _, err := websocket.DefaultDialer.Dial(peer, nil)
if err != nil {
return fmt.Errorf("websocket.DefaultDialer.Dial: %w", err)
}
parent = WrapWebSocketConn(c)
} else {
var err error
parent, err = net.Dial("tcp", peer)
if err != nil {
return fmt.Errorf("net.Dial: %w", err)
}
}
if parent == nil {
return fmt.Errorf("failed to wrap connection")
}
_, err := pRouter.AuthenticatedConnect(parent, "static", pineconeRouter.PeerTypeRemote)
return err
}
type RoundTripper struct {
inner *http.Transport
}
func (y *RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
req.URL.Scheme = "http"
return y.inner.RoundTrip(req)
}
func CreateClient(
base *setup.BaseDendrite, s *pineconeSessions.Sessions,
) *gomatrixserverlib.Client {
tr := &http.Transport{}
tr.RegisterProtocol(
"matrix", &RoundTripper{
inner: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 5,
Dial: s.Dial,
DialContext: s.DialContext,
DialTLS: s.DialTLS,
DialTLSContext: s.DialTLSContext,
},
},
)
return gomatrixserverlib.NewClient(
gomatrixserverlib.WithTransport(tr),
)
}
func CreateFederationClient(
base *setup.BaseDendrite, s *pineconeSessions.Sessions,
) *gomatrixserverlib.FederationClient {
tr := &http.Transport{}
tr.RegisterProtocol(
"matrix", &RoundTripper{
inner: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 5,
Dial: s.Dial,
DialContext: s.DialContext,
DialTLS: s.DialTLS,
DialTLSContext: s.DialTLSContext,
},
},
)
return gomatrixserverlib.NewFederationClient(
base.Cfg.Global.ServerName,
base.Cfg.Global.KeyID,
base.Cfg.Global.PrivateKey,
gomatrixserverlib.WithTransport(tr),
)
}

View file

@ -0,0 +1,81 @@
package conn
import (
"io"
"net"
"time"
"github.com/gorilla/websocket"
)
func WrapWebSocketConn(c *websocket.Conn) *WebSocketConn {
return &WebSocketConn{c: c}
}
type WebSocketConn struct {
r io.Reader
c *websocket.Conn
}
func (c *WebSocketConn) Write(p []byte) (int, error) {
err := c.c.WriteMessage(websocket.BinaryMessage, p)
if err != nil {
return 0, err
}
return len(p), nil
}
func (c *WebSocketConn) Read(p []byte) (int, error) {
for {
if c.r == nil {
// Advance to next message.
var err error
_, c.r, err = c.c.NextReader()
if err != nil {
return 0, err
}
}
n, err := c.r.Read(p)
if err == io.EOF {
// At end of message.
c.r = nil
if n > 0 {
return n, nil
} else {
// No data read, continue to next message.
continue
}
}
return n, err
}
}
func (c *WebSocketConn) Close() error {
return c.c.Close()
}
func (c *WebSocketConn) LocalAddr() net.Addr {
return c.c.LocalAddr()
}
func (c *WebSocketConn) RemoteAddr() net.Addr {
return c.c.RemoteAddr()
}
func (c *WebSocketConn) SetDeadline(t time.Time) error {
if err := c.SetReadDeadline(t); err != nil {
return err
}
if err := c.SetWriteDeadline(t); err != nil {
return err
}
return nil
}
func (c *WebSocketConn) SetReadDeadline(t time.Time) error {
return c.c.SetReadDeadline(t)
}
func (c *WebSocketConn) SetWriteDeadline(t time.Time) error {
return c.c.SetWriteDeadline(t)
}

View file

@ -0,0 +1,9 @@
// +build !riotweb
package embed
import "github.com/gorilla/mux"
func Embed(_ *mux.Router, _ int, _ string) {
}

View file

@ -0,0 +1,83 @@
// +build riotweb
package embed
import (
"fmt"
"io"
"net/http"
"regexp"
"github.com/gorilla/mux"
"github.com/tidwall/sjson"
)
// From within the Riot Web directory:
// go run github.com/mjibson/esc -o /path/to/dendrite/internal/embed/fs_riotweb.go -private -pkg embed .
var cssFile = regexp.MustCompile("\\.css$")
var jsFile = regexp.MustCompile("\\.js$")
type mimeFixingHandler struct {
fs http.Handler
}
func (h mimeFixingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ruri := r.RequestURI
fmt.Println(ruri)
switch {
case cssFile.MatchString(ruri):
w.Header().Set("Content-Type", "text/css")
case jsFile.MatchString(ruri):
w.Header().Set("Content-Type", "application/javascript")
default:
}
h.fs.ServeHTTP(w, r)
}
func Embed(rootMux *mux.Router, listenPort int, serverName string) {
embeddedFS := _escFS(false)
embeddedServ := mimeFixingHandler{http.FileServer(embeddedFS)}
rootMux.NotFoundHandler = embeddedServ
rootMux.HandleFunc("/config.json", func(w http.ResponseWriter, r *http.Request) {
url := fmt.Sprintf("http://%s:%d", r.Header("Host"), listenPort)
configFile, err := embeddedFS.Open("/config.sample.json")
if err != nil {
w.WriteHeader(500)
io.WriteString(w, "Couldn't open the file: "+err.Error())
return
}
configFileInfo, err := configFile.Stat()
if err != nil {
w.WriteHeader(500)
io.WriteString(w, "Couldn't stat the file: "+err.Error())
return
}
buf := make([]byte, configFileInfo.Size())
n, err := configFile.Read(buf)
if err != nil {
w.WriteHeader(500)
io.WriteString(w, "Couldn't read the file: "+err.Error())
return
}
if int64(n) != configFileInfo.Size() {
w.WriteHeader(500)
io.WriteString(w, "The returned file size didn't match what we expected")
return
}
js, _ := sjson.SetBytes(buf, "default_server_config.m\\.homeserver.base_url", url)
js, _ = sjson.SetBytes(js, "default_server_config.m\\.homeserver.server_name", serverName)
js, _ = sjson.SetBytes(js, "brand", fmt.Sprintf("Riot %s", serverName))
js, _ = sjson.SetBytes(js, "disable_guests", true)
js, _ = sjson.SetBytes(js, "disable_3pid_login", true)
js, _ = sjson.DeleteBytes(js, "welcomeUserId")
_, _ = w.Write(js)
})
fmt.Println("*-------------------------------*")
fmt.Println("| This build includes Riot Web! |")
fmt.Println("*-------------------------------*")
fmt.Println("Point your browser to:", url)
fmt.Println()
}

View file

@ -0,0 +1,279 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"crypto/ed25519"
"crypto/tls"
"encoding/hex"
"flag"
"fmt"
"io/ioutil"
"log"
"math"
"net"
"net/http"
"os"
"time"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib"
"go.uber.org/atomic"
pineconeMulticast "github.com/matrix-org/pinecone/multicast"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
pineconeTypes "github.com/matrix-org/pinecone/types"
"github.com/sirupsen/logrus"
)
var (
instanceName = flag.String("name", "dendrite-p2p-pinecone", "the name of this P2P demo instance")
instancePort = flag.Int("port", 8008, "the port that the client API will listen on")
instancePeer = flag.String("peer", "", "the static Pinecone peer to connect to")
instanceListen = flag.String("listen", ":0", "the port Pinecone peers can connect to")
)
// nolint:gocyclo
func main() {
flag.Parse()
internal.SetupPprof()
var pk ed25519.PublicKey
var sk ed25519.PrivateKey
keyfile := *instanceName + ".key"
if _, err := os.Stat(keyfile); os.IsNotExist(err) {
if pk, sk, err = ed25519.GenerateKey(nil); err != nil {
panic(err)
}
if err = ioutil.WriteFile(keyfile, sk, 0644); err != nil {
panic(err)
}
} else if err == nil {
if sk, err = ioutil.ReadFile(keyfile); err != nil {
panic(err)
}
if len(sk) != ed25519.PrivateKeySize {
panic("the private key is not long enough")
}
pk = sk.Public().(ed25519.PublicKey)
}
logger := log.New(os.Stdout, "", 0)
pRouter := pineconeRouter.NewRouter(logger, "dendrite", sk, pk, nil)
go func() {
listener, err := net.Listen("tcp", *instanceListen)
if err != nil {
panic(err)
}
fmt.Println("Listening on", listener.Addr())
for {
conn, err := listener.Accept()
if err != nil {
logrus.WithError(err).Error("listener.Accept failed")
continue
}
port, err := pRouter.AuthenticatedConnect(conn, "", pineconeRouter.PeerTypeRemote)
if err != nil {
logrus.WithError(err).Error("pSwitch.AuthenticatedConnect failed")
continue
}
fmt.Println("Inbound connection", conn.RemoteAddr(), "is connected to port", port)
}
}()
pQUIC := pineconeSessions.NewSessions(logger, pRouter)
pMulticast := pineconeMulticast.NewMulticast(logger, pRouter)
pMulticast.Start()
var staticPeerAttempts atomic.Uint32
var connectToStaticPeer func()
connectToStaticPeer = func() {
uri := *instancePeer
if uri == "" {
return
}
if err := conn.ConnectToPeer(pRouter, uri); err != nil {
exp := time.Second * time.Duration(math.Exp2(float64(staticPeerAttempts.Inc())))
time.AfterFunc(exp, connectToStaticPeer)
} else {
staticPeerAttempts.Store(0)
}
}
pRouter.SetDisconnectedCallback(func(port pineconeTypes.SwitchPortID, public pineconeTypes.PublicKey, peertype int, err error) {
if peertype == pineconeRouter.PeerTypeRemote && err != nil {
staticPeerAttempts.Store(0)
time.AfterFunc(time.Second, connectToStaticPeer)
}
})
go connectToStaticPeer()
cfg := &config.Dendrite{}
cfg.Defaults()
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
cfg.Global.PrivateKey = sk
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
cfg.Global.Kafka.UseNaffka = true
cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName))
cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName))
cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName))
cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", *instanceName))
cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", *instanceName))
cfg.SigningKeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-signingkeyserver.db", *instanceName))
cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", *instanceName))
cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName))
cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName))
if err := cfg.Derive(); err != nil {
panic(err)
}
base := setup.NewBaseDendrite(cfg, "Monolith", false)
defer base.Close() // nolint: errcheck
accountDB := base.CreateAccountsDB()
federation := conn.CreateFederationClient(base, pQUIC)
serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()
rsComponent := roomserver.NewInternalAPI(
base, keyRing,
)
rsAPI := rsComponent
fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing,
)
keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
keyAPI.SetUserAPI(userAPI)
eduInputAPI := eduserver.NewInternalAPI(
base, cache.New(), userAPI,
)
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
rsComponent.SetFederationSenderAPI(fsAPI)
monolith := setup.Monolith{
Config: base.Cfg,
AccountDB: accountDB,
Client: conn.CreateClient(base, pQUIC),
FedClient: federation,
KeyRing: keyRing,
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,
FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI,
UserAPI: userAPI,
KeyAPI: keyAPI,
ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pQUIC, fsAPI, federation),
}
monolith.AddAllPublicRoutes(
base.ProcessContext,
base.PublicClientAPIMux,
base.PublicFederationAPIMux,
base.PublicKeyAPIMux,
base.PublicMediaAPIMux,
)
wsUpgrader := websocket.Upgrader{}
httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
httpRouter.PathPrefix(httputil.InternalPathPrefix).Handler(base.InternalAPIMux)
httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux)
httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux)
httpRouter.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
c, err := wsUpgrader.Upgrade(w, r, nil)
if err != nil {
logrus.WithError(err).Error("Failed to upgrade WebSocket connection")
return
}
conn := conn.WrapWebSocketConn(c)
if _, err = pRouter.AuthenticatedConnect(conn, "websocket", pineconeRouter.PeerTypeRemote); err != nil {
logrus.WithError(err).Error("Failed to connect WebSocket peer to Pinecone switch")
}
})
embed.Embed(httpRouter, *instancePort, "Pinecone Demo")
pMux := mux.NewRouter().SkipClean(true).UseEncodedPath()
pMux.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.PublicFederationAPIMux)
pMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux)
pHTTP := pQUIC.HTTP()
pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux)
pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux)
// Build both ends of a HTTP multiplex.
httpServer := &http.Server{
Addr: ":0",
TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 60 * time.Second,
BaseContext: func(_ net.Listener) context.Context {
return context.Background()
},
Handler: pMux,
}
go func() {
pubkey := pRouter.PublicKey()
logrus.Info("Listening on ", hex.EncodeToString(pubkey[:]))
logrus.Fatal(httpServer.Serve(pQUIC))
}()
go func() {
httpBindAddr := fmt.Sprintf(":%d", *instancePort)
logrus.Info("Listening on ", httpBindAddr)
logrus.Fatal(http.ListenAndServe(httpBindAddr, httpRouter))
}()
go func() {
logrus.Info("Sending wake-up message to known nodes")
req := &api.PerformBroadcastEDURequest{}
res := &api.PerformBroadcastEDUResponse{}
if err := fsAPI.PerformBroadcastEDU(context.TODO(), req, res); err != nil {
logrus.WithError(err).Error("Failed to send wake-up message to known nodes")
}
}()
base.WaitForShutdown()
}

View file

@ -0,0 +1,150 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rooms
import (
"context"
"crypto/ed25519"
"encoding/hex"
"sync"
"time"
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
)
const pineconeRoomAttempts = 3
type PineconeRoomProvider struct {
r *pineconeRouter.Router
s *pineconeSessions.Sessions
fedSender api.FederationSenderInternalAPI
fedClient *gomatrixserverlib.FederationClient
}
func NewPineconeRoomProvider(
r *pineconeRouter.Router,
s *pineconeSessions.Sessions,
fedSender api.FederationSenderInternalAPI,
fedClient *gomatrixserverlib.FederationClient,
) *PineconeRoomProvider {
p := &PineconeRoomProvider{
r: r,
s: s,
fedSender: fedSender,
fedClient: fedClient,
}
return p
}
func (p *PineconeRoomProvider) Rooms() []gomatrixserverlib.PublicRoom {
known := []ed25519.PublicKey{}
for _, k := range p.r.KnownNodes() {
known = append(known, k[:])
}
known = append(known, p.s.Sessions()...)
list := []gomatrixserverlib.ServerName{}
for _, k := range known {
if len(k) == ed25519.PublicKeySize {
list = append(list, gomatrixserverlib.ServerName(hex.EncodeToString(k)))
}
}
return bulkFetchPublicRoomsFromServers(context.Background(), p.fedClient, list)
}
// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers.
// Returns a list of public rooms.
func bulkFetchPublicRoomsFromServers(
ctx context.Context, fedClient *gomatrixserverlib.FederationClient,
homeservers []gomatrixserverlib.ServerName,
) (publicRooms []gomatrixserverlib.PublicRoom) {
limit := 200
// follow pipeline semantics, see https://blog.golang.org/pipelines for more info.
// goroutines send rooms to this channel
roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit))
// signalling channel to tell goroutines to stop sending rooms and quit
done := make(chan bool)
// signalling to say when we can close the room channel
var wg sync.WaitGroup
wg.Add(len(homeservers))
// concurrently query for public rooms
for _, hs := range homeservers {
go func(homeserverDomain gomatrixserverlib.ServerName) {
defer wg.Done()
util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms")
var fres gomatrixserverlib.RespPublicRooms
var err error
for i := 0; i < pineconeRoomAttempts; i++ {
fres, err = fedClient.GetPublicRooms(ctx, homeserverDomain, int(limit), "", false, "")
if err != nil {
util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn(
"bulkFetchPublicRoomsFromServers: failed to query hs",
)
if i == pineconeRoomAttempts-1 {
return
}
} else {
break
}
}
for _, room := range fres.Chunk {
// atomically send a room or stop
select {
case roomCh <- room:
case <-done:
util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms")
return
}
}
}(hs)
}
// Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request.
// This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be
// closed.
go func() {
wg.Wait()
util.GetLogger(ctx).Info("Cleaning up resources")
close(roomCh)
}()
// fan-in results with timeout. We stop when we reach the limit.
FanIn:
for len(publicRooms) < int(limit) || limit == 0 {
// add a room or timeout
select {
case room, ok := <-roomCh:
if !ok {
util.GetLogger(ctx).Info("All homeservers have been queried, returning results.")
break FanIn
}
publicRooms = append(publicRooms, room)
case <-time.After(5 * time.Second): // we've waited long enough, let's tell the client what we got.
util.GetLogger(ctx).Info("Waited 5s for federated public rooms, returning early")
break FanIn
case <-ctx.Done(): // the client hung up on us, let's stop.
util.GetLogger(ctx).Info("Client hung up, returning early")
break FanIn
}
}
// tell goroutines to stop
close(done)
return publicRooms
}

View file

@ -1,6 +1,6 @@
# Yggdrasil Demo
This is the Dendrite Yggdrasil demo! It's easy to get started - all you need is Go 1.13 or later.
This is the Dendrite Yggdrasil demo! It's easy to get started - all you need is Go 1.14 or later.
To run the homeserver, start at the root of the Dendrite repository and run: