Hook up federation event receiving

This commit is contained in:
Mark Haines 2017-06-01 20:43:06 +01:00
parent 32dc94f877
commit 1b2b837029
3 changed files with 151 additions and 10 deletions

View file

@ -18,11 +18,14 @@ import (
"encoding/base64" "encoding/base64"
"net/http" "net/http"
"os" "os"
"strings"
"time" "time"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/federationapi/config" "github.com/matrix-org/dendrite/federationapi/config"
"github.com/matrix-org/dendrite/federationapi/routing" "github.com/matrix-org/dendrite/federationapi/routing"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
@ -40,7 +43,10 @@ var (
// openssl x509 -noout -fingerprint -sha256 -inform pem -in server.crt |\ // openssl x509 -noout -fingerprint -sha256 -inform pem -in server.crt |\
// python -c 'print raw_input()[19:].replace(":","").decode("hex").encode("base64").rstrip("=\n")' // python -c 'print raw_input()[19:].replace(":","").decode("hex").encode("base64").rstrip("=\n")'
// //
tlsFingerprint = os.Getenv("TLS_FINGERPRINT") tlsFingerprint = os.Getenv("TLS_FINGERPRINT")
kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",")
roomserverURL = os.Getenv("ROOMSERVER_URL")
roomserverInputTopic = os.Getenv("TOPIC_INPUT_ROOM_EVENT")
) )
func main() { func main() {
@ -57,6 +63,18 @@ func main() {
log.Panic("No TLS_FINGERPRINT environment variable found.") log.Panic("No TLS_FINGERPRINT environment variable found.")
} }
if len(kafkaURIs) == 0 {
// the kafka default is :9092
kafkaURIs = []string{"localhost:9092"}
}
if roomserverURL == "" {
log.Panic("No ROOMSERVER_URL environment variable found.")
}
if roomserverInputTopic == "" {
log.Panic("No TOPIC_INPUT_ROOM_EVENT environment variable found. This should match the roomserver input topic.")
}
cfg := config.FederationAPI{ cfg := config.FederationAPI{
ServerName: serverName, ServerName: serverName,
// TODO: make the validity period configurable. // TODO: make the validity period configurable.
@ -75,6 +93,37 @@ func main() {
} }
cfg.TLSFingerPrints = []gomatrixserverlib.TLSFingerprint{{fingerprintSHA256}} cfg.TLSFingerPrints = []gomatrixserverlib.TLSFingerprint{{fingerprintSHA256}}
routing.Setup(http.DefaultServeMux, cfg) federation := gomatrixserverlib.NewFederationClient(cfg.ServerName, cfg.KeyID, cfg.PrivateKey)
keyRing := gomatrixserverlib.KeyRing{
KeyFetchers: []gomatrixserverlib.KeyFetcher{
// TODO: Use perspective key fetchers for production.
&gomatrixserverlib.DirectKeyFetcher{federation.Client},
},
KeyDatabase: &dummyKeyDatabase{},
}
queryAPI := api.NewRoomserverQueryAPIHTTP(roomserverURL, nil)
roomserverProducer, err := producers.NewRoomserverProducer(kafkaURIs, roomserverInputTopic)
if err != nil {
log.Panicf("Failed to setup kafka producers(%s): %s", kafkaURIs, err)
}
routing.Setup(http.DefaultServeMux, cfg, queryAPI, roomserverProducer, keyRing)
log.Fatal(http.ListenAndServe(bindAddr, nil)) log.Fatal(http.ListenAndServe(bindAddr, nil))
} }
// TODO: Implement a proper key database.
type dummyKeyDatabase struct{}
func (d *dummyKeyDatabase) FetchKeys(
requests map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.Timestamp,
) (map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.ServerKeys, error) {
return nil, nil
}
func (d *dummyKeyDatabase) StoreKeys(
map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.ServerKeys,
) error {
return nil
}

View file

@ -16,21 +16,34 @@ package routing
import ( import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/federationapi/config" "github.com/matrix-org/dendrite/federationapi/config"
"github.com/matrix-org/dendrite/federationapi/readers" "github.com/matrix-org/dendrite/federationapi/readers"
"github.com/matrix-org/dendrite/federationapi/writers"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"net/http" "net/http"
"time"
) )
const ( const (
pathPrefixV2Keys = "/_matrix/key/v2" pathPrefixV2Keys = "/_matrix/key/v2"
pathPrefixV1Federation = "/_matrix/federation/v1"
) )
// Setup registers HTTP handlers with the given ServeMux. // Setup registers HTTP handlers with the given ServeMux.
func Setup(servMux *http.ServeMux, cfg config.FederationAPI) { func Setup(
servMux *http.ServeMux,
cfg config.FederationAPI,
query api.RoomserverQueryAPI,
producer *producers.RoomserverProducer,
keys gomatrixserverlib.KeyRing,
) {
apiMux := mux.NewRouter() apiMux := mux.NewRouter()
v2keysmux := apiMux.PathPrefix(pathPrefixV2Keys).Subrouter() v2keysmux := apiMux.PathPrefix(pathPrefixV2Keys).Subrouter()
v1fedmux := apiMux.PathPrefix(pathPrefixV1Federation).Subrouter()
localKeys := makeAPI("localkeys", func(req *http.Request) util.JSONResponse { localKeys := makeAPI("localkeys", func(req *http.Request) util.JSONResponse {
return readers.LocalKeys(req, cfg) return readers.LocalKeys(req, cfg)
@ -43,6 +56,17 @@ func Setup(servMux *http.ServeMux, cfg config.FederationAPI) {
v2keysmux.Handle("/server/{keyID}", localKeys) v2keysmux.Handle("/server/{keyID}", localKeys)
v2keysmux.Handle("/server/", localKeys) v2keysmux.Handle("/server/", localKeys)
v1fedmux.Handle("/send/{txnID}/", makeAPI("send",
func(req *http.Request) util.JSONResponse {
vars := mux.Vars(req)
return writers.Send(
req, gomatrixserverlib.TransactionID(vars["txnID"]),
time.Now(),
cfg, query, producer, keys,
)
},
))
servMux.Handle("/metrics", prometheus.Handler()) servMux.Handle("/metrics", prometheus.Handler())
servMux.Handle("/api/", http.StripPrefix("/api", apiMux)) servMux.Handle("/api/", http.StripPrefix("/api", apiMux))
} }

View file

@ -3,7 +3,9 @@ package writers
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/federationapi/config" "github.com/matrix-org/dendrite/federationapi/config"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -18,6 +20,8 @@ func Send(
txnID gomatrixserverlib.TransactionID, txnID gomatrixserverlib.TransactionID,
now time.Time, now time.Time,
cfg config.FederationAPI, cfg config.FederationAPI,
query api.RoomserverQueryAPI,
producer *producers.RoomserverProducer,
keys gomatrixserverlib.KeyRing, keys gomatrixserverlib.KeyRing,
) util.JSONResponse { ) util.JSONResponse {
request, errResp := gomatrixserverlib.VerifyHTTPRequest(req, now, cfg.ServerName, keys) request, errResp := gomatrixserverlib.VerifyHTTPRequest(req, now, cfg.ServerName, keys)
@ -37,21 +41,76 @@ func Send(
content.TransactionID = txnID content.TransactionID = txnID
content.Destination = cfg.ServerName content.Destination = cfg.ServerName
// TODO: process the transaction. resp, err := processTransaction(content, query, producer, keys)
if err != nil {
return httputil.LogThenError(req, err)
}
return util.JSONResponse{ return util.JSONResponse{
Code: 200, Code: 200,
JSON: gomatrixserverlib.RespSend{}, JSON: resp,
} }
} }
func processTransaction(t gomatrixserverlib.Transaction, query api.RoomserverQueryAPI) { func processTransaction(
t gomatrixserverlib.Transaction,
query api.RoomserverQueryAPI,
producer *producers.RoomserverProducer,
keys gomatrixserverlib.KeyRing,
) (*gomatrixserverlib.RespSend, error) {
// Check the event signatures
if err := gomatrixserverlib.VerifyEventSignatures(t.PDUs, keys); err != nil {
return nil, err
}
// Process the events.
results := map[string]gomatrixserverlib.PDUResult{}
for _, e := range t.PDUs {
err := processEvent(e, query, producer)
if err != nil {
// If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the
// sender knows that we have skipped processing it.
//
// However if the event is due to a temporary failure in our server
// such as a database being unavailable then we should bail, and
// hope that the sender will retry when we are feeling better.
//
// It is uncertain what we should do if an event fails because
// we failed to fetch more information from the sending server.
// For example if a request to /state fails.
// If we skip the event then we risk missing the event until we
// receive another event referencing it.
// If we bail and stop processing then we risk wedging incoming
// transactions from that server forever.
switch err.(type) {
case unknownRoomError:
case *gomatrixserverlib.NotAllowed:
default:
// Any other error should be the result of a temporary error in
// our server so we should bail processing the transaction entirely.
return nil, err
}
results[e.EventID()] = gomatrixserverlib.PDUResult{err.Error()}
} else {
results[e.EventID()] = gomatrixserverlib.PDUResult{}
}
}
// TODO: Process the EDUs.
return &gomatrixserverlib.RespSend{PDUs: results}, nil
} }
var errUnknownRoom = fmt.Errorf("unknown room") type unknownRoomError string
func processEvent(e gomatrixserverlib.Event, query api.RoomserverQueryAPI) error { func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e) }
func processEvent(
e gomatrixserverlib.Event,
query api.RoomserverQueryAPI,
producer *producers.RoomserverProducer,
) error {
refs := e.PrevEvents() refs := e.PrevEvents()
prevEventIDs := make([]string, len(refs)) prevEventIDs := make([]string, len(refs))
for i := range refs { for i := range refs {
@ -77,7 +136,7 @@ func processEvent(e gomatrixserverlib.Event, query api.RoomserverQueryAPI) error
// that this server is unaware of. // that this server is unaware of.
// However generally speaking we should reject events for rooms we // However generally speaking we should reject events for rooms we
// aren't a member of. // aren't a member of.
return errUnknownRoom return unknownRoomError(e.RoomID())
} }
if !stateResp.PrevEventsExist { if !stateResp.PrevEventsExist {
@ -101,6 +160,7 @@ func processEvent(e gomatrixserverlib.Event, query api.RoomserverQueryAPI) error
panic(fmt.Errorf("Receiving events with missing prev_events is no implemented")) panic(fmt.Errorf("Receiving events with missing prev_events is no implemented"))
} }
// Check that the event is allowed by the state at the event.
authUsingState := gomatrixserverlib.NewAuthEvents(nil) authUsingState := gomatrixserverlib.NewAuthEvents(nil)
for i := range stateResp.StateEvents { for i := range stateResp.StateEvents {
authUsingState.AddEvent(&stateResp.StateEvents[i]) authUsingState.AddEvent(&stateResp.StateEvents[i])
@ -110,5 +170,13 @@ func processEvent(e gomatrixserverlib.Event, query api.RoomserverQueryAPI) error
return err return err
} }
// TODO: Check that the roomserver has a copy of all of the auth_events.
// TODO: Check that the event is allowed by its auth_events.
// pass the event to the roomserver
if err := producer.SendEvents([]gomatrixserverlib.Event{e}); err != nil {
return err
}
return nil return nil
} }