dendrite/keyserver/consumers/eduserver.go
2021-07-28 14:29:02 +01:00

61 lines
1.7 KiB
Go

package consumers
import (
"encoding/json"
eduapi "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/Shopify/sarama"
log "github.com/sirupsen/logrus"
)
type OutputSigningKeyUpdateConsumer struct {
eduServerConsumer *internal.ContinualConsumer
keyDB storage.Database
keyAPI api.KeyInternalAPI
serverName string
}
func NewOutputSigningKeyUpdateConsumer(
process *process.ProcessContext,
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
keyDB storage.Database,
keyAPI api.KeyInternalAPI,
) *OutputSigningKeyUpdateConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "keyserver/eduserver",
Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputSigningKeyUpdate),
Consumer: kafkaConsumer,
PartitionStore: keyDB,
}
s := &OutputSigningKeyUpdateConsumer{
eduServerConsumer: &consumer,
keyDB: keyDB,
keyAPI: keyAPI,
serverName: string(cfg.Global.ServerName),
}
consumer.ProcessMessage = s.onMessage
return s
}
func (s *OutputSigningKeyUpdateConsumer) Start() error {
return s.eduServerConsumer.Start()
}
func (s *OutputSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
var output eduapi.OutputSigningKeyUpdate
if err := json.Unmarshal(msg.Value, &output); err != nil {
log.WithError(err).Errorf("eduserver output log: message parse failure")
return nil
}
return nil
}