mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-27 23:48:27 +00:00
Fix kafka consumer setup in monolith. (#184)
We can't consume the same topic on a single kafka consumer more than once. So when using kafka we have to create a new consumer for each component in the monolith.
This commit is contained in:
parent
c27d1fdfb4
commit
ba8b5d8bf9
1 changed files with 20 additions and 13 deletions
|
@ -127,7 +127,7 @@ type monolith struct {
|
||||||
queryAPI *roomserver_query.RoomserverQueryAPI
|
queryAPI *roomserver_query.RoomserverQueryAPI
|
||||||
aliasAPI *roomserver_alias.RoomserverAliasAPI
|
aliasAPI *roomserver_alias.RoomserverAliasAPI
|
||||||
|
|
||||||
kafkaConsumer sarama.Consumer
|
naffka *naffka.Naffka
|
||||||
kafkaProducer sarama.SyncProducer
|
kafkaProducer sarama.SyncProducer
|
||||||
|
|
||||||
roomServerProducer *producers.RoomserverProducer
|
roomServerProducer *producers.RoomserverProducer
|
||||||
|
@ -196,16 +196,9 @@ func (m *monolith) setupKafka() {
|
||||||
log.ErrorKey: err,
|
log.ErrorKey: err,
|
||||||
}).Panic("Failed to setup naffka")
|
}).Panic("Failed to setup naffka")
|
||||||
}
|
}
|
||||||
m.kafkaConsumer = naff
|
m.naffka = naff
|
||||||
m.kafkaProducer = naff
|
m.kafkaProducer = naff
|
||||||
} else {
|
} else {
|
||||||
m.kafkaConsumer, err = sarama.NewConsumer(m.cfg.Kafka.Addresses, nil)
|
|
||||||
if err != nil {
|
|
||||||
log.WithFields(log.Fields{
|
|
||||||
log.ErrorKey: err,
|
|
||||||
"addresses": m.cfg.Kafka.Addresses,
|
|
||||||
}).Panic("Failed to setup kafka consumers")
|
|
||||||
}
|
|
||||||
m.kafkaProducer, err = sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil)
|
m.kafkaProducer, err = sarama.NewSyncProducer(m.cfg.Kafka.Addresses, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
|
@ -216,6 +209,20 @@ func (m *monolith) setupKafka() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *monolith) kafkaConsumer() sarama.Consumer {
|
||||||
|
if m.cfg.Kafka.UseNaffka {
|
||||||
|
return m.naffka
|
||||||
|
}
|
||||||
|
consumer, err := sarama.NewConsumer(m.cfg.Kafka.Addresses, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.WithFields(log.Fields{
|
||||||
|
log.ErrorKey: err,
|
||||||
|
"addresses": m.cfg.Kafka.Addresses,
|
||||||
|
}).Panic("Failed to setup kafka consumers")
|
||||||
|
}
|
||||||
|
return consumer
|
||||||
|
}
|
||||||
|
|
||||||
func (m *monolith) setupRoomServer() {
|
func (m *monolith) setupRoomServer() {
|
||||||
m.inputAPI = &roomserver_input.RoomserverInputAPI{
|
m.inputAPI = &roomserver_input.RoomserverInputAPI{
|
||||||
DB: m.roomServerDB,
|
DB: m.roomServerDB,
|
||||||
|
@ -263,21 +270,21 @@ func (m *monolith) setupConsumers() {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
clientAPIConsumer := clientapi_consumers.NewOutputRoomEvent(
|
clientAPIConsumer := clientapi_consumers.NewOutputRoomEvent(
|
||||||
m.cfg, m.kafkaConsumer, m.accountDB, m.queryAPI,
|
m.cfg, m.kafkaConsumer(), m.accountDB, m.queryAPI,
|
||||||
)
|
)
|
||||||
if err = clientAPIConsumer.Start(); err != nil {
|
if err = clientAPIConsumer.Start(); err != nil {
|
||||||
log.Panicf("startup: failed to start room server consumer")
|
log.Panicf("startup: failed to start room server consumer")
|
||||||
}
|
}
|
||||||
|
|
||||||
syncAPIRoomConsumer := syncapi_consumers.NewOutputRoomEvent(
|
syncAPIRoomConsumer := syncapi_consumers.NewOutputRoomEvent(
|
||||||
m.cfg, m.kafkaConsumer, m.syncAPINotifier, m.syncAPIDB, m.queryAPI,
|
m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB, m.queryAPI,
|
||||||
)
|
)
|
||||||
if err = syncAPIRoomConsumer.Start(); err != nil {
|
if err = syncAPIRoomConsumer.Start(); err != nil {
|
||||||
log.Panicf("startup: failed to start room server consumer: %s", err)
|
log.Panicf("startup: failed to start room server consumer: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
syncAPIClientConsumer := syncapi_consumers.NewOutputClientData(
|
syncAPIClientConsumer := syncapi_consumers.NewOutputClientData(
|
||||||
m.cfg, m.kafkaConsumer, m.syncAPINotifier, m.syncAPIDB,
|
m.cfg, m.kafkaConsumer(), m.syncAPINotifier, m.syncAPIDB,
|
||||||
)
|
)
|
||||||
if err = syncAPIClientConsumer.Start(); err != nil {
|
if err = syncAPIClientConsumer.Start(); err != nil {
|
||||||
log.Panicf("startup: failed to start client API server consumer: %s", err)
|
log.Panicf("startup: failed to start client API server consumer: %s", err)
|
||||||
|
@ -286,7 +293,7 @@ func (m *monolith) setupConsumers() {
|
||||||
federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation)
|
federationSenderQueues := queue.NewOutgoingQueues(m.cfg.Matrix.ServerName, m.federation)
|
||||||
|
|
||||||
federationSenderRoomConsumer := federationsender_consumers.NewOutputRoomEvent(
|
federationSenderRoomConsumer := federationsender_consumers.NewOutputRoomEvent(
|
||||||
m.cfg, m.kafkaConsumer, federationSenderQueues, m.federationSenderDB, m.queryAPI,
|
m.cfg, m.kafkaConsumer(), federationSenderQueues, m.federationSenderDB, m.queryAPI,
|
||||||
)
|
)
|
||||||
if err = federationSenderRoomConsumer.Start(); err != nil {
|
if err = federationSenderRoomConsumer.Start(); err != nil {
|
||||||
log.WithError(err).Panicf("startup: failed to start room server consumer")
|
log.WithError(err).Panicf("startup: failed to start room server consumer")
|
||||||
|
|
Loading…
Reference in a new issue