From 11b8dc0d0b75e340383fd3de848104731169a142 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Nov 2017 15:39:43 +0000 Subject: [PATCH] Add span to kafka streams --- .../clientapi/consumers/roomserver.go | 5 +- .../dendrite/roomserver/api/output.go | 41 +++++++++++++++ .../dendrite/roomserver/input/events.go | 2 +- .../roomserver/input/latest_events.go | 11 +++- .../dendrite/roomserver/input/membership.go | 50 ++++++++++++++----- 5 files changed, 93 insertions(+), 16 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go index 0ee7c6bf..ad8acacf 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/consumers/roomserver.go @@ -77,6 +77,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } + ctx, span := output.StartSpanAndReplaceContext(context.Background()) + defer span.Finish() + if output.Type != api.OutputTypeNewRoomEvent { log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", @@ -96,7 +99,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return err } - return s.db.UpdateMemberships(context.TODO(), events, output.NewRoomEvent.RemovesStateEventIDs) + return s.db.UpdateMemberships(ctx, events, output.NewRoomEvent.RemovesStateEventIDs) } // lookupStateEvents looks up the state events that are added by a new event. diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/output.go b/src/github.com/matrix-org/dendrite/roomserver/api/output.go index 6a5c924c..522f1b61 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/output.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/output.go @@ -15,7 +15,11 @@ package api import ( + "context" + "github.com/matrix-org/gomatrixserverlib" + opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" ) // An OutputType is a type of roomserver output. @@ -41,6 +45,43 @@ type OutputEvent struct { NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"` // The content of event with type OutputTypeRetireInviteEvent RetireInviteEvent *OutputRetireInviteEvent `json:"retire_invite_event,omitempty"` + // Serialized span context + OpentracingCarrier opentracing.TextMapCarrier `json:"opentracing_carrier"` +} + +// AddSpanFromContext fills out the OpentracingCarrier field from the given context +func (o *OutputEvent) AddSpanFromContext(ctx context.Context) error { + span := opentracing.SpanFromContext(ctx) + ext.SpanKindProducer.Set(span) + var carrier opentracing.TextMapCarrier + tracer := opentracing.GlobalTracer() + + err := tracer.Inject(span.Context(), opentracing.TextMap, carrier) + if err != nil { + return err + } + + o.OpentracingCarrier = carrier + + return nil +} + +func (o *OutputEvent) StartSpanAndReplaceContext( + ctx context.Context, +) (context.Context, opentracing.Span) { + tracer := opentracing.GlobalTracer() + producerContext, err := tracer.Extract(opentracing.TextMap, o.OpentracingCarrier) + + var span opentracing.Span + if err == nil { + // Default to a span without reference to producer context. + span = tracer.StartSpan("room_event_consumer") + } else { + // Set the producer context. + span = tracer.StartSpan("room_event_consumer", opentracing.FollowsFrom(producerContext)) + } + + return opentracing.ContextWithSpan(ctx, span), span } // An OutputNewRoomEvent is written when the roomserver receives a new event. diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index 9032219e..31fd5f2f 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -183,7 +183,7 @@ func processInviteEvent( return nil } - outputUpdates, err := updateToInviteMembership(updater, &input.Event, nil) + outputUpdates, err := updateToInviteMembership(ctx, updater, &input.Event, nil) if err != nil { return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go index 5767daab..9fb945c4 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go @@ -275,10 +275,17 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) } ore.SendAsServer = u.sendAsServer - return &api.OutputEvent{ + oe := api.OutputEvent{ Type: api.OutputTypeNewRoomEvent, NewRoomEvent: &ore, - }, nil + } + + err = oe.AddSpanFromContext(u.ctx) + if err != nil { + return nil, err + } + + return &oe, nil } type eventNIDSorter []types.EventNID diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/membership.go b/src/github.com/matrix-org/dendrite/roomserver/input/membership.go index 4c42cadd..5e1d0f93 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/membership.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/membership.go @@ -77,7 +77,7 @@ func updateMemberships( ae = &ev.Event } } - if updates, err = updateMembership(updater, targetUserNID, re, ae, updates); err != nil { + if updates, err = updateMembership(ctx, updater, targetUserNID, re, ae, updates); err != nil { return nil, err } } @@ -85,6 +85,7 @@ func updateMemberships( } func updateMembership( + ctx context.Context, updater types.RoomRecentEventsUpdater, targetUserNID types.EventStateKeyNID, remove, add *gomatrixserverlib.Event, updates []api.OutputEvent, @@ -119,11 +120,11 @@ func updateMembership( switch new { case invite: - return updateToInviteMembership(mu, add, updates) + return updateToInviteMembership(ctx, mu, add, updates) case join: - return updateToJoinMembership(mu, add, updates) + return updateToJoinMembership(ctx, mu, add, updates) case leave, ban: - return updateToLeaveMembership(mu, add, new, updates) + return updateToLeaveMembership(ctx, mu, add, new, updates) default: panic(fmt.Errorf( "input: membership %q is not one of the allowed values", new, @@ -132,7 +133,7 @@ func updateMembership( } func updateToInviteMembership( - mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, + ctx context.Context, mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, ) ([]api.OutputEvent, error) { // We may have already sent the invite to the user, either because we are // reprocessing this event, or because the we received this invite from a @@ -151,16 +152,24 @@ func updateToInviteMembership( onie := api.OutputNewInviteEvent{ Event: *add, } - updates = append(updates, api.OutputEvent{ + + oe := api.OutputEvent{ Type: api.OutputTypeNewInviteEvent, NewInviteEvent: &onie, - }) + } + + err = oe.AddSpanFromContext(ctx) + if err != nil { + return nil, err + } + + updates = append(updates, oe) } return updates, nil } func updateToJoinMembership( - mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, + ctx context.Context, mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, ) ([]api.OutputEvent, error) { // If the user is already marked as being joined, we call SetToJoin to update // the event ID then we can return immediately. Retired is ignored as there @@ -187,15 +196,24 @@ func updateToJoinMembership( RetiredByEventID: add.EventID(), TargetUserID: *add.StateKey(), } - updates = append(updates, api.OutputEvent{ + + oe := api.OutputEvent{ Type: api.OutputTypeRetireInviteEvent, RetireInviteEvent: &orie, - }) + } + + err = oe.AddSpanFromContext(ctx) + if err != nil { + return nil, err + } + + updates = append(updates, oe) } return updates, nil } func updateToLeaveMembership( + ctx context.Context, mu types.MembershipUpdater, add *gomatrixserverlib.Event, newMembership string, updates []api.OutputEvent, ) ([]api.OutputEvent, error) { @@ -219,10 +237,18 @@ func updateToLeaveMembership( RetiredByEventID: add.EventID(), TargetUserID: *add.StateKey(), } - updates = append(updates, api.OutputEvent{ + + oe := api.OutputEvent{ Type: api.OutputTypeRetireInviteEvent, RetireInviteEvent: &orie, - }) + } + + err = oe.AddSpanFromContext(ctx) + if err != nil { + return nil, err + } + + updates = append(updates, oe) } return updates, nil }