diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py b/sdks/python/apache_beam/dataframe/transforms_test.py index c4aa68697585..cd76f07133fc 100644 --- a/sdks/python/apache_beam/dataframe/transforms_test.py +++ b/sdks/python/apache_beam/dataframe/transforms_test.py @@ -353,7 +353,7 @@ def test_rename(self): class FusionTest(unittest.TestCase): @staticmethod def fused_stages(p): - return p.result.metrics().query( + return p.result.monitoring_metrics().query( metrics.MetricsFilter().with_name( fn_runner.FnApiRunner.NUM_FUSED_STAGES_COUNTER) )['counters'][0].result diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index 9b08427c347d..9a3e1000904e 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -358,8 +358,11 @@ def run_stages(self, stage_context.components.environments, self._provision_info) pipeline_metrics = MetricsContainer('') pipeline_metrics.get_counter( - MetricName(str(type(self)), - self.NUM_FUSED_STAGES_COUNTER)).update(len(stages)) + MetricName( + str(type(self)), + self.NUM_FUSED_STAGES_COUNTER, + urn='internal:' + self.NUM_FUSED_STAGES_COUNTER)).update( + len(stages)) monitoring_infos_by_stage = {} runner_execution_context = execution.FnApiRunnerExecutionContext(