Support connecting to multiple Pinecone static peers in the P2P demos (supply a comma-separated list)

This commit is contained in:
Neil Alexander 2021-11-25 09:46:26 +00:00
parent 25dcf80180
commit 9bc1c36ff6
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
3 changed files with 62 additions and 27 deletions

View file

@ -13,6 +13,7 @@ import (
"net" "net"
"net/http" "net/http"
"os" "os"
"strings"
"sync" "sync"
"time" "time"
@ -75,7 +76,7 @@ func (m *DendriteMonolith) BaseURL() string {
} }
func (m *DendriteMonolith) PeerCount(peertype int) int { func (m *DendriteMonolith) PeerCount(peertype int) int {
return m.PineconeRouter.PeerCount(peertype) return m.PineconeRouter.PeerCount(pineconeRouter.ConnectionPeerType(peertype))
} }
func (m *DendriteMonolith) SessionCount() int { func (m *DendriteMonolith) SessionCount() int {
@ -87,15 +88,15 @@ func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) {
m.PineconeMulticast.Start() m.PineconeMulticast.Start()
} else { } else {
m.PineconeMulticast.Stop() m.PineconeMulticast.Stop()
m.DisconnectType(pineconeRouter.PeerTypeMulticast) m.DisconnectType(int(pineconeRouter.PeerTypeMulticast))
} }
} }
func (m *DendriteMonolith) SetStaticPeer(uri string) { func (m *DendriteMonolith) SetStaticPeer(uri string) {
m.staticPeerMutex.Lock() m.staticPeerMutex.Lock()
m.staticPeerURI = uri m.staticPeerURI = strings.TrimSpace(uri)
m.staticPeerMutex.Unlock() m.staticPeerMutex.Unlock()
m.DisconnectType(pineconeRouter.PeerTypeRemote) m.DisconnectType(int(pineconeRouter.PeerTypeRemote))
if uri != "" { if uri != "" {
go func() { go func() {
m.staticPeerAttempt <- struct{}{} m.staticPeerAttempt <- struct{}{}
@ -105,7 +106,7 @@ func (m *DendriteMonolith) SetStaticPeer(uri string) {
func (m *DendriteMonolith) DisconnectType(peertype int) { func (m *DendriteMonolith) DisconnectType(peertype int) {
for _, p := range m.PineconeRouter.Peers() { for _, p := range m.PineconeRouter.Peers() {
if peertype == p.PeerType { if int(peertype) == p.PeerType {
m.PineconeRouter.Disconnect(types.SwitchPortID(p.Port), nil) m.PineconeRouter.Disconnect(types.SwitchPortID(p.Port), nil)
} }
} }
@ -133,7 +134,11 @@ func (m *DendriteMonolith) Conduit(zone string, peertype int) (*Conduit, error)
for i := 1; i <= 10; i++ { for i := 1; i <= 10; i++ {
logrus.Errorf("Attempting authenticated connect (attempt %d)", i) logrus.Errorf("Attempting authenticated connect (attempt %d)", i)
var err error var err error
conduit.port, err = m.PineconeRouter.AuthenticatedConnect(l, zone, peertype, true) conduit.port, err = m.PineconeRouter.Connect(
l,
pineconeRouter.ConnectionZone(zone),
pineconeRouter.ConnectionPeerType(peertype),
)
switch err { switch err {
case io.ErrClosedPipe: case io.ErrClosedPipe:
logrus.Errorf("Authenticated connect failed due to closed pipe (attempt %d)", i) logrus.Errorf("Authenticated connect failed due to closed pipe (attempt %d)", i)
@ -195,19 +200,31 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e
} }
func (m *DendriteMonolith) staticPeerConnect() { func (m *DendriteMonolith) staticPeerConnect() {
connected := map[string]bool{} // URI -> connected?
attempt := func() { attempt := func() {
if m.PineconeRouter.PeerCount(pineconeRouter.PeerTypeRemote) == 0 {
m.staticPeerMutex.RLock() m.staticPeerMutex.RLock()
uri := m.staticPeerURI uri := m.staticPeerURI
m.staticPeerMutex.RUnlock() m.staticPeerMutex.RUnlock()
if uri == "" { if uri == "" {
return return
} }
if err := conn.ConnectToPeer(m.PineconeRouter, uri); err != nil { for k := range connected {
connected[k] = false
}
for _, uri := range strings.Split(uri, ",") {
connected[strings.TrimSpace(uri)] = false
}
for _, info := range m.PineconeRouter.Peers() {
connected[info.URI] = true
}
for k, online := range connected {
if !online {
if err := conn.ConnectToPeer(m.PineconeRouter, k); err != nil {
logrus.WithError(err).Error("Failed to connect to static peer") logrus.WithError(err).Error("Failed to connect to static peer")
} }
} }
} }
}
for { for {
select { select {
case <-m.processContext.Context().Done(): case <-m.processContext.Context().Done():

View file

@ -34,7 +34,12 @@ func ConnectToPeer(pRouter *pineconeRouter.Router, peer string) error {
if parent == nil { if parent == nil {
return fmt.Errorf("failed to wrap connection") return fmt.Errorf("failed to wrap connection")
} }
_, err := pRouter.AuthenticatedConnect(parent, "static", pineconeRouter.PeerTypeRemote, true) _, err := pRouter.Connect(
parent,
pineconeRouter.ConnectionZone("static"),
pineconeRouter.PeerTypeRemote,
pineconeRouter.ConnectionURI(peer),
)
return err return err
} }

View file

@ -26,6 +26,7 @@ import (
"net" "net"
"net/http" "net/http"
"os" "os"
"strings"
"time" "time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
@ -61,7 +62,7 @@ import (
var ( var (
instanceName = flag.String("name", "dendrite-p2p-pinecone", "the name of this P2P demo instance") instanceName = flag.String("name", "dendrite-p2p-pinecone", "the name of this P2P demo instance")
instancePort = flag.Int("port", 8008, "the port that the client API will listen on") instancePort = flag.Int("port", 8008, "the port that the client API will listen on")
instancePeer = flag.String("peer", "", "the static Pinecone peer to connect to") instancePeer = flag.String("peer", "", "the static Pinecone peers to connect to, comma separated-list")
instanceListen = flag.String("listen", ":0", "the port Pinecone peers can connect to") instanceListen = flag.String("listen", ":0", "the port Pinecone peers can connect to")
) )
@ -109,9 +110,9 @@ func main() {
continue continue
} }
port, err := pRouter.AuthenticatedConnect(conn, "", pineconeRouter.PeerTypeRemote, true) port, err := pRouter.Connect(conn, pineconeRouter.PeerTypeRemote)
if err != nil { if err != nil {
logrus.WithError(err).Error("pSwitch.AuthenticatedConnect failed") logrus.WithError(err).Error("pSwitch.Connect failed")
continue continue
} }
@ -124,17 +125,25 @@ func main() {
pMulticast.Start() pMulticast.Start()
connectToStaticPeer := func() { connectToStaticPeer := func() {
attempt := func() { connected := map[string]bool{} // URI -> connected?
if pRouter.PeerCount(pineconeRouter.PeerTypeRemote) == 0 { for _, uri := range strings.Split(*instancePeer, ",") {
uri := *instancePeer connected[strings.TrimSpace(uri)] = false
if uri == "" {
return
} }
if err := conn.ConnectToPeer(pRouter, uri); err != nil { attempt := func() {
for k := range connected {
connected[k] = false
}
for _, info := range pRouter.Peers() {
connected[info.URI] = true
}
for k, online := range connected {
if !online {
if err := conn.ConnectToPeer(pRouter, k); err != nil {
logrus.WithError(err).Error("Failed to connect to static peer") logrus.WithError(err).Error("Failed to connect to static peer")
} }
} }
} }
}
for { for {
attempt() attempt()
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
@ -230,7 +239,11 @@ func main() {
return return
} }
conn := conn.WrapWebSocketConn(c) conn := conn.WrapWebSocketConn(c)
if _, err = pRouter.AuthenticatedConnect(conn, "websocket", pineconeRouter.PeerTypeRemote, true); err != nil { if _, err = pRouter.Connect(
conn,
pineconeRouter.ConnectionZone("websocket"),
pineconeRouter.PeerTypeRemote,
); err != nil {
logrus.WithError(err).Error("Failed to connect WebSocket peer to Pinecone switch") logrus.WithError(err).Error("Failed to connect WebSocket peer to Pinecone switch")
} }
}) })