From 853993618c393e4fbe4106bbd1e04b4d4bd202bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Wed, 28 Aug 2019 21:54:13 -0500 Subject: [PATCH] sdk/trace: add SpanProcessor SpanProcessor is an interface that allows to register hooks for Span start and end invocations. This commit adds the SpanProcessor interface to the SDK as well as the MultiSpanProcessor that allows to register multiple processors. --- .../src/opentelemetry/sdk/trace/__init__.py | 86 ++++++++++++- opentelemetry-sdk/tests/trace/test_trace.py | 113 ++++++++++++++++++ 2 files changed, 198 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index 72c5c303469..21acb394945 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -145,6 +145,68 @@ def from_map(cls, maxlen, mapping): Link = namedtuple("Link", ("context", "attributes")) +class SpanProcessor: + """Interface which allows hooks for SDK's `Span`s start and end method + invocations. + + Span processors can be registered directly using + :func:`~Tracer:add_span_processor` and they are invoked in the same order + as they were registered. + """ + + def on_start(self, span: "Span") -> None: + """Called when a :class:`Span` is started. + + This method is called synchronously on the thread that starts the + span, therefore it should not block or throw an exception. + + Args: + span: The :class:`Span` that just started. + """ + + def on_end(self, span: "Span") -> None: + """Called when a :class:`Span` is ended. + + This method is called synchronously on the thread that ends the + span, therefore it should not block or throw an exception. + + Args: + span: The :class:`Span` that just ended. + """ + + def shutdown(self) -> None: + """Called when a :class:`Tracer` is shutdown.""" + + +class MultiSpanProcessor(SpanProcessor): + """Implementation of :class:`SpanProcessor` that forwards all received + events to a list of `SpanProcessor`. + """ + + def __init__(self): + # use a tuple to avoid race conditions when adding a new span and + # iterating through it on "on_start" and "on_end". + self._span_processors = () + self._lock = threading.Lock() + + def add_span_processor(self, span_processor: SpanProcessor): + """Adds a SpanProcessor to the list handled by this instance.""" + with self._lock: + self._span_processors = self._span_processors + (span_processor,) + + def on_start(self, span: "Span") -> None: + for sp in self._span_processors: + sp.on_start(span) + + def on_end(self, span: "Span") -> None: + for sp in self._span_processors: + sp.on_end(span) + + def shutdown(self) -> None: + for sp in self._span_processors: + sp.shutdown() + + class Span(trace_api.Span): """See `opentelemetry.trace.Span`. @@ -161,6 +223,8 @@ class Span(trace_api.Span): attributes: The span's attributes to be exported events: Timestamped events to be exported links: Links to other spans to be exported + span_processor: `SpanProcessor` to invoke when starting and ending + this `Span`. """ # Initialize these lazily assuming most spans won't have them. @@ -179,6 +243,7 @@ def __init__( attributes: types.Attributes = None, # TODO events: typing.Sequence[Event] = None, # TODO links: typing.Sequence[Link] = None, # TODO + span_processor: SpanProcessor = SpanProcessor(), ) -> None: self.name = name @@ -190,6 +255,7 @@ def __init__( self.attributes = attributes self.events = events self.links = links + self.span_processor = span_processor if attributes is None: self.attributes = Span.empty_attributes @@ -247,10 +313,12 @@ def add_link( def start(self): if self.start_time is None: self.start_time = util.time_ns() + self.span_processor.on_start(self) def end(self): if self.end_time is None: self.end_time = util.time_ns() + self.span_processor.on_end(self) def update_name(self, name: str) -> None: self.name = name @@ -286,6 +354,7 @@ def __init__(self, name: str = "") -> None: if name: slot_name = "{}.current_span".format(name) self._current_span_slot = Context.register_slot(slot_name) + self._active_span_processor = MultiSpanProcessor() def get_current_span(self): """See `opentelemetry.trace.Tracer.get_current_span`.""" @@ -325,7 +394,12 @@ def create_span( parent_context.trace_options, parent_context.trace_state, ) - return Span(name=name, context=context, parent=parent) + return Span( + name=name, + context=context, + parent=parent, + span_processor=self._active_span_processor, + ) @contextmanager def use_span(self, span: "Span") -> typing.Iterator["Span"]: @@ -339,5 +413,15 @@ def use_span(self, span: "Span") -> typing.Iterator["Span"]: self._current_span_slot.set(span_snapshot) span.end() + def add_span_processor(self, span_processor: SpanProcessor) -> None: + """Registers a new :class:`SpanProcessor` for this `Tracer`. + + The span processors are invoked in the same order they are registered. + """ + + # no lock here because MultiSpanProcessor.add_span_processor is + # thread safe + self._active_span_processor.add_span_processor(span_processor) + tracer = Tracer() diff --git a/opentelemetry-sdk/tests/trace/test_trace.py b/opentelemetry-sdk/tests/trace/test_trace.py index 240972344c8..c0a0e65008d 100644 --- a/opentelemetry-sdk/tests/trace/test_trace.py +++ b/opentelemetry-sdk/tests/trace/test_trace.py @@ -198,3 +198,116 @@ class TestSpan(unittest.TestCase): def test_basic_span(self): span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext)) self.assertEqual(span.name, "name") + + +def span_event_start_fmt(span_processor_name, span_name): + return span_processor_name + ":" + span_name + ":start" + + +def span_event_end_fmt(span_processor_name, span_name): + return span_processor_name + ":" + span_name + ":end" + + +class MySpanProcessor(trace.SpanProcessor): + def __init__(self, name, span_list): + self.name = name + self.span_list = span_list + + def on_start(self, span: "trace.Span") -> None: + self.span_list.append(span_event_start_fmt(self.name, span.name)) + + def on_end(self, span: "trace.Span") -> None: + self.span_list.append(span_event_end_fmt(self.name, span.name)) + + +class TestSpanProcessor(unittest.TestCase): + def test_span_processor(self): + tracer = trace.Tracer() + + spans_calls_list = [] # filled by MySpanProcessor + expected_list = [] # filled by hand + + # Span processors are created but not added to the tracer yet + sp1 = MySpanProcessor("SP1", spans_calls_list) + sp2 = MySpanProcessor("SP2", spans_calls_list) + + with tracer.start_span("foo"): + with tracer.start_span("bar"): + with tracer.start_span("baz"): + pass + + # at this point lists must be empty + self.assertEqual(len(spans_calls_list), 0) + + # add single span processor + tracer.add_span_processor(sp1) + + with tracer.start_span("foo"): + expected_list.append(span_event_start_fmt("SP1", "foo")) + + with tracer.start_span("bar"): + expected_list.append(span_event_start_fmt("SP1", "bar")) + + with tracer.start_span("baz"): + expected_list.append(span_event_start_fmt("SP1", "baz")) + + expected_list.append(span_event_end_fmt("SP1", "baz")) + + expected_list.append(span_event_end_fmt("SP1", "bar")) + + expected_list.append(span_event_end_fmt("SP1", "foo")) + + self.assertListEqual(spans_calls_list, expected_list) + + spans_calls_list.clear() + expected_list.clear() + + # go for multiple span processors + tracer.add_span_processor(sp2) + + with tracer.start_span("foo"): + expected_list.append(span_event_start_fmt("SP1", "foo")) + expected_list.append(span_event_start_fmt("SP2", "foo")) + + with tracer.start_span("bar"): + expected_list.append(span_event_start_fmt("SP1", "bar")) + expected_list.append(span_event_start_fmt("SP2", "bar")) + + with tracer.start_span("baz"): + expected_list.append(span_event_start_fmt("SP1", "baz")) + expected_list.append(span_event_start_fmt("SP2", "baz")) + + expected_list.append(span_event_end_fmt("SP1", "baz")) + expected_list.append(span_event_end_fmt("SP2", "baz")) + + expected_list.append(span_event_end_fmt("SP1", "bar")) + expected_list.append(span_event_end_fmt("SP2", "bar")) + + expected_list.append(span_event_end_fmt("SP1", "foo")) + expected_list.append(span_event_end_fmt("SP2", "foo")) + + # compare if two lists are the same + self.assertListEqual(spans_calls_list, expected_list) + + def test_add_span_processor_after_span_creation(self): + tracer = trace.Tracer() + + spans_calls_list = [] # filled by MySpanProcessor + expected_list = [] # filled by hand + + # Span processors are created but not added to the tracer yet + sp = MySpanProcessor("SP1", spans_calls_list) + + with tracer.start_span("foo"): + with tracer.start_span("bar"): + with tracer.start_span("baz"): + # add span processor after spans have been created + tracer.add_span_processor(sp) + + expected_list.append(span_event_end_fmt("SP1", "baz")) + + expected_list.append(span_event_end_fmt("SP1", "bar")) + + expected_list.append(span_event_end_fmt("SP1", "foo")) + + self.assertListEqual(spans_calls_list, expected_list)