mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-29 12:42:46 +00:00
parent
2258387d39
commit
144c060fa7
11 changed files with 1 additions and 1659 deletions
|
@ -1,230 +0,0 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/ed25519"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
gostream "github.com/libp2p/go-libp2p-gostream"
|
||||
p2phttp "github.com/libp2p/go-libp2p-http"
|
||||
p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery"
|
||||
"github.com/matrix-org/dendrite/appservice"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/embed"
|
||||
"github.com/matrix-org/dendrite/federationapi"
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
"github.com/matrix-org/dendrite/keyserver"
|
||||
"github.com/matrix-org/dendrite/roomserver"
|
||||
"github.com/matrix-org/dendrite/setup"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/mscs"
|
||||
"github.com/matrix-org/dendrite/userapi"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
func createKeyDB(
|
||||
base *P2PDendrite,
|
||||
db *gomatrixserverlib.KeyRing,
|
||||
) {
|
||||
mdns := mDNSListener{
|
||||
host: base.LibP2P,
|
||||
keydb: db,
|
||||
}
|
||||
serv, err := p2pdisc.NewMdnsService(
|
||||
base.LibP2PContext,
|
||||
base.LibP2P,
|
||||
time.Second*10,
|
||||
"_matrix-dendrite-p2p._tcp",
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
serv.RegisterNotifee(&mdns)
|
||||
}
|
||||
|
||||
func createFederationClient(
|
||||
base *P2PDendrite,
|
||||
) *gomatrixserverlib.FederationClient {
|
||||
fmt.Println("Running in libp2p federation mode")
|
||||
fmt.Println("Warning: Federation with non-libp2p homeservers will not work in this mode yet!")
|
||||
tr := &http.Transport{}
|
||||
tr.RegisterProtocol(
|
||||
"matrix",
|
||||
p2phttp.NewTransport(base.LibP2P, p2phttp.ProtocolOption("/matrix")),
|
||||
)
|
||||
return gomatrixserverlib.NewFederationClient(
|
||||
base.Base.Cfg.Global.ServerName, base.Base.Cfg.Global.KeyID,
|
||||
base.Base.Cfg.Global.PrivateKey,
|
||||
gomatrixserverlib.WithTransport(tr),
|
||||
)
|
||||
}
|
||||
|
||||
func createClient(
|
||||
base *P2PDendrite,
|
||||
) *gomatrixserverlib.Client {
|
||||
tr := &http.Transport{}
|
||||
tr.RegisterProtocol(
|
||||
"matrix",
|
||||
p2phttp.NewTransport(base.LibP2P, p2phttp.ProtocolOption("/matrix")),
|
||||
)
|
||||
return gomatrixserverlib.NewClient(
|
||||
gomatrixserverlib.WithTransport(tr),
|
||||
)
|
||||
}
|
||||
|
||||
func main() {
|
||||
instanceName := flag.String("name", "dendrite-p2p", "the name of this P2P demo instance")
|
||||
instancePort := flag.Int("port", 8080, "the port that the client API will listen on")
|
||||
flag.Parse()
|
||||
|
||||
filename := fmt.Sprintf("%s-private.key", *instanceName)
|
||||
_, err := os.Stat(filename)
|
||||
var privKey ed25519.PrivateKey
|
||||
if os.IsNotExist(err) {
|
||||
_, privKey, _ = ed25519.GenerateKey(nil)
|
||||
if err = ioutil.WriteFile(filename, privKey, 0600); err != nil {
|
||||
fmt.Printf("Couldn't write private key to file '%s': %s\n", filename, err)
|
||||
}
|
||||
} else {
|
||||
privKey, err = ioutil.ReadFile(filename)
|
||||
if err != nil {
|
||||
fmt.Printf("Couldn't read private key from file '%s': %s\n", filename, err)
|
||||
_, privKey, _ = ed25519.GenerateKey(nil)
|
||||
}
|
||||
}
|
||||
|
||||
cfg := config.Dendrite{}
|
||||
cfg.Defaults(true)
|
||||
cfg.Global.ServerName = "p2p"
|
||||
cfg.Global.PrivateKey = privKey
|
||||
cfg.Global.KeyID = gomatrixserverlib.KeyID(fmt.Sprintf("ed25519:%s", *instanceName))
|
||||
cfg.FederationAPI.FederationMaxRetries = 6
|
||||
cfg.Global.JetStream.StoragePath = config.Path(fmt.Sprintf("%s/", *instanceName))
|
||||
cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName))
|
||||
cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName))
|
||||
cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", *instanceName))
|
||||
cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", *instanceName))
|
||||
cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationapi.db", *instanceName))
|
||||
cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
|
||||
cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-e2ekey.db", *instanceName))
|
||||
cfg.MSCs.MSCs = []string{"msc2836"}
|
||||
cfg.MSCs.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mscs.db", *instanceName))
|
||||
if err = cfg.Derive(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
base := NewP2PDendrite(&cfg, "Monolith")
|
||||
defer base.Base.Close() // nolint: errcheck
|
||||
|
||||
accountDB := base.Base.CreateAccountsDB()
|
||||
federation := createFederationClient(base)
|
||||
keyAPI := keyserver.NewInternalAPI(&base.Base, &base.Base.Cfg.KeyServer, federation)
|
||||
|
||||
rsAPI := roomserver.NewInternalAPI(
|
||||
&base.Base,
|
||||
)
|
||||
|
||||
userAPI := userapi.NewInternalAPI(&base.Base, accountDB, &cfg.UserAPI, nil, keyAPI, rsAPI, base.Base.PushGatewayHTTPClient())
|
||||
keyAPI.SetUserAPI(userAPI)
|
||||
|
||||
asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI)
|
||||
rsAPI.SetAppserviceAPI(asAPI)
|
||||
fsAPI := federationapi.NewInternalAPI(
|
||||
&base.Base, federation, rsAPI, base.Base.Caches, nil, true,
|
||||
)
|
||||
keyRing := fsAPI.KeyRing()
|
||||
rsAPI.SetFederationAPI(fsAPI, keyRing)
|
||||
provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI)
|
||||
err = provider.Start()
|
||||
if err != nil {
|
||||
panic("failed to create new public rooms provider: " + err.Error())
|
||||
}
|
||||
|
||||
createKeyDB(
|
||||
base, keyRing,
|
||||
)
|
||||
|
||||
monolith := setup.Monolith{
|
||||
Config: base.Base.Cfg,
|
||||
AccountDB: accountDB,
|
||||
Client: createClient(base),
|
||||
FedClient: federation,
|
||||
KeyRing: keyRing,
|
||||
|
||||
AppserviceAPI: asAPI,
|
||||
FederationAPI: fsAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
UserAPI: userAPI,
|
||||
KeyAPI: keyAPI,
|
||||
ExtPublicRoomsProvider: provider,
|
||||
}
|
||||
monolith.AddAllPublicRoutes(
|
||||
base.Base.ProcessContext,
|
||||
base.Base.PublicClientAPIMux,
|
||||
base.Base.PublicFederationAPIMux,
|
||||
base.Base.PublicKeyAPIMux,
|
||||
base.Base.PublicWellKnownAPIMux,
|
||||
base.Base.PublicMediaAPIMux,
|
||||
base.Base.SynapseAdminMux,
|
||||
)
|
||||
if err := mscs.Enable(&base.Base, &monolith); err != nil {
|
||||
logrus.WithError(err).Fatalf("Failed to enable MSCs")
|
||||
}
|
||||
|
||||
httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
|
||||
httpRouter.PathPrefix(httputil.InternalPathPrefix).Handler(base.Base.InternalAPIMux)
|
||||
httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.Base.PublicClientAPIMux)
|
||||
httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.Base.PublicMediaAPIMux)
|
||||
embed.Embed(httpRouter, *instancePort, "Yggdrasil Demo")
|
||||
|
||||
libp2pRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
|
||||
libp2pRouter.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.Base.PublicFederationAPIMux)
|
||||
libp2pRouter.PathPrefix(httputil.PublicKeyPathPrefix).Handler(base.Base.PublicKeyAPIMux)
|
||||
libp2pRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.Base.PublicMediaAPIMux)
|
||||
|
||||
// Expose the matrix APIs directly rather than putting them under a /api path.
|
||||
go func() {
|
||||
httpBindAddr := fmt.Sprintf(":%d", *instancePort)
|
||||
logrus.Info("Listening on ", httpBindAddr)
|
||||
logrus.Fatal(http.ListenAndServe(httpBindAddr, httpRouter))
|
||||
}()
|
||||
// Expose the matrix APIs also via libp2p
|
||||
if base.LibP2P != nil {
|
||||
go func() {
|
||||
logrus.Info("Listening on libp2p host ID ", base.LibP2P.ID())
|
||||
listener, err := gostream.Listen(base.LibP2P, "/matrix")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer func() {
|
||||
logrus.Fatal(listener.Close())
|
||||
}()
|
||||
logrus.Fatal(http.Serve(listener, libp2pRouter))
|
||||
}()
|
||||
}
|
||||
|
||||
// We want to block forever to let the HTTP and HTTPS handler serve the APIs
|
||||
base.Base.WaitForShutdown()
|
||||
}
|
|
@ -1,62 +0,0 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
type mDNSListener struct {
|
||||
keydb *gomatrixserverlib.KeyRing
|
||||
host host.Host
|
||||
}
|
||||
|
||||
func (n *mDNSListener) HandlePeerFound(p peer.AddrInfo) {
|
||||
if err := n.host.Connect(context.Background(), p); err != nil {
|
||||
fmt.Println("Error adding peer", p.ID.String(), "via mDNS:", err)
|
||||
}
|
||||
if pubkey, err := p.ID.ExtractPublicKey(); err == nil {
|
||||
raw, _ := pubkey.Raw()
|
||||
if err := n.keydb.KeyDatabase.StoreKeys(
|
||||
context.Background(),
|
||||
map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{
|
||||
{
|
||||
ServerName: gomatrixserverlib.ServerName(p.ID.String()),
|
||||
KeyID: "ed25519:p2pdemo",
|
||||
}: {
|
||||
VerifyKey: gomatrixserverlib.VerifyKey{
|
||||
Key: gomatrixserverlib.Base64Bytes(raw),
|
||||
},
|
||||
ValidUntilTS: math.MaxUint64 >> 1,
|
||||
ExpiredTS: gomatrixserverlib.PublicKeyNotExpired,
|
||||
},
|
||||
},
|
||||
); err != nil {
|
||||
fmt.Println("Failed to store keys:", err)
|
||||
}
|
||||
}
|
||||
fmt.Println("Discovered", len(n.host.Peerstore().Peers())-1, "other libp2p peer(s):")
|
||||
for _, peer := range n.host.Peerstore().Peers() {
|
||||
if peer != n.host.ID() {
|
||||
fmt.Println("-", peer)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,126 +0,0 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"errors"
|
||||
|
||||
pstore "github.com/libp2p/go-libp2p-core/peerstore"
|
||||
record "github.com/libp2p/go-libp2p-record"
|
||||
|
||||
"github.com/libp2p/go-libp2p"
|
||||
circuit "github.com/libp2p/go-libp2p-circuit"
|
||||
crypto "github.com/libp2p/go-libp2p-core/crypto"
|
||||
routing "github.com/libp2p/go-libp2p-core/routing"
|
||||
|
||||
host "github.com/libp2p/go-libp2p-core/host"
|
||||
dht "github.com/libp2p/go-libp2p-kad-dht"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
"github.com/matrix-org/dendrite/setup/base"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
)
|
||||
|
||||
// P2PDendrite is a Peer-to-Peer variant of BaseDendrite.
|
||||
type P2PDendrite struct {
|
||||
Base base.BaseDendrite
|
||||
|
||||
// Store our libp2p object so that we can make outgoing connections from it
|
||||
// later
|
||||
LibP2P host.Host
|
||||
LibP2PContext context.Context
|
||||
LibP2PCancel context.CancelFunc
|
||||
LibP2PDHT *dht.IpfsDHT
|
||||
LibP2PPubsub *pubsub.PubSub
|
||||
}
|
||||
|
||||
// NewP2PDendrite creates a new instance to be used by a component.
|
||||
// The componentName is used for logging purposes, and should be a friendly name
|
||||
// of the component running, e.g. SyncAPI.
|
||||
func NewP2PDendrite(cfg *config.Dendrite, componentName string) *P2PDendrite {
|
||||
baseDendrite := base.NewBaseDendrite(cfg, componentName)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
privKey, err := crypto.UnmarshalEd25519PrivateKey(cfg.Global.PrivateKey[:])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
//defaultIP6ListenAddr, _ := multiaddr.NewMultiaddr("/ip6/::/tcp/0")
|
||||
var libp2pdht *dht.IpfsDHT
|
||||
libp2p, err := libp2p.New(ctx,
|
||||
libp2p.Identity(privKey),
|
||||
libp2p.DefaultListenAddrs,
|
||||
//libp2p.ListenAddrs(defaultIP6ListenAddr),
|
||||
libp2p.DefaultTransports,
|
||||
libp2p.Routing(func(h host.Host) (r routing.PeerRouting, err error) {
|
||||
libp2pdht, err = dht.New(ctx, h)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
libp2pdht.Validator = libP2PValidator{}
|
||||
r = libp2pdht
|
||||
return
|
||||
}),
|
||||
libp2p.EnableAutoRelay(),
|
||||
libp2p.EnableRelay(circuit.OptHop),
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
libp2ppubsub, err := pubsub.NewFloodSub(context.Background(), libp2p, []pubsub.Option{
|
||||
pubsub.WithMessageSigning(true),
|
||||
}...)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
fmt.Println("Our public key:", privKey.GetPublic())
|
||||
fmt.Println("Our node ID:", libp2p.ID())
|
||||
fmt.Println("Our addresses:", libp2p.Addrs())
|
||||
|
||||
cfg.Global.ServerName = gomatrixserverlib.ServerName(libp2p.ID().String())
|
||||
|
||||
return &P2PDendrite{
|
||||
Base: *baseDendrite,
|
||||
LibP2P: libp2p,
|
||||
LibP2PContext: ctx,
|
||||
LibP2PCancel: cancel,
|
||||
LibP2PDHT: libp2pdht,
|
||||
LibP2PPubsub: libp2ppubsub,
|
||||
}
|
||||
}
|
||||
|
||||
type libP2PValidator struct {
|
||||
KeyBook pstore.KeyBook
|
||||
}
|
||||
|
||||
func (v libP2PValidator) Validate(key string, value []byte) error {
|
||||
ns, _, err := record.SplitKey(key)
|
||||
if err != nil || ns != "matrix" {
|
||||
return errors.New("not Matrix path")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v libP2PValidator) Select(k string, vals [][]byte) (int, error) {
|
||||
return 0, nil
|
||||
}
|
|
@ -1,153 +0,0 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
const MaintenanceInterval = time.Second * 10
|
||||
|
||||
type discoveredRoom struct {
|
||||
time time.Time
|
||||
room gomatrixserverlib.PublicRoom
|
||||
}
|
||||
|
||||
type publicRoomsProvider struct {
|
||||
pubsub *pubsub.PubSub
|
||||
topic *pubsub.Topic
|
||||
subscription *pubsub.Subscription
|
||||
foundRooms map[string]discoveredRoom // additional rooms we have learned about from the DHT
|
||||
foundRoomsMutex sync.RWMutex // protects foundRooms
|
||||
maintenanceTimer *time.Timer //
|
||||
roomsAdvertised atomic.Value // stores int
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI
|
||||
}
|
||||
|
||||
func newPublicRoomsProvider(ps *pubsub.PubSub, rsAPI roomserverAPI.RoomserverInternalAPI) *publicRoomsProvider {
|
||||
return &publicRoomsProvider{
|
||||
foundRooms: make(map[string]discoveredRoom),
|
||||
pubsub: ps,
|
||||
rsAPI: rsAPI,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *publicRoomsProvider) Start() error {
|
||||
if topic, err := p.pubsub.Join("/matrix/publicRooms"); err != nil {
|
||||
return err
|
||||
} else if sub, err := topic.Subscribe(); err == nil {
|
||||
p.topic = topic
|
||||
p.subscription = sub
|
||||
go p.MaintenanceTimer()
|
||||
go p.FindRooms()
|
||||
p.roomsAdvertised.Store(0)
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *publicRoomsProvider) MaintenanceTimer() {
|
||||
if p.maintenanceTimer != nil && !p.maintenanceTimer.Stop() {
|
||||
<-p.maintenanceTimer.C
|
||||
}
|
||||
p.Interval()
|
||||
}
|
||||
|
||||
func (p *publicRoomsProvider) Interval() {
|
||||
p.foundRoomsMutex.Lock()
|
||||
for k, v := range p.foundRooms {
|
||||
if time.Since(v.time) > time.Minute {
|
||||
delete(p.foundRooms, k)
|
||||
}
|
||||
}
|
||||
p.foundRoomsMutex.Unlock()
|
||||
if err := p.AdvertiseRooms(); err != nil {
|
||||
fmt.Println("Failed to advertise room in DHT:", err)
|
||||
}
|
||||
p.foundRoomsMutex.RLock()
|
||||
defer p.foundRoomsMutex.RUnlock()
|
||||
fmt.Println("Found", len(p.foundRooms), "room(s), advertised", p.roomsAdvertised.Load(), "room(s)")
|
||||
p.maintenanceTimer = time.AfterFunc(MaintenanceInterval, p.Interval)
|
||||
}
|
||||
|
||||
func (p *publicRoomsProvider) AdvertiseRooms() error {
|
||||
ctx := context.Background()
|
||||
var queryRes roomserverAPI.QueryPublishedRoomsResponse
|
||||
// Query published rooms on our server. This will not invoke clientapi.ExtraPublicRoomsProvider
|
||||
err := p.rsAPI.QueryPublishedRooms(ctx, &roomserverAPI.QueryPublishedRoomsRequest{}, &queryRes)
|
||||
if err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed")
|
||||
return err
|
||||
}
|
||||
ourRooms, err := roomserverAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, p.rsAPI)
|
||||
if err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed")
|
||||
return err
|
||||
}
|
||||
advertised := 0
|
||||
for _, room := range ourRooms {
|
||||
if j, err := json.Marshal(room); err == nil {
|
||||
if err := p.topic.Publish(context.TODO(), j); err != nil {
|
||||
fmt.Println("Failed to publish public room:", err)
|
||||
} else {
|
||||
advertised++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
p.roomsAdvertised.Store(advertised)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *publicRoomsProvider) FindRooms() {
|
||||
for {
|
||||
msg, err := p.subscription.Next(context.Background())
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
received := discoveredRoom{
|
||||
time: time.Now(),
|
||||
}
|
||||
if err := json.Unmarshal(msg.Data, &received.room); err != nil {
|
||||
fmt.Println("Unmarshal error:", err)
|
||||
continue
|
||||
}
|
||||
fmt.Printf("received %+v \n", received)
|
||||
p.foundRoomsMutex.Lock()
|
||||
p.foundRooms[received.room.RoomID] = received
|
||||
p.foundRoomsMutex.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *publicRoomsProvider) Rooms() (rooms []gomatrixserverlib.PublicRoom) {
|
||||
p.foundRoomsMutex.RLock()
|
||||
defer p.foundRoomsMutex.RUnlock()
|
||||
for _, dr := range p.foundRooms {
|
||||
rooms = append(rooms, dr.room)
|
||||
}
|
||||
return
|
||||
}
|
|
@ -1,101 +0,0 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build wasm
|
||||
// +build wasm
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"syscall/js"
|
||||
)
|
||||
|
||||
// JSServer exposes an HTTP-like server interface which allows JS to 'send' requests to it.
|
||||
type JSServer struct {
|
||||
// The router which will service requests
|
||||
Mux http.Handler
|
||||
}
|
||||
|
||||
// OnRequestFromJS is the function that JS will invoke when there is a new request.
|
||||
// The JS function signature is:
|
||||
// function(reqString: string): Promise<{result: string, error: string}>
|
||||
// Usage is like:
|
||||
// const res = await global._go_js_server.fetch(reqString);
|
||||
// if (res.error) {
|
||||
// // handle error: this is a 'network' error, not a non-2xx error.
|
||||
// }
|
||||
// const rawHttpResponse = res.result;
|
||||
func (h *JSServer) OnRequestFromJS(this js.Value, args []js.Value) interface{} {
|
||||
// we HAVE to spawn a new goroutine and return immediately or else Go will deadlock
|
||||
// if this request blocks at all e.g for /sync calls
|
||||
httpStr := args[0].String()
|
||||
promise := js.Global().Get("Promise").New(js.FuncOf(func(pthis js.Value, pargs []js.Value) interface{} {
|
||||
// The initial callback code for new Promise() is also called on the critical path, which is why
|
||||
// we need to put this in an immediately invoked goroutine.
|
||||
go func() {
|
||||
resolve := pargs[0]
|
||||
resStr, err := h.handle(httpStr)
|
||||
errStr := ""
|
||||
if err != nil {
|
||||
errStr = err.Error()
|
||||
}
|
||||
resolve.Invoke(map[string]interface{}{
|
||||
"result": resStr,
|
||||
"error": errStr,
|
||||
})
|
||||
}()
|
||||
return nil
|
||||
}))
|
||||
return promise
|
||||
}
|
||||
|
||||
// handle invokes the http.ServeMux for this request and returns the raw HTTP response.
|
||||
func (h *JSServer) handle(httpStr string) (resStr string, err error) {
|
||||
req, err := http.ReadRequest(bufio.NewReader(strings.NewReader(httpStr)))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
h.Mux.ServeHTTP(w, req)
|
||||
|
||||
res := w.Result()
|
||||
var resBuffer strings.Builder
|
||||
err = res.Write(&resBuffer)
|
||||
return resBuffer.String(), err
|
||||
}
|
||||
|
||||
// ListenAndServe registers a variable in JS-land with the given namespace. This variable is
|
||||
// a function which JS-land can call to 'send' HTTP requests. The function is attached to
|
||||
// a global object called "_go_js_server". See OnRequestFromJS for more info.
|
||||
func (h *JSServer) ListenAndServe(namespace string) {
|
||||
globalName := "_go_js_server"
|
||||
// register a hook in JS-land for it to invoke stuff
|
||||
server := js.Global().Get(globalName)
|
||||
if !server.Truthy() {
|
||||
server = js.Global().Get("Object").New()
|
||||
js.Global().Set(globalName, server)
|
||||
}
|
||||
|
||||
server.Set(namespace, js.FuncOf(h.OnRequestFromJS))
|
||||
|
||||
fmt.Printf("Listening for requests from JS on function %s.%s\n", globalName, namespace)
|
||||
// Block forever to mimic http.ListenAndServe
|
||||
select {}
|
||||
}
|
|
@ -1,87 +0,0 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build wasm
|
||||
// +build wasm
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const libp2pMatrixKeyID = "ed25519:libp2p-dendrite"
|
||||
|
||||
type libp2pKeyFetcher struct {
|
||||
}
|
||||
|
||||
// FetchKeys looks up a batch of public keys.
|
||||
// Takes a map from (server name, key ID) pairs to timestamp.
|
||||
// The timestamp is when the keys need to be vaild up to.
|
||||
// Returns a map from (server name, key ID) pairs to server key objects for
|
||||
// that server name containing that key ID
|
||||
// The result may have fewer (server name, key ID) pairs than were in the request.
|
||||
// The result may have more (server name, key ID) pairs than were in the request.
|
||||
// Returns an error if there was a problem fetching the keys.
|
||||
func (f *libp2pKeyFetcher) FetchKeys(
|
||||
ctx context.Context,
|
||||
requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
|
||||
) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) {
|
||||
res := make(map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult)
|
||||
for req := range requests {
|
||||
if req.KeyID != libp2pMatrixKeyID {
|
||||
return nil, fmt.Errorf("FetchKeys: cannot fetch key with ID %s, should be %s", req.KeyID, libp2pMatrixKeyID)
|
||||
}
|
||||
|
||||
// The server name is a libp2p peer ID
|
||||
peerIDStr := string(req.ServerName)
|
||||
peerID, err := peer.Decode(peerIDStr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to decode peer ID from server name '%s': %w", peerIDStr, err)
|
||||
}
|
||||
pubKey, err := peerID.ExtractPublicKey()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to extract public key from peer ID: %w", err)
|
||||
}
|
||||
pubKeyBytes, err := pubKey.Raw()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to extract raw bytes from public key: %w", err)
|
||||
}
|
||||
b64Key := gomatrixserverlib.Base64Bytes(pubKeyBytes)
|
||||
res[req] = gomatrixserverlib.PublicKeyLookupResult{
|
||||
VerifyKey: gomatrixserverlib.VerifyKey{
|
||||
Key: b64Key,
|
||||
},
|
||||
ExpiredTS: gomatrixserverlib.PublicKeyNotExpired,
|
||||
ValidUntilTS: gomatrixserverlib.AsTimestamp(time.Now().Add(24 * time.Hour * 365)),
|
||||
}
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// FetcherName returns the name of this fetcher, which can then be used for
|
||||
// logging errors etc.
|
||||
func (f *libp2pKeyFetcher) FetcherName() string {
|
||||
return "libp2pKeyFetcher"
|
||||
}
|
||||
|
||||
// no-op function for storing keys - we don't do any work to fetch them so don't bother storing.
|
||||
func (f *libp2pKeyFetcher) StoreKeys(ctx context.Context, results map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult) error {
|
||||
return nil
|
||||
}
|
|
@ -1,270 +0,0 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build wasm
|
||||
// +build wasm
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/ed25519"
|
||||
"fmt"
|
||||
"syscall/js"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/dendrite/appservice"
|
||||
"github.com/matrix-org/dendrite/federationapi"
|
||||
"github.com/matrix-org/dendrite/internal/httputil"
|
||||
"github.com/matrix-org/dendrite/keyserver"
|
||||
"github.com/matrix-org/dendrite/roomserver"
|
||||
"github.com/matrix-org/dendrite/setup"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/userapi"
|
||||
go_http_js_libp2p "github.com/matrix-org/go-http-js-libp2p"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
_ "github.com/matrix-org/go-sqlite3-js"
|
||||
)
|
||||
|
||||
var GitCommit string
|
||||
|
||||
func init() {
|
||||
fmt.Printf("[%s] dendrite.js starting...\n", GitCommit)
|
||||
}
|
||||
|
||||
const keyNameEd25519 = "_go_ed25519_key"
|
||||
|
||||
func readKeyFromLocalStorage() (key ed25519.PrivateKey, err error) {
|
||||
localforage := js.Global().Get("localforage")
|
||||
if !localforage.Truthy() {
|
||||
err = fmt.Errorf("readKeyFromLocalStorage: no localforage")
|
||||
return
|
||||
}
|
||||
// https://localforage.github.io/localForage/
|
||||
item, ok := await(localforage.Call("getItem", keyNameEd25519))
|
||||
if !ok || !item.Truthy() {
|
||||
err = fmt.Errorf("readKeyFromLocalStorage: no key in localforage")
|
||||
return
|
||||
}
|
||||
fmt.Println("Found key in localforage")
|
||||
// extract []byte and make an ed25519 key
|
||||
seed := make([]byte, 32, 32)
|
||||
js.CopyBytesToGo(seed, item)
|
||||
|
||||
return ed25519.NewKeyFromSeed(seed), nil
|
||||
}
|
||||
|
||||
func writeKeyToLocalStorage(key ed25519.PrivateKey) error {
|
||||
localforage := js.Global().Get("localforage")
|
||||
if !localforage.Truthy() {
|
||||
return fmt.Errorf("writeKeyToLocalStorage: no localforage")
|
||||
}
|
||||
|
||||
// make a Uint8Array from the key's seed
|
||||
seed := key.Seed()
|
||||
jsSeed := js.Global().Get("Uint8Array").New(len(seed))
|
||||
js.CopyBytesToJS(jsSeed, seed)
|
||||
// write it
|
||||
localforage.Call("setItem", keyNameEd25519, jsSeed)
|
||||
return nil
|
||||
}
|
||||
|
||||
// taken from https://go-review.googlesource.com/c/go/+/150917
|
||||
|
||||
// await waits until the promise v has been resolved or rejected and returns the promise's result value.
|
||||
// The boolean value ok is true if the promise has been resolved, false if it has been rejected.
|
||||
// If v is not a promise, v itself is returned as the value and ok is true.
|
||||
func await(v js.Value) (result js.Value, ok bool) {
|
||||
if v.Type() != js.TypeObject || v.Get("then").Type() != js.TypeFunction {
|
||||
return v, true
|
||||
}
|
||||
done := make(chan struct{})
|
||||
onResolve := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
|
||||
result = args[0]
|
||||
ok = true
|
||||
close(done)
|
||||
return nil
|
||||
})
|
||||
defer onResolve.Release()
|
||||
onReject := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
|
||||
result = args[0]
|
||||
ok = false
|
||||
close(done)
|
||||
return nil
|
||||
})
|
||||
defer onReject.Release()
|
||||
v.Call("then", onResolve, onReject)
|
||||
<-done
|
||||
return
|
||||
}
|
||||
|
||||
func generateKey() ed25519.PrivateKey {
|
||||
// attempt to look for a seed in JS-land and if it exists use it.
|
||||
priv, err := readKeyFromLocalStorage()
|
||||
if err == nil {
|
||||
fmt.Println("Read key from localStorage")
|
||||
return priv
|
||||
}
|
||||
// generate a new key
|
||||
fmt.Println(err, " : Generating new ed25519 key")
|
||||
_, priv, err = ed25519.GenerateKey(nil)
|
||||
if err != nil {
|
||||
logrus.Fatalf("Failed to generate ed25519 key: %s", err)
|
||||
}
|
||||
if err := writeKeyToLocalStorage(priv); err != nil {
|
||||
fmt.Println("failed to write key to localStorage: ", err)
|
||||
// non-fatal, we'll just have amnesia for a while
|
||||
}
|
||||
return priv
|
||||
}
|
||||
|
||||
func createFederationClient(cfg *config.Dendrite, node *go_http_js_libp2p.P2pLocalNode) *gomatrixserverlib.FederationClient {
|
||||
fmt.Println("Running in js-libp2p federation mode")
|
||||
fmt.Println("Warning: Federation with non-libp2p homeservers will not work in this mode yet!")
|
||||
tr := go_http_js_libp2p.NewP2pTransport(node)
|
||||
|
||||
fed := gomatrixserverlib.NewFederationClient(
|
||||
cfg.Global.ServerName, cfg.Global.KeyID, cfg.Global.PrivateKey,
|
||||
gomatrixserverlib.WithTransport(tr),
|
||||
)
|
||||
|
||||
return fed
|
||||
}
|
||||
|
||||
func createClient(node *go_http_js_libp2p.P2pLocalNode) *gomatrixserverlib.Client {
|
||||
tr := go_http_js_libp2p.NewP2pTransport(node)
|
||||
return gomatrixserverlib.NewClient(
|
||||
gomatrixserverlib.WithTransport(tr),
|
||||
)
|
||||
}
|
||||
|
||||
func createP2PNode(privKey ed25519.PrivateKey) (serverName string, node *go_http_js_libp2p.P2pLocalNode) {
|
||||
hosted := "/dns4/rendezvous.matrix.org/tcp/8443/wss/p2p-websocket-star/"
|
||||
node = go_http_js_libp2p.NewP2pLocalNode("org.matrix.p2p.experiment", privKey.Seed(), []string{hosted}, "p2p")
|
||||
serverName = node.Id
|
||||
fmt.Println("p2p assigned ServerName: ", serverName)
|
||||
return
|
||||
}
|
||||
|
||||
func main() {
|
||||
cfg := &config.Dendrite{}
|
||||
cfg.Defaults(true)
|
||||
cfg.UserAPI.AccountDatabase.ConnectionString = "file:/idb/dendritejs_account.db"
|
||||
cfg.AppServiceAPI.Database.ConnectionString = "file:/idb/dendritejs_appservice.db"
|
||||
cfg.FederationAPI.Database.ConnectionString = "file:/idb/dendritejs_fedsender.db"
|
||||
cfg.MediaAPI.Database.ConnectionString = "file:/idb/dendritejs_mediaapi.db"
|
||||
cfg.RoomServer.Database.ConnectionString = "file:/idb/dendritejs_roomserver.db"
|
||||
cfg.SyncAPI.Database.ConnectionString = "file:/idb/dendritejs_syncapi.db"
|
||||
cfg.KeyServer.Database.ConnectionString = "file:/idb/dendritejs_e2ekey.db"
|
||||
cfg.Global.JetStream.StoragePath = "file:/idb/dendritejs/"
|
||||
cfg.Global.TrustedIDServers = []string{
|
||||
"matrix.org", "vector.im",
|
||||
}
|
||||
cfg.Global.KeyID = libp2pMatrixKeyID
|
||||
cfg.Global.PrivateKey = generateKey()
|
||||
|
||||
serverName, node := createP2PNode(cfg.Global.PrivateKey)
|
||||
cfg.Global.ServerName = gomatrixserverlib.ServerName(serverName)
|
||||
|
||||
if err := cfg.Derive(); err != nil {
|
||||
logrus.Fatalf("Failed to derive values from config: %s", err)
|
||||
}
|
||||
base := setup.NewBaseDendrite(cfg, "Monolith")
|
||||
defer base.Close() // nolint: errcheck
|
||||
|
||||
accountDB := base.CreateAccountsDB()
|
||||
federation := createFederationClient(cfg, node)
|
||||
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, federation)
|
||||
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
|
||||
keyAPI.SetUserAPI(userAPI)
|
||||
|
||||
fetcher := &libp2pKeyFetcher{}
|
||||
keyRing := gomatrixserverlib.KeyRing{
|
||||
KeyFetchers: []gomatrixserverlib.KeyFetcher{
|
||||
fetcher,
|
||||
},
|
||||
KeyDatabase: fetcher,
|
||||
}
|
||||
|
||||
rsAPI := roomserver.NewInternalAPI(base)
|
||||
asQuery := appservice.NewInternalAPI(
|
||||
base, userAPI, rsAPI,
|
||||
)
|
||||
rsAPI.SetAppserviceAPI(asQuery)
|
||||
fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, keyRing, true)
|
||||
rsAPI.SetFederationAPI(fedSenderAPI, keyRing)
|
||||
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation)
|
||||
|
||||
psAPI := pushserver.NewInternalAPI(base)
|
||||
|
||||
monolith := setup.Monolith{
|
||||
Config: base.Cfg,
|
||||
AccountDB: accountDB,
|
||||
Client: createClient(node),
|
||||
FedClient: federation,
|
||||
KeyRing: &keyRing,
|
||||
|
||||
AppserviceAPI: asQuery,
|
||||
FederationSenderAPI: fedSenderAPI,
|
||||
RoomserverAPI: rsAPI,
|
||||
UserAPI: userAPI,
|
||||
KeyAPI: keyAPI,
|
||||
PushserverAPI: psAPI,
|
||||
//ServerKeyAPI: serverKeyAPI,
|
||||
ExtPublicRoomsProvider: p2pPublicRoomProvider,
|
||||
}
|
||||
monolith.AddAllPublicRoutes(
|
||||
base.ProcessContext,
|
||||
base.PublicClientAPIMux,
|
||||
base.PublicFederationAPIMux,
|
||||
base.PublicKeyAPIMux,
|
||||
base.PublicMediaAPIMux,
|
||||
base.SynapseAdminMux,
|
||||
)
|
||||
|
||||
httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
|
||||
httpRouter.PathPrefix(httputil.InternalPathPrefix).Handler(base.InternalAPIMux)
|
||||
httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux)
|
||||
httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux)
|
||||
|
||||
libp2pRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
|
||||
libp2pRouter.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.PublicFederationAPIMux)
|
||||
libp2pRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux)
|
||||
|
||||
// Expose the matrix APIs via libp2p-js - for federation traffic
|
||||
if node != nil {
|
||||
go func() {
|
||||
logrus.Info("Listening on libp2p-js host ID ", node.Id)
|
||||
s := JSServer{
|
||||
Mux: libp2pRouter,
|
||||
}
|
||||
s.ListenAndServe("p2p")
|
||||
}()
|
||||
}
|
||||
|
||||
// Expose the matrix APIs via fetch - for local traffic
|
||||
go func() {
|
||||
logrus.Info("Listening for service-worker fetch traffic")
|
||||
s := JSServer{
|
||||
Mux: httpRouter,
|
||||
}
|
||||
s.ListenAndServe("fetch")
|
||||
}()
|
||||
|
||||
// We want to block forever to let the fetch and libp2p handler serve the APIs
|
||||
select {}
|
||||
}
|
|
@ -1,24 +0,0 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build !wasm
|
||||
// +build !wasm
|
||||
|
||||
package main
|
||||
|
||||
import "fmt"
|
||||
|
||||
func main() {
|
||||
fmt.Println("dendritejs: no-op when not compiling for WebAssembly")
|
||||
}
|
|
@ -1,155 +0,0 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build wasm
|
||||
// +build wasm
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/federationapi/api"
|
||||
go_http_js_libp2p "github.com/matrix-org/go-http-js-libp2p"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
)
|
||||
|
||||
type libp2pPublicRoomsProvider struct {
|
||||
node *go_http_js_libp2p.P2pLocalNode
|
||||
providers []go_http_js_libp2p.PeerInfo
|
||||
fedSender api.FederationInternalAPI
|
||||
fedClient *gomatrixserverlib.FederationClient
|
||||
}
|
||||
|
||||
func NewLibP2PPublicRoomsProvider(
|
||||
node *go_http_js_libp2p.P2pLocalNode, fedSender api.FederationInternalAPI, fedClient *gomatrixserverlib.FederationClient,
|
||||
) *libp2pPublicRoomsProvider {
|
||||
p := &libp2pPublicRoomsProvider{
|
||||
node: node,
|
||||
fedSender: fedSender,
|
||||
fedClient: fedClient,
|
||||
}
|
||||
node.RegisterFoundProviders(p.foundProviders)
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *libp2pPublicRoomsProvider) foundProviders(peerInfos []go_http_js_libp2p.PeerInfo) {
|
||||
// work out the diff then poke for new ones
|
||||
seen := make(map[string]bool, len(p.providers))
|
||||
for _, pr := range p.providers {
|
||||
seen[pr.Id] = true
|
||||
}
|
||||
var newPeers []gomatrixserverlib.ServerName
|
||||
for _, pi := range peerInfos {
|
||||
if !seen[pi.Id] {
|
||||
newPeers = append(newPeers, gomatrixserverlib.ServerName(pi.Id))
|
||||
}
|
||||
}
|
||||
if len(newPeers) > 0 {
|
||||
var res api.PerformServersAliveResponse
|
||||
// ignore errors, we don't care.
|
||||
p.fedSender.PerformServersAlive(context.Background(), &api.PerformServersAliveRequest{
|
||||
Servers: newPeers,
|
||||
}, &res)
|
||||
}
|
||||
|
||||
p.providers = peerInfos
|
||||
}
|
||||
|
||||
func (p *libp2pPublicRoomsProvider) Rooms() []gomatrixserverlib.PublicRoom {
|
||||
return bulkFetchPublicRoomsFromServers(context.Background(), p.fedClient, p.homeservers())
|
||||
}
|
||||
|
||||
func (p *libp2pPublicRoomsProvider) homeservers() []string {
|
||||
result := make([]string, len(p.providers))
|
||||
for i := range p.providers {
|
||||
result[i] = p.providers[i].Id
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers.
|
||||
// Returns a list of public rooms.
|
||||
func bulkFetchPublicRoomsFromServers(
|
||||
ctx context.Context, fedClient *gomatrixserverlib.FederationClient, homeservers []string,
|
||||
) (publicRooms []gomatrixserverlib.PublicRoom) {
|
||||
limit := 200
|
||||
// follow pipeline semantics, see https://blog.golang.org/pipelines for more info.
|
||||
// goroutines send rooms to this channel
|
||||
roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit))
|
||||
// signalling channel to tell goroutines to stop sending rooms and quit
|
||||
done := make(chan bool)
|
||||
// signalling to say when we can close the room channel
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(homeservers))
|
||||
// concurrently query for public rooms
|
||||
for _, hs := range homeservers {
|
||||
go func(homeserverDomain string) {
|
||||
defer wg.Done()
|
||||
util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms")
|
||||
fres, err := fedClient.GetPublicRooms(ctx, gomatrixserverlib.ServerName(homeserverDomain), int(limit), "", false, "")
|
||||
if err != nil {
|
||||
util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn(
|
||||
"bulkFetchPublicRoomsFromServers: failed to query hs",
|
||||
)
|
||||
return
|
||||
}
|
||||
for _, room := range fres.Chunk {
|
||||
// atomically send a room or stop
|
||||
select {
|
||||
case roomCh <- room:
|
||||
case <-done:
|
||||
util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms")
|
||||
return
|
||||
}
|
||||
}
|
||||
}(hs)
|
||||
}
|
||||
|
||||
// Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request.
|
||||
// This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be
|
||||
// closed.
|
||||
go func() {
|
||||
wg.Wait()
|
||||
util.GetLogger(ctx).Info("Cleaning up resources")
|
||||
close(roomCh)
|
||||
}()
|
||||
|
||||
// fan-in results with timeout. We stop when we reach the limit.
|
||||
FanIn:
|
||||
for len(publicRooms) < int(limit) || limit == 0 {
|
||||
// add a room or timeout
|
||||
select {
|
||||
case room, ok := <-roomCh:
|
||||
if !ok {
|
||||
util.GetLogger(ctx).Info("All homeservers have been queried, returning results.")
|
||||
break FanIn
|
||||
}
|
||||
publicRooms = append(publicRooms, room)
|
||||
case <-time.After(15 * time.Second): // we've waited long enough, let's tell the client what we got.
|
||||
util.GetLogger(ctx).Info("Waited 15s for federated public rooms, returning early")
|
||||
break FanIn
|
||||
case <-ctx.Done(): // the client hung up on us, let's stop.
|
||||
util.GetLogger(ctx).Info("Client hung up, returning early")
|
||||
break FanIn
|
||||
}
|
||||
}
|
||||
// tell goroutines to stop
|
||||
close(done)
|
||||
|
||||
return publicRooms
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue