From 58ebf06214b45a0b83357b22bafb5ee246e2c4d6 Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Fri, 16 Sep 2022 10:29:50 +0000 Subject: [PATCH 01/14] support parquet write from scanner to file --- java/dataset/CMakeLists.txt | 1 + java/dataset/src/main/cpp/jni_wrapper.cc | 164 ++++++++++++++++++ .../arrow/dataset/file/DatasetFileWriter.java | 75 ++++++++ .../apache/arrow/dataset/file/JniWrapper.java | 18 ++ .../dataset/file/NativeScannerAdaptor.java | 35 ++++ .../file/NativeScannerAdaptorImpl.java | 146 ++++++++++++++++ .../jni/NativeRecordBatchIterator.java | 32 ++++ 7 files changed, 471 insertions(+) create mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java create mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptor.java create mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptorImpl.java create mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchIterator.java diff --git a/java/dataset/CMakeLists.txt b/java/dataset/CMakeLists.txt index 315163a537c..cc0c384d4ec 100644 --- a/java/dataset/CMakeLists.txt +++ b/java/dataset/CMakeLists.txt @@ -26,6 +26,7 @@ add_jar(arrow_java_jni_dataset_jar src/main/java/org/apache/arrow/dataset/file/JniWrapper.java src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java + src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchIterator.java GENERATE_NATIVE_HEADERS arrow_java_jni_dataset_headers) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index aa7d7670232..e76c6676565 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -20,15 +20,19 @@ #include "arrow/array.h" #include "arrow/array/concatenate.h" #include "arrow/dataset/api.h" +#include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/file_base.h" #include "arrow/filesystem/localfs.h" #include "arrow/ipc/api.h" +#include "arrow/c/helpers.h" #include "arrow/util/iterator.h" #include "jni_util.h" #include "org_apache_arrow_dataset_file_JniWrapper.h" #include "org_apache_arrow_dataset_jni_JniWrapper.h" #include "org_apache_arrow_dataset_jni_NativeMemoryPool.h" +#include + namespace { jclass illegal_access_exception_class; @@ -36,9 +40,12 @@ jclass illegal_argument_exception_class; jclass runtime_exception_class; jclass java_reservation_listener_class; +jclass native_record_batch_iterator_class; jmethodID reserve_memory_method; jmethodID unreserve_memory_method; +jmethodID native_record_batch_iterator_hasNext; +jmethodID native_record_batch_iterator_next; jlong default_memory_pool_id = -1L; @@ -176,6 +183,116 @@ class DisposableScannerAdaptor { } }; +/// \brief Simple fragment implementation that is constructed directly +/// from a record batch iterator. 
+class SimpleIteratorFragment : public arrow::dataset::Fragment {
+ public:
+  explicit SimpleIteratorFragment(arrow::RecordBatchIterator itr)
+      : arrow::dataset::Fragment() {
+    itr_ = std::move(itr);
+  }
+
+  static arrow::Result<std::shared_ptr<SimpleIteratorFragment>> Make(
+      arrow::RecordBatchIterator itr) {
+    return std::make_shared<SimpleIteratorFragment>(std::move(itr));
+  }
+
+  arrow::Result<arrow::RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<arrow::dataset::ScanOptions>& options) override {
+    struct State {
+      State(std::shared_ptr<SimpleIteratorFragment> fragment)
+          : fragment(std::move(fragment)) {}
+
+      std::shared_ptr<arrow::RecordBatch> Next() { return cur_rb; }
+
+      bool Finished() {
+        arrow::Result<std::shared_ptr<arrow::RecordBatch>> next = fragment->itr_.Next();
+        if (IsIterationEnd(next)) {
+          cur_rb = nullptr;
+          return true;
+        } else {
+          cur_rb = next.ValueOrDie();
+          return false;
+        }
+      }
+
+      std::shared_ptr<SimpleIteratorFragment> fragment;
+      std::shared_ptr<arrow::RecordBatch> cur_rb = nullptr;
+    };
+
+    struct Generator {
+      Generator(std::shared_ptr<SimpleIteratorFragment> fragment)
+          : state(std::make_shared<State>(std::move(fragment))) {}
+
+      arrow::Future<std::shared_ptr<arrow::RecordBatch>> operator()() {
+        while (!state->Finished()) {
+          auto next = state->Next();
+          if (next) {
+            return arrow::Future<std::shared_ptr<arrow::RecordBatch>>::MakeFinished(
+                std::move(next));
+          }
+        }
+        return arrow::AsyncGeneratorEnd<std::shared_ptr<arrow::RecordBatch>>();
+      }
+
+      std::shared_ptr<State> state;
+    };
+    return Generator(arrow::internal::checked_pointer_cast<SimpleIteratorFragment>(
+        shared_from_this()));
+  }
+
+  std::string type_name() const override { return "simple_iterator"; }
+
+  arrow::Result<std::shared_ptr<arrow::Schema>> ReadPhysicalSchemaImpl() override {
+    return arrow::Status::NotImplemented("No physical schema is readable");
+  }
+
+ private:
+  arrow::RecordBatchIterator itr_;
+  bool used_ = false;
+};
+
+/// \brief Create scanner that scans over Java dataset API's components.
+///
+/// Currently, we use a NativeRecordBatchIterator as the underlying
+/// Java object to do scanning. Which means, only one single task will
+/// be produced from C++ code.
+arrow::Result<std::shared_ptr<arrow::dataset::Scanner>> MakeJavaDatasetScanner(
+    JavaVM* vm, jobject iter, std::shared_ptr<arrow::Schema> schema) {
+  arrow::RecordBatchIterator itr = arrow::MakeFunctionIterator(
+      [vm, iter, schema]() -> arrow::Result<std::shared_ptr<arrow::RecordBatch>> {
+        JNIEnv* env;
+        if (vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION) != JNI_OK) {
+          return arrow::Status::Invalid("JNIEnv was not attached to current thread");
+        }
+        if (!env->CallBooleanMethod(iter, native_record_batch_iterator_hasNext)) {
+          return nullptr;  // stream ended
+        }
+
+        auto bytes =
+            (jbyteArray)env->CallObjectMethod(iter, native_record_batch_iterator_next);
+        auto byte_array = env->GetByteArrayElements(bytes, 0);
+        int64_t memory_address;
+        std::memcpy(&memory_address, byte_array, sizeof(int64_t));
+
+        std::shared_ptr<arrow::RecordBatch> rb = JniGetOrThrow(
+            arrow::dataset::jni::ImportRecordBatch(env, schema, memory_address));
+        // Release the ArrowArray
+        auto c_array = reinterpret_cast<ArrowArray*>(memory_address);
+        ArrowArrayRelease(c_array);
+        return rb;
+      });
+
+  ARROW_ASSIGN_OR_RAISE(auto fragment, SimpleIteratorFragment::Make(std::move(itr)))
+
+  arrow::dataset::ScannerBuilder scanner_builder(
+      std::move(schema), fragment, std::make_shared<arrow::dataset::ScanOptions>());
+  // Use default memory pool is enough as native allocation is ideally
+  // not being called during scanning Java-based fragments.
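Note on the handoff above: the Java side returns the ArrowArray address as eight raw bytes (see longtoBytes in NativeScannerAdaptorImpl further down, which writes the least-significant byte first), and the C++ side memcpy's them straight into an int64_t, so the two sides only agree on a little-endian host. A sketch of a Java encoder that states that assumption explicitly (AddressCodec/addressToBytes are hypothetical helpers, not part of this patch):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    final class AddressCodec {
      // Encode a native address with a fixed byte order. This matches the
      // least-significant-byte-first layout longtoBytes produces and documents
      // the little-endian assumption the C++ std::memcpy relies on.
      static byte[] addressToBytes(long address) {
        return ByteBuffer.allocate(Long.BYTES)
            .order(ByteOrder.LITTLE_ENDIAN)
            .putLong(address)
            .array();
      }
    }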
+ RETURN_NOT_OK(scanner_builder.Pool(arrow::default_memory_pool())); + return scanner_builder.Finish(); +} } // namespace using arrow::dataset::jni::CreateGlobalClassReference; @@ -228,6 +345,15 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { unreserve_memory_method = JniGetOrThrow( GetMethodID(env, java_reservation_listener_class, "unreserve", "(J)V")); + native_record_batch_iterator_class = + CreateGlobalClassReference(env, + "Lorg/apache/arrow/" + "dataset/jni/NativeRecordBatchIterator;"); + native_record_batch_iterator_hasNext = JniGetOrThrow( + GetMethodID(env, native_record_batch_iterator_class, "hasNext", "()Z")); + native_record_batch_iterator_next = + JniGetOrThrow(GetMethodID(env, native_record_batch_iterator_class, "next", "()[B")); + default_memory_pool_id = reinterpret_cast(arrow::default_memory_pool()); return JNI_VERSION; @@ -241,6 +367,7 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { env->DeleteGlobalRef(illegal_argument_exception_class); env->DeleteGlobalRef(runtime_exception_class); env->DeleteGlobalRef(java_reservation_listener_class); + env->DeleteGlobalRef(native_record_batch_iterator_class); default_memory_pool_id = -1L; } @@ -516,3 +643,40 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( return CreateNativeRef(d); JNI_METHOD_END(-1L) } + +/* + * Class: org_apache_arrow_dataset_file_JniWrapper + * Method: writeFromScannerToFile + * Signature: + * (Lorg/apache/arrow/dataset/jni/NativeRecordBatchIterator;[BJLjava/lang/String;[Ljava/lang/String;ILjava/lang/String;)V + */ +JNIEXPORT void JNICALL +Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile( + JNIEnv* env, jobject, jobject itr, jbyteArray schema_bytes, jlong file_format_id, + jstring uri, jobjectArray partition_columns, jint max_partitions, + jstring base_name_template) { + JNI_METHOD_START + JavaVM* vm; + if (env->GetJavaVM(&vm) != JNI_OK) { + JniThrow("Unable to get JavaVM instance"); + } + auto schema = JniGetOrThrow(FromSchemaByteArray(env, schema_bytes)); + auto scanner = JniGetOrThrow(MakeJavaDatasetScanner(vm, itr, schema)); + std::shared_ptr file_format = + JniGetOrThrow(GetFileFormat(file_format_id)); + arrow::dataset::FileSystemDatasetWriteOptions options; + std::string output_path; + auto filesystem = JniGetOrThrow( + arrow::fs::FileSystemFromUri(JStringToCString(env, uri), &output_path)); + std::vector partition_column_vector = + ToStringVector(env, partition_columns); + options.file_write_options = file_format->DefaultWriteOptions(); + options.filesystem = filesystem; + options.base_dir = output_path; + options.basename_template = JStringToCString(env, base_name_template); + options.partitioning = std::make_shared( + arrow::dataset::SchemaFromColumnNames(schema, partition_column_vector)); + options.max_partitions = max_partitions; + JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner)); + JNI_METHOD_END() +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java new file mode 100644 index 00000000000..00da9d1ef37 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +import org.apache.arrow.dataset.jni.NativeRecordBatchIterator; +import org.apache.arrow.dataset.scanner.Scanner; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.util.SchemaUtility; + +/** + * JNI-based utility to write datasets into files. It internally depends on C++ static method + * FileSystemDataset::Write. + */ +public class DatasetFileWriter { + + /** + * Scan over an input {@link Scanner} then write all record batches to file. + * + * @param scanner the source scanner for writing + * @param format target file format + * @param uri target file uri + * @param maxPartitions maximum partitions to be included in written files + * @param partitionColumns columns used to partition output files. Empty to disable partitioning + * @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition + * ID around all written files. + */ + public static void write(BufferAllocator allocator, Scanner scanner, FileFormat format, String uri, + String[] partitionColumns, int maxPartitions, String baseNameTemplate) { + final NativeScannerAdaptorImpl adaptor = new NativeScannerAdaptorImpl(scanner, allocator); + final NativeRecordBatchIterator itr = adaptor.scan(); + RuntimeException throwableWrapper = null; + try { + JniWrapper.get().writeFromScannerToFile(itr, SchemaUtility.serialize(scanner.schema()), + format.id(), uri, partitionColumns, maxPartitions, baseNameTemplate); + } catch (Throwable t) { + throwableWrapper = new RuntimeException(t); + throw throwableWrapper; + } finally { + try { + AutoCloseables.close(itr); + } catch (Exception e) { + if (throwableWrapper != null) { + throwableWrapper.addSuppressed(e); + } + } + } + } + + /** + * Scan over an input {@link Scanner} then write all record batches to file, with default partitioning settings. + * + * @param scanner the source scanner for writing + * @param format target file format + * @param uri target file uri + */ + public static void write(BufferAllocator allocator, Scanner scanner, FileFormat format, String uri) { + write(allocator, scanner, format, uri, new String[0], 1024, "dat_{i}"); + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java index 6e65803a333..04f85ca1760 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -18,6 +18,7 @@ package org.apache.arrow.dataset.file; import org.apache.arrow.dataset.jni.JniLoader; +import org.apache.arrow.dataset.jni.NativeRecordBatchIterator; /** * JniWrapper for filesystem based {@link org.apache.arrow.dataset.source.Dataset} implementations. 
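The new DatasetFileWriter entry point above is invoked roughly as follows (a sketch based on the setup TestDatasetFileWriter introduces in a later commit; the source URI and output directory are placeholders):

    BufferAllocator allocator = new RootAllocator();
    try (FileSystemDatasetFactory factory = new FileSystemDatasetFactory(
        allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, "file:///tmp/input.parquet")) {
      try (Dataset dataset = factory.finish();
           Scanner scanner = dataset.newScan(new ScanOptions(new String[0], 100))) {
        // Partition output by "id" into at most 100 directories; files are named dat_0, dat_1, ...
        DatasetFileWriter.write(allocator, scanner, FileFormat.PARQUET,
            "file:///tmp/output/", new String[]{"id"}, 100, "dat_{i}");
      }
    }

The four-argument overload uses the same path with defaults: no partition columns, at most 1024 partitions, and the "dat_{i}" basename template.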
@@ -45,4 +46,21 @@ private JniWrapper() { */ public native long makeFileSystemDatasetFactory(String uri, int fileFormat); + /** + * Write all record batches in a {@link NativeRecordBatchIterator} into files. This internally + * depends on C++ write API: FileSystemDataset::Write. + * + * @param itr iterator to be used for writing + * @param schema serialized schema of output files + * @param fileFormat target file format (ID) + * @param uri target file uri + * @param partitionColumns columns used to partition output files + * @param maxPartitions maximum partitions to be included in written files + * @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition + * ID around all written files. + */ + public native void writeFromScannerToFile(NativeRecordBatchIterator itr, byte[] schema, + long fileFormat, String uri, String[] partitionColumns, int maxPartitions, + String baseNameTemplate); + } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptor.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptor.java new file mode 100644 index 00000000000..da3ec75b2c2 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +import org.apache.arrow.dataset.jni.NativeRecordBatchIterator; + +/** + * A short path comparing to {@link org.apache.arrow.dataset.scanner.Scanner} for being called from C++ side + * via JNI, to minimize JNI call overhead. + */ +public interface NativeScannerAdaptor { + + /** + * Scan with the delegated scanner. + * + * @return a iterator outputting JNI-friendly flatbuffers-serialized + * {@link org.apache.arrow.vector.ipc.message.ArrowRecordBatch} instances. + */ + NativeRecordBatchIterator scan(); +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptorImpl.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptorImpl.java new file mode 100644 index 00000000000..d6c4d05e895 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptorImpl.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.Data; +import org.apache.arrow.dataset.jni.NativeRecordBatchIterator; +import org.apache.arrow.dataset.scanner.ScanTask; +import org.apache.arrow.dataset.scanner.Scanner; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; + +/** + * Default implementation of {@link NativeScannerAdaptor}. + */ +public class NativeScannerAdaptorImpl implements NativeScannerAdaptor, AutoCloseable { + + private final Scanner scanner; + private final BufferAllocator allocator; + + /** + * Constructor. + * + * @param scanner the delegated scanner. + */ + public NativeScannerAdaptorImpl(Scanner scanner, BufferAllocator allocator) { + this.scanner = scanner; + this.allocator = allocator; + } + + @Override + public NativeRecordBatchIterator scan() { + final Iterable tasks = scanner.scan(); + return new IteratorImpl(tasks, allocator); + } + + @Override + public void close() throws Exception { + scanner.close(); + } + + private static class IteratorImpl implements NativeRecordBatchIterator { + + private final Iterator taskIterator; + + private ScanTask currentTask = null; + private ArrowReader reader = null; + + private BufferAllocator allocator = null; + + public IteratorImpl(Iterable tasks, + BufferAllocator allocator) { + this.taskIterator = tasks.iterator(); + this.allocator = allocator; + } + + @Override + public void close() throws Exception { + closeCurrent(); + } + + private void closeCurrent() throws Exception { + if (currentTask == null) { + return; + } + currentTask.close(); + reader.close(); + } + + private boolean advance() { + if (!taskIterator.hasNext()) { + return false; + } + try { + closeCurrent(); + } catch (Exception e) { + throw new RuntimeException(e); + } + currentTask = taskIterator.next(); + reader = currentTask.execute(); + return true; + } + @Override + public boolean hasNext() { + + if (currentTask == null) { + if (!advance()) { + return false; + } + } + try { + if (!reader.loadNextBatch()) { + if (!advance()) { + return false; + } + } + return true; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + byte[] longtoBytes(long data) { + return new byte[]{ + (byte) ((data >> 0) & 0xff), + (byte) ((data >> 8) & 0xff), + (byte) ((data >> 16) & 0xff), + (byte) ((data >> 24) & 0xff), + (byte) ((data >> 32) & 0xff), + (byte) ((data >> 40) & 0xff), + (byte) ((data >> 48) & 0xff), + (byte) ((data >> 56) & 0xff), + }; + } + + @Override + public byte[] next() { + ArrowArray arrowArray = ArrowArray.allocateNew(allocator); + try { + Data.exportVectorSchemaRoot(allocator, reader.getVectorSchemaRoot(), reader, arrowArray); + return longtoBytes(arrowArray.memoryAddress()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchIterator.java 
b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchIterator.java new file mode 100644 index 00000000000..cc4144677e6 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchIterator.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import java.util.Iterator; +/** + * Iterate on the memory address of ArrowArray + * next() should be called from C++ scanner to read the memory address of ArrowArray. + */ +public interface NativeRecordBatchIterator extends Iterator, AutoCloseable { + + /** + * Return next {@link org.apache.arrow.c.ArrowArray} memory address. + */ + @Override + byte[] next(); +} From 9abbc15813db940576023fb5129d3cbe8fff4f27 Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Thu, 22 Sep 2022 16:10:30 +0000 Subject: [PATCH 02/14] add the TestDatasetFileWriter and code refine --- java/dataset/CMakeLists.txt | 2 +- java/dataset/pom.xml | 6 + java/dataset/src/main/cpp/jni_wrapper.cc | 50 ++++-- .../file/CRecordBatchIteratorImpl.java | 106 ++++++++++++ .../arrow/dataset/file/DatasetFileWriter.java | 4 +- .../apache/arrow/dataset/file/JniWrapper.java | 4 +- .../dataset/file/NativeScannerAdaptor.java | 35 ---- .../file/NativeScannerAdaptorImpl.java | 146 ---------------- ...terator.java => CRecordBatchIterator.java} | 8 +- .../dataset/file/TestDatasetFileWriter.java | 162 ++++++++++++++++++ 10 files changed, 315 insertions(+), 208 deletions(-) create mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/file/CRecordBatchIteratorImpl.java delete mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptor.java delete mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptorImpl.java rename java/dataset/src/main/java/org/apache/arrow/dataset/jni/{NativeRecordBatchIterator.java => CRecordBatchIterator.java} (88%) create mode 100644 java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java diff --git a/java/dataset/CMakeLists.txt b/java/dataset/CMakeLists.txt index cc0c384d4ec..39b2c184d1f 100644 --- a/java/dataset/CMakeLists.txt +++ b/java/dataset/CMakeLists.txt @@ -26,7 +26,7 @@ add_jar(arrow_java_jni_dataset_jar src/main/java/org/apache/arrow/dataset/file/JniWrapper.java src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java - src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchIterator.java + src/main/java/org/apache/arrow/dataset/jni/CRecordBatchIterator.java GENERATE_NATIVE_HEADERS arrow_java_jni_dataset_headers) diff --git a/java/dataset/pom.xml b/java/dataset/pom.xml index fa91adfd87b..b1b9cdb6ec8 100644 --- a/java/dataset/pom.xml +++ 
b/java/dataset/pom.xml @@ -140,6 +140,12 @@ 2.8.1 test + + commons-io + commons-io + 2.4 + test + diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index e76c6676565..625c5f15fde 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -20,7 +20,6 @@ #include "arrow/array.h" #include "arrow/array/concatenate.h" #include "arrow/dataset/api.h" -#include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/file_base.h" #include "arrow/filesystem/localfs.h" #include "arrow/ipc/api.h" @@ -259,40 +258,57 @@ class SimpleIteratorFragment : public arrow::dataset::Fragment { /// Java object to do scanning. Which means, only one single task will /// be produced from C++ code. arrow::Result> MakeJavaDatasetScanner( - JavaVM* vm, jobject iter, std::shared_ptr schema) { + JavaVM* vm, jobject java_record_batch_object_itr, std::shared_ptr schema) { arrow::RecordBatchIterator itr = arrow::MakeFunctionIterator( - [vm, iter, schema]() -> arrow::Result> { + [vm, java_record_batch_object_itr, schema]() -> arrow::Result> { JNIEnv* env; if (vm->GetEnv(reinterpret_cast(&env), JNI_VERSION) != JNI_OK) { return arrow::Status::Invalid("JNIEnv was not attached to current thread"); } - if (!env->CallBooleanMethod(iter, native_record_batch_iterator_hasNext)) { + if (!env->CallBooleanMethod(java_record_batch_object_itr, native_record_batch_iterator_hasNext)) { return nullptr; // stream ended } + std::unique_ptr c_schema = std::make_unique(); + std::unique_ptr c_array = std::make_unique(); - auto bytes = - (jbyteArray)env->CallObjectMethod(iter, native_record_batch_iterator_next); - auto byte_array = env->GetByteArrayElements(bytes, 0); - int64_t memory_address; - std::memcpy(&memory_address, byte_array, sizeof(int64_t)); + env->CallObjectMethod( + java_record_batch_object_itr, + native_record_batch_iterator_next, + reinterpret_cast(c_array.get()), + reinterpret_cast(c_schema.get())); std::shared_ptr rb = JniGetOrThrow( - arrow::dataset::jni::ImportRecordBatch(env, schema, memory_address)); + arrow::dataset::jni::ImportRecordBatch(env, schema, reinterpret_cast(c_array.get()))); + // Release the ArrowArray - auto c_array = reinterpret_cast(memory_address); - ArrowArrayRelease(c_array); + // auto c_array = reinterpret_cast(memory_address); + ArrowArrayRelease(c_array.get()); + ArrowSchemaRelease(c_schema.get()); return rb; }); - ARROW_ASSIGN_OR_RAISE(auto fragment, SimpleIteratorFragment::Make(std::move(itr))) arrow::dataset::ScannerBuilder scanner_builder( std::move(schema), fragment, std::make_shared()); + // Use default memory pool is enough as native allocation is ideally // not being called during scanning Java-based fragments. 
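Besides the new (JJ)V next() protocol above, this commit drops the arrow/dataset/dataset_internal.h include and instead defines a local SchemaFromColumnNames helper (just below) to build the partitioning schema. For intuition, a rough Java analog of that field-selection logic (hypothetical, illustration only):

    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.arrow.vector.types.pojo.Field;
    import org.apache.arrow.vector.types.pojo.Schema;

    final class PartitionSchemas {
      // Keep only the named fields, in the order the names are given, and
      // preserve the input schema's metadata; names that do not resolve to a
      // field are silently skipped, as in the C++ helper's FieldRef lookup.
      static Schema fromColumnNames(Schema input, List<String> columnNames) {
        List<Field> fields = columnNames.stream()
            .flatMap(name -> input.getFields().stream()
                .filter(f -> f.getName().equals(name)))
            .collect(Collectors.toList());
        return new Schema(fields, input.getCustomMetadata());
      }
    }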
RETURN_NOT_OK(scanner_builder.Pool(arrow::default_memory_pool())); return scanner_builder.Finish(); } + +std::shared_ptr SchemaFromColumnNames( + const std::shared_ptr& input, const std::vector& column_names) { + std::vector> columns; + for (arrow::FieldRef ref : column_names) { + auto maybe_field = ref.GetOne(*input); + if (maybe_field.ok()) { + columns.push_back(std::move(maybe_field).ValueOrDie()); + } + } + + return schema(std::move(columns))->WithMetadata(input->metadata()); +} } // namespace using arrow::dataset::jni::CreateGlobalClassReference; @@ -348,11 +364,11 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { native_record_batch_iterator_class = CreateGlobalClassReference(env, "Lorg/apache/arrow/" - "dataset/jni/NativeRecordBatchIterator;"); + "dataset/jni/CRecordBatchIterator;"); native_record_batch_iterator_hasNext = JniGetOrThrow( GetMethodID(env, native_record_batch_iterator_class, "hasNext", "()Z")); native_record_batch_iterator_next = - JniGetOrThrow(GetMethodID(env, native_record_batch_iterator_class, "next", "()[B")); + JniGetOrThrow(GetMethodID(env, native_record_batch_iterator_class, "next", "(JJ)V")); default_memory_pool_id = reinterpret_cast(arrow::default_memory_pool()); @@ -674,8 +690,8 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile( options.filesystem = filesystem; options.base_dir = output_path; options.basename_template = JStringToCString(env, base_name_template); - options.partitioning = std::make_shared( - arrow::dataset::SchemaFromColumnNames(schema, partition_column_vector)); + options.partitioning = std::make_shared( + SchemaFromColumnNames(schema, partition_column_vector)); options.max_partitions = max_partitions; JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner)); JNI_METHOD_END() diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/CRecordBatchIteratorImpl.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/CRecordBatchIteratorImpl.java new file mode 100644 index 00000000000..6b4513bfa41 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/CRecordBatchIteratorImpl.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.dataset.file; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.Data; +import org.apache.arrow.dataset.jni.CRecordBatchIterator; +import org.apache.arrow.dataset.scanner.ScanTask; +import org.apache.arrow.dataset.scanner.Scanner; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; + +public class CRecordBatchIteratorImpl implements CRecordBatchIterator { + + private final Scanner scanner; + private final BufferAllocator allocator; + + private Iterator taskIterator; + + private ScanTask currentTask = null; + private ArrowReader reader = null; + + public CRecordBatchIteratorImpl(Scanner scanner, + BufferAllocator allocator) { + this.scanner = scanner; + this.taskIterator = scanner.scan().iterator(); + this.allocator = allocator; + } + + @Override + public void close() throws Exception { + closeCurrent(); + scanner.close(); + } + + private void closeCurrent() throws Exception { + if (currentTask == null) { + return; + } + currentTask.close(); + reader.close(); + } + + private boolean advance() { + if (!taskIterator.hasNext()) { + return false; + } + try { + closeCurrent(); + } catch (Exception e) { + throw new RuntimeException(e); + } + currentTask = taskIterator.next(); + reader = currentTask.execute(); + return true; + } + + @Override + public boolean hasNext() { + + if (currentTask == null) { + if (!advance()) { + return false; + } + } + try { + if (!reader.loadNextBatch()) { + if (!advance()) { + return false; + } + } + return true; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void next(long cArrayAddress, long cSchemaAddress) { + final ArrowArray cArray = ArrowArray.wrap(cArrayAddress); + final ArrowSchema cSchema = ArrowSchema.wrap(cSchemaAddress); + try { + Data.exportVectorSchemaRoot(allocator, reader.getVectorSchemaRoot(), reader, cArray, cSchema); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java index 00da9d1ef37..0a33bf5850e 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java @@ -17,7 +17,6 @@ package org.apache.arrow.dataset.file; -import org.apache.arrow.dataset.jni.NativeRecordBatchIterator; import org.apache.arrow.dataset.scanner.Scanner; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.AutoCloseables; @@ -42,8 +41,7 @@ public class DatasetFileWriter { */ public static void write(BufferAllocator allocator, Scanner scanner, FileFormat format, String uri, String[] partitionColumns, int maxPartitions, String baseNameTemplate) { - final NativeScannerAdaptorImpl adaptor = new NativeScannerAdaptorImpl(scanner, allocator); - final NativeRecordBatchIterator itr = adaptor.scan(); + final CRecordBatchIteratorImpl itr = new CRecordBatchIteratorImpl(scanner, allocator); RuntimeException throwableWrapper = null; try { JniWrapper.get().writeFromScannerToFile(itr, SchemaUtility.serialize(scanner.schema()), diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java index 04f85ca1760..c3458b731fc 100644 --- 
a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -17,8 +17,8 @@ package org.apache.arrow.dataset.file; +import org.apache.arrow.dataset.jni.CRecordBatchIterator; import org.apache.arrow.dataset.jni.JniLoader; -import org.apache.arrow.dataset.jni.NativeRecordBatchIterator; /** * JniWrapper for filesystem based {@link org.apache.arrow.dataset.source.Dataset} implementations. @@ -59,7 +59,7 @@ private JniWrapper() { * @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition * ID around all written files. */ - public native void writeFromScannerToFile(NativeRecordBatchIterator itr, byte[] schema, + public native void writeFromScannerToFile(CRecordBatchIterator itr, byte[] schema, long fileFormat, String uri, String[] partitionColumns, int maxPartitions, String baseNameTemplate); diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptor.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptor.java deleted file mode 100644 index da3ec75b2c2..00000000000 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptor.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.arrow.dataset.file; - -import org.apache.arrow.dataset.jni.NativeRecordBatchIterator; - -/** - * A short path comparing to {@link org.apache.arrow.dataset.scanner.Scanner} for being called from C++ side - * via JNI, to minimize JNI call overhead. - */ -public interface NativeScannerAdaptor { - - /** - * Scan with the delegated scanner. - * - * @return a iterator outputting JNI-friendly flatbuffers-serialized - * {@link org.apache.arrow.vector.ipc.message.ArrowRecordBatch} instances. - */ - NativeRecordBatchIterator scan(); -} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptorImpl.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptorImpl.java deleted file mode 100644 index d6c4d05e895..00000000000 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/NativeScannerAdaptorImpl.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.arrow.dataset.file; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.arrow.c.ArrowArray; -import org.apache.arrow.c.Data; -import org.apache.arrow.dataset.jni.NativeRecordBatchIterator; -import org.apache.arrow.dataset.scanner.ScanTask; -import org.apache.arrow.dataset.scanner.Scanner; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.ipc.ArrowReader; - -/** - * Default implementation of {@link NativeScannerAdaptor}. - */ -public class NativeScannerAdaptorImpl implements NativeScannerAdaptor, AutoCloseable { - - private final Scanner scanner; - private final BufferAllocator allocator; - - /** - * Constructor. - * - * @param scanner the delegated scanner. - */ - public NativeScannerAdaptorImpl(Scanner scanner, BufferAllocator allocator) { - this.scanner = scanner; - this.allocator = allocator; - } - - @Override - public NativeRecordBatchIterator scan() { - final Iterable tasks = scanner.scan(); - return new IteratorImpl(tasks, allocator); - } - - @Override - public void close() throws Exception { - scanner.close(); - } - - private static class IteratorImpl implements NativeRecordBatchIterator { - - private final Iterator taskIterator; - - private ScanTask currentTask = null; - private ArrowReader reader = null; - - private BufferAllocator allocator = null; - - public IteratorImpl(Iterable tasks, - BufferAllocator allocator) { - this.taskIterator = tasks.iterator(); - this.allocator = allocator; - } - - @Override - public void close() throws Exception { - closeCurrent(); - } - - private void closeCurrent() throws Exception { - if (currentTask == null) { - return; - } - currentTask.close(); - reader.close(); - } - - private boolean advance() { - if (!taskIterator.hasNext()) { - return false; - } - try { - closeCurrent(); - } catch (Exception e) { - throw new RuntimeException(e); - } - currentTask = taskIterator.next(); - reader = currentTask.execute(); - return true; - } - @Override - public boolean hasNext() { - - if (currentTask == null) { - if (!advance()) { - return false; - } - } - try { - if (!reader.loadNextBatch()) { - if (!advance()) { - return false; - } - } - return true; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - byte[] longtoBytes(long data) { - return new byte[]{ - (byte) ((data >> 0) & 0xff), - (byte) ((data >> 8) & 0xff), - (byte) ((data >> 16) & 0xff), - (byte) ((data >> 24) & 0xff), - (byte) ((data >> 32) & 0xff), - (byte) ((data >> 40) & 0xff), - (byte) ((data >> 48) & 0xff), - (byte) ((data >> 56) & 0xff), - }; - } - - @Override - public byte[] next() { - ArrowArray arrowArray = ArrowArray.allocateNew(allocator); - try { - Data.exportVectorSchemaRoot(allocator, reader.getVectorSchemaRoot(), reader, arrowArray); - return longtoBytes(arrowArray.memoryAddress()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } -} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchIterator.java 
b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/CRecordBatchIterator.java similarity index 88% rename from java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchIterator.java rename to java/dataset/src/main/java/org/apache/arrow/dataset/jni/CRecordBatchIterator.java index cc4144677e6..49ab7ae7c5c 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchIterator.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/CRecordBatchIterator.java @@ -17,16 +17,16 @@ package org.apache.arrow.dataset.jni; -import java.util.Iterator; /** * Iterate on the memory address of ArrowArray * next() should be called from C++ scanner to read the memory address of ArrowArray. */ -public interface NativeRecordBatchIterator extends Iterator, AutoCloseable { +public interface CRecordBatchIterator extends AutoCloseable { /** * Return next {@link org.apache.arrow.c.ArrowArray} memory address. */ - @Override - byte[] next(); + + void next(long cArrowArray, long cArrowSchema); + boolean hasNext(); } diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java new file mode 100644 index 00000000000..c293b641faf --- /dev/null +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.dataset.file; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.nio.channels.Channels; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.arrow.dataset.ParquetWriteSupport; +import org.apache.arrow.dataset.TestDataset; +import org.apache.arrow.dataset.jni.NativeMemoryPool; +import org.apache.arrow.dataset.scanner.ScanOptions; +import org.apache.arrow.dataset.scanner.ScanTask; +import org.apache.arrow.dataset.scanner.Scanner; +import org.apache.arrow.dataset.source.Dataset; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.WriteChannel; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.commons.io.FileUtils; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestDatasetFileWriter extends TestDataset { + + @ClassRule + public static final TemporaryFolder TMP = new TemporaryFolder(); + + public static final String AVRO_SCHEMA_USER = "user.avsc"; + + @Test + public void testParquetWriteSimple() throws Exception { + ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), + 1, "a", 2, "b", 3, "c", 2, "d"); + String sampleParquet = writeSupport.getOutputURI(); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, sampleParquet); + ScanOptions options = new ScanOptions(new String[0], 100); + final Dataset dataset = factory.finish(); + final Scanner scanner = dataset.newScan(options); + final File writtenFolder = TMP.newFolder(); + final String writtenParquet = writtenFolder.toURI().toString(); + try { + DatasetFileWriter.write(rootAllocator(), scanner, FileFormat.PARQUET, writtenParquet); + assertParquetFileEquals(sampleParquet, Objects.requireNonNull(writtenFolder.listFiles())[0].toURI().toString()); + } finally { + AutoCloseables.close(factory, scanner, dataset); + } + } + + @Test + public void testParquetWriteWithPartitions() throws Exception { + ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), + 1, "a", 2, "b", 3, "c", 2, "d"); + String sampleParquet = writeSupport.getOutputURI(); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, sampleParquet); + ScanOptions options = new ScanOptions(new String[0], 100); + final Dataset dataset = factory.finish(); + final Scanner scanner = dataset.newScan(options); + final File writtenFolder = TMP.newFolder(); + final String writtenParquet = writtenFolder.toURI().toString(); + try { + DatasetFileWriter.write(rootAllocator(), scanner, FileFormat.PARQUET, writtenParquet, new String[]{"id", "name"}, 100, "dat_{i}"); + final Set expectedOutputFiles = new HashSet<>( + Arrays.asList("id=1/name=a/dat_0", "id=2/name=b/dat_0", "id=3/name=c/dat_0", "id=2/name=d/dat_0")); + final Set outputFiles = FileUtils.listFiles(writtenFolder, null, true) + .stream() + .map(file -> { + return writtenFolder.toURI().relativize(file.toURI()).toString(); + }) + 
.collect(Collectors.toSet()); + Assert.assertEquals(expectedOutputFiles, outputFiles); + } finally { + AutoCloseables.close(factory, scanner, dataset); + } + } + + @Test(expected = java.lang.RuntimeException.class) + public void testScanErrorHandling() throws Exception { + DatasetFileWriter.write(rootAllocator(), new Scanner() { + @Override + public Iterable scan() { + return Collections.singletonList(new ScanTask() { + @Override + public ArrowReader execute() { + // this error is supposed to be firstly investigated in native code, then thrown back to Java. + throw new RuntimeException("ERROR"); + } + + @Override + public void close() throws Exception { + // do nothing + } + }); + } + + @Override + public Schema schema() { + return new Schema(Collections.emptyList()); + } + + @Override + public void close() throws Exception { + // do nothing + } + + }, FileFormat.PARQUET, "file:/DUMMY/"); + } + + private void assertParquetFileEquals(String expectedURI, String actualURI) throws Exception { + final FileSystemDatasetFactory expectedFactory = new FileSystemDatasetFactory( + rootAllocator(), NativeMemoryPool.getDefault(), FileFormat.PARQUET, expectedURI); + List expectedBatches = collectResultFromFactory(expectedFactory, + new ScanOptions(new String[0], 100)); + final FileSystemDatasetFactory actualFactory = new FileSystemDatasetFactory( + rootAllocator(), NativeMemoryPool.getDefault(), FileFormat.PARQUET, actualURI); + List actualBatches = collectResultFromFactory(actualFactory, + new ScanOptions(new String[0], 100)); + // fast-fail by comparing metadata + Assert.assertEquals(expectedBatches.toString(), actualBatches.toString()); + // compare buffers + Assert.assertEquals(serialize(expectedBatches), serialize(actualBatches)); + AutoCloseables.close(expectedBatches, actualBatches); + } + + private String serialize(List batches) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + for (ArrowRecordBatch batch : batches) { + MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), batch); + } + return Arrays.toString(out.toByteArray()); + } +} + From e526daaa783b25b66a4985662b3fadd30f3487ba Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Fri, 23 Sep 2022 07:33:45 +0000 Subject: [PATCH 03/14] use the c Data interface to pass the schema and code refine --- java/dataset/src/main/cpp/jni_wrapper.cc | 15 +++++++++------ .../arrow/dataset/file/DatasetFileWriter.java | 9 ++++++++- .../org/apache/arrow/dataset/file/JniWrapper.java | 2 +- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 625c5f15fde..8d5264d482b 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -24,6 +24,7 @@ #include "arrow/filesystem/localfs.h" #include "arrow/ipc/api.h" #include "arrow/c/helpers.h" +#include "arrow/c/bridge.h" #include "arrow/util/iterator.h" #include "jni_util.h" #include "org_apache_arrow_dataset_file_JniWrapper.h" @@ -254,7 +255,7 @@ class SimpleIteratorFragment : public arrow::dataset::Fragment { /// \brief Create scanner that scans over Java dataset API's components. /// -/// Currently, we use a NativeRecordBatchIterator as the underlying +/// Currently, we use a CRecordBatchIterator as the underlying /// Java object to do scanning. Which means, only one single task will /// be produced from C++ code. 
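Patch 03's key change, visible in writeFromScannerToFile further below, is to stop serializing the schema to IPC bytes and pass it through the C Data Interface instead: Java exports the schema into an ArrowSchema struct and hands its address across JNI, where C++ calls arrow::ImportSchema. A minimal Java round-trip sketch of that mechanism (assuming an allocator and a schema are in scope; not part of the patch):

    import org.apache.arrow.c.ArrowSchema;
    import org.apache.arrow.c.Data;
    import org.apache.arrow.vector.types.pojo.Schema;

    // Export the Java schema into a C-level ArrowSchema struct, then import it
    // back. In the patch, cSchema.memoryAddress() is what crosses the JNI
    // boundary instead of a serialized byte[].
    try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator)) {
      Data.exportSchema(allocator, schema, /*provider=*/null, cSchema);
      Schema roundTripped = Data.importSchema(allocator, cSchema, /*provider=*/null);
      // roundTripped should equal the original schema (dictionary-free case).
    }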
arrow::Result> MakeJavaDatasetScanner( @@ -280,8 +281,7 @@ arrow::Result> MakeJavaDatasetScanner( std::shared_ptr rb = JniGetOrThrow( arrow::dataset::jni::ImportRecordBatch(env, schema, reinterpret_cast(c_array.get()))); - // Release the ArrowArray - // auto c_array = reinterpret_cast(memory_address); + // Release the ArrowArray and ArrowSchema ArrowArrayRelease(c_array.get()); ArrowSchemaRelease(c_schema.get()); return rb; @@ -664,11 +664,11 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( * Class: org_apache_arrow_dataset_file_JniWrapper * Method: writeFromScannerToFile * Signature: - * (Lorg/apache/arrow/dataset/jni/NativeRecordBatchIterator;[BJLjava/lang/String;[Ljava/lang/String;ILjava/lang/String;)V + * (Lorg/apache/arrow/dataset/jni/CRecordBatchIterator;JJLjava/lang/String;[Ljava/lang/String;ILjava/lang/String;)V */ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile( - JNIEnv* env, jobject, jobject itr, jbyteArray schema_bytes, jlong file_format_id, + JNIEnv* env, jobject, jobject itr, jlong c_schema_address, jlong file_format_id, jstring uri, jobjectArray partition_columns, jint max_partitions, jstring base_name_template) { JNI_METHOD_START @@ -676,7 +676,10 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile( if (env->GetJavaVM(&vm) != JNI_OK) { JniThrow("Unable to get JavaVM instance"); } - auto schema = JniGetOrThrow(FromSchemaByteArray(env, schema_bytes)); + + std::shared_ptr schema = JniGetOrThrow( + arrow::ImportSchema(reinterpret_cast(c_schema_address))); + auto scanner = JniGetOrThrow(MakeJavaDatasetScanner(vm, itr, schema)); std::shared_ptr file_format = JniGetOrThrow(GetFileFormat(file_format_id)); diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java index 0a33bf5850e..336badf85c2 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java @@ -17,6 +17,8 @@ package org.apache.arrow.dataset.file; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.Data; import org.apache.arrow.dataset.scanner.Scanner; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.AutoCloseables; @@ -42,14 +44,19 @@ public class DatasetFileWriter { public static void write(BufferAllocator allocator, Scanner scanner, FileFormat format, String uri, String[] partitionColumns, int maxPartitions, String baseNameTemplate) { final CRecordBatchIteratorImpl itr = new CRecordBatchIteratorImpl(scanner, allocator); + ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator); + Data.exportSchema(allocator, scanner.schema(), null, arrowSchema); + RuntimeException throwableWrapper = null; try { - JniWrapper.get().writeFromScannerToFile(itr, SchemaUtility.serialize(scanner.schema()), + JniWrapper.get().writeFromScannerToFile(itr, arrowSchema.memoryAddress(), format.id(), uri, partitionColumns, maxPartitions, baseNameTemplate); } catch (Throwable t) { throwableWrapper = new RuntimeException(t); throw throwableWrapper; } finally { + arrowSchema.release(); + arrowSchema.close(); try { AutoCloseables.close(itr); } catch (Exception e) { diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java index c3458b731fc..508bdf9aab3 100644 --- 
a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -59,7 +59,7 @@ private JniWrapper() { * @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition * ID around all written files. */ - public native void writeFromScannerToFile(CRecordBatchIterator itr, byte[] schema, + public native void writeFromScannerToFile(CRecordBatchIterator itr, long schema_address, long fileFormat, String uri, String[] partitionColumns, int maxPartitions, String baseNameTemplate); From 07345a250cae8741370fbc75ac23af68377b209a Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Fri, 23 Sep 2022 10:20:41 +0000 Subject: [PATCH 04/14] use the arrow record batch reader --- java/dataset/src/main/cpp/jni_wrapper.cc | 134 +++++------------- .../dataset/file/TestDatasetFileWriter.java | 32 ----- 2 files changed, 37 insertions(+), 129 deletions(-) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 8d5264d482b..96688d6f664 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -19,12 +19,12 @@ #include "arrow/array.h" #include "arrow/array/concatenate.h" +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" #include "arrow/dataset/api.h" #include "arrow/dataset/file_base.h" #include "arrow/filesystem/localfs.h" #include "arrow/ipc/api.h" -#include "arrow/c/helpers.h" -#include "arrow/c/bridge.h" #include "arrow/util/iterator.h" #include "jni_util.h" #include "org_apache_arrow_dataset_file_JniWrapper.h" @@ -183,122 +183,63 @@ class DisposableScannerAdaptor { } }; -/// \brief Simple fragment implementation that is constructed directly -/// from a record batch iterator. -class SimpleIteratorFragment : public arrow::dataset::Fragment { - public: - explicit SimpleIteratorFragment(arrow::RecordBatchIterator itr) - : arrow::dataset::Fragment() { - itr_ = std::move(itr); - } - - static arrow::Result> Make( - arrow::RecordBatchIterator itr) { - return std::make_shared(std::move(itr)); - } - - arrow::Result ScanBatchesAsync( - const std::shared_ptr& options) override { - struct State { - State(std::shared_ptr fragment) - : fragment(std::move(fragment)) {} - - std::shared_ptr Next() { return cur_rb; } - - bool Finished() { - arrow::Result> next = fragment->itr_.Next(); - if (IsIterationEnd(next)) { - cur_rb = nullptr; - - return true; - } else { - cur_rb = next.ValueOrDie(); - return false; - } - } - - std::shared_ptr fragment; - std::shared_ptr cur_rb = nullptr; - }; - - struct Generator { - Generator(std::shared_ptr fragment) - : state(std::make_shared(std::move(fragment))) {} - - arrow::Future> operator()() { - while (!state->Finished()) { - auto next = state->Next(); - if (next) { - return arrow::Future>::MakeFinished( - std::move(next)); - } - } - return arrow::AsyncGeneratorEnd>(); - } - - std::shared_ptr state; - }; - return Generator(arrow::internal::checked_pointer_cast( - shared_from_this())); - } - - std::string type_name() const override { return "simple_iterator"; } - - arrow::Result> ReadPhysicalSchemaImpl() override { - return arrow::Status::NotImplemented("No physical schema is readable"); - } - - private: - arrow::RecordBatchIterator itr_; - bool used_ = false; -}; - /// \brief Create scanner that scans over Java dataset API's components. /// /// Currently, we use a CRecordBatchIterator as the underlying /// Java object to do scanning. 
Which means, only one single task will
 /// be produced from C++ code.
 arrow::Result<std::shared_ptr<arrow::dataset::Scanner>> MakeJavaDatasetScanner(
-    JavaVM* vm, jobject java_record_batch_object_itr, std::shared_ptr<arrow::Schema> schema) {
+    JavaVM* vm, jobject java_record_batch_object_itr,
+    std::shared_ptr<arrow::Schema> schema) {
   arrow::RecordBatchIterator itr = arrow::MakeFunctionIterator(
-      [vm, java_record_batch_object_itr, schema]() -> arrow::Result<std::shared_ptr<arrow::RecordBatch>> {
+      [vm, java_record_batch_object_itr,
+       schema]() -> arrow::Result<std::shared_ptr<arrow::RecordBatch>> {
         JNIEnv* env;
-        if (vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION) != JNI_OK) {
+        int env_code = vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION);
+        if (env_code == JNI_EDETACHED) {
+          if (vm->AttachCurrentThreadAsDaemon(reinterpret_cast<void**>(&env), NULL) != JNI_OK) {
+            return arrow::Status::Invalid("Failed to attach thread.");
+          }
+        } else if (env_code != JNI_OK) {
           return arrow::Status::Invalid("JNIEnv was not attached to current thread");
         }
-        if (!env->CallBooleanMethod(java_record_batch_object_itr, native_record_batch_iterator_hasNext)) {
+
+        if (!env->CallBooleanMethod(java_record_batch_object_itr,
+                                    native_record_batch_iterator_hasNext)) {
           return nullptr;  // stream ended
         }
         std::unique_ptr<ArrowSchema> c_schema = std::make_unique<ArrowSchema>();
         std::unique_ptr<ArrowArray> c_array = std::make_unique<ArrowArray>();
-        env->CallObjectMethod(
-            java_record_batch_object_itr,
-            native_record_batch_iterator_next,
-            reinterpret_cast<jlong>(c_array.get()),
-            reinterpret_cast<jlong>(c_schema.get()));
+        env->CallObjectMethod(java_record_batch_object_itr,
+                              native_record_batch_iterator_next,
+                              reinterpret_cast<jlong>(c_array.get()),
+                              reinterpret_cast<jlong>(c_schema.get()));
+
+        std::shared_ptr<arrow::RecordBatch> rb =
+            JniGetOrThrow(arrow::dataset::jni::ImportRecordBatch(
+                env, schema, reinterpret_cast<jlong>(c_array.get())));
-        std::shared_ptr<arrow::RecordBatch> rb = JniGetOrThrow(
-            arrow::dataset::jni::ImportRecordBatch(env, schema, reinterpret_cast<jlong>(c_array.get())));
-
         // Release the ArrowArray and ArrowSchema
         ArrowArrayRelease(c_array.get());
         ArrowSchemaRelease(c_schema.get());
         return rb;
       });
-  ARROW_ASSIGN_OR_RAISE(auto fragment, SimpleIteratorFragment::Make(std::move(itr)))
-
-  arrow::dataset::ScannerBuilder scanner_builder(
-      std::move(schema), fragment, std::make_shared<arrow::dataset::ScanOptions>());
+  ARROW_ASSIGN_OR_RAISE(
+      std::shared_ptr<arrow::RecordBatchReader> reader,
+      arrow::RecordBatchReader::MakeFromIterator(std::move(itr), std::move(schema)))
+  std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+      arrow::dataset::ScannerBuilder::FromRecordBatchReader(reader);
   // Use default memory pool is enough as native allocation is ideally
   // not being called during scanning Java-based fragments.
- RETURN_NOT_OK(scanner_builder.Pool(arrow::default_memory_pool())); - return scanner_builder.Finish(); + RETURN_NOT_OK(scanner_builder->Pool(arrow::default_memory_pool())); + return scanner_builder->Finish(); } std::shared_ptr SchemaFromColumnNames( - const std::shared_ptr& input, const std::vector& column_names) { + const std::shared_ptr& input, + const std::vector& column_names) { std::vector> columns; for (arrow::FieldRef ref : column_names) { auto maybe_field = ref.GetOne(*input); @@ -367,11 +308,10 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { "dataset/jni/CRecordBatchIterator;"); native_record_batch_iterator_hasNext = JniGetOrThrow( GetMethodID(env, native_record_batch_iterator_class, "hasNext", "()Z")); - native_record_batch_iterator_next = - JniGetOrThrow(GetMethodID(env, native_record_batch_iterator_class, "next", "(JJ)V")); + native_record_batch_iterator_next = JniGetOrThrow( + GetMethodID(env, native_record_batch_iterator_class, "next", "(JJ)V")); default_memory_pool_id = reinterpret_cast(arrow::default_memory_pool()); - return JNI_VERSION; JNI_METHOD_END(JNI_ERR) } @@ -678,7 +618,7 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile( } std::shared_ptr schema = JniGetOrThrow( - arrow::ImportSchema(reinterpret_cast(c_schema_address))); + arrow::ImportSchema(reinterpret_cast(c_schema_address))); auto scanner = JniGetOrThrow(MakeJavaDatasetScanner(vm, itr, schema)); std::shared_ptr file_format = @@ -693,8 +633,8 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile( options.filesystem = filesystem; options.base_dir = output_path; options.basename_template = JStringToCString(env, base_name_template); - options.partitioning = std::make_shared( - SchemaFromColumnNames(schema, partition_column_vector)); + options.partitioning = std::make_shared( + SchemaFromColumnNames(schema, partition_column_vector)); options.max_partitions = max_partitions; JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner)); JNI_METHOD_END() diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java index c293b641faf..9af96ead3d4 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java @@ -103,38 +103,6 @@ public void testParquetWriteWithPartitions() throws Exception { } } - @Test(expected = java.lang.RuntimeException.class) - public void testScanErrorHandling() throws Exception { - DatasetFileWriter.write(rootAllocator(), new Scanner() { - @Override - public Iterable scan() { - return Collections.singletonList(new ScanTask() { - @Override - public ArrowReader execute() { - // this error is supposed to be firstly investigated in native code, then thrown back to Java. 
- throw new RuntimeException("ERROR"); - } - - @Override - public void close() throws Exception { - // do nothing - } - }); - } - - @Override - public Schema schema() { - return new Schema(Collections.emptyList()); - } - - @Override - public void close() throws Exception { - // do nothing - } - - }, FileFormat.PARQUET, "file:/DUMMY/"); - } - private void assertParquetFileEquals(String expectedURI, String actualURI) throws Exception { final FileSystemDatasetFactory expectedFactory = new FileSystemDatasetFactory( rootAllocator(), NativeMemoryPool.getDefault(), FileFormat.PARQUET, expectedURI); From f824d0ec28d30efacb058d9df3f1711adfe45c29 Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Fri, 23 Sep 2022 13:33:16 +0000 Subject: [PATCH 05/14] use ArrowStream --- java/dataset/src/main/cpp/jni_wrapper.cc | 36 +++++----- .../file/CRecordBatchIteratorImpl.java | 65 +++++-------------- .../dataset/jni/CRecordBatchIterator.java | 6 +- 3 files changed, 37 insertions(+), 70 deletions(-) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 96688d6f664..8435b28cbff 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -191,9 +191,9 @@ class DisposableScannerAdaptor { arrow::Result> MakeJavaDatasetScanner( JavaVM* vm, jobject java_record_batch_object_itr, std::shared_ptr schema) { - arrow::RecordBatchIterator itr = arrow::MakeFunctionIterator( + arrow::Iterator itr = arrow::MakeFunctionIterator( [vm, java_record_batch_object_itr, - schema]() -> arrow::Result> { + schema]() -> arrow::Result { JNIEnv* env; int env_code = vm->GetEnv(reinterpret_cast(&env), JNI_VERSION); if (env_code == JNI_EDETACHED) { @@ -206,29 +206,29 @@ arrow::Result> MakeJavaDatasetScanner( if (!env->CallBooleanMethod(java_record_batch_object_itr, native_record_batch_iterator_hasNext)) { - return nullptr; // stream ended + return arrow::IterationTraits::End(); } - std::unique_ptr c_schema = std::make_unique(); - std::unique_ptr c_array = std::make_unique(); + std::unique_ptr c_stream = std::make_unique(); + env->CallObjectMethod(java_record_batch_object_itr, native_record_batch_iterator_next, - reinterpret_cast(c_array.get()), - reinterpret_cast(c_schema.get())); - - std::shared_ptr rb = - JniGetOrThrow(arrow::dataset::jni::ImportRecordBatch( - env, schema, reinterpret_cast(c_array.get()))); - - // Release the ArrowArray and ArrowSchema - ArrowArrayRelease(c_array.get()); - ArrowSchemaRelease(c_schema.get()); - return rb; + reinterpret_cast(c_stream.get()) + ); + + std::shared_ptr rb_reader = + JniGetOrThrow(arrow::ImportRecordBatchReader( + c_stream.get())); + + // Release the ArrowArrayStream + ArrowArrayStreamRelease(c_stream.get()); + return arrow::MakeFunctionIterator( + [rb_reader] { return rb_reader->Next(); }); }); ARROW_ASSIGN_OR_RAISE( std::shared_ptr reader, - arrow::RecordBatchReader::MakeFromIterator(std::move(itr), std::move(schema))) + arrow::RecordBatchReader::MakeFromIterator(arrow::MakeFlattenIterator(std::move(itr)), std::move(schema))) std::shared_ptr scanner_builder = arrow::dataset::ScannerBuilder::FromRecordBatchReader(reader); // Use default memory pool is enough as native allocation is ideally @@ -309,7 +309,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { native_record_batch_iterator_hasNext = JniGetOrThrow( GetMethodID(env, native_record_batch_iterator_class, "hasNext", "()Z")); native_record_batch_iterator_next = JniGetOrThrow( - GetMethodID(env, native_record_batch_iterator_class, "next", "(JJ)V")); + 
GetMethodID(env, native_record_batch_iterator_class, "next", "(J)V")); default_memory_pool_id = reinterpret_cast(arrow::default_memory_pool()); return JNI_VERSION; diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/CRecordBatchIteratorImpl.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/CRecordBatchIteratorImpl.java index 6b4513bfa41..2391c2f7e3a 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/CRecordBatchIteratorImpl.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/CRecordBatchIteratorImpl.java @@ -21,6 +21,7 @@ import java.util.Iterator; import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.ArrowArrayStream; import org.apache.arrow.c.ArrowSchema; import org.apache.arrow.c.Data; import org.apache.arrow.dataset.jni.CRecordBatchIterator; @@ -34,73 +35,37 @@ public class CRecordBatchIteratorImpl implements CRecordBatchIterator { private final Scanner scanner; private final BufferAllocator allocator; - private Iterator taskIterator; - - private ScanTask currentTask = null; - private ArrowReader reader = null; + private Iterator taskIterators; + private ArrowReader currentReader = null; public CRecordBatchIteratorImpl(Scanner scanner, BufferAllocator allocator) { this.scanner = scanner; - this.taskIterator = scanner.scan().iterator(); this.allocator = allocator; + this.taskIterators = scanner.scan().iterator(); } @Override public void close() throws Exception { - closeCurrent(); scanner.close(); } - private void closeCurrent() throws Exception { - if (currentTask == null) { - return; - } - currentTask.close(); - reader.close(); - } - - private boolean advance() { - if (!taskIterator.hasNext()) { - return false; - } - try { - closeCurrent(); - } catch (Exception e) { - throw new RuntimeException(e); - } - currentTask = taskIterator.next(); - reader = currentTask.execute(); - return true; - } - @Override - public boolean hasNext() { - - if (currentTask == null) { - if (!advance()) { - return false; - } - } - try { - if (!reader.loadNextBatch()) { - if (!advance()) { - return false; - } - } + public boolean hasNext() throws IOException { + if (taskIterators.hasNext()) { return true; - } catch (IOException e) { - throw new RuntimeException(e); + } else { + return false; } } - public void next(long cArrayAddress, long cSchemaAddress) { - final ArrowArray cArray = ArrowArray.wrap(cArrayAddress); - final ArrowSchema cSchema = ArrowSchema.wrap(cSchemaAddress); - try { - Data.exportVectorSchemaRoot(allocator, reader.getVectorSchemaRoot(), reader, cArray, cSchema); - } catch (IOException e) { - throw new RuntimeException(e); + public void next(long cStreamPointer) throws IOException { + currentReader = taskIterators.next().execute(); + try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer)) { + + Data.exportArrayStream(allocator, currentReader, stream); + } finally { + currentReader.close(); } } } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/CRecordBatchIterator.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/CRecordBatchIterator.java index 49ab7ae7c5c..7713a79f63d 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/CRecordBatchIterator.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/CRecordBatchIterator.java @@ -17,6 +17,8 @@ package org.apache.arrow.dataset.jni; +import java.io.IOException; + /** * Iterate on the memory address of ArrowArray * next() should be called from C++ scanner to read the memory address of 
ArrowArray. @@ -27,6 +29,6 @@ public interface CRecordBatchIterator extends AutoCloseable { * Return next {@link org.apache.arrow.c.ArrowArray} memory address. */ - void next(long cArrowArray, long cArrowSchema); - boolean hasNext(); + void next(long cStreamPointer) throws IOException; + boolean hasNext() throws IOException; } From 759c9c26fec47a1521e002a8519bf879d4e33a37 Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Wed, 12 Oct 2022 13:21:50 +0000 Subject: [PATCH 06/14] refine CRecordBatchIterator --- java/dataset/CMakeLists.txt | 2 +- java/dataset/src/main/cpp/jni_wrapper.cc | 6 +++--- ...ratorImpl.java => CArrowArrayStreamIteratorImpl.java} | 6 +++--- .../org/apache/arrow/dataset/file/DatasetFileWriter.java | 2 +- .../java/org/apache/arrow/dataset/file/JniWrapper.java | 4 ++-- ...BatchIterator.java => CArrowArrayStreamIterator.java} | 9 ++------- 6 files changed, 12 insertions(+), 17 deletions(-) rename java/dataset/src/main/java/org/apache/arrow/dataset/file/{CRecordBatchIteratorImpl.java => CArrowArrayStreamIteratorImpl.java} (91%) rename java/dataset/src/main/java/org/apache/arrow/dataset/jni/{CRecordBatchIterator.java => CArrowArrayStreamIterator.java} (77%) diff --git a/java/dataset/CMakeLists.txt b/java/dataset/CMakeLists.txt index 39b2c184d1f..aa954c7d8a1 100644 --- a/java/dataset/CMakeLists.txt +++ b/java/dataset/CMakeLists.txt @@ -26,7 +26,7 @@ add_jar(arrow_java_jni_dataset_jar src/main/java/org/apache/arrow/dataset/file/JniWrapper.java src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java - src/main/java/org/apache/arrow/dataset/jni/CRecordBatchIterator.java + src/main/java/org/apache/arrow/dataset/jni/CArrowArrayStreamIterator.java GENERATE_NATIVE_HEADERS arrow_java_jni_dataset_headers) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 8435b28cbff..0346b95ce2a 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -185,7 +185,7 @@ class DisposableScannerAdaptor { /// \brief Create scanner that scans over Java dataset API's components. /// -/// Currently, we use a CRecordBatchIterator as the underlying +/// Currently, we use a CArrowArrayStreamIterator as the underlying /// Java object to do scanning. Which means, only one single task will /// be produced from C++ code. 
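/// On the Java side, hasNext() reports whether another scan task remains, and
/// next(long) exports that task's record batches into the ArrowArrayStream
/// allocated at the address passed in from this native code.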
arrow::Result> MakeJavaDatasetScanner( @@ -305,7 +305,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { native_record_batch_iterator_class = CreateGlobalClassReference(env, "Lorg/apache/arrow/" - "dataset/jni/CRecordBatchIterator;"); + "dataset/jni/CArrowArrayStreamIterator;"); native_record_batch_iterator_hasNext = JniGetOrThrow( GetMethodID(env, native_record_batch_iterator_class, "hasNext", "()Z")); native_record_batch_iterator_next = JniGetOrThrow( @@ -604,7 +604,7 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( * Class: org_apache_arrow_dataset_file_JniWrapper * Method: writeFromScannerToFile * Signature: - * (Lorg/apache/arrow/dataset/jni/CRecordBatchIterator;JJLjava/lang/String;[Ljava/lang/String;ILjava/lang/String;)V + * (Lorg/apache/arrow/dataset/jni/CArrowArrayStreamIterator;JJLjava/lang/String;[Ljava/lang/String;ILjava/lang/String;)V */ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile( diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/CRecordBatchIteratorImpl.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/CArrowArrayStreamIteratorImpl.java similarity index 91% rename from java/dataset/src/main/java/org/apache/arrow/dataset/file/CRecordBatchIteratorImpl.java rename to java/dataset/src/main/java/org/apache/arrow/dataset/file/CArrowArrayStreamIteratorImpl.java index 2391c2f7e3a..14976686f56 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/CRecordBatchIteratorImpl.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/CArrowArrayStreamIteratorImpl.java @@ -24,13 +24,13 @@ import org.apache.arrow.c.ArrowArrayStream; import org.apache.arrow.c.ArrowSchema; import org.apache.arrow.c.Data; -import org.apache.arrow.dataset.jni.CRecordBatchIterator; +import org.apache.arrow.dataset.jni.CArrowArrayStreamIterator; import org.apache.arrow.dataset.scanner.ScanTask; import org.apache.arrow.dataset.scanner.Scanner; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.ipc.ArrowReader; -public class CRecordBatchIteratorImpl implements CRecordBatchIterator { +public class CArrowArrayStreamIteratorImpl implements CArrowArrayStreamIterator { private final Scanner scanner; private final BufferAllocator allocator; @@ -38,7 +38,7 @@ public class CRecordBatchIteratorImpl implements CRecordBatchIterator { private Iterator taskIterators; private ArrowReader currentReader = null; - public CRecordBatchIteratorImpl(Scanner scanner, + public CArrowArrayStreamIteratorImpl(Scanner scanner, BufferAllocator allocator) { this.scanner = scanner; this.allocator = allocator; diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java index 336badf85c2..dc024a48e2b 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java @@ -43,7 +43,7 @@ public class DatasetFileWriter { */ public static void write(BufferAllocator allocator, Scanner scanner, FileFormat format, String uri, String[] partitionColumns, int maxPartitions, String baseNameTemplate) { - final CRecordBatchIteratorImpl itr = new CRecordBatchIteratorImpl(scanner, allocator); + final CArrowArrayStreamIteratorImpl itr = new CArrowArrayStreamIteratorImpl(scanner, allocator); ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator); Data.exportSchema(allocator, 
scanner.schema(), null, arrowSchema);
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
index 508bdf9aab3..7112022e0e1 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
@@ -17,7 +17,7 @@
 package org.apache.arrow.dataset.file;
 
-import org.apache.arrow.dataset.jni.CRecordBatchIterator;
+import org.apache.arrow.dataset.jni.CArrowArrayStreamIterator;
 import org.apache.arrow.dataset.jni.JniLoader;
 
 /**
@@ -59,7 +59,7 @@ private JniWrapper() {
   * @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition
   *                         ID around all written files.
   */
-  public native void writeFromScannerToFile(CRecordBatchIterator itr, long schema_address,
+  public native void writeFromScannerToFile(CArrowArrayStreamIterator itr, long schema_address,
                                             long fileFormat, String uri, String[] partitionColumns, int maxPartitions,
                                             String baseNameTemplate);
 
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/CRecordBatchIterator.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/CArrowArrayStreamIterator.java
similarity index 77%
rename from java/dataset/src/main/java/org/apache/arrow/dataset/jni/CRecordBatchIterator.java
rename to java/dataset/src/main/java/org/apache/arrow/dataset/jni/CArrowArrayStreamIterator.java
index 7713a79f63d..d791ef90437 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/CRecordBatchIterator.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/CArrowArrayStreamIterator.java
@@ -20,14 +20,9 @@
 package org.apache.arrow.dataset.jni;
 
 import java.io.IOException;
 
 /**
- * Iterate on the memory address of ArrowArray
- * next() should be called from C++ scanner to read the memory address of ArrowArray.
+ * Iterator that exports each scan task's record batches into a C ArrowArrayStream.
  */
-public interface CRecordBatchIterator extends AutoCloseable {
-
-  /**
-   * Return next {@link org.apache.arrow.c.ArrowArray} memory address.
- */ +public interface CArrowArrayStreamIterator extends AutoCloseable { void next(long cStreamPointer) throws IOException; boolean hasNext() throws IOException; From 88893e05cd866c3621f022a674dbc8ff60a63c43 Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Thu, 13 Oct 2022 14:10:00 +0000 Subject: [PATCH 07/14] remove the CArrowArrayStreamIterator --- java/dataset/CMakeLists.txt | 1 - java/dataset/src/main/cpp/jni_wrapper.cc | 86 +++---------------- .../file/CArrowArrayStreamIteratorImpl.java | 71 --------------- .../arrow/dataset/file/DatasetFileWriter.java | 26 ++++-- .../apache/arrow/dataset/file/JniWrapper.java | 6 +- .../jni/CArrowArrayStreamIterator.java | 29 ------- 6 files changed, 35 insertions(+), 184 deletions(-) delete mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/file/CArrowArrayStreamIteratorImpl.java delete mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/jni/CArrowArrayStreamIterator.java diff --git a/java/dataset/CMakeLists.txt b/java/dataset/CMakeLists.txt index aa954c7d8a1..315163a537c 100644 --- a/java/dataset/CMakeLists.txt +++ b/java/dataset/CMakeLists.txt @@ -26,7 +26,6 @@ add_jar(arrow_java_jni_dataset_jar src/main/java/org/apache/arrow/dataset/file/JniWrapper.java src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java - src/main/java/org/apache/arrow/dataset/jni/CArrowArrayStreamIterator.java GENERATE_NATIVE_HEADERS arrow_java_jni_dataset_headers) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 0346b95ce2a..48edb3a10b1 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -40,12 +40,9 @@ jclass illegal_argument_exception_class; jclass runtime_exception_class; jclass java_reservation_listener_class; -jclass native_record_batch_iterator_class; jmethodID reserve_memory_method; jmethodID unreserve_memory_method; -jmethodID native_record_batch_iterator_hasNext; -jmethodID native_record_batch_iterator_next; jlong default_memory_pool_id = -1L; @@ -183,60 +180,6 @@ class DisposableScannerAdaptor { } }; -/// \brief Create scanner that scans over Java dataset API's components. -/// -/// Currently, we use a CArrowArrayStreamIterator as the underlying -/// Java object to do scanning. Which means, only one single task will -/// be produced from C++ code. 
-arrow::Result> MakeJavaDatasetScanner( - JavaVM* vm, jobject java_record_batch_object_itr, - std::shared_ptr schema) { - arrow::Iterator itr = arrow::MakeFunctionIterator( - [vm, java_record_batch_object_itr, - schema]() -> arrow::Result { - JNIEnv* env; - int env_code = vm->GetEnv(reinterpret_cast(&env), JNI_VERSION); - if (env_code == JNI_EDETACHED) { - if ( vm->AttachCurrentThreadAsDaemon(reinterpret_cast(&env), NULL) != JNI_OK) { - return arrow::Status::Invalid("Failed to attach thread."); - } - } else if (env_code != JNI_OK) { - return arrow::Status::Invalid("JNIEnv was not attached to current thread"); - } - - if (!env->CallBooleanMethod(java_record_batch_object_itr, - native_record_batch_iterator_hasNext)) { - return arrow::IterationTraits::End(); - } - - std::unique_ptr c_stream = std::make_unique(); - - env->CallObjectMethod(java_record_batch_object_itr, - native_record_batch_iterator_next, - reinterpret_cast(c_stream.get()) - ); - - std::shared_ptr rb_reader = - JniGetOrThrow(arrow::ImportRecordBatchReader( - c_stream.get())); - - // Release the ArrowArrayStream - ArrowArrayStreamRelease(c_stream.get()); - return arrow::MakeFunctionIterator( - [rb_reader] { return rb_reader->Next(); }); - }); - - ARROW_ASSIGN_OR_RAISE( - std::shared_ptr reader, - arrow::RecordBatchReader::MakeFromIterator(arrow::MakeFlattenIterator(std::move(itr)), std::move(schema))) - std::shared_ptr scanner_builder = - arrow::dataset::ScannerBuilder::FromRecordBatchReader(reader); - // Use default memory pool is enough as native allocation is ideally - // not being called during scanning Java-based fragments. - RETURN_NOT_OK(scanner_builder->Pool(arrow::default_memory_pool())); - return scanner_builder->Finish(); -} - std::shared_ptr SchemaFromColumnNames( const std::shared_ptr& input, const std::vector& column_names) { @@ -302,15 +245,6 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { unreserve_memory_method = JniGetOrThrow( GetMethodID(env, java_reservation_listener_class, "unreserve", "(J)V")); - native_record_batch_iterator_class = - CreateGlobalClassReference(env, - "Lorg/apache/arrow/" - "dataset/jni/CArrowArrayStreamIterator;"); - native_record_batch_iterator_hasNext = JniGetOrThrow( - GetMethodID(env, native_record_batch_iterator_class, "hasNext", "()Z")); - native_record_batch_iterator_next = JniGetOrThrow( - GetMethodID(env, native_record_batch_iterator_class, "next", "(J)V")); - default_memory_pool_id = reinterpret_cast(arrow::default_memory_pool()); return JNI_VERSION; JNI_METHOD_END(JNI_ERR) @@ -323,7 +257,6 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { env->DeleteGlobalRef(illegal_argument_exception_class); env->DeleteGlobalRef(runtime_exception_class); env->DeleteGlobalRef(java_reservation_listener_class); - env->DeleteGlobalRef(native_record_batch_iterator_class); default_memory_pool_id = -1L; } @@ -604,23 +537,32 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( * Class: org_apache_arrow_dataset_file_JniWrapper * Method: writeFromScannerToFile * Signature: - * (Lorg/apache/arrow/dataset/jni/CArrowArrayStreamIterator;JJLjava/lang/String;[Ljava/lang/String;ILjava/lang/String;)V + * (JJJLjava/lang/String;[Ljava/lang/String;ILjava/lang/String;)V */ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile( - JNIEnv* env, jobject, jobject itr, jlong c_schema_address, jlong file_format_id, - jstring uri, jobjectArray partition_columns, jint max_partitions, - jstring base_name_template) { + JNIEnv* env, jobject, jlong 
c_arrow_array_stream_address, jlong c_schema_address, + jlong file_format_id, jstring uri, jobjectArray partition_columns, + jint max_partitions, jstring base_name_template) { JNI_METHOD_START JavaVM* vm; if (env->GetJavaVM(&vm) != JNI_OK) { JniThrow("Unable to get JavaVM instance"); } + auto* arrow_stream = reinterpret_cast(c_arrow_array_stream_address); + std::shared_ptr reader = + JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream)); + // Release the ArrowArrayStream + ArrowArrayStreamRelease(arrow_stream); + std::shared_ptr scanner_builder = + arrow::dataset::ScannerBuilder::FromRecordBatchReader(reader); + scanner_builder->Pool(arrow::default_memory_pool()); + auto scanner = scanner_builder->Finish().ValueOrDie(); + std::shared_ptr schema = JniGetOrThrow( arrow::ImportSchema(reinterpret_cast(c_schema_address))); - auto scanner = JniGetOrThrow(MakeJavaDatasetScanner(vm, itr, schema)); std::shared_ptr file_format = JniGetOrThrow(GetFileFormat(file_format_id)); arrow::dataset::FileSystemDatasetWriteOptions options; diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/CArrowArrayStreamIteratorImpl.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/CArrowArrayStreamIteratorImpl.java deleted file mode 100644 index 14976686f56..00000000000 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/CArrowArrayStreamIteratorImpl.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.arrow.dataset.file; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.arrow.c.ArrowArray; -import org.apache.arrow.c.ArrowArrayStream; -import org.apache.arrow.c.ArrowSchema; -import org.apache.arrow.c.Data; -import org.apache.arrow.dataset.jni.CArrowArrayStreamIterator; -import org.apache.arrow.dataset.scanner.ScanTask; -import org.apache.arrow.dataset.scanner.Scanner; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.ipc.ArrowReader; - -public class CArrowArrayStreamIteratorImpl implements CArrowArrayStreamIterator { - - private final Scanner scanner; - private final BufferAllocator allocator; - - private Iterator taskIterators; - private ArrowReader currentReader = null; - - public CArrowArrayStreamIteratorImpl(Scanner scanner, - BufferAllocator allocator) { - this.scanner = scanner; - this.allocator = allocator; - this.taskIterators = scanner.scan().iterator(); - } - - @Override - public void close() throws Exception { - scanner.close(); - } - - @Override - public boolean hasNext() throws IOException { - if (taskIterators.hasNext()) { - return true; - } else { - return false; - } - } - - public void next(long cStreamPointer) throws IOException { - currentReader = taskIterators.next().execute(); - try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer)) { - - Data.exportArrayStream(allocator, currentReader, stream); - } finally { - currentReader.close(); - } - } -} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java index dc024a48e2b..9913e9c6305 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java @@ -17,13 +17,18 @@ package org.apache.arrow.dataset.file; +import org.apache.arrow.c.ArrowArrayStream; import org.apache.arrow.c.ArrowSchema; import org.apache.arrow.c.Data; +import org.apache.arrow.dataset.scanner.ScanTask; import org.apache.arrow.dataset.scanner.Scanner; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.util.SchemaUtility; +import java.util.Iterator; + /** * JNI-based utility to write datasets into files. It internally depends on C++ static method * FileSystemDataset::Write. 
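 * <p>A typical call, shown here only as an illustrative sketch ({@code allocator} and
 * {@code scanner} are supplied by the caller, e.g. from {@code Dataset#newScan}):
 * <pre>{@code
 * DatasetFileWriter.write(allocator, scanner, FileFormat.PARQUET, "file:///tmp/parquet_out");
 * }</pre>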
@@ -43,22 +48,29 @@ public class DatasetFileWriter { */ public static void write(BufferAllocator allocator, Scanner scanner, FileFormat format, String uri, String[] partitionColumns, int maxPartitions, String baseNameTemplate) { - final CArrowArrayStreamIteratorImpl itr = new CArrowArrayStreamIteratorImpl(scanner, allocator); ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator); Data.exportSchema(allocator, scanner.schema(), null, arrowSchema); - RuntimeException throwableWrapper = null; try { - JniWrapper.get().writeFromScannerToFile(itr, arrowSchema.memoryAddress(), - format.id(), uri, partitionColumns, maxPartitions, baseNameTemplate); + Iterator taskIterators = scanner.scan().iterator(); + while (taskIterators.hasNext()) { + ArrowReader currentReader = taskIterators.next().execute(); + ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator); + Data.exportArrayStream(allocator, currentReader, stream); + JniWrapper.get().writeFromScannerToFile(stream.memoryAddress(), arrowSchema.memoryAddress(), + format.id(), uri, partitionColumns, maxPartitions, baseNameTemplate); + currentReader.close(); + stream.close(); + } + } catch (Throwable t) { throwableWrapper = new RuntimeException(t); throw throwableWrapper; } finally { - arrowSchema.release(); - arrowSchema.close(); try { - AutoCloseables.close(itr); + scanner.close(); + arrowSchema.release(); + arrowSchema.close(); } catch (Exception e) { if (throwableWrapper != null) { throwableWrapper.addSuppressed(e); diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java index 7112022e0e1..7e5d946ecb1 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -17,7 +17,6 @@ package org.apache.arrow.dataset.file; -import org.apache.arrow.dataset.jni.CArrowArrayStreamIterator; import org.apache.arrow.dataset.jni.JniLoader; /** @@ -50,8 +49,7 @@ private JniWrapper() { * Write all record batches in a {@link NativeRecordBatchIterator} into files. This internally * depends on C++ write API: FileSystemDataset::Write. * - * @param itr iterator to be used for writing - * @param schema serialized schema of output files + * @param schema_address the schema address * @param fileFormat target file format (ID) * @param uri target file uri * @param partitionColumns columns used to partition output files @@ -59,7 +57,7 @@ private JniWrapper() { * @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition * ID around all written files. */ - public native void writeFromScannerToFile(CArrowArrayStreamIterator itr, long schema_address, + public native void writeFromScannerToFile(long stream_address, long schema_address, long fileFormat, String uri, String[] partitionColumns, int maxPartitions, String baseNameTemplate); diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/CArrowArrayStreamIterator.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/CArrowArrayStreamIterator.java deleted file mode 100644 index d791ef90437..00000000000 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/CArrowArrayStreamIterator.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.dataset.jni;
-
-import java.io.IOException;
-
-/**
- * Iterator that exports each scan task's record batches into a C ArrowArrayStream.
- */
-public interface CArrowArrayStreamIterator extends AutoCloseable {
-
-  void next(long cStreamPointer) throws IOException;
-  boolean hasNext() throws IOException;
-}

From 092baa2ca42572083bd4a9cf0fb11bc1f7ebe094 Mon Sep 17 00:00:00 2001
From: Jia Ke
Date: Wed, 19 Oct 2022 10:41:32 +0000
Subject: [PATCH 08/14] Create new ArrowScannerReader over Scanner

---
 java/dataset/src/main/cpp/jni_wrapper.cc      |   7 +-
 .../arrow/dataset/file/DatasetFileWriter.java |  39 ++----
 .../apache/arrow/dataset/file/JniWrapper.java |   9 +-
 .../dataset/scanner/ArrowScannerReader.java   | 127 ++++++++++++++++++
 .../dataset/file/TestDatasetFileWriter.java   |  11 +-
 5 files changed, 156 insertions(+), 37 deletions(-)
 create mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java

diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc
index 48edb3a10b1..4102fac3572 100644
--- a/java/dataset/src/main/cpp/jni_wrapper.cc
+++ b/java/dataset/src/main/cpp/jni_wrapper.cc
@@ -541,7 +541,7 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
  */
 JNIEXPORT void JNICALL
 Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
-    JNIEnv* env, jobject, jlong c_arrow_array_stream_address, jlong c_schema_address,
+    JNIEnv* env, jobject, jlong c_arrow_array_stream_address,
     jlong file_format_id, jstring uri, jobjectArray partition_columns,
     jint max_partitions, jstring base_name_template) {
   JNI_METHOD_START
@@ -551,6 +551,9 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   }
 
   auto* arrow_stream = reinterpret_cast(c_arrow_array_stream_address);
+  struct ArrowSchema c_schema;
+  arrow_stream->get_schema(arrow_stream, &c_schema);
+
   std::shared_ptr reader =
       JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream));
   // Release the ArrowArrayStream
@@ -561,7 +564,7 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   auto scanner = scanner_builder->Finish().ValueOrDie();
 
   std::shared_ptr schema = JniGetOrThrow(
-      arrow::ImportSchema(reinterpret_cast(c_schema_address)));
+      arrow::ImportSchema(&c_schema));
 
   std::shared_ptr file_format =
       JniGetOrThrow(GetFileFormat(file_format_id));
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java
index 9913e9c6305..91a048a5c5e 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java
@@ -20,6 +20,7 @@
 import org.apache.arrow.c.ArrowArrayStream;
 import org.apache.arrow.c.ArrowSchema;
 import org.apache.arrow.c.Data;
+import org.apache.arrow.dataset.scanner.ArrowScannerReader;
 import
org.apache.arrow.dataset.scanner.ScanTask; import org.apache.arrow.dataset.scanner.Scanner; import org.apache.arrow.memory.BufferAllocator; @@ -38,7 +39,7 @@ public class DatasetFileWriter { /** * Scan over an input {@link Scanner} then write all record batches to file. * - * @param scanner the source scanner for writing + * @param reader the datasource for writing * @param format target file format * @param uri target file uri * @param maxPartitions maximum partitions to be included in written files @@ -46,47 +47,29 @@ public class DatasetFileWriter { * @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition * ID around all written files. */ - public static void write(BufferAllocator allocator, Scanner scanner, FileFormat format, String uri, + public static void write(BufferAllocator allocator, ArrowReader reader, FileFormat format, String uri, String[] partitionColumns, int maxPartitions, String baseNameTemplate) { - ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator); - Data.exportSchema(allocator, scanner.schema(), null, arrowSchema); RuntimeException throwableWrapper = null; try { - Iterator taskIterators = scanner.scan().iterator(); - while (taskIterators.hasNext()) { - ArrowReader currentReader = taskIterators.next().execute(); - ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator); - Data.exportArrayStream(allocator, currentReader, stream); - JniWrapper.get().writeFromScannerToFile(stream.memoryAddress(), arrowSchema.memoryAddress(), - format.id(), uri, partitionColumns, maxPartitions, baseNameTemplate); - currentReader.close(); - stream.close(); - } - + ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator); + Data.exportArrayStream(allocator, reader, stream); + JniWrapper.get().writeFromScannerToFile(stream.memoryAddress(), + format.id(), uri, partitionColumns, maxPartitions, baseNameTemplate); + stream.close(); } catch (Throwable t) { throwableWrapper = new RuntimeException(t); throw throwableWrapper; - } finally { - try { - scanner.close(); - arrowSchema.release(); - arrowSchema.close(); - } catch (Exception e) { - if (throwableWrapper != null) { - throwableWrapper.addSuppressed(e); - } - } } } /** * Scan over an input {@link Scanner} then write all record batches to file, with default partitioning settings. * - * @param scanner the source scanner for writing + * @param reader the datasource for writing * @param format target file format * @param uri target file uri */ - public static void write(BufferAllocator allocator, Scanner scanner, FileFormat format, String uri) { - write(allocator, scanner, format, uri, new String[0], 1024, "dat_{i}"); + public static void write(BufferAllocator allocator, ArrowReader reader, FileFormat format, String uri) { + write(allocator, reader, format, uri, new String[0], 1024, "dat_{i}"); } } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java index 7e5d946ecb1..0939ee2fa4a 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -49,7 +49,7 @@ private JniWrapper() { * Write all record batches in a {@link NativeRecordBatchIterator} into files. This internally * depends on C++ write API: FileSystemDataset::Write. 
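 * <p>Callers are expected to export the source reader with {@code Data.exportArrayStream}
 * beforehand and pass the resulting stream address; importing the stream on the native
 * side transfers its ownership to C++.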
* - * @param schema_address the schema address + * @param stream_address the ArrowArrayStream address * @param fileFormat target file format (ID) * @param uri target file uri * @param partitionColumns columns used to partition output files @@ -57,8 +57,11 @@ private JniWrapper() { * @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition * ID around all written files. */ - public native void writeFromScannerToFile(long stream_address, long schema_address, - long fileFormat, String uri, String[] partitionColumns, int maxPartitions, + public native void writeFromScannerToFile(long stream_address, + long fileFormat, + String uri, + String[] partitionColumns, + int maxPartitions, String baseNameTemplate); } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java new file mode 100644 index 00000000000..3adccbfd89a --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.dataset.scanner; + +import org.apache.arrow.c.ArrowArray; +import org.apache.arrow.c.CDataDictionaryProvider; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.Schema; + +import java.io.IOException; +import java.util.Iterator; + +public class ArrowScannerReader extends ArrowReader { + private final Scanner scanner; + + private Iterator taskIterator; + + private ScanTask currentTask = null; + private ArrowReader currentReader = null; + + public ArrowScannerReader(Scanner scanner, BufferAllocator allocator) { + super(allocator); + this.scanner = scanner; + this.taskIterator = scanner.scan().iterator(); + if (taskIterator.hasNext()) { + currentTask = taskIterator.next(); + currentReader = currentTask.execute(); + } else { + currentReader = null; + } + } + + @Override + protected void loadRecordBatch(ArrowRecordBatch batch) { + throw new UnsupportedOperationException(); + } + + @Override + protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean loadNextBatch() throws IOException { + if (currentReader == null) return false; + Boolean result = currentReader.loadNextBatch(); + + if (!result) { + try { + currentTask.close(); + currentReader.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + while (!result) { + if (!taskIterator.hasNext()) { + return false; + } else { + currentTask = taskIterator.next(); + currentReader = currentTask.execute(); + result = currentReader.loadNextBatch(); + } + } + } + + // Load the currentReader#VectorSchemaRoot to ArrowArray + VectorSchemaRoot vsr = currentReader.getVectorSchemaRoot(); + ArrowArray array = ArrowArray.allocateNew(allocator); + Data.exportVectorSchemaRoot(allocator, vsr, currentReader, array); + + // Load the ArrowArray into ArrowScannerReader#VectorSchemaRoot + CDataDictionaryProvider provider = new CDataDictionaryProvider(); + Data.importIntoVectorSchemaRoot(allocator, + array, this.getVectorSchemaRoot(), provider); + array.close(); + provider.close(); + return true; + } + + @Override + public long bytesRead() { + return 0L; + } + + @Override + public void close() throws IOException { + try { + super.close(true); + currentTask.close(); + currentReader.close(); + scanner.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + protected void closeReadSource() throws IOException { + // no-op + } + + @Override + protected Schema readSchema() throws IOException { + return scanner.schema(); + } +} diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java index 9af96ead3d4..443e342e183 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java @@ -32,6 +32,7 @@ import org.apache.arrow.dataset.ParquetWriteSupport; import org.apache.arrow.dataset.TestDataset; import org.apache.arrow.dataset.jni.NativeMemoryPool; +import org.apache.arrow.dataset.scanner.ArrowScannerReader; import org.apache.arrow.dataset.scanner.ScanOptions; 
import org.apache.arrow.dataset.scanner.ScanTask; import org.apache.arrow.dataset.scanner.Scanner; @@ -65,13 +66,14 @@ public void testParquetWriteSimple() throws Exception { ScanOptions options = new ScanOptions(new String[0], 100); final Dataset dataset = factory.finish(); final Scanner scanner = dataset.newScan(options); + final ArrowScannerReader reader = new ArrowScannerReader(scanner, rootAllocator()); final File writtenFolder = TMP.newFolder(); final String writtenParquet = writtenFolder.toURI().toString(); try { - DatasetFileWriter.write(rootAllocator(), scanner, FileFormat.PARQUET, writtenParquet); + DatasetFileWriter.write(rootAllocator(), reader, FileFormat.PARQUET, writtenParquet); assertParquetFileEquals(sampleParquet, Objects.requireNonNull(writtenFolder.listFiles())[0].toURI().toString()); } finally { - AutoCloseables.close(factory, scanner, dataset); + AutoCloseables.close(factory, scanner, reader, dataset); } } @@ -85,10 +87,11 @@ public void testParquetWriteWithPartitions() throws Exception { ScanOptions options = new ScanOptions(new String[0], 100); final Dataset dataset = factory.finish(); final Scanner scanner = dataset.newScan(options); + final ArrowScannerReader reader = new ArrowScannerReader(scanner, rootAllocator()); final File writtenFolder = TMP.newFolder(); final String writtenParquet = writtenFolder.toURI().toString(); try { - DatasetFileWriter.write(rootAllocator(), scanner, FileFormat.PARQUET, writtenParquet, new String[]{"id", "name"}, 100, "dat_{i}"); + DatasetFileWriter.write(rootAllocator(), reader, FileFormat.PARQUET, writtenParquet, new String[]{"id", "name"}, 100, "dat_{i}"); final Set expectedOutputFiles = new HashSet<>( Arrays.asList("id=1/name=a/dat_0", "id=2/name=b/dat_0", "id=3/name=c/dat_0", "id=2/name=d/dat_0")); final Set outputFiles = FileUtils.listFiles(writtenFolder, null, true) @@ -99,7 +102,7 @@ public void testParquetWriteWithPartitions() throws Exception { .collect(Collectors.toSet()); Assert.assertEquals(expectedOutputFiles, outputFiles); } finally { - AutoCloseables.close(factory, scanner, dataset); + AutoCloseables.close(factory, scanner, reader, dataset); } } From 434ad8a989679ef0223b22887ffe6db3c1919639 Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Thu, 20 Oct 2022 17:29:49 +0000 Subject: [PATCH 09/14] resolve the comments --- java/dataset/src/main/cpp/jni_wrapper.cc | 16 ++--- .../arrow/dataset/file/DatasetFileWriter.java | 14 ++-- .../dataset/scanner/ArrowScannerReader.java | 41 +++--------- .../dataset/file/TestDatasetFileWriter.java | 65 ++++++++++++------- 4 files changed, 60 insertions(+), 76 deletions(-) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 4102fac3572..ad8c38e0f2b 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -31,8 +31,6 @@ #include "org_apache_arrow_dataset_jni_JniWrapper.h" #include "org_apache_arrow_dataset_jni_NativeMemoryPool.h" -#include - namespace { jclass illegal_access_exception_class; @@ -180,7 +178,7 @@ class DisposableScannerAdaptor { } }; -std::shared_ptr SchemaFromColumnNames( +arrow::Result> SchemaFromColumnNames( const std::shared_ptr& input, const std::vector& column_names) { std::vector> columns; @@ -188,6 +186,8 @@ std::shared_ptr SchemaFromColumnNames( auto maybe_field = ref.GetOne(*input); if (maybe_field.ok()) { columns.push_back(std::move(maybe_field).ValueOrDie()); + } else { + return arrow::Status::Invalid("The provided column name is not in arrow schema"); } } @@ -551,20 
+551,14 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   }
 
   auto* arrow_stream = reinterpret_cast(c_arrow_array_stream_address);
-  struct ArrowSchema c_schema;
-  arrow_stream->get_schema(arrow_stream, &c_schema);
-
   std::shared_ptr reader =
       JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream));
-  // Release the ArrowArrayStream
-  ArrowArrayStreamRelease(arrow_stream);
   std::shared_ptr scanner_builder =
       arrow::dataset::ScannerBuilder::FromRecordBatchReader(reader);
   scanner_builder->Pool(arrow::default_memory_pool());
   auto scanner = scanner_builder->Finish().ValueOrDie();
 
-  std::shared_ptr schema = JniGetOrThrow(
-      arrow::ImportSchema(&c_schema));
+  std::shared_ptr schema = reader->schema();
 
   std::shared_ptr file_format =
       JniGetOrThrow(GetFileFormat(file_format_id));
@@ -579,7 +573,7 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   options.base_dir = output_path;
   options.basename_template = JStringToCString(env, base_name_template);
   options.partitioning = std::make_shared(
-      SchemaFromColumnNames(schema, partition_column_vector));
+      SchemaFromColumnNames(schema, partition_column_vector).ValueOrDie());
   options.max_partitions = max_partitions;
   JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner));
   JNI_METHOD_END()
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java
index 91a048a5c5e..93352933753 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java
@@ -37,7 +37,7 @@ public class DatasetFileWriter {
 
   /**
-   * Scan over an input {@link Scanner} then write all record batches to file.
+   * Write the record batches produced by an {@link ArrowReader} into files.
    *
    * @param reader the datasource for writing
    * @param format target file format
@@ -49,27 +49,13 @@ public class DatasetFileWriter {
    */
   public static void write(BufferAllocator allocator, ArrowReader reader, FileFormat format, String uri,
                            String[] partitionColumns, int maxPartitions, String baseNameTemplate) {
-    RuntimeException throwableWrapper = null;
-    try {
-      ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator);
+    try (final ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
       Data.exportArrayStream(allocator, reader, stream);
       JniWrapper.get().writeFromScannerToFile(stream.memoryAddress(),
          format.id(), uri, partitionColumns, maxPartitions, baseNameTemplate);
-      stream.close();
-    } catch (Throwable t) {
-      throwableWrapper = new RuntimeException(t);
-      throw throwableWrapper;
     }
   }
 
   /**
-   * Scan over an input {@link Scanner} then write all record batches to file, with default partitioning settings.
+   * Write the record batches produced by an {@link ArrowReader} into files, with default partitioning settings.
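+   * <p>A minimal usage sketch (allocator and scanner construction elided; names are
+   * illustrative):
+   * <pre>{@code
+   * try (ArrowScannerReader reader = new ArrowScannerReader(scanner, allocator)) {
+   *   DatasetFileWriter.write(allocator, reader, FileFormat.PARQUET, "file:///tmp/write_out");
+   * }
+   * }</pre>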
* * @param reader the datasource for writing * @param format target file format * @param uri target file uri */ public static void write(BufferAllocator allocator, ArrowReader reader, FileFormat format, String uri) { - write(allocator, reader, format, uri, new String[0], 1024, "dat_{i}"); + write(allocator, reader, format, uri, new String[0], 1024, "data_{i}"); } } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java index 3adccbfd89a..c751fa51b65 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java @@ -17,11 +17,9 @@ package org.apache.arrow.dataset.scanner; -import org.apache.arrow.c.ArrowArray; -import org.apache.arrow.c.CDataDictionaryProvider; -import org.apache.arrow.c.Data; import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorUnloader; import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; @@ -45,8 +43,6 @@ public ArrowScannerReader(Scanner scanner, BufferAllocator allocator) { if (taskIterator.hasNext()) { currentTask = taskIterator.next(); currentReader = currentTask.execute(); - } else { - currentReader = null; } } @@ -63,14 +59,14 @@ protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) { @Override public boolean loadNextBatch() throws IOException { if (currentReader == null) return false; - Boolean result = currentReader.loadNextBatch(); + boolean result = currentReader.loadNextBatch(); if (!result) { try { currentTask.close(); currentReader.close(); } catch (Exception e) { - throw new RuntimeException(e); + throw new IOException(e); } while (!result) { @@ -84,17 +80,12 @@ public boolean loadNextBatch() throws IOException { } } - // Load the currentReader#VectorSchemaRoot to ArrowArray - VectorSchemaRoot vsr = currentReader.getVectorSchemaRoot(); - ArrowArray array = ArrowArray.allocateNew(allocator); - Data.exportVectorSchemaRoot(allocator, vsr, currentReader, array); - - // Load the ArrowArray into ArrowScannerReader#VectorSchemaRoot - CDataDictionaryProvider provider = new CDataDictionaryProvider(); - Data.importIntoVectorSchemaRoot(allocator, - array, this.getVectorSchemaRoot(), provider); - array.close(); - provider.close(); + VectorLoader loader = new VectorLoader(this.getVectorSchemaRoot()); + VectorUnloader unloader = + new VectorUnloader(currentReader.getVectorSchemaRoot()); + try(ArrowRecordBatch recordBatch = unloader.getRecordBatch()) { + loader.load(recordBatch); + } return true; } @@ -103,18 +94,6 @@ public long bytesRead() { return 0L; } - @Override - public void close() throws IOException { - try { - super.close(true); - currentTask.close(); - currentReader.close(); - scanner.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - @Override protected void closeReadSource() throws IOException { // no-op diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java index 443e342e183..7939e0f59a1 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java +++ 
b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java
@@ -22,7 +22,6 @@
 import java.io.IOException;
 import java.nio.channels.Channels;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
@@ -34,15 +33,17 @@
 import org.apache.arrow.dataset.jni.NativeMemoryPool;
 import org.apache.arrow.dataset.scanner.ArrowScannerReader;
 import org.apache.arrow.dataset.scanner.ScanOptions;
-import org.apache.arrow.dataset.scanner.ScanTask;
 import org.apache.arrow.dataset.scanner.Scanner;
 import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.flatbuf.RecordBatch;
 import org.apache.arrow.util.AutoCloseables;
-import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.compare.VectorEqualsVisitor;
 import org.apache.arrow.vector.ipc.WriteChannel;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.ipc.message.MessageSerializer;
-import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 import org.junit.ClassRule;
@@ -61,19 +62,17 @@ public void testParquetWriteSimple() throws Exception {
     ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(),
         1, "a", 2, "b", 3, "c", 2, "d");
     String sampleParquet = writeSupport.getOutputURI();
-    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
-        FileFormat.PARQUET, sampleParquet);
     ScanOptions options = new ScanOptions(new String[0], 100);
-    final Dataset dataset = factory.finish();
-    final Scanner scanner = dataset.newScan(options);
-    final ArrowScannerReader reader = new ArrowScannerReader(scanner, rootAllocator());
     final File writtenFolder = TMP.newFolder();
     final String writtenParquet = writtenFolder.toURI().toString();
-    try {
+    try (FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+        FileFormat.PARQUET, sampleParquet);
+         final Dataset dataset = factory.finish();
+         final Scanner scanner = dataset.newScan(options);
+         final ArrowScannerReader reader = new ArrowScannerReader(scanner, rootAllocator());
+    ) {
       DatasetFileWriter.write(rootAllocator(), reader, FileFormat.PARQUET, writtenParquet);
       assertParquetFileEquals(sampleParquet, Objects.requireNonNull(writtenFolder.listFiles())[0].toURI().toString());
-    } finally {
-      AutoCloseables.close(factory, scanner, reader, dataset);
     }
   }
 
@@ -82,18 +81,19 @@ public void testParquetWriteWithPartitions() throws Exception {
     ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(),
         1, "a", 2, "b", 3, "c", 2, "d");
     String sampleParquet = writeSupport.getOutputURI();
-    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
-        FileFormat.PARQUET, sampleParquet);
     ScanOptions options = new ScanOptions(new String[0], 100);
-    final Dataset dataset = factory.finish();
-    final Scanner scanner = dataset.newScan(options);
-    final ArrowScannerReader reader = new ArrowScannerReader(scanner, rootAllocator());
     final File writtenFolder = TMP.newFolder();
     final String writtenParquet = writtenFolder.toURI().toString();
-    try {
-      DatasetFileWriter.write(rootAllocator(), reader, FileFormat.PARQUET, writtenParquet,
-          new String[]{"id", "name"}, 100, "dat_{i}");
+
+    try (FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+        FileFormat.PARQUET, sampleParquet);
+         final Dataset dataset = factory.finish();
+         final Scanner scanner = dataset.newScan(options);
+         final ArrowScannerReader reader = new ArrowScannerReader(scanner, rootAllocator());
+    ) {
+      DatasetFileWriter.write(rootAllocator(), reader, FileFormat.PARQUET, writtenParquet, new String[]{"id", "name"}, 100, "data_{i}");
       final Set<String> expectedOutputFiles = new HashSet<>(
-          Arrays.asList("id=1/name=a/dat_0", "id=2/name=b/dat_0", "id=3/name=c/dat_0", "id=2/name=d/dat_0"));
+          Arrays.asList("id=1/name=a/data_0", "id=2/name=b/data_0", "id=3/name=c/data_0", "id=2/name=d/data_0"));
       final Set<String> outputFiles = FileUtils.listFiles(writtenFolder, null, true)
           .stream()
          .map(file -> {
@@ -101,8 +101,6 @@ public void testParquetWriteWithPartitions() throws Exception {
           })
          .collect(Collectors.toSet());
       Assert.assertEquals(expectedOutputFiles, outputFiles);
-    } finally {
-      AutoCloseables.close(factory, scanner, reader, dataset);
     }
   }
 
@@ -117,9 +115,28 @@ private void assertParquetFileEquals(String expectedURI, String actualURI) throw
         new ScanOptions(new String[0], 100));
     // fast-fail by comparing metadata
     Assert.assertEquals(expectedBatches.toString(), actualBatches.toString());
-    // compare buffers
-    Assert.assertEquals(serialize(expectedBatches), serialize(actualBatches));
+    // compare ArrowRecordBatches
+    VectorSchemaRoot expectVsr = VectorSchemaRoot.create(expectedFactory.inspect(), rootAllocator());
+    VectorLoader expectLoader = new VectorLoader(expectVsr);
+    for(ArrowRecordBatch batch: expectedBatches) {
+      expectLoader.load(batch);
+    }
+
+    VectorSchemaRoot actualVsr = VectorSchemaRoot.create(actualFactory.inspect(), rootAllocator());
+    VectorLoader actualLoader = new VectorLoader(actualVsr);
+    for(ArrowRecordBatch batch: actualBatches) {
+      actualLoader.load(batch);
+    }
+
+    for (int i = 0; i < expectVsr.getFieldVectors().size(); i++) {
+      FieldVector vector = expectVsr.getFieldVectors().get(i);
+      FieldVector otherVector = actualVsr.getFieldVectors().get(i);
+      Assert.assertTrue(VectorEqualsVisitor.vectorEquals(vector, otherVector));
+    }
+
+    // Assert.assertTrue(expectVsr.equals(actualVsr));
     AutoCloseables.close(expectedBatches, actualBatches);
+    AutoCloseables.close(expectVsr, actualVsr);
   }
 
   private String serialize(List<ArrowRecordBatch> batches) throws IOException {

From 46197746b295a85307b2e53b1a3671fd300c41ff Mon Sep 17 00:00:00 2001
From: Jia Ke
Date: Thu, 20 Oct 2022 21:20:27 +0000
Subject: [PATCH 10/14] resolve comments

---
 java/dataset/src/main/cpp/jni_wrapper.cc      |  4 +-
 .../dataset/scanner/ArrowScannerReader.java   |  8 ++-
 .../dataset/file/TestDatasetFileWriter.java   | 56 ++++++++-----------
 3 files changed, 32 insertions(+), 36 deletions(-)

diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc
index ad8c38e0f2b..cbd3123758c 100644
--- a/java/dataset/src/main/cpp/jni_wrapper.cc
+++ b/java/dataset/src/main/cpp/jni_wrapper.cc
@@ -555,8 +555,8 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
       JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream));
   std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
       arrow::dataset::ScannerBuilder::FromRecordBatchReader(reader);
-  scanner_builder->Pool(arrow::default_memory_pool());
-  auto scanner = scanner_builder->Finish().ValueOrDie();
+  JniAssertOkOrThrow(scanner_builder->Pool(arrow::default_memory_pool()));
+  auto scanner = JniGetOrThrow(scanner_builder->Finish());
 
   std::shared_ptr<arrow::Schema> schema = reader->schema();
 
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java
index c751fa51b65..431c4c64032 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java
@@ -96,7 +96,13 @@ public long bytesRead() {
 
   @Override
   protected void closeReadSource() throws IOException {
-    // no-op
+    try {
+      currentTask.close();
+      currentReader.close();
+      scanner.close();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
   }
 
   @Override
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java
index 7939e0f59a1..86bbe759d02 100644
--- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java
@@ -90,7 +90,7 @@ public void testParquetWriteWithPartitions() throws Exception {
         final Dataset dataset = factory.finish();
         final Scanner scanner = dataset.newScan(options);
         final ArrowScannerReader reader = new ArrowScannerReader(scanner, rootAllocator());
-        ) {
+    ) {
       DatasetFileWriter.write(rootAllocator(), reader, FileFormat.PARQUET, writtenParquet, new String[]{"id", "name"}, 100, "data_{i}");
       final Set<String> expectedOutputFiles = new HashSet<>(
           Arrays.asList("id=1/name=a/data_0", "id=2/name=b/data_0", "id=3/name=c/data_0", "id=2/name=d/data_0"));
@@ -107,44 +107,34 @@ private void assertParquetFileEquals(String expectedURI, String actualURI) throw
     final FileSystemDatasetFactory expectedFactory = new FileSystemDatasetFactory(
         rootAllocator(), NativeMemoryPool.getDefault(), FileFormat.PARQUET, expectedURI);
-    List<ArrowRecordBatch> expectedBatches = collectResultFromFactory(expectedFactory,
-        new ScanOptions(new String[0], 100));
     final FileSystemDatasetFactory actualFactory = new FileSystemDatasetFactory(
         rootAllocator(), NativeMemoryPool.getDefault(), FileFormat.PARQUET, actualURI);
+    List<ArrowRecordBatch> expectedBatches = collectResultFromFactory(expectedFactory,
+        new ScanOptions(new String[0], 100));
     List<ArrowRecordBatch> actualBatches = collectResultFromFactory(actualFactory,
         new ScanOptions(new String[0], 100));
-    // fast-fail by comparing metadata
-    Assert.assertEquals(expectedBatches.toString(), actualBatches.toString());
-    // compare ArrowRecordBatches
-    VectorSchemaRoot expectVsr = VectorSchemaRoot.create(expectedFactory.inspect(), rootAllocator());
-    VectorLoader expectLoader = new VectorLoader(expectVsr);
-    for(ArrowRecordBatch batch: expectedBatches) {
-      expectLoader.load(batch);
-    }
-
-    VectorSchemaRoot actualVsr = VectorSchemaRoot.create(actualFactory.inspect(), rootAllocator());
-    VectorLoader actualLoader = new VectorLoader(actualVsr);
-    for(ArrowRecordBatch batch: actualBatches) {
-      actualLoader.load(batch);
-    }
-
-    for (int i = 0; i < expectVsr.getFieldVectors().size(); i++) {
-      FieldVector vector = expectVsr.getFieldVectors().get(i);
-      FieldVector otherVector = actualVsr.getFieldVectors().get(i);
-      Assert.assertTrue(VectorEqualsVisitor.vectorEquals(vector, otherVector));
-    }
-
-    // Assert.assertTrue(expectVsr.equals(actualVsr));
-    AutoCloseables.close(expectedBatches, actualBatches);
-    AutoCloseables.close(expectVsr, actualVsr);
-  }
+    try (
+        VectorSchemaRoot expectVsr = VectorSchemaRoot.create(expectedFactory.inspect(), rootAllocator());
+        VectorSchemaRoot actualVsr = VectorSchemaRoot.create(actualFactory.inspect(), rootAllocator())) {
 
-  private String serialize(List<ArrowRecordBatch> batches) throws IOException {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    for (ArrowRecordBatch batch : batches) {
-      MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), batch);
+      // fast-fail by comparing metadata
+      Assert.assertEquals(expectedBatches.toString(), actualBatches.toString());
+      // compare ArrowRecordBatches
+      Assert.assertEquals(expectedBatches.size(), actualBatches.size());
+      VectorLoader expectLoader = new VectorLoader(expectVsr);
+      VectorLoader actualLoader = new VectorLoader(actualVsr);
+      for (int i = 0; i < expectedBatches.size(); i++) {
+        expectLoader.load(expectedBatches.get(i));
+        actualLoader.load(actualBatches.get(i));
+        for (int j = 0; j < expectVsr.getFieldVectors().size(); j++) {
+          FieldVector vector = expectVsr.getFieldVectors().get(j);
+          FieldVector otherVector = actualVsr.getFieldVectors().get(j);
+          Assert.assertTrue(VectorEqualsVisitor.vectorEquals(vector, otherVector));
+        }
+      }
+    } finally {
+      AutoCloseables.close(expectedBatches, actualBatches);
     }
-    return Arrays.toString(out.toByteArray());
   }
 }

From 951462fed503a90fadcca510f73c47fc38ac20d6 Mon Sep 17 00:00:00 2001
From: Jia Ke
Date: Thu, 20 Oct 2022 21:27:20 +0000
Subject: [PATCH 11/14] doc update

---
 java/dataset/src/main/cpp/jni_wrapper.cc              |  2 +-
 .../apache/arrow/dataset/file/DatasetFileWriter.java  | 12 ++----------
 2 files changed, 3 insertions(+), 11 deletions(-)

diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc
index cbd3123758c..b3b5fe18c79 100644
--- a/java/dataset/src/main/cpp/jni_wrapper.cc
+++ b/java/dataset/src/main/cpp/jni_wrapper.cc
@@ -187,7 +187,7 @@ arrow::Result<std::shared_ptr<arrow::Schema>> SchemaFromColumnNames(
     if (maybe_field.ok()) {
       columns.push_back(std::move(maybe_field).ValueOrDie());
     } else {
-      return arrow::Status::Invalid("The provided column name is not in arrow schema");
+      return arrow::Status::Invalid("Partition column '", ref.ToString(), "' is not in dataset schema");
     }
   }
 
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java
index 93352933753..b2369b853ad 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java
@@ -18,17 +18,9 @@
 package org.apache.arrow.dataset.file;
 
 import org.apache.arrow.c.ArrowArrayStream;
-import org.apache.arrow.c.ArrowSchema;
 import org.apache.arrow.c.Data;
-import org.apache.arrow.dataset.scanner.ArrowScannerReader;
-import org.apache.arrow.dataset.scanner.ScanTask;
-import org.apache.arrow.dataset.scanner.Scanner;
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.util.AutoCloseables;
 import org.apache.arrow.vector.ipc.ArrowReader;
-import org.apache.arrow.vector.util.SchemaUtility;
-
-import java.util.Iterator;
 
 /**
  * JNI-based utility to write datasets into files. It internally depends on C++ static method
@@ -37,7 +29,7 @@
 public class DatasetFileWriter {
 
   /**
-   * Write a ArrowReader accepting Scanner into files.
+   * Write the contents of an ArrowReader as a dataset.
    *
    * @param reader the datasource for writing
    * @param format target file format
@@ -57,7 +49,7 @@ public static void write(BufferAllocator allocator, ArrowReader reader, FileForm
   }
 
   /**
-   * Write a ArrowReader accepting Scanner into files, with default partitioning settings.
+   * Write the contents of an ArrowReader as a dataset, with default partitioning settings.
    *
    * @param reader the datasource for writing
    * @param format target file format

From fbc99b51d847ea57419e9d5f8bef244de6533ef4 Mon Sep 17 00:00:00 2001
From: Jia Ke
Date: Tue, 25 Oct 2022 08:28:40 +0000
Subject: [PATCH 12/14] resolve comments

---
 .../apache/arrow/dataset/file/TestDatasetFileWriter.java | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java
index 86bbe759d02..862ca45233b 100644
--- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java
@@ -17,10 +17,8 @@
 
 package org.apache.arrow.dataset.file;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.IOException;
-import java.nio.channels.Channels;
+
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
@@ -35,16 +33,14 @@
 import org.apache.arrow.dataset.jni.NativeMemoryPool;
 import org.apache.arrow.dataset.scanner.ScanOptions;
 import org.apache.arrow.dataset.scanner.Scanner;
 import org.apache.arrow.dataset.source.Dataset;
-import org.apache.arrow.flatbuf.RecordBatch;
 import org.apache.arrow.util.AutoCloseables;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.compare.VectorEqualsVisitor;
-import org.apache.arrow.vector.ipc.WriteChannel;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
-import org.apache.arrow.vector.ipc.message.MessageSerializer;
 import org.apache.commons.io.FileUtils;
+
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -129,6 +125,7 @@ private void assertParquetFileEquals(String expectedURI, String actualURI) throw
         for (int j = 0; j < expectVsr.getFieldVectors().size(); j++) {
           FieldVector vector = expectVsr.getFieldVectors().get(j);
           FieldVector otherVector = actualVsr.getFieldVectors().get(j);
+          // TODO: ARROW-18140 Use VectorSchemaRoot#equals() method to compare
           Assert.assertTrue(VectorEqualsVisitor.vectorEquals(vector, otherVector));
         }
       }

From 5f86c71f9605a24bf88e06c0cf9c59bbd33b649e Mon Sep 17 00:00:00 2001
From: Jia Ke
Date: Wed, 26 Oct 2022 09:19:26 +0000
Subject: [PATCH 13/14] code style refine

---
 .../apache/arrow/dataset/file/JniWrapper.java   |  6 +++---
 .../dataset/scanner/ArrowScannerReader.java     | 16 +++++++++++-----
 .../dataset/file/TestDatasetFileWriter.java     |  6 +++---
 3 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
index 0939ee2fa4a..18560a46a5c 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
@@ -46,10 +46,10 @@ private JniWrapper() {
   public native long makeFileSystemDatasetFactory(String uri, int fileFormat);
 
   /**
-   * Write all record batches in a {@link NativeRecordBatchIterator} into files. This internally
+   * Write the contents of an {@link org.apache.arrow.c.ArrowArrayStream} into files. This internally
    * depends on C++ write API: FileSystemDataset::Write.
    *
-   * @param stream_address the ArrowArrayStream address
+   * @param streamAddress the ArrowArrayStream address
    * @param fileFormat target file format (ID)
    * @param uri target file uri
    * @param partitionColumns columns used to partition output files
@@ -57,7 +57,7 @@ private JniWrapper() {
    * @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition
    *                         ID around all written files.
    */
-  public native void writeFromScannerToFile(long stream_address,
+  public native void writeFromScannerToFile(long streamAddress,
                                             long fileFormat,
                                             String uri,
                                             String[] partitionColumns,
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java
index 431c4c64032..ba2634cbf99 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java
@@ -17,6 +17,9 @@
 
 package org.apache.arrow.dataset.scanner;
 
+import java.io.IOException;
+import java.util.Iterator;
+
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorUnloader;
@@ -25,9 +28,10 @@
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.Schema;
 
-import java.io.IOException;
-import java.util.Iterator;
-
+/**
+ * An implementation of {@link ArrowReader} that reads
+ * the dataset from a {@link Scanner}.
+ */
 public class ArrowScannerReader extends ArrowReader {
   private final Scanner scanner;
 
@@ -58,7 +62,9 @@ protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) {
 
   @Override
   public boolean loadNextBatch() throws IOException {
-    if (currentReader == null) return false;
+    if (currentReader == null) {
+      return false;
+    }
 
     boolean result = currentReader.loadNextBatch();
     if (!result) {
@@ -83,7 +89,7 @@ public boolean loadNextBatch() throws IOException {
 
     VectorLoader loader = new VectorLoader(this.getVectorSchemaRoot());
     VectorUnloader unloader = new VectorUnloader(currentReader.getVectorSchemaRoot());
-    try(ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
+    try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
       loader.load(recordBatch);
     }
     return true;
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java
index 862ca45233b..10c06be2cca 100644
--- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java
@@ -18,7 +18,6 @@ package org.apache.arrow.dataset.file;
 
 import java.io.File;
-
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
@@ -40,7 +39,6 @@
 import org.apache.arrow.vector.compare.VectorEqualsVisitor;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.commons.io.FileUtils;
-
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -87,7 +85,9 @@ public void testParquetWriteWithPartitions() throws Exception {
         final Scanner scanner = dataset.newScan(options);
         final ArrowScannerReader reader = new ArrowScannerReader(scanner, rootAllocator());
     ) {
-      DatasetFileWriter.write(rootAllocator(), reader, FileFormat.PARQUET, writtenParquet, new String[]{"id", "name"}, 100, "data_{i}");
+      DatasetFileWriter.write(rootAllocator(), reader,
+          FileFormat.PARQUET, writtenParquet, new String[]{"id", "name"},
+          100, "data_{i}");
       final Set<String> expectedOutputFiles = new HashSet<>(
           Arrays.asList("id=1/name=a/data_0", "id=2/name=b/data_0", "id=3/name=c/data_0", "id=2/name=d/data_0"));
       final Set<String> outputFiles = FileUtils.listFiles(writtenFolder, null, true)

From 50abe7f25fb7657b3dd03a36cd0b32e3965b720b Mon Sep 17 00:00:00 2001
From: Jia Ke
Date: Fri, 28 Oct 2022 08:36:51 +0000
Subject: [PATCH 14/14] Add the doc for ArrowScannerReader construct method

---
 .../apache/arrow/dataset/scanner/ArrowScannerReader.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java
index ba2634cbf99..417ba837a3b 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java
@@ -40,6 +40,12 @@ public class ArrowScannerReader extends ArrowReader {
   private ScanTask currentTask = null;
   private ArrowReader currentReader = null;
 
+  /**
+   * Constructs a scanner reader using a Scanner.
+   *
+   * @param scanner the Scanner used to scan data over the dataset
+   * @param allocator the BufferAllocator used to allocate new buffers
+   */
   public ArrowScannerReader(Scanner scanner, BufferAllocator allocator) {
     super(allocator);
     this.scanner = scanner;
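
The end-to-end flow the series arrives at: wrap a Scanner in an ArrowScannerReader, hand the reader to DatasetFileWriter, which exports it over the C Data Interface as an ArrowArrayStream, and let the native side rebuild a scanner from that stream and call FileSystemDataset::Write. A minimal caller might look like the sketch below. It is illustrative only and not part of the patches: the class name, source/target URIs, and the RootAllocator setup are assumptions, while the API calls mirror TestDatasetFileWriter above.

import org.apache.arrow.dataset.file.DatasetFileWriter;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ArrowScannerReader;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

public class DatasetWriteExample {
  public static void main(String[] args) throws Exception {
    String sourceUri = "file:///tmp/input.parquet"; // assumed input dataset URI
    String targetUri = "file:///tmp/written/";      // assumed output directory URI

    try (BufferAllocator allocator = new RootAllocator();
         // Scan the source dataset through the JNI-backed factory, as the tests do.
         FileSystemDatasetFactory factory = new FileSystemDatasetFactory(
             allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, sourceUri);
         Dataset dataset = factory.finish();
         // An empty column array projects all columns; 100 is the batch size.
         Scanner scanner = dataset.newScan(new ScanOptions(new String[0], 100));
         ArrowScannerReader reader = new ArrowScannerReader(scanner, allocator)) {
      // Unpartitioned Parquet write of everything the reader produces.
      DatasetFileWriter.write(allocator, reader, FileFormat.PARQUET, targetUri);
    }
  }
}

For a partitioned layout, the seven-argument overload from testParquetWriteWithPartitions applies: DatasetFileWriter.write(allocator, reader, FileFormat.PARQUET, targetUri, new String[]{"id", "name"}, 100, "data_{i}") yields one data_0 file under each id=.../name=... directory. Note that the reader is a one-shot stream: once it has been written out, reading the data back requires a fresh factory/scanner/reader chain.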