CometColumnarExchange throws exception when reading Delta table #1844

@Kontinuation

Description

Describe the bug

Reading a Delta table with Comet enabled fails with the following error:

25/05/30 09:53:47 ERROR Executor: Exception in task 3.0 in stage 0.0 (TID 3)
org.apache.comet.CometNativeException: StructBuilder (Schema { fields: [Field { name: "provider", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "options", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }) and field_builder with index 0 (Utf8) are of unequal lengths: (1 != 0).
        at std::backtrace::Backtrace::create(__internal__:0)
        at comet::errors::init::{{closure}}(__internal__:0)
        at std::panicking::rust_panic_with_hook(__internal__:0)
        at std::panicking::begin_panic_handler::{{closure}}(__internal__:0)
        at std::sys::backtrace::__rust_end_short_backtrace(__internal__:0)
        at _rust_begin_unwind(__internal__:0)
        at core::panicking::panic_fmt(__internal__:0)
        at arrow_array::builder::struct_builder::StructBuilder::validate_content::{{closure}}::panic_cold_display(__internal__:0)
        at arrow_array::builder::struct_builder::StructBuilder::validate_content(__internal__:0)
        at arrow_array::builder::struct_builder::StructBuilder::finish(__internal__:0)
        at <arrow_array::builder::struct_builder::StructBuilder as arrow_array::builder::ArrayBuilder>::finish(__internal__:0)
        at arrow_array::builder::struct_builder::StructBuilder::finish(__internal__:0)
        at <arrow_array::builder::struct_builder::StructBuilder as arrow_array::builder::ArrayBuilder>::finish(__internal__:0)
        at <core::iter::adapters::GenericShunt<I,R> as core::iter::traits::iterator::Iterator>::next(__internal__:0)
        at comet::execution::shuffle::row::process_sorted_row_partition(__internal__:0)
        at comet::execution::jni_api::Java_org_apache_comet_Native_writeSortedFileNative::{{closure}}::{{closure}}(__internal__:0)
        at _Java_org_apache_comet_Native_writeSortedFileNative(__internal__:0)
	at org.apache.comet.Native.writeSortedFileNative(Native Method)
	at org.apache.spark.sql.comet.execution.shuffle.SpillWriter.doSpilling(SpillWriter.java:187)
	at org.apache.spark.sql.comet.execution.shuffle.CometDiskBlockWriter$ArrowIPCWriter.doSpilling(CometDiskBlockWriter.java:401)
	at org.apache.spark.sql.comet.execution.shuffle.CometDiskBlockWriter.close(CometDiskBlockWriter.java:304)
	at org.apache.spark.sql.comet.execution.shuffle.CometBypassMergeSortShuffleWriter.write(CometBypassMergeSortShuffleWriter.java:244)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
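The panic comes from arrow-rs's StructBuilder::finish, which checks that every child field builder holds exactly as many values as the struct has rows. The following plain-Python sketch (an illustration, not the actual Rust implementation) shows the invariant being violated the same way as in the error message: one struct row was appended, but the Utf8 child builder for "provider" received no value (1 != 0).

```python
# Minimal sketch (plain Python, NOT the arrow-rs code) of the invariant
# StructBuilder::finish enforces: every child field builder must contain
# exactly as many values as the struct has rows.
class StructBuilderSketch:
    def __init__(self, field_names):
        self.field_builders = {name: [] for name in field_names}
        self.len = 0  # number of struct rows appended

    def append(self):
        # Caller is responsible for also appending to every child builder;
        # forgetting to do so is the failure mode seen in the stack trace.
        self.len += 1

    def finish(self):
        for name, values in self.field_builders.items():
            if len(values) != self.len:
                # Mirrors: "StructBuilder (...) and field_builder with
                # index 0 (Utf8) are of unequal lengths: (1 != 0)."
                raise ValueError(
                    f"StructBuilder and field_builder {name!r} are of "
                    f"unequal lengths: ({self.len} != {len(values)})"
                )
        return dict(self.field_builders)


# Reproduce the failure mode: one struct row appended, but the child
# builder for "provider" was never written to.
builder = StructBuilderSketch(["provider", "options"])
builder.append()
try:
    builder.finish()
except ValueError as e:
    print(e)
```

In the real crash, this mismatch arises while `process_sorted_row_partition` converts JVM shuffle rows (here, Delta log metadata containing a struct with a map field) back into Arrow arrays.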

Steps to reproduce

The following PySpark code reproduces this issue locally.

from pyspark.sql import SparkSession

COMET_JAR = "path/to/comet/jar"

spark = (SparkSession.builder
    .master("local[*]")
    .config("spark.jars", COMET_JAR)
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.3.1")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension,org.apache.comet.CometSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.driver.extraClassPath", COMET_JAR)
    .config("spark.executor.extraClassPath", COMET_JAR)
    .config("spark.shuffle.manager", "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
    .config("spark.comet.enabled", "true")
    .config("spark.comet.exec.shuffle.enabled", "true")
    .config("spark.comet.exec.shuffle.mode", "auto")
    .config("spark.comet.exec.shuffle.fallbackToColumnar", "true")
    .getOrCreate()
)

spark.range(0, 1000).write.format("delta").save("delta_table")
spark.read.format("delta").load("delta_table").show() # this line throws the aforementioned exception.
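As a possible mitigation while this is unresolved (an assumption on my part, not something verified in this issue): since the failure occurs in Comet's native shuffle writer, disabling that code path should avoid the crash at the cost of losing accelerated shuffle. Only settings already shown in the repro are changed here.

```python
# Hypothetical workaround (untested): keep Comet enabled but turn off its
# native shuffle so writeSortedFileNative is never invoked.
spark.conf.set("spark.comet.exec.shuffle.enabled", "false")
```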

Expected behavior

The Delta table should be read successfully.

Additional context

Spark version: Spark 3.5.4
Comet version: commit 7cf2e9d

Labels: bug