Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions model/fn-execution/src/main/proto/beam_fn_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -278,17 +278,15 @@ message Metrics {
// the actual watermarks.
map<string, int64> watermarks = 3;

repeated User user = 4;
// TODO: Define other transform-level system metrics.
}

// User-defined metrics.
message User {

// A key for identifying a metric at the most granular level.
message MetricKey {
// The step, if any, this metric is associated with.
string step = 1;

message MetricName {
// (Required): The namespace of this metric.
string namespace = 2;

Expand All @@ -310,7 +308,7 @@ message Metrics {
}

// (Required) The identifier for this metric.
MetricKey key = 1;
MetricName metric_name = 1;

// (Required) The data for this metric.
oneof data {
Expand All @@ -320,7 +318,6 @@ message Metrics {
}

map<string, PTransform> ptransforms = 1;
repeated User user = 2;
}

message ProcessBundleProgressResponse {
Expand Down
15 changes: 2 additions & 13 deletions sdks/python/apache_beam/metrics/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

from apache_beam.metrics.cells import CounterCell
from apache_beam.metrics.cells import DistributionCell
from apache_beam.metrics.metricbase import MetricName
from apache_beam.portability.api import beam_fn_api_pb2


Expand Down Expand Up @@ -65,14 +64,6 @@ def __repr__(self):
def __hash__(self):
  """Returns the hash of this object's ``(step, metric)`` pair."""
  identity = (self.step, self.metric)
  return hash(identity)

def to_runner_api(self):
  """Builds the ``Metrics.User.MetricKey`` proto message for this key.

  Returns:
    A ``beam_fn_api_pb2.Metrics.User.MetricKey`` whose ``step`` is this
    object's ``step`` and whose ``namespace`` and ``name`` are read from
    this object's ``metric``.
  """
  key_proto_cls = beam_fn_api_pb2.Metrics.User.MetricKey
  return key_proto_cls(
      step=self.step,
      namespace=self.metric.namespace,
      name=self.metric.name)

@staticmethod
def from_runner_api(proto):
  """Rebuilds a ``MetricKey`` from a message-like object.

  Args:
    proto: an object exposing ``step``, ``namespace`` and ``name`` attributes.

  Returns:
    A ``MetricKey`` built from ``proto.step`` and a ``MetricName`` made of
    ``proto.namespace`` and ``proto.name``.
  """
  step = proto.step
  metric_name = MetricName(proto.namespace, proto.name)
  return MetricKey(step, metric_name)


class MetricResult(object):
"""Keeps track of the status of a metric within a single bundle.
Expand Down Expand Up @@ -205,14 +196,12 @@ def get_cumulative(self):
def to_runner_api(self):
# Serializes the metrics held by this object as one flat list of
# beam_fn_api_pb2.Metrics.User messages: one message per entry of
# self.counters, followed by one message per entry of self.distributions.
#
# NOTE(review): every User(...) call below carries BOTH a `key=` argument and
# a `metric_name=` argument. This text looks like a rendered diff in which the
# removed (`key=`) and added (`metric_name=`) lines are interleaved -- confirm
# against the real file which of the two identifies the metric.
return (
# Counters: the payload is the cumulative value wrapped in CounterData.
[beam_fn_api_pb2.Metrics.User(
key=beam_fn_api_pb2.Metrics.User.MetricKey(
step=self.step_name, namespace=k.namespace, name=k.name),
metric_name=k.to_runner_api(),
counter_data=beam_fn_api_pb2.Metrics.User.CounterData(
value=v.get_cumulative()))
for k, v in self.counters.items()] +
# Distributions: the cumulative value converts itself via to_runner_api().
[beam_fn_api_pb2.Metrics.User(
key=beam_fn_api_pb2.Metrics.User.MetricKey(
step=self.step_name, namespace=k.namespace, name=k.name),
metric_name=k.to_runner_api(),
distribution_data=v.get_cumulative().to_runner_api())
for k, v in self.distributions.items()])

Expand Down
10 changes: 10 additions & 0 deletions sdks/python/apache_beam/metrics/metricbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
- MetricName - Namespace and name used to refer to a Metric.
"""

from apache_beam.portability.api import beam_fn_api_pb2

__all__ = ['Metric', 'Counter', 'Distribution', 'MetricName']


Expand Down Expand Up @@ -65,6 +67,14 @@ def __str__(self):
def __hash__(self):
  """Returns the hash of this object's ``(namespace, name)`` pair."""
  identity = (self.namespace, self.name)
  return hash(identity)

def to_runner_api(self):
  """Builds the ``Metrics.User.MetricName`` proto message for this name.

  Returns:
    A ``beam_fn_api_pb2.Metrics.User.MetricName`` carrying this object's
    ``namespace`` and ``name``.
  """
  name_proto_cls = beam_fn_api_pb2.Metrics.User.MetricName
  return name_proto_cls(
      namespace=self.namespace,
      name=self.name)

@staticmethod
def from_runner_api(proto):
  """Rebuilds a ``MetricName`` from a message-like object.

  Args:
    proto: an object exposing ``namespace`` and ``name`` attributes.

  Returns:
    A ``MetricName`` built from ``proto.namespace`` and ``proto.name``.
  """
  namespace = proto.namespace
  name = proto.name
  return MetricName(namespace, name)


class Metric(object):
"""Base interface of a metric object."""
Expand Down
21 changes: 12 additions & 9 deletions sdks/python/apache_beam/runners/portability/fn_api_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1174,15 +1174,18 @@ def __init__(self, step_metrics):
self._counters = {}
self._distributions = {}
for step_metric in step_metrics.values():
for proto in step_metric.user:
key = metrics.execution.MetricKey.from_runner_api(proto.key)
if proto.HasField('counter_data'):
self._counters[key] = proto.counter_data.value
elif proto.HasField('distribution_data'):
self._distributions[
key] = metrics.cells.DistributionResult(
metrics.cells.DistributionData.from_runner_api(
proto.distribution_data))
for ptransform_id, ptransform in step_metric.ptransforms.items():
for proto in ptransform.user:
key = metrics.execution.MetricKey(
ptransform_id,
metrics.metricbase.MetricName.from_runner_api(proto.metric_name))
if proto.HasField('counter_data'):
self._counters[key] = proto.counter_data.value
elif proto.HasField('distribution_data'):
self._distributions[
key] = metrics.cells.DistributionResult(
metrics.cells.DistributionData.from_runner_api(
proto.distribution_data))

def query(self, filter=None):
counters = [metrics.execution.MetricResult(k, v, v)
Expand Down
5 changes: 1 addition & 4 deletions sdks/python/apache_beam/runners/worker/bundle_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,7 @@ def metrics(self):
ptransforms={
transform_id:
self._fix_output_tags(transform_id, op.progress_metrics())
for transform_id, op in self.ops.items()},
user=sum(
[op.metrics_container.to_runner_api() for op in self.ops.values()],
[]))
for transform_id, op in self.ops.items()})

def _fix_output_tags(self, transform_id, metrics):
# Outputs are still referred to by index, not by name, in many Operations.
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/runners/worker/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ def progress_metrics(self):
{'ONLY_OUTPUT': self.receivers[0].opcounter
.element_counter.value()}
if len(self.receivers) == 1
else None))))
else None))),
user=self.metrics_container.to_runner_api())

def __str__(self):
"""Generates a useful string for this object.
Expand Down