Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Add pip-based install support for JupyterLab Sidepanel extension ([#35397](https://github.com/apache/beam/issues/#35397)).
* [IcebergIO] Create tables with a specified table properties ([#35496](https://github.com/apache/beam/pull/35496))
* Add support for comma-separated options in Python SDK (Python) ([#35580](https://github.com/apache/beam/pull/35580)).
Python SDK now supports comma-separated values for experiments and dataflow_service_options,
matching Java SDK behavior while maintaining backward compatibility.
* Milvus enrichment handler added (Python) ([#35216](https://github.com/apache/beam/pull/35216)).
Beam now supports Milvus enrichment handler capabilities for vector, keyword,
and hybrid search operations.
Expand Down
69 changes: 67 additions & 2 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,71 @@ def __call__(self, parser, namespace, values, option_string=None):
% _GcsCustomAuditEntriesAction.MAX_ENTRIES)


class _CommaSeparatedListAction(argparse.Action):
"""
Argparse Action that splits comma-separated values and appends them to
a list. This allows options like --experiments=abc,def to be treated
as separate experiments 'abc' and 'def', similar to how Java SDK handles
them.

If there are key=value experiments in a raw argument, the remaining part of
the argument are treated as values and won't split further. For example:
'abc,def,master_key=k1=v1,k2=v2' becomes
['abc', 'def', 'master_key=k1=v1,k2=v2'].
"""
def __call__(self, parser, namespace, values, option_string=None):
if not hasattr(namespace, self.dest) or getattr(namespace,
self.dest) is None:
setattr(namespace, self.dest, [])

# Split comma-separated values and extend the list
if isinstance(values, str):
# Smart splitting: only split at commas that are not part of
# key=value pairs
split_values = self._smart_split(values)
getattr(namespace, self.dest).extend(split_values)
else:
# If values is not a string, just append it
getattr(namespace, self.dest).append(values)

def _smart_split(self, values):
"""Split comma-separated values, but preserve commas within
key=value pairs."""
result = []
current = []
equals_depth = 0

i = 0
while i < len(values):
char = values[i]

if char == '=':
equals_depth += 1
current.append(char)
elif char == ',' and equals_depth <= 1:
# This comma is a top-level separator (not inside a complex value)
if current:
result.append(''.join(current).strip())
current = []
equals_depth = 0
elif char == ',' and equals_depth > 1:
# This comma is inside a complex value, keep it
current.append(char)
elif char == ' ' and not current:
# Skip leading spaces
pass
else:
current.append(char)

i += 1

# Add the last item
if current:
result.append(''.join(current).strip())

return [v for v in result if v] # Filter out empty values


class PipelineOptions(HasDisplayData):
"""This class and subclasses are used as containers for command line options.

Expand Down Expand Up @@ -977,7 +1042,7 @@ def _add_argparse_args(cls, parser):
'--dataflow_service_option',
'--dataflow_service_options',
dest='dataflow_service_options',
action='append',
action=_CommaSeparatedListAction,
default=None,
help=(
'Options to configure the Dataflow service. These '
Expand Down Expand Up @@ -1412,7 +1477,7 @@ def _add_argparse_args(cls, parser):
'--experiment',
'--experiments',
dest='experiments',
action='append',
action=_CommaSeparatedListAction,
default=None,
help=(
'Runners may provide a number of experimental features that can be '
Expand Down
52 changes: 52 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,58 @@ def test_validation_bad_stg_bad_temp_no_default(self):
'staging_location.'
])

def test_comma_separated_experiments(self):
"""Test that comma-separated experiments are parsed correctly."""
# Test single experiment
options = PipelineOptions(['--experiments=abc'])
self.assertEqual(['abc'], options.get_all_options()['experiments'])

# Test comma-separated experiments
options = PipelineOptions(['--experiments=abc,def,ghi'])
self.assertEqual(['abc', 'def', 'ghi'],
options.get_all_options()['experiments'])

# Test multiple flags with comma-separated values
options = PipelineOptions(
['--experiments=abc,def', '--experiments=ghi,jkl'])
self.assertEqual(['abc', 'def', 'ghi', 'jkl'],
options.get_all_options()['experiments'])

# Test with spaces around commas
options = PipelineOptions(['--experiments=abc, def , ghi'])
self.assertEqual(['abc', 'def', 'ghi'],
options.get_all_options()['experiments'])

# Test empty values are filtered out
options = PipelineOptions(['--experiments=abc,,def,'])
self.assertEqual(['abc', 'def'], options.get_all_options()['experiments'])

def test_comma_separated_dataflow_service_options(self):
"""Test that comma-separated dataflow service options are parsed
correctly."""
# Test single option
options = PipelineOptions(['--dataflow_service_options=option1=value1'])
self.assertEqual(['option1=value1'],
options.get_all_options()['dataflow_service_options'])

# Test comma-separated options
options = PipelineOptions([
'--dataflow_service_options=option1=value1,option2=value2,'
'option3=value3'
])
self.assertEqual(['option1=value1', 'option2=value2', 'option3=value3'],
options.get_all_options()['dataflow_service_options'])

# Test multiple flags with comma-separated values
options = PipelineOptions([
'--dataflow_service_options=option1=value1,option2=value2',
'--dataflow_service_options=option3=value3,option4=value4'
])
self.assertEqual([
'option1=value1', 'option2=value2', 'option3=value3', 'option4=value4'
],
options.get_all_options()['dataflow_service_options'])


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/options/value_provider_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ def test_experiments_options_setup(self):
options = PipelineOptions(['--experiments', 'a', '--experiments', 'b,c'])
options = options.view_as(DebugOptions)
self.assertIn('a', options.experiments)
self.assertIn('b,c', options.experiments)
self.assertNotIn('c', options.experiments)
self.assertIn('b', options.experiments)
self.assertIn('c', options.experiments)

def test_nested_value_provider_wrap_static(self):
vp = NestedValueProvider(StaticValueProvider(int, 1), lambda x: x + 1)
Expand Down
Loading