
[Bug]: JDBC logical type not found when running a Python pipeline in docker env #36013

@shunping

Description


What happened?

When a pipeline calls a JDBC-related schema transform and sets environment_type to DOCKER, the pipeline fails with the error ValueError: No logical type registered for URN 'beam:logical_type:javasdk_date:v1'. Loopback mode does not have this issue.
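Because LOOPBACK mode is reported to work, one way to confirm the difference is to rerun the same pipeline with only the environment flags changed. The helper below is a hypothetical sketch (not part of Beam) that rewrites a flag list like the one in the reproduction: it drops `--environment_type`/`--environment_config` and appends `--environment_type=LOOPBACK`. It assumes flags use the `--flag=value` form.

```python
from typing import List


def to_loopback(argv: List[str]) -> List[str]:
    """Return a copy of argv that runs the Python SDK worker in LOOPBACK mode.

    Hypothetical helper for illustration. Assumes every flag is written as
    --flag=value (a separate "--flag value" pair would leave the value behind).
    """
    out = [
        arg for arg in argv
        if not arg.startswith("--environment_type")
        and not arg.startswith("--environment_config")
    ]
    out.append("--environment_type=LOOPBACK")
    return out


docker_flags = [
    "--streaming",
    "--runner=PrismRunner",
    "--environment_type=DOCKER",
    "--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.12_sdk:latest",
]
print(to_loopback(docker_flags))
```

In LOOPBACK mode the SDK worker presumably runs inside the launching process, where `apache_beam.io.jdbc` has already been imported, which would explain why that mode does not fail.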

Note that this happens even if the JDBC logical types are imported in the main session of the pipeline code. Apparently, the Python SDK worker inside the Docker container has not imported the JDBC logical types or apache_beam.io.jdbc, so those classes are not registered when the worker decodes the schema from the proto to build its coders.
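The failure mode described above can be modeled with a toy URN-to-class registry. This is a simplified sketch, not Beam's actual `LogicalType` registry: registration happens as a side effect of defining (importing) the class, so a process that never runs that import has an empty entry for the URN. Two independent registry instances stand in for the launching process and the Docker worker; `import_jdbc_module` is a hypothetical stand-in for `import apache_beam.io.jdbc`.

```python
from typing import Dict


class LogicalTypeRegistry:
    """Toy model of a URN -> logical type registry (not Beam's actual class)."""

    def __init__(self) -> None:
        self._by_urn: Dict[str, type] = {}

    def register(self, cls: type) -> type:
        # Used as a class decorator: registration runs when the defining
        # module is imported, i.e. only in processes that import it.
        self._by_urn[cls.urn] = cls
        return cls

    def from_urn(self, urn: str) -> type:
        try:
            return self._by_urn[urn]
        except KeyError:
            raise ValueError(f"No logical type registered for URN {urn!r}") from None


# Each process has its own registry state; model the launcher and the
# Docker worker as two independent instances.
launcher_registry = LogicalTypeRegistry()
worker_registry = LogicalTypeRegistry()


def import_jdbc_module(registry: LogicalTypeRegistry) -> None:
    """Hypothetical stand-in for `import apache_beam.io.jdbc` in one process."""

    @registry.register
    class JdbcDateType:
        urn = "beam:logical_type:javasdk_date:v1"


import_jdbc_module(launcher_registry)  # the main session imports the module

URN = "beam:logical_type:javasdk_date:v1"
print(launcher_registry.from_urn(URN).__name__)  # JdbcDateType
try:
    worker_registry.from_urn(URN)  # the worker never ran the import
except ValueError as e:
    print(e)  # No logical type registered for URN 'beam:logical_type:javasdk_date:v1'
```

In this model, importing the module in the launcher has no effect on the worker's registry, which matches the observation that the main-session import does not help.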

Here is the code to reproduce the issue (requires a Postgres database instance):

import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.external import SchemaAwareExternalTransform
from apache_beam.transforms.external import BeamJarExpansionService

logging.basicConfig(level=logging.INFO)

options = PipelineOptions([
    "--streaming",
    "--runner=PrismRunner",
    "--environment_type=DOCKER",
    "--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.12_sdk:latest",
    "--job_server_timeout=3600",
    "--prism_log_level=debug",
])

URN = "beam:schematransform:org.apache.beam:postgres_read:v1"

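# Importing the JDBC logical types here does not avoid the error: the import
# only takes effect in this launching process, not in the SDK worker container.
from apache_beam.io.jdbc import JdbcDateType  # noqa: F401
from apache_beam.io.jdbc import JdbcTimeType  # noqa: F401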

expansion_service = BeamJarExpansionService(
    "sdks:java:io:google-cloud-platform:expansion-service:shadowJar")
with beam.Pipeline(options=options) as p:
  _ = (
      p | SchemaAwareExternalTransform(
          identifier=URN,
          expansion_service=expansion_service,
          rearrange_based_on_discovery=True,
          jdbc_url="jdbc:postgresql://localhost:5432/<db_name>",
          location="<table_name>",
          username="postgres",
          password="",
      )
      | beam.LogElements(level=logging.WARN))

The error message is as follows.

Traceback (most recent call last):
  File "sdks/python/playpen/xlang/run_xlang.py", line 37, in <module>
    with beam.Pipeline(options=options) as p:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "sdks/python/apache_beam/pipeline.py", line 663, in __exit__
    self.result.wait_until_finish()
  File "sdks/python/apache_beam/runners/portability/portable_runner.py", line 568, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline job-001 failed in state FAILED: bundle inst004 stage-005 failed:Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", line 573, in named_tuple_from_schema
    field_py_type = self.typing_from_runner_api(field.type)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", line 491, in typing_from_runner_api
    base = self.typing_from_runner_api(base_type)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", line 554, in typing_from_runner_api
    return LogicalType.from_runner_api(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", line 813, in from_runner_api
    raise ValueError(
ValueError: No logical type registered for URN 'beam:logical_type:javasdk_date:v1'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in _execute
    response = task()
               ^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 387, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 659, in do_instruction
    return getattr(self, request_type)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 690, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 511, in get
    processor = bundle_processor.BundleProcessor(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1133, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1193, in create_execution_tree
    get_operation(transform_id))) for transform_id in sorted(
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1168, in get_operation
    tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
          ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1038, in wrapper
    result = cache[args] = func(*args)
                           ^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1178, in get_operation
    return transform_factory.create_operation(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1497, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1828, in create_par_do
    return _create_pardo_operation(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1912, in _create_pardo_operation
    output_coders = factory.get_output_coders(transform_proto)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1540, in get_output_coders
    tag: self.get_windowed_coder(pcoll_id)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1526, in get_windowed_coder
    coder = self.get_coder(self.descriptor.pcollections[pcoll_id].coder_id)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1519, in get_coder
    return self.context.coders.get_by_id(coder_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/runners/pipeline_context.py", line 106, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/coders/coders.py", line 368, in from_runner_api
    return constructor(
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/coders/row_coder.py", line 107, in from_runner_api_parameter
    return RowCoder(schema)
           ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/coders/row_coder.py", line 66, in __init__
    self._type_hint = named_tuple_from_schema(self.schema)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", line 616, in named_tuple_from_schema
    schema_registry=schema_registry).named_tuple_from_schema(schema)
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", line 577, in named_tuple_from_schema
    raise ValueError(
ValueError: Failed to decode schema due to an issue with Field proto:

name: "event_date"
type {
  nullable: true
  logical_type {
    urn: "beam:logical_type:javasdk_date:v1"
    payload: "\202SNAPPY\000\000\000\000\001\000\000\000\001\000\000\002d\303\007\360@\254\355\000\005sr\000*org.apache.beam.sdk.io.jdbc.LogicalTypes$2M\352\236\036h\3034/\002\000\000xr\000?N9\000 schemas.l\t9Dtypes.PassThroughL\t\030\001Q\270\210\324\331\211\313P\033\263\002\000\004L\000\010argumentt\000\022Ljava/lang/Object;L\000\014a\r \001:\034t\000.Lorg/\t\266\000/\001\266\020/sdk/\r}\004/S\005\205\024$Field\0010\020;L\000\tf\021\rDq\000~\000\003L\000\nidentifier6s\000<String;xpt\000\000sr\0006n\346\000$AutoValue_\ts\000_\025sh9\304m\364S\243\227P\002\000\010L\000\025collectionEle\001\346\001\226\000q\t\211\000\013-+\001\023\010t\0000\216\331\000\000L9E$;L\000\nmapKey\001@\rS\014\014map\005\227\035\024,\010metadatat\000\017)aXutil/Map;L\000\010nullablet\000\023\t\035%~4Boolean;L\000\trow\t\343\010t\000$\212\243\000\001T!\374\030Namet\000-\2122\000\000$\001\254\001/\020;xr\000,nu\001\000S:\336\0010\013PLl[\357\3103\002\000\000xp\001\001\014sr\000\036AC\030.util.C5|Ds$EmptyMapY6\024\205Z\334\347\320\0053\014sr\000\021\005/\001\364\000.\r\3648\315 r\200\325\234\372\356\002\000\001Z\000\005v!\344\034xp\000p~r\000+\212\234\000\021\314\000\000\r\001\000\022e1\000\016\031f\014Enum\r\034\005\035(pt\000\006STRINGs!\304\000\007\001\307\001\t\000\020\001\005\010\022p~\001\007H\023t\000\010DATETIMEt\000\004DATE"
    representation {
      logical_type {
        urn: "beam:logical_type:millis_instant:v1"
        representation {
          atomic_type: INT64
        }
      }
    }
    argument_type {
      atomic_type: STRING
    }
    argument {
      atomic_value {
        string: ""
      }
    }
  }
}
id: 1
encoding_position: 1
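The Field proto above shows that decoding `event_date` requires two logical types: `beam:logical_type:javasdk_date:v1`, whose representation is itself the logical type `beam:logical_type:millis_instant:v1` over INT64. The sketch below models that field type as nested dicts (payload omitted) and lists which URNs are missing from a given registered set. The `registered_in_worker` set is an illustrative assumption, and the walker only follows `representation` and `argument_type`; a full version would also recurse into row, array, map and iterable types.

```python
from typing import Any, Dict, List


def logical_type_urns(field_type: Dict[str, Any]) -> List[str]:
    """Collect logical type URNs from a FieldType modeled as nested dicts.

    Follows only logical_type nesting (representation and argument_type).
    """
    urns: List[str] = []
    lt = field_type.get("logical_type")
    if lt is not None:
        urns.append(lt["urn"])
        for key in ("representation", "argument_type"):
            if key in lt:
                urns.extend(logical_type_urns(lt[key]))
    return urns


# The event_date field type from the error message (payload omitted).
event_date_type = {
    "nullable": True,
    "logical_type": {
        "urn": "beam:logical_type:javasdk_date:v1",
        "representation": {
            "logical_type": {
                "urn": "beam:logical_type:millis_instant:v1",
                "representation": {"atomic_type": "INT64"},
            }
        },
        "argument_type": {"atomic_type": "STRING"},
    },
}

# Assumed (illustrative) set of URNs registered in the worker process.
registered_in_worker = {"beam:logical_type:millis_instant:v1"}

missing = [u for u in logical_type_urns(event_date_type) if u not in registered_in_worker]
print(missing)  # ['beam:logical_type:javasdk_date:v1']
```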

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata


Assignees

Labels

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests
