mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-31 13:22:46 +00:00
More wiring/boilerplate
This commit is contained in:
parent
a3962bd9b8
commit
cd59f54af7
15 changed files with 101 additions and 16 deletions
61
keyserver/consumers/eduserver.go
Normal file
61
keyserver/consumers/eduserver.go
Normal file
|
@ -0,0 +1,61 @@
|
|||
package consumers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
eduapi "github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/storage"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type OutputSigningKeyUpdateConsumer struct {
|
||||
eduServerConsumer *internal.ContinualConsumer
|
||||
keyDB storage.Database
|
||||
keyAPI api.KeyInternalAPI
|
||||
serverName string
|
||||
}
|
||||
|
||||
func NewOutputSigningKeyUpdateConsumer(
|
||||
process *process.ProcessContext,
|
||||
cfg *config.Dendrite,
|
||||
kafkaConsumer sarama.Consumer,
|
||||
keyDB storage.Database,
|
||||
keyAPI api.KeyInternalAPI,
|
||||
) *OutputSigningKeyUpdateConsumer {
|
||||
consumer := internal.ContinualConsumer{
|
||||
Process: process,
|
||||
ComponentName: "keyserver/eduserver",
|
||||
Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate),
|
||||
Consumer: kafkaConsumer,
|
||||
PartitionStore: keyDB,
|
||||
}
|
||||
s := &OutputSigningKeyUpdateConsumer{
|
||||
eduServerConsumer: &consumer,
|
||||
keyDB: keyDB,
|
||||
keyAPI: keyAPI,
|
||||
serverName: string(cfg.Global.ServerName),
|
||||
}
|
||||
consumer.ProcessMessage = s.onMessage
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *OutputSigningKeyUpdateConsumer) Start() error {
|
||||
return s.eduServerConsumer.Start()
|
||||
}
|
||||
|
||||
func (s *OutputSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||
var output eduapi.OutputSigningKeyUpdate
|
||||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
log.WithError(err).Errorf("eduserver output log: message parse failure")
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -18,10 +18,12 @@ import (
|
|||
"github.com/gorilla/mux"
|
||||
fedsenderapi "github.com/matrix-org/dendrite/federationsender/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/consumers"
|
||||
"github.com/matrix-org/dendrite/keyserver/internal"
|
||||
"github.com/matrix-org/dendrite/keyserver/inthttp"
|
||||
"github.com/matrix-org/dendrite/keyserver/producers"
|
||||
"github.com/matrix-org/dendrite/keyserver/storage"
|
||||
"github.com/matrix-org/dendrite/setup"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/kafka"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
@ -36,9 +38,9 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
|
|||
// NewInternalAPI returns a concerete implementation of the internal API. Callers
|
||||
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
|
||||
func NewInternalAPI(
|
||||
cfg *config.KeyServer, fedClient fedsenderapi.FederationClient,
|
||||
base *setup.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient,
|
||||
) api.KeyInternalAPI {
|
||||
_, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
|
||||
consumer, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
|
||||
|
||||
db, err := storage.NewDatabase(&cfg.Database)
|
||||
if err != nil {
|
||||
|
@ -55,11 +57,21 @@ func NewInternalAPI(
|
|||
logrus.WithError(err).Panicf("failed to start device list updater")
|
||||
}
|
||||
}()
|
||||
return &internal.KeyInternalAPI{
|
||||
|
||||
ap := &internal.KeyInternalAPI{
|
||||
DB: db,
|
||||
ThisServer: cfg.Matrix.ServerName,
|
||||
FedClient: fedClient,
|
||||
Producer: keyChangeProducer,
|
||||
Updater: updater,
|
||||
}
|
||||
|
||||
keyconsumer := consumers.NewOutputSigningKeyUpdateConsumer(
|
||||
base.ProcessContext, base.Cfg, consumer, db, ap,
|
||||
)
|
||||
if err := keyconsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer")
|
||||
}
|
||||
|
||||
return ap
|
||||
}
|
||||
|
|
|
@ -18,11 +18,14 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
type Database interface {
|
||||
internal.PartitionStorer
|
||||
|
||||
// ExistingOneTimeKeys returns a map of keyIDWithAlgorithm to key JSON for the given parameters. If no keys exist with this combination
|
||||
// of user/device/key/algorithm 4-uple then it is omitted from the map. Returns an error when failing to communicate with the database.
|
||||
ExistingOneTimeKeys(ctx context.Context, userID, deviceID string, keyIDsWithAlgorithms []string) (map[string]json.RawMessage, error)
|
||||
|
|
|
@ -55,7 +55,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error)
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &shared.Database{
|
||||
d := &shared.Database{
|
||||
DB: db,
|
||||
Writer: sqlutil.NewDummyWriter(),
|
||||
OneTimeKeysTable: otk,
|
||||
|
@ -65,5 +65,9 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error)
|
|||
CrossSigningKeysTable: csk,
|
||||
CrossSigningSigsTable: css,
|
||||
CrossSigningStreamsTable: cst,
|
||||
}, nil
|
||||
}
|
||||
if err = d.PartitionOffsetStatements.Prepare(db, d.Writer, "keyserver"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return d, nil
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ type Database struct {
|
|||
CrossSigningKeysTable tables.CrossSigningKeys
|
||||
CrossSigningSigsTable tables.CrossSigningSigs
|
||||
CrossSigningStreamsTable tables.CrossSigningStreams
|
||||
sqlutil.PartitionOffsetStatements
|
||||
}
|
||||
|
||||
func (d *Database) ExistingOneTimeKeys(ctx context.Context, userID, deviceID string, keyIDsWithAlgorithms []string) (map[string]json.RawMessage, error) {
|
||||
|
|
|
@ -53,7 +53,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error)
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &shared.Database{
|
||||
d := &shared.Database{
|
||||
DB: db,
|
||||
Writer: sqlutil.NewExclusiveWriter(),
|
||||
OneTimeKeysTable: otk,
|
||||
|
@ -63,5 +63,9 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*shared.Database, error)
|
|||
CrossSigningKeysTable: csk,
|
||||
CrossSigningSigsTable: css,
|
||||
CrossSigningStreamsTable: cst,
|
||||
}, nil
|
||||
}
|
||||
if err = d.PartitionOffsetStatements.Prepare(db, d.Writer, "keyserver"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return d, nil
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue