From 2de70ff2a02c50a32de4d0cdc3ab71a495ab079d Mon Sep 17 00:00:00 2001 From: Pablo Date: Tue, 30 Jan 2018 16:21:04 -0800 Subject: [PATCH] Moving User metrics to be in the PTransform proto for Fn API. --- .../src/main/proto/beam_fn_api.proto | 9 +++----- sdks/python/apache_beam/metrics/execution.py | 15 ++----------- sdks/python/apache_beam/metrics/metricbase.py | 10 +++++++++ .../runners/portability/fn_api_runner.py | 21 +++++++++++-------- .../runners/worker/bundle_processor.py | 5 +---- .../apache_beam/runners/worker/operations.py | 3 ++- 6 files changed, 30 insertions(+), 33 deletions(-) diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto index 16f7709982d1..97edb710dd94 100644 --- a/model/fn-execution/src/main/proto/beam_fn_api.proto +++ b/model/fn-execution/src/main/proto/beam_fn_api.proto @@ -278,6 +278,7 @@ message Metrics { // the actual watermarks. map watermarks = 3; + repeated User user = 4; // TODO: Define other transform level system metrics. } @@ -285,10 +286,7 @@ message Metrics { message User { // A key for identifying a metric at the most granular level. - message MetricKey { - // The step, if any, this metric is associated with. - string step = 1; - + message MetricName { // (Required): The namespace of this metric. string namespace = 2; @@ -310,7 +308,7 @@ message Metrics { } // (Required) The identifier for this metric. - MetricKey key = 1; + MetricName metric_name = 1; // (Required) The data for this metric. oneof data { @@ -320,7 +318,6 @@ message Metrics { } map ptransforms = 1; - repeated User user = 2; } message ProcessBundleProgressResponse { diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py index 959424160bc2..e67b64512c5f 100644 --- a/sdks/python/apache_beam/metrics/execution.py +++ b/sdks/python/apache_beam/metrics/execution.py @@ -34,7 +34,6 @@ from apache_beam.metrics.cells import CounterCell from apache_beam.metrics.cells import DistributionCell -from apache_beam.metrics.metricbase import MetricName from apache_beam.portability.api import beam_fn_api_pb2 @@ -65,14 +64,6 @@ def __repr__(self): def __hash__(self): return hash((self.step, self.metric)) - def to_runner_api(self): - return beam_fn_api_pb2.Metrics.User.MetricKey( - step=self.step, namespace=self.metric.namespace, name=self.metric.name) - - @staticmethod - def from_runner_api(proto): - return MetricKey(proto.step, MetricName(proto.namespace, proto.name)) - class MetricResult(object): """Keeps track of the status of a metric within a single bundle. @@ -205,14 +196,12 @@ def get_cumulative(self): def to_runner_api(self): return ( [beam_fn_api_pb2.Metrics.User( - key=beam_fn_api_pb2.Metrics.User.MetricKey( - step=self.step_name, namespace=k.namespace, name=k.name), + metric_name=k.to_runner_api(), counter_data=beam_fn_api_pb2.Metrics.User.CounterData( value=v.get_cumulative())) for k, v in self.counters.items()] + [beam_fn_api_pb2.Metrics.User( - key=beam_fn_api_pb2.Metrics.User.MetricKey( - step=self.step_name, namespace=k.namespace, name=k.name), + metric_name=k.to_runner_api(), distribution_data=v.get_cumulative().to_runner_api()) for k, v in self.distributions.items()]) diff --git a/sdks/python/apache_beam/metrics/metricbase.py b/sdks/python/apache_beam/metrics/metricbase.py index 9b1918907f6b..014863d91fbb 100644 --- a/sdks/python/apache_beam/metrics/metricbase.py +++ b/sdks/python/apache_beam/metrics/metricbase.py @@ -30,6 +30,8 @@ - MetricName - Namespace and name used to refer to a Metric. """ +from apache_beam.portability.api import beam_fn_api_pb2 + __all__ = ['Metric', 'Counter', 'Distribution', 'MetricName'] @@ -65,6 +67,14 @@ def __str__(self): def __hash__(self): return hash((self.namespace, self.name)) + def to_runner_api(self): + return beam_fn_api_pb2.Metrics.User.MetricName( + namespace=self.namespace, name=self.name) + + @staticmethod + def from_runner_api(proto): + return MetricName(proto.namespace, proto.name) + class Metric(object): """Base interface of a metric object.""" diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 9daaebb4b506..e7d2d1a9c32c 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -1174,15 +1174,18 @@ def __init__(self, step_metrics): self._counters = {} self._distributions = {} for step_metric in step_metrics.values(): - for proto in step_metric.user: - key = metrics.execution.MetricKey.from_runner_api(proto.key) - if proto.HasField('counter_data'): - self._counters[key] = proto.counter_data.value - elif proto.HasField('distribution_data'): - self._distributions[ - key] = metrics.cells.DistributionResult( - metrics.cells.DistributionData.from_runner_api( - proto.distribution_data)) + for ptransform_id, ptransform in step_metric.ptransforms.items(): + for proto in ptransform.user: + key = metrics.execution.MetricKey( + ptransform_id, + metrics.metricbase.MetricName.from_runner_api(proto.metric_name)) + if proto.HasField('counter_data'): + self._counters[key] = proto.counter_data.value + elif proto.HasField('distribution_data'): + self._distributions[ + key] = metrics.cells.DistributionResult( + metrics.cells.DistributionData.from_runner_api( + proto.distribution_data)) def query(self, filter=None): counters = [metrics.execution.MetricResult(k, v, v) diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 3c14a6f6781d..f798e093bf96 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -274,10 +274,7 @@ def metrics(self): ptransforms={ transform_id: self._fix_output_tags(transform_id, op.progress_metrics()) - for transform_id, op in self.ops.items()}, - user=sum( - [op.metrics_container.to_runner_api() for op in self.ops.values()], - [])) + for transform_id, op in self.ops.items()}) def _fix_output_tags(self, transform_id, metrics): # Outputs are still referred to by index, not by name, in many Operations. diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 576dc60a9adc..2281f4cb7eb5 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -177,7 +177,8 @@ def progress_metrics(self): {'ONLY_OUTPUT': self.receivers[0].opcounter .element_counter.value()} if len(self.receivers) == 1 - else None)))) + else None))), + user=self.metrics_container.to_runner_api()) def __str__(self): """Generates a useful string for this object.