Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,9 @@

package org.apache.comet.vector

import java.nio.ByteOrder

import scala.collection.mutable

import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictionaryProvider, Data}
import org.apache.arrow.c.NativeUtil.NULL
import org.apache.arrow.memory.util.MemoryUtil
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.dictionary.DictionaryProvider
import org.apache.spark.SparkException
Expand Down Expand Up @@ -74,15 +70,6 @@ class NativeUtil {

(0 until numCols).foreach { index =>
val arrowSchema = ArrowSchema.allocateNew(allocator)

// Manually fill NULL to `release` slot of ArrowSchema because ArrowSchema doesn't provide
// `markReleased`.
// The total size of ArrowSchema is 72 bytes.
// The `release` slot is at offset 56 in the ArrowSchema struct.
val buffer =
MemoryUtil.directBuffer(arrowSchema.memoryAddress(), 72).order(ByteOrder.nativeOrder)
buffer.putLong(56, NULL);

val arrowArray = ArrowArray.allocateNew(allocator)
arrays(index) = arrowArray
schemas(index) = arrowSchema
Expand Down
12 changes: 7 additions & 5 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,21 +259,23 @@ fn parse_bool(conf: &HashMap<String, String>, name: &str) -> CometResult<bool> {
}

/// Prepares arrow arrays for output.
unsafe fn prepare_output(
fn prepare_output(
env: &mut JNIEnv,
array_addrs: jlongArray,
schema_addrs: jlongArray,
output_batch: RecordBatch,
exec_context: &mut ExecutionContext,
) -> CometResult<jlong> {
let array_address_array = JLongArray::from_raw(array_addrs);
let array_address_array = unsafe { JLongArray::from_raw(array_addrs) };
let num_cols = env.get_array_length(&array_address_array)? as usize;

let array_addrs = env.get_array_elements(&array_address_array, ReleaseMode::NoCopyBack)?;
let array_addrs =
unsafe { env.get_array_elements(&array_address_array, ReleaseMode::NoCopyBack)? };
let array_addrs = &*array_addrs;

let schema_address_array = JLongArray::from_raw(schema_addrs);
let schema_addrs = env.get_array_elements(&schema_address_array, ReleaseMode::NoCopyBack)?;
let schema_address_array = unsafe { JLongArray::from_raw(schema_addrs) };
let schema_addrs =
unsafe { env.get_array_elements(&schema_address_array, ReleaseMode::NoCopyBack)? };
let schema_addrs = &*schema_addrs;

let results = output_batch.columns();
Expand Down
18 changes: 11 additions & 7 deletions native/core/src/execution/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,20 @@ impl SparkArrowConvert for ArrayData {
let array_align = std::mem::align_of::<FFI_ArrowArray>();
let schema_align = std::mem::align_of::<FFI_ArrowSchema>();

// Check if the pointer alignment is correct for `replace`.
// Check if the pointer alignment is correct.
if array_ptr.align_offset(array_align) != 0 || schema_ptr.align_offset(schema_align) != 0 {
return Err(ExecutionError::ArrowError(
"Pointer alignment is not correct".to_string(),
));
unsafe {
std::ptr::write_unaligned(array_ptr, FFI_ArrowArray::new(self));
std::ptr::write_unaligned(schema_ptr, FFI_ArrowSchema::try_from(self.data_type())?);
}
} else {
// SAFETY: `array_ptr` and `schema_ptr` are aligned correctly.
unsafe {
std::ptr::write(array_ptr, FFI_ArrowArray::new(self));
std::ptr::write(schema_ptr, FFI_ArrowSchema::try_from(self.data_type())?);
}
}

unsafe { std::ptr::replace(array_ptr, FFI_ArrowArray::new(self)) };
unsafe { std::ptr::replace(schema_ptr, FFI_ArrowSchema::try_from(self.data_type())?) };

Ok(())
}
}
Expand Down