Prefix-defined Kafka topics (#1254)

* Prefix-defined Kafka topics

* Fix current state server test
This commit is contained in:
Neil Alexander 2020-08-10 15:18:37 +01:00 committed by GitHub
parent 4b09f445c9
commit 52eeeb1627
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 79 additions and 96 deletions

View file

@ -60,60 +60,6 @@ func (c *Global) Verify(configErrs *ConfigErrors, isMonolith bool) {
c.Metrics.Verify(configErrs, isMonolith)
}
type Kafka struct {
// A list of kafka addresses to connect to.
Addresses []string `yaml:"addresses"`
// Whether to use naffka instead of kafka.
// Naffka can only be used when running dendrite as a single monolithic server.
// Kafka can be used both with a monolithic server and when running the
// components as separate servers.
UseNaffka bool `yaml:"use_naffka"`
// The Naffka database is used internally by the naffka library, if used.
Database DatabaseOptions `yaml:"naffka_database"`
// The names of the topics to use when reading and writing from kafka.
Topics struct {
// Topic for roomserver/api.OutputRoomEvent events.
OutputRoomEvent Topic `yaml:"output_room_event"`
// Topic for sending account data from client API to sync API
OutputClientData Topic `yaml:"output_client_data"`
// Topic for eduserver/api.OutputTypingEvent events.
OutputTypingEvent Topic `yaml:"output_typing_event"`
// Topic for eduserver/api.OutputSendToDeviceEvent events.
OutputSendToDeviceEvent Topic `yaml:"output_send_to_device_event"`
// Topic for keyserver when new device keys are added.
OutputKeyChangeEvent Topic `yaml:"output_key_change_event"`
}
}
func (c *Kafka) Defaults() {
c.UseNaffka = true
c.Database.Defaults()
c.Database.ConnectionString = DataSource("file:naffka.db")
c.Topics.OutputRoomEvent = "OutputRoomEventTopic"
c.Topics.OutputClientData = "OutputClientDataTopic"
c.Topics.OutputTypingEvent = "OutputTypingEventTopic"
c.Topics.OutputSendToDeviceEvent = "OutputSendToDeviceEventTopic"
c.Topics.OutputKeyChangeEvent = "OutputKeyChangeEventTopic"
}
func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
if c.UseNaffka {
if !isMonolith {
configErrs.Add("naffka can only be used in a monolithic server")
}
checkNotEmpty(configErrs, "global.kafka.database.connection_string", string(c.Database.ConnectionString))
} else {
// If we aren't using naffka then we need to have at least one kafka
// server to talk to.
checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses)))
}
checkNotEmpty(configErrs, "global.kafka.topics.output_room_event", string(c.Topics.OutputRoomEvent))
checkNotEmpty(configErrs, "global.kafka.topics.output_client_data", string(c.Topics.OutputClientData))
checkNotEmpty(configErrs, "global.kafka.topics.output_typing_event", string(c.Topics.OutputTypingEvent))
checkNotEmpty(configErrs, "global.kafka.topics.output_send_to_device_event", string(c.Topics.OutputSendToDeviceEvent))
checkNotEmpty(configErrs, "global.kafka.topics.output_key_change_event", string(c.Topics.OutputKeyChangeEvent))
}
// The configuration to use for Prometheus metrics
type Metrics struct {
// Whether or not the metrics are enabled

View file

@ -0,0 +1,52 @@
package config
import "fmt"
// Defined Kafka topics.
const (
TopicOutputTypingEvent = "OutputTypingEvent"
TopicOutputSendToDeviceEvent = "OutputSendToDeviceEvent"
TopicOutputKeyChangeEvent = "OutputKeyChangeEvent"
TopicOutputRoomEvent = "OutputRoomEvent"
TopicOutputClientData = "OutputClientData"
)
type Kafka struct {
// A list of kafka addresses to connect to.
Addresses []string `yaml:"addresses"`
// Whether to use naffka instead of kafka.
// Naffka can only be used when running dendrite as a single monolithic server.
// Kafka can be used both with a monolithic server and when running the
// components as separate servers.
UseNaffka bool `yaml:"use_naffka"`
// The Naffka database is used internally by the naffka library, if used.
Database DatabaseOptions `yaml:"naffka_database"`
// The prefix to use for Kafka topic names for this homeserver - really only
// useful if running more than one Dendrite on the same Kafka deployment.
TopicPrefix string `yaml:"topic_prefix"`
}
func (k *Kafka) TopicFor(name string) string {
return fmt.Sprintf("%s%s", k.TopicPrefix, name)
}
func (c *Kafka) Defaults() {
c.UseNaffka = true
c.Database.Defaults()
c.Database.ConnectionString = DataSource("file:naffka.db")
c.TopicPrefix = "Dendrite"
}
func (c *Kafka) Verify(configErrs *ConfigErrors, isMonolith bool) {
if c.UseNaffka {
if !isMonolith {
configErrs.Add("naffka can only be used in a monolithic server")
}
checkNotEmpty(configErrs, "global.kafka.database.connection_string", string(c.Database.ConnectionString))
} else {
// If we aren't using naffka then we need to have at least one kafka
// server to talk to.
checkNotZero(configErrs, "global.kafka.addresses", int64(len(c.Addresses)))
}
checkNotEmpty(configErrs, "global.kafka.topic_prefix", string(c.TopicPrefix))
}