diff --git a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go index 45e48f16..5e856ebb 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/federationsender/consumers/roomserver.go @@ -85,6 +85,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { ) return nil } + + ctx, span := output.StartSpanAndReplaceContext(context.Background()) + defer span.Finish() + ev := &output.NewRoomEvent.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), @@ -92,7 +96,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { "send_as_server": output.NewRoomEvent.SendAsServer, }).Info("received event from roomserver") - if err := s.processMessage(*output.NewRoomEvent); err != nil { + if err := s.processMessage(ctx, *output.NewRoomEvent); err != nil { // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ "event": string(ev.JSON()), @@ -108,8 +112,10 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // processMessage updates the list of currently joined hosts in the room // and then sends the event to the hosts that were joined before the event. -func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error { - addsStateEvents, err := s.lookupStateEvents(ore.AddsStateEventIDs, ore.Event) +func (s *OutputRoomEventConsumer) processMessage( + ctx context.Context, ore api.OutputNewRoomEvent, +) error { + addsStateEvents, err := s.lookupStateEvents(ctx, ore.AddsStateEventIDs, ore.Event) if err != nil { return err } @@ -123,7 +129,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err // TODO(#290): handle EventIDMismatchError and recover the current state by // talking to the roomserver oldJoinedHosts, err := s.db.UpdateRoom( - context.TODO(), + ctx, ore.Event.RoomID(), ore.LastSentEventID, ore.Event.EventID(), @@ -148,7 +154,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err } // Work out which hosts were joined at the event itself. - joinedHostsAtEvent, err := s.joinedHostsAtEvent(ore, oldJoinedHosts) + joinedHostsAtEvent, err := s.joinedHostsAtEvent(ctx, ore, oldJoinedHosts) if err != nil { return err } @@ -169,7 +175,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err // events from the room server. // Returns an error if there was a problem talking to the room server. func (s *OutputRoomEventConsumer) joinedHostsAtEvent( - ore api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost, + ctx context.Context, ore api.OutputNewRoomEvent, oldJoinedHosts []types.JoinedHost, ) ([]gomatrixserverlib.ServerName, error) { // Combine the delta into a single delta so that the adds and removes can // cancel each other out. This should reduce the number of times we need @@ -178,7 +184,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent( ore.AddsStateEventIDs, ore.RemovesStateEventIDs, ore.StateBeforeAddsEventIDs, ore.StateBeforeRemovesEventIDs, ) - combinedAddsEvents, err := s.lookupStateEvents(combinedAdds, ore.Event) + combinedAddsEvents, err := s.lookupStateEvents(ctx, combinedAdds, ore.Event) if err != nil { return nil, err } @@ -288,7 +294,7 @@ func combineDeltas(adds1, removes1, adds2, removes2 []string) (adds, removes []s // lookupStateEvents looks up the state events that are added by a new event. func (s *OutputRoomEventConsumer) lookupStateEvents( - addsStateEventIDs []string, event gomatrixserverlib.Event, + ctx context.Context, addsStateEventIDs []string, event gomatrixserverlib.Event, ) ([]gomatrixserverlib.Event, error) { // Fast path if there aren't any new state events. if len(addsStateEventIDs) == 0 { @@ -321,7 +327,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents( // from the roomserver using the query API. eventReq := api.QueryEventsByIDRequest{EventIDs: missing} var eventResp api.QueryEventsByIDResponse - if err := s.query.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil { + if err := s.query.QueryEventsByID(ctx, &eventReq, &eventResp); err != nil { return nil, err } diff --git a/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go index b7d42b11..3a83ba67 100644 --- a/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/publicroomsapi/consumers/roomserver.go @@ -77,6 +77,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } + ctx, span := output.StartSpanAndReplaceContext(context.Background()) + defer span.Finish() + ev := output.NewRoomEvent.Event log.WithFields(log.Fields{ "event_id": ev.EventID(), @@ -86,17 +89,17 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { addQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.AddsStateEventIDs} var addQueryRes api.QueryEventsByIDResponse - if err := s.query.QueryEventsByID(context.TODO(), &addQueryReq, &addQueryRes); err != nil { + if err := s.query.QueryEventsByID(ctx, &addQueryReq, &addQueryRes); err != nil { log.Warn(err) return err } remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs} var remQueryRes api.QueryEventsByIDResponse - if err := s.query.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil { + if err := s.query.QueryEventsByID(ctx, &remQueryReq, &remQueryRes); err != nil { log.Warn(err) return err } - return s.db.UpdateRoomFromEvents(context.TODO(), addQueryRes.Events, remQueryRes.Events) + return s.db.UpdateRoomFromEvents(ctx, addQueryRes.Events, remQueryRes.Events) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/output.go b/src/github.com/matrix-org/dendrite/roomserver/api/output.go index 522f1b61..3c928202 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/output.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/output.go @@ -53,7 +53,7 @@ type OutputEvent struct { func (o *OutputEvent) AddSpanFromContext(ctx context.Context) error { span := opentracing.SpanFromContext(ctx) ext.SpanKindProducer.Set(span) - var carrier opentracing.TextMapCarrier + carrier := make(opentracing.TextMapCarrier) tracer := opentracing.GlobalTracer() err := tracer.Inject(span.Context(), opentracing.TextMap, carrier) @@ -73,12 +73,12 @@ func (o *OutputEvent) StartSpanAndReplaceContext( producerContext, err := tracer.Extract(opentracing.TextMap, o.OpentracingCarrier) var span opentracing.Span - if err == nil { + if err != nil { // Default to a span without reference to producer context. - span = tracer.StartSpan("room_event_consumer") + span = tracer.StartSpan("output_event_consumer") } else { // Set the producer context. - span = tracer.StartSpan("room_event_consumer", opentracing.FollowsFrom(producerContext)) + span = tracer.StartSpan("output_event_consumer", opentracing.FollowsFrom(producerContext)) } return opentracing.ContextWithSpan(ctx, span), span diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index 677eeb42..a02c4be4 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -80,13 +80,16 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return nil } + ctx, span := output.StartSpanAndReplaceContext(context.Background()) + defer span.Finish() + switch output.Type { case api.OutputTypeNewRoomEvent: - return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) + return s.onNewRoomEvent(ctx, *output.NewRoomEvent) case api.OutputTypeNewInviteEvent: - return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) + return s.onNewInviteEvent(ctx, *output.NewInviteEvent) case api.OutputTypeRetireInviteEvent: - return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent) + return s.onRetireInviteEvent(ctx, *output.RetireInviteEvent) default: log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type",