mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-29 12:42:46 +00:00
Yggdrasil demo updates (#1241)
* PerformServersAlive in PerformBroadcastEDU * Don't double-pointer * More reliable QUIC session handling * Direct peer lookup, other tweaks * Tweaks * Try to wake up queues on incoming QUIC session * Set session callbak on gobind build * Fix incoming session storage * Stateless reset, other tweaks * Reset sessions when coordinates change * Disable HTTP connection reuse, tweak timeouts
This commit is contained in:
parent
642f9cb964
commit
b7491aae03
8 changed files with 271 additions and 89 deletions
|
@ -24,9 +24,11 @@ func (n *Node) CreateClient(
|
|||
tr.RegisterProtocol(
|
||||
"matrix", &yggroundtripper{
|
||||
inner: &http.Transport{
|
||||
TLSHandshakeTimeout: 20 * time.Second,
|
||||
MaxIdleConns: -1,
|
||||
MaxIdleConnsPerHost: -1,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
ResponseHeaderTimeout: 10 * time.Second,
|
||||
IdleConnTimeout: 60 * time.Second,
|
||||
IdleConnTimeout: 30 * time.Second,
|
||||
DialContext: n.DialerContext,
|
||||
},
|
||||
},
|
||||
|
@ -41,9 +43,11 @@ func (n *Node) CreateFederationClient(
|
|||
tr.RegisterProtocol(
|
||||
"matrix", &yggroundtripper{
|
||||
inner: &http.Transport{
|
||||
TLSHandshakeTimeout: 20 * time.Second,
|
||||
MaxIdleConns: -1,
|
||||
MaxIdleConnsPerHost: -1,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
ResponseHeaderTimeout: 10 * time.Second,
|
||||
IdleConnTimeout: 60 * time.Second,
|
||||
IdleConnTimeout: 30 * time.Second,
|
||||
DialContext: n.DialerContext,
|
||||
TLSClientConfig: n.tlsConfig,
|
||||
},
|
||||
|
|
|
@ -32,6 +32,7 @@ import (
|
|||
"github.com/lucas-clemente/quic-go"
|
||||
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/convert"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"go.uber.org/atomic"
|
||||
|
||||
yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config"
|
||||
yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast"
|
||||
|
@ -41,17 +42,20 @@ import (
|
|||
)
|
||||
|
||||
type Node struct {
|
||||
core *yggdrasil.Core
|
||||
config *yggdrasilconfig.NodeConfig
|
||||
state *yggdrasilconfig.NodeState
|
||||
multicast *yggdrasilmulticast.Multicast
|
||||
log *gologme.Logger
|
||||
listener quic.Listener
|
||||
tlsConfig *tls.Config
|
||||
quicConfig *quic.Config
|
||||
sessions sync.Map // string -> quic.Session
|
||||
incoming chan QUICStream
|
||||
NewSession func(remote gomatrixserverlib.ServerName)
|
||||
core *yggdrasil.Core
|
||||
config *yggdrasilconfig.NodeConfig
|
||||
state *yggdrasilconfig.NodeState
|
||||
multicast *yggdrasilmulticast.Multicast
|
||||
log *gologme.Logger
|
||||
listener quic.Listener
|
||||
tlsConfig *tls.Config
|
||||
quicConfig *quic.Config
|
||||
sessions sync.Map // string -> *session
|
||||
sessionCount atomic.Uint32
|
||||
sessionFunc func(address string)
|
||||
coords sync.Map // string -> yggdrasil.Coords
|
||||
incoming chan QUICStream
|
||||
NewSession func(remote gomatrixserverlib.ServerName)
|
||||
}
|
||||
|
||||
func (n *Node) Dialer(_, address string) (net.Conn, error) {
|
||||
|
@ -90,6 +94,19 @@ func Setup(instanceName, storageDirectory string) (*Node, error) {
|
|||
}
|
||||
}
|
||||
|
||||
n.core.SetCoordChangeCallback(func(old, new yggdrasil.Coords) {
|
||||
fmt.Println("COORDINATE CHANGE!")
|
||||
fmt.Println("Old:", old)
|
||||
fmt.Println("New:", new)
|
||||
n.sessions.Range(func(k, v interface{}) bool {
|
||||
if s, ok := v.(*session); ok {
|
||||
fmt.Println("Killing session", k)
|
||||
s.kill()
|
||||
}
|
||||
return true
|
||||
})
|
||||
})
|
||||
|
||||
n.config.Peers = []string{}
|
||||
n.config.AdminListen = "none"
|
||||
n.config.MulticastInterfaces = []string{}
|
||||
|
@ -124,8 +141,9 @@ func Setup(instanceName, storageDirectory string) (*Node, error) {
|
|||
MaxIncomingUniStreams: 0,
|
||||
KeepAlive: true,
|
||||
MaxIdleTimeout: time.Minute * 30,
|
||||
HandshakeTimeout: time.Second * 30,
|
||||
HandshakeTimeout: time.Second * 15,
|
||||
}
|
||||
copy(n.quicConfig.StatelessResetKey, n.EncryptionPublicKey())
|
||||
|
||||
n.log.Println("Public curve25519:", n.core.EncryptionPublicKey())
|
||||
n.log.Println("Public ed25519:", n.core.SigningPublicKey())
|
||||
|
@ -173,17 +191,25 @@ func (n *Node) SigningPrivateKey() ed25519.PrivateKey {
|
|||
return ed25519.PrivateKey(privBytes)
|
||||
}
|
||||
|
||||
func (n *Node) SetSessionFunc(f func(address string)) {
|
||||
n.sessionFunc = f
|
||||
}
|
||||
|
||||
func (n *Node) PeerCount() int {
|
||||
return len(n.core.GetPeers()) - 1
|
||||
}
|
||||
|
||||
func (n *Node) SessionCount() int {
|
||||
return int(n.sessionCount.Load())
|
||||
}
|
||||
|
||||
func (n *Node) KnownNodes() []gomatrixserverlib.ServerName {
|
||||
nodemap := map[string]struct{}{
|
||||
"b5ae50589e50991dd9dd7d59c5c5f7a4521e8da5b603b7f57076272abc58b374": struct{}{},
|
||||
"b5ae50589e50991dd9dd7d59c5c5f7a4521e8da5b603b7f57076272abc58b374": {},
|
||||
}
|
||||
/*
|
||||
for _, peer := range n.core.GetSwitchPeers() {
|
||||
nodemap[hex.EncodeToString(peer.SigningKey[:])] = struct{}{}
|
||||
nodemap[hex.EncodeToString(peer.PublicKey[:])] = struct{}{}
|
||||
}
|
||||
*/
|
||||
n.sessions.Range(func(_, v interface{}) bool {
|
||||
|
|
|
@ -31,8 +31,32 @@ import (
|
|||
|
||||
"github.com/lucas-clemente/quic-go"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
|
||||
"github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
|
||||
)
|
||||
|
||||
type session struct {
|
||||
node *Node
|
||||
session quic.Session
|
||||
address string
|
||||
context context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (n *Node) newSession(sess quic.Session, address string) *session {
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
return &session{
|
||||
node: n,
|
||||
session: sess,
|
||||
address: address,
|
||||
context: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *session) kill() {
|
||||
s.cancel()
|
||||
}
|
||||
|
||||
func (n *Node) listenFromYgg() {
|
||||
var err error
|
||||
n.listener, err = quic.Listen(
|
||||
|
@ -55,22 +79,31 @@ func (n *Node) listenFromYgg() {
|
|||
_ = session.CloseWithError(0, "expected a peer certificate")
|
||||
continue
|
||||
}
|
||||
address := session.ConnectionState().PeerCertificates[0].Subject.CommonName
|
||||
address := session.ConnectionState().PeerCertificates[0].DNSNames[0]
|
||||
n.log.Infoln("Accepted connection from", address)
|
||||
go n.listenFromQUIC(session, address)
|
||||
go n.newSession(session, address).listenFromQUIC()
|
||||
go n.sessionFunc(address)
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Node) listenFromQUIC(session quic.Session, address string) {
|
||||
n.sessions.Store(address, session)
|
||||
defer n.sessions.Delete(address)
|
||||
func (s *session) listenFromQUIC() {
|
||||
if existing, ok := s.node.sessions.Load(s.address); ok {
|
||||
if existingSession, ok := existing.(*session); ok {
|
||||
fmt.Println("Killing existing session to replace", s.address)
|
||||
existingSession.kill()
|
||||
}
|
||||
}
|
||||
s.node.sessionCount.Inc()
|
||||
s.node.sessions.Store(s.address, s)
|
||||
defer s.node.sessions.Delete(s.address)
|
||||
defer s.node.sessionCount.Dec()
|
||||
for {
|
||||
st, err := session.AcceptStream(context.TODO())
|
||||
st, err := s.session.AcceptStream(s.context)
|
||||
if err != nil {
|
||||
n.log.Println("session.AcceptStream:", err)
|
||||
s.node.log.Println("session.AcceptStream:", err)
|
||||
return
|
||||
}
|
||||
n.incoming <- QUICStream{st, session}
|
||||
s.node.incoming <- QUICStream{st, s.session}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -95,53 +128,124 @@ func (n *Node) Dial(network, address string) (net.Conn, error) {
|
|||
}
|
||||
|
||||
// Implements http.Transport.DialContext
|
||||
// nolint:gocyclo
|
||||
func (n *Node) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
|
||||
s, ok1 := n.sessions.Load(address)
|
||||
session, ok2 := s.(quic.Session)
|
||||
if !ok1 || !ok2 || (ok1 && ok2 && session.ConnectionState().HandshakeComplete) {
|
||||
dest, err := hex.DecodeString(address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(dest) != crypto.BoxPubKeyLen {
|
||||
return nil, errors.New("invalid key length supplied")
|
||||
}
|
||||
var pubKey crypto.BoxPubKey
|
||||
copy(pubKey[:], dest)
|
||||
nodeID := crypto.GetNodeID(&pubKey)
|
||||
nodeMask := &crypto.NodeID{}
|
||||
for i := range nodeMask {
|
||||
nodeMask[i] = 0xFF
|
||||
session, ok2 := s.(*session)
|
||||
if !ok1 || !ok2 {
|
||||
// First of all, check if we think we know the coords of this
|
||||
// node. If we do then we'll try to dial to it directly. This
|
||||
// will either succeed or fail.
|
||||
if v, ok := n.coords.Load(address); ok {
|
||||
coords, ok := v.(yggdrasil.Coords)
|
||||
if !ok {
|
||||
n.coords.Delete(address)
|
||||
return nil, errors.New("should have found yggdrasil.Coords but didn't")
|
||||
}
|
||||
n.log.Infof("Coords %s for %q cached, trying to dial", coords.String(), address)
|
||||
var err error
|
||||
// We think we know the coords. Try to dial the node.
|
||||
if session, err = n.tryDial(address, coords); err != nil {
|
||||
// We thought we knew the coords but it didn't result
|
||||
// in a successful dial. Nuke them from the cache.
|
||||
n.coords.Delete(address)
|
||||
n.log.Infof("Cached coords %s for %q failed", coords.String(), address)
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Println("Resolving coords")
|
||||
coords, err := n.core.Resolve(nodeID, nodeMask)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("n.core.Resolve: %w", err)
|
||||
}
|
||||
fmt.Println("Found coords:", coords)
|
||||
fmt.Println("Dialling")
|
||||
// We either don't know the coords for the node, or we failed
|
||||
// to dial it before, in which case try to resolve the coords.
|
||||
if _, ok := n.coords.Load(address); !ok {
|
||||
var coords yggdrasil.Coords
|
||||
var err error
|
||||
|
||||
session, err = quic.Dial(
|
||||
n.core, // yggdrasil.PacketConn
|
||||
coords, // dial address
|
||||
address, // dial SNI
|
||||
n.tlsConfig, // TLS config
|
||||
n.quicConfig, // QUIC config
|
||||
)
|
||||
if err != nil {
|
||||
n.log.Println("n.dialer.DialContext:", err)
|
||||
return nil, err
|
||||
// First look and see if the node is something that we already
|
||||
// know about from our direct switch peers.
|
||||
for _, peer := range n.core.GetSwitchPeers() {
|
||||
if peer.PublicKey.String() == address {
|
||||
coords = peer.Coords
|
||||
n.log.Infof("%q is a direct peer, coords are %s", address, coords.String())
|
||||
n.coords.Store(address, coords)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// If it isn' a node that we know directly then try to search
|
||||
// the network.
|
||||
if coords == nil {
|
||||
n.log.Infof("Searching for coords for %q", address)
|
||||
dest, derr := hex.DecodeString(address)
|
||||
if derr != nil {
|
||||
return nil, derr
|
||||
}
|
||||
if len(dest) != crypto.BoxPubKeyLen {
|
||||
return nil, errors.New("invalid key length supplied")
|
||||
}
|
||||
var pubKey crypto.BoxPubKey
|
||||
copy(pubKey[:], dest)
|
||||
nodeID := crypto.GetNodeID(&pubKey)
|
||||
nodeMask := &crypto.NodeID{}
|
||||
for i := range nodeMask {
|
||||
nodeMask[i] = 0xFF
|
||||
}
|
||||
|
||||
fmt.Println("Resolving coords")
|
||||
coords, err = n.core.Resolve(nodeID, nodeMask)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("n.core.Resolve: %w", err)
|
||||
}
|
||||
fmt.Println("Found coords:", coords)
|
||||
n.coords.Store(address, coords)
|
||||
}
|
||||
|
||||
// We now know the coords in theory. Let's try dialling the
|
||||
// node again.
|
||||
if session, err = n.tryDial(address, coords); err != nil {
|
||||
return nil, fmt.Errorf("n.tryDial: %w", err)
|
||||
}
|
||||
}
|
||||
fmt.Println("Dial OK")
|
||||
go n.listenFromQUIC(session, address)
|
||||
}
|
||||
st, err := session.OpenStream()
|
||||
|
||||
if session == nil {
|
||||
return nil, fmt.Errorf("should have found session but didn't")
|
||||
}
|
||||
|
||||
st, err := session.session.OpenStream()
|
||||
if err != nil {
|
||||
n.log.Println("session.OpenStream:", err)
|
||||
_ = session.session.CloseWithError(0, "expected to be able to open session")
|
||||
return nil, err
|
||||
}
|
||||
return QUICStream{st, session}, nil
|
||||
return QUICStream{st, session.session}, nil
|
||||
}
|
||||
|
||||
func (n *Node) tryDial(address string, coords yggdrasil.Coords) (*session, error) {
|
||||
quicSession, err := quic.Dial(
|
||||
n.core, // yggdrasil.PacketConn
|
||||
coords, // dial address
|
||||
address, // dial SNI
|
||||
n.tlsConfig, // TLS config
|
||||
n.quicConfig, // QUIC config
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(quicSession.ConnectionState().PeerCertificates) != 1 {
|
||||
_ = quicSession.CloseWithError(0, "expected a peer certificate")
|
||||
return nil, errors.New("didn't receive a peer certificate")
|
||||
}
|
||||
if len(quicSession.ConnectionState().PeerCertificates[0].DNSNames) != 1 {
|
||||
_ = quicSession.CloseWithError(0, "expected a DNS name")
|
||||
return nil, errors.New("didn't receive a DNS name")
|
||||
}
|
||||
if gotAddress := quicSession.ConnectionState().PeerCertificates[0].DNSNames[0]; address != gotAddress {
|
||||
_ = quicSession.CloseWithError(0, "you aren't the host I was hoping for")
|
||||
return nil, fmt.Errorf("expected %q but dialled %q", address, gotAddress)
|
||||
}
|
||||
session := n.newSession(quicSession, address)
|
||||
go session.listenFromQUIC()
|
||||
go n.sessionFunc(address)
|
||||
return session, nil
|
||||
}
|
||||
|
||||
func (n *Node) generateTLSConfig() *tls.Config {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue