diff --git a/src/github.com/matrix-org/dendrite/clientapi/clientapi.go b/src/github.com/matrix-org/dendrite/clientapi/clientapi.go
index c40dcd7c..b4cfa011 100644
--- a/src/github.com/matrix-org/dendrite/clientapi/clientapi.go
+++ b/src/github.com/matrix-org/dendrite/clientapi/clientapi.go
@@ -33,6 +33,8 @@ func SetupClientAPIComponent(
 	federation *gomatrixserverlib.FederationClient,
 	keyRing *gomatrixserverlib.KeyRing,
 ) {
+	tracer := base.CreateNewTracer("ClientAPI")
+
 	roomserverProducer := producers.NewRoomserverProducer(base.InputAPI())
 
 	userUpdateProducer := &producers.UserUpdateProducer{
@@ -48,7 +50,7 @@ func SetupClientAPIComponent(
 	handler := consumers.NewOutputRoomEventConsumer(
 		base.Cfg, accountsDB, base.QueryAPI(),
 	)
-	base.StartRoomServerConsumer(accountsDB, handler)
+	base.StartRoomServerConsumer(tracer, accountsDB, handler)
 
 	routing.Setup(
 		base.APIMux, *base.Cfg, roomserverProducer,
diff --git a/src/github.com/matrix-org/dendrite/common/basecomponent/base.go b/src/github.com/matrix-org/dendrite/common/basecomponent/base.go
index 035c0982..993de880 100644
--- a/src/github.com/matrix-org/dendrite/common/basecomponent/base.go
+++ b/src/github.com/matrix-org/dendrite/common/basecomponent/base.go
@@ -281,6 +281,7 @@ func (b *BaseDendrite) SetupAndServeHTTP(addr string) {
 // StartRoomServerConsumer starts handling OutputRoomEvent kafka topic with the
 // given handler
 func (b *BaseDendrite) StartRoomServerConsumer(
+	tracer opentracing.Tracer,
 	partitionStore common.PartitionStorer,
 	handler api.ProcessOutputEventHandler,
 ) {
@@ -291,6 +292,7 @@ func (b *BaseDendrite) StartRoomServerConsumer(
 		b.KafkaConsumer, partitionStore, &processor,
+		tracer,
 	)
 
 	if err := consumer.Start(); err != nil {
@@ -301,6 +303,7 @@
 // StartClientDataConsumer starts handling OutputClientData kafka topic with the
 // given handler
 func (b *BaseDendrite) StartClientDataConsumer(
+	tracer opentracing.Tracer,
 	partitionStore common.PartitionStorer,
 	handler common.ProcessKafkaMessage,
 ) {
@@ -309,6 +312,7 @@
 		b.KafkaConsumer, partitionStore, handler,
+		tracer,
 	)
 
 	if err := consumer.Start(); err != nil {
diff --git a/src/github.com/matrix-org/dendrite/common/consumers.go b/src/github.com/matrix-org/dendrite/common/consumers.go
index 9129682d..07d1f8a5 100644
--- a/src/github.com/matrix-org/dendrite/common/consumers.go
+++ b/src/github.com/matrix-org/dendrite/common/consumers.go
@@ -15,9 +15,17 @@ package common
 
 import (
+	"bytes"
 	"context"
+	"encoding/json"
 	"fmt"
+
+	"github.com/sirupsen/logrus"
+
+	"github.com/opentracing/opentracing-go/ext"
+	"github.com/opentracing/opentracing-go/log"
+
+	opentracing "github.com/opentracing/opentracing-go"
 	sarama "gopkg.in/Shopify/sarama.v1"
 )
@@ -52,6 +60,7 @@ type ContinualConsumer struct {
 	// ProcessMessage is a function which will be called for each message in the log. Return an error to
 	// stop processing messages. See ErrShutdown for specific control signals.
 	handler ProcessKafkaMessage
+	tracer opentracing.Tracer
 }
 
 // ErrShutdown can be returned from ContinualConsumer.ProcessMessage to stop the ContinualConsumer.
@@ -62,12 +71,14 @@ func NewContinualConsumer(
 	consumer sarama.Consumer,
 	partitionStore PartitionStorer,
 	handler ProcessKafkaMessage,
+	tracer opentracing.Tracer,
 ) ContinualConsumer {
 	return ContinualConsumer{
 		topic:          topic,
 		consumer:       consumer,
 		partitionStore: partitionStore,
 		handler:        handler,
+		tracer:         tracer,
 	}
 }
 
@@ -120,18 +131,112 @@ func (c *ContinualConsumer) Start() error {
 func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) {
 	defer pc.Close() // nolint: errcheck
 	for message := range pc.Messages() {
-		msgErr := c.handler.ProcessMessage(context.TODO(), message)
-		// Advance our position in the stream so that we will start at the right position after a restart.
-		if err := c.partitionStore.SetPartitionOffset(context.TODO(), c.topic, message.Partition, message.Offset); err != nil {
-			panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %s", err))
-		}
+		err := c.processMessage(message)
 		// Shutdown if we were told to do so.
-		if msgErr == ErrShutdown {
+		if err == ErrShutdown {
 			return
 		}
 	}
 }
 
+func (c *ContinualConsumer) processMessage(message *sarama.ConsumerMessage) error {
+	span, ctx := c.DeserialiseOpentracingSpan(context.Background(), message)
+	defer span.Finish()
+
+	msgErr := c.handler.ProcessMessage(ctx, message)
+	// Advance our position in the stream so that we will start at the right position after a restart.
+	if err := c.partitionStore.SetPartitionOffset(ctx, c.topic, message.Partition, message.Offset); err != nil {
+		panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %s", err))
+	}
+
+	if msgErr == ErrShutdown {
+		return msgErr
+	}
+
+	if msgErr != nil {
+		logrus.WithError(msgErr).WithField("topic", c.topic).Warn("Failed to handle message")
+		ext.Error.Set(span, true)
+		span.LogFields(log.Error(msgErr))
+	}
+
+	return nil
+}
+
 type ProcessKafkaMessage interface {
 	ProcessMessage(ctx context.Context, msg *sarama.ConsumerMessage) error
 }
+
+const kafkaOpentracingHeaderKey string = "opentracingSpanContext"
+
+func (c *ContinualConsumer) DeserialiseOpentracingSpan(
+	ctx context.Context, msg *sarama.ConsumerMessage,
+) (opentracing.Span, context.Context) {
+	spanContext := c.spanContextFromMessage(msg)
+
+	span := c.tracer.StartSpan(
+		"process_message",
+		opentracing.FollowsFrom(spanContext),
+		ext.SpanKindConsumer,
+		opentracing.Tag{Key: "message_bus.destination", Value: c.topic},
+	)
+
+	return span, opentracing.ContextWithSpan(ctx, span)
+}
+
+func (c *ContinualConsumer) spanContextFromMessage(msg *sarama.ConsumerMessage) opentracing.SpanContext {
+	var opentracingHeader []byte
+	for _, record := range msg.Headers {
+		if bytes.Equal(record.Key, []byte(kafkaOpentracingHeaderKey)) {
+			opentracingHeader = record.Value
+			break
+		}
+	}
+
+	if len(opentracingHeader) == 0 {
+		logrus.Warn("Failed to find opentracing header")
+		return nil
+	}
+
+	var tmc opentracing.TextMapCarrier
+	if err := json.Unmarshal(opentracingHeader, &tmc); err != nil {
+		logrus.WithError(err).Error("Failed to unmarshal opentracing header")
+		return nil
+	}
+
+	spanContext, err := c.tracer.Extract(opentracing.TextMap, tmc)
+	if err != nil {
+		logrus.WithError(err).Error("Failed to extract spancontext from header")
+		return nil
+	}
+
+	return spanContext
+}
+
+func SerialiseOpentracingSpan(
+	tracer opentracing.Tracer, ctx context.Context, msg *sarama.ProducerMessage,
+) {
+	tmc := make(opentracing.TextMapCarrier)
+
+	span := opentracing.SpanFromContext(ctx)
+	if span == nil {
+		logrus.Warn("Failed to find span in context")
+		return
+	}
+
+	err := tracer.Inject(span.Context(), opentracing.TextMap, tmc)
+	if err != nil {
+		logrus.Warn("Failed to inject span")
+		return
+	}
+
+	outputBytes, err := json.Marshal(tmc)
+	if err != nil {
+		logrus.Warn("Failed to marshal span")
+		return
+	}
+
+	msg.Headers = append(msg.Headers, sarama.RecordHeader{
+		Key:   []byte(kafkaOpentracingHeaderKey),
+		Value: outputBytes,
+	})
+}
diff --git a/src/github.com/matrix-org/dendrite/federationsender/federationsender.go b/src/github.com/matrix-org/dendrite/federationsender/federationsender.go
index b07fb6c8..06b6b820 100644
--- a/src/github.com/matrix-org/dendrite/federationsender/federationsender.go
+++ b/src/github.com/matrix-org/dendrite/federationsender/federationsender.go
@@ -29,6 +29,8 @@ func SetupFederationSenderComponent(
 	base *basecomponent.BaseDendrite,
 	federation *gomatrixserverlib.FederationClient,
 ) {
+	tracer := base.CreateNewTracer("FederationSender")
+
 	federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender))
 	if err != nil {
 		logrus.WithError(err).Panic("failed to connect to federation sender db")
@@ -39,5 +41,5 @@ func SetupFederationSenderComponent(
 	handler := consumers.NewOutputRoomEventConsumer(
 		base.Cfg, queues, federationSenderDB, base.QueryAPI(),
 	)
-	base.StartRoomServerConsumer(federationSenderDB, handler)
+	base.StartRoomServerConsumer(tracer, federationSenderDB, handler)
 }
diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/publicroomsapi.go b/src/github.com/matrix-org/dendrite/publicroomsapi/publicroomsapi.go
index 756c2396..77d66940 100644
--- a/src/github.com/matrix-org/dendrite/publicroomsapi/publicroomsapi.go
+++ b/src/github.com/matrix-org/dendrite/publicroomsapi/publicroomsapi.go
@@ -29,6 +29,8 @@ func SetupPublicRoomsAPIComponent(
 	base *basecomponent.BaseDendrite,
 	deviceDB *devices.Database,
 ) {
+	tracer := base.CreateNewTracer("PublicRoomsAPI")
+
 	publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI))
 	if err != nil {
 		logrus.WithError(err).Panicf("failed to connect to public rooms db")
@@ -37,7 +39,7 @@ func SetupPublicRoomsAPIComponent(
 	handler := consumers.NewOutputRoomEventConsumer(
 		base.Cfg, publicRoomsDB, base.QueryAPI(),
 	)
-	base.StartRoomServerConsumer(publicRoomsDB, handler)
+	base.StartRoomServerConsumer(tracer, publicRoomsDB, handler)
 
 	routing.Setup(base.APIMux, deviceDB, publicRoomsDB)
 }
diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go
index 91de6435..983b169b 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go
@@ -66,7 +66,7 @@ type RoomEventDatabase interface {
 // OutputRoomEventWriter has the APIs needed to write an event to the output logs.
 type OutputRoomEventWriter interface {
 	// Write a list of events for a room
-	WriteOutputEvents(roomID string, updates []api.OutputEvent) error
+	WriteOutputEvents(ctx context.Context, roomID string, updates []api.OutputEvent) error
 }
 
 func processRoomEvent(
@@ -188,7 +188,7 @@ func processInviteEvent(
 		return err
 	}
 
-	if err = ow.WriteOutputEvents(roomID, outputUpdates); err != nil {
+	if err = ow.WriteOutputEvents(ctx, roomID, outputUpdates); err != nil {
 		return err
 	}
 
diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/input.go b/src/github.com/matrix-org/dendrite/roomserver/input/input.go
index 253ef9ff..bed4b656 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/input/input.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/input/input.go
@@ -20,6 +20,8 @@ import (
 	"encoding/json"
 	"net/http"
 
+	"github.com/opentracing/opentracing-go"
+
 	"github.com/matrix-org/dendrite/common"
 	"github.com/matrix-org/dendrite/roomserver/api"
 	"github.com/matrix-org/util"
@@ -36,18 +38,20 @@ type RoomserverInputAPI struct {
 }
 
 // WriteOutputEvents implements OutputRoomEventWriter
-func (r *RoomserverInputAPI) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
+func (r *RoomserverInputAPI) WriteOutputEvents(ctx context.Context, roomID string, updates []api.OutputEvent) error {
 	messages := make([]*sarama.ProducerMessage, len(updates))
 	for i := range updates {
 		value, err := json.Marshal(updates[i])
 		if err != nil {
 			return err
 		}
-		messages[i] = &sarama.ProducerMessage{
+		msg := &sarama.ProducerMessage{
 			Topic: r.OutputRoomEventTopic,
 			Key:   sarama.StringEncoder(roomID),
 			Value: sarama.ByteEncoder(value),
 		}
+		common.SerialiseOpentracingSpan(opentracing.GlobalTracer(), ctx, msg)
+		messages[i] = msg
 	}
 	return r.Producer.SendMessages(messages)
 }
diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go
index 2b82bcba..81320207 100644
--- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go
+++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go
@@ -157,7 +157,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
 	// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
 	// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
 	// necessary bookkeeping we'll keep the event sending synchronous for now.
-	if err = u.ow.WriteOutputEvents(u.event.RoomID(), updates); err != nil {
+	if err = u.ow.WriteOutputEvents(u.ctx, u.event.RoomID(), updates); err != nil {
 		return err
 	}
 
diff --git a/src/github.com/matrix-org/dendrite/syncapi/syncapi.go b/src/github.com/matrix-org/dendrite/syncapi/syncapi.go
index 209d0bfa..9f3df2e4 100644
--- a/src/github.com/matrix-org/dendrite/syncapi/syncapi.go
+++ b/src/github.com/matrix-org/dendrite/syncapi/syncapi.go
@@ -37,6 +37,8 @@ func SetupSyncAPIComponent(
 	deviceDB *devices.Database,
 	accountsDB *accounts.Database,
 ) {
+	tracer := base.CreateNewTracer("SyncAPI")
+
 	syncDB, err := storage.NewSyncServerDatabase(string(base.Cfg.Database.SyncAPI))
 	if err != nil {
 		logrus.WithError(err).Panicf("failed to connect to sync db")
@@ -58,12 +60,12 @@ func SetupSyncAPIComponent(
 	roomserverHandler := consumers.NewOutputRoomEventConsumer(
 		base.Cfg, notifier, syncDB, base.QueryAPI(),
 	)
-	base.StartRoomServerConsumer(syncDB, roomserverHandler)
+	base.StartRoomServerConsumer(tracer, syncDB, roomserverHandler)
 
 	clientHandler := consumers.NewOutputClientDataConsumer(
 		base.Cfg, notifier, syncDB,
 	)
-	base.StartClientDataConsumer(syncDB, clientHandler)
+	base.StartClientDataConsumer(tracer, syncDB, clientHandler)
 
 	routing.Setup(base.APIMux, requestPool, syncDB, deviceDB)
 }