Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions cpp/src/jni/dataset/jni_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

#include "jni/dataset/jni_util.h"

#include "arrow/util/logging.h"

#include <memory>
#include <mutex>

#include "arrow/c/bridge.h"
#include "arrow/c/helpers.h"
#include "arrow/util/logging.h"

namespace arrow {
namespace dataset {
namespace jni {
Expand Down Expand Up @@ -162,6 +165,15 @@ std::shared_ptr<ReservationListener> ReservationListenableMemoryPool::get_listen

ReservationListenableMemoryPool::~ReservationListenableMemoryPool() {}

Status CheckException(JNIEnv* env) {
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
return Status::Invalid("Error during calling Java code from native code");
Comment on lines +170 to +172
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of simply dumping the exception, is it possible to add its description to the Status instance that is being returned?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be deferred to a later JIRA btw.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I'll open another ticket for that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assigned to me. Thanks.

}
return Status::OK();
}

jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) {
jclass local_class = env->FindClass(class_name);
jclass global_class = (jclass)env->NewGlobalRef(local_class);
Expand Down Expand Up @@ -236,6 +248,17 @@ arrow::Result<std::shared_ptr<arrow::Schema>> FromSchemaByteArray(
env->ReleaseByteArrayElements(schemaBytes, schemaBytes_data, JNI_ABORT);
return schema;
}
/// \brief Export a RecordBatch into a caller-provided C data interface struct.
///
/// \param[in] env JNI environment pointer (not used by this function)
/// \param[in] batch the record batch to export; must not be null
/// \param[in] struct_array address of an ArrowArray struct, carried as a jlong,
/// that receives the exported batch; must not be 0
/// \return Status::Invalid if batch is null or struct_array is 0, otherwise the
/// Status returned by arrow::ExportRecordBatch
arrow::Status ExportRecordBatch(JNIEnv* env, const std::shared_ptr<RecordBatch>& batch,
                                jlong struct_array) {
  // Dereferencing an empty shared_ptr is undefined behaviour; report it instead.
  if (batch == nullptr) {
    return arrow::Status::Invalid("Cannot export a null record batch");
  }
  // The handle arrives from Java as a plain integer, so 0 means "no struct".
  auto* out = reinterpret_cast<struct ArrowArray*>(struct_array);
  if (out == nullptr) {
    return arrow::Status::Invalid(
        "Cannot export record batch into a null ArrowArray struct");
  }
  return arrow::ExportRecordBatch(*batch, out);
}

/// \brief Import a RecordBatch from a C data interface struct filled on the JVM side.
///
/// \param[in] env JNI environment pointer (not used by this function)
/// \param[in] schema schema handed to arrow::ImportRecordBatch together with the
/// array; must not be null
/// \param[in] struct_array address of an ArrowArray struct, carried as a jlong,
/// holding the batch to import; must not be 0
/// \return Status::Invalid if schema is null or struct_array is 0, otherwise the
/// Result returned by arrow::ImportRecordBatch
arrow::Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(
    JNIEnv* env, const std::shared_ptr<Schema>& schema, jlong struct_array) {
  if (schema == nullptr) {
    return arrow::Status::Invalid("Cannot import a record batch with a null schema");
  }
  // The handle arrives from Java as a plain integer, so 0 means "no struct".
  auto* in = reinterpret_cast<struct ArrowArray*>(struct_array);
  if (in == nullptr) {
    return arrow::Status::Invalid(
        "Cannot import record batch from a null ArrowArray struct");
  }
  return arrow::ImportRecordBatch(in, schema);
}

} // namespace jni
} // namespace dataset
Expand Down
18 changes: 16 additions & 2 deletions cpp/src/jni/dataset/jni_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,21 @@

#pragma once

#include <jni.h>

#include "arrow/array.h"
#include "arrow/io/api.h"
#include "arrow/ipc/api.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/type.h"

#include <jni.h>

namespace arrow {
namespace dataset {
namespace jni {

/// \brief Check whether a Java exception is pending on the given JNI environment.
///
/// If env->ExceptionCheck() reports a pending exception, it is printed with
/// ExceptionDescribe(), cleared with ExceptionClear(), and Status::Invalid is
/// returned. Otherwise Status::OK is returned.
Status CheckException(JNIEnv* env);

/// \brief Look up a Java class by name and create a JNI global reference to it.
///
/// The implementation calls FindClass(class_name) and passes the result to
/// NewGlobalRef.
/// NOTE(review): presumably the returned jclass is that global reference and the
/// caller releases it with DeleteGlobalRef -- confirm against the full definition.
jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name);

arrow::Result<jmethodID> GetMethodID(JNIEnv* env, jclass this_class, const char* name,
Expand All @@ -48,6 +50,18 @@ arrow::Result<jbyteArray> ToSchemaByteArray(JNIEnv* env,
arrow::Result<std::shared_ptr<arrow::Schema>> FromSchemaByteArray(JNIEnv* env,
jbyteArray schemaBytes);

/// \brief Export arrow::RecordBatch for Java (or other JVM languages) use.
/// The exported batch is subject to the C data interface specification and can be
/// imported from the Java side using the provided JNI utilities.
///
/// \param[in] env JNI environment pointer
/// \param[in] batch the record batch to export
/// \param[in] struct_array address of an ArrowArray struct, passed as a jlong,
/// that receives the exported batch
/// \return Status reporting whether the export succeeded
arrow::Status ExportRecordBatch(JNIEnv* env, const std::shared_ptr<RecordBatch>& batch,
jlong struct_array);

/// \brief Import arrow::RecordBatch from the JVM language side. The input data should
/// ideally be exported by the dedicated JNI utilities on the JVM language side and
/// should conform to the C data interface specification.
///
/// \param[in] env JNI environment pointer
/// \param[in] schema schema used to interpret the imported array
/// \param[in] struct_array address of an ArrowArray struct, passed as a jlong,
/// holding the batch to import
/// \return the imported record batch, or an error Status
arrow::Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(
JNIEnv* env, const std::shared_ptr<Schema>& schema, jlong struct_array);

/// \brief Create a new shared_ptr on heap from shared_ptr t to prevent
/// the managed object from being garbage-collected.
///
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/jni/dataset/jni_util_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
// specific language governing permissions and limitations
// under the License.

#include "jni/dataset/jni_util.h"

#include <gtest/gtest.h>

#include "arrow/memory_pool.h"
#include "arrow/testing/gtest_util.h"
#include "jni/dataset/jni_util.h"

namespace arrow {
namespace dataset {
Expand Down
110 changes: 21 additions & 89 deletions cpp/src/jni/dataset/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
#include "arrow/filesystem/localfs.h"
#include "arrow/ipc/api.h"
#include "arrow/util/iterator.h"

#include "jni/dataset/jni_util.h"

#include "org_apache_arrow_dataset_file_JniWrapper.h"
#include "org_apache_arrow_dataset_jni_JniWrapper.h"
#include "org_apache_arrow_dataset_jni_NativeMemoryPool.h"
Expand All @@ -37,14 +35,8 @@ jclass illegal_access_exception_class;
jclass illegal_argument_exception_class;
jclass runtime_exception_class;

jclass record_batch_handle_class;
jclass record_batch_handle_field_class;
jclass record_batch_handle_buffer_class;
jclass java_reservation_listener_class;

jmethodID record_batch_handle_constructor;
jmethodID record_batch_handle_field_constructor;
jmethodID record_batch_handle_buffer_constructor;
jmethodID reserve_memory_method;
jmethodID unreserve_memory_method;

Expand Down Expand Up @@ -100,11 +92,7 @@ class ReserveFromJava : public arrow::dataset::jni::ReservationListener {
return arrow::Status::Invalid("JNIEnv was not attached to current thread");
}
env->CallObjectMethod(java_reservation_listener_, reserve_memory_method, size);
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
return arrow::Status::Invalid("Error calling Java side reservation listener");
}
RETURN_NOT_OK(arrow::dataset::jni::CheckException(env));
return arrow::Status::OK();
}

Expand All @@ -114,11 +102,7 @@ class ReserveFromJava : public arrow::dataset::jni::ReservationListener {
return arrow::Status::Invalid("JNIEnv was not attached to current thread");
}
env->CallObjectMethod(java_reservation_listener_, unreserve_memory_method, size);
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->ExceptionClear();
return arrow::Status::Invalid("Error calling Java side reservation listener");
}
RETURN_NOT_OK(arrow::dataset::jni::CheckException(env));
return arrow::Status::OK();
}

Expand Down Expand Up @@ -206,33 +190,10 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
runtime_exception_class =
CreateGlobalClassReference(env, "Ljava/lang/RuntimeException;");

record_batch_handle_class =
CreateGlobalClassReference(env,
"Lorg/apache/arrow/"
"dataset/jni/NativeRecordBatchHandle;");
record_batch_handle_field_class =
CreateGlobalClassReference(env,
"Lorg/apache/arrow/"
"dataset/jni/NativeRecordBatchHandle$Field;");
record_batch_handle_buffer_class =
CreateGlobalClassReference(env,
"Lorg/apache/arrow/"
"dataset/jni/NativeRecordBatchHandle$Buffer;");
java_reservation_listener_class =
CreateGlobalClassReference(env,
"Lorg/apache/arrow/"
"dataset/jni/ReservationListener;");

record_batch_handle_constructor =
JniGetOrThrow(GetMethodID(env, record_batch_handle_class, "<init>",
"(J[Lorg/apache/arrow/dataset/"
"jni/NativeRecordBatchHandle$Field;"
"[Lorg/apache/arrow/dataset/"
"jni/NativeRecordBatchHandle$Buffer;)V"));
record_batch_handle_field_constructor =
JniGetOrThrow(GetMethodID(env, record_batch_handle_field_class, "<init>", "(JJ)V"));
record_batch_handle_buffer_constructor = JniGetOrThrow(
GetMethodID(env, record_batch_handle_buffer_class, "<init>", "(JJJJ)V"));
reserve_memory_method =
JniGetOrThrow(GetMethodID(env, java_reservation_listener_class, "reserve", "(J)V"));
unreserve_memory_method = JniGetOrThrow(
Expand All @@ -250,9 +211,6 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
env->DeleteGlobalRef(illegal_access_exception_class);
env->DeleteGlobalRef(illegal_argument_exception_class);
env->DeleteGlobalRef(runtime_exception_class);
env->DeleteGlobalRef(record_batch_handle_class);
env->DeleteGlobalRef(record_batch_handle_field_class);
env->DeleteGlobalRef(record_batch_handle_buffer_class);
env->DeleteGlobalRef(java_reservation_listener_class);

default_memory_pool_id = -1L;
Expand Down Expand Up @@ -458,25 +416,21 @@ Java_org_apache_arrow_dataset_jni_JniWrapper_getSchemaFromScanner(JNIEnv* env, j
/*
* Class: org_apache_arrow_dataset_jni_JniWrapper
* Method: nextRecordBatch
* Signature: (J)Lorg/apache/arrow/dataset/jni/NativeRecordBatchHandle;
* Signature: (JJ)Z
*/
JNIEXPORT jobject JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextRecordBatch(
JNIEnv* env, jobject, jlong scanner_id) {
JNIEXPORT jboolean JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextRecordBatch(
JNIEnv* env, jobject, jlong scanner_id, jlong struct_array) {
JNI_METHOD_START
std::shared_ptr<DisposableScannerAdaptor> scanner_adaptor =
RetrieveNativeInstance<DisposableScannerAdaptor>(scanner_id);

std::shared_ptr<arrow::RecordBatch> record_batch =
JniGetOrThrow(scanner_adaptor->Next());
if (record_batch == nullptr) {
return nullptr; // stream ended
return false; // stream ended
}
std::shared_ptr<arrow::Schema> schema = record_batch->schema();
jobjectArray field_array =
env->NewObjectArray(schema->num_fields(), record_batch_handle_field_class, nullptr);

std::vector<std::shared_ptr<arrow::Buffer>> buffers;
for (int i = 0; i < schema->num_fields(); ++i) {
std::vector<std::shared_ptr<arrow::Array>> offset_zeroed_arrays;
for (int i = 0; i < record_batch->num_columns(); ++i) {
// TODO: If the array has an offset then we need to de-offset the array
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this TODO still relevant? It seems it's not a TODO anymore but an explanatory note

Copy link
Member Author

@zhztheplayer zhztheplayer Apr 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's still valid. The code still copies and rebuilds offset buffers because the Java C Data Interface doesn't implement exporting/importing for offset buffers either. I believe that doing so would require a systematic change to the Java code, since Arrow Java doesn't implement the same offset semantics as C++.

// in order for it to be properly consumed on the Java end.
// This forces a copy, it would be nice to avoid this if Java
Expand All @@ -485,44 +439,22 @@ JNIEXPORT jobject JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextRecor
//
// Generally a non-zero offset will occur whenever the scanner batch
// size is smaller than the batch size of the underlying files.
auto column = record_batch->column(i);
if (column->offset() != 0) {
column = JniGetOrThrow(arrow::Concatenate({column}));
}
auto dataArray = column->data();
jobject field = env->NewObject(record_batch_handle_field_class,
record_batch_handle_field_constructor,
column->length(), column->null_count());
env->SetObjectArrayElement(field_array, i, field);

for (auto& buffer : dataArray->buffers) {
buffers.push_back(buffer);
std::shared_ptr<arrow::Array> array = record_batch->column(i);
if (array->offset() == 0) {
offset_zeroed_arrays.push_back(array);
continue;
}
std::shared_ptr<arrow::Array> offset_zeroed =
JniGetOrThrow(arrow::Concatenate({array}));
offset_zeroed_arrays.push_back(offset_zeroed);
}

jobjectArray buffer_array =
env->NewObjectArray(buffers.size(), record_batch_handle_buffer_class, nullptr);

for (size_t j = 0; j < buffers.size(); ++j) {
auto buffer = buffers[j];
uint8_t* data = nullptr;
int64_t size = 0;
int64_t capacity = 0;
if (buffer != nullptr) {
data = (uint8_t*)buffer->data();
size = buffer->size();
capacity = buffer->capacity();
}
jobject buffer_handle = env->NewObject(record_batch_handle_buffer_class,
record_batch_handle_buffer_constructor,
CreateNativeRef(buffer), data, size, capacity);
env->SetObjectArrayElement(buffer_array, j, buffer_handle);
}

jobject ret = env->NewObject(record_batch_handle_class, record_batch_handle_constructor,
record_batch->num_rows(), field_array, buffer_array);
return ret;
JNI_METHOD_END(nullptr)
std::shared_ptr<arrow::RecordBatch> offset_zeroed_batch = arrow::RecordBatch::Make(
record_batch->schema(), record_batch->num_rows(), offset_zeroed_arrays);
JniAssertOkOrThrow(
arrow::dataset::jni::ExportRecordBatch(env, offset_zeroed_batch, struct_array));
return true;
JNI_METHOD_END(false)
}

/*
Expand Down
1 change: 0 additions & 1 deletion java/dataset/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ message("generating headers to ${JNI_HEADERS_DIR}")
add_jar(arrow_dataset_java
src/main/java/org/apache/arrow/dataset/jni/JniLoader.java
src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java
src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java
src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java
Expand Down
18 changes: 12 additions & 6 deletions java/dataset/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
Expand All @@ -56,6 +62,12 @@
<version>${parquet.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
Expand Down Expand Up @@ -86,12 +98,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public enum FileFormat {
PARQUET(0),
NONE(-1);

private int id;
private final int id;

FileFormat(int id) {
this.id = id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,25 @@ private JniWrapper() {

/**
* Release the Scanner by destroying its reference held by JNI wrapper.
*
* @param scannerId the native pointer of the arrow::dataset::Scanner instance.
*/
public native void closeScanner(long scannerId);

/**
* Read next record batch from the specified scanner.
*
* @param scannerId the native pointer of the arrow::dataset::Scanner instance.
* @return an instance of {@link NativeRecordBatchHandle} describing the overall layout of the native record batch.
* @param arrowArray pointer to an empty {@link org.apache.arrow.c.ArrowArray} struct to
* store C++ side record batch that conforms to C data interface.
* @return true if valid record batch is returned; false if stream ended.
*/
public native NativeRecordBatchHandle nextRecordBatch(long scannerId);
public native boolean nextRecordBatch(long scannerId, long arrowArray);

/**
* Release the Buffer by destroying its reference held by JNI wrapper.
*
* @param bufferId the native pointer of the arrow::Buffer instance.
*/
public native void releaseBuffer(long bufferId);

}
Loading