[Bug]: Writing partitioned data with Managed IcebergIO fails #31943

@arthurpessoa

Description


What happened?

After following the Dataflow example (https://cloud.google.com/dataflow/docs/guides/write-to-iceberg), I'm trying to add a partition spec to the table, but the write fails.

Given the following table:

    import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.catalog.TableIdentifier
    import org.apache.iceberg.types.Types
    import org.apache.iceberg.Schema as IcebergSchema

    // Hadoop catalog backed by a local warehouse directory.
    val catalogConfig = IcebergCatalogConfig.builder()
        .setIcebergCatalogType("hadoop")
        .setName("hadoop.catalog")
        .setWarehouseLocation("file:///warehouse")
        .build()

    val icebergSchema = IcebergSchema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "name", Types.StringType.get()),
        Types.NestedField.required(3, "day", Types.StringType.get())
    )

    // Identity partition on "id".
    val partitionSpec = PartitionSpec.builderFor(icebergSchema)
        .identity("id")
        .build()

    catalogConfig.catalog().createTable(
        TableIdentifier.of("table"),
        icebergSchema,
        partitionSpec
    )
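
For comparison, the Dataflow example creates the table without a partition spec, and the failure below shows up only after one is added. An unpartitioned variant of the same table, for reference:

    // Control case: same table, no partition spec (as in the Dataflow
    // example). The failing check below only applies to partitioned specs.
    catalogConfig.catalog().createTable(
        TableIdentifier.of("table"),
        icebergSchema,
        PartitionSpec.unpartitioned()
    )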

When writing with the following code:

    import org.apache.beam.sdk.managed.Managed
    import org.apache.beam.sdk.schemas.Schema
    import org.apache.beam.sdk.transforms.Create
    import org.apache.beam.sdk.transforms.JsonToRow

    val TABLE_ROWS: List<String> = listOf(
        "{\"id\":0, \"name\":\"Alice\", \"day\":\"01\"}",
        "{\"id\":1, \"name\":\"Bob\", \"day\":\"02\"}",
        "{\"id\":2, \"name\":\"Charles\", \"day\":\"02\"}"
    )

    // Beam row schema mirroring the Iceberg table schema above.
    val SCHEMA: Schema = Schema.builder()
        .addInt64Field("id")
        .addStringField("name")
        .addStringField("day")
        .build()

    pipeline.apply(Create.of(TABLE_ROWS))
        .apply(JsonToRow.withSchema(SCHEMA))
        .apply(
            Managed.write(Managed.ICEBERG).withConfig(
                mapOf(
                    "table" to "table",
                    "catalog_config" to mapOf(
                        "catalog_name" to "hadoop.catalog",
                        "warehouse_location" to "file:///warehouse",
                        "catalog_type" to "hadoop"
                    )
                )
            )
        )
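
The snippet assumes the usual Beam pipeline scaffolding around it; a minimal sketch:

    // Minimal scaffolding assumed by the snippet above (sketch).
    import org.apache.beam.sdk.Pipeline
    import org.apache.beam.sdk.options.PipelineOptionsFactory

    val pipeline = Pipeline.create(PipelineOptionsFactory.create())
    // ... apply the transforms above ...
    pipeline.run().waitUntilFinish()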

The write then fails with the following stack trace:

Caused by: java.lang.IllegalArgumentException: Partition must not be null when creating data writer for partitioned spec
	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:143)
	at org.apache.iceberg.parquet.Parquet$DataWriteBuilder.build(Parquet.java:704)
	at org.apache.beam.sdk.io.iceberg.RecordWriter.<init>(RecordWriter.java:73)
	at org.apache.beam.sdk.io.iceberg.RecordWriter.<init>(RecordWriter.java:47)
	at org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles$WriteUngroupedRowsToFilesDoFn.createAndInsertWriter(WriteUngroupedRowsToFiles.java:219)
	at org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles$WriteUngroupedRowsToFilesDoFn.getWriterIfPossible(WriteUngroupedRowsToFiles.java:242)
	at org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles$WriteUngroupedRowsToFilesDoFn.processElement(WriteUngroupedRowsToFiles.java:258)
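
The precondition that fires lives in Iceberg's Parquet DataWriteBuilder: building a data writer against a partitioned spec requires a non-null partition value, normally supplied via withPartition. A standalone sketch of that contract using plain Iceberg generics rather than Beam internals (the output path and helper function are made up for illustration):

    // Sketch: the Iceberg contract behind the stack trace. A Parquet data
    // writer built with a partitioned spec needs a non-null partition
    // value, typically derived from the record via PartitionKey.
    // Plain Iceberg generics, not Beam code; path/helper are hypothetical.
    import org.apache.iceberg.Files
    import org.apache.iceberg.PartitionKey
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.Schema as IcebergSchema
    import org.apache.iceberg.data.GenericRecord
    import org.apache.iceberg.data.parquet.GenericParquetWriter
    import org.apache.iceberg.parquet.Parquet

    fun writeOneRecord(schema: IcebergSchema, spec: PartitionSpec) {
        val record = GenericRecord.create(schema)
            .copy(mapOf("id" to 0L, "name" to "Alice", "day" to "01"))

        // Derive the partition tuple (identity value of "id") from the record.
        val partitionKey = PartitionKey(spec, schema)
        partitionKey.partition(record)

        val writer = Parquet.writeData(Files.localOutput("/tmp/part-0.parquet"))
            .schema(schema)
            .withSpec(spec)
            // Omitting this call reproduces "Partition must not be null
            // when creating data writer for partitioned spec" at build().
            .withPartition(partitionKey)
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .build<GenericRecord>()

        writer.use { it.write(record) }
    }

Beam's RecordWriter (third frame in the trace) appears to build the writer without setting a partition, which would explain the failure for any partitioned table.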

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Java SDK
  • Component: IO connector
