Send /createRoom events to kafka (#33)

This commit is contained in:
Kegsay 2017-03-10 16:19:23 +00:00 committed by GitHub
parent 49ed708ca4
commit 2fcf6fd6eb
4 changed files with 73 additions and 12 deletions

View file

@ -12,6 +12,7 @@ import (
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/matrix-org/dugong" "github.com/matrix-org/dugong"
sarama "gopkg.in/Shopify/sarama.v1"
) )
func setupLogging(logDir string) { func setupLogging(logDir string) {
@ -38,17 +39,29 @@ func main() {
if logDir != "" { if logDir != "" {
setupLogging(logDir) setupLogging(logDir)
} }
log.Info("Starting clientapi")
// TODO: Rather than generating a new key on every startup, we should be // TODO: Rather than generating a new key on every startup, we should be
// reading a PEM formatted file instead. // reading a PEM formatted file instead.
_, privKey, err := ed25519.GenerateKey(nil) _, privKey, err := ed25519.GenerateKey(nil)
if err != nil { if err != nil {
log.Panicf("Failed to generate private key: %s", err) log.Panicf("Failed to generate private key: %s", err)
} }
routing.Setup(http.DefaultServeMux, http.DefaultClient, config.ClientAPI{
ServerName: "localhost", cfg := config.ClientAPI{
KeyID: "ed25519:something", ServerName: "localhost",
PrivateKey: privKey, KeyID: "ed25519:something",
}) PrivateKey: privKey,
KafkaProducerURIs: []string{"localhost:9092"},
ClientAPIOutputTopic: "clientapiOutput",
}
log.Info("Starting clientapi")
producer, err := sarama.NewSyncProducer(cfg.KafkaProducerURIs, nil)
if err != nil {
log.Panicf("Failed to setup kafka producers(%s): %s", cfg.KafkaProducerURIs, err)
}
routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg, producer)
log.Fatal(http.ListenAndServe(bindAddr, nil)) log.Fatal(http.ListenAndServe(bindAddr, nil))
} }

View file

@ -4,7 +4,15 @@ import "golang.org/x/crypto/ed25519"
// ClientAPI contains the config information necessary to spin up a clientapi process. // ClientAPI contains the config information necessary to spin up a clientapi process.
type ClientAPI struct { type ClientAPI struct {
// The name of the server. This is usually the domain name, e.g 'matrix.org', 'localhost'.
ServerName string ServerName string
// The private key which will be used to sign events.
PrivateKey ed25519.PrivateKey PrivateKey ed25519.PrivateKey
KeyID string // An arbitrary string used to uniquely identify the PrivateKey. Must start with the
// prefix "ed25519:".
KeyID string
// A list of URIs to send events to. These kafka logs should be consumed by a Room Server.
KafkaProducerURIs []string
// The topic for events which are written to the logs.
ClientAPIOutputTopic string
} }

View file

@ -9,17 +9,18 @@ import (
"github.com/matrix-org/dendrite/clientapi/writers" "github.com/matrix-org/dendrite/clientapi/writers"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
sarama "gopkg.in/Shopify/sarama.v1"
) )
const pathPrefixR0 = "/_matrix/client/r0" const pathPrefixR0 = "/_matrix/client/r0"
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client // Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
// to clients which need to make outbound HTTP requests. // to clients which need to make outbound HTTP requests.
func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI) { func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI, producer sarama.SyncProducer) {
apiMux := mux.NewRouter() apiMux := mux.NewRouter()
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
r0mux.Handle("/createRoom", make("createRoom", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { r0mux.Handle("/createRoom", make("createRoom", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse {
return writers.CreateRoom(req, cfg) return writers.CreateRoom(req, cfg, producer)
}))) })))
r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { r0mux.Handle("/sync", make("sync", util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse {
return readers.Sync(req) return readers.Sync(req)

View file

@ -14,8 +14,10 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
sarama "gopkg.in/Shopify/sarama.v1"
) )
// https://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-createroom // https://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-createroom
@ -77,15 +79,15 @@ type fledglingEvent struct {
} }
// CreateRoom implements /createRoom // CreateRoom implements /createRoom
func CreateRoom(req *http.Request, cfg config.ClientAPI) util.JSONResponse { func CreateRoom(req *http.Request, cfg config.ClientAPI, producer sarama.SyncProducer) util.JSONResponse {
// TODO: Check room ID doesn't clash with an existing one, and we // TODO: Check room ID doesn't clash with an existing one, and we
// probably shouldn't be using pseudo-random strings, maybe GUIDs? // probably shouldn't be using pseudo-random strings, maybe GUIDs?
roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.ServerName) roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.ServerName)
return createRoom(req, cfg, roomID) return createRoom(req, cfg, roomID, producer)
} }
// createRoom implements /createRoom // createRoom implements /createRoom
func createRoom(req *http.Request, cfg config.ClientAPI, roomID string) util.JSONResponse { func createRoom(req *http.Request, cfg config.ClientAPI, roomID string, producer sarama.SyncProducer) util.JSONResponse {
logger := util.GetLogger(req.Context()) logger := util.GetLogger(req.Context())
userID, resErr := auth.VerifyAccessToken(req) userID, resErr := auth.VerifyAccessToken(req)
if resErr != nil { if resErr != nil {
@ -176,7 +178,15 @@ func createRoom(req *http.Request, cfg config.ClientAPI, roomID string) util.JSO
// Add the event to the list of auth events // Add the event to the list of auth events
builtEventMap[common.StateKeyTuple{e.Type, e.StateKey}] = ev builtEventMap[common.StateKeyTuple{e.Type, e.StateKey}] = ev
builtEvents = append(builtEvents, ev) builtEvents = append(builtEvents, ev)
}
// send events to the room server
msgs, err := eventsToMessages(builtEvents, cfg.ClientAPIOutputTopic)
if err != nil {
return util.ErrorResponse(err)
}
if err = producer.SendMessages(msgs); err != nil {
return util.ErrorResponse(err)
} }
return util.JSONResponse{ return util.JSONResponse{
@ -242,6 +252,35 @@ func authEventsFromStateNeeded(eventsNeeded gomatrixserverlib.StateNeeded,
return return
} }
func eventsToMessages(events []*gomatrixserverlib.Event, topic string) ([]*sarama.ProducerMessage, error) {
msgs := make([]*sarama.ProducerMessage, len(events))
for i, e := range events {
var m sarama.ProducerMessage
// map auth event references to IDs
var authEventIDs []string
for _, ref := range e.AuthEvents() {
authEventIDs = append(authEventIDs, ref.EventID)
}
ire := api.InputRoomEvent{
Kind: api.KindNew,
Event: e.JSON(),
AuthEventIDs: authEventIDs,
}
value, err := json.Marshal(ire)
if err != nil {
return nil, err
}
m.Topic = topic
m.Key = sarama.StringEncoder(e.EventID())
m.Value = sarama.ByteEncoder(value)
msgs[i] = &m
}
return msgs, nil
}
type authEventProvider struct { type authEventProvider struct {
events map[common.StateKeyTuple]*gomatrixserverlib.Event events map[common.StateKeyTuple]*gomatrixserverlib.Event
} }