diff --git a/CHANGELOG.md b/CHANGELOG.md index d1332dda741..d6a9df74f7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2754](https://github.com/open-telemetry/opentelemetry-python/pull/2754)) - Fix --insecure of CLI argument ([#2696](https://github.com/open-telemetry/opentelemetry-python/pull/2696)) +- Add temporality and aggregation configuration for metrics exporters, + use `OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE` only for OTLP metrics exporter + ([#2843](https://github.com/open-telemetry/opentelemetry-python/pull/2843)) - Instrument instances are always created through a Meter ([#2844](https://github.com/open-telemetry/opentelemetry-python/pull/2844)) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py index fce7129a326..04645021f97 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py @@ -13,8 +13,9 @@ from logging import getLogger from os import environ -from typing import Optional, Sequence +from typing import Dict, Optional, Sequence from grpc import ChannelCredentials, Compression +from opentelemetry.sdk.metrics._internal.aggregation import Aggregation from opentelemetry.exporter.otlp.proto.grpc.exporter import ( OTLPExporterMixin, get_resource_data, @@ -29,18 +30,25 @@ from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_METRICS_INSECURE, + OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, ) -from opentelemetry.sdk.metrics.export import ( - Gauge, +from opentelemetry.sdk.metrics import ( + Counter, Histogram, - Metric, - Sum, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, ) - from opentelemetry.sdk.metrics.export import ( + AggregationTemporality, + Gauge, + Histogram as HistogramType, + Metric, MetricExporter, MetricExportResult, MetricsData, + Sum, ) _logger = getLogger(__name__) @@ -61,6 +69,8 @@ def __init__( headers: Optional[Sequence] = None, timeout: Optional[int] = None, compression: Optional[Compression] = None, + preferred_temporality: Dict[type, AggregationTemporality] = None, + preferred_aggregation: Dict[type, Aggregation] = None, ): if insecure is None: @@ -68,15 +78,48 @@ def __init__( if insecure is not None: insecure = insecure.lower() == "true" - super().__init__( - **{ - "endpoint": endpoint, - "insecure": insecure, - "credentials": credentials, - "headers": headers, - "timeout": timeout, - "compression": compression, + instrument_class_temporality = {} + if ( + environ.get( + OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, + "CUMULATIVE", + ) + .upper() + .strip() + == "DELTA" + ): + instrument_class_temporality = { + Counter: AggregationTemporality.DELTA, + UpDownCounter: AggregationTemporality.CUMULATIVE, + Histogram: AggregationTemporality.DELTA, + ObservableCounter: AggregationTemporality.DELTA, + ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, + ObservableGauge: AggregationTemporality.CUMULATIVE, } + else: + instrument_class_temporality = { + Counter: AggregationTemporality.CUMULATIVE, + UpDownCounter: AggregationTemporality.CUMULATIVE, + Histogram: AggregationTemporality.CUMULATIVE, + ObservableCounter: AggregationTemporality.CUMULATIVE, + ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, + ObservableGauge: AggregationTemporality.CUMULATIVE, + } + instrument_class_temporality.update(preferred_temporality or {}) + + MetricExporter.__init__( + self, + preferred_temporality=instrument_class_temporality, + preferred_aggregation=preferred_aggregation, + ) + OTLPExporterMixin.__init__( + self, + endpoint=endpoint, + insecure=insecure, + credentials=credentials, + headers=headers, + timeout=timeout, + compression=compression, ) def _translate_data( @@ -132,7 +175,7 @@ def _translate_data( pt.as_double = data_point.value pb2_metric.gauge.data_points.append(pt) - elif isinstance(metric.data, Histogram): + elif isinstance(metric.data, HistogramType): for data_point in metric.data.data_points: pt = pb2.HistogramDataPoint( attributes=self._translate_attributes( diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py index c25ab06263c..66151d09937 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py @@ -42,10 +42,19 @@ ) from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_METRICS_INSECURE, + OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, ) -from opentelemetry.sdk.metrics.export import ( - AggregationTemporality, +from opentelemetry.sdk.metrics import ( + Counter, Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) +from opentelemetry.sdk.metrics.export import AggregationTemporality +from opentelemetry.sdk.metrics.export import Histogram as HistogramType +from opentelemetry.sdk.metrics.export import ( HistogramDataPoint, Metric, MetricExportResult, @@ -121,7 +130,7 @@ def setUp(self): name="histogram", description="foo", unit="s", - data=Histogram( + data=HistogramType( data_points=[ HistogramDataPoint( attributes={"a": 1, "b": True}, @@ -302,6 +311,40 @@ def test_exporting(self): # pylint: disable=protected-access self.assertEqual(self.exporter._exporting, "metrics") + @patch.dict( + "os.environ", + {OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "DELTA"}, + ) + def test_preferred_temporality(self): + # pylint: disable=protected-access + exporter = OTLPMetricExporter( + preferred_temporality={Counter: AggregationTemporality.CUMULATIVE} + ) + self.assertEqual( + exporter._preferred_temporality[Counter], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + exporter._preferred_temporality[UpDownCounter], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + exporter._preferred_temporality[Histogram], + AggregationTemporality.DELTA, + ) + self.assertEqual( + exporter._preferred_temporality[ObservableCounter], + AggregationTemporality.DELTA, + ) + self.assertEqual( + exporter._preferred_temporality[ObservableUpDownCounter], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + exporter._preferred_temporality[ObservableGauge], + AggregationTemporality.CUMULATIVE, + ) + @patch( "opentelemetry.exporter.otlp.proto.grpc.exporter.ssl_channel_credentials" ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index a9b13f5c3f1..e97557226f1 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -31,9 +31,6 @@ detach, set_value, ) -from opentelemetry.sdk.environment_variables import ( - OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, -) from opentelemetry.sdk.metrics._internal.aggregation import ( AggregationTemporality, DefaultAggregation, @@ -73,8 +70,26 @@ class MetricExporter(ABC): Interface to be implemented by services that want to export metrics received in their own format. + + Args: + preferred_temporality: Used by `opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader` to + configure exporter level preferred temporality. See `opentelemetry.sdk.metrics.export.MetricReader` for + more details on what preferred temporality is. + preferred_aggregation: Used by `opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader` to + configure exporter level preferred aggregation. See `opentelemetry.sdk.metrics.export.MetricReader` for + more details on what preferred aggregation is. """ + def __init__( + self, + preferred_temporality: Dict[type, AggregationTemporality] = None, + preferred_aggregation: Dict[ + type, "opentelemetry.sdk.metrics.view.Aggregation" + ] = None, + ) -> None: + self._preferred_temporality = preferred_temporality + self._preferred_aggregation = preferred_aggregation + @abstractmethod def export( self, @@ -122,6 +137,7 @@ def __init__( ] = lambda metrics_data: metrics_data.to_json() + linesep, ): + super().__init__() self.out = out self.formatter = formatter @@ -143,6 +159,7 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: class MetricReader(ABC): + # pylint: disable=too-many-branches """ Base class for all metric readers @@ -157,8 +174,6 @@ class MetricReader(ABC): temporalities of the classes that the user wants to change, not all of them. The classes not included in the passed dictionary will retain their association to their default aggregation temporalities. - The value passed here will override the corresponding values set - via the environment variable preferred_aggregation: A mapping between instrument classes and aggregation instances. By default maps all instrument classes to an instance of `DefaultAggregation`. This mapping will be used to @@ -177,10 +192,6 @@ class MetricReader(ABC): .. automethod:: _receive_metrics """ - # FIXME add :std:envvar:`OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE` - # to the end of the documentation paragraph above. - - # pylint: disable=too-many-branches def __init__( self, preferred_temporality: Dict[type, AggregationTemporality] = None, @@ -196,33 +207,14 @@ def __init__( Iterable["opentelemetry.sdk.metrics.export.Metric"], ] = None - if ( - environ.get( - OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, - "CUMULATIVE", - ) - .upper() - .strip() - == "DELTA" - ): - self._instrument_class_temporality = { - _Counter: AggregationTemporality.DELTA, - _UpDownCounter: AggregationTemporality.CUMULATIVE, - _Histogram: AggregationTemporality.DELTA, - _ObservableCounter: AggregationTemporality.DELTA, - _ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, - _ObservableGauge: AggregationTemporality.CUMULATIVE, - } - - else: - self._instrument_class_temporality = { - _Counter: AggregationTemporality.CUMULATIVE, - _UpDownCounter: AggregationTemporality.CUMULATIVE, - _Histogram: AggregationTemporality.CUMULATIVE, - _ObservableCounter: AggregationTemporality.CUMULATIVE, - _ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, - _ObservableGauge: AggregationTemporality.CUMULATIVE, - } + self._instrument_class_temporality = { + _Counter: AggregationTemporality.CUMULATIVE, + _UpDownCounter: AggregationTemporality.CUMULATIVE, + _Histogram: AggregationTemporality.CUMULATIVE, + _ObservableCounter: AggregationTemporality.CUMULATIVE, + _ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, + _ObservableGauge: AggregationTemporality.CUMULATIVE, + } if preferred_temporality is not None: for temporality in preferred_temporality.values(): @@ -404,16 +396,13 @@ class PeriodicExportingMetricReader(MetricReader): def __init__( self, exporter: MetricExporter, - preferred_temporality: Dict[type, AggregationTemporality] = None, - preferred_aggregation: Dict[ - type, "opentelemetry.sdk.metrics.view.Aggregation" - ] = None, export_interval_millis: Optional[float] = None, export_timeout_millis: Optional[float] = None, ) -> None: + # PeriodicExportingMetricReader defers to exporter for configuration super().__init__( - preferred_temporality=preferred_temporality, - preferred_aggregation=preferred_aggregation, + preferred_temporality=exporter._preferred_temporality, + preferred_aggregation=exporter._preferred_aggregation, ) self._exporter = exporter if export_interval_millis is None: diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_metric_reader.py index e0d2813a5f0..82b65bd90cd 100644 --- a/opentelemetry-sdk/tests/metrics/test_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader.py @@ -12,14 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from os import environ from typing import Dict, Iterable from unittest import TestCase -from unittest.mock import patch -from opentelemetry.sdk.environment_variables import ( - OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, -) from opentelemetry.sdk.metrics import Counter, Histogram, ObservableGauge from opentelemetry.sdk.metrics._internal.instrument import ( _Counter, @@ -74,66 +69,7 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: class TestMetricReader(TestCase): - @patch.dict( - environ, - {OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "CUMULATIVE"}, - ) - def test_configure_temporality_cumulative(self): - - dummy_metric_reader = DummyMetricReader() - - self.assertEqual( - dummy_metric_reader._instrument_class_temporality.keys(), - set(_expected_keys), - ) - for ( - value - ) in dummy_metric_reader._instrument_class_temporality.values(): - self.assertEqual(value, AggregationTemporality.CUMULATIVE) - - @patch.dict( - environ, {OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "DELTA"} - ) - def test_configure_temporality_delta(self): - - dummy_metric_reader = DummyMetricReader() - - self.assertEqual( - dummy_metric_reader._instrument_class_temporality.keys(), - set(_expected_keys), - ) - self.assertEqual( - dummy_metric_reader._instrument_class_temporality[_Counter], - AggregationTemporality.DELTA, - ) - self.assertEqual( - dummy_metric_reader._instrument_class_temporality[_UpDownCounter], - AggregationTemporality.CUMULATIVE, - ) - self.assertEqual( - dummy_metric_reader._instrument_class_temporality[_Histogram], - AggregationTemporality.DELTA, - ) - self.assertEqual( - dummy_metric_reader._instrument_class_temporality[ - _ObservableCounter - ], - AggregationTemporality.DELTA, - ) - self.assertEqual( - dummy_metric_reader._instrument_class_temporality[ - _ObservableUpDownCounter - ], - AggregationTemporality.CUMULATIVE, - ) - self.assertEqual( - dummy_metric_reader._instrument_class_temporality[ - _ObservableGauge - ], - AggregationTemporality.CUMULATIVE, - ) - - def test_configure_temporality_parameter(self): + def test_configure_temporality(self): dummy_metric_reader = DummyMetricReader( preferred_temporality={ @@ -177,7 +113,7 @@ def test_configure_temporality_parameter(self): AggregationTemporality.DELTA, ) - def test_default_temporality(self): + def test_configure_aggregation(self): dummy_metric_reader = DummyMetricReader() self.assertEqual( dummy_metric_reader._instrument_class_aggregation.keys(), diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 52154981d5f..b128456531b 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -64,9 +64,11 @@ def tearDown(self): MeterProvider._all_metric_readers = set() def test_register_metric_readers(self): - - metric_reader_0 = PeriodicExportingMetricReader(Mock()) - metric_reader_1 = PeriodicExportingMetricReader(Mock()) + mock_exporter = Mock() + mock_exporter._preferred_temporality = None + mock_exporter._preferred_aggregation = None + metric_reader_0 = PeriodicExportingMetricReader(mock_exporter) + metric_reader_1 = PeriodicExportingMetricReader(mock_exporter) try: MeterProvider(metric_readers=(metric_reader_0,)) @@ -429,6 +431,7 @@ def test_create_observable_up_down_counter(self): class InMemoryMetricExporter(MetricExporter): def __init__(self): + super().__init__() self.metrics = {} self._counter = 0 diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index 65b831d6336..4213f9218ed 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -18,7 +18,10 @@ from flaky import flaky +from opentelemetry.sdk.metrics import Counter +from opentelemetry.sdk.metrics._internal import _Counter from opentelemetry.sdk.metrics.export import ( + AggregationTemporality, Gauge, Metric, MetricExporter, @@ -27,15 +30,25 @@ PeriodicExportingMetricReader, Sum, ) +from opentelemetry.sdk.metrics.view import ( + DefaultAggregation, + LastValueAggregation, +) from opentelemetry.test.concurrency_test import ConcurrencyTestBase from opentelemetry.util._time import _time_ns class FakeMetricsExporter(MetricExporter): - def __init__(self, wait=0): + def __init__( + self, wait=0, preferred_temporality=None, preferred_aggregation=None + ): self.wait = wait self.metrics = [] self._shutdown = False + super().__init__( + preferred_temporality=preferred_temporality, + preferred_aggregation=preferred_aggregation, + ) def export( self, @@ -114,7 +127,9 @@ def _collect(reader): def test_ticker_called(self): collect_mock = Mock() - pmr = PeriodicExportingMetricReader(Mock(), export_interval_millis=1) + exporter = FakeMetricsExporter() + exporter.export = Mock() + pmr = PeriodicExportingMetricReader(exporter, export_interval_millis=1) pmr._set_collect_callback(collect_mock) time.sleep(0.1) self.assertTrue(collect_mock.assert_called_once) @@ -141,8 +156,34 @@ def test_shutdown(self): self.assertTrue(exporter._shutdown) def test_shutdown_multiple_times(self): - pmr = self._create_periodic_reader([], Mock()) + pmr = self._create_periodic_reader([], FakeMetricsExporter()) with self.assertLogs(level="WARNING") as w: self.run_with_many_threads(pmr.shutdown) self.assertTrue("Can't shutdown multiple times", w.output[0]) pmr.shutdown() + + def test_exporter_temporality_preference(self): + exporter = FakeMetricsExporter( + preferred_temporality={ + Counter: AggregationTemporality.DELTA, + }, + ) + pmr = PeriodicExportingMetricReader(exporter) + for key, value in pmr._instrument_class_temporality.items(): + if key is not _Counter: + self.assertEqual(value, AggregationTemporality.CUMULATIVE) + else: + self.assertEqual(value, AggregationTemporality.DELTA) + + def test_exporter_aggregation_preference(self): + exporter = FakeMetricsExporter( + preferred_aggregation={ + Counter: LastValueAggregation(), + }, + ) + pmr = PeriodicExportingMetricReader(exporter) + for key, value in pmr._instrument_class_aggregation.items(): + if key is not _Counter: + self.assertTrue(isinstance(value, DefaultAggregation)) + else: + self.assertTrue(isinstance(value, LastValueAggregation))