From 84bbd887ab7160d86a49345a1453e3995902b586 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Sun, 13 Jul 2025 14:14:18 -0400 Subject: [PATCH 1/7] feat(options): add support for comma-separated experiments and service options Implement _CommaSeparatedListAction to handle comma-separated values for experiments and dataflow_service_options, matching Java SDK behavior. This allows more flexible input formats while maintaining backward compatibility. --- .../apache_beam/options/pipeline_options.py | 75 +++++++++++++++++-- .../options/pipeline_options_test.py | 50 +++++++++++++ 2 files changed, 117 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index dad905fec79a..3b4f501a027a 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -143,8 +143,8 @@ class _DictUnionAction(argparse.Action): than one of the values, the last value takes precedence. """ def __call__(self, parser, namespace, values, option_string=None): - if not hasattr(namespace, self.dest) or getattr(namespace, - self.dest) is None: + if not hasattr(namespace, + self.dest) or getattr(namespace, self.dest) is None: setattr(namespace, self.dest, {}) getattr(namespace, self.dest).update(values) @@ -194,8 +194,8 @@ def _add_entry(self, key, value): key] = value def __call__(self, parser, namespace, values, option_string=None): - if not hasattr(namespace, self.dest) or getattr(namespace, - self.dest) is None: + if not hasattr(namespace, + self.dest) or getattr(namespace, self.dest) is None: setattr(namespace, self.dest, {}) self._custom_audit_entries = getattr(namespace, self.dest) @@ -220,6 +220,66 @@ def __call__(self, parser, namespace, values, option_string=None): % _GcsCustomAuditEntriesAction.MAX_ENTRIES) +class _CommaSeparatedListAction(argparse.Action): + """ + Argparse Action that splits comma-separated values and appends them to a list. + This allows options like --experiments=abc,def to be treated as separate + experiments 'abc' and 'def', similar to how Java SDK handles them. + + For key=value experiments, only splits at commas that are not part of the value. + For example: 'abc,def,master_key=k1=v1,k2=v2' becomes ['abc', 'def', 'master_key=k1=v1,k2=v2'] + """ + def __call__(self, parser, namespace, values, option_string=None): + if not hasattr(namespace, + self.dest) or getattr(namespace, self.dest) is None: + setattr(namespace, self.dest, []) + + # Split comma-separated values and extend the list + if isinstance(values, str): + # Smart splitting: only split at commas that are not part of key=value pairs + split_values = self._smart_split(values) + getattr(namespace, self.dest).extend(split_values) + else: + # If values is not a string, just append it + getattr(namespace, self.dest).append(values) + + def _smart_split(self, values): + """Split comma-separated values, but preserve commas within key=value pairs.""" + result = [] + current = [] + equals_depth = 0 + + i = 0 + while i < len(values): + char = values[i] + + if char == '=': + equals_depth += 1 + current.append(char) + elif char == ',' and equals_depth <= 1: + # This comma is a top-level separator (not inside a complex value) + if current: + result.append(''.join(current).strip()) + current = [] + equals_depth = 0 + elif char == ',' and equals_depth > 1: + # This comma is inside a complex value, keep it + current.append(char) + elif char == ' ' and not current: + # Skip leading spaces + pass + else: + current.append(char) + + i += 1 + + # Add the last item + if current: + result.append(''.join(current).strip()) + + return [v for v in result if v] # Filter out empty values + + class PipelineOptions(HasDisplayData): """This class and subclasses are used as containers for command line options. @@ -727,8 +787,7 @@ def additional_option_ptransform_fn(): # Optional type checks that aren't enabled by default. additional_type_checks: Dict[str, Callable[[], None]] = { - 'ptransform_fn': additional_option_ptransform_fn, -} + 'ptransform_fn': additional_option_ptransform_fn, } def enable_all_additional_type_checks(): @@ -977,7 +1036,7 @@ def _add_argparse_args(cls, parser): '--dataflow_service_option', '--dataflow_service_options', dest='dataflow_service_options', - action='append', + action=_CommaSeparatedListAction, default=None, help=( 'Options to configure the Dataflow service. These ' @@ -1412,7 +1471,7 @@ def _add_argparse_args(cls, parser): '--experiment', '--experiments', dest='experiments', - action='append', + action=_CommaSeparatedListAction, default=None, help=( 'Runners may provide a number of experimental features that can be ' diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index 099c9e80e21b..89199f5c3d6c 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -893,6 +893,56 @@ def test_validation_bad_stg_bad_temp_no_default(self): 'staging_location.' ]) + def test_comma_separated_experiments(self): + """Test that comma-separated experiments are parsed correctly.""" + # Test single experiment + options = PipelineOptions(['--experiments=abc']) + self.assertEqual(['abc'], options.get_all_options()['experiments']) + + # Test comma-separated experiments + options = PipelineOptions(['--experiments=abc,def,ghi']) + self.assertEqual(['abc', 'def', 'ghi'], + options.get_all_options()['experiments']) + + # Test multiple flags with comma-separated values + options = PipelineOptions( + ['--experiments=abc,def', '--experiments=ghi,jkl']) + self.assertEqual(['abc', 'def', 'ghi', 'jkl'], + options.get_all_options()['experiments']) + + # Test with spaces around commas + options = PipelineOptions(['--experiments=abc, def , ghi']) + self.assertEqual(['abc', 'def', 'ghi'], + options.get_all_options()['experiments']) + + # Test empty values are filtered out + options = PipelineOptions(['--experiments=abc,,def,']) + self.assertEqual(['abc', 'def'], options.get_all_options()['experiments']) + + def test_comma_separated_dataflow_service_options(self): + """Test that comma-separated dataflow service options are parsed correctly.""" + # Test single option + options = PipelineOptions(['--dataflow_service_options=option1=value1']) + self.assertEqual(['option1=value1'], + options.get_all_options()['dataflow_service_options']) + + # Test comma-separated options + options = PipelineOptions([ + '--dataflow_service_options=option1=value1,option2=value2,option3=value3' + ]) + self.assertEqual(['option1=value1', 'option2=value2', 'option3=value3'], + options.get_all_options()['dataflow_service_options']) + + # Test multiple flags with comma-separated values + options = PipelineOptions([ + '--dataflow_service_options=option1=value1,option2=value2', + '--dataflow_service_options=option3=value3,option4=value4' + ]) + self.assertEqual([ + 'option1=value1', 'option2=value2', 'option3=value3', 'option4=value4' + ], + options.get_all_options()['dataflow_service_options']) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From d038216d0fe74a33b731756b5231ab6f2ee0b692 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Sun, 13 Jul 2025 15:04:58 -0400 Subject: [PATCH 2/7] fix lint --- sdks/python/apache_beam/options/pipeline_options.py | 9 ++++++--- sdks/python/apache_beam/options/pipeline_options_test.py | 6 ++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 3b4f501a027a..f99000b99e77 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -227,7 +227,8 @@ class _CommaSeparatedListAction(argparse.Action): experiments 'abc' and 'def', similar to how Java SDK handles them. For key=value experiments, only splits at commas that are not part of the value. - For example: 'abc,def,master_key=k1=v1,k2=v2' becomes ['abc', 'def', 'master_key=k1=v1,k2=v2'] + For example: 'abc,def,master_key=k1=v1,k2=v2' becomes + ['abc', 'def', 'master_key=k1=v1,k2=v2'] """ def __call__(self, parser, namespace, values, option_string=None): if not hasattr(namespace, @@ -236,7 +237,8 @@ def __call__(self, parser, namespace, values, option_string=None): # Split comma-separated values and extend the list if isinstance(values, str): - # Smart splitting: only split at commas that are not part of key=value pairs + # Smart splitting: only split at commas that are not part of + # key=value pairs split_values = self._smart_split(values) getattr(namespace, self.dest).extend(split_values) else: @@ -244,7 +246,8 @@ def __call__(self, parser, namespace, values, option_string=None): getattr(namespace, self.dest).append(values) def _smart_split(self, values): - """Split comma-separated values, but preserve commas within key=value pairs.""" + """Split comma-separated values, but preserve commas within + key=value pairs.""" result = [] current = [] equals_depth = 0 diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py index 89199f5c3d6c..06270d4cd310 100644 --- a/sdks/python/apache_beam/options/pipeline_options_test.py +++ b/sdks/python/apache_beam/options/pipeline_options_test.py @@ -920,7 +920,8 @@ def test_comma_separated_experiments(self): self.assertEqual(['abc', 'def'], options.get_all_options()['experiments']) def test_comma_separated_dataflow_service_options(self): - """Test that comma-separated dataflow service options are parsed correctly.""" + """Test that comma-separated dataflow service options are parsed + correctly.""" # Test single option options = PipelineOptions(['--dataflow_service_options=option1=value1']) self.assertEqual(['option1=value1'], @@ -928,7 +929,8 @@ def test_comma_separated_dataflow_service_options(self): # Test comma-separated options options = PipelineOptions([ - '--dataflow_service_options=option1=value1,option2=value2,option3=value3' + '--dataflow_service_options=option1=value1,option2=value2,' + 'option3=value3' ]) self.assertEqual(['option1=value1', 'option2=value2', 'option3=value3'], options.get_all_options()['dataflow_service_options']) From dbe814c80833c5e735deb6b7033b4d630cbfebfc Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Sun, 13 Jul 2025 19:25:20 -0400 Subject: [PATCH 3/7] lint --- sdks/python/apache_beam/options/pipeline_options.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index f99000b99e77..7424238bec79 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -222,12 +222,13 @@ def __call__(self, parser, namespace, values, option_string=None): class _CommaSeparatedListAction(argparse.Action): """ - Argparse Action that splits comma-separated values and appends them to a list. - This allows options like --experiments=abc,def to be treated as separate - experiments 'abc' and 'def', similar to how Java SDK handles them. + Argparse Action that splits comma-separated values and appends them to + a list. This allows options like --experiments=abc,def to be treated + as separate experiments 'abc' and 'def', similar to how Java SDK handles + them. - For key=value experiments, only splits at commas that are not part of the value. - For example: 'abc,def,master_key=k1=v1,k2=v2' becomes + For key=value experiments, only splits at commas that are not part of the + value. For example: 'abc,def,master_key=k1=v1,k2=v2' becomes ['abc', 'def', 'master_key=k1=v1,k2=v2'] """ def __call__(self, parser, namespace, values, option_string=None): From d0fa84123d29f066c8d84b63665185b7c644dd65 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Sun, 13 Jul 2025 19:38:26 -0400 Subject: [PATCH 4/7] yapf --- .../apache_beam/options/pipeline_options.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 7424238bec79..136f7cad75f8 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -143,8 +143,8 @@ class _DictUnionAction(argparse.Action): than one of the values, the last value takes precedence. """ def __call__(self, parser, namespace, values, option_string=None): - if not hasattr(namespace, - self.dest) or getattr(namespace, self.dest) is None: + if not hasattr(namespace, self.dest) or getattr(namespace, + self.dest) is None: setattr(namespace, self.dest, {}) getattr(namespace, self.dest).update(values) @@ -194,8 +194,8 @@ def _add_entry(self, key, value): key] = value def __call__(self, parser, namespace, values, option_string=None): - if not hasattr(namespace, - self.dest) or getattr(namespace, self.dest) is None: + if not hasattr(namespace, self.dest) or getattr(namespace, + self.dest) is None: setattr(namespace, self.dest, {}) self._custom_audit_entries = getattr(namespace, self.dest) @@ -232,8 +232,8 @@ class _CommaSeparatedListAction(argparse.Action): ['abc', 'def', 'master_key=k1=v1,k2=v2'] """ def __call__(self, parser, namespace, values, option_string=None): - if not hasattr(namespace, - self.dest) or getattr(namespace, self.dest) is None: + if not hasattr(namespace, self.dest) or getattr(namespace, + self.dest) is None: setattr(namespace, self.dest, []) # Split comma-separated values and extend the list @@ -791,7 +791,8 @@ def additional_option_ptransform_fn(): # Optional type checks that aren't enabled by default. additional_type_checks: Dict[str, Callable[[], None]] = { - 'ptransform_fn': additional_option_ptransform_fn, } + 'ptransform_fn': additional_option_ptransform_fn, +} def enable_all_additional_type_checks(): From 38711371703daffa382e3d49c0fa7591fdd16155 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Sun, 13 Jul 2025 20:03:18 -0400 Subject: [PATCH 5/7] updated the test --- sdks/python/apache_beam/options/value_provider_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/options/value_provider_test.py b/sdks/python/apache_beam/options/value_provider_test.py index 42afa8c0def3..e88d23714646 100644 --- a/sdks/python/apache_beam/options/value_provider_test.py +++ b/sdks/python/apache_beam/options/value_provider_test.py @@ -216,8 +216,8 @@ def test_experiments_options_setup(self): options = PipelineOptions(['--experiments', 'a', '--experiments', 'b,c']) options = options.view_as(DebugOptions) self.assertIn('a', options.experiments) - self.assertIn('b,c', options.experiments) - self.assertNotIn('c', options.experiments) + self.assertIn('b', options.experiments) + self.assertIn('c', options.experiments) def test_nested_value_provider_wrap_static(self): vp = NestedValueProvider(StaticValueProvider(int, 1), lambda x: x + 1) From 2b289f2804d76e75e857928a058d2d7c33f85de3 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Sun, 13 Jul 2025 20:07:15 -0400 Subject: [PATCH 6/7] updated changes.md --- CHANGES.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 08e7ccfc75fe..6f84ddf85369 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -76,6 +76,9 @@ * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * Add pip-based install support for JupyterLab Sidepanel extension ([#35397](https://github.com/apache/beam/issues/#35397)). * [IcebergIO] Create tables with a specified table properties ([#35496](https://github.com/apache/beam/pull/35496)) +* Add support for comma-separated options in Python SDK (Python) ([#35580](https://github.com/apache/beam/pull/35580)). + Python SDK now supports comma-separated values for experiments and dataflow_service_options, + matching Java SDK behavior while maintaining backward compatibility. * Milvus enrichment handler added (Python) ([#35216](https://github.com/apache/beam/pull/35216)). Beam now supports Milvus enrichment handler capabilities for vector, keyword, and hybrid search operations. From 1d0306c47bcb009e1bc521fbef3dc804ddacef72 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Mon, 14 Jul 2025 19:35:20 -0400 Subject: [PATCH 7/7] polished docstrings --- sdks/python/apache_beam/options/pipeline_options.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 136f7cad75f8..a7db5bfb0e71 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -227,9 +227,10 @@ class _CommaSeparatedListAction(argparse.Action): as separate experiments 'abc' and 'def', similar to how Java SDK handles them. - For key=value experiments, only splits at commas that are not part of the - value. For example: 'abc,def,master_key=k1=v1,k2=v2' becomes - ['abc', 'def', 'master_key=k1=v1,k2=v2'] + If there are key=value experiments in a raw argument, the remaining part of + the argument are treated as values and won't split further. For example: + 'abc,def,master_key=k1=v1,k2=v2' becomes + ['abc', 'def', 'master_key=k1=v1,k2=v2']. """ def __call__(self, parser, namespace, values, option_string=None): if not hasattr(namespace, self.dest) or getattr(namespace,