From 40c9d8f50886cc6d39d25943391c9f354cb0496c Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Mon, 9 Sep 2019 16:26:34 -0700 Subject: [PATCH 01/25] Experimentation on tracing and EventHubs --- .../azure/eventhub/producer.py | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index cab9638f2acc..a74259aa9420 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -12,6 +12,11 @@ from uamqp import types, constants, errors # type: ignore from uamqp import SendClient # type: ignore +from azure.core.tracing.context import tracing_context +from azure.core.settings import settings +from opencensus.trace.span import SpanKind +from opencensus.trace.status import Status + from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError from ._consumer_producer_mixin import ConsumerProducerMixin @@ -218,6 +223,15 @@ def send(self, event_data, partition_key=None, timeout=None): :caption: Sends an event data and blocks until acknowledgement is received or operation times out. """ + # Tracing code + parent_span = tracing_context.current_span.get() + wrapper_class = settings.tracing_implementation() + if parent_span is None and wrapper_class is not None: + current_span_instance = wrapper_class.get_current_span() + parent_span = wrapper_class(current_span_instance) + + child = parent_span.span(name="Azure.EventHubs.send") + child.span_instance.span_kind = SpanKind.CLIENT self._check_closed() if isinstance(event_data, EventData): @@ -235,7 +249,19 @@ def send(self, event_data, partition_key=None, timeout=None): wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self._unsent_events = [wrapper_event_data.message] - self._send_event_data_with_retry(timeout=timeout) + + # Start current span and set metadata + child.start() + child.add_attribute("component", "eventhubs") + child.add_attribute("message_bus.destination", self._client._address.path) + child.add_attribute("peer.address", self._client._address.hostname) + + try: + self._send_event_data_with_retry(timeout=timeout) + except Exception as err: + child.span_instance.status = Status.from_exception(err) + finally: + child.finish() def close(self, exception=None): # pylint:disable=useless-super-delegation # type:(Exception) -> None From 93dc90802d1dadc6629eabfc6e3f0d2d48fdf5ff Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Tue, 10 Sep 2019 16:29:01 -0700 Subject: [PATCH 02/25] Continue to let the initial excp raise --- sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index a74259aa9420..86d1d52e03d1 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -260,6 +260,7 @@ def send(self, event_data, partition_key=None, timeout=None): self._send_event_data_with_retry(timeout=timeout) except Exception as err: child.span_instance.status = Status.from_exception(err) + raise finally: child.finish() From b22a944bee5f05c65a6ab1acca34d192289c1181 Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Wed, 11 Sep 2019 16:54:29 -0700 Subject: [PATCH 03/25] Naive direct tracing implementation --- .../azure/eventhub/aio/consumer_async.py | 31 +++++++++++++- .../aio/eventprocessor/event_processor.py | 31 +++++++++++++- .../azure/eventhub/aio/producer_async.py | 36 +++++++++++++++- .../azure/eventhub/client_abstract.py | 5 +++ .../azure/eventhub/consumer.py | 31 +++++++++++++- .../azure/eventhub/producer.py | 42 +++++++++++-------- 6 files changed, 154 insertions(+), 22 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index efad6a3cb7db..313fef495b0d 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -11,6 +11,10 @@ from uamqp import errors, types # type: ignore from uamqp import ReceiveClientAsync, Source # type: ignore +from azure.core.tracing.common import get_parent_span +from opencensus.trace.span import SpanKind +from opencensus.trace.status import Status + from azure.eventhub import EventData, EventPosition from azure.eventhub.error import EventHubError, ConnectError, _error_handler from ._consumer_producer_mixin_async import ConsumerProducerMixin @@ -179,6 +183,14 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): event_data = EventData._from_message(message) # pylint:disable=protected-access self._offset = EventPosition(event_data.offset) data_batch.append(event_data) + + # Tracing + current_span = get_parent_span() + if current_span and event_data.application_properties: + traceparent = event_data.application_properties.get(b"Diagnostic-Id", None).decode('ascii') + if traceparent: + current_span.link(traceparent) + return data_batch async def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs): @@ -225,12 +237,29 @@ async def receive(self, *, max_batch_size=None, timeout=None): :caption: Receives events asynchronously """ + # Tracing code + parent_span = get_parent_span() + if parent_span: + child = parent_span.span(name="Azure.EventHubs.receive") + child.span_instance.span_kind = SpanKind.CLIENT + + child.start() + self._client._add_span_request_attributes(child) + self._check_closed() timeout = timeout or self._client._config.receive_timeout # pylint:disable=protected-access max_batch_size = max_batch_size or min(self._client._config.max_batch_size, self._prefetch) # pylint:disable=protected-access - return await self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) + try: + return await self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) + except Exception as err: + if parent_span: + child.span_instance.status = Status.from_exception(err) + raise + finally: + if parent_span: + child.finish() async def close(self, exception=None): # type: (Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py index 18446249ff23..2f9ac2ccea1e 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py @@ -8,6 +8,10 @@ import asyncio import logging +from azure.core.tracing.common import get_parent_span +from opencensus.trace.span import SpanKind +from opencensus.trace.status import Status + from azure.eventhub import EventPosition, EventHubError from azure.eventhub.aio import EventHubClient from .partition_context import PartitionContext @@ -247,7 +251,32 @@ async def close(reason): while True: try: events = await partition_consumer.receive() - await partition_processor.process_events(events, partition_context) + + # Tracing + parent_span = get_parent_span() + if parent_span: + child = parent_span.span(name="Azure.EventHubs.process") + child.span_instance.span_kind = SpanKind.SERVER + + for event_data in events: + if event_data.application_properties: + traceparent = event_data.application_properties.get(b"Diagnostic-Id", None).decode('ascii') + if traceparent: + child.link(traceparent) + + child.start() + self._eventhub_client._add_span_request_attributes(child) + + try: + await partition_processor.process_events(events, partition_context) + except Exception as err: + if parent_span: + child.span_instance.status = Status.from_exception(err) + raise + finally: + if parent_span: + child.finish() + except asyncio.CancelledError: log.info( "PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r" diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index ec4e39c87116..fa6e142d2f0b 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -11,6 +11,10 @@ from uamqp import types, constants, errors # type: ignore from uamqp import SendClientAsync # type: ignore +from azure.core.tracing.common import get_parent_span +from opencensus.trace.span import SpanKind +from opencensus.trace.status import Status + from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError from ..producer import _error, _set_partition_key @@ -213,12 +217,26 @@ async def send( :caption: Sends an event data and blocks until acknowledgement is received or operation times out. """ + # Tracing code + parent_span = get_parent_span() + if parent_span: + child = parent_span.span(name="Azure.EventHubs.send") + child.span_instance.span_kind = SpanKind.CLIENT + + def trace_message(message): + message_span = child.span(name="Azure.EventHubs.message") + message_span.start() + app_prop = dict(message.application_properties) + app_prop.setdefault(b"Diagnostic-Id", message_span.get_trace_parent().encode('ascii')) + message.application_properties = app_prop + message_span.finish() self._check_closed() if isinstance(event_data, EventData): if partition_key: event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data + trace_message(wrapper_event_data) else: if isinstance(event_data, EventDataBatch): if partition_key and partition_key != event_data._partition_key: # pylint: disable=protected-access @@ -228,9 +246,25 @@ async def send( if partition_key: event_data = _set_partition_key(event_data, partition_key) wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access + for internal_message in wrapper_event_data.message._body_gen: + trace_message(internal_message) + wrapper_event_data.message.on_send_complete = self._on_outcome self._unsent_events = [wrapper_event_data.message] - await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor + + if parent_span: + # Start current span and set metadata + child.start() + self._client._add_span_request_attributes(child) + try: + await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor + except Exception as err: + if parent_span: + child.span_instance.status = Status.from_exception(err) + raise + finally: + if parent_span: + child.finish() async def close(self, exception=None): # type: (Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index c6879730266c..62ea791a5894 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -214,6 +214,11 @@ def _create_properties(self, user_agent=None): # pylint: disable=no-self-use properties["user-agent"] = final_user_agent return properties + def _add_span_request_attributes(self, span): + span.add_attribute("component", "eventhubs") + span.add_attribute("message_bus.destination", self._address.path) + span.add_attribute("peer.address", self._address.hostname) + def _process_redirect_uri(self, redirect): redirect_uri = redirect.address.decode('utf-8') auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups") diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 604d9c7d7b82..006c774ec6cc 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -12,6 +12,10 @@ from uamqp import types, errors # type: ignore from uamqp import ReceiveClient, Source # type: ignore +from azure.core.tracing.common import get_parent_span +from opencensus.trace.span import SpanKind +from opencensus.trace.status import Status + from azure.eventhub.common import EventData, EventPosition from azure.eventhub.error import _error_handler from ._consumer_producer_mixin import ConsumerProducerMixin @@ -173,6 +177,14 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): event_data = EventData._from_message(message) # pylint:disable=protected-access self._offset = EventPosition(event_data.offset) data_batch.append(event_data) + + # Tracing + current_span = get_parent_span() + if current_span and event_data.application_properties: + traceparent = event_data.application_properties.get(b"Diagnostic-Id", None).decode('ascii') + if traceparent: + current_span.link(traceparent) + return data_batch def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs): @@ -219,12 +231,29 @@ def receive(self, max_batch_size=None, timeout=None): :caption: Receive events from the EventHub. """ + # Tracing code + parent_span = get_parent_span() + if parent_span: + child = parent_span.span(name="Azure.EventHubs.receive") + child.span_instance.span_kind = SpanKind.CLIENT + + child.start() + self._client._add_span_request_attributes(child) + self._check_closed() timeout = timeout or self._client._config.receive_timeout # pylint:disable=protected-access max_batch_size = max_batch_size or min(self._client._config.max_batch_size, self._prefetch) # pylint:disable=protected-access - return self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) + try: + return self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) + except Exception as err: + if parent_span: + child.span_instance.status = Status.from_exception(err) + raise + finally: + if parent_span: + child.finish() def close(self, exception=None): # type:(Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 86d1d52e03d1..24c5059c7712 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -12,8 +12,7 @@ from uamqp import types, constants, errors # type: ignore from uamqp import SendClient # type: ignore -from azure.core.tracing.context import tracing_context -from azure.core.settings import settings +from azure.core.tracing.common import get_parent_span from opencensus.trace.span import SpanKind from opencensus.trace.status import Status @@ -224,20 +223,25 @@ def send(self, event_data, partition_key=None, timeout=None): """ # Tracing code - parent_span = tracing_context.current_span.get() - wrapper_class = settings.tracing_implementation() - if parent_span is None and wrapper_class is not None: - current_span_instance = wrapper_class.get_current_span() - parent_span = wrapper_class(current_span_instance) - - child = parent_span.span(name="Azure.EventHubs.send") - child.span_instance.span_kind = SpanKind.CLIENT + parent_span = get_parent_span() + if parent_span: + child = parent_span.span(name="Azure.EventHubs.send") + child.span_instance.span_kind = SpanKind.CLIENT + + def trace_message(message): + message_span = child.span(name="Azure.EventHubs.message") + message_span.start() + app_prop = dict(message.application_properties) + app_prop.setdefault(b"Diagnostic-Id", message_span.get_trace_parent().encode('ascii')) + message.application_properties = app_prop + message_span.finish() self._check_closed() if isinstance(event_data, EventData): if partition_key: event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data + trace_message(wrapper_event_data) else: if isinstance(event_data, EventDataBatch): # The partition_key in the param will be omitted. if partition_key and partition_key != event_data._partition_key: # pylint: disable=protected-access @@ -247,22 +251,24 @@ def send(self, event_data, partition_key=None, timeout=None): if partition_key: event_data = _set_partition_key(event_data, partition_key) wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access + for internal_message in wrapper_event_data.message._body_gen: + trace_message(internal_message) wrapper_event_data.message.on_send_complete = self._on_outcome self._unsent_events = [wrapper_event_data.message] - # Start current span and set metadata - child.start() - child.add_attribute("component", "eventhubs") - child.add_attribute("message_bus.destination", self._client._address.path) - child.add_attribute("peer.address", self._client._address.hostname) - + if parent_span: + # Start current span and set metadata + child.start() + self._client._add_span_request_attributes(child) try: self._send_event_data_with_retry(timeout=timeout) except Exception as err: - child.span_instance.status = Status.from_exception(err) + if parent_span: + child.span_instance.status = Status.from_exception(err) raise finally: - child.finish() + if parent_span: + child.finish() def close(self, exception=None): # pylint:disable=useless-super-delegation # type:(Exception) -> None From e8724a418bb4cc13d25c4ef42624bf17474787e0 Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Thu, 12 Sep 2019 14:49:09 -0700 Subject: [PATCH 04/25] Update Kind in EventHub to generic one --- .../azure-eventhubs/azure/eventhub/aio/consumer_async.py | 4 ++-- .../azure/eventhub/aio/eventprocessor/event_processor.py | 4 ++-- .../azure-eventhubs/azure/eventhub/aio/producer_async.py | 3 ++- sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py | 4 ++-- sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py | 4 ++-- 5 files changed, 10 insertions(+), 9 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 313fef495b0d..54b85ac8f9b7 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -12,7 +12,7 @@ from uamqp import ReceiveClientAsync, Source # type: ignore from azure.core.tracing.common import get_parent_span -from opencensus.trace.span import SpanKind +from azure.core.tracing import SpanKind from opencensus.trace.status import Status from azure.eventhub import EventData, EventPosition @@ -241,7 +241,7 @@ async def receive(self, *, max_batch_size=None, timeout=None): parent_span = get_parent_span() if parent_span: child = parent_span.span(name="Azure.EventHubs.receive") - child.span_instance.span_kind = SpanKind.CLIENT + child.kind = SpanKind.CLIENT # Should be PRODUCER child.start() self._client._add_span_request_attributes(child) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py index 2f9ac2ccea1e..8bb8fc344fde 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py @@ -9,7 +9,7 @@ import logging from azure.core.tracing.common import get_parent_span -from opencensus.trace.span import SpanKind +from azure.core.tracing import SpanKind from opencensus.trace.status import Status from azure.eventhub import EventPosition, EventHubError @@ -256,7 +256,7 @@ async def close(reason): parent_span = get_parent_span() if parent_span: child = parent_span.span(name="Azure.EventHubs.process") - child.span_instance.span_kind = SpanKind.SERVER + child.kind = SpanKind.SERVER for event_data in events: if event_data.application_properties: diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index fa6e142d2f0b..dbbb1f456b04 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -12,6 +12,7 @@ from uamqp import SendClientAsync # type: ignore from azure.core.tracing.common import get_parent_span +from azure.core.tracing import SpanKind from opencensus.trace.span import SpanKind from opencensus.trace.status import Status @@ -221,7 +222,7 @@ async def send( parent_span = get_parent_span() if parent_span: child = parent_span.span(name="Azure.EventHubs.send") - child.span_instance.span_kind = SpanKind.CLIENT + child.kind = SpanKind.CLIENT # Should be PRODUCER def trace_message(message): message_span = child.span(name="Azure.EventHubs.message") diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 006c774ec6cc..bde97ae74228 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -13,7 +13,7 @@ from uamqp import ReceiveClient, Source # type: ignore from azure.core.tracing.common import get_parent_span -from opencensus.trace.span import SpanKind +from azure.core.tracing import SpanKind from opencensus.trace.status import Status from azure.eventhub.common import EventData, EventPosition @@ -235,7 +235,7 @@ def receive(self, max_batch_size=None, timeout=None): parent_span = get_parent_span() if parent_span: child = parent_span.span(name="Azure.EventHubs.receive") - child.span_instance.span_kind = SpanKind.CLIENT + child.kind = SpanKind.CLIENT # Should be PRODUCER child.start() self._client._add_span_request_attributes(child) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 24c5059c7712..e5b0cc7fc315 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -13,7 +13,7 @@ from uamqp import SendClient # type: ignore from azure.core.tracing.common import get_parent_span -from opencensus.trace.span import SpanKind +from azure.core.tracing import SpanKind from opencensus.trace.status import Status from azure.eventhub.common import EventData, EventDataBatch @@ -226,7 +226,7 @@ def send(self, event_data, partition_key=None, timeout=None): parent_span = get_parent_span() if parent_span: child = parent_span.span(name="Azure.EventHubs.send") - child.span_instance.span_kind = SpanKind.CLIENT + child.kind = SpanKind.CLIENT # Should be PRODUCER def trace_message(message): message_span = child.span(name="Azure.EventHubs.message") From c4dee261860f727286f25f9f17d49317be2ab2da Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Thu, 12 Sep 2019 16:07:04 -0700 Subject: [PATCH 05/25] Use contextmanager in EventHub --- .../azure/eventhub/aio/consumer_async.py | 25 +++++++------------ .../aio/eventprocessor/event_processor.py | 15 +++-------- .../azure/eventhub/aio/producer_async.py | 17 +++---------- .../azure/eventhub/consumer.py | 25 +++++++------------ .../azure/eventhub/producer.py | 15 +++-------- 5 files changed, 30 insertions(+), 67 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 54b85ac8f9b7..dc085dffa34d 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -237,29 +237,22 @@ async def receive(self, *, max_batch_size=None, timeout=None): :caption: Receives events asynchronously """ + self._check_closed() + + timeout = timeout or self._client._config.receive_timeout # pylint:disable=protected-access + max_batch_size = max_batch_size or min(self._client._config.max_batch_size, self._prefetch) # pylint:disable=protected-access + # Tracing code parent_span = get_parent_span() if parent_span: child = parent_span.span(name="Azure.EventHubs.receive") child.kind = SpanKind.CLIENT # Should be PRODUCER - child.start() - self._client._add_span_request_attributes(child) - - self._check_closed() - - timeout = timeout or self._client._config.receive_timeout # pylint:disable=protected-access - max_batch_size = max_batch_size or min(self._client._config.max_batch_size, self._prefetch) # pylint:disable=protected-access - - try: + with child: + self._client._add_span_request_attributes(child) + return await self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) + else: return await self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) - except Exception as err: - if parent_span: - child.span_instance.status = Status.from_exception(err) - raise - finally: - if parent_span: - child.finish() async def close(self, exception=None): # type: (Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py index 8bb8fc344fde..18c41df84cb8 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py @@ -264,18 +264,11 @@ async def close(reason): if traceparent: child.link(traceparent) - child.start() - self._eventhub_client._add_span_request_attributes(child) - - try: + with child: + self._eventhub_client._add_span_request_attributes(child) await partition_processor.process_events(events, partition_context) - except Exception as err: - if parent_span: - child.span_instance.status = Status.from_exception(err) - raise - finally: - if parent_span: - child.finish() + else: + await partition_processor.process_events(events, partition_context) except asyncio.CancelledError: log.info( diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index dbbb1f456b04..dcccd2d1e014 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -13,8 +13,6 @@ from azure.core.tracing.common import get_parent_span from azure.core.tracing import SpanKind -from opencensus.trace.span import SpanKind -from opencensus.trace.status import Status from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError @@ -254,18 +252,11 @@ def trace_message(message): self._unsent_events = [wrapper_event_data.message] if parent_span: - # Start current span and set metadata - child.start() - self._client._add_span_request_attributes(child) - try: + with child: + self._client._add_span_request_attributes(child) + await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor + else: await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor - except Exception as err: - if parent_span: - child.span_instance.status = Status.from_exception(err) - raise - finally: - if parent_span: - child.finish() async def close(self, exception=None): # type: (Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index bde97ae74228..5c42e2ca4ddf 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -231,29 +231,22 @@ def receive(self, max_batch_size=None, timeout=None): :caption: Receive events from the EventHub. """ + self._check_closed() + + timeout = timeout or self._client._config.receive_timeout # pylint:disable=protected-access + max_batch_size = max_batch_size or min(self._client._config.max_batch_size, self._prefetch) # pylint:disable=protected-access + # Tracing code parent_span = get_parent_span() if parent_span: child = parent_span.span(name="Azure.EventHubs.receive") child.kind = SpanKind.CLIENT # Should be PRODUCER - child.start() - self._client._add_span_request_attributes(child) - - self._check_closed() - - timeout = timeout or self._client._config.receive_timeout # pylint:disable=protected-access - max_batch_size = max_batch_size or min(self._client._config.max_batch_size, self._prefetch) # pylint:disable=protected-access - - try: + with child: + self._client._add_span_request_attributes(child) + return self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) + else: return self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) - except Exception as err: - if parent_span: - child.span_instance.status = Status.from_exception(err) - raise - finally: - if parent_span: - child.finish() def close(self, exception=None): # type:(Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index e5b0cc7fc315..3789b4999c53 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -257,18 +257,11 @@ def trace_message(message): self._unsent_events = [wrapper_event_data.message] if parent_span: - # Start current span and set metadata - child.start() - self._client._add_span_request_attributes(child) - try: + with child: + self._client._add_span_request_attributes(child) + self._send_event_data_with_retry(timeout=timeout) + else: self._send_event_data_with_retry(timeout=timeout) - except Exception as err: - if parent_span: - child.span_instance.status = Status.from_exception(err) - raise - finally: - if parent_span: - child.finish() def close(self, exception=None): # pylint:disable=useless-super-delegation # type:(Exception) -> None From e23c6f04f377aaa3674a251e3ea985e846253ae7 Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Thu, 12 Sep 2019 16:36:43 -0700 Subject: [PATCH 06/25] Remove opencensus specific import --- .../azure-eventhubs/azure/eventhub/aio/consumer_async.py | 1 - .../azure/eventhub/aio/eventprocessor/event_processor.py | 1 - sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py | 1 - sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py | 1 - 4 files changed, 4 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index dc085dffa34d..5fff95ac8d90 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -13,7 +13,6 @@ from azure.core.tracing.common import get_parent_span from azure.core.tracing import SpanKind -from opencensus.trace.status import Status from azure.eventhub import EventData, EventPosition from azure.eventhub.error import EventHubError, ConnectError, _error_handler diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py index 18c41df84cb8..985cfb523554 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py @@ -10,7 +10,6 @@ from azure.core.tracing.common import get_parent_span from azure.core.tracing import SpanKind -from opencensus.trace.status import Status from azure.eventhub import EventPosition, EventHubError from azure.eventhub.aio import EventHubClient diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 5c42e2ca4ddf..28698d6e7a00 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -14,7 +14,6 @@ from azure.core.tracing.common import get_parent_span from azure.core.tracing import SpanKind -from opencensus.trace.status import Status from azure.eventhub.common import EventData, EventPosition from azure.eventhub.error import _error_handler diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 3789b4999c53..ce001df20a3d 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -14,7 +14,6 @@ from azure.core.tracing.common import get_parent_span from azure.core.tracing import SpanKind -from opencensus.trace.status import Status from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError From 8c4bede4be232fe1699f6dbb4561231cad79430e Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Thu, 12 Sep 2019 16:40:25 -0700 Subject: [PATCH 07/25] Fix possible AttributeError --- .../azure-eventhubs/azure/eventhub/aio/consumer_async.py | 2 +- sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 5fff95ac8d90..d445e41f0e16 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -186,7 +186,7 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): # Tracing current_span = get_parent_span() if current_span and event_data.application_properties: - traceparent = event_data.application_properties.get(b"Diagnostic-Id", None).decode('ascii') + traceparent = event_data.application_properties.get(b"Diagnostic-Id", "").decode('ascii') if traceparent: current_span.link(traceparent) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 28698d6e7a00..14b140095ad2 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -180,7 +180,7 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): # Tracing current_span = get_parent_span() if current_span and event_data.application_properties: - traceparent = event_data.application_properties.get(b"Diagnostic-Id", None).decode('ascii') + traceparent = event_data.application_properties.get(b"Diagnostic-Id", "").decode('ascii') if traceparent: current_span.link(traceparent) From ce59dc069b708fe0fa654a4f2564c56105250377 Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Fri, 13 Sep 2019 16:01:46 -0700 Subject: [PATCH 08/25] Remove parent concept --- .../azure/eventhub/aio/consumer_async.py | 20 ++++++++++--------- .../aio/eventprocessor/event_processor.py | 8 ++++---- .../azure/eventhub/aio/producer_async.py | 10 +++++----- .../azure/eventhub/consumer.py | 11 +++++----- .../azure/eventhub/producer.py | 10 +++++----- 5 files changed, 31 insertions(+), 28 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index d445e41f0e16..e185355f93c6 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -11,8 +11,8 @@ from uamqp import errors, types # type: ignore from uamqp import ReceiveClientAsync, Source # type: ignore -from azure.core.tracing.common import get_parent_span from azure.core.tracing import SpanKind +from azure.core.settings import settings from azure.eventhub import EventData, EventPosition from azure.eventhub.error import EventHubError, ConnectError, _error_handler @@ -184,11 +184,13 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): data_batch.append(event_data) # Tracing - current_span = get_parent_span() - if current_span and event_data.application_properties: - traceparent = event_data.application_properties.get(b"Diagnostic-Id", "").decode('ascii') - if traceparent: - current_span.link(traceparent) + span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + if span_impl_type is not None: + current_span = span_impl_type(span_impl_type.get_current_span()) + if current_span and event_data.application_properties: + traceparent = event_data.application_properties.get(b"Diagnostic-Id", "").decode('ascii') + if traceparent: + current_span.link(traceparent) return data_batch @@ -242,9 +244,9 @@ async def receive(self, *, max_batch_size=None, timeout=None): max_batch_size = max_batch_size or min(self._client._config.max_batch_size, self._prefetch) # pylint:disable=protected-access # Tracing code - parent_span = get_parent_span() - if parent_span: - child = parent_span.span(name="Azure.EventHubs.receive") + span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + if span_impl_type is not None: + child = span_impl_type(name="Azure.EventHubs.receive") child.kind = SpanKind.CLIENT # Should be PRODUCER with child: diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py index 985cfb523554..198d90b90261 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py @@ -8,8 +8,8 @@ import asyncio import logging -from azure.core.tracing.common import get_parent_span from azure.core.tracing import SpanKind +from azure.core.settings import settings from azure.eventhub import EventPosition, EventHubError from azure.eventhub.aio import EventHubClient @@ -252,9 +252,9 @@ async def close(reason): events = await partition_consumer.receive() # Tracing - parent_span = get_parent_span() - if parent_span: - child = parent_span.span(name="Azure.EventHubs.process") + span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + if span_impl_type is not None: + child = span_impl_type(name="Azure.EventHubs.process") child.kind = SpanKind.SERVER for event_data in events: diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index dcccd2d1e014..6d4fbf4c228b 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -11,8 +11,8 @@ from uamqp import types, constants, errors # type: ignore from uamqp import SendClientAsync # type: ignore -from azure.core.tracing.common import get_parent_span from azure.core.tracing import SpanKind +from azure.core.settings import settings from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError @@ -217,9 +217,9 @@ async def send( """ # Tracing code - parent_span = get_parent_span() - if parent_span: - child = parent_span.span(name="Azure.EventHubs.send") + span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + if span_impl_type is not None: + child = span_impl_type(name="Azure.EventHubs.send") child.kind = SpanKind.CLIENT # Should be PRODUCER def trace_message(message): @@ -251,7 +251,7 @@ def trace_message(message): wrapper_event_data.message.on_send_complete = self._on_outcome self._unsent_events = [wrapper_event_data.message] - if parent_span: + if span_impl_type is not None: with child: self._client._add_span_request_attributes(child) await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 14b140095ad2..361d0f9fd9d0 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -12,8 +12,8 @@ from uamqp import types, errors # type: ignore from uamqp import ReceiveClient, Source # type: ignore -from azure.core.tracing.common import get_parent_span from azure.core.tracing import SpanKind +from azure.core.settings import settings from azure.eventhub.common import EventData, EventPosition from azure.eventhub.error import _error_handler @@ -178,7 +178,8 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): data_batch.append(event_data) # Tracing - current_span = get_parent_span() + span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + current_span = span_impl_type(span_impl_type.get_current_span()) if current_span and event_data.application_properties: traceparent = event_data.application_properties.get(b"Diagnostic-Id", "").decode('ascii') if traceparent: @@ -236,9 +237,9 @@ def receive(self, max_batch_size=None, timeout=None): max_batch_size = max_batch_size or min(self._client._config.max_batch_size, self._prefetch) # pylint:disable=protected-access # Tracing code - parent_span = get_parent_span() - if parent_span: - child = parent_span.span(name="Azure.EventHubs.receive") + span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + if span_impl_type: + child = span_impl_type(name="Azure.EventHubs.receive") child.kind = SpanKind.CLIENT # Should be PRODUCER with child: diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index ce001df20a3d..b7bdd429843c 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -12,8 +12,8 @@ from uamqp import types, constants, errors # type: ignore from uamqp import SendClient # type: ignore -from azure.core.tracing.common import get_parent_span from azure.core.tracing import SpanKind +from azure.core.settings import settings from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError @@ -222,9 +222,9 @@ def send(self, event_data, partition_key=None, timeout=None): """ # Tracing code - parent_span = get_parent_span() - if parent_span: - child = parent_span.span(name="Azure.EventHubs.send") + span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + if span_impl_type is not None: + child = span_impl_type(name="Azure.EventHubs.send") child.kind = SpanKind.CLIENT # Should be PRODUCER def trace_message(message): @@ -255,7 +255,7 @@ def trace_message(message): wrapper_event_data.message.on_send_complete = self._on_outcome self._unsent_events = [wrapper_event_data.message] - if parent_span: + if span_impl_type is not None: with child: self._client._add_span_request_attributes(child) self._send_event_data_with_retry(timeout=timeout) From 567b2bcae8ef5b8472770f76799edff72a411be9 Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Mon, 16 Sep 2019 09:59:44 -0700 Subject: [PATCH 09/25] Remove receive tracing code --- .../azure/eventhub/aio/consumer_async.py | 12 +----------- .../azure-eventhubs/azure/eventhub/consumer.py | 12 +----------- 2 files changed, 2 insertions(+), 22 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index e185355f93c6..18f061b6cac0 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -243,17 +243,7 @@ async def receive(self, *, max_batch_size=None, timeout=None): timeout = timeout or self._client._config.receive_timeout # pylint:disable=protected-access max_batch_size = max_batch_size or min(self._client._config.max_batch_size, self._prefetch) # pylint:disable=protected-access - # Tracing code - span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] - if span_impl_type is not None: - child = span_impl_type(name="Azure.EventHubs.receive") - child.kind = SpanKind.CLIENT # Should be PRODUCER - - with child: - self._client._add_span_request_attributes(child) - return await self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) - else: - return await self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) + return await self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) async def close(self, exception=None): # type: (Exception) -> None diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 361d0f9fd9d0..27912e38722e 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -236,17 +236,7 @@ def receive(self, max_batch_size=None, timeout=None): timeout = timeout or self._client._config.receive_timeout # pylint:disable=protected-access max_batch_size = max_batch_size or min(self._client._config.max_batch_size, self._prefetch) # pylint:disable=protected-access - # Tracing code - span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] - if span_impl_type: - child = span_impl_type(name="Azure.EventHubs.receive") - child.kind = SpanKind.CLIENT # Should be PRODUCER - - with child: - self._client._add_span_request_attributes(child) - return self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) - else: - return self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) + return self._receive_with_retry(timeout=timeout, max_batch_size=max_batch_size) def close(self, exception=None): # type:(Exception) -> None From a205ea5cd13e1311578a61de610d634e11e31b25 Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Thu, 19 Sep 2019 12:00:18 -0700 Subject: [PATCH 10/25] Don't execute tracing on message if no tracing loaded --- .../azure/eventhub/aio/producer_async.py | 13 +++++++------ .../azure-eventhubs/azure/eventhub/producer.py | 13 +++++++------ 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index 6d4fbf4c228b..38555bf7c00a 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -223,12 +223,13 @@ async def send( child.kind = SpanKind.CLIENT # Should be PRODUCER def trace_message(message): - message_span = child.span(name="Azure.EventHubs.message") - message_span.start() - app_prop = dict(message.application_properties) - app_prop.setdefault(b"Diagnostic-Id", message_span.get_trace_parent().encode('ascii')) - message.application_properties = app_prop - message_span.finish() + if span_impl_type is not None: + message_span = child.span(name="Azure.EventHubs.message") + message_span.start() + app_prop = dict(message.application_properties) + app_prop.setdefault(b"Diagnostic-Id", message_span.get_trace_parent().encode('ascii')) + message.application_properties = app_prop + message_span.finish() self._check_closed() if isinstance(event_data, EventData): diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index b7bdd429843c..3b4fb312230a 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -228,12 +228,13 @@ def send(self, event_data, partition_key=None, timeout=None): child.kind = SpanKind.CLIENT # Should be PRODUCER def trace_message(message): - message_span = child.span(name="Azure.EventHubs.message") - message_span.start() - app_prop = dict(message.application_properties) - app_prop.setdefault(b"Diagnostic-Id", message_span.get_trace_parent().encode('ascii')) - message.application_properties = app_prop - message_span.finish() + if span_impl_type is not None: + message_span = child.span(name="Azure.EventHubs.message") + message_span.start() + app_prop = dict(message.application_properties) + app_prop.setdefault(b"Diagnostic-Id", message_span.get_trace_parent().encode('ascii')) + message.application_properties = app_prop + message_span.finish() self._check_closed() if isinstance(event_data, EventData): From 443cdc80d77e44c312102bcc5e9f94515edbc6c3 Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Mon, 23 Sep 2019 16:29:01 -0700 Subject: [PATCH 11/25] Try to re-order dev dep for CI --- sdk/eventhub/azure-eventhubs/dev_requirements.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/dev_requirements.txt b/sdk/eventhub/azure-eventhubs/dev_requirements.txt index 338710e52fb3..79be4ffee6f0 100644 --- a/sdk/eventhub/azure-eventhubs/dev_requirements.txt +++ b/sdk/eventhub/azure-eventhubs/dev_requirements.txt @@ -1,7 +1,7 @@ --e ../../servicebus/azure-servicebus --e ../../core/azure-core -e ../../../tools/azure-sdk-tools +-e ../../core/azure-core -e ../../identity/azure-identity +-e ../../servicebus/azure-servicebus pytest-asyncio>=0.8.0; python_version > '3.4' docutils>=0.14 pygments>=2.2.0 From 83d712863e27084daf8e3f290f05d1c0cff7179c Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Tue, 24 Sep 2019 09:14:56 -0700 Subject: [PATCH 12/25] Fix EH plugin dev deps --- .../dev_requirements.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt index 6a6469b05bb1..9ae8eefecefb 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt @@ -1,2 +1,4 @@ +-e ../../../tools/azure-sdk-tools +-e ../../core/azure-core ../azure-eventhubs pytest-asyncio>=0.8.0; python_version >= '3.5' From 78b78ab706c34f51729d67cfb2039f00da14177f Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Tue, 24 Sep 2019 13:00:54 -0700 Subject: [PATCH 13/25] Add azure-core to azure-eventhub --- sdk/eventhub/azure-eventhubs/setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/eventhub/azure-eventhubs/setup.py b/sdk/eventhub/azure-eventhubs/setup.py index 1ffa5c93005f..aae5cc60b638 100644 --- a/sdk/eventhub/azure-eventhubs/setup.py +++ b/sdk/eventhub/azure-eventhubs/setup.py @@ -67,6 +67,7 @@ zip_safe=False, packages=find_packages(exclude=exclude_packages), install_requires=[ + "azure-core<2.0.0,>=1.0.0b4", 'uamqp~=1.2.0', 'azure-common~=1.1', ], From 22b78c26710ce3d2a46af3eaa18008b25563dc11 Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Tue, 24 Sep 2019 13:07:18 -0700 Subject: [PATCH 14/25] Share req --- shared_requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/shared_requirements.txt b/shared_requirements.txt index 47fc6b58dfd4..3b5a837350ea 100644 --- a/shared_requirements.txt +++ b/shared_requirements.txt @@ -106,6 +106,7 @@ six>=1.6 #override azure-storage-blob azure-core<2.0.0,>=1.0.0b4 #override azure-storage-queue azure-core<2.0.0,>=1.0.0b4 #override azure-storage-file azure-core<2.0.0,>=1.0.0b4 +#override azure-eventhub azure-core<2.0.0,>=1.0.0b4 #override azure-cosmos azure-core<2.0.0,>=1.0.0b3 #override azure-eventhub-checkpointstoreblob-aio azure-storage-blob<12.0.0b4,>=12.0.0b2 #override azure-eventhub-checkpointstoreblob-aio aiohttp<4.0,>=3.0 From 82a39efd47f3354a7ab22d50d17a77127acfeb01 Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Tue, 24 Sep 2019 13:17:56 -0700 Subject: [PATCH 15/25] ChangeLog --- sdk/eventhub/azure-eventhubs/HISTORY.md | 6 ++++++ sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhubs/HISTORY.md b/sdk/eventhub/azure-eventhubs/HISTORY.md index c5af555047bc..f0577a7640d9 100644 --- a/sdk/eventhub/azure-eventhubs/HISTORY.md +++ b/sdk/eventhub/azure-eventhubs/HISTORY.md @@ -1,5 +1,11 @@ # Release History +## 5.0.0b4 (2019-XX-XX) + +**New features** + +- Support for tracing #7153 + ## 5.0.0b3 (2019-09-10) **New features** diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py index 62b2a6b811d8..888e86a98986 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py @@ -4,7 +4,7 @@ # -------------------------------------------------------------------------------------------- __path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore -__version__ = "5.0.0b3" +__version__ = "5.0.0b4" from uamqp import constants # type: ignore from azure.eventhub.common import EventData, EventDataBatch, EventPosition from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \ From db31f6be9a00bad4c7015420b2850a8a32431fd9 Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Tue, 24 Sep 2019 14:47:22 -0700 Subject: [PATCH 16/25] EH extension is ok with b4 --- sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py index 41cba5b3027e..c83b2b6964c6 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/setup.py @@ -65,7 +65,7 @@ packages=find_packages(exclude=exclude_packages), python_requires=">=3.5.3", install_requires=[ - 'azure-storage-blob<12.0.0b4,>=12.0.0b2', + 'azure-storage-blob<=12.0.0b4,>=12.0.0b2', 'azure-eventhub<6.0.0,>=5.0.0b3', 'aiohttp<4.0,>=3.0', ], From 632a8d81d489550631f729f6c3b777f0ee61c114 Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Tue, 24 Sep 2019 15:38:17 -0700 Subject: [PATCH 17/25] Install blob SDK for EH extension --- .../azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt index 9ae8eefecefb..29c8c57f39f3 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt @@ -1,4 +1,5 @@ -e ../../../tools/azure-sdk-tools +-e ../../storage/azure-blob-storage -e ../../core/azure-core ../azure-eventhubs pytest-asyncio>=0.8.0; python_version >= '3.5' From c4c4fd187209025e1e7fbc3a1201fe9e4ff1385a Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Tue, 24 Sep 2019 16:18:41 -0700 Subject: [PATCH 18/25] pylint --- .../azure/eventhub/aio/consumer_async.py | 1 - .../aio/eventprocessor/event_processor.py | 40 ++++++++++--------- .../azure/eventhub/aio/producer_async.py | 4 +- .../azure/eventhub/consumer.py | 1 - .../azure/eventhub/producer.py | 4 +- 5 files changed, 26 insertions(+), 24 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 18f061b6cac0..e7edb0d8eaa9 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -11,7 +11,6 @@ from uamqp import errors, types # type: ignore from uamqp import ReceiveClientAsync, Source # type: ignore -from azure.core.tracing import SpanKind from azure.core.settings import settings from azure.eventhub import EventData, EventPosition diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py index 198d90b90261..3452f9eeb009 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py @@ -3,6 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # ----------------------------------------------------------------------------------- +from contextlib import contextmanager from typing import Dict, Type import uuid import asyncio @@ -188,7 +189,26 @@ def _create_tasks_for_claimed_ownership(self, to_claim_ownership_list): if partition_id not in self._tasks or self._tasks[partition_id].done(): self._tasks[partition_id] = get_running_loop().create_task(self._receive(ownership)) - async def _receive(self, ownership): + @contextmanager + def _context(self, events): + # Tracing + span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + if span_impl_type is None: + yield + else: + child = span_impl_type(name="Azure.EventHubs.process") + self._eventhub_client._add_span_request_attributes(child) # pylint: disable=protected-access + child.kind = SpanKind.SERVER + + for event in events: + if event.application_properties: + traceparent = event.application_properties.get(b"Diagnostic-Id", b'').decode('ascii') + if traceparent: + child.link(traceparent) + with child: + yield + + async def _receive(self, ownership): # pylint: disable=too-many-statements log.info("start ownership, %r", ownership) partition_processor = self._partition_processor_factory() partition_id = ownership["partition_id"] @@ -250,23 +270,7 @@ async def close(reason): while True: try: events = await partition_consumer.receive() - - # Tracing - span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] - if span_impl_type is not None: - child = span_impl_type(name="Azure.EventHubs.process") - child.kind = SpanKind.SERVER - - for event_data in events: - if event_data.application_properties: - traceparent = event_data.application_properties.get(b"Diagnostic-Id", None).decode('ascii') - if traceparent: - child.link(traceparent) - - with child: - self._eventhub_client._add_span_request_attributes(child) - await partition_processor.process_events(events, partition_context) - else: + with self._context(events): await partition_processor.process_events(events, partition_context) except asyncio.CancelledError: diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index 38555bf7c00a..47f7e011f90d 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -246,7 +246,7 @@ def trace_message(message): if partition_key: event_data = _set_partition_key(event_data, partition_key) wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access - for internal_message in wrapper_event_data.message._body_gen: + for internal_message in wrapper_event_data.message._body_gen: # pylint: disable=protected-access trace_message(internal_message) wrapper_event_data.message.on_send_complete = self._on_outcome @@ -254,7 +254,7 @@ def trace_message(message): if span_impl_type is not None: with child: - self._client._add_span_request_attributes(child) + self._client._add_span_request_attributes(child) # pylint: disable=protected-access await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor else: await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 27912e38722e..70cbf3c2e52e 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -12,7 +12,6 @@ from uamqp import types, errors # type: ignore from uamqp import ReceiveClient, Source # type: ignore -from azure.core.tracing import SpanKind from azure.core.settings import settings from azure.eventhub.common import EventData, EventPosition diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 3b4fb312230a..7571d7ef1593 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -251,14 +251,14 @@ def trace_message(message): if partition_key: event_data = _set_partition_key(event_data, partition_key) wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access - for internal_message in wrapper_event_data.message._body_gen: + for internal_message in wrapper_event_data.message._body_gen: # pylint: disable=protected-access trace_message(internal_message) wrapper_event_data.message.on_send_complete = self._on_outcome self._unsent_events = [wrapper_event_data.message] if span_impl_type is not None: with child: - self._client._add_span_request_attributes(child) + self._client._add_span_request_attributes(child) # pylint: disable=protected-access self._send_event_data_with_retry(timeout=timeout) else: self._send_event_data_with_retry(timeout=timeout) From 9fb2039e5e3875c322e9502f9b55e617f970ddec Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Tue, 24 Sep 2019 16:31:23 -0700 Subject: [PATCH 19/25] fix dev req --- .../dev_requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt index 29c8c57f39f3..222efb40b513 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt @@ -1,5 +1,5 @@ -e ../../../tools/azure-sdk-tools --e ../../storage/azure-blob-storage +-e ../../storage/azure-storage-blob -e ../../core/azure-core ../azure-eventhubs pytest-asyncio>=0.8.0; python_version >= '3.5' From 2f4df0a59ab01dba0f8678ac9b4d445926393447 Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Tue, 24 Sep 2019 16:47:44 -0700 Subject: [PATCH 20/25] dep fix --- .../dev_requirements.txt | 2 +- shared_requirements.txt | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt index 222efb40b513..49d226f4bf52 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/dev_requirements.txt @@ -1,5 +1,5 @@ -e ../../../tools/azure-sdk-tools --e ../../storage/azure-storage-blob -e ../../core/azure-core +-e ../../storage/azure-storage-blob ../azure-eventhubs pytest-asyncio>=0.8.0; python_version >= '3.5' diff --git a/shared_requirements.txt b/shared_requirements.txt index 3b5a837350ea..2e0ca59bb50e 100644 --- a/shared_requirements.txt +++ b/shared_requirements.txt @@ -110,3 +110,4 @@ six>=1.6 #override azure-cosmos azure-core<2.0.0,>=1.0.0b3 #override azure-eventhub-checkpointstoreblob-aio azure-storage-blob<12.0.0b4,>=12.0.0b2 #override azure-eventhub-checkpointstoreblob-aio aiohttp<4.0,>=3.0 +#override azure azure-storage-blob~=1.3 From af4acd74fe326a6dfd794899d808aa2359b1fae0 Mon Sep 17 00:00:00 2001 From: scbedd <45376673+scbedd@users.noreply.github.com> Date: Tue, 24 Sep 2019 19:01:17 -0700 Subject: [PATCH 21/25] the override had <. the setup actually defines <=. need to update the error message, but this will fix the analyze error --- shared_requirements.txt | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/shared_requirements.txt b/shared_requirements.txt index 2e0ca59bb50e..601385e1a452 100644 --- a/shared_requirements.txt +++ b/shared_requirements.txt @@ -108,6 +108,5 @@ six>=1.6 #override azure-storage-file azure-core<2.0.0,>=1.0.0b4 #override azure-eventhub azure-core<2.0.0,>=1.0.0b4 #override azure-cosmos azure-core<2.0.0,>=1.0.0b3 -#override azure-eventhub-checkpointstoreblob-aio azure-storage-blob<12.0.0b4,>=12.0.0b2 -#override azure-eventhub-checkpointstoreblob-aio aiohttp<4.0,>=3.0 -#override azure azure-storage-blob~=1.3 +#override azure-eventhub-checkpointstoreblob-aio azure-storage-blob<=12.0.0b4,>=12.0.0b2 +#override azure-eventhub-checkpointstoreblob-aio aiohttp<4.0,>=3.0 \ No newline at end of file From ce2948a00955a236a8dbc8d4ac9f7502ad669ef7 Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Wed, 25 Sep 2019 15:49:24 -0700 Subject: [PATCH 22/25] Tracing message from receive iterators as well --- .../azure/eventhub/aio/consumer_async.py | 13 ++----------- .../azure/eventhub/client_abstract.py | 11 +++++++++++ .../azure-eventhubs/azure/eventhub/consumer.py | 12 ++---------- 3 files changed, 15 insertions(+), 21 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index e7edb0d8eaa9..3b816588a5d6 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -11,8 +11,6 @@ from uamqp import errors, types # type: ignore from uamqp import ReceiveClientAsync, Source # type: ignore -from azure.core.settings import settings - from azure.eventhub import EventData, EventPosition from azure.eventhub.error import EventHubError, ConnectError, _error_handler from ._consumer_producer_mixin_async import ConsumerProducerMixin @@ -104,6 +102,7 @@ async def __anext__(self): self._messages_iter = self._handler.receive_messages_iter_async() message = await self._messages_iter.__anext__() event_data = EventData._from_message(message) # pylint:disable=protected-access + self._client._trace_link_message(event_data) # pylint:disable=protected-access self._offset = EventPosition(event_data.offset, inclusive=False) retried_times = 0 return event_data @@ -181,15 +180,7 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): event_data = EventData._from_message(message) # pylint:disable=protected-access self._offset = EventPosition(event_data.offset) data_batch.append(event_data) - - # Tracing - span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] - if span_impl_type is not None: - current_span = span_impl_type(span_impl_type.get_current_span()) - if current_span and event_data.application_properties: - traceparent = event_data.application_properties.get(b"Diagnostic-Id", "").decode('ascii') - if traceparent: - current_span.link(traceparent) + self._client._trace_link_message(event_data) # pylint:disable=protected-access return data_batch diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index 62ea791a5894..65862b73a67a 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -12,6 +12,8 @@ from abc import abstractmethod from typing import Dict, Union, Any, TYPE_CHECKING +from azure.core.settings import settings + from azure.eventhub import __version__, EventPosition from azure.eventhub.configuration import _Configuration from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential, _Address @@ -219,6 +221,15 @@ def _add_span_request_attributes(self, span): span.add_attribute("message_bus.destination", self._address.path) span.add_attribute("peer.address", self._address.hostname) + @staticmethod + def _trace_link_message(event_data): + span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + current_span = span_impl_type(span_impl_type.get_current_span()) + if current_span and event_data.application_properties: + traceparent = event_data.application_properties.get(b"Diagnostic-Id", "").decode('ascii') + if traceparent: + current_span.link(traceparent) + def _process_redirect_uri(self, redirect): redirect_uri = redirect.address.decode('utf-8') auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups") diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 70cbf3c2e52e..187899a29ff3 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -12,8 +12,6 @@ from uamqp import types, errors # type: ignore from uamqp import ReceiveClient, Source # type: ignore -from azure.core.settings import settings - from azure.eventhub.common import EventData, EventPosition from azure.eventhub.error import _error_handler from ._consumer_producer_mixin import ConsumerProducerMixin @@ -100,6 +98,7 @@ def __next__(self): self._messages_iter = self._handler.receive_messages_iter() message = next(self._messages_iter) event_data = EventData._from_message(message) # pylint:disable=protected-access + self._client._trace_link_message(event_data) # pylint:disable=protected-access self._offset = EventPosition(event_data.offset, inclusive=False) retried_times = 0 return event_data @@ -175,14 +174,7 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): event_data = EventData._from_message(message) # pylint:disable=protected-access self._offset = EventPosition(event_data.offset) data_batch.append(event_data) - - # Tracing - span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] - current_span = span_impl_type(span_impl_type.get_current_span()) - if current_span and event_data.application_properties: - traceparent = event_data.application_properties.get(b"Diagnostic-Id", "").decode('ascii') - if traceparent: - current_span.link(traceparent) + self._client._trace_link_message(event_data) # pylint:disable=protected-access return data_batch From bd2da67d571c7b4cbbe1c978b7ae42c037f02552 Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Wed, 25 Sep 2019 15:57:27 -0700 Subject: [PATCH 23/25] Producer simplification --- .../azure/eventhub/aio/producer_async.py | 16 ++++---------- .../azure/eventhub/client_abstract.py | 22 ++++++++++++++----- .../azure/eventhub/producer.py | 14 +++--------- 3 files changed, 24 insertions(+), 28 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index 47f7e011f90d..52434afe1331 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -5,7 +5,7 @@ import uuid import asyncio import logging -from typing import Iterable, Union, Any +from typing import Iterable, Union import time from uamqp import types, constants, errors # type: ignore @@ -218,25 +218,17 @@ async def send( """ # Tracing code span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + child = None if span_impl_type is not None: child = span_impl_type(name="Azure.EventHubs.send") child.kind = SpanKind.CLIENT # Should be PRODUCER - def trace_message(message): - if span_impl_type is not None: - message_span = child.span(name="Azure.EventHubs.message") - message_span.start() - app_prop = dict(message.application_properties) - app_prop.setdefault(b"Diagnostic-Id", message_span.get_trace_parent().encode('ascii')) - message.application_properties = app_prop - message_span.finish() - self._check_closed() if isinstance(event_data, EventData): if partition_key: event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data - trace_message(wrapper_event_data) + self._client._trace_message(child, wrapper_event_data) # pylint: disable=protected-access else: if isinstance(event_data, EventDataBatch): if partition_key and partition_key != event_data._partition_key: # pylint: disable=protected-access @@ -247,7 +239,7 @@ def trace_message(message): event_data = _set_partition_key(event_data, partition_key) wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access for internal_message in wrapper_event_data.message._body_gen: # pylint: disable=protected-access - trace_message(internal_message) + self._client._trace_message(child, internal_message) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self._unsent_events = [wrapper_event_data.message] diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index 65862b73a67a..8d9734e29e87 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -224,11 +224,23 @@ def _add_span_request_attributes(self, span): @staticmethod def _trace_link_message(event_data): span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] - current_span = span_impl_type(span_impl_type.get_current_span()) - if current_span and event_data.application_properties: - traceparent = event_data.application_properties.get(b"Diagnostic-Id", "").decode('ascii') - if traceparent: - current_span.link(traceparent) + if span_impl_type is not None: + current_span = span_impl_type(span_impl_type.get_current_span()) + if current_span and event_data.application_properties: + traceparent = event_data.application_properties.get(b"Diagnostic-Id", "").decode('ascii') + if traceparent: + current_span.link(traceparent) + + @staticmethod + def _trace_message(parent_span, message): + span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + if span_impl_type is not None and parent_span is not None: + message_span = parent_span.span(name="Azure.EventHubs.message") + message_span.start() + app_prop = dict(message.application_properties) + app_prop.setdefault(b"Diagnostic-Id", message_span.get_trace_parent().encode('ascii')) + message.application_properties = app_prop + message_span.finish() def _process_redirect_uri(self, redirect): redirect_uri = redirect.address.decode('utf-8') diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 7571d7ef1593..9e265196b66e 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -223,25 +223,17 @@ def send(self, event_data, partition_key=None, timeout=None): """ # Tracing code span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + child = None if span_impl_type is not None: child = span_impl_type(name="Azure.EventHubs.send") child.kind = SpanKind.CLIENT # Should be PRODUCER - def trace_message(message): - if span_impl_type is not None: - message_span = child.span(name="Azure.EventHubs.message") - message_span.start() - app_prop = dict(message.application_properties) - app_prop.setdefault(b"Diagnostic-Id", message_span.get_trace_parent().encode('ascii')) - message.application_properties = app_prop - message_span.finish() - self._check_closed() if isinstance(event_data, EventData): if partition_key: event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data - trace_message(wrapper_event_data) + self._client._trace_message(child, wrapper_event_data) # pylint: disable=protected-access else: if isinstance(event_data, EventDataBatch): # The partition_key in the param will be omitted. if partition_key and partition_key != event_data._partition_key: # pylint: disable=protected-access @@ -252,7 +244,7 @@ def trace_message(message): event_data = _set_partition_key(event_data, partition_key) wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access for internal_message in wrapper_event_data.message._body_gen: # pylint: disable=protected-access - trace_message(internal_message) + self._client._trace_message(child, internal_message) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self._unsent_events = [wrapper_event_data.message] From b025231ca8e84fe5bb069ac1903cf63d983a142b Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Wed, 25 Sep 2019 16:03:08 -0700 Subject: [PATCH 24/25] Simplify eventprocessor --- .../azure/eventhub/aio/eventprocessor/event_processor.py | 5 +---- .../azure-eventhubs/azure/eventhub/client_abstract.py | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py index 3452f9eeb009..ce979a0c3d14 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py @@ -201,10 +201,7 @@ def _context(self, events): child.kind = SpanKind.SERVER for event in events: - if event.application_properties: - traceparent = event.application_properties.get(b"Diagnostic-Id", b'').decode('ascii') - if traceparent: - child.link(traceparent) + self._eventhub_client._trace_link_message(event, child) # pylint: disable=protected-access with child: yield diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index 8d9734e29e87..87ebbd2c065c 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -222,10 +222,10 @@ def _add_span_request_attributes(self, span): span.add_attribute("peer.address", self._address.hostname) @staticmethod - def _trace_link_message(event_data): + def _trace_link_message(event_data, parent_span=None): span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] if span_impl_type is not None: - current_span = span_impl_type(span_impl_type.get_current_span()) + current_span = parent_span or span_impl_type(span_impl_type.get_current_span()) if current_span and event_data.application_properties: traceparent = event_data.application_properties.get(b"Diagnostic-Id", "").decode('ascii') if traceparent: From 32ce0e8ec3aefa372bf92b65591b617665a44308 Mon Sep 17 00:00:00 2001 From: Laurent Mazuel Date: Fri, 27 Sep 2019 09:43:23 -0700 Subject: [PATCH 25/25] Consider batch size --- .../azure/eventhub/aio/consumer_async.py | 4 +-- .../aio/eventprocessor/event_processor.py | 2 +- .../azure/eventhub/aio/producer_async.py | 7 ++-- .../azure/eventhub/client_abstract.py | 23 ------------- .../azure-eventhubs/azure/eventhub/common.py | 34 +++++++++++++++++++ .../azure/eventhub/consumer.py | 4 +-- .../azure/eventhub/producer.py | 12 +++++-- 7 files changed, 51 insertions(+), 35 deletions(-) diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py index 3b816588a5d6..015b11190212 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/consumer_async.py @@ -102,7 +102,7 @@ async def __anext__(self): self._messages_iter = self._handler.receive_messages_iter_async() message = await self._messages_iter.__anext__() event_data = EventData._from_message(message) # pylint:disable=protected-access - self._client._trace_link_message(event_data) # pylint:disable=protected-access + event_data._trace_link_message() # pylint:disable=protected-access self._offset = EventPosition(event_data.offset, inclusive=False) retried_times = 0 return event_data @@ -180,7 +180,7 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): event_data = EventData._from_message(message) # pylint:disable=protected-access self._offset = EventPosition(event_data.offset) data_batch.append(event_data) - self._client._trace_link_message(event_data) # pylint:disable=protected-access + event_data._trace_link_message() # pylint:disable=protected-access return data_batch diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py index ce979a0c3d14..3183dc051ac8 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/eventprocessor/event_processor.py @@ -201,7 +201,7 @@ def _context(self, events): child.kind = SpanKind.SERVER for event in events: - self._eventhub_client._trace_link_message(event, child) # pylint: disable=protected-access + event._trace_link_message(child) # pylint: disable=protected-access with child: yield diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py index 52434afe1331..8ef299b0e6da 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py @@ -16,7 +16,7 @@ from azure.eventhub.common import EventData, EventDataBatch from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError -from ..producer import _error, _set_partition_key +from ..producer import _error, _set_partition_key, _set_trace_message from ._consumer_producer_mixin_async import ConsumerProducerMixin log = logging.getLogger(__name__) @@ -228,7 +228,7 @@ async def send( if partition_key: event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data - self._client._trace_message(child, wrapper_event_data) # pylint: disable=protected-access + wrapper_event_data._trace_message(child) # pylint: disable=protected-access else: if isinstance(event_data, EventDataBatch): if partition_key and partition_key != event_data._partition_key: # pylint: disable=protected-access @@ -237,9 +237,8 @@ async def send( else: if partition_key: event_data = _set_partition_key(event_data, partition_key) + event_data = _set_trace_message(event_data, child) wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access - for internal_message in wrapper_event_data.message._body_gen: # pylint: disable=protected-access - self._client._trace_message(child, internal_message) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self._unsent_events = [wrapper_event_data.message] diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py index 87ebbd2c065c..62ea791a5894 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/client_abstract.py @@ -12,8 +12,6 @@ from abc import abstractmethod from typing import Dict, Union, Any, TYPE_CHECKING -from azure.core.settings import settings - from azure.eventhub import __version__, EventPosition from azure.eventhub.configuration import _Configuration from .common import EventHubSharedKeyCredential, EventHubSASTokenCredential, _Address @@ -221,27 +219,6 @@ def _add_span_request_attributes(self, span): span.add_attribute("message_bus.destination", self._address.path) span.add_attribute("peer.address", self._address.hostname) - @staticmethod - def _trace_link_message(event_data, parent_span=None): - span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] - if span_impl_type is not None: - current_span = parent_span or span_impl_type(span_impl_type.get_current_span()) - if current_span and event_data.application_properties: - traceparent = event_data.application_properties.get(b"Diagnostic-Id", "").decode('ascii') - if traceparent: - current_span.link(traceparent) - - @staticmethod - def _trace_message(parent_span, message): - span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] - if span_impl_type is not None and parent_span is not None: - message_span = parent_span.span(name="Azure.EventHubs.message") - message_span.start() - app_prop = dict(message.application_properties) - app_prop.setdefault(b"Diagnostic-Id", message_span.get_trace_parent().encode('ascii')) - message.application_properties = app_prop - message_span.finish() - def _process_redirect_uri(self, redirect): redirect_uri = redirect.address.decode('utf-8') auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups") diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py index 5923d7f57972..3f2545829748 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/common.py @@ -12,6 +12,9 @@ from uamqp import BatchMessage, Message, types, constants # type: ignore from uamqp.message import MessageHeader, MessageProperties # type: ignore + +from azure.core.settings import settings + from azure.eventhub.error import EventDataError log = logging.getLogger(__name__) @@ -114,6 +117,35 @@ def _set_partition_key(self, value): self.message.header = header self._annotations = annotations + def _trace_message(self, parent_span=None): + """Add tracing information to this message. + + Will open and close a "Azure.EventHubs.message" span, and + add the "DiagnosticId" as app properties of the message. + """ + span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + if span_impl_type is not None: + current_span = parent_span or span_impl_type(span_impl_type.get_current_span()) + message_span = current_span.span(name="Azure.EventHubs.message") + message_span.start() + app_prop = dict(self.application_properties) + app_prop.setdefault(b"Diagnostic-Id", message_span.get_trace_parent().encode('ascii')) + self.application_properties = app_prop + message_span.finish() + + def _trace_link_message(self, parent_span=None): + """Link the current message to current span. + + Will extract DiagnosticId if available. + """ + span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan] + if span_impl_type is not None: + current_span = parent_span or span_impl_type(span_impl_type.get_current_span()) + if current_span and self.application_properties: + traceparent = self.application_properties.get(b"Diagnostic-Id", "").decode('ascii') + if traceparent: + current_span.link(traceparent) + @staticmethod def _from_message(message): event_data = EventData(body='') @@ -328,6 +360,8 @@ def try_add(self, event_data): if not event_data.partition_key: event_data._set_partition_key(self._partition_key) # pylint:disable=protected-access + event_data._trace_message() # pylint:disable=protected-access + event_data_size = event_data.message.get_message_encoded_size() # For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py index 187899a29ff3..ff996a57747a 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py @@ -98,7 +98,7 @@ def __next__(self): self._messages_iter = self._handler.receive_messages_iter() message = next(self._messages_iter) event_data = EventData._from_message(message) # pylint:disable=protected-access - self._client._trace_link_message(event_data) # pylint:disable=protected-access + event_data._trace_link_message() # pylint:disable=protected-access self._offset = EventPosition(event_data.offset, inclusive=False) retried_times = 0 return event_data @@ -174,7 +174,7 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs): event_data = EventData._from_message(message) # pylint:disable=protected-access self._offset = EventPosition(event_data.offset) data_batch.append(event_data) - self._client._trace_link_message(event_data) # pylint:disable=protected-access + event_data._trace_link_message() # pylint:disable=protected-access return data_batch diff --git a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py index 9e265196b66e..6e562bbdf051 100644 --- a/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py +++ b/sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py @@ -35,6 +35,13 @@ def _set_partition_key(event_datas, partition_key): yield ed +def _set_trace_message(event_datas, parent_span=None): + ed_iter = iter(event_datas) + for ed in ed_iter: + ed._trace_message(parent_span) # pylint:disable=protected-access + yield ed + + class EventHubProducer(ConsumerProducerMixin): # pylint:disable=too-many-instance-attributes """ A producer responsible for transmitting EventData to a specific Event Hub, @@ -233,7 +240,7 @@ def send(self, event_data, partition_key=None, timeout=None): if partition_key: event_data._set_partition_key(partition_key) # pylint: disable=protected-access wrapper_event_data = event_data - self._client._trace_message(child, wrapper_event_data) # pylint: disable=protected-access + wrapper_event_data._trace_message(child) # pylint: disable=protected-access else: if isinstance(event_data, EventDataBatch): # The partition_key in the param will be omitted. if partition_key and partition_key != event_data._partition_key: # pylint: disable=protected-access @@ -242,9 +249,8 @@ def send(self, event_data, partition_key=None, timeout=None): else: if partition_key: event_data = _set_partition_key(event_data, partition_key) + event_data = _set_trace_message(event_data, child) wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access - for internal_message in wrapper_event_data.message._body_gen: # pylint: disable=protected-access - self._client._trace_message(child, internal_message) # pylint: disable=protected-access wrapper_event_data.message.on_send_complete = self._on_outcome self._unsent_events = [wrapper_event_data.message]