diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go index 656b6ada..e7579531 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-sender-server/main.go @@ -15,74 +15,19 @@ package main import ( - "flag" - "net/http" - "os" - - "github.com/gorilla/mux" - "github.com/matrix-org/dendrite/common" - "github.com/matrix-org/dendrite/common/config" - "github.com/matrix-org/dendrite/federationsender/consumers" - "github.com/matrix-org/dendrite/federationsender/queue" - "github.com/matrix-org/dendrite/federationsender/storage" - "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/gomatrixserverlib" - - log "github.com/sirupsen/logrus" - sarama "gopkg.in/Shopify/sarama.v1" + "github.com/matrix-org/dendrite/common/basecomponent" + "github.com/matrix-org/dendrite/federationsender" ) -var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.") - func main() { - common.SetupLogging(os.Getenv("LOG_DIR")) + base := basecomponent.NewBaseDendrite("FederationSender") + defer base.Close() // nolint: errcheck - flag.Parse() + federation := base.CreateFederationClient() - if *configPath == "" { - log.Fatal("--config must be supplied") - } - cfg, err := config.Load(*configPath) - if err != nil { - log.Fatalf("Invalid config file: %s", err) - } - - closer, err := cfg.SetupTracing("DendriteFederationSender") - if err != nil { - log.WithError(err).Fatalf("Failed to start tracer") - } - defer closer.Close() // nolint: errcheck - - kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil) - if err != nil { - log.WithFields(log.Fields{ - log.ErrorKey: err, - "addresses": cfg.Kafka.Addresses, - }).Panic("Failed to setup kafka consumers") - } - - queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil) - - db, err := storage.NewDatabase(string(cfg.Database.FederationSender)) - if err != nil { - log.Panicf("startup: failed to create federation sender database with data source %s : %s", cfg.Database.FederationSender, err) - } - - federation := gomatrixserverlib.NewFederationClient( - cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey, + federationsender.SetupFederationSenderComponent( + base, federation, ) - queues := queue.NewOutgoingQueues(cfg.Matrix.ServerName, federation) - - consumer := consumers.NewOutputRoomEventConsumer(cfg, kafkaConsumer, queues, db, queryAPI) - if err = consumer.Start(); err != nil { - log.WithError(err).Panicf("startup: failed to start room server consumer") - } - - api := mux.NewRouter() - common.SetupHTTPAPI(http.DefaultServeMux, api) - - if err := http.ListenAndServe(string(cfg.Listen.FederationSender), nil); err != nil { - panic(err) - } + base.SetupAndServeHTTP(string(base.Cfg.Listen.FederationSender)) } diff --git a/src/github.com/matrix-org/dendrite/federationsender/federationsender.go b/src/github.com/matrix-org/dendrite/federationsender/federationsender.go new file mode 100644 index 00000000..1dae79df --- /dev/null +++ b/src/github.com/matrix-org/dendrite/federationsender/federationsender.go @@ -0,0 +1,46 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package federationsender + +import ( + "github.com/matrix-org/dendrite/common/basecomponent" + "github.com/matrix-org/dendrite/federationsender/consumers" + "github.com/matrix-org/dendrite/federationsender/queue" + "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" +) + +// SetupFederationSenderComponent sets up and registers HTTP handlers for the +// FederationSender component. +func SetupFederationSenderComponent( + base *basecomponent.BaseDendrite, + federation *gomatrixserverlib.FederationClient, +) { + federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender)) + if err != nil { + logrus.WithError(err).Panic("failed to connect to federation sender db") + } + + queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation) + + consumer := consumers.NewOutputRoomEventConsumer( + base.Cfg, base.KafkaConsumer, queues, + federationSenderDB, base.QueryAPI(), + ) + if err = consumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start room server consumer") + } +}