diff --git a/docs/examples/basic_meter/basic_metrics.py b/docs/examples/basic_meter/basic_metrics.py index b9ff8d87417..e65aa788b83 100644 --- a/docs/examples/basic_meter/basic_metrics.py +++ b/docs/examples/basic_meter/basic_metrics.py @@ -26,9 +26,6 @@ from opentelemetry import metrics from opentelemetry.sdk.metrics import Counter, MeterProvider, ValueRecorder from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter -from opentelemetry.sdk.metrics.export.controller import PushController - -stateful = True print( "Starting example, values will be printed to the console every 5 seconds." @@ -37,7 +34,11 @@ # Stateful determines whether how metrics are collected: if true, metrics # accumulate over the process lifetime. If false, metrics are reset at the # beginning of each collection interval. -metrics.set_meter_provider(MeterProvider(stateful)) +stateful = True + +# Sets the global MeterProvider instance +metrics.set_meter_provider(MeterProvider()) + # The Meter is responsible for creating and recording metrics. Each meter has a # unique name, which we set as the module's name here. meter = metrics.get_meter(__name__) @@ -45,9 +46,9 @@ # Exporter to export metrics to the console exporter = ConsoleMetricsExporter() -# A PushController collects metrics created from meter and exports it via the -# exporter every interval -controller = PushController(meter=meter, exporter=exporter, interval=5) +# start_pipeline will notify the MeterProvider to begin collecting/exporting +# metrics with the given meter, exporter and interval in seconds +metrics.get_meter_provider().start_pipeline(meter, exporter, 5) # Metric instruments allow to capture measurements requests_counter = meter.create_metric( @@ -77,7 +78,7 @@ # Update the metric instruments using the direct calling convention requests_counter.add(25, staging_labels) requests_size.record(100, staging_labels) -time.sleep(5) +time.sleep(10) requests_counter.add(50, staging_labels) requests_size.record(5000, staging_labels) diff --git a/docs/examples/basic_meter/calling_conventions.py b/docs/examples/basic_meter/calling_conventions.py index f8cc3dddbb1..3615f60d7d4 100644 --- a/docs/examples/basic_meter/calling_conventions.py +++ b/docs/examples/basic_meter/calling_conventions.py @@ -21,13 +21,11 @@ from opentelemetry import metrics from opentelemetry.sdk.metrics import Counter, MeterProvider, ValueRecorder from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter -from opentelemetry.sdk.metrics.export.controller import PushController # Use the meter type provided by the SDK package metrics.set_meter_provider(MeterProvider()) meter = metrics.get_meter(__name__) -exporter = ConsoleMetricsExporter() -controller = PushController(meter=meter, exporter=exporter, interval=5) +metrics.get_meter_provider().start_pipeline(meter, ConsoleMetricsExporter(), 5) requests_counter = meter.create_metric( name="requests", @@ -62,7 +60,7 @@ # You can record metrics directly using the metric instrument. You pass in # labels that you would like to record for. requests_counter.add(25, labels) -time.sleep(5) +time.sleep(10) print("Updating using a bound instrument...") # You can record metrics with bound metric instruments. Bound metric diff --git a/docs/examples/basic_meter/observer.py b/docs/examples/basic_meter/observer.py index d3aa02168d4..076c416c0ab 100644 --- a/docs/examples/basic_meter/observer.py +++ b/docs/examples/basic_meter/observer.py @@ -21,13 +21,10 @@ from opentelemetry import metrics from opentelemetry.sdk.metrics import MeterProvider, ValueObserver from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter -from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher -from opentelemetry.sdk.metrics.export.controller import PushController metrics.set_meter_provider(MeterProvider()) meter = metrics.get_meter(__name__) -exporter = ConsoleMetricsExporter() -controller = PushController(meter=meter, exporter=exporter, interval=2) +metrics.get_meter_provider().start_pipeline(meter, ConsoleMetricsExporter(), 5) # Callback to gather cpu usage diff --git a/docs/examples/cloud_monitoring/basic_metrics.py b/docs/examples/cloud_monitoring/basic_metrics.py index e0ceb420b62..fa00fc068b6 100644 --- a/docs/examples/cloud_monitoring/basic_metrics.py +++ b/docs/examples/cloud_monitoring/basic_metrics.py @@ -20,13 +20,11 @@ CloudMonitoringMetricsExporter, ) from opentelemetry.sdk.metrics import Counter, MeterProvider -from opentelemetry.sdk.metrics.export.controller import PushController -meter = metrics.get_meter(__name__, True) - -# Gather and export metrics every 5 seconds -controller = PushController( - meter=meter, exporter=CloudMonitoringMetricsExporter(), interval=5 +metrics.set_meter_provider(MeterProvider()) +meter = metrics.get_meter(__name__) +metrics.get_meter_provider().start_pipeline( + meter, CloudMonitoringMetricsExporter(), 5 ) requests_counter = meter.create_metric( diff --git a/docs/examples/opencensus-exporter-metrics/collector.py b/docs/examples/opencensus-exporter-metrics/collector.py index 89dabd12eab..725f07b77ab 100644 --- a/docs/examples/opencensus-exporter-metrics/collector.py +++ b/docs/examples/opencensus-exporter-metrics/collector.py @@ -21,7 +21,6 @@ OpenCensusMetricsExporter, ) from opentelemetry.sdk.metrics import Counter, MeterProvider -from opentelemetry.sdk.metrics.export.controller import PushController exporter = OpenCensusMetricsExporter( service_name="basic-service", endpoint="localhost:55678" @@ -29,7 +28,7 @@ metrics.set_meter_provider(MeterProvider()) meter = metrics.get_meter(__name__) -controller = PushController(meter, exporter, 5) +metrics.get_meter_provider().start_pipeline(meter, exporter, 5) requests_counter = meter.create_metric( name="requests", diff --git a/ext/opentelemetry-ext-prometheus/src/opentelemetry/ext/prometheus/__init__.py b/ext/opentelemetry-ext-prometheus/src/opentelemetry/ext/prometheus/__init__.py index 59ef3f1708a..da22042dcc5 100644 --- a/ext/opentelemetry-ext-prometheus/src/opentelemetry/ext/prometheus/__init__.py +++ b/ext/opentelemetry-ext-prometheus/src/opentelemetry/ext/prometheus/__init__.py @@ -29,7 +29,6 @@ from opentelemetry import metrics from opentelemetry.ext.prometheus import PrometheusMetricsExporter from opentelemetry.sdk.metrics import Counter, Meter - from opentelemetry.sdk.metrics.export.controller import PushController from prometheus_client import start_http_server # Start Prometheus client @@ -37,13 +36,12 @@ # Meter is responsible for creating and recording metrics metrics.set_meter_provider(MeterProvider()) - meter = metrics.meter() + meter = metrics.get_meter(__name__) # exporter to export metrics to Prometheus prefix = "MyAppPrefix" exporter = PrometheusMetricsExporter(prefix) - # controller collects metrics created from meter and exports it via the - # exporter every interval - controller = PushController(meter, exporter, 5) + # Starts the collect/export pipeline for metrics + metrics.get_meter_provider().start_pipeline(meter, exporter, 5) counter = meter.create_metric( "requests", diff --git a/ext/opentelemetry-ext-system-metrics/src/opentelemetry/ext/system_metrics/__init__.py b/ext/opentelemetry-ext-system-metrics/src/opentelemetry/ext/system_metrics/__init__.py index 09c633f14b4..10b09795571 100644 --- a/ext/opentelemetry-ext-system-metrics/src/opentelemetry/ext/system_metrics/__init__.py +++ b/ext/opentelemetry-ext-system-metrics/src/opentelemetry/ext/system_metrics/__init__.py @@ -28,9 +28,12 @@ .. code:: python + from opentelemetry import metrics from opentelemetry.ext.system_metrics import SystemMetrics + from opentelemetry.sdk.metrics import MeterProvider, from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter + metrics.set_meter_provider(MeterProvider()) exporter = ConsoleMetricsExporter() SystemMetrics(exporter) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 23007dde5a8..746f1ab4a50 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -14,6 +14,8 @@ ([#764](https://github.com/open-telemetry/opentelemetry-python/pull/764)) - Add SumObserver, UpDownSumObserver and LastValueAggregator in metrics ([#789](https://github.com/open-telemetry/opentelemetry-python/pull/789)) +- Add start_pipeline to MeterProvider + ([#791](https://github.com/open-telemetry/opentelemetry-python/pull/791)) ## 0.8b0 diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py index 7156f68c165..e19d33580b5 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py @@ -12,13 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +import atexit import logging import threading from typing import Dict, Sequence, Tuple, Type from opentelemetry import metrics as metrics_api +from opentelemetry.sdk.metrics.export import ( + ConsoleMetricsExporter, + MetricsExporter, +) from opentelemetry.sdk.metrics.export.aggregate import Aggregator from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher +from opentelemetry.sdk.metrics.export.controller import PushController from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.util.instrumentation import InstrumentationInfo @@ -449,24 +455,64 @@ class MeterProvider(metrics_api.MeterProvider): Args: stateful: Indicates whether meters created are going to be stateful resource: Resource for this MeterProvider + shutdown_on_exit: Register an atexit hook to shut down when the + application exists """ def __init__( - self, stateful=True, resource: Resource = Resource.create_empty(), + self, + stateful=True, + resource: Resource = Resource.create_empty(), + shutdown_on_exit: bool = True, ): self.stateful = stateful self.resource = resource + self._controllers = [] + self._exporters = set() + self._atexit_handler = None + if shutdown_on_exit: + self._atexit_handler = atexit.register(self.shutdown) def get_meter( self, instrumenting_module_name: str, instrumenting_library_version: str = "", ) -> "metrics_api.Meter": + """See `opentelemetry.metrics.MeterProvider`.get_meter.""" if not instrumenting_module_name: # Reject empty strings too. - raise ValueError("get_meter called with missing module name.") + instrumenting_module_name = "ERROR:MISSING MODULE NAME" + logger.error("get_meter called with missing module name.") return Meter( self, InstrumentationInfo( - instrumenting_module_name, instrumenting_library_version + instrumenting_module_name, instrumenting_library_version, ), ) + + def start_pipeline( + self, + meter: metrics_api.Meter, + exporter: MetricsExporter = None, + interval: float = 15.0, + ) -> None: + """Method to begin the collect/export pipeline. + + Args: + meter: The meter to collect metrics from. + exporter: The exporter to export metrics to. + interval: The collect/export interval in seconds. + """ + if not exporter: + exporter = ConsoleMetricsExporter() + self._exporters.add(exporter) + # TODO: Controller type configurable? + self._controllers.append(PushController(meter, exporter, interval)) + + def shutdown(self) -> None: + for controller in self._controllers: + controller.shutdown() + for exporter in self._exporters: + exporter.shutdown() + if self._atexit_handler is not None: + atexit.unregister(self._atexit_handler) + self._atexit_handler = None diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py index 88abed410a1..7448f353c45 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py @@ -12,30 +12,35 @@ # See the License for the specific language governing permissions and # limitations under the License. -import atexit import threading from opentelemetry.context import attach, detach, set_value +from opentelemetry.metrics import Meter +from opentelemetry.sdk.metrics.export import MetricsExporter class PushController(threading.Thread): - """A push based controller, used for exporting. + """A push based controller, used for collecting and exporting. Uses a worker thread that periodically collects metrics for exporting, exports them and performs some post-processing. + + Args: + meter: The meter used to collect metrics. + exporter: The exporter used to export metrics. + interval: The collect/export interval in seconds. """ daemon = True - def __init__(self, meter, exporter, interval, shutdown_on_exit=True): + def __init__( + self, meter: Meter, exporter: MetricsExporter, interval: float + ): super().__init__() self.meter = meter self.exporter = exporter self.interval = interval self.finished = threading.Event() - self._atexit_handler = None - if shutdown_on_exit: - self._atexit_handler = atexit.register(self.shutdown) self.start() def run(self): @@ -46,17 +51,13 @@ def shutdown(self): self.finished.set() # Run one more collection pass to flush metrics batched in the meter self.tick() - self.exporter.shutdown() - if self._atexit_handler is not None: - atexit.unregister(self._atexit_handler) - self._atexit_handler = None def tick(self): # Collect all of the meter's metrics to be exported self.meter.collect() + # Export the collected metrics token = attach(set_value("suppress_instrumentation", True)) - # Export the given metrics in the batcher self.exporter.export(self.meter.batcher.checkpoint_set()) detach(token) - # Perform post-exporting logic based on batcher configuration + # Perform post-exporting logic self.meter.batcher.finished_collection() diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index 178e41b2134..901d5a94046 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -614,7 +614,6 @@ def test_push_controller(self): controller.shutdown() self.assertTrue(controller.finished.isSet()) - exporter.shutdown.assert_any_call() # shutdown should flush the meter self.assertEqual(meter.collect.call_count, 1) diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 02cf2c93548..a92af748ac6 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -38,6 +38,30 @@ def test_resource_empty(self): # pylint: disable=protected-access self.assertIs(meter.resource, resources._EMPTY_RESOURCE) + def test_start_pipeline(self): + exporter = mock.Mock() + meter_provider = metrics.MeterProvider() + meter = meter_provider.get_meter(__name__) + # pylint: disable=protected-access + meter_provider.start_pipeline(meter, exporter, 6) + try: + self.assertEqual(len(meter_provider._exporters), 1) + self.assertEqual(len(meter_provider._controllers), 1) + finally: + meter_provider.shutdown() + + def test_shutdown(self): + controller = mock.Mock() + exporter = mock.Mock() + meter_provider = metrics.MeterProvider() + # pylint: disable=protected-access + meter_provider._controllers = [controller] + meter_provider._exporters = [exporter] + meter_provider.shutdown() + self.assertEqual(controller.shutdown.call_count, 1) + self.assertEqual(exporter.shutdown.call_count, 1) + self.assertIsNone(meter_provider._atexit_handler) + class TestMeter(unittest.TestCase): def test_extends_api(self):