Add span to kafka streams

This commit is contained in:
Erik Johnston 2017-11-30 15:39:43 +00:00
parent cc12fc930a
commit 11b8dc0d0b
5 changed files with 93 additions and 16 deletions

View file

@ -77,6 +77,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
ctx, span := output.StartSpanAndReplaceContext(context.Background())
defer span.Finish()
if output.Type != api.OutputTypeNewRoomEvent { if output.Type != api.OutputTypeNewRoomEvent {
log.WithField("type", output.Type).Debug( log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type", "roomserver output log: ignoring unknown output type",
@ -96,7 +99,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return err return err
} }
return s.db.UpdateMemberships(context.TODO(), events, output.NewRoomEvent.RemovesStateEventIDs) return s.db.UpdateMemberships(ctx, events, output.NewRoomEvent.RemovesStateEventIDs)
} }
// lookupStateEvents looks up the state events that are added by a new event. // lookupStateEvents looks up the state events that are added by a new event.

View file

@ -15,7 +15,11 @@
package api package api
import ( import (
"context"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
) )
// An OutputType is a type of roomserver output. // An OutputType is a type of roomserver output.
@ -41,6 +45,43 @@ type OutputEvent struct {
NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"` NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"`
// The content of event with type OutputTypeRetireInviteEvent // The content of event with type OutputTypeRetireInviteEvent
RetireInviteEvent *OutputRetireInviteEvent `json:"retire_invite_event,omitempty"` RetireInviteEvent *OutputRetireInviteEvent `json:"retire_invite_event,omitempty"`
// Serialized span context
OpentracingCarrier opentracing.TextMapCarrier `json:"opentracing_carrier"`
}
// AddSpanFromContext fills out the OpentracingCarrier field from the given context
func (o *OutputEvent) AddSpanFromContext(ctx context.Context) error {
span := opentracing.SpanFromContext(ctx)
ext.SpanKindProducer.Set(span)
var carrier opentracing.TextMapCarrier
tracer := opentracing.GlobalTracer()
err := tracer.Inject(span.Context(), opentracing.TextMap, carrier)
if err != nil {
return err
}
o.OpentracingCarrier = carrier
return nil
}
func (o *OutputEvent) StartSpanAndReplaceContext(
ctx context.Context,
) (context.Context, opentracing.Span) {
tracer := opentracing.GlobalTracer()
producerContext, err := tracer.Extract(opentracing.TextMap, o.OpentracingCarrier)
var span opentracing.Span
if err == nil {
// Default to a span without reference to producer context.
span = tracer.StartSpan("room_event_consumer")
} else {
// Set the producer context.
span = tracer.StartSpan("room_event_consumer", opentracing.FollowsFrom(producerContext))
}
return opentracing.ContextWithSpan(ctx, span), span
} }
// An OutputNewRoomEvent is written when the roomserver receives a new event. // An OutputNewRoomEvent is written when the roomserver receives a new event.

View file

@ -183,7 +183,7 @@ func processInviteEvent(
return nil return nil
} }
outputUpdates, err := updateToInviteMembership(updater, &input.Event, nil) outputUpdates, err := updateToInviteMembership(ctx, updater, &input.Event, nil)
if err != nil { if err != nil {
return err return err
} }

View file

@ -275,10 +275,17 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
} }
ore.SendAsServer = u.sendAsServer ore.SendAsServer = u.sendAsServer
return &api.OutputEvent{ oe := api.OutputEvent{
Type: api.OutputTypeNewRoomEvent, Type: api.OutputTypeNewRoomEvent,
NewRoomEvent: &ore, NewRoomEvent: &ore,
}, nil }
err = oe.AddSpanFromContext(u.ctx)
if err != nil {
return nil, err
}
return &oe, nil
} }
type eventNIDSorter []types.EventNID type eventNIDSorter []types.EventNID

View file

@ -77,7 +77,7 @@ func updateMemberships(
ae = &ev.Event ae = &ev.Event
} }
} }
if updates, err = updateMembership(updater, targetUserNID, re, ae, updates); err != nil { if updates, err = updateMembership(ctx, updater, targetUserNID, re, ae, updates); err != nil {
return nil, err return nil, err
} }
} }
@ -85,6 +85,7 @@ func updateMemberships(
} }
func updateMembership( func updateMembership(
ctx context.Context,
updater types.RoomRecentEventsUpdater, targetUserNID types.EventStateKeyNID, updater types.RoomRecentEventsUpdater, targetUserNID types.EventStateKeyNID,
remove, add *gomatrixserverlib.Event, remove, add *gomatrixserverlib.Event,
updates []api.OutputEvent, updates []api.OutputEvent,
@ -119,11 +120,11 @@ func updateMembership(
switch new { switch new {
case invite: case invite:
return updateToInviteMembership(mu, add, updates) return updateToInviteMembership(ctx, mu, add, updates)
case join: case join:
return updateToJoinMembership(mu, add, updates) return updateToJoinMembership(ctx, mu, add, updates)
case leave, ban: case leave, ban:
return updateToLeaveMembership(mu, add, new, updates) return updateToLeaveMembership(ctx, mu, add, new, updates)
default: default:
panic(fmt.Errorf( panic(fmt.Errorf(
"input: membership %q is not one of the allowed values", new, "input: membership %q is not one of the allowed values", new,
@ -132,7 +133,7 @@ func updateMembership(
} }
func updateToInviteMembership( func updateToInviteMembership(
mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, ctx context.Context, mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
) ([]api.OutputEvent, error) { ) ([]api.OutputEvent, error) {
// We may have already sent the invite to the user, either because we are // We may have already sent the invite to the user, either because we are
// reprocessing this event, or because the we received this invite from a // reprocessing this event, or because the we received this invite from a
@ -151,16 +152,24 @@ func updateToInviteMembership(
onie := api.OutputNewInviteEvent{ onie := api.OutputNewInviteEvent{
Event: *add, Event: *add,
} }
updates = append(updates, api.OutputEvent{
oe := api.OutputEvent{
Type: api.OutputTypeNewInviteEvent, Type: api.OutputTypeNewInviteEvent,
NewInviteEvent: &onie, NewInviteEvent: &onie,
}) }
err = oe.AddSpanFromContext(ctx)
if err != nil {
return nil, err
}
updates = append(updates, oe)
} }
return updates, nil return updates, nil
} }
func updateToJoinMembership( func updateToJoinMembership(
mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent, ctx context.Context, mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
) ([]api.OutputEvent, error) { ) ([]api.OutputEvent, error) {
// If the user is already marked as being joined, we call SetToJoin to update // If the user is already marked as being joined, we call SetToJoin to update
// the event ID then we can return immediately. Retired is ignored as there // the event ID then we can return immediately. Retired is ignored as there
@ -187,15 +196,24 @@ func updateToJoinMembership(
RetiredByEventID: add.EventID(), RetiredByEventID: add.EventID(),
TargetUserID: *add.StateKey(), TargetUserID: *add.StateKey(),
} }
updates = append(updates, api.OutputEvent{
oe := api.OutputEvent{
Type: api.OutputTypeRetireInviteEvent, Type: api.OutputTypeRetireInviteEvent,
RetireInviteEvent: &orie, RetireInviteEvent: &orie,
}) }
err = oe.AddSpanFromContext(ctx)
if err != nil {
return nil, err
}
updates = append(updates, oe)
} }
return updates, nil return updates, nil
} }
func updateToLeaveMembership( func updateToLeaveMembership(
ctx context.Context,
mu types.MembershipUpdater, add *gomatrixserverlib.Event, mu types.MembershipUpdater, add *gomatrixserverlib.Event,
newMembership string, updates []api.OutputEvent, newMembership string, updates []api.OutputEvent,
) ([]api.OutputEvent, error) { ) ([]api.OutputEvent, error) {
@ -219,10 +237,18 @@ func updateToLeaveMembership(
RetiredByEventID: add.EventID(), RetiredByEventID: add.EventID(),
TargetUserID: *add.StateKey(), TargetUserID: *add.StateKey(),
} }
updates = append(updates, api.OutputEvent{
oe := api.OutputEvent{
Type: api.OutputTypeRetireInviteEvent, Type: api.OutputTypeRetireInviteEvent,
RetireInviteEvent: &orie, RetireInviteEvent: &orie,
}) }
err = oe.AddSpanFromContext(ctx)
if err != nil {
return nil, err
}
updates = append(updates, oe)
} }
return updates, nil return updates, nil
} }