Create opentracing spans on kafka consumers

This commit is contained in:
Erik Johnston 2017-11-30 16:19:22 +00:00
parent 11b8dc0d0b
commit d843b51554
4 changed files with 31 additions and 19 deletions

View file

@ -85,6 +85,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
) )
return nil return nil
} }
ctx, span := output.StartSpanAndReplaceContext(context.Background())
defer span.Finish()
ev := &output.NewRoomEvent.Event ev := &output.NewRoomEvent.Event
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": ev.EventID(), "event_id": ev.EventID(),
@ -92,7 +96,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
"send_as_server": output.NewRoomEvent.SendAsServer, "send_as_server": output.NewRoomEvent.SendAsServer,
}).Info("received event from roomserver") }).Info("received event from roomserver")
if err := s.processMessage(*output.NewRoomEvent); err != nil { if err := s.processMessage(ctx, *output.NewRoomEvent); err != nil {
// panic rather than continue with an inconsistent database // panic rather than continue with an inconsistent database
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event": string(ev.JSON()), "event": string(ev.JSON()),
@ -108,8 +112,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// processMessage updates the list of currently joined hosts in the room // processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event. // and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { func (s *OutputRoomEventConsumer) processMessage(
addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event) ctx context.Context, ore api.OutputNewRoomEvent,
) error {
addsStateEvents, err := s.lookupStateEvents(ctx, ore.AddsStateEventIDs, ore.Event)
if err != nil { if err != nil {
return err return err
} }
@ -123,7 +129,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
// TODO(#290): handle EventIDMismatchError and recover the current state by // TODO(#290): handle EventIDMismatchError and recover the current state by
// talking to the roomserver // talking to the roomserver
oldJoinedHosts, err := s.db.UpdateRoom( oldJoinedHosts, err := s.db.UpdateRoom(
context.TODO(), ctx,
ore.Event.RoomID(), ore.Event.RoomID(),
ore.LastSentEventID, ore.LastSentEventID,
ore.Event.EventID(), ore.Event.EventID(),
@ -148,7 +154,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
} }
// Work out which hosts were joined at the event itself. // Work out which hosts were joined at the event itself.
joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, oldJoinedHosts) joinedHostsAtEvent, err := s.joinedHostsAtEvent(ctx, ore, oldJoinedHosts)
if err != nil { if err != nil {
return err return err
} }
@ -169,7 +175,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
// events from the room server. // events from the room server.
// Returns an error if there was a problem talking to the room server. // Returns an error if there was a problem talking to the room server.
func (s *OutputRoomEventConsumer) joinedHostsAtEvent( func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
ore api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost, ctx context.Context, ore api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost,
) ([]gomatrixserverlib.ServerName, error) { ) ([]gomatrixserverlib.ServerName, error) {
// Combine the delta into a single delta so that the adds and removes can // Combine the delta into a single delta so that the adds and removes can
// cancel each other out. This should reduce the number of times we need // cancel each other out. This should reduce the number of times we need
@ -178,7 +184,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
ore.AddsStateEventIDs, ore.RemovesStateEventIDs, ore.AddsStateEventIDs, ore.RemovesStateEventIDs,
ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs, ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs,
) )
combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ore.Event) combinedAddsEvents, err := s.lookupStateEvents(ctx, combinedAdds, ore.Event)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -288,7 +294,7 @@ func combineDeltas(adds1, removes1, adds2, removes2 []string) (adds, removes []s
// lookupStateEvents looks up the state events that are added by a new event. // lookupStateEvents looks up the state events that are added by a new event.
func (s *OutputRoomEventConsumer) lookupStateEvents( func (s *OutputRoomEventConsumer) lookupStateEvents(
addsStateEventIDs []string, event gomatrixserverlib.Event, ctx context.Context, addsStateEventIDs []string, event gomatrixserverlib.Event,
) ([]gomatrixserverlib.Event, error) { ) ([]gomatrixserverlib.Event, error) {
// Fast path if there aren't any new state events. // Fast path if there aren't any new state events.
if len(addsStateEventIDs) == 0 { if len(addsStateEventIDs) == 0 {
@ -321,7 +327,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
// from the roomserver using the query API. // from the roomserver using the query API.
eventReq := api.QueryEventsByIDRequest{EventIDs: missing} eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
var eventResp api.QueryEventsByIDResponse var eventResp api.QueryEventsByIDResponse
if err := s.query.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil { if err := s.query.QueryEventsByID(ctx, &eventReq, &eventResp); err != nil {
return nil, err return nil, err
} }

View file

@ -77,6 +77,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
ctx, span := output.StartSpanAndReplaceContext(context.Background())
defer span.Finish()
ev := output.NewRoomEvent.Event ev := output.NewRoomEvent.Event
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"event_id": ev.EventID(), "event_id": ev.EventID(),
@ -86,17 +89,17 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
addQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.AddsStateEventIDs} addQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.AddsStateEventIDs}
var addQueryRes api.QueryEventsByIDResponse var addQueryRes api.QueryEventsByIDResponse
if err := s.query.QueryEventsByID(context.TODO(), &addQueryReq, &addQueryRes); err != nil { if err := s.query.QueryEventsByID(ctx, &addQueryReq, &addQueryRes); err != nil {
log.Warn(err) log.Warn(err)
return err return err
} }
remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs} remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs}
var remQueryRes api.QueryEventsByIDResponse var remQueryRes api.QueryEventsByIDResponse
if err := s.query.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil { if err := s.query.QueryEventsByID(ctx, &remQueryReq, &remQueryRes); err != nil {
log.Warn(err) log.Warn(err)
return err return err
} }
return s.db.UpdateRoomFromEvents(context.TODO(), addQueryRes.Events, remQueryRes.Events) return s.db.UpdateRoomFromEvents(ctx, addQueryRes.Events, remQueryRes.Events)
} }

View file

@ -53,7 +53,7 @@ type OutputEvent struct {
func (o *OutputEvent) AddSpanFromContext(ctx context.Context) error { func (o *OutputEvent) AddSpanFromContext(ctx context.Context) error {
span := opentracing.SpanFromContext(ctx) span := opentracing.SpanFromContext(ctx)
ext.SpanKindProducer.Set(span) ext.SpanKindProducer.Set(span)
var carrier opentracing.TextMapCarrier carrier := make(opentracing.TextMapCarrier)
tracer := opentracing.GlobalTracer() tracer := opentracing.GlobalTracer()
err := tracer.Inject(span.Context(), opentracing.TextMap, carrier) err := tracer.Inject(span.Context(), opentracing.TextMap, carrier)
@ -73,12 +73,12 @@ func (o *OutputEvent) StartSpanAndReplaceContext(
producerContext, err := tracer.Extract(opentracing.TextMap, o.OpentracingCarrier) producerContext, err := tracer.Extract(opentracing.TextMap, o.OpentracingCarrier)
var span opentracing.Span var span opentracing.Span
if err == nil { if err != nil {
// Default to a span without reference to producer context. // Default to a span without reference to producer context.
span = tracer.StartSpan("room_event_consumer") span = tracer.StartSpan("output_event_consumer")
} else { } else {
// Set the producer context. // Set the producer context.
span = tracer.StartSpan("room_event_consumer", opentracing.FollowsFrom(producerContext)) span = tracer.StartSpan("output_event_consumer", opentracing.FollowsFrom(producerContext))
} }
return opentracing.ContextWithSpan(ctx, span), span return opentracing.ContextWithSpan(ctx, span), span

View file

@ -80,13 +80,16 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil return nil
} }
ctx, span := output.StartSpanAndReplaceContext(context.Background())
defer span.Finish()
switch output.Type { switch output.Type {
case api.OutputTypeNewRoomEvent: case api.OutputTypeNewRoomEvent:
return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) return s.onNewRoomEvent(ctx, *output.NewRoomEvent)
case api.OutputTypeNewInviteEvent: case api.OutputTypeNewInviteEvent:
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) return s.onNewInviteEvent(ctx, *output.NewInviteEvent)
case api.OutputTypeRetireInviteEvent: case api.OutputTypeRetireInviteEvent:
return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent) return s.onRetireInviteEvent(ctx, *output.RetireInviteEvent)
default: default:
log.WithField("type", output.Type).Debug( log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type", "roomserver output log: ignoring unknown output type",