diff --git a/appservice/appservice.go b/appservice/appservice.go index 782e090a..4ff42360 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -32,7 +32,7 @@ import ( roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/sirupsen/logrus" ) @@ -58,7 +58,7 @@ func NewInternalAPI( }, }, } - consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka) + consumer, _ := jetstream.SetupConsumerProducer(&base.Cfg.Global.JetStream) // Create a connection to the appservice postgres DB appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 2ad7f68f..f9790a0b 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" @@ -52,7 +53,7 @@ func NewOutputRoomEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "appservice/roomserver", - Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent), + Topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), Consumer: kafkaConsumer, PartitionStore: appserviceDB, } diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 562d89d2..ffab1337 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -26,7 +26,7 @@ import ( keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/gomatrixserverlib" @@ -49,11 +49,11 @@ func AddPublicRoutes( extRoomsProvider api.ExtraPublicRoomsProvider, mscCfg *config.MSCs, ) { - _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + _, producer := jetstream.SetupConsumerProducer(&cfg.Matrix.JetStream) syncProducer := &producers.SyncAPIProducer{ Producer: producer, - Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData), + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), } routing.Setup( diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index 6b0e57d8..dc9ff860 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -122,7 +122,6 @@ func main() { cfg.Global.ServerName = "p2p" cfg.Global.PrivateKey = privKey cfg.Global.KeyID = gomatrixserverlib.KeyID(fmt.Sprintf("ed25519:%s", *instanceName)) - cfg.Global.Kafka.UseNaffka = true cfg.FederationSender.FederationMaxRetries = 6 cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName)) cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName)) @@ -132,7 +131,6 @@ func main() { cfg.SigningKeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-signingkeyserver.db", *instanceName)) cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName)) cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName)) - cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName)) cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-e2ekey.db", *instanceName)) cfg.MSCs.MSCs = []string{"msc2836"} cfg.MSCs.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mscs.db", *instanceName)) diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 2712ed4a..5ee455eb 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -144,7 +144,6 @@ func main() { cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk)) cfg.Global.PrivateKey = sk cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID) - cfg.Global.Kafka.UseNaffka = true cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName)) cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName)) cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName)) @@ -154,7 +153,6 @@ func main() { cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", *instanceName)) cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName)) cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName)) - cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName)) cfg.MSCs.MSCs = []string{"msc2836", "msc2946"} if err := cfg.Derive(); err != nil { panic(err) diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index abeefbe5..2cb0a7df 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -72,7 +72,6 @@ func main() { cfg.Global.ServerName = gomatrixserverlib.ServerName(ygg.DerivedServerName()) cfg.Global.PrivateKey = ygg.SigningPrivateKey() cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID) - cfg.Global.Kafka.UseNaffka = true cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName)) cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName)) cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName)) @@ -82,7 +81,6 @@ func main() { cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", *instanceName)) cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName)) cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName)) - cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName)) cfg.MSCs.MSCs = []string{"msc2836"} cfg.MSCs.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mscs.db", *instanceName)) if err = cfg.Derive(); err != nil { diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index 7cc40510..2e8ef189 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -23,8 +23,7 @@ import ( "github.com/matrix-org/dendrite/eduserver/input" "github.com/matrix-org/dendrite/eduserver/inthttp" "github.com/matrix-org/dendrite/setup" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" userapi "github.com/matrix-org/dendrite/userapi/api" ) @@ -43,15 +42,15 @@ func NewInternalAPI( ) api.EDUServerInputAPI { cfg := &base.Cfg.EDUServer - _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + _, producer := jetstream.SetupConsumerProducer(&cfg.Matrix.JetStream) return &input.EDUServerInputAPI{ Cache: eduCache, UserAPI: userAPI, Producer: producer, - OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent), - OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent), - OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent), + OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), + OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), + OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), ServerName: cfg.Matrix.ServerName, } } diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go index 9a1ec1e2..63a367f5 100644 --- a/federationsender/consumers/eduserver.go +++ b/federationsender/consumers/eduserver.go @@ -25,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/storage" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -55,29 +56,29 @@ func NewOutputEDUConsumer( typingConsumer: &internal.ContinualConsumer{ Process: process, ComponentName: "eduserver/typing", - Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent), + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), Consumer: kafkaConsumer, PartitionStore: store, }, sendToDeviceConsumer: &internal.ContinualConsumer{ Process: process, ComponentName: "eduserver/sendtodevice", - Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent), + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), Consumer: kafkaConsumer, PartitionStore: store, }, receiptConsumer: &internal.ContinualConsumer{ Process: process, ComponentName: "eduserver/receipt", - Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent), + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), Consumer: kafkaConsumer, PartitionStore: store, }, queues: queues, db: store, ServerName: cfg.Matrix.ServerName, - TypingTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent), - SendToDeviceTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent), + TypingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), + SendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), } c.typingConsumer.ProcessMessage = c.onTypingEvent c.sendToDeviceConsumer.ProcessMessage = c.onSendToDeviceEvent diff --git a/federationsender/consumers/keychange.go b/federationsender/consumers/keychange.go index 9e146390..4226d492 100644 --- a/federationsender/consumers/keychange.go +++ b/federationsender/consumers/keychange.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" @@ -53,7 +54,7 @@ func NewKeyChangeConsumer( consumer: &internal.ContinualConsumer{ Process: process, ComponentName: "federationsender/keychange", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), Consumer: kafkaConsumer, PartitionStore: store, }, diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index f9c4a5c2..b145bc50 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -26,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" @@ -52,7 +53,7 @@ func NewOutputRoomEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "federationsender/roomserver", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go index 0732c5d3..ee89d782 100644 --- a/federationsender/federationsender.go +++ b/federationsender/federationsender.go @@ -25,7 +25,7 @@ import ( "github.com/matrix-org/dendrite/federationsender/storage" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" ) @@ -61,7 +61,7 @@ func NewInternalAPI( FailuresUntilBlacklist: cfg.FederationMaxRetries, } - consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + consumer, _ := jetstream.SetupConsumerProducer(&cfg.Matrix.JetStream) queues := queue.NewOutgoingQueues( federationSenderDB, base.ProcessContext, diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 7e8fc2e0..ed345dcb 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -23,7 +23,7 @@ import ( "github.com/matrix-org/dendrite/keyserver/producers" "github.com/matrix-org/dendrite/keyserver/storage" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/sirupsen/logrus" ) @@ -38,14 +38,14 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { func NewInternalAPI( cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, ) api.KeyInternalAPI { - _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + _, producer := jetstream.SetupConsumerProducer(&cfg.Matrix.JetStream) db, err := storage.NewDatabase(&cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to key server database") } keyChangeProducer := &producers.KeyChange{ - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), Producer: producer, DB: db, } diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 396a1def..192a056b 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -23,8 +23,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/internal" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/setup" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/sirupsen/logrus" ) @@ -42,7 +41,7 @@ func NewInternalAPI( ) api.RoomserverInternalAPI { cfg := &base.Cfg.RoomServer - _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + _, producer := jetstream.SetupConsumerProducer(&cfg.Matrix.JetStream) var perspectiveServerNames []gomatrixserverlib.ServerName for _, kp := range base.Cfg.SigningKeyServer.KeyPerspectives { @@ -55,7 +54,7 @@ func NewInternalAPI( } return internal.NewRoomserverAPI( - cfg, roomserverDB, producer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), + cfg, roomserverDB, producer, string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)), base.Caches, keyRing, perspectiveServerNames, ) } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 5c954007..335d2a4c 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -19,6 +19,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" ) @@ -161,12 +162,11 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro cfg := &config.Dendrite{} cfg.Defaults() cfg.Global.ServerName = testOrigin - cfg.Global.Kafka.UseNaffka = true cfg.RoomServer.Database = config.DatabaseOptions{ ConnectionString: roomserverDBFileURI, } dp := &dummyProducer{ - topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent), + topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), } cache, err := caching.NewInMemoryLRUCache(false) if err != nil { @@ -181,7 +181,7 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro logrus.WithError(err).Panicf("failed to connect to room server db") } return internal.NewRoomserverAPI( - &cfg.RoomServer, roomserverDB, dp, string(cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent)), + &cfg.RoomServer, roomserverDB, dp, string(cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent)), base.Caches, &test.NopJSONVerifier{}, nil, ), dp } diff --git a/setup/config/config.go b/setup/config/config.go index c9fdbf6c..52307ee5 100644 --- a/setup/config/config.go +++ b/setup/config/config.go @@ -329,7 +329,7 @@ func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) { } func (c *Dendrite) Wiring() { - c.Global.Kafka.Matrix = &c.Global + c.Global.JetStream.Matrix = &c.Global c.ClientAPI.Matrix = &c.Global c.EDUServer.Matrix = &c.Global c.FederationAPI.Matrix = &c.Global diff --git a/setup/config/config_global.go b/setup/config/config_global.go index 90a92f2b..7b3d583b 100644 --- a/setup/config/config_global.go +++ b/setup/config/config_global.go @@ -43,8 +43,8 @@ type Global struct { // Defaults to an empty array. TrustedIDServers []string `yaml:"trusted_third_party_id_servers"` - // Kafka/Naffka configuration - Kafka Kafka `yaml:"kafka"` + // JetStream configuration + JetStream JetStream `yaml:"jetstream"` // Metrics configuration Metrics Metrics `yaml:"metrics"` @@ -63,7 +63,7 @@ func (c *Global) Defaults() { c.KeyID = "ed25519:auto" c.KeyValidityPeriod = time.Hour * 24 * 7 - c.Kafka.Defaults() + c.JetStream.Defaults() c.Metrics.Defaults() c.DNSCache.Defaults() c.Sentry.Defaults() @@ -73,7 +73,7 @@ func (c *Global) Verify(configErrs *ConfigErrors, isMonolith bool) { checkNotEmpty(configErrs, "global.server_name", string(c.ServerName)) checkNotEmpty(configErrs, "global.private_key", string(c.PrivateKeyPath)) - c.Kafka.Verify(configErrs, isMonolith) + c.JetStream.Verify(configErrs, isMonolith) c.Metrics.Verify(configErrs, isMonolith) c.Sentry.Verify(configErrs, isMonolith) c.DNSCache.Verify(configErrs, isMonolith) diff --git a/setup/config/config_jetstream.go b/setup/config/config_jetstream.go new file mode 100644 index 00000000..53b1e163 --- /dev/null +++ b/setup/config/config_jetstream.go @@ -0,0 +1,31 @@ +package config + +import "fmt" + +type JetStream struct { + Matrix *Global `yaml:"-"` + + // A list of NATS addresses to connect to. If none are specified, an + // internal NATS server will be used when running in monolith mode only. + Addresses []string `yaml:"addresses"` + // The prefix to use for stream names for this homeserver - really only + // useful if running more than one Dendrite on the same NATS deployment. + TopicPrefix string `yaml:"topic_prefix"` +} + +func (k *JetStream) TopicFor(name string) string { + return fmt.Sprintf("%s%s", k.TopicPrefix, name) +} + +func (c *JetStream) Defaults() { + c.Addresses = []string{} + c.TopicPrefix = "Dendrite" +} + +func (c *JetStream) Verify(configErrs *ConfigErrors, isMonolith bool) { + // If we are running in a polylith deployment then we need at least + // one NATS JetStream server to talk to. + if !isMonolith { + checkNotZero(configErrs, "global.jetstream.addresses", int64(len(c.Addresses))) + } +} diff --git a/setup/config/config_kafka.go b/setup/config/config_kafka.go deleted file mode 100644 index c7c7f09f..00000000 --- a/setup/config/config_kafka.go +++ /dev/null @@ -1,75 +0,0 @@ -package config - -import "fmt" - -// Defined Kafka topics. -const ( - TopicOutputTypingEvent = "OutputTypingEvent" - TopicOutputSendToDeviceEvent = "OutputSendToDeviceEvent" - TopicOutputKeyChangeEvent = "OutputKeyChangeEvent" - TopicOutputRoomEvent = "OutputRoomEvent" - TopicOutputClientData = "OutputClientData" - TopicOutputReceiptEvent = "OutputReceiptEvent" -) - -// KafkaTopics is a convenience slice to access all defined Kafka topics. -var KafkaTopics = []string{ - TopicOutputTypingEvent, - TopicOutputSendToDeviceEvent, - TopicOutputKeyChangeEvent, - TopicOutputRoomEvent, - TopicOutputClientData, - TopicOutputReceiptEvent, -} - -type Kafka struct { - Matrix *Global `yaml:"-"` - - // A list of kafka/NATS addresses to connect to. - Addresses []string `yaml:"addresses"` - // The prefix to use for Kafka topic names for this homeserver - really only - // useful if running more than one Dendrite on the same Kafka deployment. - TopicPrefix string `yaml:"topic_prefix"` - // Whether to use naffka instead of kafka. - // Naffka can only be used when running dendrite as a single monolithic server. - // Kafka can be used both with a monolithic server and when running the - // components as separate servers. - UseNaffka bool `yaml:"use_naffka"` - // The Naffka database is used internally by the naffka library, if used. - Database DatabaseOptions `yaml:"naffka_database"` - // The max size a Kafka message passed between consumer/producer can have - // Equals roughly max.message.bytes / fetch.message.max.bytes in Kafka - MaxMessageBytes *int `yaml:"max_message_bytes"` - // Whether to use NATS instead of kafka/naffka - UseNATS bool `yaml:"use_nats"` -} - -func (k *Kafka) TopicFor(name string) string { - return fmt.Sprintf("%s%s", k.TopicPrefix, name) -} - -func (c *Kafka) Defaults() { - c.UseNaffka = true - c.Database.Defaults(10) - c.Addresses = []string{"localhost:2181"} - c.Database.ConnectionString = DataSource("file:naffka.db") - c.TopicPrefix = "Dendrite" - - maxBytes := 1024 * 1024 * 8 // about 8MB - c.MaxMessageBytes = &maxBytes -} - -func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) { - if c.UseNaffka { - if !isMonolith { - configErrs.Add("naffka can only be used in a monolithic server") - } - checkNotEmpty(configErrs, "global.kafka.database.connection_string", string(c.Database.ConnectionString)) - } else { - // If we aren't using naffka then we need to have at least one kafka/nats - // server to talk to. - checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses))) - } - checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix)) - checkPositive(configErrs, "global.kafka.max_message_bytes", int64(*c.MaxMessageBytes)) -} diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go new file mode 100644 index 00000000..4127129a --- /dev/null +++ b/setup/jetstream/nats.go @@ -0,0 +1,88 @@ +package jetstream + +import ( + "strings" + "sync" + "time" + + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/setup/config" + "github.com/sirupsen/logrus" + + saramajs "github.com/S7evinK/saramajetstream" + natsserver "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" + natsclient "github.com/nats-io/nats.go" +) + +var natsServer *natsserver.Server +var natsServerMutex sync.Mutex + +func SetupConsumerProducer(cfg *config.JetStream) (sarama.Consumer, sarama.SyncProducer) { + natsServerMutex.Lock() + s := natsServer + if s == nil { + var err error + natsServer, err = natsserver.NewServer(&natsserver.Options{ + ServerName: "monolith", + DontListen: true, + JetStream: true, + StoreDir: string(cfg.Matrix.ServerName), + }) + if err != nil { + panic(err) + } + natsServer.ConfigureLogger() + go natsServer.Start() + s = natsServer + } + natsServerMutex.Unlock() + if !natsServer.ReadyForConnections(time.Second * 10) { + logrus.Fatalln("NATS did not start in time") + } + conn, err := s.InProcessConn() + if err != nil { + logrus.Fatalln("Failed to get a NATS in-process conn") + } + nc, err := natsclient.Connect("", natsclient.InProcessConn(conn)) + if err != nil { + logrus.Fatalln("Failed to create NATS client") + } + return setupNATS(cfg, nc) +} + +func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (sarama.Consumer, sarama.SyncProducer) { + if nc == nil { + var err error + nc, err = nats.Connect(strings.Join(cfg.Addresses, ",")) + if err != nil { + logrus.WithError(err).Panic("Unable to connect to NATS") + return nil, nil + } + } + + s, err := nc.JetStream() + if err != nil { + logrus.WithError(err).Panic("Unable to get JetStream context") + return nil, nil + } + + for _, stream := range streams { + info, err := s.StreamInfo(stream.Name) + if err != nil && err != natsclient.ErrStreamNotFound { + logrus.WithError(err).Fatal("Unable to get stream info") + } + if info == nil { + stream.Name = cfg.TopicFor(stream.Name) + stream.Subjects = []string{stream.Name} + + if _, err = s.AddStream(stream); err != nil { + logrus.WithError(err).WithField("stream", stream.Name).Fatal("Unable to add stream") + } + } + } + + consumer := saramajs.NewJetStreamConsumer(nc, s, "") + producer := saramajs.NewJetStreamProducer(nc, s, "") + return consumer, producer +} diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go new file mode 100644 index 00000000..326e62a9 --- /dev/null +++ b/setup/jetstream/streams.go @@ -0,0 +1,50 @@ +package jetstream + +import ( + "time" + + "github.com/nats-io/nats.go" +) + +var ( + OutputRoomEvent = "OutputRoomEvent" + OutputSendToDeviceEvent = "OutputSendToDeviceEvent" + OutputKeyChangeEvent = "OutputKeyChangeEvent" + OutputTypingEvent = "OutputTypingEvent" + OutputClientData = "OutputClientData" + OutputReceiptEvent = "OutputReceiptEvent" +) + +var streams = []*nats.StreamConfig{ + { + Name: OutputRoomEvent, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, + { + Name: OutputSendToDeviceEvent, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, + { + Name: OutputKeyChangeEvent, + Retention: nats.LimitsPolicy, + Storage: nats.FileStorage, + }, + { + Name: OutputTypingEvent, + Retention: nats.InterestPolicy, + Storage: nats.MemoryStorage, + MaxAge: time.Second * 60, + }, + { + Name: OutputClientData, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, + { + Name: OutputReceiptEvent, + Retention: nats.InterestPolicy, + Storage: nats.FileStorage, + }, +} diff --git a/setup/kafka/kafka.go b/setup/kafka/kafka.go deleted file mode 100644 index 78b8767d..00000000 --- a/setup/kafka/kafka.go +++ /dev/null @@ -1,163 +0,0 @@ -package kafka - -import ( - "strings" - "sync" - "time" - - js "github.com/S7evinK/saramajetstream" - "github.com/Shopify/sarama" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/naffka" - naffkaStorage "github.com/matrix-org/naffka/storage" - "github.com/nats-io/nats-server/v2/server" - natsserver "github.com/nats-io/nats-server/v2/server" - natsclient "github.com/nats-io/nats.go" - "github.com/nats-io/nats.go" - "github.com/sirupsen/logrus" -) - -var natsServer *natsserver.Server -var natsServerMutex sync.Mutex - -func SetupConsumerProducer(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { - /* - if cfg.UseNaffka { - return setupNaffka(cfg) - } - */ - if true || cfg.UseNATS { - if true { - natsServerMutex.Lock() - s := natsServer - if s == nil { - logrus.Infof("Starting NATS") - var err error - natsServer, err = natsserver.NewServer(&server.Options{ - ServerName: "monolith", - DontListen: true, - JetStream: true, - StoreDir: string(cfg.Matrix.ServerName), - LogFile: "nats.log", - Debug: true, - }) - if err != nil { - panic(err) - } - natsServer.ConfigureLogger() - go natsServer.Start() - s = natsServer - } - natsServerMutex.Unlock() - if !natsServer.ReadyForConnections(time.Second * 10) { - logrus.Fatalln("NATS did not start in time") - } - conn, err := s.InProcessConn() - if err != nil { - logrus.Fatalln("Failed to get a NATS in-process conn") - } - nc, err := natsclient.Connect("", natsclient.InProcessConn(conn)) - if err != nil { - logrus.Fatalln("Failed to create NATS client") - } - return setupNATS(cfg, nc) - } else { - return setupNATS(cfg, nil) - } - } - return setupKafka(cfg) -} - -// setupKafka creates kafka consumer/producer pair from the config. -func setupKafka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { - sCfg := sarama.NewConfig() - sCfg.Producer.MaxMessageBytes = *cfg.MaxMessageBytes - sCfg.Producer.Return.Successes = true - sCfg.Consumer.Fetch.Default = int32(*cfg.MaxMessageBytes) - - consumer, err := sarama.NewConsumer(cfg.Addresses, sCfg) - if err != nil { - logrus.WithError(err).Panic("failed to start kafka consumer") - } - - producer, err := sarama.NewSyncProducer(cfg.Addresses, sCfg) - if err != nil { - logrus.WithError(err).Panic("failed to setup kafka producers") - } - - return consumer, producer -} - -// In monolith mode with Naffka, we don't have the same constraints about -// consuming the same topic from more than one place like we do with Kafka. -// Therefore, we will only open one Naffka connection in case Naffka is -// running on SQLite. -var naffkaInstance *naffka.Naffka - -// setupNaffka creates kafka consumer/producer pair from the config. -func setupNaffka(cfg *config.Kafka) (sarama.Consumer, sarama.SyncProducer) { - if naffkaInstance != nil { - return naffkaInstance, naffkaInstance - } - naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Database.ConnectionString)) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka database") - } - naffkaInstance, err = naffka.New(naffkaDB) - if err != nil { - logrus.WithError(err).Panic("Failed to setup naffka") - } - return naffkaInstance, naffkaInstance -} - -func setupNATS(cfg *config.Kafka, nc *natsclient.Conn) (sarama.Consumer, sarama.SyncProducer) { - logrus.WithField("servers", cfg.Addresses).Debug("connecting to nats") - - if nc == nil { - var err error - nc, err = nats.Connect(strings.Join(cfg.Addresses, ",")) - if err != nil { - logrus.WithError(err).Panic("unable to connect to NATS") - return nil, nil - } - } - - s, err := nc.JetStream() - if err != nil { - logrus.WithError(err).Panic("unable to get jetstream context") - return nil, nil - } - - // create a stream for every topic - for _, topic := range config.KafkaTopics { - sn := cfg.TopicFor(topic) - stream, err := s.StreamInfo(sn) - if err != nil { - logrus.WithError(err).Warn("unable to get stream info") - } - - if stream == nil { - maxLifeTime := time.Second * 0 - - // Typing events can be removed from the stream, as they are only relevant for a short time - if topic == config.TopicOutputTypingEvent { - maxLifeTime = time.Second * 60 - } - _, err = s.AddStream(&nats.StreamConfig{ - Name: sn, - Subjects: []string{topic}, - MaxBytes: int64(*cfg.MaxMessageBytes), - MaxMsgSize: int32(*cfg.MaxMessageBytes), - MaxAge: maxLifeTime, - Duplicates: maxLifeTime / 2, - }) - if err != nil { - logrus.WithError(err).WithField("stream", sn).Fatal("unable to add nats stream") - } - } - } - - consumer := js.NewJetStreamConsumer(nc, s, cfg.TopicPrefix) - producer := js.NewJetStreamProducer(nc, s, cfg.TopicPrefix) - return consumer, producer -} diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index a166ae14..a50e9c8f 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" @@ -50,7 +51,7 @@ func NewOutputClientDataConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/clientapi", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData)), + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 668f945b..2b25985f 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" @@ -52,7 +53,7 @@ func NewOutputReceiptEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/eduserver/receipt", - Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent), + Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 5e626aef..4de8eeb7 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" @@ -55,7 +56,7 @@ func NewOutputSendToDeviceEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/eduserver/sendtodevice", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)), + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index c10d1e94..343f777e 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" @@ -53,7 +54,7 @@ func NewOutputTypingEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/eduserver/typing", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)), + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index a6aeee3a..68a80952 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -24,6 +24,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" @@ -58,7 +59,7 @@ func NewOutputRoomEventConsumer( consumer := internal.ContinualConsumer{ Process: process, ComponentName: "syncapi/roomserver", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), + Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)), Consumer: kafkaConsumer, PartitionStore: store, } diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 84c7140c..16e222cb 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -24,7 +24,7 @@ import ( keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -48,7 +48,7 @@ func AddPublicRoutes( federation *gomatrixserverlib.FederationClient, cfg *config.SyncAPI, ) { - consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + consumer, _ := jetstream.SetupConsumerProducer(&cfg.Matrix.JetStream) syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) if err != nil { @@ -65,7 +65,7 @@ func AddPublicRoutes( requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier) keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( - process, cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), + process, cfg.Matrix.ServerName, string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)), consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, ) if err = keyChangeConsumer.Start(); err != nil {