mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-29 12:42:46 +00:00
Start Kafka connections for each component that needs them (#1527)
* Start Kafka connection for each component that needs one * Fix roomserver unit tests * Rename to naffkaInstance (@Kegsay review comment) * Fix import cycle
This commit is contained in:
parent
10f1beb0de
commit
49abe359e6
20 changed files with 143 additions and 115 deletions
|
@ -26,13 +26,9 @@ import (
|
|||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
|
||||
"github.com/matrix-org/naffka"
|
||||
naffkaStorage "github.com/matrix-org/naffka/storage"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/userapi/storage/accounts"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/gorilla/mux"
|
||||
|
||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||
|
@ -73,8 +69,8 @@ type BaseDendrite struct {
|
|||
httpClient *http.Client
|
||||
Cfg *config.Dendrite
|
||||
Caches *caching.Caches
|
||||
KafkaConsumer sarama.Consumer
|
||||
KafkaProducer sarama.SyncProducer
|
||||
// KafkaConsumer sarama.Consumer
|
||||
// KafkaProducer sarama.SyncProducer
|
||||
}
|
||||
|
||||
const HTTPServerTimeout = time.Minute * 5
|
||||
|
@ -106,14 +102,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
|
|||
logrus.WithError(err).Panicf("failed to start opentracing")
|
||||
}
|
||||
|
||||
var kafkaConsumer sarama.Consumer
|
||||
var kafkaProducer sarama.SyncProducer
|
||||
if cfg.Global.Kafka.UseNaffka {
|
||||
kafkaConsumer, kafkaProducer = setupNaffka(cfg)
|
||||
} else {
|
||||
kafkaConsumer, kafkaProducer = setupKafka(cfg)
|
||||
}
|
||||
|
||||
cache, err := caching.NewInMemoryLRUCache(true)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Warnf("Failed to create cache")
|
||||
|
@ -152,8 +140,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo
|
|||
InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(),
|
||||
apiHttpClient: &apiClient,
|
||||
httpClient: &client,
|
||||
KafkaConsumer: kafkaConsumer,
|
||||
KafkaProducer: kafkaProducer,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -334,31 +320,3 @@ func (b *BaseDendrite) SetupAndServeHTTP(
|
|||
|
||||
select {}
|
||||
}
|
||||
|
||||
// setupKafka creates kafka consumer/producer pair from the config.
|
||||
func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
|
||||
consumer, err := sarama.NewConsumer(cfg.Global.Kafka.Addresses, nil)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panic("failed to start kafka consumer")
|
||||
}
|
||||
|
||||
producer, err := sarama.NewSyncProducer(cfg.Global.Kafka.Addresses, nil)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panic("failed to setup kafka producers")
|
||||
}
|
||||
|
||||
return consumer, producer
|
||||
}
|
||||
|
||||
// setupNaffka creates kafka consumer/producer pair from the config.
|
||||
func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
|
||||
naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Global.Kafka.Database.ConnectionString))
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panic("Failed to setup naffka database")
|
||||
}
|
||||
naff, err := naffka.New(naffkaDB)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panic("Failed to setup naffka")
|
||||
}
|
||||
return naff, naff
|
||||
}
|
||||
|
|
53
internal/setup/kafka/kafka.go
Normal file
53
internal/setup/kafka/kafka.go
Normal file
|
@ -0,0 +1,53 @@
|
|||
package kafka
|
||||
|
||||
import (
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/matrix-org/dendrite/internal/config"
|
||||
"github.com/matrix-org/naffka"
|
||||
naffkaStorage "github.com/matrix-org/naffka/storage"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
|
||||
if cfg.UseNaffka {
|
||||
return setupNaffka(cfg)
|
||||
}
|
||||
return setupKafka(cfg)
|
||||
}
|
||||
|
||||
// setupKafka creates kafka consumer/producer pair from the config.
|
||||
func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
|
||||
consumer, err := sarama.NewConsumer(cfg.Addresses, nil)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panic("failed to start kafka consumer")
|
||||
}
|
||||
|
||||
producer, err := sarama.NewSyncProducer(cfg.Addresses, nil)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panic("failed to setup kafka producers")
|
||||
}
|
||||
|
||||
return consumer, producer
|
||||
}
|
||||
|
||||
// In monolith mode with Naffka, we don't have the same constraints about
|
||||
// consuming the same topic from more than one place like we do with Kafka.
|
||||
// Therefore, we will only open one Naffka connection in case Naffka is
|
||||
// running on SQLite.
|
||||
var naffkaInstance *naffka.Naffka
|
||||
|
||||
// setupNaffka creates kafka consumer/producer pair from the config.
|
||||
func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
|
||||
if naffkaInstance != nil {
|
||||
return naffkaInstance, naffkaInstance
|
||||
}
|
||||
naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString))
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panic("Failed to setup naffka database")
|
||||
}
|
||||
naffkaInstance, err = naffka.New(naffkaDB)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panic("Failed to setup naffka")
|
||||
}
|
||||
return naffkaInstance, naffkaInstance
|
||||
}
|
|
@ -15,7 +15,6 @@
|
|||
package setup
|
||||
|
||||
import (
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/gorilla/mux"
|
||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||
"github.com/matrix-org/dendrite/clientapi"
|
||||
|
@ -38,13 +37,11 @@ import (
|
|||
// Monolith represents an instantiation of all dependencies required to build
|
||||
// all components of Dendrite, for use in monolith mode.
|
||||
type Monolith struct {
|
||||
Config *config.Dendrite
|
||||
AccountDB accounts.Database
|
||||
KeyRing *gomatrixserverlib.KeyRing
|
||||
Client *gomatrixserverlib.Client
|
||||
FedClient *gomatrixserverlib.FederationClient
|
||||
KafkaConsumer sarama.Consumer
|
||||
KafkaProducer sarama.SyncProducer
|
||||
Config *config.Dendrite
|
||||
AccountDB accounts.Database
|
||||
KeyRing *gomatrixserverlib.KeyRing
|
||||
Client *gomatrixserverlib.Client
|
||||
FedClient *gomatrixserverlib.FederationClient
|
||||
|
||||
AppserviceAPI appserviceAPI.AppServiceQueryAPI
|
||||
EDUInternalAPI eduServerAPI.EDUServerInputAPI
|
||||
|
@ -61,7 +58,7 @@ type Monolith struct {
|
|||
// AddAllPublicRoutes attaches all public paths to the given router
|
||||
func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router) {
|
||||
clientapi.AddPublicRoutes(
|
||||
csMux, &m.Config.ClientAPI, m.KafkaProducer, m.AccountDB,
|
||||
csMux, &m.Config.ClientAPI, m.AccountDB,
|
||||
m.FedClient, m.RoomserverAPI,
|
||||
m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
|
||||
m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider,
|
||||
|
@ -73,7 +70,7 @@ func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router
|
|||
)
|
||||
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client)
|
||||
syncapi.AddPublicRoutes(
|
||||
csMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI,
|
||||
csMux, m.UserAPI, m.RoomserverAPI,
|
||||
m.KeyAPI, m.FedClient, &m.Config.SyncAPI,
|
||||
)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue