Configuration format v1 (#1230)

* Initial pass at refactoring config (not finished)

* Don't forget current state and EDU servers

* More shifting around

* Update server key API tests

* Fix roomserver test

* Fix more tests

* Further tweaks

* Fix current state server test (sort of)

* Maybe fix appservices

* Fix client API test

* Include database connection string in database options

* Fix sync API build

* Update config test

* Fix unit tests

* Fix federation sender build

* Fix gobind build

* Set Listen address for all services in HTTP monolith mode

* Validate config, reinstate appservice derived in directory, tweaks

* Tweak federation API test

* Set MaxOpenConnections/MaxIdleConnections to previous values

* Update generate-config
This commit is contained in:
Neil Alexander 2020-08-10 14:18:04 +01:00 committed by GitHub
parent fdabba1851
commit 4b09f445c9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
155 changed files with 1716 additions and 1503 deletions

View file

@ -43,27 +43,27 @@ type OutputEDUConsumer struct {
// NewOutputEDUConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers.
func NewOutputEDUConsumer(
cfg *config.Dendrite,
cfg *config.FederationSender,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store storage.Database,
) *OutputEDUConsumer {
c := &OutputEDUConsumer{
typingConsumer: &internal.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputTypingEvent),
Topic: string(cfg.Matrix.Kafka.Topics.OutputTypingEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
},
sendToDeviceConsumer: &internal.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputSendToDeviceEvent),
Topic: string(cfg.Matrix.Kafka.Topics.OutputSendToDeviceEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
},
queues: queues,
db: store,
ServerName: cfg.Matrix.ServerName,
TypingTopic: string(cfg.Kafka.Topics.OutputTypingEvent),
SendToDeviceTopic: string(cfg.Kafka.Topics.OutputSendToDeviceEvent),
TypingTopic: string(cfg.Matrix.Kafka.Topics.OutputTypingEvent),
SendToDeviceTopic: string(cfg.Matrix.Kafka.Topics.OutputSendToDeviceEvent),
}
c.typingConsumer.ProcessMessage = c.onTypingEvent
c.sendToDeviceConsumer.ProcessMessage = c.onSendToDeviceEvent

View file

@ -41,7 +41,7 @@ type KeyChangeConsumer struct {
// NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers.
func NewKeyChangeConsumer(
cfg *config.Dendrite,
cfg *config.KeyServer,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store storage.Database,
@ -49,7 +49,7 @@ func NewKeyChangeConsumer(
) *KeyChangeConsumer {
c := &KeyChangeConsumer{
consumer: &internal.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputKeyChangeEvent),
Topic: string(cfg.Matrix.Kafka.Topics.OutputKeyChangeEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
},

View file

@ -33,7 +33,7 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
cfg *config.Dendrite
cfg *config.FederationSender
rsAPI api.RoomserverInternalAPI
rsConsumer *internal.ContinualConsumer
db storage.Database
@ -42,14 +42,14 @@ type OutputRoomEventConsumer struct {
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
cfg *config.Dendrite,
cfg *config.FederationSender,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store storage.Database,
rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
Topic: string(cfg.Matrix.Kafka.Topics.OutputRoomEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}

View file

@ -45,27 +45,29 @@ func NewInternalAPI(
stateAPI stateapi.CurrentStateInternalAPI,
keyRing *gomatrixserverlib.KeyRing,
) api.FederationSenderInternalAPI {
federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender), base.Cfg.DbProperties())
cfg := &base.Cfg.FederationSender
federationSenderDB, err := storage.NewDatabase(&cfg.Database)
if err != nil {
logrus.WithError(err).Panic("failed to connect to federation sender db")
}
stats := &statistics.Statistics{
DB: federationSenderDB,
FailuresUntilBlacklist: base.Cfg.Matrix.FederationMaxRetries,
FailuresUntilBlacklist: cfg.FederationMaxRetries,
}
queues := queue.NewOutgoingQueues(
federationSenderDB, base.Cfg.Matrix.ServerName, federation, rsAPI, stats,
federationSenderDB, cfg.Matrix.ServerName, federation, rsAPI, stats,
&queue.SigningInfo{
KeyID: base.Cfg.Matrix.KeyID,
PrivateKey: base.Cfg.Matrix.PrivateKey,
ServerName: base.Cfg.Matrix.ServerName,
KeyID: cfg.Matrix.KeyID,
PrivateKey: cfg.Matrix.PrivateKey,
ServerName: cfg.Matrix.ServerName,
},
)
rsConsumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, queues,
cfg, base.KafkaConsumer, queues,
federationSenderDB, rsAPI,
)
if err = rsConsumer.Start(); err != nil {
@ -73,17 +75,17 @@ func NewInternalAPI(
}
tsConsumer := consumers.NewOutputEDUConsumer(
base.Cfg, base.KafkaConsumer, queues, federationSenderDB,
cfg, base.KafkaConsumer, queues, federationSenderDB,
)
if err := tsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start typing server consumer")
}
keyConsumer := consumers.NewKeyChangeConsumer(
base.Cfg, base.KafkaConsumer, queues, federationSenderDB, stateAPI,
&base.Cfg.KeyServer, base.KafkaConsumer, queues, federationSenderDB, stateAPI,
)
if err := keyConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start key server consumer")
}
return internal.NewFederationSenderInternalAPI(federationSenderDB, base.Cfg, rsAPI, federation, keyRing, stats, queues)
return internal.NewFederationSenderInternalAPI(federationSenderDB, cfg, rsAPI, federation, keyRing, stats, queues)
}

View file

@ -12,7 +12,7 @@ import (
// FederationSenderInternalAPI is an implementation of api.FederationSenderInternalAPI
type FederationSenderInternalAPI struct {
db storage.Database
cfg *config.Dendrite
cfg *config.FederationSender
statistics *statistics.Statistics
rsAPI api.RoomserverInternalAPI
federation *gomatrixserverlib.FederationClient
@ -21,7 +21,7 @@ type FederationSenderInternalAPI struct {
}
func NewFederationSenderInternalAPI(
db storage.Database, cfg *config.Dendrite,
db storage.Database, cfg *config.FederationSender,
rsAPI api.RoomserverInternalAPI,
federation *gomatrixserverlib.FederationClient,
keyRing *gomatrixserverlib.KeyRing,

View file

@ -19,6 +19,7 @@ import (
"database/sql"
"github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
@ -30,10 +31,10 @@ type Database struct {
}
// NewDatabase opens a new database
func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties) (*Database, error) {
func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
var d Database
var err error
if d.db, err = sqlutil.Open("postgres", dataSourceName, dbProperties); err != nil {
if d.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err
}
joinedHosts, err := NewPostgresJoinedHostsTable(d.db)

View file

@ -21,6 +21,7 @@ import (
_ "github.com/mattn/go-sqlite3"
"github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
@ -32,14 +33,10 @@ type Database struct {
}
// NewDatabase opens a new database
func NewDatabase(dataSourceName string) (*Database, error) {
func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
var d Database
var err error
cs, err := sqlutil.ParseFileURI(dataSourceName)
if err != nil {
return nil, err
}
if d.db, err = sqlutil.Open(sqlutil.SQLiteDriverName(), cs, nil); err != nil {
if d.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err
}
joinedHosts, err := NewSQLiteJoinedHostsTable(d.db)

View file

@ -17,25 +17,21 @@
package storage
import (
"net/url"
"fmt"
"github.com/matrix-org/dendrite/federationsender/storage/postgres"
"github.com/matrix-org/dendrite/federationsender/storage/sqlite3"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/internal/config"
)
// NewDatabase opens a new database
func NewDatabase(dataSourceName string, dbProperties sqlutil.DbProperties) (Database, error) {
uri, err := url.Parse(dataSourceName)
if err != nil {
return postgres.NewDatabase(dataSourceName, dbProperties)
}
switch uri.Scheme {
case "file":
return sqlite3.NewDatabase(dataSourceName)
case "postgres":
return postgres.NewDatabase(dataSourceName, dbProperties)
func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) {
switch {
case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewDatabase(dbProperties)
case dbProperties.ConnectionString.IsPostgres():
return postgres.NewDatabase(dbProperties)
default:
return postgres.NewDatabase(dataSourceName, dbProperties)
return nil, fmt.Errorf("unexpected database type")
}
}

View file

@ -16,27 +16,19 @@ package storage
import (
"fmt"
"net/url"
"github.com/matrix-org/dendrite/federationsender/storage/sqlite3"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/internal/config"
)
// NewDatabase opens a new database
func NewDatabase(
dataSourceName string,
dbProperties sqlutil.DbProperties, // nolint:unparam
) (Database, error) {
uri, err := url.Parse(dataSourceName)
if err != nil {
return nil, fmt.Errorf("Cannot use postgres implementation")
}
switch uri.Scheme {
case "file":
return sqlite3.NewDatabase(dataSourceName)
case "postgres":
return nil, fmt.Errorf("Cannot use postgres implementation")
func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) {
switch {
case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewDatabase(dbProperties)
case dbProperties.ConnectionString.IsPostgres():
return nil, fmt.Errorf("can't use Postgres implementation")
default:
return nil, fmt.Errorf("Cannot use postgres implementation")
return nil, fmt.Errorf("unexpected database type")
}
}