[Bug]: Cannot transform multiple PCollections to Dataframe using the apache_beam.dataframe.transforms.DataframeTransform function #30445

@riverset

Description

What happened?

To reproduce the issue, run the following Python code:

```python
import argparse
import logging

import apache_beam as beam
from apache_beam.dataframe.transforms import DataframeTransform
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

logging.getLogger().setLevel(logging.INFO)

parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args()
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True


# Dummy DataFrame transform function; the logic itself does not matter.
def transformdf(a, b):
    a["addr"] = "addr-common"
    return a


p = beam.Pipeline(options=pipeline_options)

# Schema-aware PCollections built from beam.Row elements.
data1 = [beam.Row(id=1, name="abc"), beam.Row(id=2, name="def"), beam.Row(id=3, name="ghi")]
pcol1 = p | "Create1" >> beam.Create(data1)

data2 = [beam.Row(addr="addr1"), beam.Row(addr="addr2"), beam.Row(addr="addr3")]
pcol2 = p | "Create2" >> beam.Create(data2)

# This line raises the duplicate-label RuntimeError.
pcol = {"a": pcol1, "b": pcol2} | "TransformedDF" >> DataframeTransform(transformdf)

(
    pcol
    | "Map" >> beam.Map(lambda row: {"id": row.id, "name": row.name, "addr": row.addr})
    | "Print" >> beam.Map(print)
)

p.run().wait_until_finish()
```
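Judging from the repro (a dict with keys "a" and "b" feeding `transformdf(a, b)`), `DataframeTransform` appears to pass a dict of inputs to the user function as keyword arguments. The sketch below is a toy model of that dispatch in plain Python, assuming lists of dicts stand in for deferred DataFrames; `call_with_frames` is a hypothetical helper, not a Beam API, and Beam's real implementation may differ.

```python
from typing import Any, Callable, Dict, List


# Hypothetical helper (not a Beam API): hands a dict, tuple, or single input
# to a user function the way DataframeTransform appears to.
def call_with_frames(func: Callable[..., Any], frames: Any) -> Any:
    if isinstance(frames, dict):
        # Dict keys become keyword arguments, so they must match parameter names.
        return func(**frames)
    if isinstance(frames, tuple):
        # Tuple elements become positional arguments, in order.
        return func(*frames)
    # A single input is passed as the only argument.
    return func(frames)


# Plain-Python stand-in for the issue's transformdf; lists of dicts play the
# role of DataFrames.
def transformdf(a: List[Dict[str, Any]], b: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    return [dict(row, addr="addr-common") for row in a]


rows_a = [{"id": 1, "name": "abc"}, {"id": 2, "name": "def"}]
rows_b = [{"addr": "addr1"}, {"addr": "addr2"}]

result = call_with_frames(transformdf, {"a": rows_a, "b": rows_b})
print(result)
```

Renaming a key, for example passing `{"x": rows_a, "b": rows_b}`, fails with a `TypeError` in this model because `transformdf` has no parameter named `x`.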

This was originally reported in this Stack Overflow question: https://stackoverflow.com/questions/70937308/apache-beam-multiple-pcollection-dataframetransform-issue

In detail: when multiple PCollections are passed to DataframeTransform, the pipeline fails with the following error:

RuntimeError: A transform with label "TransformedDF/BatchElements(pc)" already exists in the pipeline

This happens even though both PCollections are schema-aware.
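The label in the error, `BatchElements(pc)`, suggests that when no label is supplied, the batching step inside `DataframeTransform` names itself after the local variable holding each input PCollection. If every input passes through the same variable (apparently `pc`), both get the same label and the second application fails. The sketch below reproduces that failure mode in plain Python, assuming this reading is correct; `ToyPipeline`, `var_name`, `to_frame` and `expand` are hypothetical stand-ins, not Beam's actual code.

```python
import inspect
from typing import Any, Dict, List, Optional


class ToyPipeline:
    """Hypothetical stand-in for a pipeline that rejects duplicate labels."""

    def __init__(self) -> None:
        self.labels: List[str] = []

    def apply(self, label: str) -> None:
        if label in self.labels:
            raise RuntimeError(
                f'A transform with label "{label}" already exists in the pipeline'
            )
        self.labels.append(label)


def var_name(obj: Any, level: int) -> str:
    """Return the local name bound to obj `level` frames up the stack, or '...'."""
    frame = inspect.currentframe()
    for _ in range(level):
        if frame is None:
            return "..."
        frame = frame.f_back
    if frame is None:
        return "..."
    for name, value in frame.f_locals.items():
        if value is obj:
            return name
    return "..."


def to_frame(pipeline: ToyPipeline, prefix: str, pcoll: Any,
             label: Optional[str] = None) -> Any:
    if label is None:
        # No explicit label: derive one from the caller's variable name.
        label = f"BatchElements({var_name(pcoll, 2)})"
    pipeline.apply(f"{prefix}/{label}")
    return pcoll


def expand(pipeline: ToyPipeline, inputs: Dict[str, Any],
           explicit_labels: bool = False) -> Dict[str, Any]:
    frames: Dict[str, Any] = {}
    for key, pc in inputs.items():
        # Every input is bound to the same local name, `pc`.
        if explicit_labels:
            frames[key] = to_frame(pipeline, "TransformedDF", pc,
                                   label=f"BatchElements({key})")
        else:
            frames[key] = to_frame(pipeline, "TransformedDF", pc)
    return frames


pipeline = ToyPipeline()
try:
    expand(pipeline, {"a": [{"id": 1}], "b": [{"addr": "addr1"}]})
except RuntimeError as err:
    print(err)

fixed = ToyPipeline()
expand(fixed, {"a": [{"id": 1}], "b": [{"addr": "addr1"}]}, explicit_labels=True)
print(fixed.labels)
```

If this reading is right, one possible workaround (untested here) is to convert each PCollection yourself with `apache_beam.dataframe.convert.to_dataframe`, passing a distinct `label=` for each one, apply the function to the resulting deferred DataFrames, and convert the result back with `to_pcollection`.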

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests
