Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
# pytype: skip-file

import argparse
import difflib
import json
import logging
import os
Expand Down Expand Up @@ -449,11 +450,30 @@ def from_dictionary(cls, options):

return cls(flags)

@staticmethod
def _warn_on_unknown_options(unknown_args, parser):
if not unknown_args:
return

all_known_options = [
opt for action in parser._actions for opt in action.option_strings
]

for arg in unknown_args:
msg = f"Unparseable argument: {arg}"
if arg.startswith('--'):
arg_name = arg.split('=', 1)[0]
suggestions = difflib.get_close_matches(arg_name, all_known_options)
if suggestions:
msg += f". Did you mean '{suggestions[0]}'?'"
_LOGGER.warning(msg)

def get_all_options(
self,
drop_default=False,
add_extra_args_fn: Optional[Callable[[_BeamArgumentParser], None]] = None,
retain_unknown_options=False) -> Dict[str, Any]:
retain_unknown_options=False,
display_warnings=False) -> Dict[str, Any]:
"""Returns a dictionary of all defined arguments.

Returns a dictionary of all defined arguments (arguments that are defined in
Expand Down Expand Up @@ -485,12 +505,11 @@ def get_all_options(
add_extra_args_fn(parser)

known_args, unknown_args = parser.parse_known_args(self._flags)
if retain_unknown_options:
if unknown_args:
_LOGGER.warning(
'Unknown pipeline options received: %s. Ignore if flags are '
'used for internal purposes.' % (','.join(unknown_args)))

if display_warnings:
self._warn_on_unknown_options(unknown_args, parser)

if retain_unknown_options:
seen = set()

def add_new_arg(arg, **kwargs):
Expand Down
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,10 @@ def run(self, test_runner_api='AUTO'):
# type: (Union[bool, str]) -> PipelineResult

"""Runs the pipeline. Returns whatever our runner returns after running."""
# All pipeline options are finalized at this point.
# Call get_all_options to print warnings on invalid options.
self.options.get_all_options(
retain_unknown_options=True, display_warnings=True)

for error_handler in self._error_handlers:
error_handler.verify_closed()
Expand Down
Loading