Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/src/jni/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@
# Each JNI adapter lives in its own subdirectory and is only added to the
# build when the corresponding ARROW_* option is enabled.
if(ARROW_ORC)
add_subdirectory(orc)
endif()

# Parquet JNI adapter, gated on ARROW_PARQUET.
if(ARROW_PARQUET)
add_subdirectory(parquet)
endif()
File renamed without changes.
249 changes: 249 additions & 0 deletions cpp/src/jni/jni_common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <jni.h>
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "arrow/api.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/api.h"
#include "jni/concurrent_map.h"

static jclass io_exception_class;
static jclass illegal_access_exception_class;
static jclass illegal_argument_exception_class;

static jclass arrow_record_batch_builder_class;
static jmethodID arrow_record_batch_builder_constructor;

static jclass arrow_field_node_builder_class;
static jmethodID arrow_field_node_builder_constructor;

static jclass arrowbuf_builder_class;
static jmethodID arrowbuf_builder_constructor;

static arrow::jni::ConcurrentMap<std::shared_ptr<arrow::Buffer>> buffer_holder_;

// Looks up a Java class by name and pins it with a JNI global reference.
//
// env        - JNI environment of the calling thread.
// class_name - class name, handed verbatim to JNIEnv::FindClass.
//
// Returns the global class reference, or nullptr on failure. On failure an
// IllegalAccessException is raised when that exception class has already been
// loaded; otherwise whatever exception the failing JNI call left pending (if
// any) is kept as is.
jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) {
  jclass global_class = nullptr;

  jclass local_class = env->FindClass(class_name);
  if (local_class != nullptr) {
    global_class = static_cast<jclass>(env->NewGlobalRef(local_class));
    // The global reference now keeps the class reachable; drop the local one.
    env->DeleteLocalRef(local_class);
  }

  // illegal_access_exception_class is itself loaded through this function, so
  // it can still be null here; ThrowNew must not be handed a null class.
  if (global_class == nullptr && illegal_access_exception_class != nullptr) {
    // A failed FindClass leaves its own exception pending, and ThrowNew is not
    // among the JNI calls that are safe while an exception is pending.
    if (env->ExceptionCheck()) {
      env->ExceptionClear();
    }
    const std::string error_message =
        "Unable to createGlobalClassReference for " + std::string(class_name);
    env->ThrowNew(illegal_access_exception_class, error_message.c_str());
  }
  return global_class;
}

// Resolves an instance method (or constructor, via "<init>") on a Java class.
//
// env        - JNI environment of the calling thread.
// this_class - class to search; may be null if an earlier class lookup failed.
// name       - method name.
// sig        - JNI method signature, e.g. "(II)V".
//
// Returns the method id, or nullptr on failure. On failure an
// IllegalAccessException is raised when that exception class is available.
jmethodID GetMethodID(JNIEnv* env, jclass this_class, const char* name, const char* sig) {
  if (this_class == nullptr) {
    // A null class must not be passed to JNIEnv::GetMethodID. Keep any
    // exception that is already pending (e.g. from the failed class lookup)
    // and only raise a new one when nothing has been reported yet.
    if (!env->ExceptionCheck() && illegal_access_exception_class != nullptr) {
      const std::string error_message =
          "Unable to find method " + std::string(name) + ": class reference is null";
      env->ThrowNew(illegal_access_exception_class, error_message.c_str());
    }
    return nullptr;
  }

  jmethodID ret = env->GetMethodID(this_class, name, sig);
  if (ret == nullptr && illegal_access_exception_class != nullptr) {
    // The failed lookup leaves its own exception pending; clear it before
    // raising ours, since ThrowNew is not safe with an exception pending.
    if (env->ExceptionCheck()) {
      env->ExceptionClear();
    }
    const std::string error_message = "Unable to find method " + std::string(name) +
                                      " with signature " + std::string(sig);
    env->ThrowNew(illegal_access_exception_class, error_message.c_str());
  }
  return ret;
}

// Caches global references to the Java exception classes that the native code
// throws. Intended to be paired with UnloadExceptionClassReferences.
//
// The names use the plain "package/Class" form that JNIEnv::FindClass is
// specified to accept; the "Lpackage/Class;" descriptor form is only defined
// for use inside signatures and is not accepted by every VM.
void LoadExceptionClassReferences(JNIEnv* env) {
  io_exception_class = CreateGlobalClassReference(env, "java/io/IOException");
  illegal_access_exception_class =
      CreateGlobalClassReference(env, "java/lang/IllegalAccessException");
  illegal_argument_exception_class =
      CreateGlobalClassReference(env, "java/lang/IllegalArgumentException");
}

// Caches the Java builder classes (and their constructors) that are used to
// hand a record batch back to Java. Intended to be paired with
// UnloadRecordBatchClassReferences.
//
// Class names use the plain "package/Class" form that JNIEnv::FindClass is
// specified to accept. Constructor signatures keep the descriptor form
// ("Lpackage/Class;"), which is what method signatures require.
void LoadRecordBatchClassReferences(JNIEnv* env) {
  // ArrowRecordBatchBuilder(int, ArrowFieldNodeBuilder[], ArrowBufBuilder[])
  arrow_record_batch_builder_class = CreateGlobalClassReference(
      env, "org/apache/arrow/adapter/common/ArrowRecordBatchBuilder");
  arrow_record_batch_builder_constructor =
      GetMethodID(env, arrow_record_batch_builder_class, "<init>",
                  "(I[Lorg/apache/arrow/adapter/common/ArrowFieldNodeBuilder;"
                  "[Lorg/apache/arrow/adapter/common/ArrowBufBuilder;)V");

  // ArrowFieldNodeBuilder(int, int)
  arrow_field_node_builder_class = CreateGlobalClassReference(
      env, "org/apache/arrow/adapter/common/ArrowFieldNodeBuilder");
  arrow_field_node_builder_constructor =
      GetMethodID(env, arrow_field_node_builder_class, "<init>", "(II)V");

  // ArrowBufBuilder(long, long, int, long)
  arrowbuf_builder_class = CreateGlobalClassReference(
      env, "org/apache/arrow/adapter/common/ArrowBufBuilder");
  arrowbuf_builder_constructor =
      GetMethodID(env, arrowbuf_builder_class, "<init>", "(JJIJ)V");
}

// Releases the global references taken by LoadExceptionClassReferences.
//
// Each cached handle is reset to nullptr after deletion so that later code
// sees "not loaded" rather than a dangling global reference.
void UnloadExceptionClassReferences(JNIEnv* env) {
  env->DeleteGlobalRef(io_exception_class);
  io_exception_class = nullptr;

  env->DeleteGlobalRef(illegal_access_exception_class);
  illegal_access_exception_class = nullptr;

  env->DeleteGlobalRef(illegal_argument_exception_class);
  illegal_argument_exception_class = nullptr;
}

// Releases the global references taken by LoadRecordBatchClassReferences and
// drops every buffer still registered in buffer_holder_.
//
// Cached class handles and constructor ids are reset to nullptr so that stale
// values cannot be used after the classes have been released.
void UnloadRecordBatchClassReferences(JNIEnv* env) {
  env->DeleteGlobalRef(arrow_field_node_builder_class);
  arrow_field_node_builder_class = nullptr;
  arrow_field_node_builder_constructor = nullptr;

  env->DeleteGlobalRef(arrowbuf_builder_class);
  arrowbuf_builder_class = nullptr;
  arrowbuf_builder_constructor = nullptr;

  env->DeleteGlobalRef(arrow_record_batch_builder_class);
  arrow_record_batch_builder_class = nullptr;
  arrow_record_batch_builder_constructor = nullptr;

  buffer_holder_.Clear();
}

// Copies a Java string into a std::string holding its modified UTF-8 encoding.
//
// env    - JNI environment of the calling thread.
// string - Java string to convert; a null reference yields an empty string.
//
// Returns the encoded bytes (GetStringUTFLength bytes long).
std::string JStringToCString(JNIEnv* env, jstring string) {
  if (string == nullptr) {
    return std::string();
  }

  // GetStringUTFRegion takes its length in UTF-16 code units, while the output
  // size is the (possibly larger) modified UTF-8 byte count.
  const int32_t jlen = env->GetStringLength(string);
  const int32_t clen = env->GetStringUTFLength(string);

  // One spare, zero-initialised byte: the JNI specification does not say
  // whether the copy is NUL-terminated, so leave room in case the VM writes a
  // terminator after the encoded bytes.
  std::vector<char> buffer(static_cast<size_t>(clen) + 1, '\0');
  env->GetStringUTFRegion(string, 0, jlen, buffer.data());
  return std::string(buffer.data(), static_cast<size_t>(clen));
}

// Assembles a RecordBatch whose columns wrap caller-provided memory.
//
// schema       - schema of the batch; one column is built per field.
// num_rows     - row count recorded on every column and on the batch.
// in_buf_addrs - buffer addresses, consumed in order.
// in_buf_sizes - buffer sizes, indexed in lock-step with in_buf_addrs.
// in_bufs_len  - number of entries available in in_buf_addrs.
// batch        - receives the assembled batch on success.
//
// Every field consumes two consecutive entries; binary-like fields consume a
// third. Returns Status::Invalid if the entries run out before all fields are
// covered. The memory is wrapped in arrow::Buffer objects without copying.
// NOTE(review): the meaning of each slot (validity/offsets/data) is fixed by
// the caller that fills in_buf_addrs - confirm against the Java side.
arrow::Status MakeRecordBatch(const std::shared_ptr<arrow::Schema>& schema, int num_rows,
                              int64_t* in_buf_addrs, int64_t* in_buf_sizes,
                              int in_bufs_len,
                              std::shared_ptr<arrow::RecordBatch>* batch) {
  using BufferVector = std::vector<std::shared_ptr<::arrow::Buffer>>;

  const int field_count = schema->num_fields();
  std::vector<std::shared_ptr<::arrow::ArrayData>> columns;
  // Position of the next unread entry in both in_buf_addrs and in_buf_sizes.
  int cursor = 0;

  for (int col = 0; col < field_count; ++col) {
    const auto column_field = schema->field(col);
    const int wanted = arrow::is_binary_like(column_field->type()->id()) ? 3 : 2;

    BufferVector column_buffers;
    for (int taken = 0; taken < wanted; ++taken) {
      if (cursor >= in_bufs_len) {
        return arrow::Status::Invalid("insufficient number of in_buf_addrs");
      }
      auto* address = reinterpret_cast<uint8_t*>(in_buf_addrs[cursor]);
      const int64_t length = in_buf_sizes[cursor];
      ++cursor;
      column_buffers.push_back(std::make_shared<::arrow::Buffer>(address, length));
    }

    columns.push_back(::arrow::ArrayData::Make(column_field->type(), num_rows,
                                               std::move(column_buffers)));
  }

  *batch = arrow::RecordBatch::Make(schema, num_rows, columns);
  return arrow::Status::OK();
}

// Builds the Java-side ArrowRecordBatchBuilder describing `record_batch`.
//
// env          - JNI environment of the calling thread.
// schema       - schema whose field count drives the column loop.
// record_batch - batch whose columns and buffers are exported.
//
// One ArrowFieldNodeBuilder is created per column and one ArrowBufBuilder per
// buffer. Every buffer is registered in buffer_holder_, and the id returned by
// Insert is handed to Java (see ReleaseBuffer for the matching erase).
//
// Returns the new builder object, or nullptr if a JNI allocation failed (the
// Java exception raised by the failing call is left pending). Buffers that
// were already registered in buffer_holder_ at that point stay registered.
jobject MakeRecordBatchBuilder(JNIEnv* env, std::shared_ptr<arrow::Schema> schema,
                               std::shared_ptr<arrow::RecordBatch> record_batch) {
  const int num_fields = schema->num_fields();
  jobjectArray field_array =
      env->NewObjectArray(num_fields, arrow_field_node_builder_class, nullptr);
  if (field_array == nullptr) {
    return nullptr;
  }

  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
  for (int i = 0; i < num_fields; ++i) {
    auto column = record_batch->column(i);
    auto dataArray = column->data();
    // The constructor is declared "(II)V": arguments travel through C varargs,
    // so they must have exactly the promoted type the VM will read back.
    jobject field = env->NewObject(
        arrow_field_node_builder_class, arrow_field_node_builder_constructor,
        static_cast<jint>(column->length()), static_cast<jint>(column->null_count()));
    if (field == nullptr) {
      return nullptr;
    }
    env->SetObjectArrayElement(field_array, i, field);
    // The array holds its own reference; free the local one so that wide
    // batches do not exhaust the local reference table.
    env->DeleteLocalRef(field);

    for (const auto& buffer : dataArray->buffers) {
      buffers.push_back(buffer);
    }
  }

  jobjectArray arrowbuf_builder_array = env->NewObjectArray(
      static_cast<jsize>(buffers.size()), arrowbuf_builder_class, nullptr);
  if (arrowbuf_builder_array == nullptr) {
    return nullptr;
  }

  for (size_t j = 0; j < buffers.size(); ++j) {
    const auto& buffer = buffers[j];
    // Absent buffers are exported as address 0 / size 0 / capacity 0.
    jlong data = 0;
    jint size = 0;
    jlong capacity = 0;
    if (buffer != nullptr) {
      data = reinterpret_cast<jlong>(buffer->data());
      size = static_cast<jint>(buffer->size());
      capacity = static_cast<jlong>(buffer->capacity());
    }
    // Constructor is declared "(JJIJ)V": id, address, size, capacity.
    const jlong buffer_id = static_cast<jlong>(buffer_holder_.Insert(buffer));
    jobject arrowBufBuilder =
        env->NewObject(arrowbuf_builder_class, arrowbuf_builder_constructor, buffer_id,
                       data, size, capacity);
    if (arrowBufBuilder == nullptr) {
      return nullptr;
    }
    env->SetObjectArrayElement(arrowbuf_builder_array, static_cast<jsize>(j),
                               arrowBufBuilder);
    env->DeleteLocalRef(arrowBufBuilder);
  }

  // create RecordBatch
  return env->NewObject(arrow_record_batch_builder_class,
                        arrow_record_batch_builder_constructor,
                        static_cast<jint>(record_batch->num_rows()), field_array,
                        arrowbuf_builder_array);
}

// Serializes `schema` with arrow::ipc::SerializeSchema and copies the result
// into a new Java byte[].
//
// env    - JNI environment of the calling thread.
// schema - schema to serialize.
//
// Returns the new byte array, or nullptr with a Java exception pending:
// IOException when serialization fails, or whatever NewByteArray raised when
// the array could not be allocated.
jbyteArray ToSchemaByteArray(JNIEnv* env, std::shared_ptr<arrow::Schema> schema) {
  std::shared_ptr<arrow::Buffer> buffer;
  const arrow::Status status = arrow::ipc::SerializeSchema(
      *schema, nullptr, arrow::default_memory_pool(), &buffer);
  if (!status.ok()) {
    const std::string error_message =
        "Unable to convert schema to byte array, err is " + status.message();
    env->ThrowNew(io_exception_class, error_message.c_str());
    // `buffer` was not populated; stop here instead of dereferencing it.
    return nullptr;
  }

  const jsize length = static_cast<jsize>(buffer->size());
  jbyteArray out = env->NewByteArray(length);
  if (out == nullptr) {
    return nullptr;
  }
  env->SetByteArrayRegion(out, 0, length, reinterpret_cast<const jbyte*>(buffer->data()));
  return out;
}

// Reads an Arrow schema from a Java byte[] using arrow::ipc::ReadSchema.
//
// env         - JNI environment of the calling thread.
// schemaBytes - Java array holding the serialized schema.
// schema      - receives the decoded schema on success.
//
// Returns the status of ReadSchema, or Status::Invalid when the array is null
// or its elements cannot be accessed.
arrow::Status FromSchemaByteArray(JNIEnv* env, jbyteArray schemaBytes,
                                  std::shared_ptr<arrow::Schema>* schema) {
  if (schemaBytes == nullptr) {
    return arrow::Status::Invalid("schema byte array is null");
  }

  const int schemaBytes_len = env->GetArrayLength(schemaBytes);
  jbyte* schemaBytes_data = env->GetByteArrayElements(schemaBytes, nullptr);
  if (schemaBytes_data == nullptr) {
    // GetByteArrayElements returns null when it cannot provide the elements.
    return arrow::Status::Invalid("unable to access schema byte array elements");
  }

  arrow::ipc::DictionaryMemo in_memo;
  arrow::Status status;
  {
    // The Buffer only wraps the JNI-provided memory, so keep the reader's
    // lifetime inside the window in which that memory is valid.
    auto serialized_schema = std::make_shared<arrow::Buffer>(
        reinterpret_cast<uint8_t*>(schemaBytes_data), schemaBytes_len);
    arrow::io::BufferReader buf_reader(serialized_schema);
    status = arrow::ipc::ReadSchema(&buf_reader, &in_memo, schema);
  }

  // JNI_ABORT: the elements were only read, so nothing is copied back.
  env->ReleaseByteArrayElements(schemaBytes, schemaBytes_data, JNI_ABORT);

  return status;
}

// Removes the entry registered under `id` from buffer_holder_.
// `id` is a value previously returned by buffer_holder_.Insert (see
// MakeRecordBatchBuilder).
void ReleaseBuffer(jlong id) {
  buffer_holder_.Erase(id);
}
2 changes: 1 addition & 1 deletion cpp/src/jni/orc/jni_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "org_apache_arrow_adapter_orc_OrcReaderJniWrapper.h"
#include "org_apache_arrow_adapter_orc_OrcStripeReaderJniWrapper.h"

#include "./concurrent_map.h"
#include "../concurrent_map.h"

using ORCFileReader = arrow::adapters::orc::ORCFileReader;
using RecordBatchReader = arrow::RecordBatchReader;
Expand Down
53 changes: 53 additions & 0 deletions cpp/src/jni/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

#
# arrow_parquet_jni
#

# Build script for the Parquet JNI bridge library.
project(arrow_parquet_jni)

cmake_minimum_required(VERSION 3.11)

# JNI headers are mandatory; REQUIRED makes configuration fail without them.
find_package(JNI REQUIRED)

# Umbrella target; the libraries produced below are attached to it by the
# add_dependencies() call at the end of this file.
add_custom_target(arrow_parquet_jni)

# Directory passed to the library as an extra include path.
# NOTE(review): presumably where the Java build writes generated JNI headers -
# confirm against java/adapter/common.
set(JNI_HEADERS_DIR "${CMAKE_CURRENT_BINARY_DIR}/generated")

# The Java sources live outside this source tree, so an explicit binary
# directory (./java) has to be supplied.
# NOTE(review): expected to define the arrow_parquet_java target named in
# DEPENDENCIES below - verify.
add_subdirectory(../../../../java/adapter/common ./java)

# NOTE(review): presumably restricts add_arrow_lib to the shared library only -
# confirm against the add_arrow_lib definition.
set(ARROW_BUILD_STATIC OFF)

# add_arrow_lib is a project-provided helper (not defined in this file). The
# names of the libraries it creates are returned in ARROW_PARQUET_JNI_LIBRARIES.
add_arrow_lib(arrow_parquet_jni
BUILD_SHARED
SOURCES
jni_wrapper.cc
adapter.cc
OUTPUTS
ARROW_PARQUET_JNI_LIBRARIES
SHARED_LINK_LIBS
parquet_shared
arrow_shared
EXTRA_INCLUDES
${JNI_HEADERS_DIR}
PRIVATE_INCLUDES
${JNI_INCLUDE_DIRS}
DEPENDENCIES
arrow_parquet_java)

# Building the umbrella target builds every library created above.
add_dependencies(arrow_parquet_jni ${ARROW_PARQUET_JNI_LIBRARIES})
Loading