diff --git a/README.rst b/README.rst index bdbe252dc..fd6569342 100644 --- a/README.rst +++ b/README.rst @@ -54,7 +54,7 @@ Installation & basic usage from opencensus.stats import stats as stats_module - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder diff --git a/contrib/opencensus-ext-prometheus/README.rst b/contrib/opencensus-ext-prometheus/README.rst index 2845ff7c9..5e36d506c 100644 --- a/contrib/opencensus-ext-prometheus/README.rst +++ b/contrib/opencensus-ext-prometheus/README.rst @@ -45,7 +45,7 @@ Register the Prometheus exporter .. code:: python - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager exporter = prometheus.new_stats_exporter(prometheus.Options(namespace="")) diff --git a/contrib/opencensus-ext-prometheus/examples/prometheus.py b/contrib/opencensus-ext-prometheus/examples/prometheus.py index 272d3e902..5e808d007 100644 --- a/contrib/opencensus-ext-prometheus/examples/prometheus.py +++ b/contrib/opencensus-ext-prometheus/examples/prometheus.py @@ -40,7 +40,7 @@ def main(): - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder diff --git a/contrib/opencensus-ext-prometheus/tests/test_prometheus_stats.py b/contrib/opencensus-ext-prometheus/tests/test_prometheus_stats.py index a3888af39..5d9795534 100644 --- a/contrib/opencensus-ext-prometheus/tests/test_prometheus_stats.py +++ b/contrib/opencensus-ext-prometheus/tests/test_prometheus_stats.py @@ -280,7 +280,7 @@ def test_exporter_constructor_no_namespace(self): def test_emit(self): options = prometheus.Options(namespace="opencensus", port=9005) - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder exporter = prometheus.new_stats_exporter(options) diff --git a/contrib/opencensus-ext-stackdriver/README.rst 
b/contrib/opencensus-ext-stackdriver/README.rst index 30b6b3132..9702c3bbc 100644 --- a/contrib/opencensus-ext-stackdriver/README.rst +++ b/contrib/opencensus-ext-stackdriver/README.rst @@ -89,7 +89,7 @@ Register the Stackdriver exporter .. code:: python - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager exporter = stackdriver.new_stats_exporter(stackdriver.Options(project_id="")) diff --git a/contrib/opencensus-ext-stackdriver/examples/stackdriver.py b/contrib/opencensus-ext-stackdriver/examples/stackdriver.py index 9dbfa682d..f0632e7e1 100644 --- a/contrib/opencensus-ext-stackdriver/examples/stackdriver.py +++ b/contrib/opencensus-ext-stackdriver/examples/stackdriver.py @@ -30,7 +30,7 @@ "task_latency", "The task latency in milliseconds", "ms") # The stats recorder -stats = stats_module.Stats() +stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder diff --git a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py index 0ed31a431..eb7ec5f9f 100644 --- a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py +++ b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py @@ -12,24 +12,25 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from datetime import datetime import itertools -import logging import os import platform import re import string +import threading -from datetime import datetime from google.api_core.gapic_v1 import client_info from google.cloud import monitoring_v3 from opencensus.common import utils from opencensus.common.monitored_resource import monitored_resource -from opencensus.common.transports import async_ from opencensus.common.version import __version__ -from opencensus.stats import aggregation -from opencensus.stats import base_exporter -from opencensus.stats import measure +from opencensus.metrics import transport +from opencensus.metrics.export import metric as metric_module +from opencensus.metrics.export import metric_descriptor +from opencensus.stats import stats + MAX_TIME_SERIES_PER_UPLOAD = 200 OPENCENSUS_TASK = "opencensus_task" @@ -42,6 +43,25 @@ EPOCH_PATTERN = "%Y-%m-%dT%H:%M:%S.%fZ" GLOBAL_RESOURCE_TYPE = 'global' +# OC metric descriptor type to SD metric kind and value type +OC_MD_TO_SD_TYPE = { + metric_descriptor.MetricDescriptorType.CUMULATIVE_INT64: + (monitoring_v3.enums.MetricDescriptor.MetricKind.CUMULATIVE, + monitoring_v3.enums.MetricDescriptor.ValueType.INT64), + metric_descriptor.MetricDescriptorType.CUMULATIVE_DOUBLE: + (monitoring_v3.enums.MetricDescriptor.MetricKind.CUMULATIVE, + monitoring_v3.enums.MetricDescriptor.ValueType.DOUBLE), + metric_descriptor.MetricDescriptorType.CUMULATIVE_DISTRIBUTION: + (monitoring_v3.enums.MetricDescriptor.MetricKind.CUMULATIVE, + monitoring_v3.enums.MetricDescriptor.ValueType.DISTRIBUTION), + metric_descriptor.MetricDescriptorType.GAUGE_INT64: + (monitoring_v3.enums.MetricDescriptor.MetricKind.GAUGE, + monitoring_v3.enums.MetricDescriptor.ValueType.INT64), + metric_descriptor.MetricDescriptorType.GAUGE_DOUBLE: + (monitoring_v3.enums.MetricDescriptor.MetricKind.GAUGE, + monitoring_v3.enums.MetricDescriptor.ValueType.DOUBLE) +} + class Options(object): """ Options contains options for configuring the 
exporter. @@ -110,18 +130,16 @@ def default_monitoring_labels(self): return self._default_monitoring_labels -class StackdriverStatsExporter(base_exporter.StatsExporter): +class StackdriverStatsExporter(object): """Stats exporter for the Stackdriver Monitoring backend.""" - def __init__(self, - options=Options(), - client=None, - default_labels={}, - transport=async_.AsyncTransport): + def __init__(self, options=None, client=None): + if options is None: + options = Options() self._options = options self._client = client - self._transport = transport(self) - self._default_labels = default_labels + self._md_cache = {} + self._md_lock = threading.Lock() @property def options(self): @@ -131,212 +149,157 @@ def options(self): def client(self): return self._client - @property - def transport(self): - return self._transport - - @property - def default_labels(self): - return self._default_labels - - def set_default_labels(self, value): - self._default_labels = value - - def on_register_view(self, view): - """ create metric descriptor for the registered view""" - if view is not None: - self.create_metric_descriptor(view) - - def emit(self, view_data): - """ export data to Stackdriver Monitoring""" - if view_data is not None: - self.handle_upload(view_data) - - def export(self, view_data): - """ export data to transport class""" - if view_data is not None: - self.transport.export(view_data) - - def handle_upload(self, view_data): - """ handle_upload handles uploading a slice of Data - as well as error handling. 
- """ - if view_data is not None: - self.upload_stats(view_data) - - def upload_stats(self, view_data): - """ It receives an array of view_data object - and create time series for each value - """ - view_data_set = utils.uniq(view_data) - time_series_batches = self.create_batched_time_series( - view_data_set, MAX_TIME_SERIES_PER_UPLOAD) - for time_series_batch in time_series_batches: + def export_metrics(self, metrics): + metrics = list(metrics) + for metric in metrics: + self.register_metric_descriptor(metric.descriptor) + ts_batches = self.create_batched_time_series(metrics) + for ts_batch in ts_batches: self.client.create_time_series( - self.client.project_path(self.options.project_id), - time_series_batch) + self.client.project_path(self.options.project_id), ts_batch) - def create_batched_time_series(self, view_data, batch_size): - """ Create the data structure that will be - sent to Stackdriver Monitoring - """ + def create_batched_time_series(self, metrics, + batch_size=MAX_TIME_SERIES_PER_UPLOAD): time_series_list = itertools.chain.from_iterable( - self.create_time_series_list( - v_data, self.options.resource, self.options.metric_prefix) - for v_data in view_data) + self.create_time_series_list(metric) for metric in metrics) return list(utils.window(time_series_list, batch_size)) - def create_time_series_list(self, v_data, option_resource_type, - metric_prefix): - """ Create the TimeSeries object based on the view data - """ - time_series_list = [] - aggregation_type = v_data.view.aggregation.aggregation_type - tag_agg = v_data.tag_value_aggregation_data_map - for tag_value, agg in tag_agg.items(): - series = monitoring_v3.types.TimeSeries() - series.metric.type = namespaced_view_name(v_data.view.name, - metric_prefix) - set_metric_labels(series, v_data.view, tag_value) - set_monitored_resource(series, option_resource_type) - - point = series.points.add() - if aggregation_type is aggregation.Type.DISTRIBUTION: - dist_value = point.value.distribution_value - 
dist_value.count = agg.count_data - dist_value.mean = agg.mean_data - - sum_of_sqd = agg.sum_of_sqd_deviations - dist_value.sum_of_squared_deviation = sum_of_sqd - - # Uncomment this when stackdriver supports Range - # point.value.distribution_value.range.min = agg_data.min - # point.value.distribution_value.range.max = agg_data.max - bounds = dist_value.bucket_options.explicit_buckets.bounds - buckets = dist_value.bucket_counts - - # Stackdriver expects a first bucket for samples in (-inf, 0), - # but we record positive samples only, and our first bucket is - # [0, first_bound). - bounds.extend([0]) - buckets.extend([0]) - bounds.extend(list(map(float, agg.bounds))) - buckets.extend(list(map(int, agg.counts_per_bucket))) - elif aggregation_type is aggregation.Type.COUNT: - point.value.int64_value = agg.count_data - elif aggregation_type is aggregation.Type.SUM: - if isinstance(v_data.view.measure, measure.MeasureInt): - # TODO: Add implementation of sum aggregation that does not - # store it's data as a float. 
- point.value.int64_value = int(agg.sum_data) - if isinstance(v_data.view.measure, measure.MeasureFloat): - point.value.double_value = float(agg.sum_data) - elif aggregation_type is aggregation.Type.LASTVALUE: - if isinstance(v_data.view.measure, measure.MeasureInt): - point.value.int64_value = int(agg.value) - if isinstance(v_data.view.measure, measure.MeasureFloat): - point.value.double_value = float(agg.value) - else: - raise TypeError("Unsupported aggregation type: %s" % - type(v_data.view.aggregation)) - - start = datetime.strptime(v_data.start_time, EPOCH_PATTERN) - end = datetime.strptime(v_data.end_time, EPOCH_PATTERN) - - timestamp_start = (start - EPOCH_DATETIME).total_seconds() - timestamp_end = (end - EPOCH_DATETIME).total_seconds() - - point.interval.end_time.seconds = int(timestamp_end) - - secs = point.interval.end_time.seconds - point.interval.end_time.nanos = int((timestamp_end - secs) * 10**9) - - if aggregation_type is not aggregation.Type.LASTVALUE: - if timestamp_start == timestamp_end: - # avoiding start_time and end_time to be equal - timestamp_start = timestamp_start - 1 - else: - # For LastValue (Gauge), start and end time must be the same. - timestamp_start = timestamp_end - - start_time = point.interval.start_time - start_time.seconds = int(timestamp_start) - start_secs = start_time.seconds - start_time.nanos = int((timestamp_start - start_secs) * 1e9) - - time_series_list.append(series) - - return time_series_list - - def create_metric_descriptor(self, view): - """ it creates a MetricDescriptor - for the given view data in Stackdriver Monitoring. - An error will be raised if there is - already a metric descriptor created with the same name - but it has a different aggregation or keys. 
- """ - view_measure = view.measure - view_aggregation = view.aggregation - view_name = view.name - - metric_type = namespaced_view_name(view_name, - self.options.metric_prefix) - value_type = None - unit = view_measure.unit - metric_desc = monitoring_v3.enums.MetricDescriptor - agg_type = aggregation.Type - - # Default metric Kind - metric_kind = metric_desc.MetricKind.CUMULATIVE - - if view_aggregation.aggregation_type is agg_type.COUNT: - value_type = metric_desc.ValueType.INT64 - # If the aggregation type is count - # which counts the number of recorded measurements - # the unit must be "1", because this view - # does not apply to the recorded values. - unit = str(1) - elif view_aggregation.aggregation_type is agg_type.SUM: - if isinstance(view_measure, measure.MeasureInt): - value_type = metric_desc.ValueType.INT64 - if isinstance(view_measure, measure.MeasureFloat): - value_type = metric_desc.ValueType.DOUBLE - elif view_aggregation.aggregation_type is agg_type.DISTRIBUTION: - value_type = metric_desc.ValueType.DISTRIBUTION - elif view_aggregation.aggregation_type is agg_type.LASTVALUE: - metric_kind = metric_desc.MetricKind.GAUGE - if isinstance(view_measure, measure.MeasureInt): - value_type = metric_desc.ValueType.INT64 - if isinstance(view_measure, measure.MeasureFloat): - value_type = metric_desc.ValueType.DOUBLE + def create_time_series_list(self, metric): + if not isinstance(metric, metric_module.Metric): # pragma: NO COVER + raise ValueError + return [self._convert_series(metric, ts) for ts in metric.time_series] + + def _convert_series(self, metric, ts): + """Convert an OC timeseries to a SD series.""" + series = monitoring_v3.types.TimeSeries() + series.metric.type = namespaced_view_name( + metric.descriptor.name, self.options.metric_prefix) + + series.metric.labels[OPENCENSUS_TASK] = get_task_value() + + for key, val in zip(metric.descriptor.label_keys, ts.label_values): + if val.value is not None: + safe_key = sanitize_label(key.key) + 
series.metric.labels[safe_key] = val.value + + set_monitored_resource(series, self.options.resource) + + for point in ts.points: + sd_point = series.points.add() + # this just modifies points, no return + self._convert_point(metric, ts, point, sd_point) + return series + + def _convert_point(self, metric, ts, point, sd_point): + """Convert an OC metric point to a SD point.""" + if (metric.descriptor.type == metric_descriptor.MetricDescriptorType + .CUMULATIVE_DISTRIBUTION): + + sd_dist_val = sd_point.value.distribution_value + sd_dist_val.count = point.value.count + sd_dist_val.sum_of_squared_deviation =\ + point.value.sum_of_squared_deviation + + assert sd_dist_val.bucket_options.explicit_buckets.bounds == [] + sd_dist_val.bucket_options.explicit_buckets.bounds.extend( + [0.0] + + list(map(float, point.value.bucket_options.type_.bounds)) + ) + + assert sd_dist_val.bucket_counts == [] + sd_dist_val.bucket_counts.extend( + [0] + + [bb.count for bb in point.value.buckets] + ) + + elif (metric.descriptor.type == + metric_descriptor.MetricDescriptorType.CUMULATIVE_INT64): + sd_point.value.int64_value = int(point.value.value) + + elif (metric.descriptor.type == + metric_descriptor.MetricDescriptorType.CUMULATIVE_DOUBLE): + sd_point.value.double_value = float(point.value.value) + + elif (metric.descriptor.type == + metric_descriptor.MetricDescriptorType.GAUGE_INT64): + sd_point.value.int64_value = int(point.value.value) + + elif (metric.descriptor.type == + metric_descriptor.MetricDescriptorType.GAUGE_DOUBLE): + sd_point.value.double_value = float(point.value.value) + + # TODO: handle SUMMARY metrics, #567 + else: # pragma: NO COVER + raise TypeError("Unsupported metric type: {}" + .format(metric.descriptor.type)) + + end = point.timestamp + if ts.start_timestamp is None: + start = end else: - raise Exception( - "unsupported aggregation type: %s" % type(view_aggregation)) + start = datetime.strptime(ts.start_timestamp, EPOCH_PATTERN) - display_name_prefix = 
DEFAULT_DISPLAY_NAME_PREFIX - if self.options.metric_prefix != "": - display_name_prefix = self.options.metric_prefix + timestamp_start = (start - EPOCH_DATETIME).total_seconds() + timestamp_end = (end - EPOCH_DATETIME).total_seconds() + + sd_point.interval.end_time.seconds = int(timestamp_end) + + secs = sd_point.interval.end_time.seconds + sd_point.interval.end_time.nanos = int((timestamp_end - secs) * 1e9) - descriptor_pattern = "projects/%s/metricDescriptors/%s" - project_id = self.options.project_id + start_time = sd_point.interval.start_time + start_time.seconds = int(timestamp_start) + start_time.nanos = int((timestamp_start - start_time.seconds) * 1e9) - desc_labels = new_label_descriptors(self.default_labels, view.columns) + def get_descriptor_type(self, oc_md): + """Get a SD descriptor type for an OC metric descriptor.""" + return namespaced_view_name(oc_md.name, self.options.metric_prefix) + + def get_metric_descriptor(self, oc_md): + """Convert an OC metric descriptor to a SD metric descriptor.""" + try: + metric_kind, value_type = OC_MD_TO_SD_TYPE[oc_md.type] + except KeyError: + raise TypeError("Unsupported metric type: {}".format(oc_md.type)) + + if self.options.metric_prefix: + display_name_prefix = self.options.metric_prefix + else: + display_name_prefix = DEFAULT_DISPLAY_NAME_PREFIX + + default_labels = self.options.default_monitoring_labels + if default_labels is None: + default_labels = {} + desc_labels = new_label_descriptors(default_labels, oc_md.label_keys) descriptor = monitoring_v3.types.MetricDescriptor(labels=desc_labels) + metric_type = self.get_descriptor_type(oc_md) descriptor.type = metric_type descriptor.metric_kind = metric_kind descriptor.value_type = value_type - descriptor.description = view.description - descriptor.unit = unit + descriptor.description = oc_md.description + descriptor.unit = oc_md.unit + descriptor.name = ("projects/{}/metricDescriptors/{}" + .format(self.options.project_id, metric_type)) + 
descriptor.display_name = ("{}/{}" + .format(display_name_prefix, oc_md.name)) - descriptor.name = descriptor_pattern % (project_id, metric_type) - descriptor.display_name = "%s/%s" % (display_name_prefix, view_name) - - client = self.client - project_name = client.project_path(project_id) - descriptor = client.create_metric_descriptor(project_name, descriptor) return descriptor + def register_metric_descriptor(self, oc_md): + """Register a metric descriptor with stackdriver.""" + descriptor_type = self.get_descriptor_type(oc_md) + with self._md_lock: + if descriptor_type in self._md_cache: + return self._md_cache[descriptor_type] + + descriptor = self.get_metric_descriptor(oc_md) + project_name = self.client.project_path(self.options.project_id) + sd_md = self.client.create_metric_descriptor(project_name, descriptor) + with self._md_lock: + self._md_cache[descriptor_type] = sd_md + return sd_md + def set_monitored_resource(series, option_resource_type): """Set a resource(type and labels) that can be used for monitoring. @@ -402,21 +365,34 @@ def get_user_agent_slug(): return "opencensus-python/{}".format(__version__) -def new_stats_exporter(options): - """ new_stats_exporter returns an exporter that - uploads stats data to Stackdriver Monitoring. +def new_stats_exporter(options, interval=None): + """Get a stats exporter and running transport thread. + + Create a new `StackdriverStatsExporter` with the given options and start + periodically exporting stats to stackdriver in the background. + + See `opencensus.metrics.transport.get_exporter_thread` for details on the + transport thread. + + :type options: :class:`Options` + :param exporter: Options to pass to the exporter + + :type interval: int or float + :param interval: Seconds between export calls. + + :rtype: :class:`StackdriverStatsExporter` and :class:`PeriodicTask` + :return: A tuple of the exporter and transport thread. 
""" if str(options.project_id).strip() == "": - raise Exception(ERROR_BLANK_PROJECT_ID) + raise ValueError(ERROR_BLANK_PROJECT_ID) ci = client_info.ClientInfo(client_library_version=get_user_agent_slug()) client = monitoring_v3.MetricServiceClient(client_info=ci) - exporter = StackdriverStatsExporter(client=client, options=options) - if options.default_monitoring_labels is not None: - exporter.set_default_labels(options.default_monitoring_labels) - return exporter + tt = transport.get_exporter_thread(stats.stats, exporter, + interval=interval) + return exporter, tt def get_task_value(): @@ -447,27 +423,16 @@ def new_label_descriptors(defaults, keys): label["description"] = lbl label_descriptors.append(label) - for tag_key in keys: + for label_key in keys: label = {} - label["key"] = sanitize_label(tag_key) + label["key"] = sanitize_label(label_key.key) + label["description"] = sanitize_label(label_key.description) label_descriptors.append(label) label_descriptors.append({"key": OPENCENSUS_TASK, "description": OPENCENSUS_TASK_DESCRIPTION}) return label_descriptors -def set_metric_labels(series, view, tag_values): - if len(view.columns) != len(tag_values): - logging.warning( - "TagKeys and TagValues don't have same size." 
- ) # pragma: NO COVER - - for key, value in zip(view.columns, tag_values): - if value is not None: - series.metric.labels[sanitize_label(key)] = value - series.metric.labels[OPENCENSUS_TASK] = get_task_value() - - def sanitize_label(text): """Remove characters not accepted in labels key diff --git a/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py b/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py index cf3d07d0f..8200bd2c4 100644 --- a/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py +++ b/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py @@ -21,10 +21,19 @@ from opencensus.common import utils from opencensus.common.version import __version__ from opencensus.ext.stackdriver import stats_exporter as stackdriver +from opencensus.metrics import label_key +from opencensus.metrics import label_value +from opencensus.metrics import transport as transport_module +from opencensus.metrics.export import metric +from opencensus.metrics.export import metric_descriptor +from opencensus.metrics.export import point +from opencensus.metrics.export import time_series +from opencensus.metrics.export import value from opencensus.stats import aggregation as aggregation_module from opencensus.stats import aggregation_data as aggregation_data_module from opencensus.stats import execution_context from opencensus.stats import measure as measure_module +from opencensus.stats import metric_utils from opencensus.stats import stats as stats_module from opencensus.stats import view as view_module from opencensus.stats import view_data as view_data_module @@ -43,9 +52,9 @@ FRONTEND_KEY_INT_CLEAN = "my_org_keys_frontend_INT" FRONTEND_KEY_STR_CLEAN = "my_org_keys_frontend_STR" -VIDEO_SIZE_MEASURE = measure_module.MeasureInt( +VIDEO_SIZE_MEASURE = measure_module.MeasureFloat( "my.org/measure/video_size_test2", "size of processed videos", "By") -VIDEO_SIZE_MEASURE_2 = measure_module.MeasureInt( +VIDEO_SIZE_MEASURE_2 = 
measure_module.MeasureFloat( "my.org/measure/video_size_test_2", "size of processed videos", "By") VIDEO_SIZE_MEASURE_FLOAT = measure_module.MeasureFloat( @@ -59,7 +68,8 @@ VIDEO_SIZE_VIEW_NAME, "processed video size over time", [FRONTEND_KEY], VIDEO_SIZE_MEASURE, VIDEO_SIZE_DISTRIBUTION) -TEST_TIME = utils.to_iso_str(datetime(2018, 12, 25, 1, 2, 3, 4)) +TEST_TIME = datetime(2018, 12, 25, 1, 2, 3, 4) +TEST_TIME_STR = utils.to_iso_str(TEST_TIME) class _Client(object): @@ -100,14 +110,13 @@ def test_constructor_param(self): project_id = 1 default_labels = {'key1': 'value1'} exporter = stackdriver.StackdriverStatsExporter( - options=stackdriver.Options(project_id=project_id), - default_labels=default_labels) - + options=stackdriver.Options( + project_id=project_id, + default_monitoring_labels=default_labels)) self.assertEqual(exporter.options.project_id, project_id) - self.assertEqual(exporter.default_labels, default_labels) def test_blank_project(self): - self.assertRaises(Exception, stackdriver.new_stats_exporter, + self.assertRaises(ValueError, stackdriver.new_stats_exporter, stackdriver.Options(project_id="")) def test_not_blank_project(self): @@ -116,7 +125,7 @@ def test_not_blank_project(self): '.monitoring_v3.MetricServiceClient'), _Client) with patch_client: - exporter_created = stackdriver.new_stats_exporter( + exporter_created, transport = stackdriver.new_stats_exporter( stackdriver.Options(project_id=1)) self.assertIsInstance(exporter_created, @@ -137,7 +146,7 @@ def test_client_info_user_agent(self): '.MetricServiceClient', _Client) with patch_client: - exporter = stackdriver.new_stats_exporter( + exporter, transport = stackdriver.new_stats_exporter( stackdriver.Options(project_id=1)) self.assertIn(stackdriver.get_user_agent_slug(), @@ -173,36 +182,11 @@ def test_sanitize(self): self.assertEqual(len(result), 100) self.assertEqual(result, "key_" + "0123456789" * 9 + "012345") - def test_singleton_with_params(self): - default_labels = {'key1': 'value1'} - 
patch_client = mock.patch( - ('opencensus.ext.stackdriver.stats_exporter' - '.monitoring_v3.MetricServiceClient'), _Client) - - with patch_client: - exporter_created = stackdriver.new_stats_exporter( - stackdriver.Options( - project_id=1, default_monitoring_labels=default_labels)) - - self.assertEqual(exporter_created.default_labels, default_labels) - def test_get_task_value(self): task_value = stackdriver.get_task_value() self.assertNotEqual(task_value, "") - def test_set_default_labels(self): - labels = {'key': 'value'} - exporter = stackdriver.StackdriverStatsExporter() - exporter.set_default_labels(labels) - self.assertEqual(exporter.default_labels, labels) - - def test_new_label_descriptors(self): - defaults = {'key1': 'value1'} - keys = [FRONTEND_KEY] - output = stackdriver.new_label_descriptors(defaults, keys) - self.assertEqual(len(output), 3) - - def test_namespacedviews(self): + def test_namespaced_views(self): view_name = "view-1" expected_view_name_namespaced = ( "custom.googleapis.com/opencensus/{}".format(view_name)) @@ -214,80 +198,275 @@ def test_namespacedviews(self): view_name, "kubernetes.io/myorg") self.assertEqual(expected_view_name_namespaced, view_name_namespaced) - def test_on_register_view(self): - client = mock.Mock() - view_none = None - option = stackdriver.Options(project_id="project-test") - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - exporter.on_register_view(VIDEO_SIZE_VIEW) - exporter.on_register_view(view_none) - self.assertTrue(client.create_metric_descriptor.called) + def test_stackdriver_register_exporter(self): + stats = stats_module.stats + view_manager = stats.view_manager - @mock.patch('opencensus.ext.stackdriver.stats_exporter.' 
- 'monitored_resource.get_instance', - return_value=None) - def test_emit(self, monitor_resource_mock): - client = mock.Mock() - v_data = view_data_module.ViewData( - view=VIDEO_SIZE_VIEW, start_time=TEST_TIME, end_time=TEST_TIME) - v_data.record(context=tag_map_module.TagMap(), value=2, - timestamp=None) - view_data = [v_data] - option = stackdriver.Options(project_id="project-test") - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - exporter.emit(view_data) - exporter.emit(None) - self.assertTrue(client.create_time_series.called) + exporter = mock.Mock() + if len(view_manager.measure_to_view_map.exporters) > 0: + view_manager.unregister_exporter( + view_manager.measure_to_view_map.exporters[0]) + view_manager.register_exporter(exporter) - def test_export_no_data(self): - client = mock.Mock() - transport = mock.Mock() - option = stackdriver.Options(project_id="project-test") + registered_exporters = len(view_manager.measure_to_view_map.exporters) + + self.assertEqual(registered_exporters, 1) + + @mock.patch('os.getpid', return_value=12345) + @mock.patch( + 'platform.uname', + return_value=('system', 'node', 'release', 'version', 'machine', + 'processor')) + def test_get_task_value_with_hostname(self, mock_uname, mock_pid): + self.assertEqual(stackdriver.get_task_value(), "py-12345@node") + + @mock.patch('os.getpid', return_value=12345) + @mock.patch( + 'platform.uname', + return_value=('system', '', 'release', 'version', 'machine', + 'processor')) + def test_get_task_value_without_hostname(self, mock_uname, mock_pid): + self.assertEqual(stackdriver.get_task_value(), "py-12345@localhost") + + def test_get_metric_descriptor(self): exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client, transport=transport) - exporter.export(None) - self.assertFalse(exporter.transport.export.called) + options=stackdriver.Options( + default_monitoring_labels={'dk': 'dd'}, + project_id='project_id'), + client=mock.Mock()) + + 
oc_md = metric_descriptor.MetricDescriptor( + name='name', + description='description', + unit='unit', + type_=metric_descriptor.MetricDescriptorType.GAUGE_INT64, + label_keys=[label_key.LabelKey('ck', 'cd')] + ) - def test_export_with_data(self): - client = mock.Mock() - transport = mock.Mock() - v_data = view_data_module.ViewData( - view=VIDEO_SIZE_VIEW, start_time=TEST_TIME, end_time=TEST_TIME) - view_data = [v_data] - option = stackdriver.Options(project_id="project-test") + sd_md = exporter.get_metric_descriptor(oc_md) + self.assertEqual( + sd_md.metric_kind, + monitoring_v3.enums.MetricDescriptor.MetricKind.GAUGE) + self.assertEqual( + sd_md.value_type, + monitoring_v3.enums.MetricDescriptor.ValueType.INT64) + + self.assertIsInstance(sd_md, monitoring_v3.types.MetricDescriptor) + exporter.client.create_metric_descriptor.assert_not_called() + + def test_get_metric_descriptor_bad_type(self): exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client, transport=transport) - exporter.export(view_data) - self.assertTrue(exporter.transport.export.called) + options=stackdriver.Options(project_id='project_id'), + client=mock.Mock()) + + bad_type_oc_md = metric_descriptor.MetricDescriptor( + name='name', + description='description', + unit='unit', + # Need a valid type to create the descriptor + type_=metric_descriptor.MetricDescriptorType.GAUGE_INT64, + label_keys=[label_key.LabelKey('key', 'description')] + ) + bad_type_oc_md._type = 100 + + with self.assertRaises(TypeError): + exporter.get_metric_descriptor(bad_type_oc_md) + + def test_get_metric_descriptor_custom_prefix(self): - def test_handle_upload_no_data(self): - client = mock.Mock() - option = stackdriver.Options(project_id="project-test") exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - exporter.handle_upload(None) - self.assertFalse(client.create_time_series.called) + options=stackdriver.Options( + default_monitoring_labels={'dk': 'dd'}, + 
metric_prefix='metric_prefix', + project_id='project_id'), + client=mock.Mock()) + + oc_md = metric_descriptor.MetricDescriptor( + name='name', + description='description', + unit='unit', + type_=metric_descriptor.MetricDescriptorType.GAUGE_INT64, + label_keys=[label_key.LabelKey('ck', 'cd')] + ) - @mock.patch('opencensus.ext.stackdriver.stats_exporter.' - 'monitored_resource.get_instance', - return_value=None) - def test_handle_upload_with_data(self, monitor_resource_mock): - client = mock.Mock() - v_data = view_data_module.ViewData( - view=VIDEO_SIZE_VIEW, start_time=TEST_TIME, end_time=TEST_TIME) - v_data.record(context=tag_map_module.TagMap(), value=2, - timestamp=None) - view_data = [v_data] - option = stackdriver.Options(project_id="project-test") + sd_md = exporter.get_metric_descriptor(oc_md) + self.assertIn('metric_prefix', sd_md.type) + self.assertIn('metric_prefix', sd_md.name) + + def test_register_metric_descriptor(self): exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - exporter.handle_upload(view_data) - self.assertTrue(client.create_time_series.called) + options=stackdriver.Options( + metric_prefix='metric_prefix', + project_id='project_id'), + client=mock.Mock()) + + oc_md = metric_descriptor.MetricDescriptor( + name='name', + description='description', + unit='unit', + type_=metric_descriptor.MetricDescriptorType.GAUGE_INT64, + label_keys=[label_key.LabelKey('key', 'description')] + ) + + exporter.register_metric_descriptor(oc_md) + self.assertEqual( + exporter.client.create_metric_descriptor.call_count, + 1 + ) + exporter.register_metric_descriptor(oc_md) + self.assertEqual( + exporter.client.create_metric_descriptor.call_count, + 1 + ) + + def test_export_metrics(self): + lv = label_value.LabelValue('val') + val = value.ValueLong(value=123) + dt = datetime(2019, 3, 20, 21, 34, 0, 537954) + pp = point.Point(value=val, timestamp=dt) + + ts = [ + time_series.TimeSeries(label_values=[lv], points=[pp], + 
start_timestamp=utils.to_iso_str(dt)) + ] + + desc = metric_descriptor.MetricDescriptor( + name='name', + description='description', + unit='unit', + type_=metric_descriptor.MetricDescriptorType.GAUGE_INT64, + label_keys=[label_key.LabelKey('key', 'description')] + ) + + mm = metric.Metric(descriptor=desc, time_series=ts) + + exporter = stackdriver.StackdriverStatsExporter(client=mock.Mock()) + exporter.export_metrics([mm]) + + self.assertEqual(exporter.client.create_time_series.call_count, 1) + sd_args = exporter.client.create_time_series.call_args[0][1] + self.assertEqual(len(sd_args), 1) + [sd_arg] = exporter.client.create_time_series.call_args[0][1] + self.assertEqual(sd_arg.points[0].value.int64_value, 123) + + +class MockPeriodicTask(object): + """Testing mock of metrics.transport.PeriodicTask. + + Simulate calling export asynchronously from another thread synchronously + from this one. + """ + def __init__(self, func, interval=None, **kwargs): + self.func = func + self.logger = mock.Mock() + self.start = mock.Mock() + self.run = mock.Mock() + + def step(self): + try: + self.func() + except transport_module.TransportError as ex: + self.logger.exception(ex) + self.stop() + except Exception: + self.logger.exception("Error handling metric export") + + +@mock.patch('opencensus.ext.stackdriver.stats_exporter' + '.monitoring_v3.MetricServiceClient') +class TestAsyncStatsExport(unittest.TestCase): + """Check that metrics are exported using the exporter thread.""" + + def setUp(self): + patcher = mock.patch( + 'opencensus.metrics.transport.PeriodicTask', + MockPeriodicTask) + patcher.start() + self.addCleanup(patcher.stop) + + @mock.patch('opencensus.ext.stackdriver.stats_exporter' + '.stats.stats') + def test_export_empty(self, mock_stats, mock_client): + """Check that we don't attempt to export empty metric sets.""" + + mock_stats.get_metrics.return_value = [] + + exporter, transport = stackdriver.new_stats_exporter( + stackdriver.Options(project_id=1)) + + 
transport.step() + exporter.client.create_metric_descriptor.assert_not_called() + exporter.client.create_time_series.assert_not_called() + + @mock.patch('opencensus.ext.stackdriver.stats_exporter' + '.stats.stats') + def test_export_single_metric(self, mock_stats, mock_client): + """Check that we can export a set of a single metric.""" + + lv = label_value.LabelValue('val') + val = value.ValueLong(value=123) + dt = datetime(2019, 3, 20, 21, 34, 0, 537954) + pp = point.Point(value=val, timestamp=dt) + + ts = [ + time_series.TimeSeries(label_values=[lv], points=[pp], + start_timestamp=utils.to_iso_str(dt)) + ] + + desc = metric_descriptor.MetricDescriptor( + name='name2', + description='description2', + unit='unit2', + type_=metric_descriptor.MetricDescriptorType.GAUGE_INT64, + label_keys=[label_key.LabelKey('key', 'description')] + ) + + mm = metric.Metric(descriptor=desc, time_series=ts) + mock_stats.get_metrics.return_value = [mm] + + exporter, transport = stackdriver.new_stats_exporter( + stackdriver.Options(project_id=1)) + + transport.step() - def assertCorrectLabels(self, actual_labels, expected_labels, - include_opencensus=False): + exporter.client.create_metric_descriptor.assert_called() + self.assertEqual( + exporter.client.create_metric_descriptor.call_count, + 1) + md_call_arg =\ + exporter.client.create_metric_descriptor.call_args[0][1] + self.assertEqual( + md_call_arg.metric_kind, + monitoring_v3.enums.MetricDescriptor.MetricKind.GAUGE + ) + self.assertEqual( + md_call_arg.value_type, + monitoring_v3.enums.MetricDescriptor.ValueType.INT64 + ) + + exporter.client.create_time_series.assert_called() + self.assertEqual( + exporter.client.create_time_series.call_count, + 1) + ts_call_arg = exporter.client.create_time_series.call_args[0][1] + self.assertEqual(len(ts_call_arg), 1) + self.assertEqual(len(ts_call_arg[0].points), 1) + self.assertEqual(ts_call_arg[0].points[0].value.int64_value, 123) + + +class TestCreateTimeseries(unittest.TestCase): + + def 
setUp(self): + patcher = mock.patch( + 'opencensus.ext.stackdriver.stats_exporter.stats.stats', + stats_module._Stats()) + patcher.start() + self.addCleanup(patcher.stop) + + def check_labels(self, + actual_labels, + expected_labels, + include_opencensus=False): actual_labels = dict(actual_labels) if include_opencensus: opencensus_tag = actual_labels.pop(stackdriver.OPENCENSUS_TASK) @@ -301,15 +480,18 @@ def assertCorrectLabels(self, actual_labels, expected_labels, def test_create_batched_time_series(self, monitor_resource_mock): client = mock.Mock() v_data = view_data_module.ViewData( - view=VIDEO_SIZE_VIEW, start_time=TEST_TIME, end_time=TEST_TIME) - v_data.record(context=tag_map_module.TagMap(), value=2, - timestamp=None) + view=VIDEO_SIZE_VIEW, + start_time=TEST_TIME_STR, + end_time=TEST_TIME_STR) + v_data.record(context=tag_map_module.TagMap(), value=2, timestamp=None) view_data = [v_data] option = stackdriver.Options(project_id="project-test") exporter = stackdriver.StackdriverStatsExporter( options=option, client=client) + view_data = [metric_utils.view_data_to_metric(view_data[0], TEST_TIME)] + time_series_batches = exporter.create_batched_time_series(view_data, 1) self.assertEqual(len(time_series_batches), 1) @@ -319,8 +501,8 @@ def test_create_batched_time_series(self, monitor_resource_mock): self.assertEqual( time_series.metric.type, 'custom.googleapis.com/opencensus/' + VIDEO_SIZE_VIEW_NAME) - self.assertCorrectLabels(time_series.metric.labels, {}, - include_opencensus=True) + self.check_labels( + time_series.metric.labels, {}, include_opencensus=True) @mock.patch('opencensus.ext.stackdriver.stats_exporter.' 
'monitored_resource.get_instance', @@ -330,11 +512,11 @@ def test_create_batched_time_series_with_many(self, monitor_resource_mock): # First view with 3 view_name1 = "view-name1" - view1 = view_module.View(view_name1, "test description", - ['test'], VIDEO_SIZE_MEASURE, + view1 = view_module.View(view_name1, "test description", ['test'], + VIDEO_SIZE_MEASURE, aggregation_module.LastValueAggregation()) v_data1 = view_data_module.ViewData( - view=view1, start_time=TEST_TIME, end_time=TEST_TIME) + view=view1, start_time=TEST_TIME_STR, end_time=TEST_TIME_STR) v_data1.record(context=tag_map_module.TagMap({'test': '1'}), value=7, timestamp=None) v_data1.record(context=tag_map_module.TagMap({'test': '2'}), value=5, @@ -344,17 +526,19 @@ def test_create_batched_time_series_with_many(self, monitor_resource_mock): # Second view with 2 view_name2 = "view-name2" - view2 = view_module.View(view_name2, "test description", - ['test'], VIDEO_SIZE_MEASURE, + view2 = view_module.View(view_name2, "test description", ['test'], + VIDEO_SIZE_MEASURE, aggregation_module.LastValueAggregation()) v_data2 = view_data_module.ViewData( - view=view2, start_time=TEST_TIME, end_time=TEST_TIME) + view=view2, start_time=TEST_TIME_STR, end_time=TEST_TIME_STR) v_data2.record(context=tag_map_module.TagMap({'test': '1'}), value=7, timestamp=None) v_data2.record(context=tag_map_module.TagMap({'test': '2'}), value=5, timestamp=None) view_data = [v_data1, v_data2] + view_data = [metric_utils.view_data_to_metric(vd, TEST_TIME) + for vd in view_data] option = stackdriver.Options(project_id="project-test") exporter = stackdriver.StackdriverStatsExporter( @@ -368,19 +552,25 @@ def test_create_batched_time_series_with_many(self, monitor_resource_mock): self.assertEqual(len(tsb2), 2) self.assertEqual(len(tsb3), 1) - def test_stackdriver_register_exporter(self): - stats = stats_module.Stats() + def setup_create_timeseries_test(self): + client = mock.Mock() + execution_context.clear() + + option = 
stackdriver.Options( + project_id="project-test", resource="global") + exporter = stackdriver.StackdriverStatsExporter( + options=option, client=client) + + stats = stats_module.stats view_manager = stats.view_manager + stats_recorder = stats.stats_recorder - exporter = mock.Mock() if len(view_manager.measure_to_view_map.exporters) > 0: view_manager.unregister_exporter( view_manager.measure_to_view_map.exporters[0]) - view_manager.register_exporter(exporter) - - registered_exporters = len(view_manager.measure_to_view_map.exporters) - self.assertEqual(registered_exporters, 1) + view_manager.register_exporter(exporter) + return view_manager, stats_recorder, exporter @mock.patch('opencensus.ext.stackdriver.stats_exporter.' 'monitored_resource.get_instance', @@ -402,7 +592,9 @@ def test_create_timeseries(self, monitor_resource_mock): v_data = measure_map.measure_to_view_map.get_view( VIDEO_SIZE_VIEW_NAME, None) - time_series_list = exporter.create_time_series_list(v_data, "", "") + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] @@ -410,39 +602,48 @@ def test_create_timeseries(self, monitor_resource_mock): self.assertEqual( time_series_list[0].metric.type, "custom.googleapis.com/opencensus/my.org/views/video_size_test2") - self.assertCorrectLabels(time_series.metric.labels, - {FRONTEND_KEY_CLEAN: "1200"}, - include_opencensus=True) + self.check_labels( + time_series.metric.labels, {FRONTEND_KEY_CLEAN: "1200"}, + include_opencensus=True) self.assertIsNotNone(time_series.resource) self.assertEqual(len(time_series.points), 1) value = time_series.points[0].value self.assertEqual(value.distribution_value.count, 1) - self.assertEqual(value.distribution_value.mean, 25 * MiB) - time_series_list = exporter.create_time_series_list( - v_data, "global", "kubernetes.io/myorg") + time_series_list = 
exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] - self.assertEqual(time_series.metric.type, - "kubernetes.io/myorg/my.org/views/video_size_test2") - self.assertCorrectLabels(time_series.metric.labels, - {FRONTEND_KEY_CLEAN: "1200"}, - include_opencensus=True) + self.check_labels( + time_series.metric.labels, {FRONTEND_KEY_CLEAN: "1200"}, + include_opencensus=True) self.assertIsNotNone(time_series.resource) self.assertEqual(len(time_series.points), 1) value = time_series.points[0].value self.assertEqual(value.distribution_value.count, 1) - self.assertEqual(value.distribution_value.mean, 25 * MiB) @mock.patch('opencensus.ext.stackdriver.stats_exporter.' 'monitored_resource.get_instance') def test_create_timeseries_with_resource(self, monitor_resource_mock): - view_manager, stats_recorder, exporter = \ - self.setup_create_timeseries_test() + client = mock.Mock() + execution_context.clear() + + option = stackdriver.Options(project_id="project-test", resource="") + exporter = stackdriver.StackdriverStatsExporter( + options=option, client=client) + + stats = stats_module.stats + view_manager = stats.view_manager + stats_recorder = stats.stats_recorder + + if len(view_manager.measure_to_view_map.exporters) > 0: + view_manager.unregister_exporter( + view_manager.measure_to_view_map.exporters[0]) + + view_manager.register_exporter(exporter) view_manager.register_view(VIDEO_SIZE_VIEW) tag_value = tag_value_module.TagValue("1200") @@ -456,6 +657,8 @@ def test_create_timeseries_with_resource(self, monitor_resource_mock): v_data = measure_map.measure_to_view_map.get_view( VIDEO_SIZE_VIEW_NAME, None) + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + # check for gce_instance monitored resource mocked_labels = { 'instance_id': 'my-instance', @@ -470,26 +673,25 @@ def test_create_timeseries_with_resource(self, monitor_resource_mock): mock_resource.get_labels.return_value = mocked_labels 
monitor_resource_mock.return_value = mock_resource - time_series_list = exporter.create_time_series_list(v_data, "", "") + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] self.assertEqual(time_series.resource.type, "gce_instance") - self.assertCorrectLabels(time_series.resource.labels, { - 'instance_id': 'my-instance', - 'project_id': 'my-project', - 'zone': 'us-east1', - }) + self.check_labels( + time_series.resource.labels, { + 'instance_id': 'my-instance', + 'project_id': 'my-project', + 'zone': 'us-east1', + }) self.assertEqual( time_series.metric.type, "custom.googleapis.com/opencensus/my.org/views/video_size_test2") self.assertIsNotNone(time_series) - time_series_list = exporter.create_time_series_list( - v_data, "global", "") + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] - self.assertEqual(time_series.resource.type, "global") - self.assertCorrectLabels(time_series.resource.labels, {}) + self.assertEqual( time_series.metric.type, "custom.googleapis.com/opencensus/my.org/views/video_size_test2") @@ -509,17 +711,18 @@ def test_create_timeseries_with_resource(self, monitor_resource_mock): mock_resource.get_labels.return_value = mocked_labels monitor_resource_mock.return_value = mock_resource - time_series_list = exporter.create_time_series_list(v_data, "", "") + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] self.assertEqual(time_series.resource.type, "k8s_container") - self.assertCorrectLabels(time_series.resource.labels, { - 'project_id': 'my-project', - 'location': 'us-east1', - 'cluster_name': 'cluster', - 'pod_name': 'localhost', - 'namespace_name': 'namespace', - }) + self.check_labels( + time_series.resource.labels, { + 'project_id': 'my-project', + 'location': 'us-east1', + 
'cluster_name': 'cluster', + 'pod_name': 'localhost', + 'namespace_name': 'namespace', + }) self.assertEqual( time_series.metric.type, "custom.googleapis.com/opencensus/my.org/views/video_size_test2") @@ -537,15 +740,16 @@ def test_create_timeseries_with_resource(self, monitor_resource_mock): mock_resource.get_labels.return_value = mocked_labels monitor_resource_mock.return_value = mock_resource - time_series_list = exporter.create_time_series_list(v_data, "", "") + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] self.assertEqual(time_series.resource.type, "aws_ec2_instance") - self.assertCorrectLabels(time_series.resource.labels, { - 'instance_id': 'my-instance', - 'aws_account': 'my-project', - 'region': 'aws:us-east1', - }) + self.check_labels( + time_series.resource.labels, { + 'instance_id': 'my-instance', + 'aws_account': 'my-project', + 'region': 'aws:us-east1', + }) self.assertEqual( time_series.metric.type, "custom.googleapis.com/opencensus/my.org/views/video_size_test2") @@ -557,11 +761,11 @@ def test_create_timeseries_with_resource(self, monitor_resource_mock): mock_resource.get_labels.return_value = mock.Mock() monitor_resource_mock.return_value = mock_resource - time_series_list = exporter.create_time_series_list(v_data, "", "") + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] self.assertEqual(time_series.resource.type, 'global') - self.assertCorrectLabels(time_series.resource.labels, {}) + self.check_labels(time_series.resource.labels, {}) self.assertEqual( time_series.metric.type, "custom.googleapis.com/opencensus/my.org/views/video_size_test2") @@ -592,20 +796,21 @@ def test_create_timeseries_str_tagvalue(self, monitor_resource_mock): v_data = measure_map.measure_to_view_map.get_view(view_name1, None) - time_series_list = exporter.create_time_series_list( - v_data, 
"global", "kubernetes.io/myorg/") + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] - self.assertEqual(time_series.metric.type, - "kubernetes.io/myorg/view-name1") - self.assertCorrectLabels(time_series.metric.labels, - {FRONTEND_KEY_INT_CLEAN: "Abc"}, - include_opencensus=True) + + self.check_labels( + time_series.metric.labels, {FRONTEND_KEY_INT_CLEAN: "Abc"}, + include_opencensus=True) self.assertIsNotNone(time_series.resource) self.assertEqual(len(time_series.points), 1) expected_value = monitoring_v3.types.TypedValue() - expected_value.int64_value = 25 * MiB + # TODO: #565 + expected_value.double_value = 25.0 * MiB self.assertEqual(time_series.points[0].value, expected_value) @mock.patch('opencensus.ext.stackdriver.stats_exporter.' @@ -634,15 +839,14 @@ def test_create_timeseries_str_tagvalue_count_aggregtation( v_data = measure_map.measure_to_view_map.get_view(view_name1, None) - time_series_list = exporter.create_time_series_list( - v_data, "global", "kubernetes.io/myorg/") + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] - self.assertEqual(time_series.metric.type, - "kubernetes.io/myorg/view-name1") - self.assertCorrectLabels(time_series.metric.labels, - {FRONTEND_KEY_INT_CLEAN: "Abc"}, - include_opencensus=True) + self.check_labels( + time_series.metric.labels, {FRONTEND_KEY_INT_CLEAN: "Abc"}, + include_opencensus=True) self.assertIsNotNone(time_series.resource) self.assertEqual(len(time_series.points), 1) @@ -676,15 +880,14 @@ def test_create_timeseries_last_value_float_tagvalue( v_data = measure_map.measure_to_view_map.get_view(view_name2, None) - time_series_list = exporter.create_time_series_list( - v_data, "global", "kubernetes.io/myorg") + v_data 
= metric_utils.view_data_to_metric(v_data, TEST_TIME) + + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] - self.assertEqual(time_series.metric.type, - "kubernetes.io/myorg/view-name2") - self.assertCorrectLabels(time_series.metric.labels, - {FRONTEND_KEY_FLOAT_CLEAN: "Abc"}, - include_opencensus=True) + self.check_labels( + time_series.metric.labels, {FRONTEND_KEY_FLOAT_CLEAN: "Abc"}, + include_opencensus=True) self.assertIsNotNone(time_series.resource) self.assertEqual(len(time_series.points), 1) @@ -703,7 +906,7 @@ def test_create_timeseries_float_tagvalue(self, monitor_resource_mock): exporter = stackdriver.StackdriverStatsExporter( options=option, client=client) - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder @@ -731,15 +934,16 @@ def test_create_timeseries_float_tagvalue(self, monitor_resource_mock): v_data = measure_map.measure_to_view_map.get_view(view_name3, None) - time_series_list = exporter.create_time_series_list( - v_data, "global", "") + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) [time_series] = time_series_list self.assertEqual(time_series.metric.type, "custom.googleapis.com/opencensus/view-name3") - self.assertCorrectLabels(time_series.metric.labels, - {FRONTEND_KEY_FLOAT_CLEAN: "1200"}, - include_opencensus=True) + self.check_labels( + time_series.metric.labels, {FRONTEND_KEY_FLOAT_CLEAN: "1200"}, + include_opencensus=True) self.assertIsNotNone(time_series.resource) self.assertEqual(len(time_series.points), 1) @@ -774,11 +978,15 @@ def test_create_timeseries_multiple_tag_values(self, v_data = measure_map.measure_to_view_map.get_view( VIDEO_SIZE_VIEW_NAME, None) - time_series_list = exporter.create_time_series_list(v_data, "", "") + v_data = 
metric_utils.view_data_to_metric(v_data, TEST_TIME) + + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 2) - ts_by_frontend = {ts.metric.labels.get(FRONTEND_KEY_CLEAN): ts - for ts in time_series_list} + ts_by_frontend = { + ts.metric.labels.get(FRONTEND_KEY_CLEAN): ts + for ts in time_series_list + } self.assertEqual(set(ts_by_frontend.keys()), {"1200", "1400"}) ts1 = ts_by_frontend["1200"] ts2 = ts_by_frontend["1400"] @@ -793,7 +1001,6 @@ def test_create_timeseries_multiple_tag_values(self, self.assertEqual(len(ts1.points), 1) value1 = ts1.points[0].value self.assertEqual(value1.distribution_value.count, 1) - self.assertEqual(value1.distribution_value.mean, 25 * MiB) # Verify second time series self.assertEqual(ts2.resource.type, "global") @@ -805,7 +1012,6 @@ def test_create_timeseries_multiple_tag_values(self, self.assertEqual(len(ts2.points), 1) value2 = ts2.points[0].value self.assertEqual(value2.distribution_value.count, 1) - self.assertEqual(value2.distribution_value.mean, 12 * MiB) @mock.patch('opencensus.ext.stackdriver.stats_exporter.' 
'monitored_resource.get_instance', @@ -833,7 +1039,9 @@ def test_create_timeseries_disjoint_tags(self, monitoring_resoure_mock): v_data = measure_map.measure_to_view_map.get_view(view_name, None) - time_series_list = exporter.create_time_series_list(v_data, "", "") + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) [time_series] = time_series_list @@ -842,46 +1050,35 @@ def test_create_timeseries_disjoint_tags(self, monitoring_resoure_mock): self.assertEqual(time_series.resource.type, "global") self.assertEqual(time_series.metric.type, "custom.googleapis.com/opencensus/" + view_name) - self.assertCorrectLabels(time_series.metric.labels, - {FRONTEND_KEY_CLEAN: "1200"}, - include_opencensus=True) + self.check_labels( + time_series.metric.labels, {FRONTEND_KEY_CLEAN: "1200"}, + include_opencensus=True) self.assertIsNotNone(time_series.resource) self.assertEqual(len(time_series.points), 1) expected_value = monitoring_v3.types.TypedValue() - expected_value.int64_value = 25 * MiB + # TODO: #565 + expected_value.double_value = 25.0 * MiB self.assertEqual(time_series.points[0].value, expected_value) - def setup_create_timeseries_test(self): - client = mock.Mock() - execution_context.clear() - - option = stackdriver.Options( - project_id="project-test", resource="global") - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - - stats = stats_module.Stats() - view_manager = stats.view_manager - stats_recorder = stats.stats_recorder - - if len(view_manager.measure_to_view_map.exporters) > 0: - view_manager.unregister_exporter( - view_manager.measure_to_view_map.exporters[0]) - - view_manager.register_exporter(exporter) - return view_manager, stats_recorder, exporter - def test_create_timeseries_from_distribution(self): """Check for explicit 0-bound bucket for SD export.""" + agg = aggregation_module.DistributionAggregation( + 
aggregation_type=aggregation_module.Type.DISTRIBUTION) + + view = view_module.View( + name="example.org/test_view", + description="example.org/test_view", + columns=['tag_key'], + measure=mock.Mock(), + aggregation=agg, + ) - v_data = mock.Mock(spec=view_data_module.ViewData) - v_data.view.name = "example.org/test_view" - v_data.view.columns = ['tag_key'] - v_data.view.aggregation.aggregation_type = \ - aggregation_module.Type.DISTRIBUTION - v_data.start_time = TEST_TIME - v_data.end_time = TEST_TIME + v_data = view_data_module.ViewData( + view=view, + start_time=TEST_TIME_STR, + end_time=TEST_TIME_STR, + ) # Aggregation over (10 * range(10)) for buckets [2, 4, 6, 8] dad = aggregation_data_module.DistributionAggregationData( @@ -892,57 +1089,62 @@ def test_create_timeseries_from_distribution(self): bounds=[2, 4, 6, 8], exemplars={mock.Mock() for ii in range(5)} ) - v_data.tag_value_aggregation_data_map = {('tag_value',): dad} + v_data._tag_value_aggregation_data_map = {('tag_value',): dad} - exporter = stackdriver.StackdriverStatsExporter( - options=mock.Mock(), - client=mock.Mock(), - ) - time_series_list = exporter.create_time_series_list(v_data, "", "") + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + + exporter = stackdriver.StackdriverStatsExporter() + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) [time_series] = time_series_list - self.assertCorrectLabels(time_series.metric.labels, - {'tag_key': 'tag_value'}, - include_opencensus=True) + self.check_labels( + time_series.metric.labels, {'tag_key': 'tag_value'}, + include_opencensus=True) self.assertEqual(len(time_series.points), 1) [point] = time_series.points dv = point.value.distribution_value self.assertEqual(100, dv.count) - self.assertEqual(4.5, dv.mean) self.assertEqual(825.0, dv.sum_of_squared_deviation) self.assertEqual([0, 20, 20, 20, 20, 20], dv.bucket_counts) self.assertEqual([0, 2, 4, 6, 8], 
dv.bucket_options.explicit_buckets.bounds) - def test_create_timeseries_something(self): + def test_create_timeseries_multiple_tags(self): """Check that exporter creates timeseries for multiple tag values. create_time_series_list should return a time series for each set of values in the tag value aggregation map. """ + agg = aggregation_module.CountAggregation( + aggregation_type=aggregation_module.Type.COUNT) + + view = view_module.View( + name="example.org/test_view", + description="example.org/test_view", + columns=[tag_key_module.TagKey('color'), + tag_key_module.TagKey('shape')], + measure=mock.Mock(), + aggregation=agg, + ) - v_data = mock.Mock(spec=view_data_module.ViewData) - v_data.view.name = "example.org/test_view" - v_data.view.columns = [tag_key_module.TagKey('color'), - tag_key_module.TagKey('shape')] - v_data.view.aggregation.aggregation_type = \ - aggregation_module.Type.COUNT - v_data.start_time = TEST_TIME - v_data.end_time = TEST_TIME + v_data = view_data_module.ViewData( + view=view, + start_time=TEST_TIME_STR, + end_time=TEST_TIME_STR, + ) rs_count = aggregation_data_module.CountAggregationData(10) bc_count = aggregation_data_module.CountAggregationData(20) - v_data.tag_value_aggregation_data_map = { + v_data._tag_value_aggregation_data_map = { ('red', 'square'): rs_count, ('blue', 'circle'): bc_count, } - exporter = stackdriver.StackdriverStatsExporter( - options=mock.Mock(), - client=mock.Mock(), - ) - time_series_list = exporter.create_time_series_list(v_data, "", "") + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + + exporter = stackdriver.StackdriverStatsExporter() + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 2) self.assertEqual(len(time_series_list[0].points), 1) @@ -963,8 +1165,8 @@ def test_create_timeseries_invalid_aggregation(self): v_data.view.columns = [tag_key_module.TagKey('base_key')] v_data.view.aggregation.aggregation_type = \ aggregation_module.Type.NONE 
- v_data.start_time = TEST_TIME - v_data.end_time = TEST_TIME + v_data.start_time = TEST_TIME_STR + v_data.end_time = TEST_TIME_STR base_data = aggregation_data_module.BaseAggregationData(10) v_data.tag_value_aggregation_data_map = { @@ -975,128 +1177,5 @@ def test_create_timeseries_invalid_aggregation(self): options=mock.Mock(), client=mock.Mock(), ) - self.assertRaises(TypeError, exporter.create_time_series_list, - v_data, "", "") - - def test_create_metric_descriptor_count(self): - client = mock.Mock() - option = stackdriver.Options( - project_id="project-test", metric_prefix="teste") - view_name_count = "view-count" - agg_count = aggregation_module.CountAggregation(count=2) - view_count = view_module.View( - view_name_count, "processed video size over time", [FRONTEND_KEY], - VIDEO_SIZE_MEASURE, agg_count) - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - desc = exporter.create_metric_descriptor(view_count) - self.assertIsNotNone(desc) - - def test_create_metric_descriptor_sum_int(self): - client = mock.Mock() - option = stackdriver.Options( - project_id="project-test", metric_prefix="teste") - - view_name_sum_int = "view-sum-int" - agg_sum = aggregation_module.SumAggregation(sum=2) - view_sum_int = view_module.View( - view_name_sum_int, "processed video size over time", - [FRONTEND_KEY], VIDEO_SIZE_MEASURE, agg_sum) - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - desc = exporter.create_metric_descriptor(view_sum_int) - self.assertIsNotNone(desc) - - def test_create_metric_descriptor_sum_float(self): - client = mock.Mock() - option = stackdriver.Options( - project_id="project-test", metric_prefix="teste") - - view_name_sum_float = "view-sum-float" - agg_sum = aggregation_module.SumAggregation(sum=2) - view_sum_float = view_module.View( - view_name_sum_float, "processed video size over time", - [FRONTEND_KEY_FLOAT], VIDEO_SIZE_MEASURE_FLOAT, agg_sum) - exporter = 
stackdriver.StackdriverStatsExporter( - options=option, client=client) - desc = exporter.create_metric_descriptor(view_sum_float) - self.assertIsNotNone(desc) - - def test_create_metric_descriptor(self): - client = mock.Mock() - option = stackdriver.Options( - project_id="project-test", metric_prefix="teste") - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - desc = exporter.create_metric_descriptor(VIDEO_SIZE_VIEW) - self.assertIsNotNone(desc) - - def test_create_metric_descriptor_last_value_int(self): - client = mock.Mock() - option = stackdriver.Options( - project_id="project-test", metric_prefix="teste") - - view_name_base = "view-base" - agg_base = aggregation_module.LastValueAggregation() - view_base = view_module.View( - view_name_base, "processed video size over time", [FRONTEND_KEY], - VIDEO_SIZE_MEASURE, agg_base) - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - desc = exporter.create_metric_descriptor(view_base) - self.assertIsNotNone(desc) - - def test_create_metric_descriptor_last_value_float(self): - client = mock.Mock() - option = stackdriver.Options( - project_id="project-test", metric_prefix="teste") - - view_name_base = "view-base" - agg_base = aggregation_module.LastValueAggregation() - view_base = view_module.View( - view_name_base, "processed video size over time", [FRONTEND_KEY], - VIDEO_SIZE_MEASURE_FLOAT, agg_base) - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - desc = exporter.create_metric_descriptor(view_base) - self.assertIsNotNone(desc) - - def test_create_metric_descriptor_base(self): - client = mock.Mock() - option = stackdriver.Options( - project_id="project-test", metric_prefix="teste") - - view_name_base = "view-base" - agg_base = aggregation_module.BaseAggregation() - view_base = view_module.View( - view_name_base, "processed video size over time", [FRONTEND_KEY], - VIDEO_SIZE_MEASURE, agg_base) - exporter = 
stackdriver.StackdriverStatsExporter( - options=option, client=client) - self.assertRaises(Exception, exporter.create_metric_descriptor, - view_base) - - def test_set_metric_labels(self): - series = monitoring_v3.types.TimeSeries() - tag_value = tag_value_module.TagValue("1200") - stackdriver.set_metric_labels(series, VIDEO_SIZE_VIEW, [tag_value]) - self.assertEqual(len(series.metric.labels), 2) - - def test_set_metric_labels_with_None(self): - series = monitoring_v3.types.TimeSeries() - stackdriver.set_metric_labels(series, VIDEO_SIZE_VIEW, [None]) - self.assertEqual(len(series.metric.labels), 1) - - @mock.patch('os.getpid', return_value=12345) - @mock.patch('platform.uname', return_value=('system', 'node', 'release', - 'version', 'machine', - 'processor')) - def test_get_task_value_with_hostname(self, mock_uname, mock_pid): - self.assertEqual(stackdriver.get_task_value(), "py-12345@node") - - @mock.patch('os.getpid', return_value=12345) - @mock.patch('platform.uname', return_value=('system', '', 'release', - 'version', 'machine', - 'processor')) - def test_get_task_value_without_hostname(self, mock_uname, mock_pid): - self.assertEqual(stackdriver.get_task_value(), "py-12345@localhost") + self.assertRaises(TypeError, exporter.create_time_series_list, v_data, + "", "") diff --git a/examples/stats/helloworld/main.py b/examples/stats/helloworld/main.py index f49107356..5f65696ca 100644 --- a/examples/stats/helloworld/main.py +++ b/examples/stats/helloworld/main.py @@ -39,7 +39,7 @@ def main(): - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder diff --git a/opencensus/metrics/transport.py b/opencensus/metrics/transport.py new file mode 100644 index 000000000..c804ea834 --- /dev/null +++ b/opencensus/metrics/transport.py @@ -0,0 +1,100 @@ +# Copyright 2019, OpenCensus Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in 
compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import threading + +from opencensus.common import utils + + +logger = logging.getLogger(__name__) + +DEFAULT_INTERVAL = 60 +GRACE_PERIOD = 5 + + +class TransportError(Exception): + pass + + +class PeriodicTask(threading.Thread): + """Thread that periodically calls a given function. + + :type func: function + :param func: The function to call. + + :type interval: int or float + :param interval: Seconds between calls to the function. + """ + + daemon = True + + def __init__(self, func, interval=None, **kwargs): + super(PeriodicTask, self).__init__(**kwargs) + if interval is None: + interval = DEFAULT_INTERVAL + self.func = func + self.interval = interval + self._stopped = threading.Event() + + def run(self): + while not self._stopped.wait(self.interval): + try: + self.func() + except TransportError as ex: + logger.exception(ex) + self.stop() + except Exception: + logger.exception("Error handling metric export") + + def stop(self): + self._stopped.set() + + +def get_exporter_thread(metric_producer, exporter, interval=None): + """Get a running task that periodically exports metrics. + + Get a `PeriodicTask` that periodically calls: + + exporter.export_metrics(metric_producer.get_metrics()) + + :type metric_producer: + :class:`opencensus.metrics.export.metric_producer.MetricProducer` + :param metric_producer: The producer to use to get metrics to export. + + :type exporter: :class:`opencensus.stats.base_exporter.MetricsExporter` + :param exporter: The exporter to use to export metrics. 
+ + :type interval: int or float + :param interval: Seconds between export calls. + + :rtype: :class:`PeriodicTask` + :return: A running thread responsible for calling the exporter. + + """ + weak_get = utils.get_weakref(metric_producer.get_metrics) + weak_export = utils.get_weakref(exporter.export_metrics) + + def export_all(): + get = weak_get() + if get is None: + raise TransportError("Metric producer is not available") + export = weak_export() + if export is None: + raise TransportError("Metric exporter is not available") + export(get()) + + tt = PeriodicTask(export_all, interval=interval) + tt.start() + return tt diff --git a/opencensus/stats/measure_to_view_map.py b/opencensus/stats/measure_to_view_map.py index 3d23e5ac6..e127272b5 100644 --- a/opencensus/stats/measure_to_view_map.py +++ b/opencensus/stats/measure_to_view_map.py @@ -73,11 +73,15 @@ def filter_exported_views(self, all_views): views = set(all_views) return views + # TODO: deprecate def register_view(self, view, timestamp): """registers the view's measure name to View Datas given a view""" if len(self.exporters) > 0: - for e in self.exporters: - e.on_register_view(view) + try: + for e in self.exporters: + e.on_register_view(view) + except AttributeError: + pass self._exported_views = None existing_view = self._registered_views.get(view.name) @@ -118,13 +122,17 @@ def record(self, tags, measurement_map, timestamp, attachments=None): attachments=attachments) self.export(view_datas) + # TODO: deprecate def export(self, view_datas): """export view datas to registered exporters""" view_datas_copy = \ [self.copy_and_finalize_view_data(vd) for vd in view_datas] if len(self.exporters) > 0: for e in self.exporters: - e.export(view_datas_copy) + try: + e.export(view_datas_copy) + except AttributeError: + pass def get_metrics(self, timestamp): """Get a Metric for each registered view. 
diff --git a/opencensus/stats/stats.py b/opencensus/stats/stats.py index df1b27ee9..b3c4f322b 100644 --- a/opencensus/stats/stats.py +++ b/opencensus/stats/stats.py @@ -19,7 +19,7 @@ from opencensus.stats.view_manager import ViewManager -class Stats(MetricProducer): +class _Stats(MetricProducer): """Stats defines a View Manager and a Stats Recorder in order for the collection of Stats """ @@ -38,3 +38,6 @@ def get_metrics(self): """ return self.view_manager.measure_to_view_map.get_metrics( datetime.utcnow()) + + +stats = _Stats() diff --git a/tests/system/stats/prometheus/prometheus_stats_test.py b/tests/system/stats/prometheus/prometheus_stats_test.py index 541ec04be..06bcb31c7 100644 --- a/tests/system/stats/prometheus/prometheus_stats_test.py +++ b/tests/system/stats/prometheus/prometheus_stats_test.py @@ -39,7 +39,7 @@ def test_prometheus_stats(self): request_count_view_name, "number of requests broken down by methods", [method_key], request_count_measure, count_agg) - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder diff --git a/tests/system/stats/stackdriver/stackdriver_stats_test.py b/tests/system/stats/stackdriver/stackdriver_stats_test.py index 1b6f8d09c..ce0c90133 100644 --- a/tests/system/stats/stackdriver/stackdriver_stats_test.py +++ b/tests/system/stats/stackdriver/stackdriver_stats_test.py @@ -14,9 +14,12 @@ import os import random +import sys import time -import unittest -from retrying import retry + +from google.cloud import monitoring_v3 +import mock + from opencensus.ext.stackdriver import stats_exporter as stackdriver from opencensus.stats import aggregation as aggregation_module from opencensus.stats import measure as measure_module @@ -25,17 +28,43 @@ from opencensus.tags import tag_key as tag_key_module from opencensus.tags import tag_map as tag_map_module from opencensus.tags import tag_value as tag_value_module -from opencensus.common.transports import sync 
-from google.cloud import monitoring_v3 + +if sys.version_info < (3,): + import unittest2 as unittest +else: + import unittest + MiB = 1 << 20 PROJECT = os.environ.get('GCLOUD_PROJECT_PYTHON') -RETRY_WAIT_PERIOD = 10000 # Wait 10 seconds between each retry -RETRY_MAX_ATTEMPT = 10 # Retry 10 times +ASYNC_TEST_INTERVAL = 15 # Background thread export interval class TestBasicStats(unittest.TestCase): + + def check_sd_md(self, exporter, view_description): + """Check that the metric descriptor was written to stackdriver.""" + name = exporter.client.project_path(PROJECT) + list_metrics_descriptors = exporter.client.list_metric_descriptors( + name) + + for ee in list_metrics_descriptors: + if ee.description == view_description: + break + else: + raise AssertionError("No matching metric descriptor") + + self.assertIsNotNone(ee) + self.assertEqual(ee.unit, "By") + + def setUp(self): + patcher = mock.patch( + 'opencensus.ext.stackdriver.stats_exporter.stats.stats', + stats_module._Stats()) + patcher.start() + self.addCleanup(patcher.stop) + def test_stats_record_sync(self): # We are using sufix in order to prevent cached objects sufix = str(os.getgid()) @@ -56,15 +85,14 @@ def test_stats_record_sync(self): VIDEO_SIZE_VIEW_NAME, view_description, [FRONTEND_KEY], VIDEO_SIZE_MEASURE, VIDEO_SIZE_DISTRIBUTION) - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder client = monitoring_v3.MetricServiceClient() exporter = stackdriver.StackdriverStatsExporter( options=stackdriver.Options(project_id=PROJECT), - client=client, - transport=sync.SyncTransport) + client=client) view_manager.register_exporter(exporter) # Register view. @@ -82,25 +110,12 @@ def test_stats_record_sync(self): measure_map.measure_int_put(VIDEO_SIZE_MEASURE, 25 * MiB) measure_map.record(tag_map) + exporter.export_metrics(stats_module.stats.get_metrics()) # Sleep for [0, 10] milliseconds to fake wait. 
time.sleep(random.randint(1, 10) / 1000.0) - @retry( - wait_fixed=RETRY_WAIT_PERIOD, - stop_max_attempt_number=RETRY_MAX_ATTEMPT) - def get_metric_descriptors(self, exporter, view_description): - name = exporter.client.project_path(PROJECT) - list_metrics_descriptors = exporter.client.list_metric_descriptors( - name) - element = next((element for element in list_metrics_descriptors - if element.description == view_description), None) - - self.assertIsNotNone(element) - self.assertEqual(element.description, view_description) - self.assertEqual(element.unit, "By") - - get_metric_descriptors(self, exporter, view_description) + self.check_sd_md(exporter, view_description) def test_stats_record_async(self): # We are using sufix in order to prevent cached objects @@ -124,12 +139,13 @@ def test_stats_record_async(self): VIDEO_SIZE_VIEW_NAME_ASYNC, view_description, [FRONTEND_KEY_ASYNC], VIDEO_SIZE_MEASURE_ASYNC, VIDEO_SIZE_DISTRIBUTION_ASYNC) - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder - exporter = stackdriver.new_stats_exporter( - stackdriver.Options(project_id=PROJECT)) + exporter, transport = stackdriver.new_stats_exporter( + stackdriver.Options(project_id=PROJECT), + interval=ASYNC_TEST_INTERVAL) view_manager.register_exporter(exporter) # Register view. 
@@ -147,18 +163,8 @@ def test_stats_record_async(self): measure_map.measure_int_put(VIDEO_SIZE_MEASURE_ASYNC, 25 * MiB) measure_map.record(tag_map) + # Give the exporter thread enough time to export exactly once + time.sleep(ASYNC_TEST_INTERVAL * 2 - 1) + transport.stop() - @retry( - wait_fixed=RETRY_WAIT_PERIOD, - stop_max_attempt_number=RETRY_MAX_ATTEMPT) - def get_metric_descriptors(self, exporter, view_description): - name = exporter.client.project_path(PROJECT) - list_metrics_descriptors = exporter.client.list_metric_descriptors( - name) - element = next((element for element in list_metrics_descriptors - if element.description == view_description), None) - self.assertIsNotNone(element) - self.assertEqual(element.description, view_description) - self.assertEqual(element.unit, "By") - - get_metric_descriptors(self, exporter, view_description) + self.check_sd_md(exporter, view_description) diff --git a/tests/unit/metrics/test_transport.py b/tests/unit/metrics/test_transport.py new file mode 100644 index 000000000..69ee7769e --- /dev/null +++ b/tests/unit/metrics/test_transport.py @@ -0,0 +1,122 @@ +# Copyright 2019, OpenCensus Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gc +import sys +import time + +import mock + +from opencensus.metrics import transport + +if sys.version_info < (3,): + import unittest2 as unittest +else: + import unittest + + +# Some tests use real time! 
This is the time to wait between the exporter +# thread handling tasks, and doesn't account for processing time. If these +# tests become flaky, try increasing this. +INTERVAL = .05 + + +class TestPeriodicTask(unittest.TestCase): + + def test_default_constructor(self): + mock_func = mock.Mock() + task = transport.PeriodicTask(mock_func) + self.assertEqual(task.func, mock_func) + self.assertEqual(task.interval, transport.DEFAULT_INTERVAL) + + def test_periodic_task_not_started(self): + mock_func = mock.Mock() + task = transport.PeriodicTask(mock_func, INTERVAL) + time.sleep(INTERVAL + INTERVAL / 2.0) + mock_func.assert_not_called() + task.stop() + + def test_periodic_task(self): + mock_func = mock.Mock() + task = transport.PeriodicTask(mock_func, INTERVAL) + task.start() + mock_func.assert_not_called() + time.sleep(INTERVAL + INTERVAL / 2.0) + self.assertEqual(mock_func.call_count, 1) + time.sleep(INTERVAL) + self.assertEqual(mock_func.call_count, 2) + time.sleep(INTERVAL) + self.assertEqual(mock_func.call_count, 3) + + def test_periodic_task_stop(self): + mock_func = mock.Mock() + task = transport.PeriodicTask(mock_func, INTERVAL) + task.start() + time.sleep(INTERVAL + INTERVAL / 2.0) + self.assertEqual(mock_func.call_count, 1) + task.stop() + time.sleep(INTERVAL) + self.assertEqual(mock_func.call_count, 1) + + +@mock.patch('opencensus.metrics.transport.DEFAULT_INTERVAL', INTERVAL) +@mock.patch('opencensus.metrics.transport.logger') +class TestGetExporterThreadPeriodic(unittest.TestCase): + + def test_threaded_export(self, mock_logger): + producer = mock.Mock() + exporter = mock.Mock() + metrics = mock.Mock() + producer.get_metrics.return_value = metrics + try: + task = transport.get_exporter_thread(producer, exporter) + producer.get_metrics.assert_not_called() + exporter.export_metrics.assert_not_called() + time.sleep(INTERVAL + INTERVAL / 2.0) + producer.get_metrics.assert_called_once_with() + exporter.export_metrics.assert_called_once_with(metrics) + finally: + 
task.stop() + task.join() + + def test_producer_error(self, mock_logger): + producer = mock.Mock() + exporter = mock.Mock() + + producer.get_metrics.side_effect = ValueError() + + task = transport.get_exporter_thread(producer, exporter) + time.sleep(INTERVAL + INTERVAL / 2.0) + mock_logger.exception.assert_called() + self.assertFalse(task._stopped.is_set()) + + def test_producer_deleted(self, mock_logger): + producer = mock.Mock() + exporter = mock.Mock() + task = transport.get_exporter_thread(producer, exporter) + del producer + gc.collect() + time.sleep(INTERVAL + INTERVAL / 2.0) + mock_logger.exception.assert_called() + self.assertTrue(task._stopped.is_set()) + + def test_exporter_deleted(self, mock_logger): + producer = mock.Mock() + exporter = mock.Mock() + task = transport.get_exporter_thread(producer, exporter) + del exporter + gc.collect() + time.sleep(INTERVAL + INTERVAL / 2.0) + mock_logger.exception.assert_called() + self.assertTrue(task._stopped.is_set()) diff --git a/tests/unit/stats/test_stats.py b/tests/unit/stats/test_stats.py index 7eeae0c25..81588f41b 100644 --- a/tests/unit/stats/test_stats.py +++ b/tests/unit/stats/test_stats.py @@ -32,7 +32,7 @@ class TestStats(unittest.TestCase): def test_get_metrics(self): """Test that Stats converts recorded values into metrics.""" - stats = stats_module.Stats() + stats = stats_module.stats # Check that metrics are empty before view registration initial_metrics = list(stats.get_metrics())