From b49d3078ea82273aca06ef2c8fbf0fde50cee13f Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Wed, 29 Jan 2025 13:12:58 -0800 Subject: [PATCH 1/8] The Bag Partition is now configurable. Configuring the number of partitions in the Dask runner is very important to tune performance. This CL gives users control over this parameter. --- .../apache_beam/runners/dask/dask_runner.py | 40 ++++++++++++++++--- .../runners/dask/dask_runner_test.py | 19 +++++++++ .../runners/dask/transform_evaluator.py | 29 ++++++++++++-- 3 files changed, 80 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/runners/dask/dask_runner.py b/sdks/python/apache_beam/runners/dask/dask_runner.py index cc17d9919b8e..6ea4ed41ca47 100644 --- a/sdks/python/apache_beam/runners/dask/dask_runner.py +++ b/sdks/python/apache_beam/runners/dask/dask_runner.py @@ -58,6 +58,18 @@ def _parse_timeout(candidate): import dask return dask.config.no_default + @staticmethod + def _extract_bag_kwargs(dask_options: t.Dict) -> t.Dict: + """Parse keyword arguments for `dask.Bag`s, used during graph translation.""" + out = {} + + if npartitions := dask_options.pop('npartitions', None): + out['npartitions'] = npartitions + if partition_size := dask_options.pop('partition_size', None): + out['partition_size'] = partition_size + + return out + @classmethod def _add_argparse_args(cls, parser: argparse.ArgumentParser) -> None: parser.add_argument( @@ -93,7 +105,21 @@ def _add_argparse_args(cls, parser: argparse.ArgumentParser) -> None: default=512, help='The number of open comms to maintain at once in the connection ' 'pool.') - + partitions_parser = parser.add_mutually_exclusive_group() + partitions_parser.add_argument( + '--dask_npartitions', + dest='npartitions', + type=int, + default=None, + help='The desired number of `dask.Bag` partitions. 
When unspecified, ' + 'an educated guess is made.') + partitions_parser.add_argument( + '--dask_partition_size', + dest='partition_size', + type=int, + default=None, + help='The length of each `dask.Bag` partition. When unspecified, ' + 'an educated guess is made.') @dataclasses.dataclass class DaskRunnerResult(PipelineResult): @@ -139,9 +165,12 @@ def metrics(self): class DaskRunner(BundleBasedDirectRunner): """Executes a pipeline on a Dask distributed client.""" @staticmethod - def to_dask_bag_visitor() -> PipelineVisitor: + def to_dask_bag_visitor(bag_kwargs=None) -> PipelineVisitor: from dask import bag as db + if bag_kwargs is None: + bag_kwargs = {} + @dataclasses.dataclass class DaskBagVisitor(PipelineVisitor): bags: t.Dict[AppliedPTransform, db.Bag] = dataclasses.field( @@ -149,7 +178,7 @@ class DaskBagVisitor(PipelineVisitor): def visit_transform(self, transform_node: AppliedPTransform) -> None: op_class = TRANSLATIONS.get(transform_node.transform.__class__, NoOp) - op = op_class(transform_node) + op = op_class(transform_node, bag_kwargs=bag_kwargs) op_kws = {"input_bag": None, "side_inputs": None} inputs = list(transform_node.inputs) @@ -195,7 +224,7 @@ def is_fnapi_compatible(): def run_pipeline(self, pipeline, options): import dask - # TODO(alxr): Create interactive notebook support. + # TODO(alxmrs): Create interactive notebook support. if is_in_notebook(): raise NotImplementedError('interactive support will come later!') @@ -207,11 +236,12 @@ def run_pipeline(self, pipeline, options): dask_options = options.view_as(DaskOptions).get_all_options( drop_default=True) + bag_kwargs = DaskOptions._extract_bag_kwargs(dask_options) client = ddist.Client(**dask_options) pipeline.replace_all(dask_overrides()) - dask_visitor = self.to_dask_bag_visitor() + dask_visitor = self.to_dask_bag_visitor(bag_kwargs) pipeline.visit(dask_visitor) # The dictionary in this visitor keeps a mapping of every Beam # PTransform to the equivalent Bag operation. 
This is highly diff --git a/sdks/python/apache_beam/runners/dask/dask_runner_test.py b/sdks/python/apache_beam/runners/dask/dask_runner_test.py index 66dda4a984f4..afe363ba3ee6 100644 --- a/sdks/python/apache_beam/runners/dask/dask_runner_test.py +++ b/sdks/python/apache_beam/runners/dask/dask_runner_test.py @@ -66,6 +66,25 @@ def test_parser_destinations__agree_with_dask_client(self): with self.subTest(f'{opt_name} in dask.distributed.Client constructor'): self.assertIn(opt_name, client_args) + def test_parser_extract_bag_kwargs__deletes_dask_kwargs(self): + options = PipelineOptions('--dask_npartitions 8'.split()) + dask_options = options.view_as(DaskOptions).get_all_options() + + self.assertIn('npartitions', dask_options) + bag_kwargs = DaskOptions._extract_bag_kwargs(dask_options) + self.assertNotIn('npartitions', dask_options) + self.assertEqual(bag_kwargs, {'npartitions': 8}) + + def test_parser_extract_bag_kwargs__unconfigured(self): + options = PipelineOptions() + dask_options = options.view_as(DaskOptions).get_all_options() + + # It's present as a default option. + self.assertIn('npartitions', dask_options) + bag_kwargs = DaskOptions._extract_bag_kwargs(dask_options) + self.assertNotIn('npartitions', dask_options) + self.assertEqual(bag_kwargs, {}) + class DaskRunnerRunPipelineTest(unittest.TestCase): """Test class used to introspect the dask runner via a debugger.""" diff --git a/sdks/python/apache_beam/runners/dask/transform_evaluator.py b/sdks/python/apache_beam/runners/dask/transform_evaluator.py index e72ebcce8b13..f35fb8b41bcc 100644 --- a/sdks/python/apache_beam/runners/dask/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/dask/transform_evaluator.py @@ -127,8 +127,11 @@ class DaskBagOp(abc.ABC): Attributes applied: The underlying `AppliedPTransform` which holds the code for the target operation. + bag_kwargs: (optional) Keyword arguments applied to input bags, usually + from the pipeline's `DaskOptions`. 
""" applied: AppliedPTransform + bag_kwargs: t.Dict = dataclasses.field(default_factory=dict) @property def transform(self): @@ -151,10 +154,30 @@ def apply(self, input_bag: OpInput, side_inputs: OpSide = None) -> db.Bag: assert input_bag is None, 'Create expects no input!' original_transform = t.cast(_Create, self.transform) items = original_transform.values + + npartitions = self.bag_kwargs.get('npartitions') + partition_size = self.bag_kwargs.get('partition_size') + if npartitions and partition_size: + raise ValueError( + f'Please specify either `dask_npartitions` or ' + f'`dask_parition_size` but not both: ' + f'{npartitions=}, {partition_size=}.' + ) + if not npartitions and not partition_size: + # partition_size is inversely related to `npartitions`. + # Ideal "chunk sizes" in dask are around 10-100 MBs. + # Let's hope ~128 items per partition is around this + # memory overhead. + partition_size = max( + 128, + math.ceil(math.sqrt(len(items)) / 10) + ) + return db.from_sequence( - items, - partition_size=max( - 1, math.ceil(math.sqrt(len(items)) / math.sqrt(100)))) + items, + npartitions=npartitions, + partition_size=partition_size + ) def apply_dofn_to_bundle( From 486f207f6e8c6291b743505e22c5ffa219b2c350 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Wed, 29 Jan 2025 14:07:26 -0800 Subject: [PATCH 2/8] Apply formatter. --- .../apache_beam/runners/dask/dask_runner.py | 5 ++-- .../runners/dask/transform_evaluator.py | 23 +++++++------------ 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/sdks/python/apache_beam/runners/dask/dask_runner.py b/sdks/python/apache_beam/runners/dask/dask_runner.py index 6ea4ed41ca47..2ba9092c62fd 100644 --- a/sdks/python/apache_beam/runners/dask/dask_runner.py +++ b/sdks/python/apache_beam/runners/dask/dask_runner.py @@ -112,14 +112,15 @@ def _add_argparse_args(cls, parser: argparse.ArgumentParser) -> None: type=int, default=None, help='The desired number of `dask.Bag` partitions. 
When unspecified, ' - 'an educated guess is made.') + 'an educated guess is made.') partitions_parser.add_argument( '--dask_partition_size', dest='partition_size', type=int, default=None, help='The length of each `dask.Bag` partition. When unspecified, ' - 'an educated guess is made.') + 'an educated guess is made.') + @dataclasses.dataclass class DaskRunnerResult(PipelineResult): diff --git a/sdks/python/apache_beam/runners/dask/transform_evaluator.py b/sdks/python/apache_beam/runners/dask/transform_evaluator.py index f35fb8b41bcc..77c0b977c65c 100644 --- a/sdks/python/apache_beam/runners/dask/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/dask/transform_evaluator.py @@ -158,26 +158,19 @@ def apply(self, input_bag: OpInput, side_inputs: OpSide = None) -> db.Bag: npartitions = self.bag_kwargs.get('npartitions') partition_size = self.bag_kwargs.get('partition_size') if npartitions and partition_size: - raise ValueError( + raise ValueError( f'Please specify either `dask_npartitions` or ' f'`dask_parition_size` but not both: ' - f'{npartitions=}, {partition_size=}.' - ) + f'{npartitions=}, {partition_size=}.') if not npartitions and not partition_size: - # partition_size is inversely related to `npartitions`. - # Ideal "chunk sizes" in dask are around 10-100 MBs. - # Let's hope ~128 items per partition is around this - # memory overhead. - partition_size = max( - 128, - math.ceil(math.sqrt(len(items)) / 10) - ) + # partition_size is inversely related to `npartitions`. + # Ideal "chunk sizes" in dask are around 10-100 MBs. + # Let's hope ~128 items per partition is around this + # memory overhead. 
+ partition_size = max(128, math.ceil(math.sqrt(len(items)) / 10)) return db.from_sequence( - items, - npartitions=npartitions, - partition_size=partition_size - ) + items, npartitions=npartitions, partition_size=partition_size) def apply_dofn_to_bundle( From 059c753a865ea7064c37b983bb7a3844071bc6be Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Wed, 29 Jan 2025 15:48:30 -0800 Subject: [PATCH 3/8] Passing lint via the `run_pylint.sh` script. --- sdks/python/apache_beam/runners/dask/dask_runner.py | 2 +- sdks/python/apache_beam/runners/dask/overrides.py | 1 + .../python/apache_beam/runners/dask/transform_evaluator.py | 7 ++++--- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/dask/dask_runner.py b/sdks/python/apache_beam/runners/dask/dask_runner.py index 2ba9092c62fd..8975fcf1e138 100644 --- a/sdks/python/apache_beam/runners/dask/dask_runner.py +++ b/sdks/python/apache_beam/runners/dask/dask_runner.py @@ -60,7 +60,7 @@ def _parse_timeout(candidate): @staticmethod def _extract_bag_kwargs(dask_options: t.Dict) -> t.Dict: - """Parse keyword arguments for `dask.Bag`s, used during graph translation.""" + """Parse keyword arguments for `dask.Bag`s; used in graph translation.""" out = {} if npartitions := dask_options.pop('npartitions', None): diff --git a/sdks/python/apache_beam/runners/dask/overrides.py b/sdks/python/apache_beam/runners/dask/overrides.py index b952834f12d7..4870eadff0f8 100644 --- a/sdks/python/apache_beam/runners/dask/overrides.py +++ b/sdks/python/apache_beam/runners/dask/overrides.py @@ -15,6 +15,7 @@ # limitations under the License. 
# import dataclasses + import typing as t import apache_beam as beam diff --git a/sdks/python/apache_beam/runners/dask/transform_evaluator.py b/sdks/python/apache_beam/runners/dask/transform_evaluator.py index 77c0b977c65c..ea982c403551 100644 --- a/sdks/python/apache_beam/runners/dask/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/dask/transform_evaluator.py @@ -20,14 +20,15 @@ A minimum set of operation substitutions, to adap Beam's PTransform model to Dask Bag functions. """ -import abc import dataclasses +from dataclasses import field + +import abc +import dask.bag as db import math import typing as t -from dataclasses import field import apache_beam -import dask.bag as db from apache_beam import DoFn from apache_beam import TaggedOutput from apache_beam.pipeline import AppliedPTransform From 3e7ccb694be3a62792a2ef5a32e41510c05f4838 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Thu, 30 Jan 2025 15:09:00 -0800 Subject: [PATCH 4/8] Implementing review feedback. --- CHANGES.md | 1 + .../apache_beam/runners/dask/transform_evaluator.py | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 799d26dc05e3..fde00b9da4ce 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -86,6 +86,7 @@ * Support the Process Environment for execution in Prism ([#33651](https://github.com/apache/beam/pull/33651)) * Support the AnyOf Environment for execution in Prism ([#33705](https://github.com/apache/beam/pull/33705)) * This improves support for developing Xlang pipelines, when using a compatible cross language service. +* Partitions are now configurable for the DaskRunner in the Python SDK ([#33805](https://github.com/apache/beam/pull/33805)). 
## Breaking Changes diff --git a/sdks/python/apache_beam/runners/dask/transform_evaluator.py b/sdks/python/apache_beam/runners/dask/transform_evaluator.py index ea982c403551..bde6f328a2ab 100644 --- a/sdks/python/apache_beam/runners/dask/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/dask/transform_evaluator.py @@ -25,6 +25,7 @@ import abc import dask.bag as db +import logging import math import typing as t @@ -53,6 +54,7 @@ # Value types for PCollections (possibly Windowed Values). PCollVal = t.Union[WindowedValue, t.Any] +_LOGGER = logging.getLogger(__name__) def get_windowed_value(item: t.Any, window_fn: WindowFn) -> WindowedValue: """Wraps a value (item) inside a Window.""" @@ -168,7 +170,11 @@ def apply(self, input_bag: OpInput, side_inputs: OpSide = None) -> db.Bag: # Ideal "chunk sizes" in dask are around 10-100 MBs. # Let's hope ~128 items per partition is around this # memory overhead. - partition_size = max(128, math.ceil(math.sqrt(len(items)) / 10)) + default_size = 128 + partition_size = max(default_size, math.ceil(math.sqrt(len(items)) / 10)) + if partition_size == default_size: + _LOGGER.warning('The new default partition size is %d, it used to be 1 ' + 'in previous DaskRunner versions.' % default_size) return db.from_sequence( items, npartitions=npartitions, partition_size=partition_size) From e665928e90acd9095e07d6db219745c2c08927da Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Thu, 30 Jan 2025 15:24:34 -0800 Subject: [PATCH 5/8] Attempting to pass lint/fmt check. 
--- sdks/python/apache_beam/runners/dask/transform_evaluator.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/dask/transform_evaluator.py b/sdks/python/apache_beam/runners/dask/transform_evaluator.py index bde6f328a2ab..ae9060cb6280 100644 --- a/sdks/python/apache_beam/runners/dask/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/dask/transform_evaluator.py @@ -56,6 +56,7 @@ _LOGGER = logging.getLogger(__name__) + def get_windowed_value(item: t.Any, window_fn: WindowFn) -> WindowedValue: """Wraps a value (item) inside a Window.""" if isinstance(item, TaggedOutput): @@ -173,8 +174,9 @@ def apply(self, input_bag: OpInput, side_inputs: OpSide = None) -> db.Bag: default_size = 128 partition_size = max(default_size, math.ceil(math.sqrt(len(items)) / 10)) if partition_size == default_size: - _LOGGER.warning('The new default partition size is %d, it used to be 1 ' - 'in previous DaskRunner versions.' % default_size) + _LOGGER.warning( + 'The new default partition size is %d, it used to be 1 ' + 'in previous DaskRunner versions.' % default_size) return db.from_sequence( items, npartitions=npartitions, partition_size=partition_size) From 0f07780203e254ffc8f3111c30cc6ccc555ecda5 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Thu, 30 Jan 2025 16:52:25 -0800 Subject: [PATCH 6/8] Fixing isort issues by reading CI output. --- sdks/python/apache_beam/runners/dask/overrides.py | 1 - sdks/python/apache_beam/runners/dask/transform_evaluator.py | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/dask/overrides.py b/sdks/python/apache_beam/runners/dask/overrides.py index 4870eadff0f8..b952834f12d7 100644 --- a/sdks/python/apache_beam/runners/dask/overrides.py +++ b/sdks/python/apache_beam/runners/dask/overrides.py @@ -15,7 +15,6 @@ # limitations under the License. 
# import dataclasses - import typing as t import apache_beam as beam diff --git a/sdks/python/apache_beam/runners/dask/transform_evaluator.py b/sdks/python/apache_beam/runners/dask/transform_evaluator.py index ae9060cb6280..19186f2c6d32 100644 --- a/sdks/python/apache_beam/runners/dask/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/dask/transform_evaluator.py @@ -20,16 +20,16 @@ A minimum set of operation substitutions, to adap Beam's PTransform model to Dask Bag functions. """ +import abc import dataclasses -from dataclasses import field -import abc -import dask.bag as db import logging import math import typing as t +from dataclasses import field import apache_beam +import dask.bag as db from apache_beam import DoFn from apache_beam import TaggedOutput from apache_beam.pipeline import AppliedPTransform From 5f75d5c1dc53d886b4e60e56055185b9f5b5726a Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Thu, 30 Jan 2025 17:12:36 -0800 Subject: [PATCH 7/8] More indentation. --- sdks/python/apache_beam/runners/dask/transform_evaluator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/dask/transform_evaluator.py b/sdks/python/apache_beam/runners/dask/transform_evaluator.py index 19186f2c6d32..c615e48d1559 100644 --- a/sdks/python/apache_beam/runners/dask/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/dask/transform_evaluator.py @@ -175,8 +175,8 @@ def apply(self, input_bag: OpInput, side_inputs: OpSide = None) -> db.Bag: partition_size = max(default_size, math.ceil(math.sqrt(len(items)) / 10)) if partition_size == default_size: _LOGGER.warning( - 'The new default partition size is %d, it used to be 1 ' - 'in previous DaskRunner versions.' % default_size) + 'The new default partition size is %d, it used to be 1 ' + 'in previous DaskRunner versions.' 
% default_size) return db.from_sequence( items, npartitions=npartitions, partition_size=partition_size) From 550914d41a081cbc557a99947aa06065501562a5 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Fri, 31 Jan 2025 15:19:47 -0800 Subject: [PATCH 8/8] rm blank line for isort. --- sdks/python/apache_beam/runners/dask/transform_evaluator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dask/transform_evaluator.py b/sdks/python/apache_beam/runners/dask/transform_evaluator.py index c615e48d1559..7cad1fe40451 100644 --- a/sdks/python/apache_beam/runners/dask/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/dask/transform_evaluator.py @@ -22,7 +22,6 @@ """ import abc import dataclasses - import logging import math import typing as t