[Bug]: Dataflow option validation fails in VSCode Jupyter notebook #34101

@aredshift

Description

What happened?

Launching a Google Cloud Dataflow pipeline from a VSCode Jupyter notebook with Beam Python SDK 2.63.0 on Python 3.11 fails to parse pipeline options that are accepted when the same code is run as a plain Python script (and that previously worked in a notebook with Beam 2.52.0 on Python 3.8).

The flexrs_goal option ends up receiving the path of the Jupyter kernel connection file as its value.

My best guess is that the --f /<user-home>/.local/share/jupyter/runtime/<connection_file>.json argument passed to the ipykernel_launcher.py script interacts with argparse's abbreviation matching (allow_abbrev, enabled by default), so --f gets expanded to --flexrs_goal.
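
A minimal repro of that guess (assuming only one registered option starts with "f", so the abbreviation is unambiguous; the path is a placeholder):

import argparse

# Stand-in for Beam's option parser; allow_abbrev defaults to True.
parser = argparse.ArgumentParser(prog='ipykernel_launcher.py')
parser.add_argument(
    '--flexrs_goal', choices=['COST_OPTIMIZED', 'SPEED_OPTIMIZED'])

# Mimic the kernel argv. argparse expands the prefix --f to the only
# matching long option, --flexrs_goal, then rejects the path as an
# invalid choice and exits with SystemExit: 2, matching the output below.
parser.parse_known_args(['--f=/tmp/kernel-placeholder.json'])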

Printing sys.argv in the failing cell yields: ['/opt/conda/envs/my-conda-env/lib/python3.11/site-packages/ipykernel_launcher.py', '--f=<path>.json'] (passed in, I think, by the VSCode Jupyter extension when it launches the kernel).

I'm not sure what changed between Beam 2.52.0 and 2.63.0, or between Python 3.8 and 3.11, to trigger this regression. According to this doc, the kernel expects a -f flag, yet it successfully parses the --f passed by the VSCode plugin.
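
As for why Beam's parser sees the kernel's argv at all: my reading of the traceback (pipeline_options.py:237) is that parse_known_args is called with flags=None, which argparse replaces with sys.argv[1:]. A sketch of that fallback:

import argparse
import sys

parser = argparse.ArgumentParser()
# When PipelineOptions is built without explicit flags, Beam ends up
# calling parse_known_args(None); argparse substitutes sys.argv[1:].
namespace, unparsed = parser.parse_known_args(None)
print(unparsed == sys.argv[1:])  # True: in the kernel this includes --f=<...>.json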

Output:

... logs ...
usage: ipykernel_launcher.py [-h] [--dataflow_endpoint DATAFLOW_ENDPOINT]
                             [--project PROJECT] [--job_name JOB_NAME]
                             [--staging_location STAGING_LOCATION]
                             [--temp_location TEMP_LOCATION] [--region REGION]
                             [--service_account_email SERVICE_ACCOUNT_EMAIL]
                             [--no_auth]
                             [--template_location TEMPLATE_LOCATION]
                             [--label LABELS] [--update]
                             [--transform_name_mapping TRANSFORM_NAME_MAPPING]
                             [--enable_streaming_engine]
                             [--dataflow_kms_key DATAFLOW_KMS_KEY]
                             [--create_from_snapshot CREATE_FROM_SNAPSHOT]
                             [--flexrs_goal {COST_OPTIMIZED,SPEED_OPTIMIZED}]
                             [--dataflow_service_option DATAFLOW_SERVICE_OPTIONS]
                             [--enable_hot_key_logging]
                             [--enable_artifact_caching]
                             [--impersonate_service_account IMPERSONATE_SERVICE_ACCOUNT]
                             [--gcp_oauth_scope GCP_OAUTH_SCOPES]
                             [--enable_bucket_read_metric_counter]
                             [--enable_bucket_write_metric_counter]
                             [--no_gcsio_throttling_counter]
                             [--enable_gcsio_blob_generation]
ipykernel_launcher.py: error: argument --flexrs_goal: invalid choice: '/<usr-home>/.local/share/jupyter/runtime/kernel-<id>.json' (choose from 'COST_OPTIMIZED', 'SPEED_OPTIMIZED')

Traceback

... pipeline = beam.Pipeline(options=pipeline_options)

File /opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/pipeline.py:211, in Pipeline.__init__(self, runner, options, argv, display_data)
    206   raise TypeError(
    207       'Runner %s is not a PipelineRunner object or the '
    208       'name of a registered runner.' % runner)
    210 # Validate pipeline options
--> 211 errors = PipelineOptionsValidator(self._options, runner).validate()
    212 if errors:
    213   raise ValueError(
    214       'Pipeline has validations errors: \n' + '\n'.join(errors))

File /opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/options/pipeline_options_validator.py:149, in PipelineOptionsValidator.validate(self)
    147 for cls in self.OPTIONS:
    148   if 'validate' in cls.__dict__ and callable(cls.__dict__['validate']):
--> 149     errors.extend(self.options.view_as(cls).validate(self))
    150 return errors

File /opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py:1023, in GoogleCloudOptions.validate(self, validator)
   1021 errors = []
   1022 if validator.is_service_runner():
-> 1023   errors.extend(self._handle_temp_and_staging_locations(validator))
   1024   errors.extend(validator.validate_cloud_options(self))
   1026 if self.view_as(DebugOptions).dataflow_job_file:

File /opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py:1004, in GoogleCloudOptions._handle_temp_and_staging_locations(self, validator)
   1002   return []
   1003 elif not staging_errors and not temp_errors:
-> 1004   self._warn_if_soft_delete_policy_enabled('temp_location')
   1005   self._warn_if_soft_delete_policy_enabled('staging_location')
   1006   return []

File /opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py:978, in GoogleCloudOptions._warn_if_soft_delete_policy_enabled(self, arg_name)
    976 try:
    977   from apache_beam.io.gcp import gcsio
--> 978   if gcsio.GcsIO().is_soft_delete_enabled(gcs_path):
    979     _LOGGER.warning(
    980         "Bucket specified in %s has soft-delete policy enabled."
    981         " To avoid being billed for unnecessary storage costs, turn"
   (...)
    985         " https://cloud.google.com/storage/docs/use-soft-delete"
    986         "#remove-soft-delete-policy." % arg_name)
    987 except ImportError:

File /opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/io/gcp/gcsio.py:151, in GcsIO.__init__(self, storage_client, pipeline_options)
    149   pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
    150 if storage_client is None:
--> 151   storage_client = create_storage_client(pipeline_options)
    153 google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    154 self.enable_read_bucket_metric = getattr(
    155     google_cloud_options, 'enable_bucket_read_metric_counter', False)

File /opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/io/gcp/gcsio.py:119, in create_storage_client(pipeline_options, use_credentials)
    107 """Create a GCS client for Beam via GCS Client Library.
    108 
    109 Args:
   (...)
    116   A google.cloud.storage.client.Client instance.
    117 """
    118 if use_credentials:
--> 119   credentials = auth.get_service_credentials(pipeline_options)
    120 else:
    121   credentials = None

File /opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/internal/gcp/auth.py:82, in get_service_credentials(pipeline_options)
     68 def get_service_credentials(pipeline_options):
     69   # type: (PipelineOptions) -> Optional[_ApitoolsCredentialsAdapter]
     71   """For internal use only; no backwards-compatibility guarantees.
     72 
     73   Get credentials to access Google services.
   (...)
     80     not found. Returned object is thread-safe.
     81   """
---> 82   return _Credentials.get_service_credentials(pipeline_options)

File /opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/internal/gcp/auth.py:139, in _Credentials.get_service_credentials(cls, pipeline_options)
    135     socket.setdefaulttimeout(60)
    136   _LOGGER.debug(
    137       "socket default timeout is %s seconds.", socket.getdefaulttimeout())
--> 139   cls._credentials = cls._get_service_credentials(pipeline_options)
    140   cls._credentials_init = True
    142 return cls._credentials

File /opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/internal/gcp/auth.py:156, in _Credentials._get_service_credentials(pipeline_options)
    152   return None
    154 try:
    155   # pylint: disable=c-extension-no-member
--> 156   credentials = _Credentials._get_credentials_with_retrys(pipeline_options)
    157   credentials = _Credentials._add_impersonation_credentials(
    158       credentials, pipeline_options)
    159   credentials = _ApitoolsCredentialsAdapter(credentials)

File /opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/utils/retry.py:298, in with_exponential_backoff.<locals>.real_decorator.<locals>.wrapper(*args, **kwargs)
    296 while True:
    297   try:
--> 298     return fun(*args, **kwargs)
    299   except Exception as exn:  # pylint: disable=broad-except
    300     if not retry_filter(exn):

File /opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/internal/gcp/auth.py:176, in _Credentials._get_credentials_with_retrys(pipeline_options)
    172 @staticmethod
    173 @retry.with_exponential_backoff(num_retries=4, initial_delay_secs=2)
    174 def _get_credentials_with_retrys(pipeline_options):
    175   credentials, _ = google.auth.default(
--> 176     scopes=pipeline_options.view_as(GoogleCloudOptions).gcp_oauth_scopes)
    177   return credentials

File /opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py:467, in PipelineOptions.view_as(self, cls)
    445 def view_as(self, cls: Type[PipelineOptionsT]) -> PipelineOptionsT:
    446   """Returns a view of current object as provided PipelineOption subclass.
    447 
    448   Example Usage::
   (...)
    465 
    466   """
--> 467   view = cls(self._flags)
    469   for option_name in view._visible_option_list():
    470     # Initialize values of keys defined by a cls.
    471     #
   (...)
    475     # backed by the same list across multiple views, and that any overrides of
    476     # pipeline options already stored in _all_options are preserved.
    477     if option_name not in self._all_options:

File /opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py:237, in PipelineOptions.__init__(self, flags, **kwargs)
    233     cls._add_argparse_args(parser)  # type: ignore
    235 # The _visible_options attribute will contain options that were recognized
    236 # by the parser.
--> 237 self._visible_options, _ = parser.parse_known_args(flags)
    239 # self._all_options is initialized with overrides to flag values,
    240 # provided in kwargs, and will store key-value pairs for options recognized
    241 # by current PipelineOptions [sub]class and its views that may be created.
   (...)
    244 # as each new views are created.
    245 # Users access this dictionary store via __getattr__ / __setattr__ methods.
    246 self._all_options = kwargs

File /opt/conda/envs/my-conda-env/lib/python3.11/argparse.py:1909, in ArgumentParser.parse_known_args(self, args, namespace)
   1907         namespace, args = self._parse_known_args(args, namespace)
   1908     except ArgumentError as err:
-> 1909         self.error(str(err))
   1910 else:
   1911     namespace, args = self._parse_known_args(args, namespace)

File /opt/conda/envs/my-conda-env/lib/python3.11/site-packages/apache_beam/options/pipeline_options.py:137, in _BeamArgumentParser.error(self, message)
    135 if message.startswith('ambiguous option: '):
    136   return
--> 137 super().error(message)

File /opt/conda/envs/my-conda-env/lib/python3.11/argparse.py:2640, in ArgumentParser.error(self, message)
   2638 self.print_usage(_sys.stderr)
   2639 args = {'prog': self.prog, 'message': message}
-> 2640 self.exit(2, _('%(prog)s: error: %(message)s\n') % args)

File /opt/conda/envs/my-conda-env/lib/python3.11/argparse.py:2627, in ArgumentParser.exit(self, status, message)
   2625 if message:
   2626     self._print_message(message, _sys.stderr)
-> 2627 _sys.exit(status)

SystemExit: 2

Environment

  • OS: Debian GNU/Linux 12 (bookworm)
  • Architecture: aarch64
    • CPU op-mode(s): 64-bit
    • Byte Order: Little Endian
  • Python version: 3.11.11
  • Beam version: 2.63.0 (also fails on 2.61.0)
  • IPykernel version: 6.29.5
  • Jupyter client version: 8.6.3
  • Jupyter server version: 2.15.0
  • Jupyter core version: 5.7.2
  • Interfacing via VSCode devcontainer
  • Environment managed by Conda (micromamba) / Poetry

Things I've tried

  • Populating the flexrs_goal option explicitly instead of omitting it
  • Running the same program outside of a Jupyter notebook (successful)
  • Running with Python 3.8 & Beam 2.52.0 (successful)
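
One more avenue that follows from the guess above, which I haven't verified on my exact setup: constructing PipelineOptions from an explicit (empty) flag list, so the parser never falls back to sys.argv. A sketch; project, region, and bucket values are placeholders:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# flags=[] means parse_known_args never reads sys.argv, so the kernel's
# --f connection-file argument is never seen by Beam's option parser.
options = PipelineOptions(
    flags=[],
    runner='DataflowRunner',
    project='placeholder-project',
    region='us-central1',
    temp_location='gs://placeholder-bucket/tmp',
)
pipeline = beam.Pipeline(options=options)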

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
