Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,51 @@ struct ExecutionContext {
pub tracing_enabled: bool,
}

/// Decodes an `IcebergScanReplacements` protobuf message from raw bytes.
///
/// Returns a map keyed by plan_id whose values carry only the per-partition
/// FileScanTasks and deduplication pools (the common scan metadata stays in
/// the placeholder IcebergScan operator).
fn deserialize_iceberg_replacements(
    bytes: &[u8],
) -> CometResult<HashMap<u32, datafusion_comet_proto::spark_operator::IcebergFilePartitionData>> {
    use datafusion_comet_proto::spark_operator::IcebergScanReplacements;
    use prost::Message;

    // Surface protobuf decode failures as internal errors with context.
    match IcebergScanReplacements::decode(bytes) {
        Ok(decoded) => Ok(decoded.replacements),
        Err(e) => Err(CometError::Internal(format!(
            "Failed to decode IcebergScanReplacements: {}",
            e
        ))),
    }
}

/// Merges per-partition IcebergScan data into placeholder IcebergScans,
/// matched by plan_id.
///
/// The placeholder IcebergScan already carries the common metadata
/// (required_schema, catalog_properties, metadata_location); only the
/// per-partition fields and deduplication pools are filled in here.
fn merge_iceberg_partition_data(
    op: &mut Operator,
    replacements: &HashMap<u32, datafusion_comet_proto::spark_operator::IcebergFilePartitionData>,
) {
    use datafusion_comet_proto::spark_operator::operator::OpStruct;

    // Both checks are side-effect free, so the lookup order is interchangeable
    // with the original op_struct match.
    if let Some(data) = replacements.get(&op.plan_id) {
        if let Some(OpStruct::IcebergScan(scan)) = &mut op.op_struct {
            // Copy the per-partition payload into the placeholder scan.
            scan.file_partition = data.file_partition.clone();
            scan.schema_pool = data.schema_pool.clone();
            scan.partition_type_pool = data.partition_type_pool.clone();
            scan.partition_spec_pool = data.partition_spec_pool.clone();
            scan.name_mapping_pool = data.name_mapping_pool.clone();
            scan.project_field_ids_pool = data.project_field_ids_pool.clone();
            scan.partition_data_pool = data.partition_data_pool.clone();
            scan.delete_files_pool = data.delete_files_pool.clone();
            scan.residual_pool = data.residual_pool.clone();
        }
    }

    // Recurse so every IcebergScan in the plan tree (e.g. joins/unions over
    // multiple tables) is visited.
    op.children
        .iter_mut()
        .for_each(|child| merge_iceberg_partition_data(child, replacements));
}

/// Accept serialized query plan and return the address of the native query plan.
/// # Safety
/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
Expand All @@ -177,6 +222,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
task_attempt_id: jlong,
task_cpus: jlong,
key_unwrapper_obj: JObject,
iceberg_task_bytes: JByteArray,
) -> jlong {
try_unwrap_or_throw(&e, |mut env| {
// Deserialize Spark configs
Expand All @@ -199,7 +245,15 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(

// Deserialize query plan
let bytes = env.convert_byte_array(serialized_query)?;
let spark_plan = serde::deserialize_op(bytes.as_slice())?;
let mut spark_plan = serde::deserialize_op(bytes.as_slice())?;

// Merge per-partition IcebergScan data into placeholders to avoid sending
// all tasks to all executors. Each executor receives only its partition's tasks.
if !iceberg_task_bytes.is_null() {
let iceberg_bytes = env.convert_byte_array(iceberg_task_bytes)?;
let replacements = deserialize_iceberg_replacements(&iceberg_bytes)?;
merge_iceberg_partition_data(&mut spark_plan, &replacements);
}

let metrics = Arc::new(jni_new_global_ref!(env, metrics_node)?);

Expand Down
6 changes: 3 additions & 3 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1144,13 +1144,13 @@ impl PhysicalPlanner {
let metadata_location = scan.metadata_location.clone();

debug_assert!(
!scan.file_partitions.is_empty(),
"IcebergScan must have at least one file partition. This indicates a bug in Scala serialization."
scan.file_partition.is_some(),
"IcebergScan must have file_partition populated. This indicates a bug in per-partition merge logic."
);

let tasks = parse_file_scan_tasks(
scan,
&scan.file_partitions[self.partition as usize].file_scan_tasks,
&scan.file_partition.as_ref().unwrap().file_scan_tasks,
)?;
let file_task_groups = vec![tasks];

Expand Down
25 changes: 23 additions & 2 deletions native/proto/src/proto/operator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ message IcebergScan {
// Catalog-specific configuration for FileIO (credentials, S3/GCS config, etc.)
map<string, string> catalog_properties = 2;

// Pre-planned file scan tasks grouped by Spark partition
repeated IcebergFilePartition file_partitions = 3;
// File scan tasks for this partition (exactly one partition per IcebergScan after merging)
IcebergFilePartition file_partition = 3;

// Table metadata file path for FileIO initialization
string metadata_location = 4;
Expand Down Expand Up @@ -406,3 +406,24 @@ message Window {
repeated spark.spark_expression.Expr partition_by_list = 3;
Operator child = 4;
}

// Per-partition file scan tasks and deduplication pools for IcebergScan.
// Contains only the data that varies per partition, while common metadata
// (required_schema, catalog_properties, metadata_location) stays in the placeholder.
message IcebergFilePartitionData {
// File scan tasks belonging to this single Spark partition.
IcebergFilePartition file_partition = 1;
// Deduplication pools referenced by index from the file scan tasks, so that
// values shared across tasks are serialized once per partition.
repeated string schema_pool = 2;
repeated string partition_type_pool = 3;
repeated string partition_spec_pool = 4;
repeated string name_mapping_pool = 5;
repeated ProjectFieldIdList project_field_ids_pool = 6;
repeated PartitionData partition_data_pool = 7;
repeated DeleteFileList delete_files_pool = 8;
// Residual filter expressions pool; entries are Spark expressions.
repeated spark.spark_expression.Expr residual_pool = 9;
}

// Per-partition IcebergScan replacements for multiple tables in joins/unions.
// Maps plan_id to just the per-partition data, avoiding duplicate common metadata.
message IcebergScanReplacements {
// Keyed by the plan_id of the placeholder IcebergScan operator to merge into.
map<uint32, IcebergFilePartitionData> replacements = 1;
}
6 changes: 4 additions & 2 deletions spark/src/main/scala/org/apache/comet/CometExecIterator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class CometExecIterator(
numParts: Int,
partitionIndex: Int,
broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]] = None,
encryptedFilePaths: Seq[String] = Seq.empty)
encryptedFilePaths: Seq[String] = Seq.empty,
icebergTaskBytes: Option[Array[Byte]] = None)
extends Iterator[ColumnarBatch]
with Logging {

Expand Down Expand Up @@ -123,7 +124,8 @@ class CometExecIterator(
memoryConfig.memoryLimitPerTask,
taskAttemptId,
taskCPUs,
keyUnwrapper)
keyUnwrapper,
icebergTaskBytes = icebergTaskBytes.orNull)
}

private var nextBatch: Option[ColumnarBatch] = None
Expand Down
3 changes: 2 additions & 1 deletion spark/src/main/scala/org/apache/comet/Native.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class Native extends NativeBase {
memoryLimitPerTask: Long,
taskAttemptId: Long,
taskCPUs: Long,
keyUnwrapper: CometFileKeyUnwrapper): Long
keyUnwrapper: CometFileKeyUnwrapper,
icebergTaskBytes: Array[Byte]): Long
// scalastyle:on

/**
Expand Down
Loading
Loading