diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index c905645e170..501c541e175 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -256,6 +256,10 @@ if(MSVC) set(ARROW_USE_GLOG OFF) endif() +if(ARROW_JNI) + set(ARROW_BUILD_STATIC ON) +endif() + if(ARROW_ORC) set(ARROW_WITH_LZ4 ON) set(ARROW_WITH_SNAPPY ON) @@ -729,6 +733,10 @@ if(ARROW_PARQUET) endif() endif() +if(ARROW_JNI) + add_subdirectory(src/jni) +endif() + if(ARROW_GANDIVA) add_subdirectory(src/gandiva) endif() diff --git a/cpp/build-support/lint_cpp_cli.py b/cpp/build-support/lint_cpp_cli.py index ab2de5901a4..e0fee00cafa 100644 --- a/cpp/build-support/lint_cpp_cli.py +++ b/cpp/build-support/lint_cpp_cli.py @@ -77,6 +77,7 @@ def lint_file(path): arrow/visitor_inline.h gandiva/cache.h gandiva/jni + jni/ test internal''') diff --git a/cpp/cmake_modules/BuildUtils.cmake b/cpp/cmake_modules/BuildUtils.cmake index 45cff6e9cca..5f042548885 100644 --- a/cpp/cmake_modules/BuildUtils.cmake +++ b/cpp/cmake_modules/BuildUtils.cmake @@ -139,7 +139,8 @@ function(ADD_ARROW_LIB LIB_NAME) PRIVATE_INCLUDES DEPENDENCIES SHARED_INSTALL_INTERFACE_LIBS - STATIC_INSTALL_INTERFACE_LIBS) + STATIC_INSTALL_INTERFACE_LIBS + OUTPUT_PATH) cmake_parse_arguments(ARG "${options}" "${one_value_args}" @@ -164,6 +165,11 @@ function(ADD_ARROW_LIB LIB_NAME) else() set(BUILD_STATIC ${ARROW_BUILD_STATIC}) endif() + if(ARG_OUTPUT_PATH) + set(OUTPUT_PATH ${ARG_OUTPUT_PATH}) + else() + set(OUTPUT_PATH ${BUILD_OUTPUT_ROOT_DIRECTORY}) + endif() if(WIN32 OR (CMAKE_GENERATOR STREQUAL Xcode)) # We need to compile C++ separately for each library kind (shared and static) @@ -234,11 +240,11 @@ function(ADD_ARROW_LIB LIB_NAME) set_target_properties(${LIB_NAME}_shared PROPERTIES LIBRARY_OUTPUT_DIRECTORY - "${BUILD_OUTPUT_ROOT_DIRECTORY}" + "${OUTPUT_PATH}" RUNTIME_OUTPUT_DIRECTORY - "${BUILD_OUTPUT_ROOT_DIRECTORY}" + "${OUTPUT_PATH}" PDB_OUTPUT_DIRECTORY - "${BUILD_OUTPUT_ROOT_DIRECTORY}" + "${OUTPUT_PATH}" LINK_FLAGS "${ARG_SHARED_LINK_FLAGS}" OUTPUT_NAME @@ -313,8 +319,7 @@ function(ADD_ARROW_LIB LIB_NAME) endif() set_target_properties(${LIB_NAME}_static - PROPERTIES LIBRARY_OUTPUT_DIRECTORY - "${BUILD_OUTPUT_ROOT_DIRECTORY}" OUTPUT_NAME + PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${OUTPUT_PATH}" OUTPUT_NAME ${LIB_NAME_STATIC}) if(ARG_STATIC_INSTALL_INTERFACE_LIBS) diff --git a/cpp/cmake_modules/DefineOptions.cmake b/cpp/cmake_modules/DefineOptions.cmake index b00af80e898..d041da5ac40 100644 --- a/cpp/cmake_modules/DefineOptions.cmake +++ b/cpp/cmake_modules/DefineOptions.cmake @@ -151,6 +151,8 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") define_option(ARROW_ORC "Build the Arrow ORC adapter" OFF) + define_option(ARROW_JNI "Build the Arrow JNI lib" OFF) + define_option(ARROW_TENSORFLOW "Build Arrow with TensorFlow support enabled" OFF) define_option(ARROW_JEMALLOC "Build the Arrow jemalloc-based allocator" ON) diff --git a/cpp/src/jni/CMakeLists.txt b/cpp/src/jni/CMakeLists.txt new file mode 100644 index 00000000000..3872d671934 --- /dev/null +++ b/cpp/src/jni/CMakeLists.txt @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# +# arrow_jni +# + +if(ARROW_ORC) + add_subdirectory(orc) +endif() diff --git a/cpp/src/jni/orc/CMakeLists.txt b/cpp/src/jni/orc/CMakeLists.txt new file mode 100644 index 00000000000..54705fcfc48 --- /dev/null +++ b/cpp/src/jni/orc/CMakeLists.txt @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# +# arrow_orc_jni +# + +project(arrow_orc_jni) + +cmake_minimum_required(VERSION 3.11) + +find_package(JNI REQUIRED) + +add_custom_target(arrow_orc_jni) + +set(JNI_HEADERS_DIR "${CMAKE_CURRENT_BINARY_DIR}/generated") + +add_subdirectory(../../../../java/adapter/orc ./java) + +set(ARROW_BUILD_STATIC OFF) + +add_arrow_lib(arrow_orc_jni + BUILD_SHARED + SOURCES + jni_wrapper.cpp + OUTPUTS + ARROW_ORC_JNI_LIBRARIES + SHARED_PRIVATE_LINK_LIBS + arrow_static + EXTRA_INCLUDES + ${JNI_HEADERS_DIR} + PRIVATE_INCLUDES + ${JNI_INCLUDE_DIRS} + ${CMAKE_CURRENT_BINARY_DIR} + DEPENDENCIES + arrow_static + arrow_orc_java + OUTPUT_PATH + ${CMAKE_CURRENT_BINARY_DIR}) + +add_dependencies(arrow_orc_jni ${ARROW_ORC_JNI_LIBRARIES}) diff --git a/cpp/src/jni/orc/concurrent_map.h b/cpp/src/jni/orc/concurrent_map.h new file mode 100644 index 00000000000..9ca2fc825df --- /dev/null +++ b/cpp/src/jni/orc/concurrent_map.h @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + */ + +#ifndef JNI_ID_TO_MODULE_MAP_H +#define JNI_ID_TO_MODULE_MAP_H + +#include +#include +#include +#include + +#include "arrow/util/macros.h" + +namespace arrow { +namespace jni { + +/** + * An utility class that map module id to module pointers. + * @tparam Holder class of the object to hold. + */ +template +class ConcurrentMap { + public: + ConcurrentMap() : module_id_(init_module_id_) {} + + jlong Insert(Holder holder) { + std::lock_guard lock(mtx_); + jlong result = module_id_++; + map_.insert(std::pair(result, holder)); + return result; + } + + void Erase(jlong module_id) { + std::lock_guard lock(mtx_); + map_.erase(module_id); + } + + Holder Lookup(jlong module_id) { + std::lock_guard lock(mtx_); + auto it = map_.find(module_id); + if (it != map_.end()) { + return it->second; + } + return NULLPTR; + } + + void Clear() { + std::lock_guard lock(mtx_); + map_.clear(); + } + + private: + // Initialize the module id starting value to a number greater than zero + // to allow for easier debugging of uninitialized java variables. + static constexpr int init_module_id_ = 4; + + int64_t module_id_; + std::mutex mtx_; + // map from module ids returned to Java and module pointers + std::unordered_map map_; +}; + +} // namespace jni +} // namespace arrow + +#endif // JNI_ID_TO_MODULE_MAP_H diff --git a/cpp/src/jni/orc/jni_wrapper.cpp b/cpp/src/jni/orc/jni_wrapper.cpp new file mode 100644 index 00000000000..f18bacc864d --- /dev/null +++ b/cpp/src/jni/orc/jni_wrapper.cpp @@ -0,0 +1,310 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "org_apache_arrow_adapter_orc_OrcMemoryJniWrapper.h" +#include "org_apache_arrow_adapter_orc_OrcReaderJniWrapper.h" +#include "org_apache_arrow_adapter_orc_OrcStripeReaderJniWrapper.h" + +#include "./concurrent_map.h" + +using ORCFileReader = arrow::adapters::orc::ORCFileReader; +using RecordBatchReader = arrow::RecordBatchReader; + +static jclass io_exception_class; +static jclass illegal_access_exception_class; +static jclass illegal_argument_exception_class; + +static jclass orc_field_node_class; +static jmethodID orc_field_node_constructor; + +static jclass orc_memory_class; +static jmethodID orc_memory_constructor; + +static jclass record_batch_class; +static jmethodID record_batch_constructor; + +static jint JNI_VERSION = JNI_VERSION_1_6; + +using arrow::jni::ConcurrentMap; + +static ConcurrentMap> buffer_holder_; +static ConcurrentMap> orc_stripe_reader_holder_; +static ConcurrentMap> orc_reader_holder_; + +jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) { + jclass local_class = env->FindClass(class_name); + jclass global_class = (jclass)env->NewGlobalRef(local_class); + env->DeleteLocalRef(local_class); + return global_class; +} + +jmethodID GetMethodID(JNIEnv* env, jclass this_class, const char* name, const char* sig) { + jmethodID ret = env->GetMethodID(this_class, name, sig); + if (ret == nullptr) { + std::string error_message = "Unable to find method " + std::string(name) + + " within signature" + std::string(sig); + env->ThrowNew(illegal_access_exception_class, error_message.c_str()); + } + + return ret; +} + +std::string JStringToCString(JNIEnv* env, jstring string) { + int32_t jlen, clen; + clen = env->GetStringUTFLength(string); + jlen = env->GetStringLength(string); + std::vector buffer(clen); + env->GetStringUTFRegion(string, 0, jlen, buffer.data()); + return std::string(buffer.data(), clen); +} + +std::shared_ptr GetFileReader(JNIEnv* env, jlong id) { + auto reader = orc_reader_holder_.Lookup(id); + if (!reader) { + std::string error_message = "invalid reader id " + std::to_string(id); + env->ThrowNew(illegal_argument_exception_class, error_message.c_str()); + } + + return reader; +} + +std::shared_ptr GetStripeReader(JNIEnv* env, jlong id) { + auto reader = orc_stripe_reader_holder_.Lookup(id); + if (!reader) { + std::string error_message = "invalid stripe reader id " + std::to_string(id); + env->ThrowNew(illegal_argument_exception_class, error_message.c_str()); + } + + return reader; +} + +#ifdef __cplusplus +extern "C" { +#endif + +jint JNI_OnLoad(JavaVM* vm, void* reserved) { + JNIEnv* env; + if (vm->GetEnv(reinterpret_cast(&env), JNI_VERSION) != JNI_OK) { + return JNI_ERR; + } + + io_exception_class = CreateGlobalClassReference(env, "Ljava/io/IOException;"); + illegal_access_exception_class = + CreateGlobalClassReference(env, "Ljava/lang/IllegalAccessException;"); + illegal_argument_exception_class = + CreateGlobalClassReference(env, "Ljava/lang/IllegalArgumentException;"); + + orc_field_node_class = + CreateGlobalClassReference(env, "Lorg/apache/arrow/adapter/orc/OrcFieldNode;"); + orc_field_node_constructor = GetMethodID(env, orc_field_node_class, "", "(II)V"); + + orc_memory_class = CreateGlobalClassReference( + env, "Lorg/apache/arrow/adapter/orc/OrcMemoryJniWrapper;"); + orc_memory_constructor = GetMethodID(env, orc_memory_class, "", "(JJJJ)V"); + + record_batch_class = + CreateGlobalClassReference(env, "Lorg/apache/arrow/adapter/orc/OrcRecordBatch;"); + record_batch_constructor = + GetMethodID(env, record_batch_class, "", + "(I[Lorg/apache/arrow/adapter/orc/OrcFieldNode;" + "[Lorg/apache/arrow/adapter/orc/OrcMemoryJniWrapper;)V"); + + env->ExceptionDescribe(); + + return JNI_VERSION; +} + +void JNI_OnUnload(JavaVM* vm, void* reserved) { + JNIEnv* env; + vm->GetEnv(reinterpret_cast(&env), JNI_VERSION); + env->DeleteGlobalRef(io_exception_class); + env->DeleteGlobalRef(illegal_access_exception_class); + env->DeleteGlobalRef(illegal_argument_exception_class); + env->DeleteGlobalRef(orc_field_node_class); + env->DeleteGlobalRef(orc_memory_class); + env->DeleteGlobalRef(record_batch_class); + + buffer_holder_.Clear(); + orc_stripe_reader_holder_.Clear(); + orc_reader_holder_.Clear(); +} + +JNIEXPORT jlong JNICALL Java_org_apache_arrow_adapter_orc_OrcReaderJniWrapper_open( + JNIEnv* env, jobject this_obj, jstring file_path) { + std::shared_ptr in_file; + + std::string path = JStringToCString(env, file_path); + + arrow::Status ret; + if (path.find("hdfs://") == 0) { + env->ThrowNew(io_exception_class, "hdfs path not supported yet."); + } else { + ret = arrow::io::ReadableFile::Open(path, &in_file); + } + + if (ret.ok()) { + std::unique_ptr reader; + + ret = ORCFileReader::Open( + std::static_pointer_cast(in_file), + arrow::default_memory_pool(), &reader); + + if (!ret.ok()) { + env->ThrowNew(io_exception_class, std::string("Failed open file" + path).c_str()); + } + + return orc_reader_holder_.Insert(std::shared_ptr(reader.release())); + } + + return static_cast(ret.code()) * -1; +} + +JNIEXPORT void JNICALL Java_org_apache_arrow_adapter_orc_OrcReaderJniWrapper_close( + JNIEnv* env, jobject this_obj, jlong id) { + orc_reader_holder_.Erase(id); +} + +JNIEXPORT jboolean JNICALL Java_org_apache_arrow_adapter_orc_OrcReaderJniWrapper_seek( + JNIEnv* env, jobject this_obj, jlong id, jint row_number) { + auto reader = GetFileReader(env, id); + return reader->Seek(row_number).ok(); +} + +JNIEXPORT jint JNICALL +Java_org_apache_arrow_adapter_orc_OrcReaderJniWrapper_getNumberOfStripes(JNIEnv* env, + jobject this_obj, + jlong id) { + auto reader = GetFileReader(env, id); + return reader->NumberOfStripes(); +} + +JNIEXPORT jlong JNICALL +Java_org_apache_arrow_adapter_orc_OrcReaderJniWrapper_nextStripeReader(JNIEnv* env, + jobject this_obj, + jlong id, + jlong batch_size) { + auto reader = GetFileReader(env, id); + + std::shared_ptr stripe_reader; + auto status = reader->NextStripeReader(batch_size, &stripe_reader); + + if (!status.ok()) { + return static_cast(status.code()) * -1; + } + + if (!stripe_reader) { + return static_cast(arrow::StatusCode::Invalid) * -1; + } + + return orc_stripe_reader_holder_.Insert(stripe_reader); +} + +JNIEXPORT jbyteArray JNICALL +Java_org_apache_arrow_adapter_orc_OrcStripeReaderJniWrapper_getSchema(JNIEnv* env, + jclass this_cls, + jlong id) { + auto stripe_reader = GetStripeReader(env, id); + + auto schema = stripe_reader->schema(); + + std::shared_ptr out; + auto status = + arrow::ipc::SerializeSchema(*schema, nullptr, arrow::default_memory_pool(), &out); + if (!status.ok()) { + return nullptr; + } + + jbyteArray ret = env->NewByteArray(out->size()); + auto src = reinterpret_cast(out->data()); + env->SetByteArrayRegion(ret, 0, out->size(), src); + return ret; +} + +JNIEXPORT jobject JNICALL +Java_org_apache_arrow_adapter_orc_OrcStripeReaderJniWrapper_next(JNIEnv* env, + jclass this_cls, + jlong id) { + auto stripe_reader = GetStripeReader(env, id); + + std::shared_ptr record_batch; + auto status = stripe_reader->ReadNext(&record_batch); + if (!status.ok() || !record_batch) { + return nullptr; + } + + auto schema = stripe_reader->schema(); + + // TODO: ARROW-4714 Ensure JVM has sufficient capacity to create local references + // create OrcFieldNode[] + jobjectArray field_array = + env->NewObjectArray(schema->num_fields(), orc_field_node_class, nullptr); + + std::vector> buffers; + for (int i = 0; i < schema->num_fields(); ++i) { + auto column = record_batch->column(i); + auto dataArray = column->data(); + jobject field = env->NewObject(orc_field_node_class, orc_field_node_constructor, + column->length(), column->null_count()); + env->SetObjectArrayElement(field_array, i, field); + + for (auto& buffer : dataArray->buffers) { + buffers.push_back(buffer); + } + } + + // create OrcMemoryJniWrapper[] + jobjectArray memory_array = + env->NewObjectArray(buffers.size(), orc_memory_class, nullptr); + + for (size_t j = 0; j < buffers.size(); ++j) { + auto buffer = buffers[j]; + jobject memory = env->NewObject(orc_memory_class, orc_memory_constructor, + buffer_holder_.Insert(buffer), buffer->data(), + buffer->size(), buffer->capacity()); + env->SetObjectArrayElement(memory_array, j, memory); + } + + // create OrcRecordBatch + jobject ret = env->NewObject(record_batch_class, record_batch_constructor, + record_batch->num_rows(), field_array, memory_array); + + return ret; +} + +JNIEXPORT void JNICALL Java_org_apache_arrow_adapter_orc_OrcStripeReaderJniWrapper_close( + JNIEnv* env, jclass this_cls, jlong id) { + orc_stripe_reader_holder_.Erase(id); +} + +JNIEXPORT void JNICALL Java_org_apache_arrow_adapter_orc_OrcMemoryJniWrapper_release( + JNIEnv* env, jobject this_obj, jlong id) { + buffer_holder_.Erase(id); +} + +#ifdef __cplusplus +} +#endif diff --git a/java/README.md b/java/README.md index c69ff88ffa2..cecdd400f37 100644 --- a/java/README.md +++ b/java/README.md @@ -45,6 +45,15 @@ mvn install -P gandiva -pl gandiva -am -Dgandiva.cpp.build.dir=../../debug This library is still in Alpha stages, and subject to API changes without deprecation warnings. +## Building and running tests for arrow jni (optional) +Arrow Cpp must be built before this step. The cpp build directory must +be provided as the value for argument arrow.cpp.build.dir. eg. + +``` +cd java +mvn install -P native-orc -pl arrow-jni -am -Darrow.cpp.build.dir=../../release +``` + ## Java Code Style Guide Arrow Java follows the Google style guide [here][3] with the following diff --git a/java/adapter/orc/CMakeLists.txt b/java/adapter/orc/CMakeLists.txt new file mode 100644 index 00000000000..c6facacf465 --- /dev/null +++ b/java/adapter/orc/CMakeLists.txt @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# +# arrow_orc_java +# + +# Headers: top level + +project(arrow_orc_java) + +# Find java/jni +include(FindJava) +include(UseJava) +include(FindJNI) + +message("generating headers to ${JNI_HEADERS_DIR}") + +add_jar( + arrow_orc_java + src/main/java/org/apache/arrow/adapter/orc/OrcReaderJniWrapper.java + src/main/java/org/apache/arrow/adapter/orc/OrcStripeReaderJniWrapper.java + src/main/java/org/apache/arrow/adapter/orc/OrcMemoryJniWrapper.java + src/main/java/org/apache/arrow/adapter/orc/OrcJniUtils.java + src/main/java/org/apache/arrow/adapter/orc/OrcRecordBatch.java + src/main/java/org/apache/arrow/adapter/orc/OrcFieldNode.java + GENERATE_NATIVE_HEADERS arrow_orc_java-native + DESTINATION ${JNI_HEADERS_DIR} +) diff --git a/java/adapter/orc/pom.xml b/java/adapter/orc/pom.xml new file mode 100644 index 00000000000..f718dd2d166 --- /dev/null +++ b/java/adapter/orc/pom.xml @@ -0,0 +1,124 @@ + + + + + 4.0.0 + + + org.apache.arrow + arrow-memory + ${project.version} + compile + + + org.apache.arrow + arrow-vector + ${project.version} + compile + + + org.apache.arrow + arrow-format + ${project.version} + compile + + + org.apache.orc + orc-core + 1.5.5 + test + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + commons-logging + commons-logging + + + + + org.apache.hadoop + hadoop-common + 2.2.0 + test + + + commons-logging + commons-logging + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + javax.servlet + servlet-api + + + + + org.apache.hive + hive-storage-api + 2.6.0 + test + + + + + org.apache.arrow + arrow-java-root + 0.14.0-SNAPSHOT + ../../pom.xml + + + org.apache.arrow.orc + arrow-orc + Arrow Orc Adapter + jar + + ../../../cpp/release-build/ + + + + + + ${arrow.cpp.build.dir}/src/jni/orc + + **/libarrow_orc_jni.* + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + UTC + + + + + + diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcFieldNode.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcFieldNode.java new file mode 100644 index 00000000000..716a1387660 --- /dev/null +++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcFieldNode.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +/** + * Metadata about Vectors/Arrays that is passed via JNI interface. + */ +class OrcFieldNode { + + private final int length; + private final int nullCount; + + /** + * Construct a new instance. + * @param length the number of values written. + * @param nullCount the number of null values. + */ + public OrcFieldNode(int length, int nullCount) { + this.length = length; + this.nullCount = nullCount; + } + + int getLength() { + return length; + } + + int getNullCount() { + return nullCount; + } +} diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcJniUtils.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcJniUtils.java new file mode 100644 index 00000000000..600569be7c7 --- /dev/null +++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcJniUtils.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; + +/** + * Helper class for JNI related operations. + */ +class OrcJniUtils { + private static final String LIBRARY_NAME = "arrow_orc_jni"; + private static boolean isLoaded = false; + + private OrcJniUtils() {} + + static void loadOrcAdapterLibraryFromJar() + throws IOException, IllegalAccessException { + synchronized (OrcJniUtils.class) { + if (!isLoaded) { + final String libraryToLoad = System.mapLibraryName(LIBRARY_NAME); + final File libraryFile = moveFileFromJarToTemp( + System.getProperty("java.io.tmpdir"), libraryToLoad); + System.load(libraryFile.getAbsolutePath()); + isLoaded = true; + } + } + } + + private static File moveFileFromJarToTemp(final String tmpDir, String libraryToLoad) + throws IOException { + final File temp = File.createTempFile(tmpDir, libraryToLoad); + try (final InputStream is = OrcReaderJniWrapper.class.getClassLoader() + .getResourceAsStream(libraryToLoad)) { + if (is == null) { + throw new FileNotFoundException(libraryToLoad); + } else { + Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING); + } + } + return temp; + } +} diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcMemoryJniWrapper.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcMemoryJniWrapper.java new file mode 100644 index 00000000000..27f54b75c1c --- /dev/null +++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcMemoryJniWrapper.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +/** + * Wrapper for orc memory allocated by native code. + */ +class OrcMemoryJniWrapper implements AutoCloseable { + + private final long nativeInstanceId; + + private final long memoryAddress; + + private final long size; + + private final long capacity; + + /** + * Construct a new instance. + * @param nativeInstanceId unique id of the underlying memory. + * @param memoryAddress starting memory address of the the underlying memory. + * @param size size of the valid data. + * @param capacity allocated memory size. + */ + OrcMemoryJniWrapper(long nativeInstanceId, long memoryAddress, long size, long capacity) { + this.nativeInstanceId = nativeInstanceId; + this.memoryAddress = memoryAddress; + this.size = size; + this.capacity = capacity; + } + + /** + * Return the size of underlying chunk of memory that has valid data. + * @return valid data size + */ + long getSize() { + return size; + } + + /** + * Return the size of underlying chunk of memory managed by this OrcMemoryJniWrapper. + * @return underlying memory size + */ + long getCapacity() { + return capacity; + } + + /** + * Return the memory address of underlying chunk of memory. + * @return memory address + */ + long getMemoryAddress() { + return memoryAddress; + } + + @Override + public void close() { + release(nativeInstanceId); + } + + private native void release(long id); +} diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReader.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReader.java new file mode 100644 index 00000000000..366489fe562 --- /dev/null +++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReader.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +import java.io.IOException; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; + +/** + * Orc Reader that allow accessing orc stripes in Orc file. + * This orc reader basically acts like an ArrowReader iterator that + * iterate over orc stripes. Each stripe will be accessed via an + * ArrowReader. + */ +public class OrcReader implements AutoCloseable { + private final OrcReaderJniWrapper jniWrapper; + private BufferAllocator allocator; + + /** + * reference to native reader instance. + */ + private final long nativeInstanceId; + + /** + * Create an OrcReader that iterate over orc stripes. + * @param filePath file path to target file, currently only support local file. + * @param allocator allocator provided to ArrowReader. + * @throws IOException throws exception in case of file not found + */ + public OrcReader(String filePath, BufferAllocator allocator) throws IOException, IllegalAccessException { + this.allocator = allocator; + this.jniWrapper = OrcReaderJniWrapper.getInstance(); + this.nativeInstanceId = jniWrapper.open(filePath); + } + + /** + * Seek to designated row. Invoke NextStripeReader() after seek + * will return stripe reader starting from designated row. + * @param rowNumber the rows number to seek + * @return true if seek operation is succeeded + */ + public boolean seek(int rowNumber) throws IllegalArgumentException { + return jniWrapper.seek(nativeInstanceId, rowNumber); + } + + /** + * Get a stripe level ArrowReader with specified batchSize in each record batch. + * + * @param batchSize the number of rows loaded on each iteration + * @return ArrowReader that iterate over current stripes + */ + public ArrowReader nextStripeReader(long batchSize) throws IllegalArgumentException { + long stripeReaderId = jniWrapper.nextStripeReader(nativeInstanceId, batchSize); + if (stripeReaderId < 0) { + return null; + } + + return new OrcStripeReader(stripeReaderId, allocator); + } + + /** + * The number of stripes in the file. + * + * @return number of stripes + */ + public int getNumberOfStripes() throws IllegalArgumentException { + return jniWrapper.getNumberOfStripes(nativeInstanceId); + } + + @Override + public void close() throws Exception { + jniWrapper.close(nativeInstanceId); + } +} diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReaderJniWrapper.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReaderJniWrapper.java new file mode 100644 index 00000000000..ff449c343c4 --- /dev/null +++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReaderJniWrapper.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +import java.io.IOException; + +/** + * JNI wrapper for Orc reader. + */ +class OrcReaderJniWrapper { + + private static volatile OrcReaderJniWrapper INSTANCE; + + static OrcReaderJniWrapper getInstance() throws IOException, IllegalAccessException { + if (INSTANCE == null) { + synchronized (OrcReaderJniWrapper.class) { + if (INSTANCE == null) { + OrcJniUtils.loadOrcAdapterLibraryFromJar(); + INSTANCE = new OrcReaderJniWrapper(); + } + } + } + + return INSTANCE; + } + + /** + * Construct a orc file reader over the target file. + * @param fileName absolute file path of target file + * @return id of the orc reader instance if file opened successfully, + * otherwise return error code * -1. + */ + native long open(String fileName); + + /** + * Release resources associated with designated reader instance. + * @param readerId id of the reader instance. + */ + native void close(long readerId); + + /** + * Seek to designated row. Invoke nextStripeReader() after seek + * will return id of stripe reader starting from designated row. + * @param readerId id of the reader instance + * @param rowNumber the rows number to seek + * @return true if seek operation is succeeded + */ + native boolean seek(long readerId, int rowNumber); + + /** + * The number of stripes in the file. + * @param readerId id of the reader instance + * @return number of stripes + */ + native int getNumberOfStripes(long readerId); + + /** + * Get a stripe level ArrowReader with specified batchSize in each record batch. + * @param readerId id of the reader instance + * @param batchSize the number of rows loaded on each iteration + * @return id of the stripe reader instance. + */ + native long nextStripeReader(long readerId, long batchSize); +} diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcRecordBatch.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcRecordBatch.java new file mode 100644 index 00000000000..a006cacab98 --- /dev/null +++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcRecordBatch.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +import java.util.Arrays; +import java.util.List; + +/** + * Wrapper for record batch meta and native memory. + */ +class OrcRecordBatch { + final int length; + + /** + * Nodes correspond to the pre-ordered flattened logical schema. + */ + final List nodes; + + final List buffers; + + /** + * Construct a new instance. + * @param length number of records included in current batch + * @param nodes meta data for each fields + * @param buffers buffers for underlying data + */ + OrcRecordBatch(int length, OrcFieldNode[] nodes, OrcMemoryJniWrapper[] buffers) { + this.length = length; + this.nodes = Arrays.asList(nodes); + this.buffers = Arrays.asList(buffers); + } +} diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReferenceManager.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReferenceManager.java new file mode 100644 index 00000000000..457d25e4f74 --- /dev/null +++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReferenceManager.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.OwnershipTransferResult; +import org.apache.arrow.memory.ReferenceManager; +import org.apache.arrow.util.Preconditions; + +import io.netty.buffer.ArrowBuf; + +/** + * A simple reference manager implementation for memory allocated by native code. + * The underlying memory will be released when reference count reach zero. + */ +public class OrcReferenceManager implements ReferenceManager { + private final AtomicInteger bufRefCnt = new AtomicInteger(0); + + private OrcMemoryJniWrapper memory; + + OrcReferenceManager(OrcMemoryJniWrapper memory) { + this.memory = memory; + } + + @Override + public int getRefCount() { + return bufRefCnt.get(); + } + + @Override + public boolean release() { + return release(1); + } + + @Override + public boolean release(int decrement) { + Preconditions.checkState(decrement >= 1, + "ref count decrement should be greater than or equal to 1"); + // decrement the ref count + final int refCnt; + synchronized (this) { + refCnt = bufRefCnt.addAndGet(-decrement); + if (refCnt == 0) { + // refcount of this reference manager has dropped to 0 + // release the underlying memory + memory.close(); + } + } + // the new ref count should be >= 0 + Preconditions.checkState(refCnt >= 0, "RefCnt has gone negative"); + return refCnt == 0; + } + + @Override + public void retain() { + retain(1); + } + + @Override + public void retain(int increment) { + Preconditions.checkArgument(increment > 0, "retain(%d) argument is not positive", increment); + bufRefCnt.addAndGet(increment); + } + + @Override + public ArrowBuf retain(ArrowBuf srcBuffer, BufferAllocator targetAllocator) { + retain(); + return srcBuffer; + } + + @Override + public ArrowBuf deriveBuffer(ArrowBuf sourceBuffer, int index, int length) { + final long derivedBufferAddress = sourceBuffer.memoryAddress() + index; + + // create new ArrowBuf + final ArrowBuf derivedBuf = new ArrowBuf( + this, + null, + length, // length (in bytes) in the underlying memory chunk for this new ArrowBuf + derivedBufferAddress, // starting byte address in the underlying memory for this new ArrowBuf, + false); + + return derivedBuf; + } + + @Override + public OwnershipTransferResult transferOwnership(ArrowBuf sourceBuffer, BufferAllocator targetAllocator) { + throw new UnsupportedOperationException(); + } + + @Override + public BufferAllocator getAllocator() { + return null; + } + + @Override + public int getSize() { + return (int)memory.getSize(); + } + + @Override + public int getAccountedSize() { + return 0; + } +} diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java new file mode 100644 index 00000000000..c69e74ad352 --- /dev/null +++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.stream.Collectors; + +import org.apache.arrow.flatbuf.MessageHeader; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.ReadChannel; +import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.ipc.message.MessageChannelReader; +import org.apache.arrow.vector.ipc.message.MessageResult; +import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel; + +import io.netty.buffer.ArrowBuf; + +/** + * Orc stripe that load data into ArrowRecordBatch. + */ +public class OrcStripeReader extends ArrowReader { + /** + * reference to native stripe reader instance. + */ + private final long nativeInstanceId; + + /** + * Construct a new instance. + * @param nativeInstanceId nativeInstanceId of the stripe reader instance, obtained by + * calling nextStripeReader from OrcReaderJniWrapper + * @param allocator memory allocator for accounting. + */ + OrcStripeReader(long nativeInstanceId, BufferAllocator allocator) { + super(allocator); + this.nativeInstanceId = nativeInstanceId; + } + + @Override + public boolean loadNextBatch() throws IOException { + OrcRecordBatch recordBatch = OrcStripeReaderJniWrapper.next(nativeInstanceId); + if (recordBatch == null) { + return false; + } + + ArrayList buffers = new ArrayList<>(); + for (OrcMemoryJniWrapper buffer : recordBatch.buffers) { + buffers.add(new ArrowBuf( + new OrcReferenceManager(buffer), + null, + (int)buffer.getSize(), + buffer.getMemoryAddress(), + false)); + } + + loadRecordBatch(new ArrowRecordBatch( + recordBatch.length, + recordBatch.nodes.stream() + .map(buf -> new ArrowFieldNode(buf.getLength(), buf.getNullCount())) + .collect(Collectors.toList()), + buffers)); + return true; + } + + @Override + public long bytesRead() { + return 0; + } + + + @Override + protected void closeReadSource() throws IOException { + OrcStripeReaderJniWrapper.close(nativeInstanceId); + } + + @Override + protected Schema readSchema() throws IOException { + byte[] schemaBytes = OrcStripeReaderJniWrapper.getSchema(nativeInstanceId); + + try (MessageChannelReader schemaReader = + new MessageChannelReader( + new ReadChannel( + new ByteArrayReadableSeekableByteChannel(schemaBytes)), allocator)) { + + MessageResult result = schemaReader.readNext(); + if (result == null) { + throw new IOException("Unexpected end of input. Missing schema."); + } + + if (result.getMessage().headerType() != MessageHeader.Schema) { + throw new IOException("Expected schema but header was " + result.getMessage().headerType()); + } + + return MessageSerializer.deserializeSchema(result.getMessage()); + } + } + + @Override + protected ArrowDictionaryBatch readDictionary() throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReaderJniWrapper.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReaderJniWrapper.java new file mode 100644 index 00000000000..1dd96986108 --- /dev/null +++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReaderJniWrapper.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +/** + * JNI wrapper for orc stripe reader. + */ +class OrcStripeReaderJniWrapper { + + /** + * Get the schema of current stripe. + * @param readerId id of the stripe reader instance. + * @return serialized schema. + */ + static native byte[] getSchema(long readerId); + + /** + * Load next record batch. + * @param readerId id of the stripe reader instance. + * @return loaded record batch, return null when reached + * the end of current stripe. + */ + static native OrcRecordBatch next(long readerId); + + /** + * Release resources of underlying reader. + * @param readerId id of the stripe reader instance. + */ + static native void close(long readerId); +} diff --git a/java/adapter/orc/src/test/java/org/apache/arrow/adapter/orc/OrcReaderTest.java b/java/adapter/orc/src/test/java/org/apache/arrow/adapter/orc/OrcReaderTest.java new file mode 100644 index 00000000000..943b2cb230f --- /dev/null +++ b/java/adapter/orc/src/test/java/org/apache/arrow/adapter/orc/OrcReaderTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.orc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.List; + + +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + + + +public class OrcReaderTest { + + @Rule + public TemporaryFolder testFolder = new TemporaryFolder(); + + private static final int MAX_ALLOCATION = 8 * 1024; + private static RootAllocator allocator; + + @BeforeClass + public static void beforeClass() { + allocator = new RootAllocator(MAX_ALLOCATION); + } + + @Test + public void testOrcJniReader() throws Exception { + TypeDescription schema = TypeDescription.fromString("struct"); + File testFile = new File(testFolder.getRoot(), "test-orc"); + + Writer writer = OrcFile.createWriter(new Path(testFile.getAbsolutePath()), + OrcFile.writerOptions(new Configuration()).setSchema(schema)); + VectorizedRowBatch batch = schema.createRowBatch(); + LongColumnVector longColumnVector = (LongColumnVector) batch.cols[0]; + BytesColumnVector bytesColumnVector = (BytesColumnVector) batch.cols[1]; + for (int r = 0; r < 1024; ++r) { + int row = batch.size++; + longColumnVector.vector[row] = r; + byte[] buffer = ("Last-" + (r * 3)).getBytes(StandardCharsets.UTF_8); + bytesColumnVector.setRef(row, buffer, 0, buffer.length); + } + writer.addRowBatch(batch); + writer.close(); + + OrcReader reader = new OrcReader(testFile.getAbsolutePath(), allocator); + assertEquals(1, reader.getNumberOfStripes()); + + ArrowReader stripeReader = reader.nextStripeReader(1024); + VectorSchemaRoot schemaRoot = stripeReader.getVectorSchemaRoot(); + stripeReader.loadNextBatch(); + + List fields = schemaRoot.getFieldVectors(); + assertEquals(2, fields.size()); + + IntVector intVector = (IntVector)fields.get(0); + VarCharVector varCharVector = (VarCharVector)fields.get(1); + for (int i = 0; i < 1024; ++i) { + assertEquals(i, intVector.get(i)); + assertEquals("Last-" + (i * 3), new String(varCharVector.get(i), StandardCharsets.UTF_8)); + } + + assertFalse(stripeReader.loadNextBatch()); + assertNull(reader.nextStripeReader(1024)); + + stripeReader.close(); + reader.close(); + } +} diff --git a/java/pom.xml b/java/pom.xml index 666162571bd..0cdb2a32086 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -682,6 +682,21 @@ gandiva + + + + arrow-jni + + format + memory + vector + tools + adapter/jdbc + adapter/orc + plasma + flight + +