Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions sdks/python/apache_beam/runners/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,18 @@ class PipelineRunner(object):
"""

def run(self, transform, options=None):
"""Run the given transform with this runner.
"""Run the given transform or callable with this runner.
"""
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import PTransform
from apache_beam.pvalue import PBegin
from apache_beam.pipeline import Pipeline
p = Pipeline(runner=self, options=options)
p | transform
if isinstance(transform, PTransform):
p | transform
else:
transform(PBegin(p))
return p.run()

def run_pipeline(self, pipeline):
Expand Down
14 changes: 14 additions & 0 deletions sdks/python/apache_beam/runners/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,20 @@ def test_run_api(self):
my_metric_value = result.metrics().query()['counters'][0].committed
self.assertEqual(my_metric_value, 111)

def test_run_api_with_callable(self):
my_metric = Metrics.counter('namespace', 'my_metric')

def fn(start):
return (start
| beam.Create([1, 10, 100])
| beam.Map(lambda x: my_metric.inc(x)))
runner = DirectRunner()
result = runner.run(fn)
result.wait_until_finish()
# Use counters to assert the pipeline actually ran.
my_metric_value = result.metrics().query()['counters'][0].committed
self.assertEqual(my_metric_value, 111)


if __name__ == '__main__':
unittest.main()