From a2aac6c4c998c7e07021c840fdb60a075348df85 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Fri, 5 Sep 2025 11:27:53 -0700 Subject: [PATCH 1/3] Hint Suggestions for invalid pipeline options --- .../apache_beam/options/pipeline_options.py | 34 +++++++++++++++---- sdks/python/apache_beam/pipeline.py | 4 +++ 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index c30a902063e0..6386682705fa 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -20,6 +20,7 @@ # pytype: skip-file import argparse +import difflib import json import logging import os @@ -449,11 +450,31 @@ def from_dictionary(cls, options): return cls(flags) + @staticmethod + def _warn_on_unknown_options(unknown_args, parser): + if not unknown_args: + return + + all_known_options = [ + opt for action in parser._actions for opt in action.option_strings + ] + + for arg in unknown_args: + if not arg.startswith('--'): + continue + arg_name = arg.split('=', 1)[0] + suggestions = difflib.get_close_matches(arg_name, all_known_options) + msg = f"Unparseable argument: {arg}" + if suggestions: + msg += f". Did you mean '{suggestions[0]}'?" + _LOGGER.warning(msg) + def get_all_options( self, drop_default=False, add_extra_args_fn: Optional[Callable[[_BeamArgumentParser], None]] = None, - retain_unknown_options=False) -> Dict[str, Any]: + retain_unknown_options=False, + display_warnings=False) -> Dict[str, Any]: """Returns a dictionary of all defined arguments. Returns a dictionary of all defined arguments (arguments that are defined in @@ -485,12 +506,11 @@ def get_all_options( add_extra_args_fn(parser) known_args, unknown_args = parser.parse_known_args(self._flags) - if retain_unknown_options: - if unknown_args: - _LOGGER.warning( - 'Unknown pipeline options received: %s. Ignore if flags are ' - 'used for internal purposes.' % (','.join(unknown_args))) + if display_warnings: + self._warn_on_unknown_options(unknown_args, parser) + + if retain_unknown_options: seen = set() def add_new_arg(arg, **kwargs): @@ -530,7 +550,7 @@ def add_new_arg(arg, **kwargs): continue parsed_args, _ = parser.parse_known_args(self._flags) else: - if unknown_args: + if unknown_args and display_warnings: _LOGGER.warning("Discarding unparseable args: %s", unknown_args) parsed_args = known_args result = vars(parsed_args) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 0ed5a435e788..884ca124b0f6 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -580,6 +580,10 @@ def run(self, test_runner_api='AUTO'): # type: (Union[bool, str]) -> PipelineResult """Runs the pipeline. Returns whatever our runner returns after running.""" + # All pipeline options are finalized at this point. + # Call get_all_options to print warnings on invalid options. + self.options.get_all_options( + retain_unknown_options=True, display_warnings=True) for error_handler in self._error_handlers: error_handler.verify_closed() From 2d99cede65b83e121b3a5f6df42339c43a49342e Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Fri, 5 Sep 2025 11:54:11 -0700 Subject: [PATCH 2/3] only show suggestions once --- sdks/python/apache_beam/options/pipeline_options.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 6386682705fa..0d0a78090305 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -550,7 +550,7 @@ def add_new_arg(arg, **kwargs): continue parsed_args, _ = parser.parse_known_args(self._flags) else: - if unknown_args and display_warnings: + if unknown_args: _LOGGER.warning("Discarding unparseable args: %s", unknown_args) parsed_args = known_args result = vars(parsed_args) From 907bf601c0cc4a489d1fc0ab417f253d9451a671 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Fri, 5 Sep 2025 13:03:32 -0700 Subject: [PATCH 3/3] Update sdks/python/apache_beam/options/pipeline_options.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- sdks/python/apache_beam/options/pipeline_options.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 0d0a78090305..47e1d9b26241 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -460,13 +460,12 @@ def _warn_on_unknown_options(unknown_args, parser): ] for arg in unknown_args: - if not arg.startswith('--'): - continue - arg_name = arg.split('=', 1)[0] - suggestions = difflib.get_close_matches(arg_name, all_known_options) msg = f"Unparseable argument: {arg}" - if suggestions: - msg += f". Did you mean '{suggestions[0]}'?" + if arg.startswith('--'): + arg_name = arg.split('=', 1)[0] + suggestions = difflib.get_close_matches(arg_name, all_known_options) + if suggestions: + msg += f". Did you mean '{suggestions[0]}'?'" _LOGGER.warning(msg) def get_all_options(