Hook up tracers for consumers
commit 7e07f8ae7d (parent 43b35aaea4)
9 changed files with 137 additions and 16 deletions
@@ -33,6 +33,8 @@ func SetupClientAPIComponent(
 	federation *gomatrixserverlib.FederationClient,
 	keyRing *gomatrixserverlib.KeyRing,
 ) {
+	tracer := base.CreateNewTracer("ClientAPI")
+
 	roomserverProducer := producers.NewRoomserverProducer(base.InputAPI())
 
 	userUpdateProducer := &producers.UserUpdateProducer{
@@ -48,7 +50,7 @@ func SetupClientAPIComponent(
 	handler := consumers.NewOutputRoomEventConsumer(
 		base.Cfg, accountsDB, base.QueryAPI(),
 	)
-	base.StartRoomServerConsumer(accountsDB, handler)
+	base.StartRoomServerConsumer(tracer, accountsDB, handler)
 
 	routing.Setup(
 		base.APIMux, *base.Cfg, roomserverProducer,
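base.CreateNewTracer is part of the base component and is not shown in this diff. Purely as an illustration of what a per-component opentracing tracer might look like, here is a hedged sketch using the Jaeger client; newTracer is a hypothetical helper, not dendrite's implementation, and the choice of Jaeger is an assumption.

package main

import (
	"io"
	"log"

	opentracing "github.com/opentracing/opentracing-go"
	jaegercfg "github.com/uber/jaeger-client-go/config"
)

// newTracer is a hypothetical stand-in for base.CreateNewTracer: it builds a
// Jaeger-backed opentracing.Tracer named after the component.
func newTracer(serviceName string) (opentracing.Tracer, io.Closer, error) {
	cfg := jaegercfg.Configuration{
		Sampler:  &jaegercfg.SamplerConfig{Type: "const", Param: 1},
		Reporter: &jaegercfg.ReporterConfig{LogSpans: true},
	}
	return cfg.New(serviceName)
}

func main() {
	tracer, closer, err := newTracer("ClientAPI")
	if err != nil {
		log.Fatal(err)
	}
	defer closer.Close() // flush buffered spans on shutdown
	_ = tracer           // in dendrite this would be passed to base.StartRoomServerConsumer(tracer, ...)
}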
@@ -281,6 +281,7 @@ func (b *BaseDendrite) SetupAndServeHTTP(addr string) {
 // StartRoomServerConsumer starts handling OutputRoomEvent kafka topic with the
 // given handler
 func (b *BaseDendrite) StartRoomServerConsumer(
+	tracer opentracing.Tracer,
 	partitionStore common.PartitionStorer,
 	handler api.ProcessOutputEventHandler,
 ) {
@@ -291,6 +292,7 @@ func (b *BaseDendrite) StartRoomServerConsumer(
 		b.KafkaConsumer,
 		partitionStore,
 		&processor,
+		tracer,
 	)
 
 	if err := consumer.Start(); err != nil {
@@ -301,6 +303,7 @@ func (b *BaseDendrite) StartRoomServerConsumer(
 // StartClientDataConsumer starts handling OutputClientData kafka topic with the
 // given handler
 func (b *BaseDendrite) StartClientDataConsumer(
+	tracer opentracing.Tracer,
 	partitionStore common.PartitionStorer,
 	handler common.ProcessKafkaMessage,
 ) {
@@ -309,6 +312,7 @@ func (b *BaseDendrite) StartClientDataConsumer(
 		b.KafkaConsumer,
 		partitionStore,
 		handler,
+		tracer,
 	)
 
 	if err := consumer.Start(); err != nil {
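These starters hand the tracer to the ContinualConsumer, which deserialises the producer's span and puts it on the context before invoking the handler. As a minimal sketch of what a handler passed to StartClientDataConsumer could do with that context (offsetLogger is an illustrative name, not part of dendrite; the interface shape matches common.ProcessKafkaMessage as declared later in this diff):

package main

import (
	"context"
	"log"

	opentracing "github.com/opentracing/opentracing-go"
	sarama "gopkg.in/Shopify/sarama.v1"
)

// offsetLogger is a toy handler. The consumer has already attached the
// deserialised span to ctx, so the handler can tag it or start child spans.
type offsetLogger struct{}

func (offsetLogger) ProcessMessage(ctx context.Context, msg *sarama.ConsumerMessage) error {
	if span := opentracing.SpanFromContext(ctx); span != nil {
		span.SetTag("kafka.partition", msg.Partition)
		span.SetTag("kafka.offset", msg.Offset)
	}
	log.Printf("handled message on %s at offset %d", msg.Topic, msg.Offset)
	return nil
}

func main() {
	// In dendrite this value would be passed to base.StartClientDataConsumer(tracer, store, handler).
	_ = offsetLogger{}
}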
@@ -15,9 +15,17 @@
 package common
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
 
+	"github.com/sirupsen/logrus"
+
+	"github.com/opentracing/opentracing-go/ext"
+	"github.com/opentracing/opentracing-go/log"
+
+	opentracing "github.com/opentracing/opentracing-go"
 	sarama "gopkg.in/Shopify/sarama.v1"
 )
 
@@ -52,6 +60,7 @@ type ContinualConsumer struct {
 	// ProcessMessage is a function which will be called for each message in the log. Return an error to
 	// stop processing messages. See ErrShutdown for specific control signals.
 	handler ProcessKafkaMessage
+	tracer opentracing.Tracer
 }
 
 // ErrShutdown can be returned from ContinualConsumer.ProcessMessage to stop the ContinualConsumer.
@@ -62,12 +71,14 @@ func NewContinualConsumer(
 	consumer sarama.Consumer,
 	partitionStore PartitionStorer,
 	handler ProcessKafkaMessage,
+	tracer opentracing.Tracer,
 ) ContinualConsumer {
 	return ContinualConsumer{
 		topic: topic,
 		consumer: consumer,
 		partitionStore: partitionStore,
 		handler: handler,
+		tracer: tracer,
 	}
 }
 
@@ -120,18 +131,112 @@ func (c *ContinualConsumer) Start() error {
 func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) {
 	defer pc.Close() // nolint: errcheck
 	for message := range pc.Messages() {
-		msgErr := c.handler.ProcessMessage(context.TODO(), message)
-		// Advance our position in the stream so that we will start at the right position after a restart.
-		if err := c.partitionStore.SetPartitionOffset(context.TODO(), c.topic, message.Partition, message.Offset); err != nil {
-			panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %s", err))
-		}
+		err := c.processMessage(message)
 		// Shutdown if we were told to do so.
-		if msgErr == ErrShutdown {
+		if err == ErrShutdown {
 			return
 		}
 	}
 }
 
+func (c *ContinualConsumer) processMessage(message *sarama.ConsumerMessage) error {
+	span, ctx := c.DeserialiseOpentracingSpan(context.Background(), message)
+	defer span.Finish()
+
+	msgErr := c.handler.ProcessMessage(ctx, message)
+	// Advance our position in the stream so that we will start at the right position after a restart.
+	if err := c.partitionStore.SetPartitionOffset(ctx, c.topic, message.Partition, message.Offset); err != nil {
+		panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %s", err))
+	}
+
+	if msgErr == ErrShutdown {
+		return msgErr
+	}
+
+	if msgErr != nil {
+		logrus.WithError(msgErr).WithField("topic", c.topic).Warn("Failed to handle message")
+		ext.Error.Set(span, true)
+		span.LogFields(log.Error(msgErr))
+	}
+
+	return nil
+}
+
 type ProcessKafkaMessage interface {
 	ProcessMessage(ctx context.Context, msg *sarama.ConsumerMessage) error
 }
+
+const kafkaOpentracingHeaderKey string = "opentracingSpanContext"
+
+func (c *ContinualConsumer) DeserialiseOpentracingSpan(
+	ctx context.Context, msg *sarama.ConsumerMessage,
+) (opentracing.Span, context.Context) {
+	spanContext := c.spanContextFromMessage(msg)
+
+	span := c.tracer.StartSpan(
+		"process_message",
+		opentracing.FollowsFrom(spanContext),
+		ext.SpanKindConsumer,
+		opentracing.Tag{Key: "message_bus.destination", Value: c.topic},
+	)
+
+	return span, opentracing.ContextWithSpan(ctx, span)
+}
+
+func (c *ContinualConsumer) spanContextFromMessage(msg *sarama.ConsumerMessage) opentracing.SpanContext {
+	var opentracingHeader []byte
+	for _, record := range msg.Headers {
+		if bytes.Equal(record.Key, []byte(kafkaOpentracingHeaderKey)) {
+			opentracingHeader = record.Value
+			break
+		}
+	}
+
+	if len(opentracingHeader) == 0 {
+		logrus.Warn("Failed to find opentracing header")
+		return nil
+	}
+
+	var tmc opentracing.TextMapCarrier
+	if err := json.Unmarshal(opentracingHeader, &tmc); err != nil {
+		logrus.WithError(err).Error("Failed to unmarshal opentracing header")
+		return nil
+	}
+
+	spanContext, err := c.tracer.Extract(opentracing.TextMap, tmc)
+	if err != nil {
+		logrus.WithError(err).Error("Failed to extract spancontext from header")
+		return nil
+	}
+
+	return spanContext
+}
+
+func SerialiseOpentracingSpan(
+	tracer opentracing.Tracer, ctx context.Context, msg *sarama.ProducerMessage,
+) {
+	tmc := make(opentracing.TextMapCarrier)
+
+	span := opentracing.SpanFromContext(ctx)
+	if span == nil {
+		logrus.Warn("Failed to find span in context")
+		return
+	}
+
+	err := tracer.Inject(span.Context(), opentracing.TextMap, tmc)
+	if err != nil {
+		logrus.Warn("Failed to inject span")
+		return
+	}
+
+	outputBytes, err := json.Marshal(tmc)
+	if err != nil {
+		logrus.Warn("Failed to marshal span")
+		return
+	}
+
+	msg.Headers = append(msg.Headers, sarama.RecordHeader{
+		Key: []byte(kafkaOpentracingHeaderKey),
+		Value: outputBytes,
+	})
+}
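The serialise/deserialise pair above boils down to: inject the span context into an opentracing TextMapCarrier, JSON-encode that carrier into a Kafka record header, and extract it again on the consumer so the consumer span can follow from the producer span. A standalone sketch of that round trip using the opentracing mocktracer (the operation names and structure here are illustrative, not dendrite code):

package main

import (
	"encoding/json"
	"fmt"

	opentracing "github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/mocktracer"
)

func main() {
	tracer := mocktracer.New()

	// Producer side: start a span and inject its context into a TextMap carrier.
	span := tracer.StartSpan("send_output_event")
	carrier := make(opentracing.TextMapCarrier)
	if err := tracer.Inject(span.Context(), opentracing.TextMap, carrier); err != nil {
		panic(err)
	}
	span.Finish()

	// The carrier is marshalled to JSON so it can travel as a Kafka record header value.
	headerValue, err := json.Marshal(carrier)
	if err != nil {
		panic(err)
	}

	// Consumer side: unmarshal the header and extract the parent span context.
	var received opentracing.TextMapCarrier
	if err := json.Unmarshal(headerValue, &received); err != nil {
		panic(err)
	}
	parent, err := tracer.Extract(opentracing.TextMap, received)
	if err != nil {
		panic(err)
	}

	// The consumer span follows from the producer span rather than being its child.
	child := tracer.StartSpan("process_message", opentracing.FollowsFrom(parent))
	child.Finish()

	fmt.Printf("recorded %d spans\n", len(tracer.FinishedSpans()))
}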
@@ -29,6 +29,8 @@ func SetupFederationSenderComponent(
 	base *basecomponent.BaseDendrite,
 	federation *gomatrixserverlib.FederationClient,
 ) {
+	tracer := base.CreateNewTracer("FederationSender")
+
 	federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender))
 	if err != nil {
 		logrus.WithError(err).Panic("failed to connect to federation sender db")
@@ -39,5 +41,5 @@ func SetupFederationSenderComponent(
 	handler := consumers.NewOutputRoomEventConsumer(
 		base.Cfg, queues, federationSenderDB, base.QueryAPI(),
 	)
-	base.StartRoomServerConsumer(federationSenderDB, handler)
+	base.StartRoomServerConsumer(tracer, federationSenderDB, handler)
 }
@@ -29,6 +29,8 @@ func SetupPublicRoomsAPIComponent(
 	base *basecomponent.BaseDendrite,
 	deviceDB *devices.Database,
 ) {
+	tracer := base.CreateNewTracer("PublicRoomsAPI")
+
 	publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI))
 	if err != nil {
 		logrus.WithError(err).Panicf("failed to connect to public rooms db")
@@ -37,7 +39,7 @@ func SetupPublicRoomsAPIComponent(
 	handler := consumers.NewOutputRoomEventConsumer(
 		base.Cfg, publicRoomsDB, base.QueryAPI(),
 	)
-	base.StartRoomServerConsumer(publicRoomsDB, handler)
+	base.StartRoomServerConsumer(tracer, publicRoomsDB, handler)
 
 	routing.Setup(base.APIMux, deviceDB, publicRoomsDB)
 }
@@ -66,7 +66,7 @@ type RoomEventDatabase interface {
 // OutputRoomEventWriter has the APIs needed to write an event to the output logs.
 type OutputRoomEventWriter interface {
 	// Write a list of events for a room
-	WriteOutputEvents(roomID string, updates []api.OutputEvent) error
+	WriteOutputEvents(ctx context.Context, roomID string, updates []api.OutputEvent) error
 }
 
 func processRoomEvent(
@@ -188,7 +188,7 @@ func processInviteEvent(
 		return err
 	}
 
-	if err = ow.WriteOutputEvents(roomID, outputUpdates); err != nil {
+	if err = ow.WriteOutputEvents(ctx, roomID, outputUpdates); err != nil {
 		return err
 	}
 
@@ -20,6 +20,8 @@ import (
 	"encoding/json"
 	"net/http"
 
+	"github.com/opentracing/opentracing-go"
+
 	"github.com/matrix-org/dendrite/common"
 	"github.com/matrix-org/dendrite/roomserver/api"
 	"github.com/matrix-org/util"
@@ -36,18 +38,20 @@ type RoomserverInputAPI struct {
 }
 
 // WriteOutputEvents implements OutputRoomEventWriter
-func (r *RoomserverInputAPI) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
+func (r *RoomserverInputAPI) WriteOutputEvents(ctx context.Context, roomID string, updates []api.OutputEvent) error {
 	messages := make([]*sarama.ProducerMessage, len(updates))
 	for i := range updates {
 		value, err := json.Marshal(updates[i])
 		if err != nil {
 			return err
 		}
-		messages[i] = &sarama.ProducerMessage{
+		msg := &sarama.ProducerMessage{
 			Topic: r.OutputRoomEventTopic,
 			Key: sarama.StringEncoder(roomID),
 			Value: sarama.ByteEncoder(value),
 		}
+		common.SerialiseOpentracingSpan(opentracing.GlobalTracer(), ctx, msg)
+		messages[i] = msg
 	}
 	return r.Producer.SendMessages(messages)
 }
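SerialiseOpentracingSpan only finds a span if the caller has already attached one to the context, so code that eventually calls WriteOutputEvents is expected to have started a span further up the stack and to have registered a global tracer for the GlobalTracer() call above. A minimal, hedged sketch of that caller-side setup (mocktracer stands in for a real tracer; the operation name is illustrative):

package main

import (
	"context"

	opentracing "github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/mocktracer"
)

func main() {
	// Register a tracer so StartSpanFromContext here and the
	// opentracing.GlobalTracer() call inside WriteOutputEvents agree.
	opentracing.SetGlobalTracer(mocktracer.New())

	// Attach a span to the context before handing it down; a call like
	// ow.WriteOutputEvents(ctx, roomID, updates) would then carry this span
	// into the Kafka record header via SerialiseOpentracingSpan.
	span, ctx := opentracing.StartSpanFromContext(context.Background(), "update_latest_events")
	defer span.Finish()

	_ = ctx // passed to WriteOutputEvents in the real code
}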
@@ -157,7 +157,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
 	// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
 	// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
 	// necessary bookkeeping we'll keep the event sending synchronous for now.
-	if err = u.ow.WriteOutputEvents(u.event.RoomID(), updates); err != nil {
+	if err = u.ow.WriteOutputEvents(u.ctx, u.event.RoomID(), updates); err != nil {
 		return err
 	}
 
@@ -37,6 +37,8 @@ func SetupSyncAPIComponent(
 	deviceDB *devices.Database,
 	accountsDB *accounts.Database,
 ) {
+	tracer := base.CreateNewTracer("SyncAPI")
+
 	syncDB, err := storage.NewSyncServerDatabase(string(base.Cfg.Database.SyncAPI))
 	if err != nil {
 		logrus.WithError(err).Panicf("failed to connect to sync db")
@@ -58,12 +60,12 @@ func SetupSyncAPIComponent(
 	roomserverHandler := consumers.NewOutputRoomEventConsumer(
 		base.Cfg, notifier, syncDB, base.QueryAPI(),
 	)
-	base.StartRoomServerConsumer(syncDB, roomserverHandler)
+	base.StartRoomServerConsumer(tracer, syncDB, roomserverHandler)
 
 	clientHandler := consumers.NewOutputClientDataConsumer(
 		base.Cfg, notifier, syncDB,
 	)
-	base.StartClientDataConsumer(syncDB, clientHandler)
+	base.StartClientDataConsumer(tracer, syncDB, clientHandler)
 
 	routing.Setup(base.APIMux, requestPool, syncDB, deviceDB)
 }