mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-08-01 13:52:46 +00:00
Add NATS JetStream support
Update shopify/sarama
This commit is contained in:
parent
6a35d9f1b1
commit
f6b5b2c52f
4 changed files with 167 additions and 177 deletions
|
@ -12,8 +12,18 @@ const (
|
|||
TopicOutputReceiptEvent = "OutputReceiptEvent"
|
||||
)
|
||||
|
||||
// KafkaTopics is a convenience slice to access all defined Kafka topics.
|
||||
var KafkaTopics = []string{
|
||||
TopicOutputTypingEvent,
|
||||
TopicOutputSendToDeviceEvent,
|
||||
TopicOutputKeyChangeEvent,
|
||||
TopicOutputRoomEvent,
|
||||
TopicOutputClientData,
|
||||
TopicOutputReceiptEvent,
|
||||
}
|
||||
|
||||
type Kafka struct {
|
||||
// A list of kafka addresses to connect to.
|
||||
// A list of kafka/NATS addresses to connect to.
|
||||
Addresses []string `yaml:"addresses"`
|
||||
// The prefix to use for Kafka topic names for this homeserver - really only
|
||||
// useful if running more than one Dendrite on the same Kafka deployment.
|
||||
|
@ -28,6 +38,8 @@ type Kafka struct {
|
|||
// The max size a Kafka message passed between consumer/producer can have
|
||||
// Equals roughly max.message.bytes / fetch.message.max.bytes in Kafka
|
||||
MaxMessageBytes *int `yaml:"max_message_bytes"`
|
||||
// Whether to use NATS instead of kafka/naffka
|
||||
UseNATS bool `yaml:"use_nats"`
|
||||
}
|
||||
|
||||
func (k *Kafka) TopicFor(name string) string {
|
||||
|
@ -37,7 +49,12 @@ func (k *Kafka) TopicFor(name string) string {
|
|||
func (c *Kafka) Defaults() {
|
||||
c.UseNaffka = true
|
||||
c.Database.Defaults()
|
||||
c.Addresses = []string{"localhost:2181"}
|
||||
if c.UseNATS {
|
||||
c.UseNaffka = false
|
||||
c.Addresses = []string{"localhost:2181"}
|
||||
} else {
|
||||
c.Addresses = []string{"nats://127.0.0.1:4222"}
|
||||
}
|
||||
c.Database.ConnectionString = DataSource("file:naffka.db")
|
||||
c.TopicPrefix = "Dendrite"
|
||||
|
||||
|
@ -52,7 +69,7 @@ func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
|
|||
}
|
||||
checkNotEmpty(configErrs, "global.kafka.database.connection_string", string(c.Database.ConnectionString))
|
||||
} else {
|
||||
// If we aren't using naffka then we need to have at least one kafka
|
||||
// If we aren't using naffka then we need to have at least one kafka/nats
|
||||
// server to talk to.
|
||||
checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses)))
|
||||
}
|
||||
|
|
|
@ -1,10 +1,14 @@
|
|||
package kafka
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
js "github.com/S7evinK/saramajetstream"
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/naffka"
|
||||
naffkaStorage "github.com/matrix-org/naffka/storage"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
|
@ -12,6 +16,9 @@ func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProdu
|
|||
if cfg.UseNaffka {
|
||||
return setupNaffka(cfg)
|
||||
}
|
||||
if cfg.UseNATS {
|
||||
return setupNATS(cfg)
|
||||
}
|
||||
return setupKafka(cfg)
|
||||
}
|
||||
|
||||
|
@ -56,3 +63,48 @@ func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
|
|||
}
|
||||
return naffkaInstance, naffkaInstance
|
||||
}
|
||||
|
||||
func setupNATS(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) {
|
||||
logrus.WithField("servers", cfg.Addresses).Debug("connecting to nats")
|
||||
nc, err := nats.Connect(cfg.Addresses[0])
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panic("failed to connect to nats")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
s, err := nc.JetStream()
|
||||
if err != nil {
|
||||
logrus.WithError(err).Panic("unable to get jetstream context")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// create a stream for every topic
|
||||
for _, topic := range config.KafkaTopics {
|
||||
sn := cfg.TopicFor(topic)
|
||||
stream, err := s.StreamInfo(sn)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Warn("unable to get stream info")
|
||||
}
|
||||
|
||||
if stream == nil {
|
||||
maxLifeTime := time.Second * 0
|
||||
|
||||
// Typing events can be removed from the stream, as they are only relevant for a short time
|
||||
if topic == config.TopicOutputTypingEvent {
|
||||
maxLifeTime = time.Second * 30
|
||||
}
|
||||
_, _ = s.AddStream(&nats.StreamConfig{
|
||||
Name: sn,
|
||||
Subjects: []string{topic},
|
||||
MaxBytes: int64(*cfg.MaxMessageBytes),
|
||||
MaxMsgSize: int32(*cfg.MaxMessageBytes),
|
||||
MaxAge: maxLifeTime,
|
||||
Duplicates: maxLifeTime / 2,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
consumer := js.NewJetStreamConsumer(s, cfg.TopicPrefix)
|
||||
producer := js.NewJetStreamProducer(s, cfg.TopicPrefix)
|
||||
return consumer, producer
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue