From aba7b69b0dae8bc0dd4cf703fe598741974a3190 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Sat, 22 May 2021 13:14:18 +0800 Subject: [PATCH 1/6] ARROW-7272: [C++][Java] JNI bridge between RecordBatch and VectorSchemaRoot --- cpp/src/jni/dataset/jni_util.cc | 27 +++- cpp/src/jni/dataset/jni_util.h | 18 ++- cpp/src/jni/dataset/jni_util_test.cc | 3 +- cpp/src/jni/dataset/jni_wrapper.cc | 110 ++------------- java/dataset/CMakeLists.txt | 1 - java/dataset/pom.xml | 20 ++- .../apache/arrow/dataset/file/FileFormat.java | 2 +- .../apache/arrow/dataset/jni/JniWrapper.java | 10 +- .../dataset/jni/NativeRecordBatchHandle.java | 106 -------------- .../arrow/dataset/jni/NativeScanTask.java | 3 +- .../arrow/dataset/jni/NativeScanner.java | 133 ++++++++---------- .../arrow/dataset/scanner/ScanTask.java | 15 +- .../arrow/memory/NativeUnderlyingMemory.java | 81 ----------- .../arrow/dataset/ParquetWriteSupport.java | 1 - .../org/apache/arrow/dataset/TestDataset.java | 30 +++- .../dataset/file/TestFileSystemDataset.java | 41 +++--- .../memory/TestNativeUnderlyingMemory.java | 110 --------------- java/pom.xml | 1 + 18 files changed, 185 insertions(+), 527 deletions(-) delete mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java delete mode 100644 java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java delete mode 100644 java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java diff --git a/cpp/src/jni/dataset/jni_util.cc b/cpp/src/jni/dataset/jni_util.cc index 113669a4cf6..aea65e5ee27 100644 --- a/cpp/src/jni/dataset/jni_util.cc +++ b/cpp/src/jni/dataset/jni_util.cc @@ -17,10 +17,13 @@ #include "jni/dataset/jni_util.h" -#include "arrow/util/logging.h" - +#include #include +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "arrow/util/logging.h" + namespace arrow { namespace dataset { namespace jni { @@ -162,6 +165,15 @@ std::shared_ptr 
ReservationListenableMemoryPool::get_listen ReservationListenableMemoryPool::~ReservationListenableMemoryPool() {} +Status CheckException(JNIEnv* env) { + if (env->ExceptionCheck()) { + env->ExceptionDescribe(); + env->ExceptionClear(); + return Status::Invalid("Error during calling Java code from native code"); + } + return Status::OK(); +} + jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) { jclass local_class = env->FindClass(class_name); jclass global_class = (jclass)env->NewGlobalRef(local_class); @@ -236,6 +248,17 @@ arrow::Result> FromSchemaByteArray( env->ReleaseByteArrayElements(schemaBytes, schemaBytes_data, JNI_ABORT); return schema; } +arrow::Status ExportRecordBatch(JNIEnv* env, const std::shared_ptr& batch, + jlong struct_array) { + return arrow::ExportRecordBatch(*batch, + reinterpret_cast(struct_array)); +} + +arrow::Result> ImportRecordBatch( + JNIEnv* env, const std::shared_ptr& schema, jlong struct_array) { + return arrow::ImportRecordBatch(reinterpret_cast(struct_array), + schema); +} } // namespace jni } // namespace dataset diff --git a/cpp/src/jni/dataset/jni_util.h b/cpp/src/jni/dataset/jni_util.h index c76033ae633..552ce6f2aa8 100644 --- a/cpp/src/jni/dataset/jni_util.h +++ b/cpp/src/jni/dataset/jni_util.h @@ -17,6 +17,8 @@ #pragma once +#include + #include "arrow/array.h" #include "arrow/io/api.h" #include "arrow/ipc/api.h" @@ -24,12 +26,12 @@ #include "arrow/result.h" #include "arrow/type.h" -#include - namespace arrow { namespace dataset { namespace jni { +Status CheckException(JNIEnv* env); + jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name); arrow::Result GetMethodID(JNIEnv* env, jclass this_class, const char* name, @@ -48,6 +50,18 @@ arrow::Result ToSchemaByteArray(JNIEnv* env, arrow::Result> FromSchemaByteArray(JNIEnv* env, jbyteArray schemaBytes); +/// \brief Export arrow::RecordBatch for Java (or other JVM languages) use. 
+/// The exported batch is subject to C data interface specification and can be +/// imported from Java side using provided JNI utilities. +arrow::Status ExportRecordBatch(JNIEnv* env, const std::shared_ptr<RecordBatch>& batch, + jlong struct_array); + +/// \brief Import arrow::RecordBatch from JVM language side. The input data should +/// ideally be exported from specific JNI utilities from JVM language side and should +/// conform to C data interface specification. +arrow::Result<std::shared_ptr<RecordBatch>> ImportRecordBatch( + JNIEnv* env, const std::shared_ptr<Schema>& schema, jlong struct_array); + /// \brief Create a new shared_ptr on heap from shared_ptr t to prevent /// the managed object from being garbage-collected. /// diff --git a/cpp/src/jni/dataset/jni_util_test.cc b/cpp/src/jni/dataset/jni_util_test.cc index 589f00b1cc7..eec1ad245ab 100644 --- a/cpp/src/jni/dataset/jni_util_test.cc +++ b/cpp/src/jni/dataset/jni_util_test.cc @@ -15,11 +15,12 @@ // specific language governing permissions and limitations // under the License. 
+#include "jni/dataset/jni_util.h" + #include #include "arrow/memory_pool.h" #include "arrow/testing/gtest_util.h" -#include "jni/dataset/jni_util.h" namespace arrow { namespace dataset { diff --git a/cpp/src/jni/dataset/jni_wrapper.cc b/cpp/src/jni/dataset/jni_wrapper.cc index 041542804ce..457b50dc7ca 100644 --- a/cpp/src/jni/dataset/jni_wrapper.cc +++ b/cpp/src/jni/dataset/jni_wrapper.cc @@ -24,9 +24,7 @@ #include "arrow/filesystem/localfs.h" #include "arrow/ipc/api.h" #include "arrow/util/iterator.h" - #include "jni/dataset/jni_util.h" - #include "org_apache_arrow_dataset_file_JniWrapper.h" #include "org_apache_arrow_dataset_jni_JniWrapper.h" #include "org_apache_arrow_dataset_jni_NativeMemoryPool.h" @@ -37,14 +35,8 @@ jclass illegal_access_exception_class; jclass illegal_argument_exception_class; jclass runtime_exception_class; -jclass record_batch_handle_class; -jclass record_batch_handle_field_class; -jclass record_batch_handle_buffer_class; jclass java_reservation_listener_class; -jmethodID record_batch_handle_constructor; -jmethodID record_batch_handle_field_constructor; -jmethodID record_batch_handle_buffer_constructor; jmethodID reserve_memory_method; jmethodID unreserve_memory_method; @@ -100,11 +92,7 @@ class ReserveFromJava : public arrow::dataset::jni::ReservationListener { return arrow::Status::Invalid("JNIEnv was not attached to current thread"); } env->CallObjectMethod(java_reservation_listener_, reserve_memory_method, size); - if (env->ExceptionCheck()) { - env->ExceptionDescribe(); - env->ExceptionClear(); - return arrow::Status::Invalid("Error calling Java side reservation listener"); - } + RETURN_NOT_OK(arrow::dataset::jni::CheckException(env)); return arrow::Status::OK(); } @@ -114,11 +102,7 @@ class ReserveFromJava : public arrow::dataset::jni::ReservationListener { return arrow::Status::Invalid("JNIEnv was not attached to current thread"); } env->CallObjectMethod(java_reservation_listener_, unreserve_memory_method, size); - if 
(env->ExceptionCheck()) { - env->ExceptionDescribe(); - env->ExceptionClear(); - return arrow::Status::Invalid("Error calling Java side reservation listener"); - } + RETURN_NOT_OK(arrow::dataset::jni::CheckException(env)); return arrow::Status::OK(); } @@ -206,33 +190,10 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { runtime_exception_class = CreateGlobalClassReference(env, "Ljava/lang/RuntimeException;"); - record_batch_handle_class = - CreateGlobalClassReference(env, - "Lorg/apache/arrow/" - "dataset/jni/NativeRecordBatchHandle;"); - record_batch_handle_field_class = - CreateGlobalClassReference(env, - "Lorg/apache/arrow/" - "dataset/jni/NativeRecordBatchHandle$Field;"); - record_batch_handle_buffer_class = - CreateGlobalClassReference(env, - "Lorg/apache/arrow/" - "dataset/jni/NativeRecordBatchHandle$Buffer;"); java_reservation_listener_class = CreateGlobalClassReference(env, "Lorg/apache/arrow/" "dataset/jni/ReservationListener;"); - - record_batch_handle_constructor = - JniGetOrThrow(GetMethodID(env, record_batch_handle_class, "", - "(J[Lorg/apache/arrow/dataset/" - "jni/NativeRecordBatchHandle$Field;" - "[Lorg/apache/arrow/dataset/" - "jni/NativeRecordBatchHandle$Buffer;)V")); - record_batch_handle_field_constructor = - JniGetOrThrow(GetMethodID(env, record_batch_handle_field_class, "", "(JJ)V")); - record_batch_handle_buffer_constructor = JniGetOrThrow( - GetMethodID(env, record_batch_handle_buffer_class, "", "(JJJJ)V")); reserve_memory_method = JniGetOrThrow(GetMethodID(env, java_reservation_listener_class, "reserve", "(J)V")); unreserve_memory_method = JniGetOrThrow( @@ -250,9 +211,6 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { env->DeleteGlobalRef(illegal_access_exception_class); env->DeleteGlobalRef(illegal_argument_exception_class); env->DeleteGlobalRef(runtime_exception_class); - env->DeleteGlobalRef(record_batch_handle_class); - env->DeleteGlobalRef(record_batch_handle_field_class); - env->DeleteGlobalRef(record_batch_handle_buffer_class); 
env->DeleteGlobalRef(java_reservation_listener_class); default_memory_pool_id = -1L; @@ -458,10 +416,10 @@ Java_org_apache_arrow_dataset_jni_JniWrapper_getSchemaFromScanner(JNIEnv* env, j /* * Class: org_apache_arrow_dataset_jni_JniWrapper * Method: nextRecordBatch - * Signature: (J)Lorg/apache/arrow/dataset/jni/NativeRecordBatchHandle; + * Signature: (JJ)Z */ -JNIEXPORT jobject JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextRecordBatch( - JNIEnv* env, jobject, jlong scanner_id) { +JNIEXPORT jboolean JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextRecordBatch( + JNIEnv* env, jobject, jlong scanner_id, jlong struct_array) { JNI_METHOD_START std::shared_ptr scanner_adaptor = RetrieveNativeInstance(scanner_id); @@ -469,60 +427,12 @@ JNIEXPORT jobject JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextRecor std::shared_ptr record_batch = JniGetOrThrow(scanner_adaptor->Next()); if (record_batch == nullptr) { - return nullptr; // stream ended - } - std::shared_ptr schema = record_batch->schema(); - jobjectArray field_array = - env->NewObjectArray(schema->num_fields(), record_batch_handle_field_class, nullptr); - - std::vector> buffers; - for (int i = 0; i < schema->num_fields(); ++i) { - // TODO: If the array has an offset then we need to de-offset the array - // in order for it to be properly consumed on the Java end. - // This forces a copy, it would be nice to avoid this if Java - // could consume offset-arrays. Perhaps at some point in the future - // using the C data interface. See ARROW-15275 - // - // Generally a non-zero offset will occur whenever the scanner batch - // size is smaller than the batch size of the underlying files. 
- auto column = record_batch->column(i); - if (column->offset() != 0) { - column = JniGetOrThrow(arrow::Concatenate({column})); - } - auto dataArray = column->data(); - jobject field = env->NewObject(record_batch_handle_field_class, - record_batch_handle_field_constructor, - column->length(), column->null_count()); - env->SetObjectArrayElement(field_array, i, field); - - for (auto& buffer : dataArray->buffers) { - buffers.push_back(buffer); - } + return false; // stream ended } - - jobjectArray buffer_array = - env->NewObjectArray(buffers.size(), record_batch_handle_buffer_class, nullptr); - - for (size_t j = 0; j < buffers.size(); ++j) { - auto buffer = buffers[j]; - uint8_t* data = nullptr; - int64_t size = 0; - int64_t capacity = 0; - if (buffer != nullptr) { - data = (uint8_t*)buffer->data(); - size = buffer->size(); - capacity = buffer->capacity(); - } - jobject buffer_handle = env->NewObject(record_batch_handle_buffer_class, - record_batch_handle_buffer_constructor, - CreateNativeRef(buffer), data, size, capacity); - env->SetObjectArrayElement(buffer_array, j, buffer_handle); - } - - jobject ret = env->NewObject(record_batch_handle_class, record_batch_handle_constructor, - record_batch->num_rows(), field_array, buffer_array); - return ret; - JNI_METHOD_END(nullptr) + JniAssertOkOrThrow( + arrow::dataset::jni::ExportRecordBatch(env, record_batch, struct_array)); + return true; + JNI_METHOD_END(false) } /* diff --git a/java/dataset/CMakeLists.txt b/java/dataset/CMakeLists.txt index 07e2d0ae8fc..5b6e4a9ce24 100644 --- a/java/dataset/CMakeLists.txt +++ b/java/dataset/CMakeLists.txt @@ -33,7 +33,6 @@ message("generating headers to ${JNI_HEADERS_DIR}") add_jar(arrow_dataset_java src/main/java/org/apache/arrow/dataset/jni/JniLoader.java src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java - src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java src/main/java/org/apache/arrow/dataset/file/JniWrapper.java 
src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java diff --git a/java/dataset/pom.xml b/java/dataset/pom.xml index 9a80a547c1c..40ee3d2b24f 100644 --- a/java/dataset/pom.xml +++ b/java/dataset/pom.xml @@ -15,7 +15,7 @@ arrow-java-root org.apache.arrow - 8.0.0-SNAPSHOT + 7.0.0-SNAPSHOT 4.0.0 @@ -44,6 +44,12 @@ ${project.version} compile + + org.apache.arrow + arrow-c-data + ${project.version} + compile + org.apache.arrow arrow-memory-netty @@ -56,6 +62,12 @@ ${parquet.version} test + + org.apache.avro + avro + ${avro.version} + test + org.apache.parquet parquet-hadoop @@ -86,12 +98,6 @@ - - org.apache.avro - avro - ${avro.version} - test - com.google.guava guava diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java index e341d46beac..107fc2f71d2 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java @@ -24,7 +24,7 @@ public enum FileFormat { PARQUET(0), NONE(-1); - private int id; + private final int id; FileFormat(int id) { this.id = id; diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java index 7dd54e7648f..e10eda0b45e 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java @@ -87,21 +87,25 @@ private JniWrapper() { /** * Release the Scanner by destroying its reference held by JNI wrapper. + * * @param scannerId the native pointer of the arrow::dataset::Scanner instance. */ public native void closeScanner(long scannerId); /** * Read next record batch from the specified scanner. + * * @param scannerId the native pointer of the arrow::dataset::Scanner instance. 
- * @return an instance of {@link NativeRecordBatchHandle} describing the overall layout of the native record batch. + * @param arrowArray pointer to an empty {@link org.apache.arrow.c.ArrowArray} struct to + * store C++ side record batch that conforms to C data interface. + * @return true if valid record batch is returned; false if stream ended. */ - public native NativeRecordBatchHandle nextRecordBatch(long scannerId); + public native boolean nextRecordBatch(long scannerId, long arrowArray); /** * Release the Buffer by destroying its reference held by JNI wrapper. + * * @param bufferId the native pointer of the arrow::Buffer instance. */ public native void releaseBuffer(long bufferId); - } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java deleted file mode 100644 index dd90fd1c1dd..00000000000 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.arrow.dataset.jni; - -import java.util.Arrays; -import java.util.List; - -/** - * Hold pointers to a Arrow C++ RecordBatch. - */ -public class NativeRecordBatchHandle { - - private final long numRows; - private final List fields; - private final List buffers; - - /** - * Constructor. - * - * @param numRows Total row number of the associated RecordBatch - * @param fields Metadata of fields - * @param buffers Retained Arrow buffers - */ - public NativeRecordBatchHandle(long numRows, Field[] fields, Buffer[] buffers) { - this.numRows = numRows; - this.fields = Arrays.asList(fields); - this.buffers = Arrays.asList(buffers); - } - - /** - * Returns the total row number of the associated RecordBatch. - * @return Total row number of the associated RecordBatch. - */ - public long getNumRows() { - return numRows; - } - - /** - * Returns Metadata of fields. - * @return Metadata of fields. - */ - public List getFields() { - return fields; - } - - /** - * Returns the buffers. - * @return Retained Arrow buffers. - */ - public List getBuffers() { - return buffers; - } - - /** - * Field metadata. - */ - public static class Field { - public final long length; - public final long nullCount; - - public Field(long length, long nullCount) { - this.length = length; - this.nullCount = nullCount; - } - } - - /** - * Pointers and metadata of the targeted Arrow buffer. - */ - public static class Buffer { - public final long nativeInstanceId; - public final long memoryAddress; - public final long size; - public final long capacity; - - /** - * Constructor. 
- * - * @param nativeInstanceId Native instance's id - * @param memoryAddress Memory address of the first byte - * @param size Size (in bytes) - * @param capacity Capacity (in bytes) - */ - public Buffer(long nativeInstanceId, long memoryAddress, long size, long capacity) { - this.nativeInstanceId = nativeInstanceId; - this.memoryAddress = memoryAddress; - this.size = size; - this.capacity = capacity; - } - } -} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java index 14d89c2ee7c..e4764236dad 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java @@ -18,6 +18,7 @@ package org.apache.arrow.dataset.jni; import org.apache.arrow.dataset.scanner.ScanTask; +import org.apache.arrow.vector.ipc.ArrowReader; /** * Native implementation of {@link ScanTask}. Currently RecordBatches are iterated directly by the scanner @@ -35,7 +36,7 @@ public NativeScanTask(NativeScanner scanner) { } @Override - public BatchIterator execute() { + public ArrowReader execute() { return scanner.execute(); } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java index 24c298067af..de18f9e5e0b 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java @@ -18,23 +18,19 @@ package org.apache.arrow.dataset.jni; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; -import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import 
java.util.stream.Collectors; -import org.apache.arrow.dataset.scanner.ScanTask; +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.Data; import org.apache.arrow.dataset.scanner.Scanner; -import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.BufferLedger; -import org.apache.arrow.memory.NativeUnderlyingMemory; -import org.apache.arrow.memory.util.LargeMemoryUtil; -import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.SchemaUtility; @@ -60,7 +56,7 @@ public NativeScanner(NativeContext context, long scannerId) { this.scannerId = scannerId; } - ScanTask.BatchIterator execute() { + ArrowReader execute() { if (closed) { throw new NativeInstanceReleasedException(); } @@ -68,67 +64,7 @@ ScanTask.BatchIterator execute() { throw new UnsupportedOperationException("NativeScanner cannot be executed more than once. 
Consider creating " + "new scanner instead"); } - return new ScanTask.BatchIterator() { - private ArrowRecordBatch peek = null; - - @Override - public void close() { - NativeScanner.this.close(); - } - - @Override - public boolean hasNext() { - if (peek != null) { - return true; - } - final NativeRecordBatchHandle handle; - readLock.lock(); - try { - if (closed) { - throw new NativeInstanceReleasedException(); - } - handle = JniWrapper.get().nextRecordBatch(scannerId); - } finally { - readLock.unlock(); - } - if (handle == null) { - return false; - } - final ArrayList buffers = new ArrayList<>(); - for (NativeRecordBatchHandle.Buffer buffer : handle.getBuffers()) { - final BufferAllocator allocator = context.getAllocator(); - final int size = LargeMemoryUtil.checkedCastToInt(buffer.size); - final NativeUnderlyingMemory am = NativeUnderlyingMemory.create(allocator, - size, buffer.nativeInstanceId, buffer.memoryAddress); - BufferLedger ledger = am.associate(allocator); - ArrowBuf buf = new ArrowBuf(ledger, null, size, buffer.memoryAddress); - buffers.add(buf); - } - - try { - final int numRows = LargeMemoryUtil.checkedCastToInt(handle.getNumRows()); - peek = new ArrowRecordBatch(numRows, handle.getFields().stream() - .map(field -> new ArrowFieldNode(field.length, field.nullCount)) - .collect(Collectors.toList()), buffers); - return true; - } finally { - buffers.forEach(buffer -> buffer.getReferenceManager().release()); - } - } - - @Override - public ArrowRecordBatch next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - - try { - return peek; - } finally { - peek = null; - } - } - }; + return new NativeReader(context.getAllocator()); } @Override @@ -167,4 +103,59 @@ public void close() { writeLock.unlock(); } } + + /** + * {@link ArrowReader} implementation for NativeDataset. 
+ */ + public class NativeReader extends ArrowReader { + + private NativeReader(BufferAllocator allocator) { + super(allocator); + } + + @Override + protected void loadRecordBatch(ArrowRecordBatch batch) { + throw new UnsupportedOperationException(); + } + + @Override + protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean loadNextBatch() throws IOException { + readLock.lock(); + try { + if (closed) { + throw new NativeInstanceReleasedException(); + } + try (ArrowArray arrowArray = ArrowArray.allocateNew(context.getAllocator())) { + if (!JniWrapper.get().nextRecordBatch(scannerId, arrowArray.memoryAddress())) { + return false; + } + final VectorSchemaRoot vsr = getVectorSchemaRoot(); + Data.importIntoVectorSchemaRoot(context.getAllocator(), arrowArray, vsr, this); + } + } finally { + readLock.unlock(); + } + return true; + } + + @Override + public long bytesRead() { + return 0L; + } + + @Override + protected void closeReadSource() throws IOException { + // no-op + } + + @Override + protected Schema readSchema() throws IOException { + return schema(); + } + } } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java index d07036a61ee..434f5c9a6fa 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java @@ -17,9 +17,9 @@ package org.apache.arrow.dataset.scanner; -import java.util.Iterator; +import java.io.Reader; -import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.ipc.ArrowReader; /** * Read record batches from a range of a single data fragment. A @@ -29,14 +29,7 @@ public interface ScanTask extends AutoCloseable { /** - * Creates and returns a {@link BatchIterator} instance. 
+ * Execute this ScanTask and return a {@link Reader} instance. */ - BatchIterator execute(); - - /** - * The iterator implementation for {@link org.apache.arrow.vector.ipc.message.ArrowRecordBatch}s. - */ - interface BatchIterator extends Iterator, AutoCloseable { - - } + ArrowReader execute(); } diff --git a/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java b/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java deleted file mode 100644 index 963fb617040..00000000000 --- a/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.arrow.memory; - -import org.apache.arrow.dataset.jni.JniWrapper; - -/** - * AllocationManager implementation for native allocated memory. - */ -public class NativeUnderlyingMemory extends AllocationManager { - - private final int size; - private final long nativeInstanceId; - private final long address; - - /** - * Constructor. 
- * - * @param accountingAllocator The accounting allocator instance - * @param size Size of underlying memory (in bytes) - * @param nativeInstanceId ID of the native instance - */ - NativeUnderlyingMemory(BufferAllocator accountingAllocator, int size, long nativeInstanceId, long address) { - super(accountingAllocator); - this.size = size; - this.nativeInstanceId = nativeInstanceId; - this.address = address; - // pre-allocate bytes on accounting allocator - final AllocationListener listener = accountingAllocator.getListener(); - try (final AllocationReservation reservation = accountingAllocator.newReservation()) { - listener.onPreAllocation(size); - reservation.reserve(size); - listener.onAllocation(size); - } catch (Exception e) { - release0(); - throw e; - } - } - - /** - * Alias to constructor. - */ - public static NativeUnderlyingMemory create(BufferAllocator bufferAllocator, int size, long nativeInstanceId, - long address) { - return new NativeUnderlyingMemory(bufferAllocator, size, nativeInstanceId, address); - } - - public BufferLedger associate(BufferAllocator allocator) { - return super.associate(allocator); - } - - @Override - protected void release0() { - JniWrapper.get().releaseBuffer(nativeInstanceId); - } - - @Override - public long getSize() { - return size; - } - - @Override - protected long memoryAddress() { - return address; - } -} diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java b/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java index efdaf46dd56..e436182795c 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Random; -import org.apache.arrow.util.Preconditions; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; diff --git 
a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java index 51dac15e561..15224534d28 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java @@ -17,6 +17,8 @@ package org.apache.arrow.dataset; +import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Spliterator; @@ -26,11 +28,15 @@ import java.util.stream.StreamSupport; import org.apache.arrow.dataset.scanner.ScanOptions; +import org.apache.arrow.dataset.scanner.ScanTask; import org.apache.arrow.dataset.scanner.Scanner; import org.apache.arrow.dataset.source.Dataset; import org.apache.arrow.dataset.source.DatasetFactory; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Schema; import org.junit.After; @@ -56,15 +62,31 @@ protected RootAllocator rootAllocator() { protected List collectResultFromFactory(DatasetFactory factory, ScanOptions options) { final Dataset dataset = factory.finish(); final Scanner scanner = dataset.newScan(options); - final List ret = stream(scanner.scan()) - .flatMap(t -> stream(t.execute())) - .collect(Collectors.toList()); try { + final List ret = stream(scanner.scan()) + .flatMap(t -> stream(collectTaskData(t))) + .collect(Collectors.toList()); AutoCloseables.close(scanner, dataset); + return ret; + } catch (RuntimeException e) { + throw e; } catch (Exception e) { throw new RuntimeException(e); } - return ret; + } + + protected List collectTaskData(ScanTask scanTask) { + try (ArrowReader reader = scanTask.execute()) { + List batches = new 
ArrayList<>(); + while (reader.loadNextBatch()) { + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + final VectorUnloader unloader = new VectorUnloader(root); + batches.add(unloader.getRecordBatch()); + } + return batches; + } catch (IOException e) { + throw new RuntimeException(e); + } } protected Schema inferResultSchemaFromFactory(DatasetFactory factory, ScanOptions options) { diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index 83d57c7421b..a2335668200 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -44,11 +44,12 @@ import org.apache.arrow.dataset.jni.NativeScanner; import org.apache.arrow.dataset.jni.TestNativeDataset; import org.apache.arrow.dataset.scanner.ScanOptions; -import org.apache.arrow.dataset.scanner.ScanTask; +import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.util.AutoCloseables; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorLoader; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.pojo.Schema; @@ -90,6 +91,7 @@ public void testBaseParquetRead() throws Exception { checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum); AutoCloseables.close(datum); + AutoCloseables.close(factory); } @Test @@ -116,6 +118,7 @@ public void testParquetProjectSingleColumn() throws Exception { .build()), datum); AutoCloseables.close(datum); + AutoCloseables.close(factory); } @Test @@ -135,6 +138,7 @@ public void testParquetBatchSize() throws Exception { checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum); 
AutoCloseables.close(datum); + AutoCloseables.close(factory); } @Test @@ -217,6 +221,8 @@ public void testNoErrorWhenCloseAgain() throws Exception { dataset.close(); dataset.close(); }); + + AutoCloseables.close(factory); } @Test @@ -232,13 +238,14 @@ public void testErrorThrownWhenScanAgain() throws Exception { List taskList2 = collect(scanner.scan()); NativeScanTask task1 = taskList1.get(0); NativeScanTask task2 = taskList2.get(0); - List datum = collect(task1.execute()); + List datum = collectTaskData(task1); + + AutoCloseables.close(datum); UnsupportedOperationException uoe = assertThrows(UnsupportedOperationException.class, task2::execute); Assertions.assertEquals("NativeScanner cannot be executed more than once. Consider creating new scanner instead", uoe.getMessage()); - AutoCloseables.close(datum); AutoCloseables.close(taskList1); AutoCloseables.close(taskList2); AutoCloseables.close(scanner, dataset, factory); @@ -256,7 +263,7 @@ public void testScanInOtherThread() throws Exception { NativeScanner scanner = dataset.newScan(options); List taskList = collect(scanner.scan()); NativeScanTask task = taskList.get(0); - List datum = executor.submit(() -> collect(task.execute())).get(); + List datum = executor.submit(() -> collectTaskData(task)).get(); AutoCloseables.close(datum); AutoCloseables.close(taskList); @@ -274,6 +281,7 @@ public void testErrorThrownWhenScanAfterScannerClose() throws Exception { NativeScanner scanner = dataset.newScan(options); scanner.close(); assertThrows(NativeInstanceReleasedException.class, scanner::scan); + AutoCloseables.close(factory); } @Test @@ -289,6 +297,7 @@ public void testErrorThrownWhenExecuteTaskAfterTaskClose() throws Exception { NativeScanTask task = tasks.get(0); task.close(); assertThrows(NativeInstanceReleasedException.class, task::execute); + AutoCloseables.close(factory); } @Test @@ -302,28 +311,10 @@ public void testErrorThrownWhenIterateOnIteratorAfterTaskClose() throws Exceptio NativeScanner scanner = 
dataset.newScan(options); List tasks = collect(scanner.scan()); NativeScanTask task = tasks.get(0); - ScanTask.BatchIterator iterator = task.execute(); + ArrowReader reader = task.execute(); task.close(); - assertThrows(NativeInstanceReleasedException.class, iterator::hasNext); - } - - @Test - public void testMemoryAllocationOnAssociatedAllocator() throws Exception { - ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); - FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), - FileFormat.PARQUET, writeSupport.getOutputURI()); - ScanOptions options = new ScanOptions(100); - long initReservation = rootAllocator().getAllocatedMemory(); - List datum = collectResultFromFactory(factory, options); - final long expected_diff = datum.stream() - .flatMapToLong(batch -> batch.getBuffers() - .stream() - .mapToLong(buf -> buf.getReferenceManager().getAccountedSize())).sum(); - long reservation = rootAllocator().getAllocatedMemory(); - AutoCloseables.close(datum); - long finalReservation = rootAllocator().getAllocatedMemory(); - Assert.assertEquals(expected_diff, reservation - initReservation); - Assert.assertEquals(-expected_diff, finalReservation - reservation); + assertThrows(NativeInstanceReleasedException.class, reader::loadNextBatch); + AutoCloseables.close(factory); } private void checkParquetReadResult(Schema schema, String expectedJson, List actual) diff --git a/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java b/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java deleted file mode 100644 index c81868e42b2..00000000000 --- a/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.arrow.memory; - -import static org.junit.Assert.*; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class TestNativeUnderlyingMemory { - - private RootAllocator allocator = null; - - @Before - public void setUp() { - allocator = new RootAllocator(Long.MAX_VALUE); - } - - @After - public void tearDown() { - allocator.close(); - } - - protected RootAllocator rootAllocator() { - return allocator; - } - - @Test - public void testReservation() { - final RootAllocator root = rootAllocator(); - - final int size = 512; - final AllocationManager am = new MockUnderlyingMemory(root, size); - final BufferLedger ledger = am.associate(root); - - assertEquals(size, root.getAllocatedMemory()); - - ledger.release(); - } - - @Test - public void testBufferTransfer() { - final RootAllocator root = rootAllocator(); - - ChildAllocator allocator1 = (ChildAllocator) root.newChildAllocator("allocator1", 0, Long.MAX_VALUE); - ChildAllocator allocator2 = (ChildAllocator) root.newChildAllocator("allocator2", 0, Long.MAX_VALUE); - assertEquals(0, allocator1.getAllocatedMemory()); - assertEquals(0, allocator2.getAllocatedMemory()); - - final int size = 512; - final AllocationManager am = new MockUnderlyingMemory(allocator1, size); - - final 
BufferLedger owningLedger = am.associate(allocator1); - assertEquals(size, owningLedger.getAccountedSize()); - assertEquals(size, owningLedger.getSize()); - assertEquals(size, allocator1.getAllocatedMemory()); - - final BufferLedger transferredLedger = am.associate(allocator2); - owningLedger.release(); // release previous owner - assertEquals(0, owningLedger.getAccountedSize()); - assertEquals(size, owningLedger.getSize()); - assertEquals(size, transferredLedger.getAccountedSize()); - assertEquals(size, transferredLedger.getSize()); - assertEquals(0, allocator1.getAllocatedMemory()); - assertEquals(size, allocator2.getAllocatedMemory()); - - transferredLedger.release(); - allocator1.close(); - allocator2.close(); - } - - /** - * A mock class of {@link NativeUnderlyingMemory} for unit testing about size-related operations. - */ - private static class MockUnderlyingMemory extends NativeUnderlyingMemory { - - /** - * Constructor. - */ - MockUnderlyingMemory(BaseAllocator accountingAllocator, int size) { - super(accountingAllocator, size, -1L, -1L); - } - - @Override - protected void release0() { - System.out.println("Underlying memory released. 
Size: " + getSize()); - } - - @Override - protected long memoryAddress() { - throw new UnsupportedOperationException(); - } - } -} diff --git a/java/pom.xml b/java/pom.xml index edcaac6b871..c74023c9e7b 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -716,6 +716,7 @@ adapter/orc gandiva dataset + c From 047df06718d59bef8c8b671db61815ec44030277 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Sun, 24 Apr 2022 20:49:47 +0800 Subject: [PATCH 2/6] fixup --- java/dataset/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/dataset/pom.xml b/java/dataset/pom.xml index 40ee3d2b24f..c39f5e28962 100644 --- a/java/dataset/pom.xml +++ b/java/dataset/pom.xml @@ -15,7 +15,7 @@ arrow-java-root org.apache.arrow - 7.0.0-SNAPSHOT + 8.0.0-SNAPSHOT 4.0.0 From 699e0bfb4f612e6f3403db61dd03c7dcb7dc68d3 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Sun, 24 Apr 2022 21:49:22 +0800 Subject: [PATCH 3/6] checkstyle --- .../org/apache/arrow/dataset/file/TestFileSystemDataset.java | 1 - 1 file changed, 1 deletion(-) diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index a2335668200..92610b1145c 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -44,7 +44,6 @@ import org.apache.arrow.dataset.jni.NativeScanner; import org.apache.arrow.dataset.jni.TestNativeDataset; import org.apache.arrow.dataset.scanner.ScanOptions; -import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.util.AutoCloseables; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorLoader; From 21ec0b0ffd13a5f3662b1784036b700e018758d6 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Sun, 24 Apr 2022 22:41:01 +0800 Subject: [PATCH 4/6] fixup --- 
.../test/java/org/apache/arrow/dataset/ParquetWriteSupport.java | 1 + 1 file changed, 1 insertion(+) diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java b/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java index e436182795c..efdaf46dd56 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/ParquetWriteSupport.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Random; +import org.apache.arrow.util.Preconditions; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; From 74ae92a2e479e4b796b03dccbe9cefcb8f433e30 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 25 Apr 2022 11:24:36 +0800 Subject: [PATCH 5/6] fixup --- cpp/src/jni/dataset/jni_wrapper.cc | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/cpp/src/jni/dataset/jni_wrapper.cc b/cpp/src/jni/dataset/jni_wrapper.cc index 457b50dc7ca..2cc80ce7612 100644 --- a/cpp/src/jni/dataset/jni_wrapper.cc +++ b/cpp/src/jni/dataset/jni_wrapper.cc @@ -429,8 +429,26 @@ JNIEXPORT jboolean JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextReco if (record_batch == nullptr) { return false; // stream ended } + std::vector> offset_zeroed_arrays; + for (int i = 0; i < record_batch->num_columns(); ++i) { + // TODO: If the array has an offset then we need to de-offset the array + // in order for it to be properly consumed on the Java end. + // This forces a copy, it would be nice to avoid this if Java + // could consume offset-arrays. Perhaps at some point in the future + // using the C data interface. See ARROW-15275 + // + // Generally a non-zero offset will occur whenever the scanner batch + // size is smaller than the batch size of the underlying files. 
+ std::shared_ptr array = record_batch->column(i); + std::shared_ptr offset_zeroed = + JniGetOrThrow(arrow::Concatenate({array})); + offset_zeroed_arrays.push_back(offset_zeroed); + } + + std::shared_ptr offset_zeroed_batch = arrow::RecordBatch::Make( + record_batch->schema(), record_batch->num_rows(), offset_zeroed_arrays); JniAssertOkOrThrow( - arrow::dataset::jni::ExportRecordBatch(env, record_batch, struct_array)); + arrow::dataset::jni::ExportRecordBatch(env, offset_zeroed_batch, struct_array)); return true; JNI_METHOD_END(false) } From 879ccb0ac5a88069d0267365f83d5b2094690d00 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 25 Apr 2022 11:26:32 +0800 Subject: [PATCH 6/6] fixup --- cpp/src/jni/dataset/jni_wrapper.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cpp/src/jni/dataset/jni_wrapper.cc b/cpp/src/jni/dataset/jni_wrapper.cc index 2cc80ce7612..1e5c7a8aa72 100644 --- a/cpp/src/jni/dataset/jni_wrapper.cc +++ b/cpp/src/jni/dataset/jni_wrapper.cc @@ -440,6 +440,10 @@ JNIEXPORT jboolean JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextReco // Generally a non-zero offset will occur whenever the scanner batch // size is smaller than the batch size of the underlying files. std::shared_ptr array = record_batch->column(i); + if (array->offset() == 0) { + offset_zeroed_arrays.push_back(array); + continue; + } std::shared_ptr offset_zeroed = JniGetOrThrow(arrow::Concatenate({array})); offset_zeroed_arrays.push_back(offset_zeroed);