diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index c30a902063e0..47e1d9b26241 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -20,6 +20,7 @@ # pytype: skip-file import argparse +import difflib import json import logging import os @@ -449,11 +450,30 @@ def from_dictionary(cls, options): return cls(flags) + @staticmethod + def _warn_on_unknown_options(unknown_args, parser): + if not unknown_args: + return + + all_known_options = [ + opt for action in parser._actions for opt in action.option_strings + ] + + for arg in unknown_args: + msg = f"Unparseable argument: {arg}" + if arg.startswith('--'): + arg_name = arg.split('=', 1)[0] + suggestions = difflib.get_close_matches(arg_name, all_known_options) + if suggestions: + msg += f". Did you mean '{suggestions[0]}'?'" + _LOGGER.warning(msg) + def get_all_options( self, drop_default=False, add_extra_args_fn: Optional[Callable[[_BeamArgumentParser], None]] = None, - retain_unknown_options=False) -> Dict[str, Any]: + retain_unknown_options=False, + display_warnings=False) -> Dict[str, Any]: """Returns a dictionary of all defined arguments. Returns a dictionary of all defined arguments (arguments that are defined in @@ -485,12 +505,11 @@ def get_all_options( add_extra_args_fn(parser) known_args, unknown_args = parser.parse_known_args(self._flags) - if retain_unknown_options: - if unknown_args: - _LOGGER.warning( - 'Unknown pipeline options received: %s. Ignore if flags are ' - 'used for internal purposes.' % (','.join(unknown_args))) + if display_warnings: + self._warn_on_unknown_options(unknown_args, parser) + + if retain_unknown_options: seen = set() def add_new_arg(arg, **kwargs): diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 0ed5a435e788..884ca124b0f6 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -580,6 +580,10 @@ def run(self, test_runner_api='AUTO'): # type: (Union[bool, str]) -> PipelineResult """Runs the pipeline. Returns whatever our runner returns after running.""" + # All pipeline options are finalized at this point. + # Call get_all_options to print warnings on invalid options. + self.options.get_all_options( + retain_unknown_options=True, display_warnings=True) for error_handler in self._error_handlers: error_handler.verify_closed()