diff --git a/CHANGES.md b/CHANGES.md
index 08e7ccfc75fe..6f84ddf85369 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -76,6 +76,9 @@
 * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * Add pip-based install support for JupyterLab Sidepanel extension ([#35397](https://github.com/apache/beam/issues/#35397)).
 * [IcebergIO] Create tables with a specified table properties ([#35496](https://github.com/apache/beam/pull/35496))
+* Add support for comma-separated options in the Python SDK (Python) ([#35580](https://github.com/apache/beam/pull/35580)).
+  The Python SDK now supports comma-separated values for experiments and dataflow_service_options,
+  matching Java SDK behavior while maintaining backward compatibility.
 * Milvus enrichment handler added (Python) ([#35216](https://github.com/apache/beam/pull/35216)).
   Beam now supports Milvus enrichment handler capabilities for vector, keyword, and hybrid
   search operations.
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index dad905fec79a..a7db5bfb0e71 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -220,6 +220,71 @@ def __call__(self, parser, namespace, values, option_string=None):
           % _GcsCustomAuditEntriesAction.MAX_ENTRIES)
 
 
+class _CommaSeparatedListAction(argparse.Action):
+  """
+  Argparse Action that splits comma-separated values and appends them to
+  a list. This allows options like --experiments=abc,def to be treated
+  as the separate experiments 'abc' and 'def', similar to how the Java SDK
+  handles them.
+
+  If a raw argument contains a key=value entry whose value itself includes
+  '=', the remainder of that argument is treated as a single value and is
+  not split further. For example: 'abc,def,master_key=k1=v1,k2=v2' becomes
+  ['abc', 'def', 'master_key=k1=v1,k2=v2'].
+  """
+  def __call__(self, parser, namespace, values, option_string=None):
+    if not hasattr(namespace, self.dest) or getattr(namespace,
+                                                    self.dest) is None:
+      setattr(namespace, self.dest, [])
+
+    # Split comma-separated values and extend the list
+    if isinstance(values, str):
+      # Smart splitting: only split at commas that are not part of
+      # key=value pairs
+      split_values = self._smart_split(values)
+      getattr(namespace, self.dest).extend(split_values)
+    else:
+      # If values is not a string, just append it
+      getattr(namespace, self.dest).append(values)
+
+  def _smart_split(self, values):
+    """Split comma-separated values, but preserve commas within
+    key=value pairs."""
+    result = []
+    current = []
+    equals_depth = 0
+
+    i = 0
+    while i < len(values):
+      char = values[i]
+
+      if char == '=':
+        equals_depth += 1
+        current.append(char)
+      elif char == ',' and equals_depth <= 1:
+        # This comma is a top-level separator (not inside a complex value)
+        if current:
+          result.append(''.join(current).strip())
+          current = []
+          equals_depth = 0
+      elif char == ',' and equals_depth > 1:
+        # This comma is inside a complex value, keep it
+        current.append(char)
+      elif char == ' ' and not current:
+        # Skip leading spaces
+        pass
+      else:
+        current.append(char)
+
+      i += 1
+
+    # Add the last item
+    if current:
+      result.append(''.join(current).strip())
+
+    return [v for v in result if v]  # Filter out empty values
+
+
 class PipelineOptions(HasDisplayData):
   """This class and subclasses are used as containers for command line options.
 
@@ -977,7 +1042,7 @@ def _add_argparse_args(cls, parser):
         '--dataflow_service_option',
         '--dataflow_service_options',
         dest='dataflow_service_options',
-        action='append',
+        action=_CommaSeparatedListAction,
         default=None,
         help=(
             'Options to configure the Dataflow service. These '
@@ -1412,7 +1477,7 @@ def _add_argparse_args(cls, parser):
         '--experiment',
         '--experiments',
         dest='experiments',
-        action='append',
+        action=_CommaSeparatedListAction,
         default=None,
         help=(
            'Runners may provide a number of experimental features that can be '
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py
index 099c9e80e21b..06270d4cd310 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -893,6 +893,58 @@ def test_validation_bad_stg_bad_temp_no_default(self):
         'staging_location.'
     ])
 
+  def test_comma_separated_experiments(self):
+    """Test that comma-separated experiments are parsed correctly."""
+    # Test single experiment
+    options = PipelineOptions(['--experiments=abc'])
+    self.assertEqual(['abc'], options.get_all_options()['experiments'])
+
+    # Test comma-separated experiments
+    options = PipelineOptions(['--experiments=abc,def,ghi'])
+    self.assertEqual(['abc', 'def', 'ghi'],
+                     options.get_all_options()['experiments'])
+
+    # Test multiple flags with comma-separated values
+    options = PipelineOptions(
+        ['--experiments=abc,def', '--experiments=ghi,jkl'])
+    self.assertEqual(['abc', 'def', 'ghi', 'jkl'],
+                     options.get_all_options()['experiments'])
+
+    # Test with spaces around commas
+    options = PipelineOptions(['--experiments=abc, def , ghi'])
+    self.assertEqual(['abc', 'def', 'ghi'],
+                     options.get_all_options()['experiments'])
+
+    # Test empty values are filtered out
+    options = PipelineOptions(['--experiments=abc,,def,'])
+    self.assertEqual(['abc', 'def'], options.get_all_options()['experiments'])
+
+  def test_comma_separated_dataflow_service_options(self):
+    """Test that comma-separated dataflow service options are parsed
+    correctly."""
+    # Test single option
+    options = PipelineOptions(['--dataflow_service_options=option1=value1'])
+    self.assertEqual(['option1=value1'],
+                     options.get_all_options()['dataflow_service_options'])
+
+    # Test comma-separated options
+    options = PipelineOptions([
+        '--dataflow_service_options=option1=value1,option2=value2,'
+        'option3=value3'
+    ])
+    self.assertEqual(['option1=value1', 'option2=value2', 'option3=value3'],
+                     options.get_all_options()['dataflow_service_options'])
+
+    # Test multiple flags with comma-separated values
+    options = PipelineOptions([
+        '--dataflow_service_options=option1=value1,option2=value2',
+        '--dataflow_service_options=option3=value3,option4=value4'
+    ])
+    self.assertEqual([
+        'option1=value1', 'option2=value2', 'option3=value3', 'option4=value4'
+    ],
+                     options.get_all_options()['dataflow_service_options'])
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/options/value_provider_test.py b/sdks/python/apache_beam/options/value_provider_test.py
index 42afa8c0def3..e88d23714646 100644
--- a/sdks/python/apache_beam/options/value_provider_test.py
+++ b/sdks/python/apache_beam/options/value_provider_test.py
@@ -216,8 +216,8 @@ def test_experiments_options_setup(self):
     options = PipelineOptions(['--experiments', 'a', '--experiments', 'b,c'])
     options = options.view_as(DebugOptions)
     self.assertIn('a', options.experiments)
-    self.assertIn('b,c', options.experiments)
-    self.assertNotIn('c', options.experiments)
+    self.assertIn('b', options.experiments)
+    self.assertIn('c', options.experiments)
 
   def test_nested_value_provider_wrap_static(self):
     vp = NestedValueProvider(StaticValueProvider(int, 1), lambda x: x + 1)
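
A minimal sketch of the user-visible behavior once the patch above is applied. It only exercises the parsing asserted in the new tests; the experiment names ('abc', 'master_key', etc.) are the illustrative values from the tests and docstring, not real experiments:

    from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

    # Comma-separated values are split into individual experiments,
    # matching the Java SDK behavior noted in the CHANGES.md entry.
    options = PipelineOptions(['--experiments=abc,def', '--experiments=ghi'])
    print(options.view_as(DebugOptions).experiments)  # ['abc', 'def', 'ghi']

    # A value containing a nested '=' keeps its embedded commas intact.
    options = PipelineOptions(['--experiments=abc,master_key=k1=v1,k2=v2'])
    print(options.view_as(DebugOptions).experiments)
    # ['abc', 'master_key=k1=v1,k2=v2']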