Convert federationsender to using base component

This commit is contained in:
Erik Johnston 2017-12-06 16:43:01 +00:00
parent 906b64bd3e
commit 28b2afb611
2 changed files with 54 additions and 63 deletions

View file

@ -15,74 +15,19 @@
package main
import (
"flag"
"net/http"
"os"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/federationsender/consumers"
"github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/federationsender"
)
var configPath = flag.String("config", "dendrite.yaml", "The path to the config file. For more information, see the config file in this repository.")
func main() {
common.SetupLogging(os.Getenv("LOG_DIR"))
base := basecomponent.NewBaseDendrite("FederationSender")
defer base.Close() // nolint: errcheck
flag.Parse()
federation := base.CreateFederationClient()
if *configPath == "" {
log.Fatal("--config must be supplied")
}
cfg, err := config.Load(*configPath)
if err != nil {
log.Fatalf("Invalid config file: %s", err)
}
closer, err := cfg.SetupTracing("DendriteFederationSender")
if err != nil {
log.WithError(err).Fatalf("Failed to start tracer")
}
defer closer.Close() // nolint: errcheck
kafkaConsumer, err := sarama.NewConsumer(cfg.Kafka.Addresses, nil)
if err != nil {
log.WithFields(log.Fields{
log.ErrorKey: err,
"addresses": cfg.Kafka.Addresses,
}).Panic("Failed to setup kafka consumers")
}
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomServerURL(), nil)
db, err := storage.NewDatabase(string(cfg.Database.FederationSender))
if err != nil {
log.Panicf("startup: failed to create federation sender database with data source %s : %s", cfg.Database.FederationSender, err)
}
federation := gomatrixserverlib.NewFederationClient(
cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey,
federationsender.SetupFederationSenderComponent(
base, federation,
)
queues := queue.NewOutgoingQueues(cfg.Matrix.ServerName, federation)
consumer := consumers.NewOutputRoomEventConsumer(cfg, kafkaConsumer, queues, db, queryAPI)
if err = consumer.Start(); err != nil {
log.WithError(err).Panicf("startup: failed to start room server consumer")
}
api := mux.NewRouter()
common.SetupHTTPAPI(http.DefaultServeMux, api)
if err := http.ListenAndServe(string(cfg.Listen.FederationSender), nil); err != nil {
panic(err)
}
base.SetupAndServeHTTP(string(base.Cfg.Listen.FederationSender))
}

View file

@ -0,0 +1,46 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package federationsender
import (
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/federationsender/consumers"
"github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
// SetupFederationSenderComponent sets up and registers HTTP handlers for the
// FederationSender component.
func SetupFederationSenderComponent(
base *basecomponent.BaseDendrite,
federation *gomatrixserverlib.FederationClient,
) {
federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender))
if err != nil {
logrus.WithError(err).Panic("failed to connect to federation sender db")
}
queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation)
consumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, queues,
federationSenderDB, base.QueryAPI(),
)
if err = consumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start room server consumer")
}
}