Update Naffka (#1295)

* Update Naffka

* Fix Naffka setup

* Update Naffka

* Update Naffka
This commit is contained in:
Neil Alexander 2020-08-24 13:49:20 +01:00 committed by GitHub
parent 10461b8870
commit 4b2db1dff5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 10 additions and 40 deletions

View file

@ -24,11 +24,12 @@ import (
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/naffka"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/matrix-org/naffka"
naffkaStorage "github.com/matrix-org/naffka/storage"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/dendrite/userapi/storage/devices"
@ -356,36 +357,13 @@ func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
// setupNaffka creates kafka consumer/producer pair from the config.
func setupNaffka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {
var naffkaDB *naffka.DatabaseImpl
db, err := sqlutil.Open(&cfg.Global.Kafka.Database)
naffkaDB, err := naffkaStorage.NewDatabase(string(cfg.Global.Kafka.Database.ConnectionString))
if err != nil {
logrus.WithError(err).Panic("Failed to open naffka database")
logrus.WithError(err).Panic("Failed to setup naffka database")
}
switch {
case cfg.Global.Kafka.Database.ConnectionString.IsSQLite():
naffkaDB, err = naffka.NewSqliteDatabase(db)
if err != nil {
logrus.WithError(err).Panic("Failed to setup naffka database")
}
case cfg.Global.Kafka.Database.ConnectionString.IsPostgres():
naffkaDB, err = naffka.NewPostgresqlDatabase(db)
if err != nil {
logrus.WithError(err).Panic("Failed to setup naffka database")
}
default:
panic("unknown naffka database type")
}
if naffkaDB == nil {
panic("naffka connection string not understood")
}
naff, err := naffka.New(naffkaDB)
if err != nil {
logrus.WithError(err).Panic("Failed to setup naffka")
}
return naff, naff
}