Refactor kafka consumers

This commit is contained in:
Erik Johnston 2017-12-07 13:04:09 +00:00
parent 1233a9c0ae
commit d687ae8d96
12 changed files with 218 additions and 277 deletions

View file

@ -22,7 +22,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/routing"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
// SetupClientAPIComponent sets up and registers HTTP handlers for the ClientAPI
@ -46,12 +45,10 @@ func SetupClientAPIComponent(
Topic: string(base.Cfg.Kafka.Topics.OutputClientData),
}
consumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, accountsDB, base.QueryAPI(),
handler := consumers.NewOutputRoomEventConsumer(
base.Cfg, accountsDB, base.QueryAPI(),
)
if err := consumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
}
base.StartRoomServerConsumer(accountsDB, handler)
routing.Setup(
base.APIMux, *base.Cfg, roomserverProducer,

View file

@ -16,87 +16,55 @@ package consumers
import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer
db *accounts.Database
query api.RoomserverQueryAPI
serverName string
db *accounts.Database
query api.RoomserverQueryAPI
serverName string
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer.
func NewOutputRoomEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
store *accounts.Database,
queryAPI api.RoomserverQueryAPI,
) *OutputRoomEventConsumer {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
roomServerConsumer: &consumer,
db: store,
query: queryAPI,
serverName: string(cfg.Matrix.ServerName),
db: store,
query: queryAPI,
serverName: string(cfg.Matrix.ServerName),
}
consumer.ProcessMessage = s.onMessage
return s
}
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
return s.roomServerConsumer.Start()
}
// onMessage is called when the sync server receives a new event from the room server output log.
// It is not safe for this function to be called from multiple goroutines, or else the
// sync stream position may race and be incorrectly calculated.
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
if output.Type != api.OutputTypeNewRoomEvent {
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
}
ev := output.NewRoomEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"type": ev.Type(),
}).Info("received event from roomserver")
events, err := s.lookupStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev)
// ProcessNewRoomEvent implements output.ProcessOutputEventHandler
func (s *OutputRoomEventConsumer) ProcessNewRoomEvent(ctx context.Context, event *api.OutputNewRoomEvent) error {
ev := event.Event
events, err := s.lookupStateEvents(event.AddsStateEventIDs, ev)
if err != nil {
return err
}
return s.db.UpdateMemberships(context.TODO(), events, output.NewRoomEvent.RemovesStateEventIDs)
return s.db.UpdateMemberships(ctx, events, event.RemovesStateEventIDs)
}
// ProcessNewInviteEvent implements output.ProcessOutputEventHandler
func (s *OutputRoomEventConsumer) ProcessNewInviteEvent(ctx context.Context, event *api.OutputNewInviteEvent) error {
return nil
}
// ProcessNewInviteEvent implements output.ProcessOutputEventHandler
func (s *OutputRoomEventConsumer) ProcessRetireInviteEvent(ctx context.Context, event *api.OutputRetireInviteEvent) error {
return nil
}
// lookupStateEvents looks up the state events that are added by a new event.

View file

@ -259,6 +259,44 @@ func (b *BaseDendrite) SetupAndServeHTTP(addr string) {
logrus.Infof("Stopped %s server on %s", b.componentName, addr)
}
// StartRoomServerConsumer starts handling OutputRoomEvent kafka topic with the
// given handler
func (b *BaseDendrite) StartRoomServerConsumer(
partitionStore common.PartitionStorer,
handler api.ProcessOutputEventHandler,
) {
processor := api.NewProcessOutputEvent(handler)
consumer := common.NewContinualConsumer(
string(b.Cfg.Kafka.Topics.OutputRoomEvent),
b.KafkaConsumer,
partitionStore,
&processor,
)
if err := consumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
}
}
// StartClientDataConsumer starts handling OutputClientData kafka topic with the
// given handler
func (b *BaseDendrite) StartClientDataConsumer(
partitionStore common.PartitionStorer,
handler common.ProcessKafkaMessage,
) {
consumer := common.NewContinualConsumer(
string(b.Cfg.Kafka.Topics.OutputClientData),
b.KafkaConsumer,
partitionStore,
handler,
)
if err := consumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer")
}
}
// setupKafka creates kafka consumer/producer pair from the config. Checks if
// should use naffka.
func setupKafka(cfg *config.Dendrite) (sarama.Consumer, sarama.SyncProducer) {

View file

@ -42,24 +42,35 @@ type PartitionStorer interface {
type ContinualConsumer struct {
// The kafkaesque topic to consume events from.
// This is the name used in kafka to identify the stream to consume events from.
Topic string
topic string
// A kafkaesque stream consumer providing the APIs for talking to the event source.
// The interface is taken from a client library for Apache Kafka.
// But any equivalent event streaming protocol could be made to implement the same interface.
Consumer sarama.Consumer
consumer sarama.Consumer
// A thing which can load and save partition offsets for a topic.
PartitionStore PartitionStorer
partitionStore PartitionStorer
// ProcessMessage is a function which will be called for each message in the log. Return an error to
// stop processing messages. See ErrShutdown for specific control signals.
ProcessMessage func(msg *sarama.ConsumerMessage) error
// ShutdownCallback is called when ProcessMessage returns ErrShutdown, after the partition has been saved.
// It is optional.
ShutdownCallback func()
handler ProcessKafkaMessage
}
// ErrShutdown can be returned from ContinualConsumer.ProcessMessage to stop the ContinualConsumer.
var ErrShutdown = fmt.Errorf("shutdown")
func NewContinualConsumer(
topic string,
consumer sarama.Consumer,
partitionStore PartitionStorer,
handler ProcessKafkaMessage,
) ContinualConsumer {
return ContinualConsumer{
topic: topic,
consumer: consumer,
partitionStore: partitionStore,
handler: handler,
}
}
// Start starts the consumer consuming.
// Starts up a goroutine for each partition in the kafka stream.
// Returns nil once all the goroutines are started.
@ -67,7 +78,7 @@ var ErrShutdown = fmt.Errorf("shutdown")
func (c *ContinualConsumer) Start() error {
offsets := map[int32]int64{}
partitions, err := c.Consumer.Partitions(c.Topic)
partitions, err := c.consumer.Partitions(c.topic)
if err != nil {
return err
}
@ -76,7 +87,7 @@ func (c *ContinualConsumer) Start() error {
offsets[partition] = sarama.OffsetOldest
}
storedOffsets, err := c.PartitionStore.PartitionOffsets(context.TODO(), c.Topic)
storedOffsets, err := c.partitionStore.PartitionOffsets(context.TODO(), c.topic)
if err != nil {
return err
}
@ -89,7 +100,7 @@ func (c *ContinualConsumer) Start() error {
var partitionConsumers []sarama.PartitionConsumer
for partition, offset := range offsets {
pc, err := c.Consumer.ConsumePartition(c.Topic, partition, offset)
pc, err := c.consumer.ConsumePartition(c.topic, partition, offset)
if err != nil {
for _, p := range partitionConsumers {
p.Close() // nolint: errcheck
@ -109,17 +120,18 @@ func (c *ContinualConsumer) Start() error {
func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) {
defer pc.Close() // nolint: errcheck
for message := range pc.Messages() {
msgErr := c.ProcessMessage(message)
msgErr := c.handler.ProcessMessage(context.TODO(), message)
// Advance our position in the stream so that we will start at the right position after a restart.
if err := c.PartitionStore.SetPartitionOffset(context.TODO(), c.Topic, message.Partition, message.Offset); err != nil {
if err := c.partitionStore.SetPartitionOffset(context.TODO(), c.topic, message.Partition, message.Offset); err != nil {
panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %s", err))
}
// Shutdown if we were told to do so.
if msgErr == ErrShutdown {
if c.ShutdownCallback != nil {
c.ShutdownCallback()
}
return
}
}
}
type ProcessKafkaMessage interface {
ProcessMessage(ctx context.Context, msg *sarama.ConsumerMessage) error
}

View file

@ -16,10 +16,8 @@ package consumers
import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage"
@ -27,88 +25,58 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer
db *storage.Database
queues *queue.OutgoingQueues
query api.RoomserverQueryAPI
db *storage.Database
queues *queue.OutgoingQueues
query api.RoomserverQueryAPI
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store *storage.Database,
queryAPI api.RoomserverQueryAPI,
) *OutputRoomEventConsumer {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
roomServerConsumer: &consumer,
db: store,
queues: queues,
query: queryAPI,
db: store,
queues: queues,
query: queryAPI,
}
consumer.ProcessMessage = s.onMessage
return s
}
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
return s.roomServerConsumer.Start()
}
// onMessage is called when the federation server receives a new event from the room server output log.
// It is unsafe to call this with messages for the same room in multiple gorountines
// because updates it will likely fail with a types.EventIDMismatchError when it
// realises that it cannot update the room state using the deltas.
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
if output.Type != api.OutputTypeNewRoomEvent {
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
}
ev := &output.NewRoomEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"send_as_server": output.NewRoomEvent.SendAsServer,
}).Info("received event from roomserver")
if err := s.processMessage(*output.NewRoomEvent); err != nil {
// ProcessNewRoomEvent implements output.ProcessOutputEventHandler
func (s *OutputRoomEventConsumer) ProcessNewRoomEvent(ctx context.Context, ore *api.OutputNewRoomEvent) error {
if err := s.processMessage(ctx, ore); err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event": string(ev.JSON()),
log.ErrorKey: err,
"add": output.NewRoomEvent.AddsStateEventIDs,
"del": output.NewRoomEvent.RemovesStateEventIDs,
}).Panicf("roomserver output log: write event failure")
return nil
"event": string(ore.Event.EventID()),
"add": ore.AddsStateEventIDs,
"del": ore.RemovesStateEventIDs,
}).WithError(err).Panicf("roomserver output log: write event failure")
}
return nil
}
// ProcessNewInviteEvent implements output.ProcessOutputEventHandler
func (s *OutputRoomEventConsumer) ProcessNewInviteEvent(ctx context.Context, event *api.OutputNewInviteEvent) error {
return nil
}
// ProcessRetireInviteEvent implements output.ProcessOutputEventHandler
func (s *OutputRoomEventConsumer) ProcessRetireInviteEvent(ctx context.Context, event *api.OutputRetireInviteEvent) error {
return nil
}
// processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, ore *api.OutputNewRoomEvent) error {
addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event)
if err != nil {
return err
@ -169,7 +137,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
// events from the room server.
// Returns an error if there was a problem talking to the room server.
func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
ore api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost,
ore *api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost,
) ([]gomatrixserverlib.ServerName, error) {
// Combine the delta into a single delta so that the adds and removes can
// cancel each other out. This should reduce the number of times we need

View file

@ -36,11 +36,8 @@ func SetupFederationSenderComponent(
queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation)
consumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, queues,
federationSenderDB, base.QueryAPI(),
handler := consumers.NewOutputRoomEventConsumer(
base.Cfg, queues, federationSenderDB, base.QueryAPI(),
)
if err = consumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start room server consumer")
}
base.StartRoomServerConsumer(federationSenderDB, handler)
}

View file

@ -16,82 +16,43 @@ package consumers
import (
"context"
"encoding/json"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/publicroomsapi/storage"
"github.com/matrix-org/dendrite/roomserver/api"
log "github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer
db *storage.PublicRoomsServerDatabase
query api.RoomserverQueryAPI
db *storage.PublicRoomsServerDatabase
query api.RoomserverQueryAPI
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer.
func NewOutputRoomEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
store *storage.PublicRoomsServerDatabase,
queryAPI api.RoomserverQueryAPI,
) *OutputRoomEventConsumer {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
roomServerConsumer: &consumer,
db: store,
query: queryAPI,
db: store,
query: queryAPI,
}
consumer.ProcessMessage = s.onMessage
return s
}
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
return s.roomServerConsumer.Start()
}
// onMessage is called when the sync server receives a new event from the room server output log.
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
if output.Type != api.OutputTypeNewRoomEvent {
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
}
ev := output.NewRoomEvent.Event
log.WithFields(log.Fields{
"event_id": ev.EventID(),
"room_id": ev.RoomID(),
"type": ev.Type(),
}).Info("received event from roomserver")
addQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.AddsStateEventIDs}
// ProcessNewRoomEvent implements output.ProcessOutputEventHandler
func (s *OutputRoomEventConsumer) ProcessNewRoomEvent(ctx context.Context, event *api.OutputNewRoomEvent) error {
addQueryReq := api.QueryEventsByIDRequest{EventIDs: event.AddsStateEventIDs}
var addQueryRes api.QueryEventsByIDResponse
if err := s.query.QueryEventsByID(context.TODO(), &addQueryReq, &addQueryRes); err != nil {
log.Warn(err)
return err
}
remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs}
remQueryReq := api.QueryEventsByIDRequest{EventIDs: event.RemovesStateEventIDs}
var remQueryRes api.QueryEventsByIDResponse
if err := s.query.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil {
log.Warn(err)
@ -100,3 +61,13 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return s.db.UpdateRoomFromEvents(context.TODO(), addQueryRes.Events, remQueryRes.Events)
}
// ProcessNewInviteEvent implements output.ProcessOutputEventHandler
func (s *OutputRoomEventConsumer) ProcessNewInviteEvent(ctx context.Context, event *api.OutputNewInviteEvent) error {
return nil
}
// ProcessRetireInviteEvent implements output.ProcessOutputEventHandler
func (s *OutputRoomEventConsumer) ProcessRetireInviteEvent(ctx context.Context, event *api.OutputRetireInviteEvent) error {
return nil
}

View file

@ -17,6 +17,7 @@ package publicroomsapi
import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/publicroomsapi/consumers"
"github.com/matrix-org/dendrite/publicroomsapi/routing"
"github.com/matrix-org/dendrite/publicroomsapi/storage"
"github.com/sirupsen/logrus"
@ -33,5 +34,10 @@ func SetupPublicRoomsAPIComponent(
logrus.WithError(err).Panicf("failed to connect to public rooms db")
}
handler := consumers.NewOutputRoomEventConsumer(
base.Cfg, publicRoomsDB, base.QueryAPI(),
)
base.StartRoomServerConsumer(publicRoomsDB, handler)
routing.Setup(base.APIMux, deviceDB, publicRoomsDB)
}

View file

@ -15,7 +15,12 @@
package api
import (
"context"
"encoding/json"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
)
// An OutputType is a type of roomserver output.
@ -136,3 +141,44 @@ type OutputRetireInviteEvent struct {
// "leave" or "ban".
Membership string
}
type ProcessOutputEventHandler interface {
ProcessNewRoomEvent(context.Context, *OutputNewRoomEvent) error
ProcessNewInviteEvent(context.Context, *OutputNewInviteEvent) error
ProcessRetireInviteEvent(context.Context, *OutputRetireInviteEvent) error
}
type ProcessOutputEvent struct {
handler ProcessOutputEventHandler
}
func NewProcessOutputEvent(handler ProcessOutputEventHandler) ProcessOutputEvent {
return ProcessOutputEvent{handler: handler}
}
// ProcessMessage implements common.ProcessKafkaMessage
func (p *ProcessOutputEvent) ProcessMessage(
ctx context.Context, msg *sarama.ConsumerMessage,
) error {
// Parse out the event JSON
var output OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
logrus.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
switch output.Type {
case OutputTypeNewRoomEvent:
return p.handler.ProcessNewRoomEvent(ctx, output.NewRoomEvent)
case OutputTypeNewInviteEvent:
return p.handler.ProcessNewInviteEvent(ctx, output.NewInviteEvent)
case OutputTypeRetireInviteEvent:
return p.handler.ProcessRetireInviteEvent(ctx, output.RetireInviteEvent)
default:
logrus.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
}
}

View file

@ -28,43 +28,27 @@ import (
// OutputClientDataConsumer consumes events that originated in the client API server.
type OutputClientDataConsumer struct {
clientAPIConsumer *common.ContinualConsumer
db *storage.SyncServerDatabase
notifier *sync.Notifier
db *storage.SyncServerDatabase
notifier *sync.Notifier
}
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
func NewOutputClientDataConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store *storage.SyncServerDatabase,
) *OutputClientDataConsumer {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputClientData),
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputClientDataConsumer{
clientAPIConsumer: &consumer,
db: store,
notifier: n,
db: store,
notifier: n,
}
consumer.ProcessMessage = s.onMessage
return s
}
// Start consuming from room servers
func (s *OutputClientDataConsumer) Start() error {
return s.clientAPIConsumer.Start()
}
// onMessage is called when the sync server receives a new event from the client API server output log.
// It is not safe for this function to be called from multiple goroutines, or else the
// sync stream position may race and be incorrectly calculated.
func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// ProcessMessage is called when the sync server receives a new event from the client API server output log.
// ProcessMessage implements common.ProcessKafkaMessage
func (s *OutputClientDataConsumer) ProcessMessage(ctx context.Context, msg *sarama.ConsumerMessage) error {
// Parse out the event JSON
var output common.AccountData
if err := json.Unmarshal(msg.Value, &output); err != nil {

View file

@ -16,10 +16,8 @@ package consumers
import (
"context"
"encoding/json"
"fmt"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
@ -27,76 +25,34 @@ import (
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
sarama "gopkg.in/Shopify/sarama.v1"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer
db *storage.SyncServerDatabase
notifier *sync.Notifier
query api.RoomserverQueryAPI
db *storage.SyncServerDatabase
notifier *sync.Notifier
query api.RoomserverQueryAPI
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer.
func NewOutputRoomEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store *storage.SyncServerDatabase,
queryAPI api.RoomserverQueryAPI,
) *OutputRoomEventConsumer {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
roomServerConsumer: &consumer,
db: store,
notifier: n,
query: queryAPI,
db: store,
notifier: n,
query: queryAPI,
}
consumer.ProcessMessage = s.onMessage
return s
}
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
return s.roomServerConsumer.Start()
}
// onMessage is called when the sync server receives a new event from the room server output log.
// It is not safe for this function to be called from multiple goroutines, or else the
// sync stream position may race and be incorrectly calculated.
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
switch output.Type {
case api.OutputTypeNewRoomEvent:
return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
case api.OutputTypeNewInviteEvent:
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
case api.OutputTypeRetireInviteEvent:
return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent)
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
}
}
func (s *OutputRoomEventConsumer) onNewRoomEvent(
ctx context.Context, msg api.OutputNewRoomEvent,
// ProcessNewRoomEvent implements output.ProcessOutputEventHandler
func (s *OutputRoomEventConsumer) ProcessNewRoomEvent(
ctx context.Context, msg *api.OutputNewRoomEvent,
) error {
ev := msg.Event
log.WithFields(log.Fields{
@ -153,8 +109,9 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return nil
}
func (s *OutputRoomEventConsumer) onNewInviteEvent(
ctx context.Context, msg api.OutputNewInviteEvent,
// ProcessNewInviteEvent implements output.ProcessOutputEventHandler
func (s *OutputRoomEventConsumer) ProcessNewInviteEvent(
ctx context.Context, msg *api.OutputNewInviteEvent,
) error {
syncStreamPos, err := s.db.AddInviteEvent(ctx, msg.Event)
if err != nil {
@ -169,8 +126,9 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
return nil
}
func (s *OutputRoomEventConsumer) onRetireInviteEvent(
ctx context.Context, msg api.OutputRetireInviteEvent,
// ProcessRetireInviteEvent implements output.ProcessOutputEventHandler
func (s *OutputRoomEventConsumer) ProcessRetireInviteEvent(
ctx context.Context, msg *api.OutputRetireInviteEvent,
) error {
err := s.db.RetireInviteEvent(ctx, msg.EventID)
if err != nil {

View file

@ -55,19 +55,15 @@ func SetupSyncAPIComponent(
requestPool := sync.NewRequestPool(syncDB, notifier, accountsDB)
roomConsumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, notifier, syncDB, base.QueryAPI(),
roomserverHandler := consumers.NewOutputRoomEventConsumer(
base.Cfg, notifier, syncDB, base.QueryAPI(),
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
}
base.StartRoomServerConsumer(syncDB, roomserverHandler)
clientConsumer := consumers.NewOutputClientDataConsumer(
base.Cfg, base.KafkaConsumer, notifier, syncDB,
clientHandler := consumers.NewOutputClientDataConsumer(
base.Cfg, notifier, syncDB,
)
if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer")
}
base.StartClientDataConsumer(syncDB, clientHandler)
routing.Setup(base.APIMux, requestPool, syncDB, deviceDB)
}