From d3fdf36025bdbc4b9afe70a632c4421f8f9561b9 Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Mon, 18 Mar 2019 14:29:28 -0700 Subject: [PATCH 01/21] Use metrics instead of stats in SD exporter --- .../stackdriver/stats_exporter/__init__.py | 360 ++++++++---------- .../metrics/export/metric_descriptor.py | 17 + 2 files changed, 177 insertions(+), 200 deletions(-) diff --git a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py index f083e17cc..3d77fc5ef 100644 --- a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py +++ b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py @@ -12,14 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +from datetime import datetime import itertools import logging import os import platform import re import string +import threading -from datetime import datetime from google.api_core.gapic_v1 import client_info from google.cloud import monitoring_v3 @@ -27,10 +28,12 @@ from opencensus.common.monitored_resource import monitored_resource from opencensus.common.transports import async_ from opencensus.common.version import __version__ +from opencensus.metrics.export import metric_descriptor from opencensus.stats import aggregation from opencensus.stats import base_exporter from opencensus.stats import measure + MAX_TIME_SERIES_PER_UPLOAD = 200 OPENCENSUS_TASK = "opencensus_task" OPENCENSUS_TASK_DESCRIPTION = "Opencensus task identifier" @@ -110,18 +113,14 @@ def default_monitoring_labels(self): return self._default_monitoring_labels -class StackdriverStatsExporter(base_exporter.StatsExporter): +class StackdriverStatsExporter(object): """Stats exporter for the Stackdriver Monitoring backend.""" - def __init__(self, - options=Options(), - client=None, - default_labels={}, - transport=async_.AsyncTransport): + def __init__(self, options=Options(), client=None): self._options = options self._client = client - self._transport = transport(self) - self._default_labels = default_labels + self._md_cache = {} + self._md_lock = threading.Lock() @property def options(self): @@ -131,209 +130,174 @@ def options(self): def client(self): return self._client - @property - def transport(self): - return self._transport - - @property - def default_labels(self): - return self._default_labels - - def set_default_labels(self, value): - self._default_labels = value - - def on_register_view(self, view): - """ create metric descriptor for the registered view""" - if view is not None: - self.create_metric_descriptor(view) - - def emit(self, view_data): - """ export data to Stackdriver Monitoring""" - if view_data is not None: - self.handle_upload(view_data) - - def export(self, view_data): - """ export data to transport class""" - if view_data is not None: - self.transport.export(view_data) - - def handle_upload(self, view_data): - """ handle_upload handles uploading a slice of Data - as well as error handling. - """ - if view_data is not None: - self.upload_stats(view_data) - - def upload_stats(self, view_data): - """ It receives an array of view_data object - and create time series for each value - """ - view_data_set = utils.uniq(view_data) - time_series_batches = self.create_batched_time_series( - view_data_set, MAX_TIME_SERIES_PER_UPLOAD) - for time_series_batch in time_series_batches: + def export(self, metrics): + ts_batches = self.create_batched_time_series(metrics) + for ts_batch in ts_batches: self.client.create_time_series( self.client.project_path(self.options.project_id), - time_series_batch) + ts_batch) - def create_batched_time_series(self, view_data, batch_size): - """ Create the data structure that will be - sent to Stackdriver Monitoring - """ + def create_batched_time_series(self, metrics, + batch_size=MAX_TIME_SERIES_PER_UPLOAD): time_series_list = itertools.chain.from_iterable( - self.create_time_series_list( - v_data, self.options.resource, self.options.metric_prefix) - for v_data in view_data) + self.create_time_series_list(metric) for metric in metrics) return list(utils.window(time_series_list, batch_size)) - def create_time_series_list(self, v_data, option_resource_type, - metric_prefix): - """ Create the TimeSeries object based on the view data - """ - time_series_list = [] - aggregation_type = v_data.view.aggregation.aggregation_type - tag_agg = v_data.tag_value_aggregation_data_map - for tag_value, agg in tag_agg.items(): - series = monitoring_v3.types.TimeSeries() - series.metric.type = namespaced_view_name(v_data.view.name, - metric_prefix) - set_metric_labels(series, v_data.view, tag_value) - set_monitored_resource(series, option_resource_type) - - point = series.points.add() - if aggregation_type is aggregation.Type.DISTRIBUTION: - dist_value = point.value.distribution_value - dist_value.count = agg.count_data - dist_value.mean = agg.mean_data - - sum_of_sqd = agg.sum_of_sqd_deviations - dist_value.sum_of_squared_deviation = sum_of_sqd - - # Uncomment this when stackdriver supports Range - # point.value.distribution_value.range.min = agg_data.min - # point.value.distribution_value.range.max = agg_data.max - bounds = dist_value.bucket_options.explicit_buckets.bounds - buckets = dist_value.bucket_counts - - # Stackdriver expects a first bucket for samples in (-inf, 0), - # but we record positive samples only, and our first bucket is - # [0, first_bound). - bounds.extend([0]) - buckets.extend([0]) - bounds.extend(list(map(float, agg.bounds))) - buckets.extend(list(map(int, agg.counts_per_bucket))) - elif aggregation_type is aggregation.Type.COUNT: - point.value.int64_value = agg.count_data - elif aggregation_type is aggregation.Type.SUM: - if isinstance(v_data.view.measure, measure.MeasureInt): - # TODO: Add implementation of sum aggregation that does not - # store it's data as a float. - point.value.int64_value = int(agg.sum_data) - if isinstance(v_data.view.measure, measure.MeasureFloat): - point.value.double_value = float(agg.sum_data) - elif aggregation_type is aggregation.Type.LASTVALUE: - if isinstance(v_data.view.measure, measure.MeasureInt): - point.value.int64_value = int(agg.value) - if isinstance(v_data.view.measure, measure.MeasureFloat): - point.value.double_value = float(agg.value) - else: - raise TypeError("Unsupported aggregation type: %s" % - type(v_data.view.aggregation)) - - start = datetime.strptime(v_data.start_time, EPOCH_PATTERN) - end = datetime.strptime(v_data.end_time, EPOCH_PATTERN) - - timestamp_start = (start - EPOCH_DATETIME).total_seconds() - timestamp_end = (end - EPOCH_DATETIME).total_seconds() - - point.interval.end_time.seconds = int(timestamp_end) - - secs = point.interval.end_time.seconds - point.interval.end_time.nanos = int((timestamp_end - secs) * 10**9) - - if aggregation_type is not aggregation.Type.LASTVALUE: - if timestamp_start == timestamp_end: - # avoiding start_time and end_time to be equal - timestamp_start = timestamp_start - 1 - - start_time = point.interval.start_time - start_time.seconds = int(timestamp_start) - start_secs = start_time.seconds - start_time.nanos = int((timestamp_start - start_secs) * 1e9) - - time_series_list.append(series) - - return time_series_list - - def create_metric_descriptor(self, view): - """ it creates a MetricDescriptor - for the given view data in Stackdriver Monitoring. - An error will be raised if there is - already a metric descriptor created with the same name - but it has a different aggregation or keys. - """ - view_measure = view.measure - view_aggregation = view.aggregation - view_name = view.name - - metric_type = namespaced_view_name(view_name, - self.options.metric_prefix) - value_type = None - unit = view_measure.unit - metric_desc = monitoring_v3.enums.MetricDescriptor - agg_type = aggregation.Type - - # Default metric Kind - metric_kind = metric_desc.MetricKind.CUMULATIVE - - if view_aggregation.aggregation_type is agg_type.COUNT: - value_type = metric_desc.ValueType.INT64 - # If the aggregation type is count - # which counts the number of recorded measurements - # the unit must be "1", because this view - # does not apply to the recorded values. - unit = str(1) - elif view_aggregation.aggregation_type is agg_type.SUM: - if isinstance(view_measure, measure.MeasureInt): - value_type = metric_desc.ValueType.INT64 - if isinstance(view_measure, measure.MeasureFloat): - value_type = metric_desc.ValueType.DOUBLE - elif view_aggregation.aggregation_type is agg_type.DISTRIBUTION: - value_type = metric_desc.ValueType.DISTRIBUTION - elif view_aggregation.aggregation_type is agg_type.LASTVALUE: - metric_kind = metric_desc.MetricKind.GAUGE - if isinstance(view_measure, measure.MeasureInt): - value_type = metric_desc.ValueType.INT64 - if isinstance(view_measure, measure.MeasureFloat): - value_type = metric_desc.ValueType.DOUBLE + def create_time_series_list(self, metric): + tsl = [] + for ts in metric.time_series: + tsl.append(self._convert_series(metric, ts)) + return tsl + + def _convert_series(self, metric, ts): + """Convert an OC timeseries to a SD series.""" + series = monitoring_v3.types.TimeSeries() + series.metric.type = namespaced_view_name( + metric.descriptor.name, self.options.metric_prefix) + + series.metric.labels[OPENCENSUS_TASK] = get_task_value() + + for key, val in zip(metric.descriptor.label_keys, ts.label_values): + if val.value is not None: + safe_key = sanitize_label(key.key) + series.metric.labels[safe_key] = val.value + + set_monitored_resource(series, self.options.resource) + + for point in ts.points: + sd_point = series.points.add() + # this just modifies points, no return + self._convert_point(metric, ts, point, sd_point) + return series + + def _convert_point(self, metric, ts, point, sd_point): + """Convert an OC metric point to a SD point.""" + if (metric.descriptor.type == + metric_descriptor.MetricDescriptorType.CUMULATIVE_DISTRIBUTION): + + sd_dist_val = sd_point.value.distribution_value + sd_dist_val.count = point.value.count + sd_dist_val.sum_of_squared_deviation = point.value.sum_of_squared_deviation + + assert sd_dist_val.bucket_options.explicit_buckets.bounds == [] + sd_dist_val.bucket_options.explicit_buckets.bounds.extend( + [0.0] + + list(map(float, point.value.bucket_options.type_.bounds)) + ) + + assert sd_dist_val.bucket_counts == [] + sd_dist_val.bucket_counts.extend( + [0] + + [bb.count for bb in point.value.buckets] + ) + + elif (metric.descriptor.type == + metric_descriptor.MetricDescriptorType.CUMULATIVE_INT64): + sd_point.value.int64_value = int(point.value.value) + + elif (metric.descriptor.type == + metric_descriptor.MetricDescriptorType.CUMULATIVE_DOUBLE): + sd_point.value.double_value = float(point.value.value) + + elif (metric.descriptor.type == + metric_descriptor.MetricDescriptorType.GAUGE_INT64): + sd_point.value.int64_value = int(point.value.value) + + elif (metric.descriptor.type == + metric_descriptor.MetricDescriptorType.GAUGE_DOUBLE): + sd_point.value.double_value = float(point.value.value) + else: - raise Exception( - "unsupported aggregation type: %s" % type(view_aggregation)) + md_type_name = metric_descriptor.MetricDescriptorType.to_name( + metric.descriptor.type) + raise TypeError("Unsupported metric type: {}".format(md_type_name)) + + start = datetime.strptime(ts.start_timestamp, EPOCH_PATTERN) + end = point.timestamp + + timestamp_start = (start - EPOCH_DATETIME).total_seconds() + timestamp_end = (end - EPOCH_DATETIME).total_seconds() + + sd_point.interval.end_time.seconds = int(timestamp_end) + + secs = sd_point.interval.end_time.seconds + sd_point.interval.end_time.nanos = int((timestamp_end - secs) * 1e9) + + start_time = sd_point.interval.start_time + start_time.seconds = int(timestamp_start) + start_secs = start_time.seconds + start_time.nanos = int((timestamp_start - start_secs) * 1e9) + + def get_descriptor_type(self, oc_md): + """Get a SD descriptor type for an OC metric descriptor.""" + return namespaced_view_name(oc_md.name, self.options.metric_prefix) + + def get_metric_descriptor(self, oc_md): + """Convert an OC metric descriptor to a SD metric descriptor.""" + if (oc_md.type == + metric_descriptor.MetricDescriptorType.CUMULATIVE_INT64): + mk = monitoring_v3.enums.MetricDescriptor.MetricKind.CUMULATIVE + vt = monitoring_v3.enums.MetricDescriptor.ValueType.INT64 + elif (oc_md.type == + metric_descriptor.MetricDescriptorType.CUMULATIVE_DOUBLE): + mk = monitoring_v3.enums.MetricDescriptor.MetricKind.CUMULATIVE + vt = monitoring_v3.enums.MetricDescriptor.ValueType.DOUBLE + elif (oc_md.type == + metric_descriptor.MetricDescriptorType.CUMULATIVE_DISTRIBUTION): + mk = monitoring_v3.enums.MetricDescriptor.MetricKind.CUMULATIVE + vt = monitoring_v3.enums.MetricDescriptor.ValueType.DISTRIBUTION + elif (oc_md.type == + metric_descriptor.MetricDescriptorType.GAUGE_INT64): + mk = monitoring_v3.enums.MetricDescriptor.MetricKind.GAUGE + vt = monitoring_v3.enums.MetricDescriptor.ValueType.INT64 + elif (oc_md.type == + metric_descriptor.MetricDescriptorType.GAUGE_DOUBLE): + mk = monitoring_v3.enums.MetricDescriptor.MetricKind.GAUGE + vt = monitoring_v3.enums.MetricDescriptor.ValueType.DOUBLE + else: + md_type_name = metric_descriptor.MetricDescriptorType.to_name( + oc_md.type) + raise TypeError("Unsupported metric type: {}".format(md_type_name)) - display_name_prefix = DEFAULT_DISPLAY_NAME_PREFIX - if self.options.metric_prefix != "": + if self.options.metric_prefix: display_name_prefix = self.options.metric_prefix + else: + display_name_prefix = DEFAULT_DISPLAY_NAME_PREFIX - descriptor_pattern = "projects/%s/metricDescriptors/%s" - project_id = self.options.project_id - - desc_labels = new_label_descriptors(self.default_labels, view.columns) + lks = [lk.key for lk in oc_md.label_keys] + default_labels = self.options.default_monitoring_labels + if default_labels is None: + default_labels = {} + desc_labels = new_label_descriptors(default_labels, lks) descriptor = monitoring_v3.types.MetricDescriptor(labels=desc_labels) + metric_type = self.get_descriptor_type(oc_md) descriptor.type = metric_type - descriptor.metric_kind = metric_kind - descriptor.value_type = value_type - descriptor.description = view.description - descriptor.unit = unit - - descriptor.name = descriptor_pattern % (project_id, metric_type) - descriptor.display_name = "%s/%s" % (display_name_prefix, view_name) + descriptor.metric_kind = mk + descriptor.value_type = vt + descriptor.description = oc_md.description + descriptor.unit = oc_md.unit + descriptor.name = ("projects/{}/metricDescriptors/{}" + .format(self.options.project_id, metric_type)) + descriptor.display_name = ("{}/{}" + .format(display_name_prefix, oc_md.name)) - client = self.client - project_name = client.project_path(project_id) - descriptor = client.create_metric_descriptor(project_name, descriptor) return descriptor + def register_metric_descriptor(self, oc_md): + """Register a metric descriptor with stackdriver.""" + descriptor_type = self.get_descriptor_type(oc_md) + with self._md_lock: + try: + return self._md_cache[descriptor_type] + except KeyError: + descriptor = self.get_metric_descriptor(oc_md) + project_name = self.client.project_path(self.options.project_id) + sd_md = self.client.create_metric_descriptor( + project_name, descriptor) + self._md_cache[descriptor_type] = sd_md + return sd_md + def set_monitored_resource(series, option_resource_type): """Set a resource(type and labels) that can be used for monitoring. @@ -408,11 +372,7 @@ def new_stats_exporter(options): ci = client_info.ClientInfo(client_library_version=get_user_agent_slug()) client = monitoring_v3.MetricServiceClient(client_info=ci) - exporter = StackdriverStatsExporter(client=client, options=options) - - if options.default_monitoring_labels is not None: - exporter.set_default_labels(options.default_monitoring_labels) return exporter diff --git a/opencensus/metrics/export/metric_descriptor.py b/opencensus/metrics/export/metric_descriptor.py index cbb258547..f5b43f1d9 100644 --- a/opencensus/metrics/export/metric_descriptor.py +++ b/opencensus/metrics/export/metric_descriptor.py @@ -90,6 +90,16 @@ class MetricDescriptorType(object): SUMMARY: value.ValueSummary } + _name_map = { + GAUGE_INT64: 'GAUGE_INT64', + GAUGE_DOUBLE: 'GAUGE_DOUBLE', + GAUGE_DISTRIBUTION: 'GAUGE_DISTRIBUTION', + CUMULATIVE_INT64: 'CUMULATIVE_INT64', + CUMULATIVE_DOUBLE: 'CUMULATIVE_DOUBLE', + CUMULATIVE_DISTRIBUTION: 'CUMULATIVE_DISTRIBUTION', + SUMMARY: 'SUMMARY', + } + @classmethod def to_type_class(cls, metric_descriptor_type): try: @@ -97,6 +107,13 @@ def to_type_class(cls, metric_descriptor_type): except KeyError: raise ValueError("Unknown MetricDescriptorType value") + @classmethod + def to_name(cls, metric_descriptor_type): + try: + return cls._name_map[metric_descriptor_type] + except KeyError: + raise ValueError("Unknown MetricDescriptorType value") + class MetricDescriptor(object): """Defines a metric type and its schema. From bce0b8661fd8751e0d7ee97d33ee66e25adf2ac1 Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Mon, 18 Mar 2019 16:23:13 -0700 Subject: [PATCH 02/21] Cleanup for PR comments --- .../stackdriver/stats_exporter/__init__.py | 52 +++++++++---------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py index 3d77fc5ef..eca6b9214 100644 --- a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py +++ b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py @@ -45,6 +45,25 @@ EPOCH_PATTERN = "%Y-%m-%dT%H:%M:%S.%fZ" GLOBAL_RESOURCE_TYPE = 'global' +# OC metric descriptor type to SD metric kind and value type +OC_MD_TO_SD_TYPE = { + metric_descriptor.MetricDescriptorType.CUMULATIVE_INT64: + (monitoring_v3.enums.MetricDescriptor.MetricKind.CUMULATIVE, + monitoring_v3.enums.MetricDescriptor.ValueType.INT64), + metric_descriptor.MetricDescriptorType.CUMULATIVE_DOUBLE: + (monitoring_v3.enums.MetricDescriptor.MetricKind.CUMULATIVE, + monitoring_v3.enums.MetricDescriptor.ValueType.DOUBLE), + metric_descriptor.MetricDescriptorType.CUMULATIVE_DISTRIBUTION: + (monitoring_v3.enums.MetricDescriptor.MetricKind.CUMULATIVE, + monitoring_v3.enums.MetricDescriptor.ValueType.DISTRIBUTION), + metric_descriptor.MetricDescriptorType.GAUGE_INT64: + (monitoring_v3.enums.MetricDescriptor.MetricKind.GAUGE, + monitoring_v3.enums.MetricDescriptor.ValueType.INT64), + metric_descriptor.MetricDescriptorType.GAUGE_DOUBLE: + (monitoring_v3.enums.MetricDescriptor.MetricKind.GAUGE, + monitoring_v3.enums.MetricDescriptor.ValueType.DOUBLE) +} + class Options(object): """ Options contains options for configuring the exporter. @@ -144,10 +163,7 @@ def create_batched_time_series(self, metrics, return list(utils.window(time_series_list, batch_size)) def create_time_series_list(self, metric): - tsl = [] - for ts in metric.time_series: - tsl.append(self._convert_series(metric, ts)) - return tsl + return [self._convert_series(metric, ts) for ts in metric.time_series] def _convert_series(self, metric, ts): """Convert an OC timeseries to a SD series.""" @@ -234,27 +250,9 @@ def get_descriptor_type(self, oc_md): def get_metric_descriptor(self, oc_md): """Convert an OC metric descriptor to a SD metric descriptor.""" - if (oc_md.type == - metric_descriptor.MetricDescriptorType.CUMULATIVE_INT64): - mk = monitoring_v3.enums.MetricDescriptor.MetricKind.CUMULATIVE - vt = monitoring_v3.enums.MetricDescriptor.ValueType.INT64 - elif (oc_md.type == - metric_descriptor.MetricDescriptorType.CUMULATIVE_DOUBLE): - mk = monitoring_v3.enums.MetricDescriptor.MetricKind.CUMULATIVE - vt = monitoring_v3.enums.MetricDescriptor.ValueType.DOUBLE - elif (oc_md.type == - metric_descriptor.MetricDescriptorType.CUMULATIVE_DISTRIBUTION): - mk = monitoring_v3.enums.MetricDescriptor.MetricKind.CUMULATIVE - vt = monitoring_v3.enums.MetricDescriptor.ValueType.DISTRIBUTION - elif (oc_md.type == - metric_descriptor.MetricDescriptorType.GAUGE_INT64): - mk = monitoring_v3.enums.MetricDescriptor.MetricKind.GAUGE - vt = monitoring_v3.enums.MetricDescriptor.ValueType.INT64 - elif (oc_md.type == - metric_descriptor.MetricDescriptorType.GAUGE_DOUBLE): - mk = monitoring_v3.enums.MetricDescriptor.MetricKind.GAUGE - vt = monitoring_v3.enums.MetricDescriptor.ValueType.DOUBLE - else: + try: + metric_kind, value_type = OC_MD_TO_SD_TYPE[oc_md.type] + except KeyError: md_type_name = metric_descriptor.MetricDescriptorType.to_name( oc_md.type) raise TypeError("Unsupported metric type: {}".format(md_type_name)) @@ -273,8 +271,8 @@ def get_metric_descriptor(self, oc_md): descriptor = monitoring_v3.types.MetricDescriptor(labels=desc_labels) metric_type = self.get_descriptor_type(oc_md) descriptor.type = metric_type - descriptor.metric_kind = mk - descriptor.value_type = vt + descriptor.metric_kind = metric_kind + descriptor.value_type = value_type descriptor.description = oc_md.description descriptor.unit = oc_md.unit descriptor.name = ("projects/{}/metricDescriptors/{}" From 156f068b62dd8f4dc7e7c25c60a52930ac201069 Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Mon, 18 Mar 2019 16:51:43 -0700 Subject: [PATCH 03/21] Don't raise bare exception --- .../opencensus/ext/stackdriver/stats_exporter/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py index eca6b9214..3935a1862 100644 --- a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py +++ b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py @@ -366,7 +366,7 @@ def new_stats_exporter(options): uploads stats data to Stackdriver Monitoring. """ if str(options.project_id).strip() == "": - raise Exception(ERROR_BLANK_PROJECT_ID) + raise ValueError(ERROR_BLANK_PROJECT_ID) ci = client_info.ClientInfo(client_library_version=get_user_agent_slug()) client = monitoring_v3.MetricServiceClient(client_info=ci) From 7cc40f8e362877d30b57e939fbf7ddcfcc03702e Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Mon, 18 Mar 2019 19:20:51 -0700 Subject: [PATCH 04/21] Handle missing ts in gauge metrics on export --- .../opencensus/ext/stackdriver/stats_exporter/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py index 3935a1862..9bb865194 100644 --- a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py +++ b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py @@ -228,8 +228,11 @@ def _convert_point(self, metric, ts, point, sd_point): metric.descriptor.type) raise TypeError("Unsupported metric type: {}".format(md_type_name)) - start = datetime.strptime(ts.start_timestamp, EPOCH_PATTERN) end = point.timestamp + if ts.start_timestamp is None: + start = end + else: + start = datetime.strptime(ts.start_timestamp, EPOCH_PATTERN) timestamp_start = (start - EPOCH_DATETIME).total_seconds() timestamp_end = (end - EPOCH_DATETIME).total_seconds() From 05c37b497896e62c28ab7278ec42710413f35353 Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Mon, 18 Mar 2019 19:25:05 -0700 Subject: [PATCH 05/21] Allow for missing register_view, export methods --- opencensus/stats/measure_to_view_map.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/opencensus/stats/measure_to_view_map.py b/opencensus/stats/measure_to_view_map.py index 3d23e5ac6..e127272b5 100644 --- a/opencensus/stats/measure_to_view_map.py +++ b/opencensus/stats/measure_to_view_map.py @@ -73,11 +73,15 @@ def filter_exported_views(self, all_views): views = set(all_views) return views + # TODO: deprecate def register_view(self, view, timestamp): """registers the view's measure name to View Datas given a view""" if len(self.exporters) > 0: - for e in self.exporters: - e.on_register_view(view) + try: + for e in self.exporters: + e.on_register_view(view) + except AttributeError: + pass self._exported_views = None existing_view = self._registered_views.get(view.name) @@ -118,13 +122,17 @@ def record(self, tags, measurement_map, timestamp, attachments=None): attachments=attachments) self.export(view_datas) + # TODO: deprecate def export(self, view_datas): """export view datas to registered exporters""" view_datas_copy = \ [self.copy_and_finalize_view_data(vd) for vd in view_datas] if len(self.exporters) > 0: for e in self.exporters: - e.export(view_datas_copy) + try: + e.export(view_datas_copy) + except AttributeError: + pass def get_metrics(self, timestamp): """Get a Metric for each registered view. From 6523277e142765349df28d7d33d9e00bbf641964 Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Mon, 18 Mar 2019 19:31:00 -0700 Subject: [PATCH 06/21] Separate metric export method --- .../opencensus/ext/stackdriver/stats_exporter/__init__.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py index 9bb865194..565d60753 100644 --- a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py +++ b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py @@ -149,12 +149,11 @@ def options(self): def client(self): return self._client - def export(self, metrics): + def export_metrics(self, metrics): ts_batches = self.create_batched_time_series(metrics) for ts_batch in ts_batches: self.client.create_time_series( - self.client.project_path(self.options.project_id), - ts_batch) + self.client.project_path(self.options.project_id), ts_batch) def create_batched_time_series(self, metrics, batch_size=MAX_TIME_SERIES_PER_UPLOAD): From 89d4d60997381118e470e1aa8c84e0a830b6cfd3 Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Mon, 18 Mar 2019 19:32:03 -0700 Subject: [PATCH 07/21] Guard specifically against old VD export calls --- .../opencensus/ext/stackdriver/stats_exporter/__init__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py index 565d60753..8a712fa62 100644 --- a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py +++ b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py @@ -28,6 +28,7 @@ from opencensus.common.monitored_resource import monitored_resource from opencensus.common.transports import async_ from opencensus.common.version import __version__ +from opencensus.metrics.export import metric as metric_module from opencensus.metrics.export import metric_descriptor from opencensus.stats import aggregation from opencensus.stats import base_exporter @@ -162,6 +163,8 @@ def create_batched_time_series(self, metrics, return list(utils.window(time_series_list, batch_size)) def create_time_series_list(self, metric): + if not isinstance(metric, metric_module.Metric): + raise ValueError return [self._convert_series(metric, ts) for ts in metric.time_series] def _convert_series(self, metric, ts): From e89c84e408c8ca5286642ab7c97484a59f9b630e Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Tue, 19 Mar 2019 12:21:50 -0700 Subject: [PATCH 08/21] Update tests to use metrics --- .../stackdriver/stats_exporter/__init__.py | 2 +- .../tests/test_stackdriver_stats.py | 502 ++++++------------ 2 files changed, 178 insertions(+), 326 deletions(-) diff --git a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py index 8a712fa62..6ccdb04ba 100644 --- a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py +++ b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py @@ -163,7 +163,7 @@ def create_batched_time_series(self, metrics, return list(utils.window(time_series_list, batch_size)) def create_time_series_list(self, metric): - if not isinstance(metric, metric_module.Metric): + if not isinstance(metric, metric_module.Metric): # pragma: NO COVER raise ValueError return [self._convert_series(metric, ts) for ts in metric.time_series] diff --git a/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py b/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py index cf3d07d0f..ec17eecd2 100644 --- a/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py +++ b/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py @@ -25,6 +25,7 @@ from opencensus.stats import aggregation_data as aggregation_data_module from opencensus.stats import execution_context from opencensus.stats import measure as measure_module +from opencensus.stats import metric_utils from opencensus.stats import stats as stats_module from opencensus.stats import view as view_module from opencensus.stats import view_data as view_data_module @@ -59,7 +60,8 @@ VIDEO_SIZE_VIEW_NAME, "processed video size over time", [FRONTEND_KEY], VIDEO_SIZE_MEASURE, VIDEO_SIZE_DISTRIBUTION) -TEST_TIME = utils.to_iso_str(datetime(2018, 12, 25, 1, 2, 3, 4)) +TEST_TIME = datetime(2018, 12, 25, 1, 2, 3, 4) +TEST_TIME_STR = utils.to_iso_str(TEST_TIME) class _Client(object): @@ -100,14 +102,12 @@ def test_constructor_param(self): project_id = 1 default_labels = {'key1': 'value1'} exporter = stackdriver.StackdriverStatsExporter( - options=stackdriver.Options(project_id=project_id), - default_labels=default_labels) - + options=stackdriver.Options( + project_id=project_id, default_monitoring_labels=default_labels)) self.assertEqual(exporter.options.project_id, project_id) - self.assertEqual(exporter.default_labels, default_labels) def test_blank_project(self): - self.assertRaises(Exception, stackdriver.new_stats_exporter, + self.assertRaises(ValueError, stackdriver.new_stats_exporter, stackdriver.Options(project_id="")) def test_not_blank_project(self): @@ -173,36 +173,17 @@ def test_sanitize(self): self.assertEqual(len(result), 100) self.assertEqual(result, "key_" + "0123456789" * 9 + "012345") - def test_singleton_with_params(self): - default_labels = {'key1': 'value1'} - patch_client = mock.patch( - ('opencensus.ext.stackdriver.stats_exporter' - '.monitoring_v3.MetricServiceClient'), _Client) - - with patch_client: - exporter_created = stackdriver.new_stats_exporter( - stackdriver.Options( - project_id=1, default_monitoring_labels=default_labels)) - - self.assertEqual(exporter_created.default_labels, default_labels) - def test_get_task_value(self): task_value = stackdriver.get_task_value() self.assertNotEqual(task_value, "") - def test_set_default_labels(self): - labels = {'key': 'value'} - exporter = stackdriver.StackdriverStatsExporter() - exporter.set_default_labels(labels) - self.assertEqual(exporter.default_labels, labels) - def test_new_label_descriptors(self): defaults = {'key1': 'value1'} keys = [FRONTEND_KEY] output = stackdriver.new_label_descriptors(defaults, keys) self.assertEqual(len(output), 3) - def test_namespacedviews(self): + def test_namespaced_views(self): view_name = "view-1" expected_view_name_namespaced = ( "custom.googleapis.com/opencensus/{}".format(view_name)) @@ -214,80 +195,50 @@ def test_namespacedviews(self): view_name, "kubernetes.io/myorg") self.assertEqual(expected_view_name_namespaced, view_name_namespaced) - def test_on_register_view(self): - client = mock.Mock() - view_none = None - option = stackdriver.Options(project_id="project-test") - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - exporter.on_register_view(VIDEO_SIZE_VIEW) - exporter.on_register_view(view_none) - self.assertTrue(client.create_metric_descriptor.called) + def test_stackdriver_register_exporter(self): + stats = stats_module.Stats() + view_manager = stats.view_manager - @mock.patch('opencensus.ext.stackdriver.stats_exporter.' - 'monitored_resource.get_instance', - return_value=None) - def test_emit(self, monitor_resource_mock): - client = mock.Mock() - v_data = view_data_module.ViewData( - view=VIDEO_SIZE_VIEW, start_time=TEST_TIME, end_time=TEST_TIME) - v_data.record(context=tag_map_module.TagMap(), value=2, - timestamp=None) - view_data = [v_data] - option = stackdriver.Options(project_id="project-test") - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - exporter.emit(view_data) - exporter.emit(None) - self.assertTrue(client.create_time_series.called) + exporter = mock.Mock() + if len(view_manager.measure_to_view_map.exporters) > 0: + view_manager.unregister_exporter( + view_manager.measure_to_view_map.exporters[0]) + view_manager.register_exporter(exporter) - def test_export_no_data(self): - client = mock.Mock() - transport = mock.Mock() - option = stackdriver.Options(project_id="project-test") - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client, transport=transport) - exporter.export(None) - self.assertFalse(exporter.transport.export.called) + registered_exporters = len(view_manager.measure_to_view_map.exporters) - def test_export_with_data(self): - client = mock.Mock() - transport = mock.Mock() - v_data = view_data_module.ViewData( - view=VIDEO_SIZE_VIEW, start_time=TEST_TIME, end_time=TEST_TIME) - view_data = [v_data] - option = stackdriver.Options(project_id="project-test") - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client, transport=transport) - exporter.export(view_data) - self.assertTrue(exporter.transport.export.called) + self.assertEqual(registered_exporters, 1) - def test_handle_upload_no_data(self): - client = mock.Mock() - option = stackdriver.Options(project_id="project-test") - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - exporter.handle_upload(None) - self.assertFalse(client.create_time_series.called) + def test_set_metric_labels(self): + series = monitoring_v3.types.TimeSeries() + tag_value = tag_value_module.TagValue("1200") + stackdriver.set_metric_labels(series, VIDEO_SIZE_VIEW, [tag_value]) + self.assertEqual(len(series.metric.labels), 2) - @mock.patch('opencensus.ext.stackdriver.stats_exporter.' - 'monitored_resource.get_instance', - return_value=None) - def test_handle_upload_with_data(self, monitor_resource_mock): - client = mock.Mock() - v_data = view_data_module.ViewData( - view=VIDEO_SIZE_VIEW, start_time=TEST_TIME, end_time=TEST_TIME) - v_data.record(context=tag_map_module.TagMap(), value=2, - timestamp=None) - view_data = [v_data] - option = stackdriver.Options(project_id="project-test") - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - exporter.handle_upload(view_data) - self.assertTrue(client.create_time_series.called) + def test_set_metric_labels_with_None(self): + series = monitoring_v3.types.TimeSeries() + stackdriver.set_metric_labels(series, VIDEO_SIZE_VIEW, [None]) + self.assertEqual(len(series.metric.labels), 1) - def assertCorrectLabels(self, actual_labels, expected_labels, - include_opencensus=False): + @mock.patch('os.getpid', return_value=12345) + @mock.patch('platform.uname', return_value=('system', 'node', 'release', + 'version', 'machine', + 'processor')) + def test_get_task_value_with_hostname(self, mock_uname, mock_pid): + self.assertEqual(stackdriver.get_task_value(), "py-12345@node") + + @mock.patch('os.getpid', return_value=12345) + @mock.patch('platform.uname', return_value=('system', '', 'release', + 'version', 'machine', + 'processor')) + def test_get_task_value_without_hostname(self, mock_uname, mock_pid): + self.assertEqual(stackdriver.get_task_value(), "py-12345@localhost") + + +class TestCreateTimeseries(unittest.TestCase): + + def check_labels(self, actual_labels, expected_labels, + include_opencensus=False): actual_labels = dict(actual_labels) if include_opencensus: opencensus_tag = actual_labels.pop(stackdriver.OPENCENSUS_TASK) @@ -301,7 +252,7 @@ def assertCorrectLabels(self, actual_labels, expected_labels, def test_create_batched_time_series(self, monitor_resource_mock): client = mock.Mock() v_data = view_data_module.ViewData( - view=VIDEO_SIZE_VIEW, start_time=TEST_TIME, end_time=TEST_TIME) + view=VIDEO_SIZE_VIEW, start_time=TEST_TIME_STR, end_time=TEST_TIME_STR) v_data.record(context=tag_map_module.TagMap(), value=2, timestamp=None) view_data = [v_data] @@ -310,6 +261,9 @@ def test_create_batched_time_series(self, monitor_resource_mock): exporter = stackdriver.StackdriverStatsExporter( options=option, client=client) + view_data = [metric_utils.view_data_to_metric( + view_data[0], TEST_TIME)] + time_series_batches = exporter.create_batched_time_series(view_data, 1) self.assertEqual(len(time_series_batches), 1) @@ -319,7 +273,7 @@ def test_create_batched_time_series(self, monitor_resource_mock): self.assertEqual( time_series.metric.type, 'custom.googleapis.com/opencensus/' + VIDEO_SIZE_VIEW_NAME) - self.assertCorrectLabels(time_series.metric.labels, {}, + self.check_labels(time_series.metric.labels, {}, include_opencensus=True) @mock.patch('opencensus.ext.stackdriver.stats_exporter.' @@ -334,7 +288,7 @@ def test_create_batched_time_series_with_many(self, monitor_resource_mock): ['test'], VIDEO_SIZE_MEASURE, aggregation_module.LastValueAggregation()) v_data1 = view_data_module.ViewData( - view=view1, start_time=TEST_TIME, end_time=TEST_TIME) + view=view1, start_time=TEST_TIME_STR, end_time=TEST_TIME_STR) v_data1.record(context=tag_map_module.TagMap({'test': '1'}), value=7, timestamp=None) v_data1.record(context=tag_map_module.TagMap({'test': '2'}), value=5, @@ -348,13 +302,15 @@ def test_create_batched_time_series_with_many(self, monitor_resource_mock): ['test'], VIDEO_SIZE_MEASURE, aggregation_module.LastValueAggregation()) v_data2 = view_data_module.ViewData( - view=view2, start_time=TEST_TIME, end_time=TEST_TIME) + view=view2, start_time=TEST_TIME_STR, end_time=TEST_TIME_STR) v_data2.record(context=tag_map_module.TagMap({'test': '1'}), value=7, timestamp=None) v_data2.record(context=tag_map_module.TagMap({'test': '2'}), value=5, timestamp=None) view_data = [v_data1, v_data2] + view_data = [metric_utils.view_data_to_metric(vd, TEST_TIME) + for vd in view_data] option = stackdriver.Options(project_id="project-test") exporter = stackdriver.StackdriverStatsExporter( @@ -368,19 +324,26 @@ def test_create_batched_time_series_with_many(self, monitor_resource_mock): self.assertEqual(len(tsb2), 2) self.assertEqual(len(tsb3), 1) - def test_stackdriver_register_exporter(self): + def setup_create_timeseries_test(self): + client = mock.Mock() + execution_context.clear() + + option = stackdriver.Options( + project_id="project-test", resource="global") + exporter = stackdriver.StackdriverStatsExporter( + options=option, client=client) + stats = stats_module.Stats() view_manager = stats.view_manager + stats_recorder = stats.stats_recorder - exporter = mock.Mock() if len(view_manager.measure_to_view_map.exporters) > 0: view_manager.unregister_exporter( view_manager.measure_to_view_map.exporters[0]) - view_manager.register_exporter(exporter) - registered_exporters = len(view_manager.measure_to_view_map.exporters) + view_manager.register_exporter(exporter) + return view_manager, stats_recorder, exporter - self.assertEqual(registered_exporters, 1) @mock.patch('opencensus.ext.stackdriver.stats_exporter.' 'monitored_resource.get_instance', @@ -402,7 +365,9 @@ def test_create_timeseries(self, monitor_resource_mock): v_data = measure_map.measure_to_view_map.get_view( VIDEO_SIZE_VIEW_NAME, None) - time_series_list = exporter.create_time_series_list(v_data, "", "") + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] @@ -410,7 +375,7 @@ def test_create_timeseries(self, monitor_resource_mock): self.assertEqual( time_series_list[0].metric.type, "custom.googleapis.com/opencensus/my.org/views/video_size_test2") - self.assertCorrectLabels(time_series.metric.labels, + self.check_labels(time_series.metric.labels, {FRONTEND_KEY_CLEAN: "1200"}, include_opencensus=True) self.assertIsNotNone(time_series.resource) @@ -418,16 +383,13 @@ def test_create_timeseries(self, monitor_resource_mock): self.assertEqual(len(time_series.points), 1) value = time_series.points[0].value self.assertEqual(value.distribution_value.count, 1) - self.assertEqual(value.distribution_value.mean, 25 * MiB) + # self.assertEqual(value.distribution_value.mean, 25 * MiB) - time_series_list = exporter.create_time_series_list( - v_data, "global", "kubernetes.io/myorg") + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] - self.assertEqual(time_series.metric.type, - "kubernetes.io/myorg/my.org/views/video_size_test2") - self.assertCorrectLabels(time_series.metric.labels, + self.check_labels(time_series.metric.labels, {FRONTEND_KEY_CLEAN: "1200"}, include_opencensus=True) self.assertIsNotNone(time_series.resource) @@ -435,14 +397,29 @@ def test_create_timeseries(self, monitor_resource_mock): self.assertEqual(len(time_series.points), 1) value = time_series.points[0].value self.assertEqual(value.distribution_value.count, 1) - self.assertEqual(value.distribution_value.mean, 25 * MiB) + # self.assertEqual(value.distribution_value.mean, 25 * MiB) @mock.patch('opencensus.ext.stackdriver.stats_exporter.' 'monitored_resource.get_instance') def test_create_timeseries_with_resource(self, monitor_resource_mock): - view_manager, stats_recorder, exporter = \ - self.setup_create_timeseries_test() + client = mock.Mock() + execution_context.clear() + + option = stackdriver.Options( + project_id="project-test", resource="") + exporter = stackdriver.StackdriverStatsExporter( + options=option, client=client) + + stats = stats_module.Stats() + view_manager = stats.view_manager + stats_recorder = stats.stats_recorder + + if len(view_manager.measure_to_view_map.exporters) > 0: + view_manager.unregister_exporter( + view_manager.measure_to_view_map.exporters[0]) + + view_manager.register_exporter(exporter) view_manager.register_view(VIDEO_SIZE_VIEW) tag_value = tag_value_module.TagValue("1200") @@ -456,6 +433,8 @@ def test_create_timeseries_with_resource(self, monitor_resource_mock): v_data = measure_map.measure_to_view_map.get_view( VIDEO_SIZE_VIEW_NAME, None) + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + # check for gce_instance monitored resource mocked_labels = { 'instance_id': 'my-instance', @@ -470,11 +449,11 @@ def test_create_timeseries_with_resource(self, monitor_resource_mock): mock_resource.get_labels.return_value = mocked_labels monitor_resource_mock.return_value = mock_resource - time_series_list = exporter.create_time_series_list(v_data, "", "") + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] self.assertEqual(time_series.resource.type, "gce_instance") - self.assertCorrectLabels(time_series.resource.labels, { + self.check_labels(time_series.resource.labels, { 'instance_id': 'my-instance', 'project_id': 'my-project', 'zone': 'us-east1', @@ -484,12 +463,10 @@ def test_create_timeseries_with_resource(self, monitor_resource_mock): "custom.googleapis.com/opencensus/my.org/views/video_size_test2") self.assertIsNotNone(time_series) - time_series_list = exporter.create_time_series_list( - v_data, "global", "") + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] - self.assertEqual(time_series.resource.type, "global") - self.assertCorrectLabels(time_series.resource.labels, {}) + self.assertEqual( time_series.metric.type, "custom.googleapis.com/opencensus/my.org/views/video_size_test2") @@ -509,11 +486,11 @@ def test_create_timeseries_with_resource(self, monitor_resource_mock): mock_resource.get_labels.return_value = mocked_labels monitor_resource_mock.return_value = mock_resource - time_series_list = exporter.create_time_series_list(v_data, "", "") + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] self.assertEqual(time_series.resource.type, "k8s_container") - self.assertCorrectLabels(time_series.resource.labels, { + self.check_labels(time_series.resource.labels, { 'project_id': 'my-project', 'location': 'us-east1', 'cluster_name': 'cluster', @@ -537,11 +514,11 @@ def test_create_timeseries_with_resource(self, monitor_resource_mock): mock_resource.get_labels.return_value = mocked_labels monitor_resource_mock.return_value = mock_resource - time_series_list = exporter.create_time_series_list(v_data, "", "") + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] self.assertEqual(time_series.resource.type, "aws_ec2_instance") - self.assertCorrectLabels(time_series.resource.labels, { + self.check_labels(time_series.resource.labels, { 'instance_id': 'my-instance', 'aws_account': 'my-project', 'region': 'aws:us-east1', @@ -557,11 +534,11 @@ def test_create_timeseries_with_resource(self, monitor_resource_mock): mock_resource.get_labels.return_value = mock.Mock() monitor_resource_mock.return_value = mock_resource - time_series_list = exporter.create_time_series_list(v_data, "", "") + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] self.assertEqual(time_series.resource.type, 'global') - self.assertCorrectLabels(time_series.resource.labels, {}) + self.check_labels(time_series.resource.labels, {}) self.assertEqual( time_series.metric.type, "custom.googleapis.com/opencensus/my.org/views/video_size_test2") @@ -592,13 +569,13 @@ def test_create_timeseries_str_tagvalue(self, monitor_resource_mock): v_data = measure_map.measure_to_view_map.get_view(view_name1, None) - time_series_list = exporter.create_time_series_list( - v_data, "global", "kubernetes.io/myorg/") + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] - self.assertEqual(time_series.metric.type, - "kubernetes.io/myorg/view-name1") - self.assertCorrectLabels(time_series.metric.labels, + + self.check_labels(time_series.metric.labels, {FRONTEND_KEY_INT_CLEAN: "Abc"}, include_opencensus=True) self.assertIsNotNone(time_series.resource) @@ -634,13 +611,12 @@ def test_create_timeseries_str_tagvalue_count_aggregtation( v_data = measure_map.measure_to_view_map.get_view(view_name1, None) - time_series_list = exporter.create_time_series_list( - v_data, "global", "kubernetes.io/myorg/") + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] - self.assertEqual(time_series.metric.type, - "kubernetes.io/myorg/view-name1") - self.assertCorrectLabels(time_series.metric.labels, + self.check_labels(time_series.metric.labels, {FRONTEND_KEY_INT_CLEAN: "Abc"}, include_opencensus=True) self.assertIsNotNone(time_series.resource) @@ -676,13 +652,12 @@ def test_create_timeseries_last_value_float_tagvalue( v_data = measure_map.measure_to_view_map.get_view(view_name2, None) - time_series_list = exporter.create_time_series_list( - v_data, "global", "kubernetes.io/myorg") + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] - self.assertEqual(time_series.metric.type, - "kubernetes.io/myorg/view-name2") - self.assertCorrectLabels(time_series.metric.labels, + self.check_labels(time_series.metric.labels, {FRONTEND_KEY_FLOAT_CLEAN: "Abc"}, include_opencensus=True) self.assertIsNotNone(time_series.resource) @@ -731,13 +706,14 @@ def test_create_timeseries_float_tagvalue(self, monitor_resource_mock): v_data = measure_map.measure_to_view_map.get_view(view_name3, None) - time_series_list = exporter.create_time_series_list( - v_data, "global", "") + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) [time_series] = time_series_list self.assertEqual(time_series.metric.type, "custom.googleapis.com/opencensus/view-name3") - self.assertCorrectLabels(time_series.metric.labels, + self.check_labels(time_series.metric.labels, {FRONTEND_KEY_FLOAT_CLEAN: "1200"}, include_opencensus=True) self.assertIsNotNone(time_series.resource) @@ -774,7 +750,9 @@ def test_create_timeseries_multiple_tag_values(self, v_data = measure_map.measure_to_view_map.get_view( VIDEO_SIZE_VIEW_NAME, None) - time_series_list = exporter.create_time_series_list(v_data, "", "") + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 2) ts_by_frontend = {ts.metric.labels.get(FRONTEND_KEY_CLEAN): ts @@ -793,7 +771,7 @@ def test_create_timeseries_multiple_tag_values(self, self.assertEqual(len(ts1.points), 1) value1 = ts1.points[0].value self.assertEqual(value1.distribution_value.count, 1) - self.assertEqual(value1.distribution_value.mean, 25 * MiB) + # self.assertEqual(value1.distribution_value.mean, 25 * MiB) # Verify second time series self.assertEqual(ts2.resource.type, "global") @@ -805,7 +783,7 @@ def test_create_timeseries_multiple_tag_values(self, self.assertEqual(len(ts2.points), 1) value2 = ts2.points[0].value self.assertEqual(value2.distribution_value.count, 1) - self.assertEqual(value2.distribution_value.mean, 12 * MiB) + # self.assertEqual(value2.distribution_value.mean, 12 * MiB) @mock.patch('opencensus.ext.stackdriver.stats_exporter.' 'monitored_resource.get_instance', @@ -833,7 +811,9 @@ def test_create_timeseries_disjoint_tags(self, monitoring_resoure_mock): v_data = measure_map.measure_to_view_map.get_view(view_name, None) - time_series_list = exporter.create_time_series_list(v_data, "", "") + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) [time_series] = time_series_list @@ -842,7 +822,7 @@ def test_create_timeseries_disjoint_tags(self, monitoring_resoure_mock): self.assertEqual(time_series.resource.type, "global") self.assertEqual(time_series.metric.type, "custom.googleapis.com/opencensus/" + view_name) - self.assertCorrectLabels(time_series.metric.labels, + self.check_labels(time_series.metric.labels, {FRONTEND_KEY_CLEAN: "1200"}, include_opencensus=True) self.assertIsNotNone(time_series.resource) @@ -852,36 +832,25 @@ def test_create_timeseries_disjoint_tags(self, monitoring_resoure_mock): expected_value.int64_value = 25 * MiB self.assertEqual(time_series.points[0].value, expected_value) - def setup_create_timeseries_test(self): - client = mock.Mock() - execution_context.clear() - - option = stackdriver.Options( - project_id="project-test", resource="global") - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - - stats = stats_module.Stats() - view_manager = stats.view_manager - stats_recorder = stats.stats_recorder - - if len(view_manager.measure_to_view_map.exporters) > 0: - view_manager.unregister_exporter( - view_manager.measure_to_view_map.exporters[0]) - - view_manager.register_exporter(exporter) - return view_manager, stats_recorder, exporter def test_create_timeseries_from_distribution(self): """Check for explicit 0-bound bucket for SD export.""" + agg = aggregation_module.DistributionAggregation( + aggregation_type=aggregation_module.Type.DISTRIBUTION) + + view = view_module.View( + name="example.org/test_view", + description="example.org/test_view", + columns=['tag_key'], + measure=mock.Mock(), + aggregation=agg, + ) - v_data = mock.Mock(spec=view_data_module.ViewData) - v_data.view.name = "example.org/test_view" - v_data.view.columns = ['tag_key'] - v_data.view.aggregation.aggregation_type = \ - aggregation_module.Type.DISTRIBUTION - v_data.start_time = TEST_TIME - v_data.end_time = TEST_TIME + v_data = view_data_module.ViewData( + view=view, + start_time=TEST_TIME_STR, + end_time=TEST_TIME_STR, + ) # Aggregation over (10 * range(10)) for buckets [2, 4, 6, 8] dad = aggregation_data_module.DistributionAggregationData( @@ -892,57 +861,63 @@ def test_create_timeseries_from_distribution(self): bounds=[2, 4, 6, 8], exemplars={mock.Mock() for ii in range(5)} ) - v_data.tag_value_aggregation_data_map = {('tag_value',): dad} + v_data._tag_value_aggregation_data_map = {('tag_value',): dad} - exporter = stackdriver.StackdriverStatsExporter( - options=mock.Mock(), - client=mock.Mock(), - ) - time_series_list = exporter.create_time_series_list(v_data, "", "") + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + + exporter = stackdriver.StackdriverStatsExporter() + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) [time_series] = time_series_list - self.assertCorrectLabels(time_series.metric.labels, + self.check_labels(time_series.metric.labels, {'tag_key': 'tag_value'}, include_opencensus=True) self.assertEqual(len(time_series.points), 1) [point] = time_series.points dv = point.value.distribution_value self.assertEqual(100, dv.count) - self.assertEqual(4.5, dv.mean) + # self.assertEqual(4.5, dv.mean) self.assertEqual(825.0, dv.sum_of_squared_deviation) self.assertEqual([0, 20, 20, 20, 20, 20], dv.bucket_counts) self.assertEqual([0, 2, 4, 6, 8], dv.bucket_options.explicit_buckets.bounds) - def test_create_timeseries_something(self): + def test_create_timeseries_multiple_tags(self): """Check that exporter creates timeseries for multiple tag values. create_time_series_list should return a time series for each set of values in the tag value aggregation map. """ + agg = aggregation_module.CountAggregation( + aggregation_type=aggregation_module.Type.COUNT) + + view = view_module.View( + name="example.org/test_view", + description="example.org/test_view", + columns=[tag_key_module.TagKey('color'), + tag_key_module.TagKey('shape')], + measure=mock.Mock(), + aggregation=agg, + ) - v_data = mock.Mock(spec=view_data_module.ViewData) - v_data.view.name = "example.org/test_view" - v_data.view.columns = [tag_key_module.TagKey('color'), - tag_key_module.TagKey('shape')] - v_data.view.aggregation.aggregation_type = \ - aggregation_module.Type.COUNT - v_data.start_time = TEST_TIME - v_data.end_time = TEST_TIME + v_data = view_data_module.ViewData( + view=view, + start_time=TEST_TIME_STR, + end_time=TEST_TIME_STR, + ) rs_count = aggregation_data_module.CountAggregationData(10) bc_count = aggregation_data_module.CountAggregationData(20) - v_data.tag_value_aggregation_data_map = { + v_data._tag_value_aggregation_data_map = { ('red', 'square'): rs_count, ('blue', 'circle'): bc_count, } - exporter = stackdriver.StackdriverStatsExporter( - options=mock.Mock(), - client=mock.Mock(), - ) - time_series_list = exporter.create_time_series_list(v_data, "", "") + v_data = metric_utils.view_data_to_metric(v_data, TEST_TIME) + + exporter = stackdriver.StackdriverStatsExporter() + time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 2) self.assertEqual(len(time_series_list[0].points), 1) @@ -963,8 +938,8 @@ def test_create_timeseries_invalid_aggregation(self): v_data.view.columns = [tag_key_module.TagKey('base_key')] v_data.view.aggregation.aggregation_type = \ aggregation_module.Type.NONE - v_data.start_time = TEST_TIME - v_data.end_time = TEST_TIME + v_data.start_time = TEST_TIME_STR + v_data.end_time = TEST_TIME_STR base_data = aggregation_data_module.BaseAggregationData(10) v_data.tag_value_aggregation_data_map = { @@ -975,128 +950,5 @@ def test_create_timeseries_invalid_aggregation(self): options=mock.Mock(), client=mock.Mock(), ) - self.assertRaises(TypeError, exporter.create_time_series_list, - v_data, "", "") - - def test_create_metric_descriptor_count(self): - client = mock.Mock() - option = stackdriver.Options( - project_id="project-test", metric_prefix="teste") - view_name_count = "view-count" - agg_count = aggregation_module.CountAggregation(count=2) - view_count = view_module.View( - view_name_count, "processed video size over time", [FRONTEND_KEY], - VIDEO_SIZE_MEASURE, agg_count) - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - desc = exporter.create_metric_descriptor(view_count) - self.assertIsNotNone(desc) - - def test_create_metric_descriptor_sum_int(self): - client = mock.Mock() - option = stackdriver.Options( - project_id="project-test", metric_prefix="teste") - - view_name_sum_int = "view-sum-int" - agg_sum = aggregation_module.SumAggregation(sum=2) - view_sum_int = view_module.View( - view_name_sum_int, "processed video size over time", - [FRONTEND_KEY], VIDEO_SIZE_MEASURE, agg_sum) - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - desc = exporter.create_metric_descriptor(view_sum_int) - self.assertIsNotNone(desc) - - def test_create_metric_descriptor_sum_float(self): - client = mock.Mock() - option = stackdriver.Options( - project_id="project-test", metric_prefix="teste") - - view_name_sum_float = "view-sum-float" - agg_sum = aggregation_module.SumAggregation(sum=2) - view_sum_float = view_module.View( - view_name_sum_float, "processed video size over time", - [FRONTEND_KEY_FLOAT], VIDEO_SIZE_MEASURE_FLOAT, agg_sum) - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - desc = exporter.create_metric_descriptor(view_sum_float) - self.assertIsNotNone(desc) - - def test_create_metric_descriptor(self): - client = mock.Mock() - option = stackdriver.Options( - project_id="project-test", metric_prefix="teste") - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - desc = exporter.create_metric_descriptor(VIDEO_SIZE_VIEW) - self.assertIsNotNone(desc) - - def test_create_metric_descriptor_last_value_int(self): - client = mock.Mock() - option = stackdriver.Options( - project_id="project-test", metric_prefix="teste") - - view_name_base = "view-base" - agg_base = aggregation_module.LastValueAggregation() - view_base = view_module.View( - view_name_base, "processed video size over time", [FRONTEND_KEY], - VIDEO_SIZE_MEASURE, agg_base) - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - desc = exporter.create_metric_descriptor(view_base) - self.assertIsNotNone(desc) - - def test_create_metric_descriptor_last_value_float(self): - client = mock.Mock() - option = stackdriver.Options( - project_id="project-test", metric_prefix="teste") - - view_name_base = "view-base" - agg_base = aggregation_module.LastValueAggregation() - view_base = view_module.View( - view_name_base, "processed video size over time", [FRONTEND_KEY], - VIDEO_SIZE_MEASURE_FLOAT, agg_base) - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - desc = exporter.create_metric_descriptor(view_base) - self.assertIsNotNone(desc) - - def test_create_metric_descriptor_base(self): - client = mock.Mock() - option = stackdriver.Options( - project_id="project-test", metric_prefix="teste") - - view_name_base = "view-base" - agg_base = aggregation_module.BaseAggregation() - view_base = view_module.View( - view_name_base, "processed video size over time", [FRONTEND_KEY], - VIDEO_SIZE_MEASURE, agg_base) - exporter = stackdriver.StackdriverStatsExporter( - options=option, client=client) - self.assertRaises(Exception, exporter.create_metric_descriptor, - view_base) - - def test_set_metric_labels(self): - series = monitoring_v3.types.TimeSeries() - tag_value = tag_value_module.TagValue("1200") - stackdriver.set_metric_labels(series, VIDEO_SIZE_VIEW, [tag_value]) - self.assertEqual(len(series.metric.labels), 2) - - def test_set_metric_labels_with_None(self): - series = monitoring_v3.types.TimeSeries() - stackdriver.set_metric_labels(series, VIDEO_SIZE_VIEW, [None]) - self.assertEqual(len(series.metric.labels), 1) - - @mock.patch('os.getpid', return_value=12345) - @mock.patch('platform.uname', return_value=('system', 'node', 'release', - 'version', 'machine', - 'processor')) - def test_get_task_value_with_hostname(self, mock_uname, mock_pid): - self.assertEqual(stackdriver.get_task_value(), "py-12345@node") - - @mock.patch('os.getpid', return_value=12345) - @mock.patch('platform.uname', return_value=('system', '', 'release', - 'version', 'machine', - 'processor')) - def test_get_task_value_without_hostname(self, mock_uname, mock_pid): - self.assertEqual(stackdriver.get_task_value(), "py-12345@localhost") + self.assertRaises(TypeError, exporter.create_time_series_list, v_data, + "", "") From b8c4bc7762dda2d5d65e1834bc3f8dd358f3262b Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Tue, 19 Mar 2019 13:36:34 -0700 Subject: [PATCH 09/21] Fix test formatting --- .../tests/test_stackdriver_stats.py | 140 +++++++++--------- 1 file changed, 73 insertions(+), 67 deletions(-) diff --git a/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py b/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py index ec17eecd2..cdf0ea8b6 100644 --- a/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py +++ b/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py @@ -103,7 +103,8 @@ def test_constructor_param(self): default_labels = {'key1': 'value1'} exporter = stackdriver.StackdriverStatsExporter( options=stackdriver.Options( - project_id=project_id, default_monitoring_labels=default_labels)) + project_id=project_id, + default_monitoring_labels=default_labels)) self.assertEqual(exporter.options.project_id, project_id) def test_blank_project(self): @@ -221,23 +222,26 @@ def test_set_metric_labels_with_None(self): self.assertEqual(len(series.metric.labels), 1) @mock.patch('os.getpid', return_value=12345) - @mock.patch('platform.uname', return_value=('system', 'node', 'release', - 'version', 'machine', - 'processor')) + @mock.patch( + 'platform.uname', + return_value=('system', 'node', 'release', 'version', 'machine', + 'processor')) def test_get_task_value_with_hostname(self, mock_uname, mock_pid): self.assertEqual(stackdriver.get_task_value(), "py-12345@node") @mock.patch('os.getpid', return_value=12345) - @mock.patch('platform.uname', return_value=('system', '', 'release', - 'version', 'machine', - 'processor')) + @mock.patch( + 'platform.uname', + return_value=('system', '', 'release', 'version', 'machine', + 'processor')) def test_get_task_value_without_hostname(self, mock_uname, mock_pid): self.assertEqual(stackdriver.get_task_value(), "py-12345@localhost") class TestCreateTimeseries(unittest.TestCase): - - def check_labels(self, actual_labels, expected_labels, + def check_labels(self, + actual_labels, + expected_labels, include_opencensus=False): actual_labels = dict(actual_labels) if include_opencensus: @@ -252,17 +256,17 @@ def check_labels(self, actual_labels, expected_labels, def test_create_batched_time_series(self, monitor_resource_mock): client = mock.Mock() v_data = view_data_module.ViewData( - view=VIDEO_SIZE_VIEW, start_time=TEST_TIME_STR, end_time=TEST_TIME_STR) - v_data.record(context=tag_map_module.TagMap(), value=2, - timestamp=None) + view=VIDEO_SIZE_VIEW, + start_time=TEST_TIME_STR, + end_time=TEST_TIME_STR) + v_data.record(context=tag_map_module.TagMap(), value=2, timestamp=None) view_data = [v_data] option = stackdriver.Options(project_id="project-test") exporter = stackdriver.StackdriverStatsExporter( options=option, client=client) - view_data = [metric_utils.view_data_to_metric( - view_data[0], TEST_TIME)] + view_data = [metric_utils.view_data_to_metric(view_data[0], TEST_TIME)] time_series_batches = exporter.create_batched_time_series(view_data, 1) @@ -273,8 +277,8 @@ def test_create_batched_time_series(self, monitor_resource_mock): self.assertEqual( time_series.metric.type, 'custom.googleapis.com/opencensus/' + VIDEO_SIZE_VIEW_NAME) - self.check_labels(time_series.metric.labels, {}, - include_opencensus=True) + self.check_labels( + time_series.metric.labels, {}, include_opencensus=True) @mock.patch('opencensus.ext.stackdriver.stats_exporter.' 'monitored_resource.get_instance', @@ -284,8 +288,8 @@ def test_create_batched_time_series_with_many(self, monitor_resource_mock): # First view with 3 view_name1 = "view-name1" - view1 = view_module.View(view_name1, "test description", - ['test'], VIDEO_SIZE_MEASURE, + view1 = view_module.View(view_name1, "test description", ['test'], + VIDEO_SIZE_MEASURE, aggregation_module.LastValueAggregation()) v_data1 = view_data_module.ViewData( view=view1, start_time=TEST_TIME_STR, end_time=TEST_TIME_STR) @@ -298,8 +302,8 @@ def test_create_batched_time_series_with_many(self, monitor_resource_mock): # Second view with 2 view_name2 = "view-name2" - view2 = view_module.View(view_name2, "test description", - ['test'], VIDEO_SIZE_MEASURE, + view2 = view_module.View(view_name2, "test description", ['test'], + VIDEO_SIZE_MEASURE, aggregation_module.LastValueAggregation()) v_data2 = view_data_module.ViewData( view=view2, start_time=TEST_TIME_STR, end_time=TEST_TIME_STR) @@ -344,7 +348,6 @@ def setup_create_timeseries_test(self): view_manager.register_exporter(exporter) return view_manager, stats_recorder, exporter - @mock.patch('opencensus.ext.stackdriver.stats_exporter.' 'monitored_resource.get_instance', return_value=None) @@ -375,9 +378,9 @@ def test_create_timeseries(self, monitor_resource_mock): self.assertEqual( time_series_list[0].metric.type, "custom.googleapis.com/opencensus/my.org/views/video_size_test2") - self.check_labels(time_series.metric.labels, - {FRONTEND_KEY_CLEAN: "1200"}, - include_opencensus=True) + self.check_labels( + time_series.metric.labels, {FRONTEND_KEY_CLEAN: "1200"}, + include_opencensus=True) self.assertIsNotNone(time_series.resource) self.assertEqual(len(time_series.points), 1) @@ -389,9 +392,9 @@ def test_create_timeseries(self, monitor_resource_mock): self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] - self.check_labels(time_series.metric.labels, - {FRONTEND_KEY_CLEAN: "1200"}, - include_opencensus=True) + self.check_labels( + time_series.metric.labels, {FRONTEND_KEY_CLEAN: "1200"}, + include_opencensus=True) self.assertIsNotNone(time_series.resource) self.assertEqual(len(time_series.points), 1) @@ -406,8 +409,7 @@ def test_create_timeseries_with_resource(self, monitor_resource_mock): client = mock.Mock() execution_context.clear() - option = stackdriver.Options( - project_id="project-test", resource="") + option = stackdriver.Options(project_id="project-test", resource="") exporter = stackdriver.StackdriverStatsExporter( options=option, client=client) @@ -453,11 +455,12 @@ def test_create_timeseries_with_resource(self, monitor_resource_mock): self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] self.assertEqual(time_series.resource.type, "gce_instance") - self.check_labels(time_series.resource.labels, { - 'instance_id': 'my-instance', - 'project_id': 'my-project', - 'zone': 'us-east1', - }) + self.check_labels( + time_series.resource.labels, { + 'instance_id': 'my-instance', + 'project_id': 'my-project', + 'zone': 'us-east1', + }) self.assertEqual( time_series.metric.type, "custom.googleapis.com/opencensus/my.org/views/video_size_test2") @@ -490,13 +493,14 @@ def test_create_timeseries_with_resource(self, monitor_resource_mock): self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] self.assertEqual(time_series.resource.type, "k8s_container") - self.check_labels(time_series.resource.labels, { - 'project_id': 'my-project', - 'location': 'us-east1', - 'cluster_name': 'cluster', - 'pod_name': 'localhost', - 'namespace_name': 'namespace', - }) + self.check_labels( + time_series.resource.labels, { + 'project_id': 'my-project', + 'location': 'us-east1', + 'cluster_name': 'cluster', + 'pod_name': 'localhost', + 'namespace_name': 'namespace', + }) self.assertEqual( time_series.metric.type, "custom.googleapis.com/opencensus/my.org/views/video_size_test2") @@ -518,11 +522,12 @@ def test_create_timeseries_with_resource(self, monitor_resource_mock): self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] self.assertEqual(time_series.resource.type, "aws_ec2_instance") - self.check_labels(time_series.resource.labels, { - 'instance_id': 'my-instance', - 'aws_account': 'my-project', - 'region': 'aws:us-east1', - }) + self.check_labels( + time_series.resource.labels, { + 'instance_id': 'my-instance', + 'aws_account': 'my-project', + 'region': 'aws:us-east1', + }) self.assertEqual( time_series.metric.type, "custom.googleapis.com/opencensus/my.org/views/video_size_test2") @@ -575,9 +580,9 @@ def test_create_timeseries_str_tagvalue(self, monitor_resource_mock): self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] - self.check_labels(time_series.metric.labels, - {FRONTEND_KEY_INT_CLEAN: "Abc"}, - include_opencensus=True) + self.check_labels( + time_series.metric.labels, {FRONTEND_KEY_INT_CLEAN: "Abc"}, + include_opencensus=True) self.assertIsNotNone(time_series.resource) self.assertEqual(len(time_series.points), 1) @@ -616,9 +621,9 @@ def test_create_timeseries_str_tagvalue_count_aggregtation( time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] - self.check_labels(time_series.metric.labels, - {FRONTEND_KEY_INT_CLEAN: "Abc"}, - include_opencensus=True) + self.check_labels( + time_series.metric.labels, {FRONTEND_KEY_INT_CLEAN: "Abc"}, + include_opencensus=True) self.assertIsNotNone(time_series.resource) self.assertEqual(len(time_series.points), 1) @@ -657,9 +662,9 @@ def test_create_timeseries_last_value_float_tagvalue( time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 1) time_series = time_series_list[0] - self.check_labels(time_series.metric.labels, - {FRONTEND_KEY_FLOAT_CLEAN: "Abc"}, - include_opencensus=True) + self.check_labels( + time_series.metric.labels, {FRONTEND_KEY_FLOAT_CLEAN: "Abc"}, + include_opencensus=True) self.assertIsNotNone(time_series.resource) self.assertEqual(len(time_series.points), 1) @@ -713,9 +718,9 @@ def test_create_timeseries_float_tagvalue(self, monitor_resource_mock): [time_series] = time_series_list self.assertEqual(time_series.metric.type, "custom.googleapis.com/opencensus/view-name3") - self.check_labels(time_series.metric.labels, - {FRONTEND_KEY_FLOAT_CLEAN: "1200"}, - include_opencensus=True) + self.check_labels( + time_series.metric.labels, {FRONTEND_KEY_FLOAT_CLEAN: "1200"}, + include_opencensus=True) self.assertIsNotNone(time_series.resource) self.assertEqual(len(time_series.points), 1) @@ -755,8 +760,10 @@ def test_create_timeseries_multiple_tag_values(self, time_series_list = exporter.create_time_series_list(v_data) self.assertEqual(len(time_series_list), 2) - ts_by_frontend = {ts.metric.labels.get(FRONTEND_KEY_CLEAN): ts - for ts in time_series_list} + ts_by_frontend = { + ts.metric.labels.get(FRONTEND_KEY_CLEAN): ts + for ts in time_series_list + } self.assertEqual(set(ts_by_frontend.keys()), {"1200", "1400"}) ts1 = ts_by_frontend["1200"] ts2 = ts_by_frontend["1400"] @@ -822,9 +829,9 @@ def test_create_timeseries_disjoint_tags(self, monitoring_resoure_mock): self.assertEqual(time_series.resource.type, "global") self.assertEqual(time_series.metric.type, "custom.googleapis.com/opencensus/" + view_name) - self.check_labels(time_series.metric.labels, - {FRONTEND_KEY_CLEAN: "1200"}, - include_opencensus=True) + self.check_labels( + time_series.metric.labels, {FRONTEND_KEY_CLEAN: "1200"}, + include_opencensus=True) self.assertIsNotNone(time_series.resource) self.assertEqual(len(time_series.points), 1) @@ -832,7 +839,6 @@ def test_create_timeseries_disjoint_tags(self, monitoring_resoure_mock): expected_value.int64_value = 25 * MiB self.assertEqual(time_series.points[0].value, expected_value) - def test_create_timeseries_from_distribution(self): """Check for explicit 0-bound bucket for SD export.""" agg = aggregation_module.DistributionAggregation( @@ -870,9 +876,9 @@ def test_create_timeseries_from_distribution(self): self.assertEqual(len(time_series_list), 1) [time_series] = time_series_list - self.check_labels(time_series.metric.labels, - {'tag_key': 'tag_value'}, - include_opencensus=True) + self.check_labels( + time_series.metric.labels, {'tag_key': 'tag_value'}, + include_opencensus=True) self.assertEqual(len(time_series.points), 1) [point] = time_series.points dv = point.value.distribution_value From dfef4eb6aef240e2e431768129914fdff5cbf809 Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Tue, 19 Mar 2019 13:42:30 -0700 Subject: [PATCH 10/21] Lint fixes --- .../ext/stackdriver/stats_exporter/__init__.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py index 6ccdb04ba..d659fcc61 100644 --- a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py +++ b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py @@ -26,13 +26,9 @@ from opencensus.common import utils from opencensus.common.monitored_resource import monitored_resource -from opencensus.common.transports import async_ from opencensus.common.version import __version__ from opencensus.metrics.export import metric as metric_module from opencensus.metrics.export import metric_descriptor -from opencensus.stats import aggregation -from opencensus.stats import base_exporter -from opencensus.stats import measure MAX_TIME_SERIES_PER_UPLOAD = 200 @@ -190,12 +186,13 @@ def _convert_series(self, metric, ts): def _convert_point(self, metric, ts, point, sd_point): """Convert an OC metric point to a SD point.""" - if (metric.descriptor.type == - metric_descriptor.MetricDescriptorType.CUMULATIVE_DISTRIBUTION): + if (metric.descriptor.type == metric_descriptor.MetricDescriptorType + .CUMULATIVE_DISTRIBUTION): sd_dist_val = sd_point.value.distribution_value sd_dist_val.count = point.value.count - sd_dist_val.sum_of_squared_deviation = point.value.sum_of_squared_deviation + sd_dist_val.sum_of_squared_deviation =\ + point.value.sum_of_squared_deviation assert sd_dist_val.bucket_options.explicit_buckets.bounds == [] sd_dist_val.bucket_options.explicit_buckets.bounds.extend( @@ -295,7 +292,8 @@ def register_metric_descriptor(self, oc_md): return self._md_cache[descriptor_type] except KeyError: descriptor = self.get_metric_descriptor(oc_md) - project_name = self.client.project_path(self.options.project_id) + project_name =\ + self.client.project_path(self.options.project_id) sd_md = self.client.create_metric_descriptor( project_name, descriptor) self._md_cache[descriptor_type] = sd_md From 66850792059ba34519a63049abd57638008662f3 Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Tue, 19 Mar 2019 15:34:13 -0700 Subject: [PATCH 11/21] Use float measures for lv aggregations in test --- .../tests/test_stackdriver_stats.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py b/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py index cdf0ea8b6..05f8d41b9 100644 --- a/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py +++ b/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py @@ -44,9 +44,9 @@ FRONTEND_KEY_INT_CLEAN = "my_org_keys_frontend_INT" FRONTEND_KEY_STR_CLEAN = "my_org_keys_frontend_STR" -VIDEO_SIZE_MEASURE = measure_module.MeasureInt( +VIDEO_SIZE_MEASURE = measure_module.MeasureFloat( "my.org/measure/video_size_test2", "size of processed videos", "By") -VIDEO_SIZE_MEASURE_2 = measure_module.MeasureInt( +VIDEO_SIZE_MEASURE_2 = measure_module.MeasureFloat( "my.org/measure/video_size_test_2", "size of processed videos", "By") VIDEO_SIZE_MEASURE_FLOAT = measure_module.MeasureFloat( @@ -587,7 +587,8 @@ def test_create_timeseries_str_tagvalue(self, monitor_resource_mock): self.assertEqual(len(time_series.points), 1) expected_value = monitoring_v3.types.TypedValue() - expected_value.int64_value = 25 * MiB + # TODO: #565 + expected_value.double_value = 25.0 * MiB self.assertEqual(time_series.points[0].value, expected_value) @mock.patch('opencensus.ext.stackdriver.stats_exporter.' @@ -836,7 +837,8 @@ def test_create_timeseries_disjoint_tags(self, monitoring_resoure_mock): self.assertEqual(len(time_series.points), 1) expected_value = monitoring_v3.types.TypedValue() - expected_value.int64_value = 25 * MiB + # TODO: #565 + expected_value.double_value = 25.0 * MiB self.assertEqual(time_series.points[0].value, expected_value) def test_create_timeseries_from_distribution(self): From a98553ec1628fa88482dd1f29449ccb5e101c084 Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Tue, 19 Mar 2019 15:43:55 -0700 Subject: [PATCH 12/21] Add TODO note for summary metric conversion --- .../opencensus/ext/stackdriver/stats_exporter/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py index d659fcc61..f7c2ec584 100644 --- a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py +++ b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py @@ -222,6 +222,7 @@ def _convert_point(self, metric, ts, point, sd_point): metric_descriptor.MetricDescriptorType.GAUGE_DOUBLE): sd_point.value.double_value = float(point.value.value) + # TODO: handle SUMMARY metrics, #567 else: md_type_name = metric_descriptor.MetricDescriptorType.to_name( metric.descriptor.type) From 5a0e7e08b905f53036fbbe226b177c7c6d6fad16 Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Tue, 19 Mar 2019 23:29:05 -0700 Subject: [PATCH 13/21] Register metric descriptor on export --- .../stackdriver/stats_exporter/__init__.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py index f7c2ec584..03f4207c5 100644 --- a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py +++ b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py @@ -147,6 +147,8 @@ def client(self): return self._client def export_metrics(self, metrics): + for metric in metrics: + self.register_metric_descriptor(metric.descriptor) ts_batches = self.create_batched_time_series(metrics) for ts_batch in ts_batches: self.client.create_time_series( @@ -289,16 +291,15 @@ def register_metric_descriptor(self, oc_md): """Register a metric descriptor with stackdriver.""" descriptor_type = self.get_descriptor_type(oc_md) with self._md_lock: - try: + if descriptor_type in self._md_cache: return self._md_cache[descriptor_type] - except KeyError: - descriptor = self.get_metric_descriptor(oc_md) - project_name =\ - self.client.project_path(self.options.project_id) - sd_md = self.client.create_metric_descriptor( - project_name, descriptor) - self._md_cache[descriptor_type] = sd_md - return sd_md + + descriptor = self.get_metric_descriptor(oc_md) + project_name = self.client.project_path(self.options.project_id) + sd_md = self.client.create_metric_descriptor(project_name, descriptor) + with self._md_lock: + self._md_cache[descriptor_type] = sd_md + return sd_md def set_monitored_resource(series, option_resource_type): From 7afc2d0cbdc1b7ea1d43ee40ecc57f9d6064a73b Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Tue, 19 Mar 2019 23:58:24 -0700 Subject: [PATCH 14/21] Update SDSE system test --- .../ext/stackdriver/stats_exporter/__init__.py | 3 +-- .../system/stats/stackdriver/stackdriver_stats_test.py | 10 ++++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py index 03f4207c5..2c6c4a67d 100644 --- a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py +++ b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py @@ -246,8 +246,7 @@ def _convert_point(self, metric, ts, point, sd_point): start_time = sd_point.interval.start_time start_time.seconds = int(timestamp_start) - start_secs = start_time.seconds - start_time.nanos = int((timestamp_start - start_secs) * 1e9) + start_time.nanos = int((timestamp_start - start_time.seconds) * 1e9) def get_descriptor_type(self, oc_md): """Get a SD descriptor type for an OC metric descriptor.""" diff --git a/tests/system/stats/stackdriver/stackdriver_stats_test.py b/tests/system/stats/stackdriver/stackdriver_stats_test.py index 1b6f8d09c..ab7073a08 100644 --- a/tests/system/stats/stackdriver/stackdriver_stats_test.py +++ b/tests/system/stats/stackdriver/stackdriver_stats_test.py @@ -16,7 +16,10 @@ import random import time import unittest + +from google.cloud import monitoring_v3 from retrying import retry + from opencensus.ext.stackdriver import stats_exporter as stackdriver from opencensus.stats import aggregation as aggregation_module from opencensus.stats import measure as measure_module @@ -25,8 +28,6 @@ from opencensus.tags import tag_key as tag_key_module from opencensus.tags import tag_map as tag_map_module from opencensus.tags import tag_value as tag_value_module -from opencensus.common.transports import sync -from google.cloud import monitoring_v3 MiB = 1 << 20 @@ -63,8 +64,7 @@ def test_stats_record_sync(self): client = monitoring_v3.MetricServiceClient() exporter = stackdriver.StackdriverStatsExporter( options=stackdriver.Options(project_id=PROJECT), - client=client, - transport=sync.SyncTransport) + client=client) view_manager.register_exporter(exporter) # Register view. @@ -82,6 +82,7 @@ def test_stats_record_sync(self): measure_map.measure_int_put(VIDEO_SIZE_MEASURE, 25 * MiB) measure_map.record(tag_map) + exporter.export_metrics(stats_module.Stats().get_metrics()) # Sleep for [0, 10] milliseconds to fake wait. time.sleep(random.randint(1, 10) / 1000.0) @@ -147,6 +148,7 @@ def test_stats_record_async(self): measure_map.measure_int_put(VIDEO_SIZE_MEASURE_ASYNC, 25 * MiB) measure_map.record(tag_map) + exporter.export_metrics(stats_module.Stats().get_metrics()) @retry( wait_fixed=RETRY_WAIT_PERIOD, From 64f9ac286cd33186735defcb3b5952a2e83837be Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Tue, 19 Mar 2019 23:33:18 -0700 Subject: [PATCH 15/21] s/now/utcnow/ --- .../opencensus/ext/ocagent/trace_exporter/__init__.py | 2 +- opencensus/metrics/export/gauge.py | 2 +- opencensus/stats/stats.py | 2 +- tests/unit/trace/test_blank_span.py | 2 +- tests/unit/trace/test_span.py | 4 ++-- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/trace_exporter/__init__.py b/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/trace_exporter/__init__.py index 988f2d828..ccbef67e8 100644 --- a/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/trace_exporter/__init__.py +++ b/contrib/opencensus-ext-ocagent/opencensus/ext/ocagent/trace_exporter/__init__.py @@ -84,7 +84,7 @@ def __init__( else host_name, pid=os.getpid(), start_timestamp=utils.proto_ts_from_datetime( - datetime.datetime.now()) + datetime.datetime.utcnow()) ), library_info=common_pb2.LibraryInfo( language=common_pb2.LibraryInfo.Language.Value('PYTHON'), diff --git a/opencensus/metrics/export/gauge.py b/opencensus/metrics/export/gauge.py index 1788e9aca..5352b0e6c 100644 --- a/opencensus/metrics/export/gauge.py +++ b/opencensus/metrics/export/gauge.py @@ -481,7 +481,7 @@ def get_metrics(self): :rtype: set(:class:`opencensus.metrics.export.metric.Metric`) :return: A set of `Metric`s, one for each registered gauge. """ - now = datetime.now() + now = datetime.utcnow() metrics = set() for gauge in self.gauges.values(): metrics.add(gauge.get_metric(now)) diff --git a/opencensus/stats/stats.py b/opencensus/stats/stats.py index a7cc49940..df1b27ee9 100644 --- a/opencensus/stats/stats.py +++ b/opencensus/stats/stats.py @@ -37,4 +37,4 @@ def get_metrics(self): :rtype: Iterator[:class: `opencensus.metrics.export.metric.Metric`] """ return self.view_manager.measure_to_view_map.get_metrics( - datetime.now()) + datetime.utcnow()) diff --git a/tests/unit/trace/test_blank_span.py b/tests/unit/trace/test_blank_span.py index cf8126233..0a0170451 100644 --- a/tests/unit/trace/test_blank_span.py +++ b/tests/unit/trace/test_blank_span.py @@ -64,7 +64,7 @@ def test_do_not_crash(self): with self.assertRaises(TypeError): span.add_time_event(time_event) - time_event = TimeEvent(datetime.datetime.now()) + time_event = TimeEvent(datetime.datetime.utcnow()) span.add_time_event(time_event) span_iter_list = list(iter(span)) diff --git a/tests/unit/trace/test_span.py b/tests/unit/trace/test_span.py index 9565bfb34..f5f2ccef4 100644 --- a/tests/unit/trace/test_span.py +++ b/tests/unit/trace/test_span.py @@ -145,7 +145,7 @@ def test_add_time_event(self): with self.assertRaises(TypeError): span.add_time_event(time_event) - time_event = TimeEvent(datetime.datetime.now()) + time_event = TimeEvent(datetime.datetime.utcnow()) span.add_time_event(time_event) self.assertEqual(len(span.time_events), 1) @@ -357,7 +357,7 @@ def test_format_span_json_with_parent_span(self, time_event_mock, span.start_time = start_time span.end_time = end_time span._child_spans = [] - span.time_events = [TimeEvent(datetime.datetime.now())] + span.time_events = [TimeEvent(datetime.datetime.utcnow())] span.stack_trace = StackTrace() span.status = Status(code='200', message='test') span.links = [Link(trace_id, span_id)] From a0f8c2ba0012b55a079a097baa0f01e689cd13ea Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Wed, 20 Mar 2019 14:40:36 -0700 Subject: [PATCH 16/21] Update SDSE tests --- .../stackdriver/stats_exporter/__init__.py | 19 ++- .../tests/test_stackdriver_stats.py | 134 +++++++++++++++++- .../metrics/export/metric_descriptor.py | 17 --- 3 files changed, 136 insertions(+), 34 deletions(-) diff --git a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py index 2c6c4a67d..920931789 100644 --- a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py +++ b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py @@ -225,10 +225,9 @@ def _convert_point(self, metric, ts, point, sd_point): sd_point.value.double_value = float(point.value.value) # TODO: handle SUMMARY metrics, #567 - else: - md_type_name = metric_descriptor.MetricDescriptorType.to_name( - metric.descriptor.type) - raise TypeError("Unsupported metric type: {}".format(md_type_name)) + else: # pragma: NO COVER + raise TypeError("Unsupported metric type: {}" + .format(metric.descriptor.type)) end = point.timestamp if ts.start_timestamp is None: @@ -257,20 +256,17 @@ def get_metric_descriptor(self, oc_md): try: metric_kind, value_type = OC_MD_TO_SD_TYPE[oc_md.type] except KeyError: - md_type_name = metric_descriptor.MetricDescriptorType.to_name( - oc_md.type) - raise TypeError("Unsupported metric type: {}".format(md_type_name)) + raise TypeError("Unsupported metric type: {}".format(oc_md.type)) if self.options.metric_prefix: display_name_prefix = self.options.metric_prefix else: display_name_prefix = DEFAULT_DISPLAY_NAME_PREFIX - lks = [lk.key for lk in oc_md.label_keys] default_labels = self.options.default_monitoring_labels if default_labels is None: default_labels = {} - desc_labels = new_label_descriptors(default_labels, lks) + desc_labels = new_label_descriptors(default_labels, oc_md.label_keys) descriptor = monitoring_v3.types.MetricDescriptor(labels=desc_labels) metric_type = self.get_descriptor_type(oc_md) @@ -406,9 +402,10 @@ def new_label_descriptors(defaults, keys): label["description"] = lbl label_descriptors.append(label) - for tag_key in keys: + for label_key in keys: label = {} - label["key"] = sanitize_label(tag_key) + label["key"] = sanitize_label(label_key.key) + label["description"] = sanitize_label(label_key.description) label_descriptors.append(label) label_descriptors.append({"key": OPENCENSUS_TASK, "description": OPENCENSUS_TASK_DESCRIPTION}) diff --git a/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py b/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py index 05f8d41b9..23b127d42 100644 --- a/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py +++ b/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py @@ -21,6 +21,13 @@ from opencensus.common import utils from opencensus.common.version import __version__ from opencensus.ext.stackdriver import stats_exporter as stackdriver +from opencensus.metrics import label_key +from opencensus.metrics import label_value +from opencensus.metrics.export import metric +from opencensus.metrics.export import metric_descriptor +from opencensus.metrics.export import point +from opencensus.metrics.export import time_series +from opencensus.metrics.export import value from opencensus.stats import aggregation as aggregation_module from opencensus.stats import aggregation_data as aggregation_data_module from opencensus.stats import execution_context @@ -178,12 +185,6 @@ def test_get_task_value(self): task_value = stackdriver.get_task_value() self.assertNotEqual(task_value, "") - def test_new_label_descriptors(self): - defaults = {'key1': 'value1'} - keys = [FRONTEND_KEY] - output = stackdriver.new_label_descriptors(defaults, keys) - self.assertEqual(len(output), 3) - def test_namespaced_views(self): view_name = "view-1" expected_view_name_namespaced = ( @@ -237,6 +238,127 @@ def test_get_task_value_with_hostname(self, mock_uname, mock_pid): def test_get_task_value_without_hostname(self, mock_uname, mock_pid): self.assertEqual(stackdriver.get_task_value(), "py-12345@localhost") + def test_get_metric_descriptor(self): + exporter = stackdriver.StackdriverStatsExporter( + options=stackdriver.Options( + default_monitoring_labels={'dk': 'dd'}, + project_id='project_id'), + client=mock.Mock()) + + oc_md = metric_descriptor.MetricDescriptor( + name='name', + description='description', + unit='unit', + type_=metric_descriptor.MetricDescriptorType.GAUGE_INT64, + label_keys=[label_key.LabelKey('ck', 'cd')] + ) + + sd_md = exporter.get_metric_descriptor(oc_md) + self.assertEqual( + sd_md.metric_kind, + monitoring_v3.enums.MetricDescriptor.MetricKind.GAUGE) + self.assertEqual( + sd_md.value_type, + monitoring_v3.enums.MetricDescriptor.ValueType.INT64) + + self.assertIsInstance(sd_md, monitoring_v3.types.MetricDescriptor) + exporter.client.create_metric_descriptor.assert_not_called() + + def test_get_metric_descriptor_bad_type(self): + exporter = stackdriver.StackdriverStatsExporter( + options=stackdriver.Options(project_id='project_id'), + client=mock.Mock()) + + bad_type_oc_md = metric_descriptor.MetricDescriptor( + name='name', + description='description', + unit='unit', + # Need a valid type to create the descriptor + type_=metric_descriptor.MetricDescriptorType.GAUGE_INT64, + label_keys=[label_key.LabelKey('key', 'description')] + ) + bad_type_oc_md._type = 100 + + with self.assertRaises(TypeError): + exporter.get_metric_descriptor(bad_type_oc_md) + + def test_get_metric_descriptor_custom_prefix(self): + + exporter = stackdriver.StackdriverStatsExporter( + options=stackdriver.Options( + default_monitoring_labels={'dk': 'dd'}, + metric_prefix='metric_prefix', + project_id='project_id'), + client=mock.Mock()) + + oc_md = metric_descriptor.MetricDescriptor( + name='name', + description='description', + unit='unit', + type_=metric_descriptor.MetricDescriptorType.GAUGE_INT64, + label_keys=[label_key.LabelKey('ck', 'cd')] + ) + + sd_md = exporter.get_metric_descriptor(oc_md) + self.assertIn('metric_prefix', sd_md.type) + self.assertIn('metric_prefix', sd_md.name) + + def test_register_metric_descriptor(self): + exporter = stackdriver.StackdriverStatsExporter( + options=stackdriver.Options( + metric_prefix='metric_prefix', + project_id='project_id'), + client=mock.Mock()) + + oc_md = metric_descriptor.MetricDescriptor( + name='name', + description='description', + unit='unit', + type_=metric_descriptor.MetricDescriptorType.GAUGE_INT64, + label_keys=[label_key.LabelKey('key', 'description')] + ) + + exporter.register_metric_descriptor(oc_md) + self.assertEqual( + exporter.client.create_metric_descriptor.call_count, + 1 + ) + exporter.register_metric_descriptor(oc_md) + self.assertEqual( + exporter.client.create_metric_descriptor.call_count, + 1 + ) + + def test_export_metrics(self): + lv = label_value.LabelValue('val') + val = value.ValueLong(value=123) + dt = datetime(2019, 3, 20, 21, 34, 0, 537954) + pp = point.Point(value=val, timestamp=dt) + + ts = [ + time_series.TimeSeries(label_values=[lv], points=[pp], + start_timestamp=utils.to_iso_str(dt)) + ] + + desc = metric_descriptor.MetricDescriptor( + name='name', + description='description', + unit='unit', + type_=metric_descriptor.MetricDescriptorType.GAUGE_INT64, + label_keys=[label_key.LabelKey('key', 'description')] + ) + + mm = metric.Metric(descriptor=desc, time_series=ts) + + exporter = stackdriver.StackdriverStatsExporter(client=mock.Mock()) + exporter.export_metrics([mm]) + + self.assertEqual(exporter.client.create_time_series.call_count, 1) + sd_args = exporter.client.create_time_series.call_args[0][1] + self.assertEqual(len(sd_args), 1) + [sd_arg] = exporter.client.create_time_series.call_args[0][1] + self.assertEqual(sd_arg.points[0].value.int64_value, 123) + class TestCreateTimeseries(unittest.TestCase): def check_labels(self, diff --git a/opencensus/metrics/export/metric_descriptor.py b/opencensus/metrics/export/metric_descriptor.py index f5b43f1d9..cbb258547 100644 --- a/opencensus/metrics/export/metric_descriptor.py +++ b/opencensus/metrics/export/metric_descriptor.py @@ -90,16 +90,6 @@ class MetricDescriptorType(object): SUMMARY: value.ValueSummary } - _name_map = { - GAUGE_INT64: 'GAUGE_INT64', - GAUGE_DOUBLE: 'GAUGE_DOUBLE', - GAUGE_DISTRIBUTION: 'GAUGE_DISTRIBUTION', - CUMULATIVE_INT64: 'CUMULATIVE_INT64', - CUMULATIVE_DOUBLE: 'CUMULATIVE_DOUBLE', - CUMULATIVE_DISTRIBUTION: 'CUMULATIVE_DISTRIBUTION', - SUMMARY: 'SUMMARY', - } - @classmethod def to_type_class(cls, metric_descriptor_type): try: @@ -107,13 +97,6 @@ def to_type_class(cls, metric_descriptor_type): except KeyError: raise ValueError("Unknown MetricDescriptorType value") - @classmethod - def to_name(cls, metric_descriptor_type): - try: - return cls._name_map[metric_descriptor_type] - except KeyError: - raise ValueError("Unknown MetricDescriptorType value") - class MetricDescriptor(object): """Defines a metric type and its schema. From 39cfba3b59b24cb618dd4e2c2d6bb29bfe7708d0 Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Mon, 1 Apr 2019 14:12:20 -0700 Subject: [PATCH 17/21] Switch SD exporter to polling (#577) --- README.rst | 2 +- contrib/opencensus-ext-prometheus/README.rst | 2 +- .../examples/prometheus.py | 2 +- .../tests/test_prometheus_stats.py | 2 +- contrib/opencensus-ext-stackdriver/README.rst | 2 +- .../examples/stackdriver.py | 2 +- .../stackdriver/stats_exporter/__init__.py | 20 +-- .../tests/test_stackdriver_stats.py | 136 +++++++++++++++--- examples/stats/helloworld/main.py | 2 +- opencensus/metrics/transport.py | 97 +++++++++++++ opencensus/stats/stats.py | 5 +- .../stats/prometheus/prometheus_stats_test.py | 2 +- .../stackdriver/stackdriver_stats_test.py | 83 ++++++----- tests/unit/metrics/test_transport.py | 122 ++++++++++++++++ tests/unit/stats/test_stats.py | 2 +- 15 files changed, 401 insertions(+), 80 deletions(-) create mode 100644 opencensus/metrics/transport.py create mode 100644 tests/unit/metrics/test_transport.py diff --git a/README.rst b/README.rst index bdbe252dc..fd6569342 100644 --- a/README.rst +++ b/README.rst @@ -54,7 +54,7 @@ Installation & basic usage from opencensus.stats import stats as stats_module - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder diff --git a/contrib/opencensus-ext-prometheus/README.rst b/contrib/opencensus-ext-prometheus/README.rst index 2845ff7c9..5e36d506c 100644 --- a/contrib/opencensus-ext-prometheus/README.rst +++ b/contrib/opencensus-ext-prometheus/README.rst @@ -45,7 +45,7 @@ Register the Prometheus exporter .. code:: python - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager exporter = prometheus.new_stats_exporter(prometheus.Options(namespace="")) diff --git a/contrib/opencensus-ext-prometheus/examples/prometheus.py b/contrib/opencensus-ext-prometheus/examples/prometheus.py index 272d3e902..5e808d007 100644 --- a/contrib/opencensus-ext-prometheus/examples/prometheus.py +++ b/contrib/opencensus-ext-prometheus/examples/prometheus.py @@ -40,7 +40,7 @@ def main(): - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder diff --git a/contrib/opencensus-ext-prometheus/tests/test_prometheus_stats.py b/contrib/opencensus-ext-prometheus/tests/test_prometheus_stats.py index a3888af39..5d9795534 100644 --- a/contrib/opencensus-ext-prometheus/tests/test_prometheus_stats.py +++ b/contrib/opencensus-ext-prometheus/tests/test_prometheus_stats.py @@ -280,7 +280,7 @@ def test_exporter_constructor_no_namespace(self): def test_emit(self): options = prometheus.Options(namespace="opencensus", port=9005) - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder exporter = prometheus.new_stats_exporter(options) diff --git a/contrib/opencensus-ext-stackdriver/README.rst b/contrib/opencensus-ext-stackdriver/README.rst index 30b6b3132..9702c3bbc 100644 --- a/contrib/opencensus-ext-stackdriver/README.rst +++ b/contrib/opencensus-ext-stackdriver/README.rst @@ -89,7 +89,7 @@ Register the Stackdriver exporter .. code:: python - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager exporter = stackdriver.new_stats_exporter(stackdriver.Options(project_id="")) diff --git a/contrib/opencensus-ext-stackdriver/examples/stackdriver.py b/contrib/opencensus-ext-stackdriver/examples/stackdriver.py index 9dbfa682d..f0632e7e1 100644 --- a/contrib/opencensus-ext-stackdriver/examples/stackdriver.py +++ b/contrib/opencensus-ext-stackdriver/examples/stackdriver.py @@ -30,7 +30,7 @@ "task_latency", "The task latency in milliseconds", "ms") # The stats recorder -stats = stats_module.Stats() +stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder diff --git a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py index 920931789..b3384ba3d 100644 --- a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py +++ b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py @@ -14,7 +14,6 @@ from datetime import datetime import itertools -import logging import os import platform import re @@ -27,8 +26,10 @@ from opencensus.common import utils from opencensus.common.monitored_resource import monitored_resource from opencensus.common.version import __version__ +from opencensus.metrics import transport from opencensus.metrics.export import metric as metric_module from opencensus.metrics.export import metric_descriptor +from opencensus.stats import stats MAX_TIME_SERIES_PER_UPLOAD = 200 @@ -147,6 +148,7 @@ def client(self): return self._client def export_metrics(self, metrics): + metrics = list(metrics) for metric in metrics: self.register_metric_descriptor(metric.descriptor) ts_batches = self.create_batched_time_series(metrics) @@ -371,7 +373,9 @@ def new_stats_exporter(options): ci = client_info.ClientInfo(client_library_version=get_user_agent_slug()) client = monitoring_v3.MetricServiceClient(client_info=ci) exporter = StackdriverStatsExporter(client=client, options=options) - return exporter + + tt = transport.get_exporter_thread(stats.stats, exporter) + return exporter, tt def get_task_value(): @@ -412,18 +416,6 @@ def new_label_descriptors(defaults, keys): return label_descriptors -def set_metric_labels(series, view, tag_values): - if len(view.columns) != len(tag_values): - logging.warning( - "TagKeys and TagValues don't have same size." - ) # pragma: NO COVER - - for key, value in zip(view.columns, tag_values): - if value is not None: - series.metric.labels[sanitize_label(key)] = value - series.metric.labels[OPENCENSUS_TASK] = get_task_value() - - def sanitize_label(text): """Remove characters not accepted in labels key diff --git a/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py b/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py index 23b127d42..f911dc273 100644 --- a/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py +++ b/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py @@ -23,6 +23,7 @@ from opencensus.ext.stackdriver import stats_exporter as stackdriver from opencensus.metrics import label_key from opencensus.metrics import label_value +from opencensus.metrics import transport as transport_module from opencensus.metrics.export import metric from opencensus.metrics.export import metric_descriptor from opencensus.metrics.export import point @@ -124,7 +125,7 @@ def test_not_blank_project(self): '.monitoring_v3.MetricServiceClient'), _Client) with patch_client: - exporter_created = stackdriver.new_stats_exporter( + exporter_created, transport = stackdriver.new_stats_exporter( stackdriver.Options(project_id=1)) self.assertIsInstance(exporter_created, @@ -145,7 +146,7 @@ def test_client_info_user_agent(self): '.MetricServiceClient', _Client) with patch_client: - exporter = stackdriver.new_stats_exporter( + exporter, transport = stackdriver.new_stats_exporter( stackdriver.Options(project_id=1)) self.assertIn(stackdriver.get_user_agent_slug(), @@ -198,7 +199,7 @@ def test_namespaced_views(self): self.assertEqual(expected_view_name_namespaced, view_name_namespaced) def test_stackdriver_register_exporter(self): - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager exporter = mock.Mock() @@ -211,17 +212,6 @@ def test_stackdriver_register_exporter(self): self.assertEqual(registered_exporters, 1) - def test_set_metric_labels(self): - series = monitoring_v3.types.TimeSeries() - tag_value = tag_value_module.TagValue("1200") - stackdriver.set_metric_labels(series, VIDEO_SIZE_VIEW, [tag_value]) - self.assertEqual(len(series.metric.labels), 2) - - def test_set_metric_labels_with_None(self): - series = monitoring_v3.types.TimeSeries() - stackdriver.set_metric_labels(series, VIDEO_SIZE_VIEW, [None]) - self.assertEqual(len(series.metric.labels), 1) - @mock.patch('os.getpid', return_value=12345) @mock.patch( 'platform.uname', @@ -360,7 +350,119 @@ def test_export_metrics(self): self.assertEqual(sd_arg.points[0].value.int64_value, 123) +class MockPeriodicTask(object): + """Testing mock of metrics.transport.PeriodicTask. + + Simulate calling export asynchronously from another thread synchronously + from this one. + """ + def __init__(self, func, interval=None, **kwargs): + self.func = func + self.logger = mock.Mock() + self.start = mock.Mock() + self.run = mock.Mock() + + def step(self): + try: + self.func() + except transport_module.TransportError as ex: + self.logger.exception(ex) + self.stop() + except Exception: + self.logger.exception("Error handling metric export") + + +@mock.patch('opencensus.ext.stackdriver.stats_exporter' + '.monitoring_v3.MetricServiceClient') +class TestAsyncStatsExport(unittest.TestCase): + """Check that metrics are exported using the exporter thread.""" + + def setUp(self): + patcher = mock.patch( + 'opencensus.metrics.transport.PeriodicTask', + MockPeriodicTask) + patcher.start() + self.addCleanup(patcher.stop) + + @mock.patch('opencensus.ext.stackdriver.stats_exporter' + '.stats.stats') + def test_export_empty(self, mock_stats, mock_client): + """Check that we don't attempt to export empty metric sets.""" + + mock_stats.get_metrics.return_value = [] + + exporter, transport = stackdriver.new_stats_exporter( + stackdriver.Options(project_id=1)) + + transport.step() + exporter.client.create_metric_descriptor.assert_not_called() + exporter.client.create_time_series.assert_not_called() + + @mock.patch('opencensus.ext.stackdriver.stats_exporter' + '.stats.stats') + def test_export_single_metric(self, mock_stats, mock_client): + """Check that we can export a set of a single metric.""" + + lv = label_value.LabelValue('val') + val = value.ValueLong(value=123) + dt = datetime(2019, 3, 20, 21, 34, 0, 537954) + pp = point.Point(value=val, timestamp=dt) + + ts = [ + time_series.TimeSeries(label_values=[lv], points=[pp], + start_timestamp=utils.to_iso_str(dt)) + ] + + desc = metric_descriptor.MetricDescriptor( + name='name2', + description='description2', + unit='unit2', + type_=metric_descriptor.MetricDescriptorType.GAUGE_INT64, + label_keys=[label_key.LabelKey('key', 'description')] + ) + + mm = metric.Metric(descriptor=desc, time_series=ts) + mock_stats.get_metrics.return_value = [mm] + + exporter, transport = stackdriver.new_stats_exporter( + stackdriver.Options(project_id=1)) + + transport.step() + + exporter.client.create_metric_descriptor.assert_called() + self.assertEqual( + exporter.client.create_metric_descriptor.call_count, + 1) + md_call_arg =\ + exporter.client.create_metric_descriptor.call_args[0][1] + self.assertEqual( + md_call_arg.metric_kind, + monitoring_v3.enums.MetricDescriptor.MetricKind.GAUGE + ) + self.assertEqual( + md_call_arg.value_type, + monitoring_v3.enums.MetricDescriptor.ValueType.INT64 + ) + + exporter.client.create_time_series.assert_called() + self.assertEqual( + exporter.client.create_time_series.call_count, + 1) + ts_call_arg = exporter.client.create_time_series.call_args[0][1] + self.assertEqual(len(ts_call_arg), 1) + self.assertEqual(len(ts_call_arg[0].points), 1) + self.assertEqual(ts_call_arg[0].points[0].value.int64_value, 123) + + class TestCreateTimeseries(unittest.TestCase): + + def setUp(self): + patcher = mock.patch( + 'opencensus.ext.stackdriver.stats_exporter.stats.stats', + stats_module._Stats()) + patcher.start() + self.addCleanup(patcher.stop) + def check_labels(self, actual_labels, expected_labels, @@ -459,7 +561,7 @@ def setup_create_timeseries_test(self): exporter = stackdriver.StackdriverStatsExporter( options=option, client=client) - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder @@ -535,7 +637,7 @@ def test_create_timeseries_with_resource(self, monitor_resource_mock): exporter = stackdriver.StackdriverStatsExporter( options=option, client=client) - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder @@ -806,7 +908,7 @@ def test_create_timeseries_float_tagvalue(self, monitor_resource_mock): exporter = stackdriver.StackdriverStatsExporter( options=option, client=client) - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder diff --git a/examples/stats/helloworld/main.py b/examples/stats/helloworld/main.py index f49107356..5f65696ca 100644 --- a/examples/stats/helloworld/main.py +++ b/examples/stats/helloworld/main.py @@ -39,7 +39,7 @@ def main(): - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder diff --git a/opencensus/metrics/transport.py b/opencensus/metrics/transport.py new file mode 100644 index 000000000..b7ac3f8b2 --- /dev/null +++ b/opencensus/metrics/transport.py @@ -0,0 +1,97 @@ +# Copyright 2019, OpenCensus Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import threading + +from opencensus.common import utils + + +logger = logging.getLogger(__name__) + +DEFAULT_INTERVAL = 60 +GRACE_PERIOD = 5 + + +class TransportError(Exception): + pass + + +class PeriodicTask(threading.Thread): + """Thread that periodically calls a given function. + + :type func: function + :param func: The function to call. + + :type interval: int or float + :param interval: Seconds between calls to the function. + """ + + daemon = True + + def __init__(self, func, interval=None, **kwargs): + super(PeriodicTask, self).__init__(**kwargs) + if interval is None: + interval = DEFAULT_INTERVAL + self.func = func + self.interval = interval + self._stopped = threading.Event() + + def run(self): + while not self._stopped.wait(self.interval): + try: + self.func() + except TransportError as ex: + logger.exception(ex) + self.stop() + except Exception: + logger.exception("Error handling metric export") + + def stop(self): + self._stopped.set() + + +def get_exporter_thread(metric_producer, exporter): + """Get a running task that periodically exports metrics. + + Get a `PeriodicTask` that exports periodically calls: + + exporter.export_metrics(metric_producer.get_metrics()) + + :type metric_producer: + :class:`opencensus.metrics.export.metric_producer.MetricProducer` + :param exporter: The producer to use to get metrics to export. + + :type exporter: :class:`opencensus.stats.base_exporter.MetricsExporter` + :param exporter: The exporter to use to export metrics. + + :rtype: :class:`threading.Thread` + :return: A running thread responsible calling the exporter. + + """ + weak_get = utils.get_weakref(metric_producer.get_metrics) + weak_export = utils.get_weakref(exporter.export_metrics) + + def export_all(): + get = weak_get() + if get is None: + raise TransportError("Metric producer is not available") + export = weak_export() + if export is None: + raise TransportError("Metric exporter is not available") + export(get()) + + tt = PeriodicTask(export_all) + tt.start() + return tt diff --git a/opencensus/stats/stats.py b/opencensus/stats/stats.py index df1b27ee9..b3c4f322b 100644 --- a/opencensus/stats/stats.py +++ b/opencensus/stats/stats.py @@ -19,7 +19,7 @@ from opencensus.stats.view_manager import ViewManager -class Stats(MetricProducer): +class _Stats(MetricProducer): """Stats defines a View Manager and a Stats Recorder in order for the collection of Stats """ @@ -38,3 +38,6 @@ def get_metrics(self): """ return self.view_manager.measure_to_view_map.get_metrics( datetime.utcnow()) + + +stats = _Stats() diff --git a/tests/system/stats/prometheus/prometheus_stats_test.py b/tests/system/stats/prometheus/prometheus_stats_test.py index 541ec04be..06bcb31c7 100644 --- a/tests/system/stats/prometheus/prometheus_stats_test.py +++ b/tests/system/stats/prometheus/prometheus_stats_test.py @@ -39,7 +39,7 @@ def test_prometheus_stats(self): request_count_view_name, "number of requests broken down by methods", [method_key], request_count_measure, count_agg) - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder diff --git a/tests/system/stats/stackdriver/stackdriver_stats_test.py b/tests/system/stats/stackdriver/stackdriver_stats_test.py index ab7073a08..0a6f1cbd6 100644 --- a/tests/system/stats/stackdriver/stackdriver_stats_test.py +++ b/tests/system/stats/stackdriver/stackdriver_stats_test.py @@ -14,11 +14,11 @@ import os import random +import sys import time -import unittest from google.cloud import monitoring_v3 -from retrying import retry +import mock from opencensus.ext.stackdriver import stats_exporter as stackdriver from opencensus.stats import aggregation as aggregation_module @@ -29,14 +29,42 @@ from opencensus.tags import tag_map as tag_map_module from opencensus.tags import tag_value as tag_value_module +if sys.version_info < (3,): + import unittest2 as unittest +else: + import unittest + + MiB = 1 << 20 PROJECT = os.environ.get('GCLOUD_PROJECT_PYTHON') -RETRY_WAIT_PERIOD = 10000 # Wait 10 seconds between each retry -RETRY_MAX_ATTEMPT = 10 # Retry 10 times +ASYNC_TEST_INTERVAL = 15 # Background thread export interval class TestBasicStats(unittest.TestCase): + + def check_sd_md(self, exporter, view_description): + """Check that the metric descriptor was written to stackdriver.""" + name = exporter.client.project_path(PROJECT) + list_metrics_descriptors = exporter.client.list_metric_descriptors( + name) + + for ee in list_metrics_descriptors: + if ee.description == view_description: + break + else: + raise AssertionError("No matching metric descriptor") + + self.assertIsNotNone(ee) + self.assertEqual(ee.unit, "By") + + def setUp(self): + patcher = mock.patch( + 'opencensus.ext.stackdriver.stats_exporter.stats.stats', + stats_module._Stats()) + patcher.start() + self.addCleanup(patcher.stop) + def test_stats_record_sync(self): # We are using sufix in order to prevent cached objects sufix = str(os.getgid()) @@ -57,7 +85,7 @@ def test_stats_record_sync(self): VIDEO_SIZE_VIEW_NAME, view_description, [FRONTEND_KEY], VIDEO_SIZE_MEASURE, VIDEO_SIZE_DISTRIBUTION) - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder @@ -82,27 +110,15 @@ def test_stats_record_sync(self): measure_map.measure_int_put(VIDEO_SIZE_MEASURE, 25 * MiB) measure_map.record(tag_map) - exporter.export_metrics(stats_module.Stats().get_metrics()) + exporter.export_metrics(stats_module.stats.get_metrics()) # Sleep for [0, 10] milliseconds to fake wait. time.sleep(random.randint(1, 10) / 1000.0) - @retry( - wait_fixed=RETRY_WAIT_PERIOD, - stop_max_attempt_number=RETRY_MAX_ATTEMPT) - def get_metric_descriptors(self, exporter, view_description): - name = exporter.client.project_path(PROJECT) - list_metrics_descriptors = exporter.client.list_metric_descriptors( - name) - element = next((element for element in list_metrics_descriptors - if element.description == view_description), None) - - self.assertIsNotNone(element) - self.assertEqual(element.description, view_description) - self.assertEqual(element.unit, "By") - - get_metric_descriptors(self, exporter, view_description) + self.check_sd_md(exporter, view_description) + @mock.patch('opencensus.metrics.transport.DEFAULT_INTERVAL', + ASYNC_TEST_INTERVAL) def test_stats_record_async(self): # We are using sufix in order to prevent cached objects sufix = str(os.getpid()) @@ -125,11 +141,11 @@ def test_stats_record_async(self): VIDEO_SIZE_VIEW_NAME_ASYNC, view_description, [FRONTEND_KEY_ASYNC], VIDEO_SIZE_MEASURE_ASYNC, VIDEO_SIZE_DISTRIBUTION_ASYNC) - stats = stats_module.Stats() + stats = stats_module.stats view_manager = stats.view_manager stats_recorder = stats.stats_recorder - exporter = stackdriver.new_stats_exporter( + exporter, transport = stackdriver.new_stats_exporter( stackdriver.Options(project_id=PROJECT)) view_manager.register_exporter(exporter) @@ -148,19 +164,8 @@ def test_stats_record_async(self): measure_map.measure_int_put(VIDEO_SIZE_MEASURE_ASYNC, 25 * MiB) measure_map.record(tag_map) - exporter.export_metrics(stats_module.Stats().get_metrics()) - - @retry( - wait_fixed=RETRY_WAIT_PERIOD, - stop_max_attempt_number=RETRY_MAX_ATTEMPT) - def get_metric_descriptors(self, exporter, view_description): - name = exporter.client.project_path(PROJECT) - list_metrics_descriptors = exporter.client.list_metric_descriptors( - name) - element = next((element for element in list_metrics_descriptors - if element.description == view_description), None) - self.assertIsNotNone(element) - self.assertEqual(element.description, view_description) - self.assertEqual(element.unit, "By") - - get_metric_descriptors(self, exporter, view_description) + # Give the exporter thread enough time to export exactly once + time.sleep(ASYNC_TEST_INTERVAL * 2 - 1) + transport.stop() + + self.check_sd_md(exporter, view_description) diff --git a/tests/unit/metrics/test_transport.py b/tests/unit/metrics/test_transport.py new file mode 100644 index 000000000..69ee7769e --- /dev/null +++ b/tests/unit/metrics/test_transport.py @@ -0,0 +1,122 @@ +# Copyright 2019, OpenCensus Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gc +import sys +import time + +import mock + +from opencensus.metrics import transport + +if sys.version_info < (3,): + import unittest2 as unittest +else: + import unittest + + +# Some tests use real time! This is the time to wait between the exporter +# thread handling tasks, and doesn't account for processing time. If these +# tests become flaky, try increasing this. +INTERVAL = .05 + + +class TestPeriodicTask(unittest.TestCase): + + def test_default_constructor(self): + mock_func = mock.Mock() + task = transport.PeriodicTask(mock_func) + self.assertEqual(task.func, mock_func) + self.assertEqual(task.interval, transport.DEFAULT_INTERVAL) + + def test_periodic_task_not_started(self): + mock_func = mock.Mock() + task = transport.PeriodicTask(mock_func, INTERVAL) + time.sleep(INTERVAL + INTERVAL / 2.0) + mock_func.assert_not_called() + task.stop() + + def test_periodic_task(self): + mock_func = mock.Mock() + task = transport.PeriodicTask(mock_func, INTERVAL) + task.start() + mock_func.assert_not_called() + time.sleep(INTERVAL + INTERVAL / 2.0) + self.assertEqual(mock_func.call_count, 1) + time.sleep(INTERVAL) + self.assertEqual(mock_func.call_count, 2) + time.sleep(INTERVAL) + self.assertEqual(mock_func.call_count, 3) + + def test_periodic_task_stop(self): + mock_func = mock.Mock() + task = transport.PeriodicTask(mock_func, INTERVAL) + task.start() + time.sleep(INTERVAL + INTERVAL / 2.0) + self.assertEqual(mock_func.call_count, 1) + task.stop() + time.sleep(INTERVAL) + self.assertEqual(mock_func.call_count, 1) + + +@mock.patch('opencensus.metrics.transport.DEFAULT_INTERVAL', INTERVAL) +@mock.patch('opencensus.metrics.transport.logger') +class TestGetExporterThreadPeriodic(unittest.TestCase): + + def test_threaded_export(self, mock_logger): + producer = mock.Mock() + exporter = mock.Mock() + metrics = mock.Mock() + producer.get_metrics.return_value = metrics + try: + task = transport.get_exporter_thread(producer, exporter) + producer.get_metrics.assert_not_called() + exporter.export_metrics.assert_not_called() + time.sleep(INTERVAL + INTERVAL / 2.0) + producer.get_metrics.assert_called_once_with() + exporter.export_metrics.assert_called_once_with(metrics) + finally: + task.stop() + task.join() + + def test_producer_error(self, mock_logger): + producer = mock.Mock() + exporter = mock.Mock() + + producer.get_metrics.side_effect = ValueError() + + task = transport.get_exporter_thread(producer, exporter) + time.sleep(INTERVAL + INTERVAL / 2.0) + mock_logger.exception.assert_called() + self.assertFalse(task._stopped.is_set()) + + def test_producer_deleted(self, mock_logger): + producer = mock.Mock() + exporter = mock.Mock() + task = transport.get_exporter_thread(producer, exporter) + del producer + gc.collect() + time.sleep(INTERVAL + INTERVAL / 2.0) + mock_logger.exception.assert_called() + self.assertTrue(task._stopped.is_set()) + + def test_exporter_deleted(self, mock_logger): + producer = mock.Mock() + exporter = mock.Mock() + task = transport.get_exporter_thread(producer, exporter) + del exporter + gc.collect() + time.sleep(INTERVAL + INTERVAL / 2.0) + mock_logger.exception.assert_called() + self.assertTrue(task._stopped.is_set()) diff --git a/tests/unit/stats/test_stats.py b/tests/unit/stats/test_stats.py index 7eeae0c25..81588f41b 100644 --- a/tests/unit/stats/test_stats.py +++ b/tests/unit/stats/test_stats.py @@ -32,7 +32,7 @@ class TestStats(unittest.TestCase): def test_get_metrics(self): """Test that Stats converts recorded values into metrics.""" - stats = stats_module.Stats() + stats = stats_module.stats # Check that metrics are empty before view registration initial_metrics = list(stats.get_metrics()) From f55ac1fcf6e0562ed8f3446d0a873432a110593e Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Mon, 1 Apr 2019 15:11:58 -0700 Subject: [PATCH 18/21] Remove references to metrics distribution mean --- .../tests/test_stackdriver_stats.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py b/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py index f911dc273..8200bd2c4 100644 --- a/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py +++ b/contrib/opencensus-ext-stackdriver/tests/test_stackdriver_stats.py @@ -610,7 +610,6 @@ def test_create_timeseries(self, monitor_resource_mock): self.assertEqual(len(time_series.points), 1) value = time_series.points[0].value self.assertEqual(value.distribution_value.count, 1) - # self.assertEqual(value.distribution_value.mean, 25 * MiB) time_series_list = exporter.create_time_series_list(v_data) @@ -624,7 +623,6 @@ def test_create_timeseries(self, monitor_resource_mock): self.assertEqual(len(time_series.points), 1) value = time_series.points[0].value self.assertEqual(value.distribution_value.count, 1) - # self.assertEqual(value.distribution_value.mean, 25 * MiB) @mock.patch('opencensus.ext.stackdriver.stats_exporter.' 'monitored_resource.get_instance') @@ -1003,7 +1001,6 @@ def test_create_timeseries_multiple_tag_values(self, self.assertEqual(len(ts1.points), 1) value1 = ts1.points[0].value self.assertEqual(value1.distribution_value.count, 1) - # self.assertEqual(value1.distribution_value.mean, 25 * MiB) # Verify second time series self.assertEqual(ts2.resource.type, "global") @@ -1015,7 +1012,6 @@ def test_create_timeseries_multiple_tag_values(self, self.assertEqual(len(ts2.points), 1) value2 = ts2.points[0].value self.assertEqual(value2.distribution_value.count, 1) - # self.assertEqual(value2.distribution_value.mean, 12 * MiB) @mock.patch('opencensus.ext.stackdriver.stats_exporter.' 'monitored_resource.get_instance', @@ -1109,7 +1105,6 @@ def test_create_timeseries_from_distribution(self): [point] = time_series.points dv = point.value.distribution_value self.assertEqual(100, dv.count) - # self.assertEqual(4.5, dv.mean) self.assertEqual(825.0, dv.sum_of_squared_deviation) self.assertEqual([0, 20, 20, 20, 20, 20], dv.bucket_counts) self.assertEqual([0, 2, 4, 6, 8], From c87f9b6eea3809e0a4663f9c4a760deb28284ab6 Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Mon, 1 Apr 2019 16:16:36 -0700 Subject: [PATCH 19/21] Make metrics export interval configurable --- .../stackdriver/stats_exporter/__init__.py | 26 ++++++++++++++++--- opencensus/metrics/transport.py | 11 +++++--- .../stackdriver/stackdriver_stats_test.py | 5 ++-- 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py index b3384ba3d..ab9a8f44d 100644 --- a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py +++ b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py @@ -363,9 +363,23 @@ def get_user_agent_slug(): return "opencensus-python/{}".format(__version__) -def new_stats_exporter(options): - """ new_stats_exporter returns an exporter that - uploads stats data to Stackdriver Monitoring. +def new_stats_exporter(options, interval=None): + """Get a stats exporter and running transport thread. + + Create a new `StackdriverStatsExporter` with the given options and start + periodically exporting stats to stackdriver in the background. + + See `opencensus.metrics.transport.get_exporter_thread` for details on the + transport thread. + + :type options: :class:`Options` + :param exporter: Options to pass to the exporter + + :type interval: int or float + :param interval: Seconds between export calls. + + :rtype: :class:`StackdriverStatsExporter` and :class:`PeriodicTask` + :return: A tuple of the exporter and transport thread. """ if str(options.project_id).strip() == "": raise ValueError(ERROR_BLANK_PROJECT_ID) @@ -374,7 +388,11 @@ def new_stats_exporter(options): client = monitoring_v3.MetricServiceClient(client_info=ci) exporter = StackdriverStatsExporter(client=client, options=options) - tt = transport.get_exporter_thread(stats.stats, exporter) + if interval is None: + tt = transport.get_exporter_thread(stats.stats, exporter) + else: + tt = transport.get_exporter_thread(stats.stats, exporter, + interval=interval) return exporter, tt diff --git a/opencensus/metrics/transport.py b/opencensus/metrics/transport.py index b7ac3f8b2..c804ea834 100644 --- a/opencensus/metrics/transport.py +++ b/opencensus/metrics/transport.py @@ -62,10 +62,10 @@ def stop(self): self._stopped.set() -def get_exporter_thread(metric_producer, exporter): +def get_exporter_thread(metric_producer, exporter, interval=None): """Get a running task that periodically exports metrics. - Get a `PeriodicTask` that exports periodically calls: + Get a `PeriodicTask` that periodically calls: exporter.export_metrics(metric_producer.get_metrics()) @@ -76,7 +76,10 @@ def get_exporter_thread(metric_producer, exporter): :type exporter: :class:`opencensus.stats.base_exporter.MetricsExporter` :param exporter: The exporter to use to export metrics. - :rtype: :class:`threading.Thread` + :type interval: int or float + :param interval: Seconds between export calls. + + :rtype: :class:`PeriodicTask` :return: A running thread responsible calling the exporter. """ @@ -92,6 +95,6 @@ def export_all(): raise TransportError("Metric exporter is not available") export(get()) - tt = PeriodicTask(export_all) + tt = PeriodicTask(export_all, interval=interval) tt.start() return tt diff --git a/tests/system/stats/stackdriver/stackdriver_stats_test.py b/tests/system/stats/stackdriver/stackdriver_stats_test.py index 0a6f1cbd6..ce0c90133 100644 --- a/tests/system/stats/stackdriver/stackdriver_stats_test.py +++ b/tests/system/stats/stackdriver/stackdriver_stats_test.py @@ -117,8 +117,6 @@ def test_stats_record_sync(self): self.check_sd_md(exporter, view_description) - @mock.patch('opencensus.metrics.transport.DEFAULT_INTERVAL', - ASYNC_TEST_INTERVAL) def test_stats_record_async(self): # We are using sufix in order to prevent cached objects sufix = str(os.getpid()) @@ -146,7 +144,8 @@ def test_stats_record_async(self): stats_recorder = stats.stats_recorder exporter, transport = stackdriver.new_stats_exporter( - stackdriver.Options(project_id=PROJECT)) + stackdriver.Options(project_id=PROJECT), + interval=ASYNC_TEST_INTERVAL) view_manager.register_exporter(exporter) # Register view. From 2a5b6cc64eaf8869d36f2efd45dd4fc2d4aac2ad Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Mon, 1 Apr 2019 17:52:02 -0700 Subject: [PATCH 20/21] Pass default interval kwarg --- .../opencensus/ext/stackdriver/stats_exporter/__init__.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py index ab9a8f44d..ee0ddda54 100644 --- a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py +++ b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py @@ -388,11 +388,8 @@ def new_stats_exporter(options, interval=None): client = monitoring_v3.MetricServiceClient(client_info=ci) exporter = StackdriverStatsExporter(client=client, options=options) - if interval is None: - tt = transport.get_exporter_thread(stats.stats, exporter) - else: - tt = transport.get_exporter_thread(stats.stats, exporter, - interval=interval) + tt = transport.get_exporter_thread(stats.stats, exporter, + interval=interval) return exporter, tt From 0dc919f826a4424088530fe626396d928fd246b4 Mon Sep 17 00:00:00 2001 From: Chris Kleinknecht Date: Tue, 2 Apr 2019 11:32:02 -0700 Subject: [PATCH 21/21] Fix mutable options arg in exporter constructor --- .../opencensus/ext/stackdriver/stats_exporter/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py index ee0ddda54..eb7ec5f9f 100644 --- a/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py +++ b/contrib/opencensus-ext-stackdriver/opencensus/ext/stackdriver/stats_exporter/__init__.py @@ -133,7 +133,9 @@ def default_monitoring_labels(self): class StackdriverStatsExporter(object): """Stats exporter for the Stackdriver Monitoring backend.""" - def __init__(self, options=Options(), client=None): + def __init__(self, options=None, client=None): + if options is None: + options = Options() self._options = options self._client = client self._md_cache = {}