WIP: Add libp2p-go (#956)

* Add libp2p-go

* Some tweaks, tidying up

(cherry picked from commit 1a5bb121f8121c4f68a27abbf25a9a35a1b7c63e)

* Move p2p dockerfile

(cherry picked from commit 8d3bf44ea1bf37f950034e73bcdc315afdabe79a)

* Remove containsBackwardsExtremity

* Fix some linter errors, update some libp2p packages/calls, other tidying up

* Add -port for dendrite-p2p-demo

* Use instance name as key ID

* Remove P2P demo docker stuff, no longer needed now that we have SQLite

* Remove Dockerfile-p2p too

* Remove p2p logic from dendrite-monolith-server

* Inject publicRoomsDB in publicroomsapi

Inject publicRoomsDB instead of switching on base.libP2P.
See: https://github.com/matrix-org/dendrite/pull/956/files?file-filters%5B%5D=.go#r406276914

* Fix lint warning

* Extract mDNSListener from base.go

* Extract CreateFederationClient into demo

* Create P2PDendrite from BaseDendrite

Extract logic specific to P2PDendrite from base.go

* Set base.go to upstream/master

* Move pubsub to demo cmd

* Move PostgreswithDHT to cmd

* Remove unstable features

* Add copyrights

* Move libp2pvalidator into p2pdendrite

* Rename dendrite-p2p-demo -> dendrite-demo-libp2p

* Update copyrights

* go mod tidy

Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
This commit is contained in:
Hilmar Gústafsson 2020-04-14 17:15:59 +02:00 committed by GitHub
parent 48303d06cb
commit 73d2f59e30
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 1186 additions and 16 deletions

View file

@ -0,0 +1,203 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"crypto/ed25519"
"flag"
"fmt"
"io/ioutil"
"net/http"
"os"
"time"
gostream "github.com/libp2p/go-libp2p-gostream"
p2phttp "github.com/libp2p/go-libp2p-http"
p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/common/transactions"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/mediaapi"
"github.com/matrix-org/dendrite/publicroomsapi"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/syncapi"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
)
func createKeyDB(
base *P2PDendrite,
) keydb.Database {
db, err := keydb.NewDatabase(
string(base.Base.Cfg.Database.ServerKey),
base.Base.Cfg.Matrix.ServerName,
base.Base.Cfg.Matrix.PrivateKey.Public().(ed25519.PublicKey),
base.Base.Cfg.Matrix.KeyID,
)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to keys db")
}
mdns := mDNSListener{
host: base.LibP2P,
keydb: db,
}
serv, err := p2pdisc.NewMdnsService(
base.LibP2PContext,
base.LibP2P,
time.Second*10,
"_matrix-dendrite-p2p._tcp",
)
if err != nil {
panic(err)
}
serv.RegisterNotifee(&mdns)
return db
}
func createFederationClient(
base *P2PDendrite,
) *gomatrixserverlib.FederationClient {
fmt.Println("Running in libp2p federation mode")
fmt.Println("Warning: Federation with non-libp2p homeservers will not work in this mode yet!")
tr := &http.Transport{}
tr.RegisterProtocol(
"matrix",
p2phttp.NewTransport(base.LibP2P, p2phttp.ProtocolOption("/matrix")),
)
return gomatrixserverlib.NewFederationClientWithTransport(
base.Base.Cfg.Matrix.ServerName, base.Base.Cfg.Matrix.KeyID, base.Base.Cfg.Matrix.PrivateKey, tr,
)
}
func main() {
instanceName := flag.String("name", "dendrite-p2p", "the name of this P2P demo instance")
instancePort := flag.Int("port", 8080, "the port that the client API will listen on")
flag.Parse()
filename := fmt.Sprintf("%s-private.key", *instanceName)
_, err := os.Stat(filename)
var privKey ed25519.PrivateKey
if os.IsNotExist(err) {
_, privKey, _ = ed25519.GenerateKey(nil)
if err = ioutil.WriteFile(filename, privKey, 0600); err != nil {
fmt.Printf("Couldn't write private key to file '%s': %s\n", filename, err)
}
} else {
privKey, err = ioutil.ReadFile(filename)
if err != nil {
fmt.Printf("Couldn't read private key from file '%s': %s\n", filename, err)
_, privKey, _ = ed25519.GenerateKey(nil)
}
}
cfg := config.Dendrite{}
cfg.Matrix.ServerName = "p2p"
cfg.Matrix.PrivateKey = privKey
cfg.Matrix.KeyID = gomatrixserverlib.KeyID(fmt.Sprintf("ed25519:%s", *instanceName))
cfg.Kafka.UseNaffka = true
cfg.Kafka.Topics.OutputRoomEvent = "roomserverOutput"
cfg.Kafka.Topics.OutputClientData = "clientapiOutput"
cfg.Kafka.Topics.OutputTypingEvent = "typingServerOutput"
cfg.Kafka.Topics.UserUpdates = "userUpdates"
cfg.Database.Account = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName))
cfg.Database.Device = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName))
cfg.Database.MediaAPI = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName))
cfg.Database.SyncAPI = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", *instanceName))
cfg.Database.RoomServer = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", *instanceName))
cfg.Database.ServerKey = config.DataSource(fmt.Sprintf("file:%s-serverkey.db", *instanceName))
cfg.Database.FederationSender = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName))
cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
cfg.Database.PublicRoomsAPI = config.DataSource(fmt.Sprintf("file:%s-publicroomsa.db", *instanceName))
cfg.Database.Naffka = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName))
if err = cfg.Derive(); err != nil {
panic(err)
}
base := NewP2PDendrite(&cfg, "Monolith")
defer base.Base.Close() // nolint: errcheck
accountDB := base.Base.CreateAccountsDB()
deviceDB := base.Base.CreateDeviceDB()
keyDB := createKeyDB(base)
federation := createFederationClient(base)
keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
alias, input, query := roomserver.SetupRoomServerComponent(&base.Base)
eduInputAPI := eduserver.SetupEDUServerComponent(&base.Base, cache.New())
asQuery := appservice.SetupAppServiceAPIComponent(
&base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(),
)
fedSenderAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query)
clientapi.SetupClientAPIComponent(
&base.Base, deviceDB, accountDB,
federation, &keyRing, alias, input, query,
eduInputAPI, asQuery, transactions.New(), fedSenderAPI,
)
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.SetupFederationAPIComponent(&base.Base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer)
mediaapi.SetupMediaAPIComponent(&base.Base, deviceDB)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db")
}
publicroomsapi.SetupPublicRoomsAPIComponent(&base.Base, deviceDB, publicRoomsDB, query, federation, nil) // Check this later
syncapi.SetupSyncAPIComponent(&base.Base, deviceDB, accountDB, query, federation, &cfg)
httpHandler := common.WrapHandlerInCORS(base.Base.APIMux)
// Set up the API endpoints we handle. /metrics is for prometheus, and is
// not wrapped by CORS, while everything else is
http.Handle("/metrics", promhttp.Handler())
http.Handle("/", httpHandler)
// Expose the matrix APIs directly rather than putting them under a /api path.
go func() {
httpBindAddr := fmt.Sprintf(":%d", *instancePort)
logrus.Info("Listening on ", httpBindAddr)
logrus.Fatal(http.ListenAndServe(httpBindAddr, nil))
}()
// Expose the matrix APIs also via libp2p
if base.LibP2P != nil {
go func() {
logrus.Info("Listening on libp2p host ID ", base.LibP2P.ID())
listener, err := gostream.Listen(base.LibP2P, "/matrix")
if err != nil {
panic(err)
}
defer func() {
logrus.Fatal(listener.Close())
}()
logrus.Fatal(http.Serve(listener, nil))
}()
}
// We want to block forever to let the HTTP and HTTPS handler serve the APIs
select {}
}

View file

@ -0,0 +1,63 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"fmt"
"math"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/gomatrixserverlib"
)
type mDNSListener struct {
keydb keydb.Database
host host.Host
}
func (n *mDNSListener) HandlePeerFound(p peer.AddrInfo) {
if err := n.host.Connect(context.Background(), p); err != nil {
fmt.Println("Error adding peer", p.ID.String(), "via mDNS:", err)
}
if pubkey, err := p.ID.ExtractPublicKey(); err == nil {
raw, _ := pubkey.Raw()
if err := n.keydb.StoreKeys(
context.Background(),
map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{
{
ServerName: gomatrixserverlib.ServerName(p.ID.String()),
KeyID: "ed25519:p2pdemo",
}: {
VerifyKey: gomatrixserverlib.VerifyKey{
Key: gomatrixserverlib.Base64String(raw),
},
ValidUntilTS: math.MaxUint64 >> 1,
ExpiredTS: gomatrixserverlib.PublicKeyNotExpired,
},
},
); err != nil {
fmt.Println("Failed to store keys:", err)
}
}
fmt.Println("Discovered", len(n.host.Peerstore().Peers())-1, "other libp2p peer(s):")
for _, peer := range n.host.Peerstore().Peers() {
if peer != n.host.ID() {
fmt.Println("-", peer)
}
}
}

View file

@ -0,0 +1,126 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"fmt"
"errors"
pstore "github.com/libp2p/go-libp2p-core/peerstore"
record "github.com/libp2p/go-libp2p-record"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/libp2p/go-libp2p"
circuit "github.com/libp2p/go-libp2p-circuit"
crypto "github.com/libp2p/go-libp2p-core/crypto"
routing "github.com/libp2p/go-libp2p-core/routing"
host "github.com/libp2p/go-libp2p-core/host"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/common/config"
)
// P2PDendrite is a Peer-to-Peer variant of BaseDendrite.
type P2PDendrite struct {
Base basecomponent.BaseDendrite
// Store our libp2p object so that we can make outgoing connections from it
// later
LibP2P host.Host
LibP2PContext context.Context
LibP2PCancel context.CancelFunc
LibP2PDHT *dht.IpfsDHT
LibP2PPubsub *pubsub.PubSub
}
// NewP2PDendrite creates a new instance to be used by a component.
// The componentName is used for logging purposes, and should be a friendly name
// of the component running, e.g. SyncAPI.
func NewP2PDendrite(cfg *config.Dendrite, componentName string) *P2PDendrite {
baseDendrite := basecomponent.NewBaseDendrite(cfg, componentName)
ctx, cancel := context.WithCancel(context.Background())
privKey, err := crypto.UnmarshalEd25519PrivateKey(cfg.Matrix.PrivateKey[:])
if err != nil {
panic(err)
}
//defaultIP6ListenAddr, _ := multiaddr.NewMultiaddr("/ip6/::/tcp/0")
var libp2pdht *dht.IpfsDHT
libp2p, err := libp2p.New(ctx,
libp2p.Identity(privKey),
libp2p.DefaultListenAddrs,
//libp2p.ListenAddrs(defaultIP6ListenAddr),
libp2p.DefaultTransports,
libp2p.Routing(func(h host.Host) (r routing.PeerRouting, err error) {
libp2pdht, err = dht.New(ctx, h)
if err != nil {
return nil, err
}
libp2pdht.Validator = libP2PValidator{}
r = libp2pdht
return
}),
libp2p.EnableAutoRelay(),
libp2p.EnableRelay(circuit.OptHop),
)
if err != nil {
panic(err)
}
libp2ppubsub, err := pubsub.NewFloodSub(context.Background(), libp2p, []pubsub.Option{
pubsub.WithMessageSigning(true),
}...)
if err != nil {
panic(err)
}
fmt.Println("Our public key:", privKey.GetPublic())
fmt.Println("Our node ID:", libp2p.ID())
fmt.Println("Our addresses:", libp2p.Addrs())
cfg.Matrix.ServerName = gomatrixserverlib.ServerName(libp2p.ID().String())
return &P2PDendrite{
Base: *baseDendrite,
LibP2P: libp2p,
LibP2PContext: ctx,
LibP2PCancel: cancel,
LibP2PDHT: libp2pdht,
LibP2PPubsub: libp2ppubsub,
}
}
type libP2PValidator struct {
KeyBook pstore.KeyBook
}
func (v libP2PValidator) Validate(key string, value []byte) error {
ns, _, err := record.SplitKey(key)
if err != nil || ns != "matrix" {
return errors.New("not Matrix path")
}
return nil
}
func (v libP2PValidator) Select(k string, vals [][]byte) (int, error) {
return 0, nil
}

View file

@ -0,0 +1,164 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgreswithdht
import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/matrix-org/dendrite/publicroomsapi/storage/postgres"
"github.com/matrix-org/gomatrixserverlib"
dht "github.com/libp2p/go-libp2p-kad-dht"
)
const DHTInterval = time.Second * 10
// PublicRoomsServerDatabase represents a public rooms server database.
type PublicRoomsServerDatabase struct {
dht *dht.IpfsDHT
postgres.PublicRoomsServerDatabase
ourRoomsContext context.Context // our current value in the DHT
ourRoomsCancel context.CancelFunc // cancel when we want to expire our value
foundRooms map[string]gomatrixserverlib.PublicRoom // additional rooms we have learned about from the DHT
foundRoomsMutex sync.RWMutex // protects foundRooms
maintenanceTimer *time.Timer //
roomsAdvertised atomic.Value // stores int
roomsDiscovered atomic.Value // stores int
}
// NewPublicRoomsServerDatabase creates a new public rooms server database.
func NewPublicRoomsServerDatabase(dataSourceName string, dht *dht.IpfsDHT) (*PublicRoomsServerDatabase, error) {
pg, err := postgres.NewPublicRoomsServerDatabase(dataSourceName)
if err != nil {
return nil, err
}
provider := PublicRoomsServerDatabase{
dht: dht,
PublicRoomsServerDatabase: *pg,
}
go provider.ResetDHTMaintenance()
provider.roomsAdvertised.Store(0)
provider.roomsDiscovered.Store(0)
return &provider, nil
}
func (d *PublicRoomsServerDatabase) GetRoomVisibility(ctx context.Context, roomID string) (bool, error) {
return d.PublicRoomsServerDatabase.GetRoomVisibility(ctx, roomID)
}
func (d *PublicRoomsServerDatabase) SetRoomVisibility(ctx context.Context, visible bool, roomID string) error {
d.ResetDHTMaintenance()
return d.PublicRoomsServerDatabase.SetRoomVisibility(ctx, visible, roomID)
}
func (d *PublicRoomsServerDatabase) CountPublicRooms(ctx context.Context) (int64, error) {
count, err := d.PublicRoomsServerDatabase.CountPublicRooms(ctx)
if err != nil {
return 0, err
}
d.foundRoomsMutex.RLock()
defer d.foundRoomsMutex.RUnlock()
return count + int64(len(d.foundRooms)), nil
}
func (d *PublicRoomsServerDatabase) GetPublicRooms(ctx context.Context, offset int64, limit int16, filter string) ([]gomatrixserverlib.PublicRoom, error) {
realfilter := filter
if realfilter == "__local__" {
realfilter = ""
}
rooms, err := d.PublicRoomsServerDatabase.GetPublicRooms(ctx, offset, limit, realfilter)
if err != nil {
return []gomatrixserverlib.PublicRoom{}, err
}
if filter != "__local__" {
d.foundRoomsMutex.RLock()
defer d.foundRoomsMutex.RUnlock()
for _, room := range d.foundRooms {
rooms = append(rooms, room)
}
}
return rooms, nil
}
func (d *PublicRoomsServerDatabase) UpdateRoomFromEvents(ctx context.Context, eventsToAdd []gomatrixserverlib.Event, eventsToRemove []gomatrixserverlib.Event) error {
return d.PublicRoomsServerDatabase.UpdateRoomFromEvents(ctx, eventsToAdd, eventsToRemove)
}
func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(ctx context.Context, event gomatrixserverlib.Event) error {
return d.PublicRoomsServerDatabase.UpdateRoomFromEvent(ctx, event)
}
func (d *PublicRoomsServerDatabase) ResetDHTMaintenance() {
if d.maintenanceTimer != nil && !d.maintenanceTimer.Stop() {
<-d.maintenanceTimer.C
}
d.Interval()
}
func (d *PublicRoomsServerDatabase) Interval() {
if err := d.AdvertiseRoomsIntoDHT(); err != nil {
// fmt.Println("Failed to advertise room in DHT:", err)
}
if err := d.FindRoomsInDHT(); err != nil {
// fmt.Println("Failed to find rooms in DHT:", err)
}
fmt.Println("Found", d.roomsDiscovered.Load(), "room(s), advertised", d.roomsAdvertised.Load(), "room(s)")
d.maintenanceTimer = time.AfterFunc(DHTInterval, d.Interval)
}
func (d *PublicRoomsServerDatabase) AdvertiseRoomsIntoDHT() error {
dbCtx, dbCancel := context.WithTimeout(context.Background(), 3*time.Second)
_ = dbCancel
ourRooms, err := d.GetPublicRooms(dbCtx, 0, 1024, "__local__")
if err != nil {
return err
}
if j, err := json.Marshal(ourRooms); err == nil {
d.roomsAdvertised.Store(len(ourRooms))
d.ourRoomsContext, d.ourRoomsCancel = context.WithCancel(context.Background())
if err := d.dht.PutValue(d.ourRoomsContext, "/matrix/publicRooms", j); err != nil {
return err
}
}
return nil
}
func (d *PublicRoomsServerDatabase) FindRoomsInDHT() error {
d.foundRoomsMutex.Lock()
searchCtx, searchCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer searchCancel()
defer d.foundRoomsMutex.Unlock()
results, err := d.dht.GetValues(searchCtx, "/matrix/publicRooms", 1024)
if err != nil {
return err
}
d.foundRooms = make(map[string]gomatrixserverlib.PublicRoom)
for _, result := range results {
var received []gomatrixserverlib.PublicRoom
if err := json.Unmarshal(result.Val, &received); err != nil {
return err
}
for _, room := range received {
d.foundRooms[room.RoomID] = room
}
}
d.roomsDiscovered.Store(len(d.foundRooms))
return nil
}

View file

@ -0,0 +1,179 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgreswithpubsub
import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/matrix-org/dendrite/publicroomsapi/storage/postgres"
"github.com/matrix-org/gomatrixserverlib"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
const MaintenanceInterval = time.Second * 10
type discoveredRoom struct {
time time.Time
room gomatrixserverlib.PublicRoom
}
// PublicRoomsServerDatabase represents a public rooms server database.
type PublicRoomsServerDatabase struct {
postgres.PublicRoomsServerDatabase //
pubsub *pubsub.PubSub //
subscription *pubsub.Subscription //
foundRooms map[string]discoveredRoom // additional rooms we have learned about from the DHT
foundRoomsMutex sync.RWMutex // protects foundRooms
maintenanceTimer *time.Timer //
roomsAdvertised atomic.Value // stores int
}
// NewPublicRoomsServerDatabase creates a new public rooms server database.
func NewPublicRoomsServerDatabase(dataSourceName string, pubsub *pubsub.PubSub) (*PublicRoomsServerDatabase, error) {
pg, err := postgres.NewPublicRoomsServerDatabase(dataSourceName)
if err != nil {
return nil, err
}
provider := PublicRoomsServerDatabase{
pubsub: pubsub,
PublicRoomsServerDatabase: *pg,
foundRooms: make(map[string]discoveredRoom),
}
if topic, err := pubsub.Join("/matrix/publicRooms"); err != nil {
return nil, err
} else if sub, err := topic.Subscribe(); err == nil {
provider.subscription = sub
go provider.MaintenanceTimer()
go provider.FindRooms()
provider.roomsAdvertised.Store(0)
return &provider, nil
} else {
return nil, err
}
}
func (d *PublicRoomsServerDatabase) GetRoomVisibility(ctx context.Context, roomID string) (bool, error) {
return d.PublicRoomsServerDatabase.GetRoomVisibility(ctx, roomID)
}
func (d *PublicRoomsServerDatabase) SetRoomVisibility(ctx context.Context, visible bool, roomID string) error {
d.MaintenanceTimer()
return d.PublicRoomsServerDatabase.SetRoomVisibility(ctx, visible, roomID)
}
func (d *PublicRoomsServerDatabase) CountPublicRooms(ctx context.Context) (int64, error) {
d.foundRoomsMutex.RLock()
defer d.foundRoomsMutex.RUnlock()
return int64(len(d.foundRooms)), nil
}
func (d *PublicRoomsServerDatabase) GetPublicRooms(ctx context.Context, offset int64, limit int16, filter string) ([]gomatrixserverlib.PublicRoom, error) {
var rooms []gomatrixserverlib.PublicRoom
if filter == "__local__" {
if r, err := d.PublicRoomsServerDatabase.GetPublicRooms(ctx, offset, limit, ""); err == nil {
rooms = append(rooms, r...)
} else {
return []gomatrixserverlib.PublicRoom{}, err
}
} else {
d.foundRoomsMutex.RLock()
defer d.foundRoomsMutex.RUnlock()
for _, room := range d.foundRooms {
rooms = append(rooms, room.room)
}
}
return rooms, nil
}
func (d *PublicRoomsServerDatabase) UpdateRoomFromEvents(ctx context.Context, eventsToAdd []gomatrixserverlib.Event, eventsToRemove []gomatrixserverlib.Event) error {
return d.PublicRoomsServerDatabase.UpdateRoomFromEvents(ctx, eventsToAdd, eventsToRemove)
}
func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(ctx context.Context, event gomatrixserverlib.Event) error {
return d.PublicRoomsServerDatabase.UpdateRoomFromEvent(ctx, event)
}
func (d *PublicRoomsServerDatabase) MaintenanceTimer() {
if d.maintenanceTimer != nil && !d.maintenanceTimer.Stop() {
<-d.maintenanceTimer.C
}
d.Interval()
}
func (d *PublicRoomsServerDatabase) Interval() {
d.foundRoomsMutex.Lock()
for k, v := range d.foundRooms {
if time.Since(v.time) > time.Minute {
delete(d.foundRooms, k)
}
}
d.foundRoomsMutex.Unlock()
if err := d.AdvertiseRooms(); err != nil {
fmt.Println("Failed to advertise room in DHT:", err)
}
d.foundRoomsMutex.RLock()
defer d.foundRoomsMutex.RUnlock()
fmt.Println("Found", len(d.foundRooms), "room(s), advertised", d.roomsAdvertised.Load(), "room(s)")
d.maintenanceTimer = time.AfterFunc(MaintenanceInterval, d.Interval)
}
func (d *PublicRoomsServerDatabase) AdvertiseRooms() error {
dbCtx, dbCancel := context.WithTimeout(context.Background(), 3*time.Second)
_ = dbCancel
ourRooms, err := d.GetPublicRooms(dbCtx, 0, 1024, "__local__")
if err != nil {
return err
}
advertised := 0
for _, room := range ourRooms {
if j, err := json.Marshal(room); err == nil {
if topic, err := d.pubsub.Join("/matrix/publicRooms"); err != nil {
fmt.Println("Failed to subscribe to topic:", err)
} else if err := topic.Publish(context.TODO(), j); err != nil {
fmt.Println("Failed to publish public room:", err)
} else {
advertised++
}
}
}
d.roomsAdvertised.Store(advertised)
return nil
}
func (d *PublicRoomsServerDatabase) FindRooms() {
for {
msg, err := d.subscription.Next(context.Background())
if err != nil {
continue
}
received := discoveredRoom{
time: time.Now(),
}
if err := json.Unmarshal(msg.Data, &received.room); err != nil {
fmt.Println("Unmarshal error:", err)
continue
}
d.foundRoomsMutex.Lock()
d.foundRooms[received.room.RoomID] = received
d.foundRoomsMutex.Unlock()
}
}

View file

@ -0,0 +1,61 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"net/url"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage/postgreswithdht"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage/postgreswithpubsub"
"github.com/matrix-org/dendrite/publicroomsapi/storage"
"github.com/matrix-org/dendrite/publicroomsapi/storage/sqlite3"
)
const schemePostgres = "postgres"
const schemeFile = "file"
// NewPublicRoomsServerDatabase opens a database connection.
func NewPublicRoomsServerDatabaseWithDHT(dataSourceName string, dht *dht.IpfsDHT) (storage.Database, error) {
uri, err := url.Parse(dataSourceName)
if err != nil {
return postgreswithdht.NewPublicRoomsServerDatabase(dataSourceName, dht)
}
switch uri.Scheme {
case schemePostgres:
return postgreswithdht.NewPublicRoomsServerDatabase(dataSourceName, dht)
case schemeFile:
return sqlite3.NewPublicRoomsServerDatabase(dataSourceName)
default:
return postgreswithdht.NewPublicRoomsServerDatabase(dataSourceName, dht)
}
}
// NewPublicRoomsServerDatabase opens a database connection.
func NewPublicRoomsServerDatabaseWithPubSub(dataSourceName string, pubsub *pubsub.PubSub) (storage.Database, error) {
uri, err := url.Parse(dataSourceName)
if err != nil {
return postgreswithpubsub.NewPublicRoomsServerDatabase(dataSourceName, pubsub)
}
switch uri.Scheme {
case schemePostgres:
return postgreswithpubsub.NewPublicRoomsServerDatabase(dataSourceName, pubsub)
case schemeFile:
return sqlite3.NewPublicRoomsServerDatabase(dataSourceName)
default:
return postgreswithpubsub.NewPublicRoomsServerDatabase(dataSourceName, pubsub)
}
}

View file

@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/mediaapi"
"github.com/matrix-org/dendrite/publicroomsapi"
"github.com/matrix-org/dendrite/publicroomsapi/storage"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/syncapi"
"github.com/prometheus/client_golang/prometheus/promhttp"
@ -71,7 +72,11 @@ func main() {
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer)
mediaapi.SetupMediaAPIComponent(base, deviceDB)
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, nil)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI))
if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db")
}
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, publicRoomsDB, query, federation, nil)
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg)
httpHandler := common.WrapHandlerInCORS(base.APIMux)

View file

@ -17,6 +17,8 @@ package main
import (
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/publicroomsapi"
"github.com/matrix-org/dendrite/publicroomsapi/storage"
"github.com/sirupsen/logrus"
)
func main() {
@ -27,8 +29,11 @@ func main() {
deviceDB := base.CreateDeviceDB()
_, _, query := base.CreateHTTPRoomserverAPIs()
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, nil, nil)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI))
if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db")
}
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, publicRoomsDB, query, nil, nil)
base.SetupAndServeHTTP(string(base.Cfg.Bind.PublicRoomsAPI), string(base.Cfg.Listen.PublicRoomsAPI))

View file

@ -34,6 +34,7 @@ import (
"github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/mediaapi"
"github.com/matrix-org/dendrite/publicroomsapi"
"github.com/matrix-org/dendrite/publicroomsapi/storage"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/syncapi"
"github.com/matrix-org/go-http-js-libp2p/go_http_js_libp2p"
@ -137,7 +138,11 @@ func main() {
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer)
mediaapi.SetupMediaAPIComponent(base, deviceDB)
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, p2pPublicRoomProvider)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI))
if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db")
}
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, publicRoomsDB, query, federation, p2pPublicRoomProvider)
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg)
httpHandler := common.WrapHandlerInCORS(base.APIMux)