Skip to content

[Bug]: Type inference failing for Python SDK with External transforms and beam.Row #22854

@rahuliyer95

Description

@rahuliyer95

What happened?

When using an expansion service with Beam SDK 2.41.0, type inference fails with the following exception:

Traceback (most recent call last):
  File "/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/__main__.py", line 87, in <module>
    bootstrap_pex(__entry_point__)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex_bootstrapper.py", line 591, in bootstrap_pex
    pex.PEX(entry_point).execute()
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py", line 528, in execute
    sys.exit(self._wrap_coverage(self._wrap_profiling, self._execute))
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py", line 435, in _wrap_coverage
    return runner(*args)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py", line 466, in _wrap_profiling
    return runner(*args)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py", line 556, in _execute
    EntryPoint.parse("run = {}".format(self._pex_info.entry_point))
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py", line 739, in execute_entry
    return self.execute_module(entry_point.module)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py", line 747, in execute_module
    runpy.run_module(module_name, run_name="__main__", alter_sys=True)
  File "/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/runpy.py", line 205, in run_module
    return _run_module_code(code, init_globals, run_name, mod_spec)
  File "/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/runpy.py", line 96, in _run_module_code
    mod_name, mod_spec, pkg_name, script_name)
  File "/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/twitter/udp/integration_tests/jobs/dal_read_job.py", line 180, in <module>
    main()
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/twitter/udp/integration_tests/jobs/dal_read_job.py", line 157, in main
    >> dal_rows.ReadRows("tweetsource-public", "public_tweets", opts.date).with_output_types(beam.Row)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/transforms/ptransform.py", line 1095, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/transforms/ptransform.py", line 617, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/pipeline.py", line 663, in apply
    return self.apply(transform, pvalueish)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/pipeline.py", line 709, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/dataflow/dataflow_runner.py", line 141, in apply
    return super().apply(transform, input, options)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/runner.py", line 185, in apply
    return m(transform, input, options)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/runner.py", line 215, in apply_PTransform
    return transform.expand(input)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/transforms/external.py", line 547, in expand
    pcoll_id in self._expanded_transform.outputs.items()
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/transforms/external.py", line 547, in <dictcomp>
    pcoll_id in self._expanded_transform.outputs.items()
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/pvalue.py", line 210, in from_runner_api
    element_type=context.element_type_from_coder_id(proto.coder_id),
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/pipeline_context.py", line 269, in element_type_from_coder_id
    self.coders[coder_id].to_type_hint())
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/pipeline_context.py", line 163, in __getitem__
    return self.get_by_id(id)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/coders/coders.py", line 385, in from_runner_api
    context)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/coders/row_coder.py", line 111, in from_runner_api_parameter
    return RowCoder(schema)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/coders/row_coder.py", line 63, in __init__
    self._type_hint = named_tuple_from_schema(self.schema)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/typehints/schemas.py", line 462, in named_tuple_from_schema
    schema_registry=schema_registry)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/typehints/schemas.py", line 179, in typing_from_runner_api
    schema_registry=schema_registry).typing_from_runner_api(fieldtype_proto)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/typehints/schemas.py", line 407, in typing_from_runner_api
    field_py_type = self.typing_from_runner_api(field.type)
  File "/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/typehints/schemas.py", line 365, in typing_from_runner_api
    return Optional[base]
  File "/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/typing.py", line 254, in inner
    return func(*args, **kwds)
  File "/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/typing.py", line 357, in __getitem__
    arg = _type_check(parameters, "Optional[t] requires a single type.")
  File "/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/typing.py", line 142, in _type_check
    raise TypeError(f"{msg} Got {arg!r:.100}.")
TypeError: Optional[t] requires a single type. Got Row(location=typing.Union[typing.Sequence[str], NoneType], empty__=typing.Union[bool, NoneType]).

This is how we are using the external transform:

# Minimal reproduction: apply an external transform that is expanded by a
# remote expansion service reached over a secure gRPC channel.
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions
from apache_beam.transforms.external import ExpansionAndArtifactRetrievalStub
# NamedTupleBasedPayloadBuilder is used below; without this import the
# snippet fails with NameError before reaching the expansion service.
from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
import grpc
import typing

# Payload type handed to NamedTupleBasedPayloadBuilder for the transform.
External = typing.NamedTuple(
  "External",
  [
    ("id", str),
  ],
)

creds = grpc.ssl_channel_credentials()
channel = grpc.secure_channel("localhost:10001", creds)
# Wait for the channel to become ready before building the pipeline.
grpc.channel_ready_future(channel).result()
stub = ExpansionAndArtifactRetrievalStub(channel)
with beam.Pipeline(options=PipelineOptions()) as p:
  p | "Read" >> beam.ExternalTransform("URN", NamedTupleBasedPayloadBuilder(External("123")), stub)

The same code works fine with SDK 2.40.0.

Issue Priority

Priority: 1

Issue Component

Component: sdk-py-core

Metadata

Metadata

Assignees

Labels

P1, bug, core, done & done (Issue has been reviewed after it was closed for verification, followups, etc.), python

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions