Skip to content
This repository was archived by the owner on Sep 17, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,11 @@ Stats Exporter
.. _threading: https://github.com/census-instrumentation/opencensus-python/tree/master/contrib/opencensus-ext-threading
.. _Zipkin: https://github.com/census-instrumentation/opencensus-python/tree/master/contrib/opencensus-ext-zipkin

Log Exporter
--------------

- `Azure`_

------------
Versioning
------------
Expand Down
4 changes: 3 additions & 1 deletion contrib/opencensus-ext-azure/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# Changelog

## Unreleased
- Add persistent storage support
- Added log exporter
([#657](https://github.com/census-instrumentation/opencensus-python/pull/657))
- Added persistent storage support
([#640](https://github.com/census-instrumentation/opencensus-python/pull/640))
- Changed AzureExporter constructor signature to use kwargs
([#632](https://github.com/census-instrumentation/opencensus-python/pull/632))
Expand Down
65 changes: 59 additions & 6 deletions contrib/opencensus-ext-azure/README.rst
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
OpenCensus Azure Monitor Trace Exporter
OpenCensus Azure Monitor Exporters
============================================================================

|pypi|
Expand All @@ -21,15 +21,11 @@ Trace

The **Azure Monitor Trace Exporter** allows you to export `OpenCensus`_ traces to `Azure Monitor`_.

.. _Azure Monitor: https://docs.microsoft.com/azure/azure-monitor/
.. _OpenCensus: https://github.com/census-instrumentation/opencensus-python/

This example shows how to send a span "hello" to Azure Monitor.

* Create an Azure Monitor resource and get the instrumentation key, more information can be found `here <https://docs.microsoft.com/azure/azure-monitor/app/create-new-resource>`_.
* Put the instrumentation key in ``APPINSIGHTS_INSTRUMENTATIONKEY`` environment variable.


.. code:: python

from opencensus.ext.azure.trace_exporter import AzureExporter
Expand All @@ -43,6 +39,10 @@ This example shows how to send a span "hello" to Azure Monitor.

You can also specify the instrumentation key explicitly in the code.

* Create an Azure Monitor resource and get the instrumentation key, more information can be found `here <https://docs.microsoft.com/azure/azure-monitor/app/create-new-resource>`_.
* Install the `requests integration package <../opencensus-ext-requests>`_ using ``pip install opencensus-ext-requests``.
* Put the instrumentation key in the following code.

.. code:: python

import requests
Expand All @@ -57,16 +57,69 @@ You can also specify the instrumentation key explicitly in the code.
exporter=AzureExporter(
# TODO: replace this with your own instrumentation key.
instrumentation_key='00000000-0000-0000-0000-000000000000',
timeout=29.9,
),
sampler=ProbabilitySampler(1.0),
)
with tracer.span(name='parent'):
response = requests.get(url='https://www.wikipedia.org/wiki/Rabbit')

Log
~~~

The **Azure Monitor Log Handler** allows you to export Python logs to `Azure Monitor`_.

This example shows how to send a warning level log to Azure Monitor.

* Create an Azure Monitor resource and get the instrumentation key, more information can be found `here <https://docs.microsoft.com/azure/azure-monitor/app/create-new-resource>`_.
* Put the instrumentation key in ``APPINSIGHTS_INSTRUMENTATIONKEY`` environment variable.

.. code:: python

import logging

from opencensus.ext.azure.log_exporter import AzureLogHandler

logger = logging.getLogger(__name__)
logger.addHandler(AzureLogHandler())
logger.warning('Hello, World!')

You can enrich the logs with trace IDs and span IDs by using the `logging integration <../opencensus-ext-logging>`_.

* Create an Azure Monitor resource and get the instrumentation key, more information can be found `here <https://docs.microsoft.com/azure/azure-monitor/app/create-new-resource>`_.
* Install the `logging integration package <../opencensus-ext-logging>`_ using ``pip install opencensus-ext-logging``.
* Put the instrumentation key in ``APPINSIGHTS_INSTRUMENTATIONKEY`` environment variable.

.. code:: python

import logging

from opencensus.ext.azure.log_exporter import AzureLogHandler
from opencensus.ext.azure.trace_exporter import AzureExporter
from opencensus.trace import config_integration
from opencensus.trace.samplers import ProbabilitySampler
from opencensus.trace.tracer import Tracer

config_integration.trace_integrations(['logging'])

logger = logging.getLogger(__name__)

handler = AzureLogHandler()
handler.setFormatter(logging.Formatter('%(traceId)s %(spanId)s %(message)s'))
logger.addHandler(handler)

tracer = Tracer(exporter=AzureExporter(), sampler=ProbabilitySampler(1.0))

logger.warning('Before the span')
with tracer.span(name='test'):
logger.warning('In the span')
logger.warning('After the span')

References
----------

* `Azure Monitor <https://docs.microsoft.com/azure/azure-monitor/>`_
* `Examples <https://github.com/census-instrumentation/opencensus-python/tree/master/contrib/opencensus-ext-azure/examples>`_
* `OpenCensus Project <https://opencensus.io/>`_

.. _Azure Monitor: https://docs.microsoft.com/azure/azure-monitor/
.. _OpenCensus: https://github.com/census-instrumentation/opencensus-python/
38 changes: 38 additions & 0 deletions contrib/opencensus-ext-azure/examples/logs/correlated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright 2019, OpenCensus Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from opencensus.ext.azure.log_exporter import AzureLogHandler
from opencensus.ext.azure.trace_exporter import AzureExporter
from opencensus.trace import config_integration
from opencensus.trace.samplers import ProbabilitySampler
from opencensus.trace.tracer import Tracer

config_integration.trace_integrations(['logging'])

logger = logging.getLogger(__name__)

# TODO: you need to specify the instrumentation key in the
# APPINSIGHTS_INSTRUMENTATIONKEY environment variable.
handler = AzureLogHandler()
handler.setFormatter(logging.Formatter('%(traceId)s %(spanId)s %(message)s'))
logger.addHandler(handler)

tracer = Tracer(exporter=AzureExporter(), sampler=ProbabilitySampler(1.0))

logger.warning('Before the span')
with tracer.span(name='test'):
logger.warning('In the span')
logger.warning('After the span')
23 changes: 23 additions & 0 deletions contrib/opencensus-ext-azure/examples/logs/simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright 2019, OpenCensus Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from opencensus.ext.azure.log_exporter import AzureLogHandler

logger = logging.getLogger(__name__)
# TODO: you need to specify the instrumentation key in the
# APPINSIGHTS_INSTRUMENTATIONKEY environment variable.
logger.addHandler(AzureLogHandler())
logger.warning('Hello, World!')
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
'SAMPLER': 'opencensus.trace.samplers.ProbabilitySampler(rate=1.0)',
'EXPORTER': '''opencensus.ext.azure.trace_exporter.AzureExporter(
instrumentation_key='00000000-0000-0000-0000-000000000000',
timeout=29.9,
)''',
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ def __init__(self, **options):
# TODO: queue should be moved to tracer
# too much refactor work, leave to the next PR
self._queue = Queue(capacity=8192) # TODO: make this configurable
self.EXIT_EVENT = self._queue.EXIT_EVENT
# TODO: worker should not be created in the base exporter
self._worker = Worker(self._queue, self)
self._worker.start()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Copyright 2019, OpenCensus Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import threading
import time

from opencensus.common.schedule import Queue
from opencensus.common.schedule import QueueEvent
from opencensus.ext.azure.common import Options

logger = logging.getLogger(__name__)

__all__ = ['AzureLogHandler']


class BaseLogHandler(logging.Handler):
def __init__(self):
super(BaseLogHandler, self).__init__()
self._queue = Queue(capacity=8192) # TODO: make this configurable
self._worker = Worker(self._queue, self)
self._worker.start()

def close(self):
self._worker.stop()

def createLock(self):
self.lock = None

def emit(self, record):
self._queue.put(record, block=False)

def _export(self, batch, event=None):
try:
return self.export(batch)
finally:
if event:
event.set()

def export(self, batch):
raise NotImplementedError

def flush(self, timeout=None):
self._queue.flush(timeout=timeout)


class Worker(threading.Thread):
daemon = True

def __init__(self, src, dst):
self._src = src
self._dst = dst
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note the circular reference here, we've been bitten by this before trying to clean up exporters. See

def new_stats_exporter(options=None, interval=None):
"""Get a stats exporter and running transport thread.
Create a new `StackdriverStatsExporter` with the given options and start
periodically exporting stats to stackdriver in the background.
Fall back to default auth if `options` is null. This will raise
`google.auth.exceptions.DefaultCredentialsError` if default credentials
aren't configured.
See `opencensus.metrics.transport.get_exporter_thread` for details on the
transport thread.
:type options: :class:`Options`
:param exporter: Options to pass to the exporter
:type interval: int or float
:param interval: Seconds between export calls.
:rtype: :class:`StackdriverStatsExporter`
:return: The newly-created exporter.
"""
if options is None:
_, project_id = google.auth.default()
options = Options(project_id=project_id)
if str(options.project_id).strip() == "":
raise ValueError(ERROR_BLANK_PROJECT_ID)
ci = client_info.ClientInfo(client_library_version=get_user_agent_slug())
client = monitoring_v3.MetricServiceClient(client_info=ci)
exporter = StackdriverStatsExporter(client=client, options=options)
transport.get_exporter_thread(stats.stats, exporter, interval=interval)
return exporter
.

Copy link
Copy Markdown
Contributor Author

@reyang reyang May 24, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My original understanding is that we have Handler.close which can be used explicitly or implicitly (by Python logging library), and the circular reference will be collected by the GC. This shouldn't cause problem? Is this to prevent memory leak, or to reduce GC overhead?

Regarding memory leak, I run the following app for an hour and see a flat memory usage:

import logging

from opencensus.ext.azure.log_exporter import AzureLogHandler

logger = logging.getLogger(__name__)
while True:
    handler = AzureLogHandler()
    logger.addHandler(handler)
    logger.warning('Hello, World!')
    logger.removeHandler(handler)
    handler.close()

For GC overhead, given the circle is pretty small, I guess there is no noticeable difference?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the circular reference will be collected by the GC. This shouldn't cause problem?

I thought there were more general problems cleaning up circular references in python, but it may only be a problem for classes that define __del__ (see https://stackoverflow.com/a/2428888), in which case this is fine.

self._stopping = False
super(Worker, self).__init__(
name='{} Worker'.format(type(dst).__name__)
)

def run(self):
src = self._src
dst = self._dst
while True:
batch = src.gets(dst.max_batch_size, dst.export_interval)
if batch and isinstance(batch[-1], QueueEvent):
try:
dst._export(batch[:-1], event=batch[-1])
except Exception:
logger.exception('Unhandled exception from exporter.')
if batch[-1] is src.EXIT_EVENT:
break
else:
continue
try:
dst._export(batch)
except Exception:
logger.exception('Unhandled exception from exporter.')

def stop(self, timeout=None):
start_time = time.time()
wait_time = timeout
if self.is_alive() and not self._stopping:
self._stopping = True
self._src.put(self._src.EXIT_EVENT, block=True, timeout=wait_time)
elapsed_time = time.time() - start_time
wait_time = timeout and max(timeout - elapsed_time, 0)
if self._src.EXIT_EVENT.wait(timeout=wait_time):
return time.time() - start_time # time taken to stop


class AzureLogHandler(BaseLogHandler):
"""Handler for logging to Microsoft Azure Monitor.

:param options: Options for the log handler.
"""

def __init__(self, **options):
self.options = Options(**options)
if not self.options.instrumentation_key:
raise ValueError('The instrumentation_key is not provided.')
self.export_interval = self.options.export_interval
self.max_batch_size = self.options.max_batch_size
super(AzureLogHandler, self).__init__()

def export(self, batch):
if batch:
for item in batch:
item.traceId = getattr(item, 'traceId', 'N/A')
item.spanId = getattr(item, 'spanId', 'N/A')
print(self.format(item))
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import json
import requests

from opencensus.common.schedule import QueueExitEvent
from opencensus.ext.azure.common import Options
from opencensus.ext.azure.common import utils
from opencensus.ext.azure.common.exporter import BaseExporter
Expand All @@ -35,8 +36,7 @@
class AzureExporter(BaseExporter):
"""An exporter that sends traces to Microsoft Azure Monitor.

:type options: dict
:param options: Options for the exporter. Defaults to None.
:param options: Options for the exporter.
"""

def __init__(self, **options):
Expand Down Expand Up @@ -230,14 +230,14 @@ def emit(self, batch, event=None):
if result > 0:
self.storage.put(envelopes, result)
if event:
if event is self.EXIT_EVENT:
if isinstance(event, QueueExitEvent):
self._transmit_from_storage() # send files before exit
event.set()
return
if len(batch) < self.options.max_batch_size:
self._transmit_from_storage()
except Exception as ex:
logger.exception('Transmission exception: %s.', ex)
except Exception:
logger.exception('Exception occurred while exporting the data.')

def _stop(self, timeout=None):
self.storage.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ def test_emit_empty(self, request_mock):
)
exporter.emit([])
self.assertEqual(len(os.listdir(exporter.storage.path)), 0)
exporter.emit([], exporter.EXIT_EVENT)
self.assertEqual(len(os.listdir(exporter.storage.path)), 0)
exporter._stop()

@mock.patch('opencensus.ext.azure.trace_exporter.logger')
Expand Down
Loading