diff --git a/cpp/src/jni/CMakeLists.txt b/cpp/src/jni/CMakeLists.txt
index 3872d671934..e56ad0c5e10 100644
--- a/cpp/src/jni/CMakeLists.txt
+++ b/cpp/src/jni/CMakeLists.txt
@@ -22,3 +22,7 @@
 if(ARROW_ORC)
   add_subdirectory(orc)
 endif()
+
+if(ARROW_PARQUET)
+  add_subdirectory(parquet)
+endif()
diff --git a/cpp/src/jni/parquet/CMakeLists.txt b/cpp/src/jni/parquet/CMakeLists.txt
new file mode 100644
index 00000000000..4a5c36aba77
--- /dev/null
+++ b/cpp/src/jni/parquet/CMakeLists.txt
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#
+# arrow_parquet_jni
+#
+
+project(arrow_parquet_jni)
+
+cmake_minimum_required(VERSION 3.11)
+
+find_package(JNI REQUIRED)
+
+add_custom_target(arrow_parquet_jni)
+
+set(JNI_HEADERS_DIR "${CMAKE_CURRENT_BINARY_DIR}/generated")
+
+add_subdirectory(../../../../java/adapter/parquet ./java)
+
+set(ARROW_BUILD_STATIC OFF)
+
+add_arrow_lib(arrow_parquet_jni
+              BUILD_SHARED
+              SOURCES
+              jni_wrapper.cpp
+              hdfs_connector.cc
+              file_connector.cc
+              parquet_reader.cc
+              parquet_writer.cc
+              OUTPUTS
+              ARROW_PARQUET_JNI_LIBRARIES
+              SHARED_LINK_LIBS
+              parquet_shared
+              arrow_shared
+              EXTRA_INCLUDES
+              ${JNI_HEADERS_DIR}
+              PRIVATE_INCLUDES
+              ${JNI_INCLUDE_DIRS}
+              DEPENDENCIES
+              arrow_parquet_java)
+
+add_dependencies(arrow_parquet_jni ${ARROW_PARQUET_JNI_LIBRARIES})
diff --git a/cpp/src/jni/parquet/concurrent_map.h b/cpp/src/jni/parquet/concurrent_map.h
new file mode 100644
index 00000000000..9ca2fc825df
--- /dev/null
+++ b/cpp/src/jni/parquet/concurrent_map.h
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef JNI_ID_TO_MODULE_MAP_H
+#define JNI_ID_TO_MODULE_MAP_H
+
+#include <jni.h>
+
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+
+#include "arrow/util/macros.h"
+
+namespace arrow {
+namespace jni {
+
+/**
+ * A utility class that maps module ids to module pointers.
+ * @tparam Holder class of the object to hold.
+ */
+template <typename Holder>
+class ConcurrentMap {
+ public:
+  ConcurrentMap() : module_id_(init_module_id_) {}
+
+  jlong Insert(Holder holder) {
+    std::lock_guard<std::mutex> lock(mtx_);
+    jlong result = module_id_++;
+    map_.insert(std::pair<jlong, Holder>(result, holder));
+    return result;
+  }
+
+  void Erase(jlong module_id) {
+    std::lock_guard<std::mutex> lock(mtx_);
+    map_.erase(module_id);
+  }
+
+  Holder Lookup(jlong module_id) {
+    std::lock_guard<std::mutex> lock(mtx_);
+    auto it = map_.find(module_id);
+    if (it != map_.end()) {
+      return it->second;
+    }
+    return NULLPTR;
+  }
+
+  void Clear() {
+    std::lock_guard<std::mutex> lock(mtx_);
+    map_.clear();
+  }
+
+ private:
+  // Initialize the module id starting value to a number greater than zero
+  // to allow for easier debugging of uninitialized java variables.
+  static constexpr int init_module_id_ = 4;
+
+  int64_t module_id_;
+  std::mutex mtx_;
+  // map from module ids returned to Java and module pointers
+  std::unordered_map<jlong, Holder> map_;
+};
+
+}  // namespace jni
+}  // namespace arrow
+
+#endif  // JNI_ID_TO_MODULE_MAP_H
diff --git a/cpp/src/jni/parquet/connector.h b/cpp/src/jni/parquet/connector.h
new file mode 100644
index 00000000000..cedc75fb5bc
--- /dev/null
+++ b/cpp/src/jni/parquet/connector.h
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef CONNECTOR_H
+#define CONNECTOR_H
+
+#include <arrow/io/interfaces.h>
+#include <arrow/status.h>
+
+#include <memory>
+#include <string>
+
+namespace jni {
+namespace parquet {
+
+/// \brief A base class for Connectors
+///
+/// This class is used by ParquetReader and ParquetWriter to handle
+/// parquet files from different resources, local and hdfs for now, using
+/// a unified API.
+class Connector {
+ public:
+  Connector() = default;
+  virtual ~Connector() {}
+  std::string GetFileName() { return file_path_; }
+  virtual ::arrow::Status OpenReadable(bool option) = 0;
+  virtual ::arrow::Status OpenWritable(bool option, int replication) = 0;
+  virtual std::shared_ptr<::arrow::io::RandomAccessFile> GetReader() = 0;
+  virtual std::shared_ptr<::arrow::io::OutputStream> GetWriter() = 0;
+  virtual void TearDown() = 0;
+
+ protected:
+  std::string file_path_;
+  std::string dir_path_;
+  virtual ::arrow::Status Mkdir(std::string path) = 0;
+};
+}  // namespace parquet
+}  // namespace jni
+
+#endif
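For context, a minimal sketch of what a third backend would have to implement to plug into ParquetReader/ParquetWriter through this interface. The `S3Connector` name is hypothetical and not part of this patch:

```cpp
#include <memory>
#include <string>

#include "jni/parquet/connector.h"

// Hypothetical sketch only: the contract a Connector subclass must fulfil.
class S3Connector : public jni::parquet::Connector {
 public:
  explicit S3Connector(std::string path) { file_path_ = path; }

  ::arrow::Status OpenReadable(bool option) override {
    // Would open a RandomAccessFile for file_path_ and store it in reader_.
    return ::arrow::Status::NotImplemented("S3 sketch");
  }

  ::arrow::Status OpenWritable(bool option, int replication) override {
    return ::arrow::Status::NotImplemented("S3 sketch");
  }

  std::shared_ptr<::arrow::io::RandomAccessFile> GetReader() override {
    return reader_;
  }

  std::shared_ptr<::arrow::io::OutputStream> GetWriter() override { return writer_; }

  void TearDown() override {}

 protected:
  ::arrow::Status Mkdir(std::string path) override { return ::arrow::Status::OK(); }

  std::shared_ptr<::arrow::io::RandomAccessFile> reader_;
  std::shared_ptr<::arrow::io::OutputStream> writer_;
};
```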
diff --git a/cpp/src/jni/parquet/file_connector.cc b/cpp/src/jni/parquet/file_connector.cc
new file mode 100644
index 00000000000..d2933191e77
--- /dev/null
+++ b/cpp/src/jni/parquet/file_connector.cc
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "jni/parquet/file_connector.h"
+
+#include <iostream>
+#include <memory>
+#include <string>
+#include <utility>  // IWYU pragma: keep
+
+#include "arrow/filesystem/localfs.h"
+#include "arrow/filesystem/path_util.h"
+
+namespace jni {
+namespace parquet {
+
+FileConnector::FileConnector(std::string path) {
+  file_path_ = path;
+  auto path_pair = arrow::fs::internal::GetAbstractPathParent(file_path_);
+  dir_path_ = path_pair.first;
+}
+
+FileConnector::~FileConnector() {}
+
+void FileConnector::TearDown() {
+  if (file_writer_) {
+    file_writer_->Close();
+  }
+}
+
+::arrow::Status FileConnector::OpenReadable(bool option) {
+  ::arrow::Status msg = ::arrow::io::ReadableFile::Open(file_path_, &file_reader_);
+  if (!msg.ok()) {
+    std::cerr << "Open file failed, file name is " << file_path_ << ", error is: " << msg
+              << std::endl;
+    return msg;
+  }
+  return msg;
+}
+
+::arrow::Status FileConnector::OpenWritable(bool option, int replication) {
+  ::arrow::Status msg;
+  if (!dir_path_.empty()) {
+    msg = Mkdir(dir_path_);
+    if (!msg.ok()) {
+      std::cerr << "Mkdir for path failed " << dir_path_ << ", error is: " << msg
+                << std::endl;
+      return msg;
+    }
+  }
+
+  msg = ::arrow::io::FileOutputStream::Open(file_path_, false, &file_writer_);
+  if (!msg.ok()) {
+    std::cerr << "Open file failed, file name is " << file_path_ << ", error is: " << msg
+              << std::endl;
+    return msg;
+  }
+  return msg;
+}
+
+::arrow::Status FileConnector::Mkdir(std::string path) {
+  std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs =
+      std::make_shared<::arrow::fs::LocalFileSystem>();
+  ::arrow::Status msg = local_fs->CreateDir(path);
+  if (!msg.ok()) {
+    std::cerr << "Make Directory for " << path << " failed, error is: " << msg
+              << std::endl;
+    return msg;
+  }
+  return msg;
+}
+
+}  // namespace parquet
+}  // namespace jni
diff --git a/cpp/src/jni/parquet/file_connector.h b/cpp/src/jni/parquet/file_connector.h
new file mode 100644
index 00000000000..adf72436aad
--- /dev/null
+++ b/cpp/src/jni/parquet/file_connector.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef FILE_CONNECTOR_H
+#define FILE_CONNECTOR_H
+
+#include <arrow/io/file.h>
+
+#include <memory>
+#include <string>
+
+#include "jni/parquet/connector.h"
+
+namespace jni {
+namespace parquet {
+
+/// \brief A connector to handle local parquet files
+///
+/// This class is derived from the base class "Connector" and provides
+/// a unified interface for ParquetReader and ParquetWriter to handle
+/// files from different sources.
+/// Member methods are wrappers of some methods under arrow/io/file.h
+class FileConnector : public Connector {
+ public:
+  /// \brief Construction of this class
+  /// \param[in] path local file path
+  explicit FileConnector(std::string path);
+  ~FileConnector();
+
+  /// \brief Open the local parquet file as a readable handler
+  /// \param[in] option a param holder, not used
+  ::arrow::Status OpenReadable(bool option);
+
+  /// \brief Open the local parquet file as a writable handler
+  /// \param[in] option a param holder, not used
+  /// \param[in] replication a param holder, not used
+  ::arrow::Status OpenWritable(bool option, int replication);
+
+  /// \brief Get the reader handler
+  std::shared_ptr<::arrow::io::RandomAccessFile> GetReader() { return file_reader_; }
+
+  /// \brief Get the writer handler
+  std::shared_ptr<::arrow::io::OutputStream> GetWriter() { return file_writer_; }
+
+  /// \brief Tear down the connection and handlers
+  void TearDown();
+
+ protected:
+  ::arrow::Status Mkdir(std::string path);
+  std::shared_ptr<::arrow::io::ReadableFile> file_reader_;
+  std::shared_ptr<::arrow::io::OutputStream> file_writer_;
+};
+}  // namespace parquet
+}  // namespace jni
+
+#endif
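A minimal usage sketch of this class, assuming a hypothetical local file `/tmp/demo.parquet` and the pre-`Result<>` `Read` signature used elsewhere in this codebase:

```cpp
#include <cstdint>
#include <memory>

#include "jni/parquet/file_connector.h"

// Sketch: read the 4-byte "PAR1" magic through a FileConnector.
int main() {
  jni::parquet::FileConnector connector("/tmp/demo.parquet");  // hypothetical path
  ::arrow::Status st = connector.OpenReadable(/*option=*/false);
  if (!st.ok()) return 1;

  std::shared_ptr<::arrow::io::RandomAccessFile> in = connector.GetReader();
  uint8_t magic[4];
  int64_t bytes_read = 0;
  st = in->Read(4, &bytes_read, magic);  // Status-returning Read, as in this codebase

  connector.TearDown();
  return st.ok() ? 0 : 1;
}
```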
+ +#include "jni/parquet/hdfs_connector.h" + +#include +#include +#include +#include +#include +#include +#include +#include // IWYU pragma: keep +#include +#include +#include "arrow/filesystem/path_util.h" + +namespace jni { +namespace parquet { + +HdfsConnector::HdfsConnector(std::string hdfs_path) : driver_loaded_(false) { + GetHdfsHostAndPort(hdfs_path, &hdfs_config_); + file_path_ = GetFileName(hdfs_path); + auto path_pair = arrow::fs::internal::GetAbstractPathParent(file_path_); + dir_path_ = path_pair.first; +} + +HdfsConnector::~HdfsConnector() {} + +::arrow::Status HdfsConnector::SetupHdfsClient(bool use_hdfs3) { + if (hdfs_client_) { + return ::arrow::Status::OK(); + } + if (!driver_loaded_) { + ::arrow::Status msg; + if (use_hdfs3) { + hdfs_config_.driver = ::arrow::io::HdfsDriver::LIBHDFS3; + } else { + hdfs_config_.driver = ::arrow::io::HdfsDriver::LIBHDFS; + } + driver_loaded_ = true; + } + + return ::arrow::io::HadoopFileSystem::Connect(&hdfs_config_, &hdfs_client_); +} + +::arrow::Status HdfsConnector::GetHdfsHostAndPort( + std::string hdfs_path, ::arrow::io::HdfsConnectionConfig* hdfs_conf) { + std::string search_str0 = std::string(":"); + std::string::size_type pos0 = hdfs_path.find_first_of(search_str0, 7); + + std::string search_str1 = std::string("/"); + std::string::size_type pos1 = hdfs_path.find_first_of(search_str1, pos0); + + if ((pos0 == std::string::npos) || (pos1 == std::string::npos)) { + std::cerr << "No host and port information. Use default hdfs port!"; + hdfs_conf->host = "localhost"; + hdfs_conf->port = 20500; + } else { + hdfs_conf->host = hdfs_path.substr(7, pos0 - 7); + hdfs_conf->port = std::stoul(hdfs_path.substr(pos0 + 1, pos1 - pos0 - 1)); + } + return ::arrow::Status::OK(); +} + +std::string HdfsConnector::GetFileName(std::string path) { + std::string search_str0 = std::string(":"); + std::string::size_type pos0 = path.find_first_of(search_str0, 7); + std::string file_name; + if (pos0 == std::string::npos) { + file_name = path.substr(7, std::string::npos); + } else { + std::string search_str1 = std::string("/"); + std::string::size_type pos1 = path.find_first_of(search_str1, 7); + file_name = path.substr(pos1, std::string::npos); + } + return file_name; +} + +void HdfsConnector::TearDown() { + if (file_writer_) { + file_writer_->Close(); + } + if (file_reader_) { + file_reader_->Close(); + } + if (hdfs_client_) { + hdfs_client_->Disconnect(); + hdfs_client_ = nullptr; + } +} + +::arrow::Status HdfsConnector::OpenReadable(bool use_hdfs3) { + ::arrow::Status msg; + if (!hdfs_client_) { + msg = SetupHdfsClient(use_hdfs3); + if (!msg.ok()) { + std::cerr << "connect HDFS failed, error is : " << msg << std::endl; + return msg; + } + } + msg = hdfs_client_->OpenReadable(file_path_, &file_reader_); + if (!msg.ok()) { + std::cerr << "Open HDFS file failed, file name is " << file_path_ + << ", error is : " << msg << std::endl; + return msg; + } + return msg; +} + +::arrow::Status HdfsConnector::OpenWritable(bool use_hdfs3, int replication) { + ::arrow::Status msg; + if (!hdfs_client_) { + msg = SetupHdfsClient(use_hdfs3); + if (!msg.ok()) { + std::cerr << "connect HDFS failed, error is : " << msg << std::endl; + return msg; + } + } + if (!dir_path_.empty()) { + msg = Mkdir(dir_path_); + if (!msg.ok()) { + std::cerr << "Mkdir for HDFS path failed " << dir_path_ << ", error is : " << msg + << std::endl; + return msg; + } + } + + msg = hdfs_client_->OpenWritable(file_path_, false, buffer_size_, (int16_t)replication, + default_block_size_, &file_writer_); + if 
+  if (!msg.ok()) {
+    std::cerr << "Open HDFS file failed, file name is " << file_path_
+              << ", error is: " << msg << std::endl;
+    return msg;
+  }
+  return msg;
+}
+
+::arrow::Status HdfsConnector::Mkdir(std::string path) {
+  return hdfs_client_->MakeDirectory(path);
+}
+}  // namespace parquet
+}  // namespace jni
diff --git a/cpp/src/jni/parquet/hdfs_connector.h b/cpp/src/jni/parquet/hdfs_connector.h
new file mode 100644
index 00000000000..6d05cf33f27
--- /dev/null
+++ b/cpp/src/jni/parquet/hdfs_connector.h
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef HDFS_CONNECTOR_H
+#define HDFS_CONNECTOR_H
+
+#include <arrow/io/hdfs.h>
+
+#include <memory>
+#include <string>
+
+#include "jni/parquet/connector.h"
+
+namespace jni {
+namespace parquet {
+
+/// \brief A connector to handle hdfs parquet files
+///
+/// This class is derived from the base class "Connector" and provides
+/// a unified interface for ParquetReader and ParquetWriter to handle
+/// files from different sources.
+/// Member methods are wrappers of some methods under arrow/io/hdfs.h
+class HdfsConnector : public Connector {
+ public:
+  /// \brief Construction of this class
+  /// \param[in] hdfs_path hdfs:// is expected at the beginning of the string
+  explicit HdfsConnector(std::string hdfs_path);
+  ~HdfsConnector();
+
+  /// \brief Open the hdfs parquet file as a readable handler
+  /// \param[in] use_hdfs3 an option to indicate if libhdfs3 or libhdfs should be expected
+  ::arrow::Status OpenReadable(bool use_hdfs3);
+
+  /// \brief Open the hdfs parquet file as a writable handler
+  /// \param[in] use_hdfs3 an option to indicate if libhdfs3 or libhdfs should be expected
+  /// \param[in] replication an option to indicate the replication number of HDFS
+  ::arrow::Status OpenWritable(bool use_hdfs3, int replication);
+
+  /// \brief Get the reader handler
+  std::shared_ptr<::arrow::io::RandomAccessFile> GetReader() { return file_reader_; }
+
+  /// \brief Get the writer handler
+  std::shared_ptr<::arrow::io::OutputStream> GetWriter() { return file_writer_; }
+
+  /// \brief Tear down the connection and handlers
+  void TearDown();
+
+ protected:
+  std::shared_ptr<::arrow::io::HadoopFileSystem> hdfs_client_;
+  std::shared_ptr<::arrow::io::HdfsReadableFile> file_reader_;
+  std::shared_ptr<::arrow::io::HdfsOutputStream> file_writer_;
+  ::arrow::io::HdfsConnectionConfig hdfs_config_;
+  bool driver_loaded_ = false;
+  int32_t buffer_size_ = 0;
+  int64_t default_block_size_ = 0;
+
+  ::arrow::Status SetupHdfsClient(bool use_hdfs3);
+  ::arrow::Status GetHdfsHostAndPort(std::string hdfs_path,
+                                     ::arrow::io::HdfsConnectionConfig* hdfs_conf);
+  std::string GetFileName(std::string file_path);
+  ::arrow::Status Mkdir(std::string path);
+};
+}  // namespace parquet
+}  // namespace jni
+
+#endif
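GetHdfsHostAndPort and GetFileName above assume a `hdfs://host:port/path` layout, with the scheme prefix hard-coded as the first 7 characters (`strlen("hdfs://")`). A standalone sketch of that decomposition, using a hypothetical URI:

```cpp
#include <iostream>
#include <string>

// Standalone sketch of HdfsConnector's path convention (offset 7 == strlen("hdfs://")).
// When no ':' or '/' is found, the connector falls back to localhost:20500.
int main() {
  std::string path = "hdfs://namenode:9000/data/f.parquet";  // hypothetical URI
  std::string::size_type colon = path.find_first_of(':', 7);
  std::string::size_type slash = path.find_first_of('/', colon);
  std::string host = path.substr(7, colon - 7);                     // "namenode"
  int port = std::stoi(path.substr(colon + 1, slash - colon - 1));  // 9000
  std::string file = path.substr(slash);                            // "/data/f.parquet"
  std::cout << host << " " << port << " " << file << std::endl;
  return 0;
}
```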
diff --git a/cpp/src/jni/parquet/jni_wrapper.cpp b/cpp/src/jni/parquet/jni_wrapper.cpp
new file mode 100644
index 00000000000..209b55ab3c0
--- /dev/null
+++ b/cpp/src/jni/parquet/jni_wrapper.cpp
@@ -0,0 +1,352 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <jni.h>
+
+#include <arrow/io/api.h>
+#include <arrow/ipc/api.h>
+
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "jni/parquet/concurrent_map.h"
+#include "jni/parquet/parquet_reader.h"
+#include "jni/parquet/parquet_writer.h"
+
+static jclass arrow_record_batch_builder_class;
+static jmethodID arrow_record_batch_builder_constructor;
+
+static jclass arrow_field_node_builder_class;
+static jmethodID arrow_field_node_builder_constructor;
+
+static jclass arrowbuf_builder_class;
+static jmethodID arrowbuf_builder_constructor;
+
+using arrow::jni::ConcurrentMap;
+static ConcurrentMap<std::shared_ptr<arrow::Buffer>> buffer_holder_;
+
+static jclass io_exception_class;
+static jclass illegal_access_exception_class;
+static jclass illegal_argument_exception_class;
+
+static jint JNI_VERSION = JNI_VERSION_1_8;
+
+jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) {
+  jclass local_class = env->FindClass(class_name);
+  jclass global_class = (jclass)env->NewGlobalRef(local_class);
+  env->DeleteLocalRef(local_class);
+  if (global_class == nullptr) {
+    std::string error_message =
+        "Unable to createGlobalClassReference for " + std::string(class_name);
+    env->ThrowNew(illegal_access_exception_class, error_message.c_str());
+  }
+  return global_class;
+}
+
+jmethodID GetMethodID(JNIEnv* env, jclass this_class, const char* name, const char* sig) {
+  jmethodID ret = env->GetMethodID(this_class, name, sig);
+  if (ret == nullptr) {
+    std::string error_message = "Unable to find method " + std::string(name) +
+                                " within signature " + std::string(sig);
+    env->ThrowNew(illegal_access_exception_class, error_message.c_str());
+  }
+
+  return ret;
+}
+
+std::string JStringToCString(JNIEnv* env, jstring string) {
+  int32_t jlen, clen;
+  clen = env->GetStringUTFLength(string);
+  jlen = env->GetStringLength(string);
+  std::vector<char> buffer(clen);
+  env->GetStringUTFRegion(string, 0, jlen, buffer.data());
+  return std::string(buffer.data(), clen);
+}
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+jint JNI_OnLoad(JavaVM* vm, void* reserved) {
+  JNIEnv* env;
+  if (vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION) != JNI_OK) {
+    return JNI_ERR;
+  }
+
+  io_exception_class = CreateGlobalClassReference(env, "Ljava/io/IOException;");
+  illegal_access_exception_class =
+      CreateGlobalClassReference(env, "Ljava/lang/IllegalAccessException;");
+  illegal_argument_exception_class =
+      CreateGlobalClassReference(env, "Ljava/lang/IllegalArgumentException;");
+
+  arrow_record_batch_builder_class = CreateGlobalClassReference(
+      env, "Lorg/apache/arrow/adapter/parquet/ArrowRecordBatchBuilder;");
+  arrow_record_batch_builder_constructor =
+      GetMethodID(env, arrow_record_batch_builder_class, "<init>",
+                  "(I[Lorg/apache/arrow/adapter/parquet/ArrowFieldNodeBuilder;"
+                  "[Lorg/apache/arrow/adapter/parquet/ArrowBufBuilder;)V");
+
+  arrow_field_node_builder_class = CreateGlobalClassReference(
+      env, "Lorg/apache/arrow/adapter/parquet/ArrowFieldNodeBuilder;");
+  arrow_field_node_builder_constructor =
+      GetMethodID(env, arrow_field_node_builder_class, "<init>", "(II)V");
+
+  arrowbuf_builder_class = CreateGlobalClassReference(
+      env, "Lorg/apache/arrow/adapter/parquet/ArrowBufBuilder;");
+  arrowbuf_builder_constructor =
+      GetMethodID(env, arrowbuf_builder_class, "<init>", "(JJIJ)V");
+
+  return JNI_VERSION;
+}
+
+void JNI_OnUnload(JavaVM* vm, void* reserved) {
+  std::cerr << "JNI_OnUnload" << std::endl;
+  JNIEnv* env;
+  vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION);
+
+  env->DeleteGlobalRef(io_exception_class);
+  env->DeleteGlobalRef(illegal_access_exception_class);
+  env->DeleteGlobalRef(illegal_argument_exception_class);
+
+  env->DeleteGlobalRef(arrow_field_node_builder_class);
+  env->DeleteGlobalRef(arrowbuf_builder_class);
+  env->DeleteGlobalRef(arrow_record_batch_builder_class);
+
+  buffer_holder_.Clear();
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_arrow_adapter_parquet_ParquetReaderJniWrapper_nativeOpenParquetReader(
+    JNIEnv* env, jobject obj, jstring path) {
+  std::string cpath = JStringToCString(env, path);
+
+  jni::parquet::ParquetReader* reader = new jni::parquet::ParquetReader(cpath);
+  return (int64_t)reader;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_arrow_adapter_parquet_ParquetReaderJniWrapper_nativeInitParquetReader(
+    JNIEnv* env, jobject obj, jlong reader_ptr, jintArray column_indices,
+    jintArray row_group_indices, jlong batch_size, jboolean use_hdfs3) {
+  jni::parquet::ParquetReader* reader = (jni::parquet::ParquetReader*)reader_ptr;
+
+  ::arrow::Status msg;
+
+  int column_indices_len = env->GetArrayLength(column_indices);
+  jint* column_indices_ptr = env->GetIntArrayElements(column_indices, 0);
+  std::vector<int> _column_indices(column_indices_ptr,
+                                   column_indices_ptr + column_indices_len);
+
+  int row_group_indices_len = env->GetArrayLength(row_group_indices);
+  if (row_group_indices_len == 0) {
+    std::vector<int> _row_group_indices = {};
+    msg = reader->Initialize(_column_indices, _row_group_indices, batch_size, use_hdfs3);
+  } else {
+    jint* row_group_indices_ptr = env->GetIntArrayElements(row_group_indices, 0);
+    std::vector<int> _row_group_indices(row_group_indices_ptr,
+                                        row_group_indices_ptr + row_group_indices_len);
+    msg = reader->Initialize(_column_indices, _row_group_indices, batch_size, use_hdfs3);
+    env->ReleaseIntArrayElements(row_group_indices, row_group_indices_ptr, JNI_ABORT);
+  }
+  if (!msg.ok()) {
+    std::string error_message =
+        "nativeInitParquetReader: failed to Initialize, err msg is " + msg.message();
+    env->ThrowNew(io_exception_class, error_message.c_str());
+  }
+  env->ReleaseIntArrayElements(column_indices, column_indices_ptr, JNI_ABORT);
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_arrow_adapter_parquet_ParquetReaderJniWrapper_nativeInitParquetReader2(
+    JNIEnv* env, jobject obj, jlong reader_ptr, jintArray column_indices, jlong start_pos,
+    jlong end_pos, jlong batch_size, jboolean use_hdfs3) {
+  jni::parquet::ParquetReader* reader = (jni::parquet::ParquetReader*)reader_ptr;
+
+  int column_indices_len = env->GetArrayLength(column_indices);
+  jint* column_indices_ptr = env->GetIntArrayElements(column_indices, 0);
+  std::vector<int> _column_indices(column_indices_ptr,
+                                   column_indices_ptr + column_indices_len);
+
+  ::arrow::Status msg;
+  msg = reader->Initialize(_column_indices, start_pos, end_pos, batch_size, use_hdfs3);
+  if (!msg.ok()) {
+    std::string error_message =
+        "nativeInitParquetReader: failed to Initialize, err msg is " + msg.message();
+    env->ThrowNew(io_exception_class, error_message.c_str());
+  }
+  env->ReleaseIntArrayElements(column_indices, column_indices_ptr, JNI_ABORT);
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_arrow_adapter_parquet_ParquetReaderJniWrapper_nativeCloseParquetReader(
+    JNIEnv* env, jobject obj, jlong reader_ptr) {
+  jni::parquet::ParquetReader* reader = (jni::parquet::ParquetReader*)reader_ptr;
+  delete reader;
+}
+
+JNIEXPORT jobject JNICALL
+Java_org_apache_arrow_adapter_parquet_ParquetReaderJniWrapper_nativeReadNext(
+    JNIEnv* env, jobject obj, jlong reader_ptr) {
+  std::shared_ptr<::arrow::RecordBatch> record_batch;
+  jni::parquet::ParquetReader* reader = (jni::parquet::ParquetReader*)reader_ptr;
+  arrow::Status status = reader->ReadNext(&record_batch);
+
+  if (!status.ok() || !record_batch) {
+    return nullptr;
+  }
+
+  auto schema = reader->GetSchema();
+
+  jobjectArray field_array =
+      env->NewObjectArray(schema->num_fields(), arrow_field_node_builder_class, nullptr);
+
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  for (int i = 0; i < schema->num_fields(); ++i) {
+    auto column = record_batch->column(i);
+    auto dataArray = column->data();
+    jobject field = env->NewObject(arrow_field_node_builder_class,
+                                   arrow_field_node_builder_constructor, column->length(),
+                                   column->null_count());
+    env->SetObjectArrayElement(field_array, i, field);
+
+    for (auto& buffer : dataArray->buffers) {
+      buffers.push_back(buffer);
+    }
+  }
+
+  jobjectArray arrowbuf_builder_array =
+      env->NewObjectArray(buffers.size(), arrowbuf_builder_class, nullptr);
+
+  for (size_t j = 0; j < buffers.size(); ++j) {
+    auto buffer = buffers[j];
+    jobject arrowBufBuilder =
+        env->NewObject(arrowbuf_builder_class, arrowbuf_builder_constructor,
+                       buffer_holder_.Insert(buffer), buffer->data(), (int)buffer->size(),
+                       buffer->capacity());
+    env->SetObjectArrayElement(arrowbuf_builder_array, j, arrowBufBuilder);
+  }
+
+  // create RecordBatch
+  jobject arrowRecordBatchBuilder = env->NewObject(
+      arrow_record_batch_builder_class, arrow_record_batch_builder_constructor,
+      record_batch->num_rows(), field_array, arrowbuf_builder_array);
+  return arrowRecordBatchBuilder;
+}
+
+JNIEXPORT jobject JNICALL
+Java_org_apache_arrow_adapter_parquet_ParquetReaderJniWrapper_nativeGetSchema(
+    JNIEnv* env, jobject obj, jlong reader_ptr) {
+  jni::parquet::ParquetReader* reader = (jni::parquet::ParquetReader*)reader_ptr;
+  std::shared_ptr<::arrow::Schema> schema = reader->GetSchema();
+  std::shared_ptr<arrow::Buffer> out;
+  arrow::Status status =
+      arrow::ipc::SerializeSchema(*schema, nullptr, arrow::default_memory_pool(), &out);
+  if (!status.ok()) {
+    return nullptr;
+  }
+
+  jbyteArray ret = env->NewByteArray(out->size());
+  auto src = reinterpret_cast<const jbyte*>(out->data());
+  env->SetByteArrayRegion(ret, 0, out->size(), src);
+  return ret;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_arrow_adapter_parquet_AdaptorReferenceManager_nativeRelease(
+    JNIEnv* env, jobject this_obj, jlong id) {
+  buffer_holder_.Erase(id);
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_arrow_adapter_parquet_ParquetWriterJniWrapper_nativeOpenParquetWriter(
+    JNIEnv* env, jobject obj, jstring path, jbyteArray schemaBytes) {
+  int schemaBytes_len = env->GetArrayLength(schemaBytes);
+  jbyte* schemaBytes_data = env->GetByteArrayElements(schemaBytes, 0);
+
+  auto serialized_schema =
+      std::make_shared<arrow::Buffer>((uint8_t*)schemaBytes_data, schemaBytes_len);
+  arrow::ipc::DictionaryMemo in_memo;
+  std::shared_ptr<arrow::Schema> schema;
+  arrow::io::BufferReader buf_reader(serialized_schema);
+
+  arrow::Status msg = arrow::ipc::ReadSchema(&buf_reader, &in_memo, &schema);
+  if (!msg.ok()) {
+    std::string error_message =
+        "nativeOpenParquetWriter: failed to readSchema, err msg is " + msg.message();
+    env->ThrowNew(io_exception_class, error_message.c_str());
+  }
+
+  std::string cpath = JStringToCString(env, path);
+  jni::parquet::ParquetWriter* writer = new jni::parquet::ParquetWriter(cpath, schema);
+
+  env->ReleaseByteArrayElements(schemaBytes, schemaBytes_data, JNI_ABORT);
+  return (int64_t)writer;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_arrow_adapter_parquet_ParquetWriterJniWrapper_nativeInitParquetWriter(
+    JNIEnv* env, jobject obj, jlong writer_ptr, jboolean use_hdfs3, jint rep) {
+  jni::parquet::ParquetWriter* writer = (jni::parquet::ParquetWriter*)writer_ptr;
+  ::arrow::Status msg = writer->Initialize(use_hdfs3, rep);
+  if (!msg.ok()) {
+    std::string error_message =
+        "nativeInitParquetWriter: failed to Initialize, err msg is " + msg.message();
+    env->ThrowNew(io_exception_class, error_message.c_str());
+  }
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_arrow_adapter_parquet_ParquetWriterJniWrapper_nativeCloseParquetWriter(
+    JNIEnv* env, jobject obj, jlong writer_ptr) {
+  jni::parquet::ParquetWriter* writer = (jni::parquet::ParquetWriter*)writer_ptr;
+  arrow::Status msg = writer->Flush();
+  if (!msg.ok()) {
+    std::string error_message =
+        "nativeCloseParquetWriter: failed to Flush, err msg is " + msg.message();
+    env->ThrowNew(io_exception_class, error_message.c_str());
+  }
+  delete writer;
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_arrow_adapter_parquet_ParquetWriterJniWrapper_nativeWriteNext(
+    JNIEnv* env, jobject obj, jlong writer_ptr, jint numRows, jlongArray bufAddrs,
+    jlongArray bufSizes) {
+  int in_bufs_len = env->GetArrayLength(bufAddrs);
+  if (in_bufs_len != env->GetArrayLength(bufSizes)) {
+    std::string error_message =
+        "nativeWriteNext: mismatch in arraylen of buf_addrs and buf_sizes";
+    env->ThrowNew(io_exception_class, error_message.c_str());
+  }
+
+  jlong* in_buf_addrs = env->GetLongArrayElements(bufAddrs, 0);
+  jlong* in_buf_sizes = env->GetLongArrayElements(bufSizes, 0);
+
+  jni::parquet::ParquetWriter* writer = (jni::parquet::ParquetWriter*)writer_ptr;
+  arrow::Status msg = writer->WriteNext(numRows, in_buf_addrs, in_buf_sizes, in_bufs_len);
+
+  if (!msg.ok()) {
+    std::string error_message = "nativeWriteNext: failed, err msg is " + msg.message();
+    env->ThrowNew(io_exception_class, error_message.c_str());
+  }
+
+  env->ReleaseLongArrayElements(bufAddrs, in_buf_addrs, JNI_ABORT);
+  env->ReleaseLongArrayElements(bufSizes, in_buf_sizes, JNI_ABORT);
+}
+
+#ifdef __cplusplus
+}
+#endif
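The read path above hands raw buffer addresses to Java while `buffer_holder_` keeps the `std::shared_ptr<arrow::Buffer>` alive; `AdaptorReferenceManager.nativeRelease` is what finally drops it. A standalone sketch of that id-based lifetime scheme (using a plain byte vector instead of `arrow::Buffer` to stay self-contained):

```cpp
#include <cstdint>
#include <iostream>
#include <memory>
#include <vector>

#include "jni/parquet/concurrent_map.h"

// Sketch: the native side pins a buffer under an id, Java later releases it by id.
int main() {
  arrow::jni::ConcurrentMap<std::shared_ptr<std::vector<uint8_t>>> holder;
  auto buf = std::make_shared<std::vector<uint8_t>>(64);
  jlong id = holder.Insert(buf);  // the id travels to Java inside ArrowBufBuilder
  buf.reset();                    // the wrapper's local reference goes away...
  std::cout << holder.Lookup(id)->size() << std::endl;  // ...the map keeps it alive
  holder.Erase(id);               // what nativeRelease(id) ultimately does
  return 0;
}
```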
diff --git a/cpp/src/jni/parquet/parquet_reader.cc b/cpp/src/jni/parquet/parquet_reader.cc
new file mode 100644
index 00000000000..882d42a4ca1
--- /dev/null
+++ b/cpp/src/jni/parquet/parquet_reader.cc
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "jni/parquet/parquet_reader.h"
+
+#include <iostream>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "jni/parquet/file_connector.h"
+#include "jni/parquet/hdfs_connector.h"
+
+namespace jni {
+namespace parquet {
+
+ParquetReader::ParquetReader(std::string path)
+    : pool_(::arrow::default_memory_pool()), properties_(false) {
+  if (path.find("hdfs") != std::string::npos) {
+    connector_ = new HdfsConnector(path);
+  } else {
+    connector_ = new FileConnector(path);
+  }
+}
+
+ParquetReader::~ParquetReader() {
+  connector_->TearDown();
+  delete connector_;
+}
+
+::arrow::Status ParquetReader::Initialize(const std::vector<int>& column_indices,
+                                          const std::vector<int>& row_group_indices,
+                                          int64_t batch_size, bool use_hdfs3) {
+  ::arrow::Status msg;
+  msg = connector_->OpenReadable(use_hdfs3);
+  if (!msg.ok()) {
+    std::cerr << "Create connector failed, error msg: " << msg << std::endl;
+    return msg;
+  }
+  properties_.set_batch_size(batch_size);
+
+  msg = ::parquet::arrow::FileReader::Make(
+      pool_, ::parquet::ParquetFileReader::Open(connector_->GetReader()), properties_,
+      &arrow_reader_);
+  if (!msg.ok()) {
+    std::cerr << "Open parquet file failed, error msg: " << msg << std::endl;
+    return msg;
+  }
+
+  msg = GetRecordBatchReader(row_group_indices, column_indices);
+  if (!msg.ok()) {
+    std::cerr << "GetRecordBatchReader failed, error msg: " << msg << std::endl;
+    return msg;
+  }
+  return msg;
+}
+
+::arrow::Status ParquetReader::Initialize(const std::vector<int>& column_indices,
+                                          int64_t start_pos, int64_t end_pos,
+                                          int64_t batch_size, bool use_hdfs3) {
+  ::arrow::Status msg;
+  msg = connector_->OpenReadable(use_hdfs3);
+  if (!msg.ok()) {
+    std::cerr << "Create connector failed, error msg: " << msg << std::endl;
+    return msg;
+  }
+  properties_.set_batch_size(batch_size);
+
+  msg = ::parquet::arrow::FileReader::Make(
+      pool_, ::parquet::ParquetFileReader::Open(connector_->GetReader()), properties_,
+      &arrow_reader_);
+  if (!msg.ok()) {
+    std::cerr << "Open parquet file failed, error msg: " << msg << std::endl;
+    return msg;
+  }
+
+  std::vector<int> row_group_indices =
+      GetRowGroupIndices(arrow_reader_->num_row_groups(), start_pos, end_pos);
+  msg = GetRecordBatchReader(row_group_indices, column_indices);
+  if (!msg.ok()) {
+    std::cerr << "GetRecordBatchReader failed, error msg: " << msg << std::endl;
+    return msg;
+  }
+  return msg;
+}
+
+std::vector<int> ParquetReader::GetRowGroupIndices(int num_row_groups, int64_t start_pos,
+                                                   int64_t end_pos) {
+  std::unique_ptr<::parquet::ParquetFileReader> reader =
+      ::parquet::ParquetFileReader::Open(connector_->GetReader());
+  std::vector<int> row_group_indices;
+  int64_t pos = 0;
+  for (int i = 0; i < num_row_groups; i++) {
+    if (pos >= start_pos && pos < end_pos) {
+      row_group_indices.push_back(i);
+      break;
+    }
+    pos += reader->RowGroup(i)->metadata()->total_byte_size();
+  }
+  if (row_group_indices.empty()) {
+    row_group_indices.push_back(num_row_groups - 1);
+  }
+  return row_group_indices;
+}
+
+::arrow::Status ParquetReader::GetRecordBatchReader(
+    const std::vector<int>& row_group_indices, const std::vector<int>& column_indices) {
+  if (column_indices.empty()) {
+    return arrow_reader_->GetRecordBatchReader(row_group_indices, &rb_reader_);
+  } else {
+    return arrow_reader_->GetRecordBatchReader(row_group_indices, column_indices,
+                                               &rb_reader_);
+  }
+}
+
+::arrow::Status ParquetReader::ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) {
+  std::lock_guard<std::mutex> lck(thread_mtx_);
+  return rb_reader_->ReadNext(out);
+}
+
+std::shared_ptr<::arrow::Schema> ParquetReader::GetSchema() {
+  return rb_reader_->schema();
+}
+
+}  // namespace parquet
+}  // namespace jni
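A C++-side usage sketch of the reader, mirroring what the JNI entry points do (the local path is hypothetical; empty index vectors mean "all columns, all row groups"):

```cpp
#include <iostream>
#include <memory>

#include "jni/parquet/parquet_reader.h"

// Sketch: read a local file in 4096-row batches until the stream is exhausted.
int main() {
  jni::parquet::ParquetReader reader("/tmp/demo.parquet");  // hypothetical path
  ::arrow::Status st = reader.Initialize(/*column_indices=*/{}, /*row_group_indices=*/{},
                                         /*batch_size=*/4096, /*use_hdfs3=*/false);
  if (!st.ok()) return 1;

  std::shared_ptr<::arrow::RecordBatch> batch;
  while (reader.ReadNext(&batch).ok() && batch != nullptr) {
    std::cout << batch->num_rows() << " rows" << std::endl;  // nullptr signals the end
  }
  return 0;
}
```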
diff --git a/cpp/src/jni/parquet/parquet_reader.h b/cpp/src/jni/parquet/parquet_reader.h
new file mode 100644
index 00000000000..f7c098c28aa
--- /dev/null
+++ b/cpp/src/jni/parquet/parquet_reader.h
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_READER_H
+#define PARQUET_READER_H
+
+#include <arrow/record_batch.h>
+
+#include <parquet/arrow/reader.h>
+
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "jni/parquet/file_connector.h"
+#include "jni/parquet/hdfs_connector.h"
+
+namespace jni {
+namespace parquet {
+
+/// \brief A Reader instance of one parquet file
+///
+/// This class is used by jni_wrapper to hold a reader handler for
+/// continuous record batch reading.
+class ParquetReader {
+ public:
+  /// \brief Construction of ParquetReader
+  /// \param[in] path ParquetReader will open a different connector according to the file path
+  explicit ParquetReader(std::string path);
+
+  ~ParquetReader();
+
+  /// \brief Initialization of ParquetReader
+  /// \param[in] column_indices indices of columns expected to be read
+  /// \param[in] row_group_indices indices of row_groups expected to be read
+  /// \param[in] batch_size batch size, default is 4096
+  /// \param[in] use_hdfs3 option used by HdfsConnector
+  ::arrow::Status Initialize(const std::vector<int>& column_indices,
+                             const std::vector<int>& row_group_indices,
+                             int64_t batch_size, bool use_hdfs3 = true);
+
+  /// \brief Initialization of ParquetReader
+  /// \param[in] column_indices indices of columns expected to be read
+  /// \param[in] start_pos use offset to indicate which row_group is expected
+  /// \param[in] end_pos use offset to indicate which row_group is expected
+  /// \param[in] batch_size batch size, default is 4096
+  /// \param[in] use_hdfs3 option used by HdfsConnector
+  ::arrow::Status Initialize(const std::vector<int>& column_indices, int64_t start_pos,
+                             int64_t end_pos, int64_t batch_size, bool use_hdfs3 = true);
+
+  /// \brief Read the next batch
+  /// \param[out] out the batch read will be returned as a RecordBatch
+  ::arrow::Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out);
+
+  /// \brief Get the Parquet Schema
+  std::shared_ptr<::arrow::Schema> GetSchema();
+
+  Connector* connector_;
+
+ private:
+  ::arrow::MemoryPool* pool_;
+  std::mutex thread_mtx_;
+
+  std::unique_ptr<::parquet::arrow::FileReader> arrow_reader_;
+  ::parquet::ArrowReaderProperties properties_;
+  std::shared_ptr<::arrow::RecordBatchReader> rb_reader_;
+
+  std::vector<int> GetRowGroupIndices(int num_row_groups, int64_t start_pos,
+                                      int64_t end_pos);
+
+  ::arrow::Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
+                                       const std::vector<int>& column_indices);
+};
+}  // namespace parquet
+}  // namespace jni
+
+#endif
diff --git a/cpp/src/jni/parquet/parquet_writer.cc b/cpp/src/jni/parquet/parquet_writer.cc
new file mode 100644
index 00000000000..e8b320dd53a
--- /dev/null
+++ b/cpp/src/jni/parquet/parquet_writer.cc
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#include "jni/parquet/parquet_writer.h" + +#include +#include +#include +#include "jni/parquet/file_connector.h" +#include "jni/parquet/hdfs_connector.h" + +namespace jni { +namespace parquet { + +ParquetWriter::ParquetWriter(std::string path, + const std::shared_ptr<::arrow::Schema>& schema) + : pool_(::arrow::default_memory_pool()), schema(schema) { + if (path.find("hdfs") != std::string::npos) { + connector_ = new HdfsConnector(path); + } else { + connector_ = new FileConnector(path); + } +} + +ParquetWriter::~ParquetWriter() { + if (arrow_writer_) { + arrow_writer_->Close(); + } + connector_->TearDown(); + delete connector_; +} + +::arrow::Status ParquetWriter::Initialize(bool use_hdfs3, int replication) { + ::arrow::Status msg; + msg = connector_->OpenWritable(use_hdfs3, replication); + if (!msg.ok()) { + std::cerr << "Create connector_ failed, error msg: " << msg << std::endl; + return msg; + } + + std::shared_ptr<::parquet::WriterProperties> properties = + ::parquet::default_writer_properties(); + msg = ::parquet::arrow::ToParquetSchema(schema.get(), *properties.get(), + &schema_description_); + if (!msg.ok()) { + std::cerr << "Convert Arrow ::arrow::Schema to " + << "Parquet ::arrow::Schema failed, error msg: " << msg << std::endl; + return msg; + } + + ::parquet::schema::NodeVector group_node_fields; + for (int i = 0; i < schema_description_->group_node()->field_count(); i++) { + group_node_fields.push_back(schema_description_->group_node()->field(i)); + } + auto parquet_schema = std::static_pointer_cast<::parquet::schema::GroupNode>( + ::parquet::schema::GroupNode::Make(schema_description_->schema_root()->name(), + schema_description_->schema_root()->repetition(), + group_node_fields)); + + msg = ::parquet::arrow::FileWriter::Make( + pool_, ::parquet::ParquetFileWriter::Open(connector_->GetWriter(), parquet_schema), + schema, ::parquet::default_arrow_writer_properties(), &arrow_writer_); + if (!msg.ok()) { + std::cerr << "Open parquet file failed, error msg: " << msg << std::endl; + return msg; + } + return msg; +} + +::arrow::Status ParquetWriter::WriteNext(int num_rows, int64_t* in_buf_addrs, + int64_t* in_buf_sizes, int in_bufs_len) { + std::shared_ptr<::arrow::RecordBatch> batch; + ::arrow::Status msg = + MakeRecordBatch(schema, num_rows, in_buf_addrs, in_buf_sizes, in_bufs_len, &batch); + if (!msg.ok()) { + return msg; + } + + std::lock_guard lck(thread_mtx_); + record_batch_buffer_list_.push_back(batch); + + return msg; +} + +::arrow::Status ParquetWriter::Flush() { + std::shared_ptr<::arrow::Table> table; + ::arrow::Status msg = + ::arrow::Table::FromRecordBatches(record_batch_buffer_list_, &table); + if (!msg.ok()) { + std::cerr << "Table::FromRecordBatches failed" << std::endl; + return msg; + } + + msg = arrow_writer_->WriteTable(*table.get(), table->num_rows()); + if (!msg.ok()) { + std::cerr << "arrow_writer_->WriteTable failed" << std::endl; + return msg; + } + + msg = connector_->GetWriter()->Flush(); + if (!msg.ok()) { + std::cerr << "ParquetWriter::Flush() failed" << std::endl; + return msg; + } + return msg; +} + +::arrow::Status ParquetWriter::WriteNext( + const std::shared_ptr<::arrow::RecordBatch>& rb) { + std::lock_guard lck(thread_mtx_); + record_batch_buffer_list_.push_back(rb); + return ::arrow::Status::OK(); +} + +::arrow::Status ParquetWriter::MakeRecordBatch( + const std::shared_ptr<::arrow::Schema>& schema, int num_rows, int64_t* in_buf_addrs, + int64_t* in_buf_sizes, int in_bufs_len, std::shared_ptr* batch) { + std::vector> arrays; + auto 
+  auto num_fields = schema->num_fields();
+  int buf_idx = 0;
+  int sz_idx = 0;
+
+  for (int i = 0; i < num_fields; i++) {
+    auto field = schema->field(i);
+    std::vector<std::shared_ptr<::arrow::Buffer>> buffers;
+
+    if (buf_idx >= in_bufs_len) {
+      return ::arrow::Status::Invalid("insufficient number of in_buf_addrs");
+    }
+    int64_t validity_addr = in_buf_addrs[buf_idx++];
+    int64_t validity_size = in_buf_sizes[sz_idx++];
+    auto validity = std::shared_ptr<::arrow::Buffer>(
+        new ::arrow::Buffer(reinterpret_cast<uint8_t*>(validity_addr), validity_size));
+    buffers.push_back(validity);
+
+    if (buf_idx >= in_bufs_len) {
+      return ::arrow::Status::Invalid("insufficient number of in_buf_addrs");
+    }
+    int64_t value_addr = in_buf_addrs[buf_idx++];
+    int64_t value_size = in_buf_sizes[sz_idx++];
+    auto data = std::shared_ptr<::arrow::Buffer>(
+        new ::arrow::Buffer(reinterpret_cast<uint8_t*>(value_addr), value_size));
+    buffers.push_back(data);
+
+    if (arrow::is_binary_like(field->type()->id())) {
+      if (buf_idx >= in_bufs_len) {
+        return ::arrow::Status::Invalid("insufficient number of in_buf_addrs");
+      }
+
+      // add offsets buffer for variable-len fields.
+      int64_t offsets_addr = in_buf_addrs[buf_idx++];
+      int64_t offsets_size = in_buf_sizes[sz_idx++];
+      auto offsets = std::shared_ptr<::arrow::Buffer>(
+          new ::arrow::Buffer(reinterpret_cast<uint8_t*>(offsets_addr), offsets_size));
+      buffers.push_back(offsets);
+    }
+
+    auto array_data =
+        ::arrow::ArrayData::Make(field->type(), num_rows, std::move(buffers));
+    arrays.push_back(array_data);
+  }
+  *batch = arrow::RecordBatch::Make(schema, num_rows, arrays);
+  return ::arrow::Status::OK();
+}
+
+}  // namespace parquet
+}  // namespace jni
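`MakeRecordBatch` expects the flattened Java buffers in a fixed per-field order: validity bitmap, then values, plus a third offsets buffer for binary-like types. A sketch of feeding one nullable int32 column to the writer (the addresses/sizes would normally arrive from Java via `nativeWriteNext`):

```cpp
#include <cstdint>

#include "jni/parquet/parquet_writer.h"

// Sketch: one nullable int32 column contributes two buffers (validity, values).
// A string/binary column would contribute a third entry: the offsets buffer.
void WriteOneColumn(jni::parquet::ParquetWriter* writer, int num_rows,
                    int64_t validity_addr, int64_t validity_size,
                    int64_t values_addr, int64_t values_size) {
  int64_t buf_addrs[] = {validity_addr, values_addr};
  int64_t buf_sizes[] = {validity_size, values_size};
  ::arrow::Status st =
      writer->WriteNext(num_rows, buf_addrs, buf_sizes, /*in_bufs_len=*/2);
  if (st.ok()) {
    st = writer->Flush();  // materializes all cached batches as one table
  }
}
```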
diff --git a/cpp/src/jni/parquet/parquet_writer.h b/cpp/src/jni/parquet/parquet_writer.h
new file mode 100644
index 00000000000..741ffb278e9
--- /dev/null
+++ b/cpp/src/jni/parquet/parquet_writer.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_WRITER_H
+#define PARQUET_WRITER_H
+
+#include <arrow/record_batch.h>
+#include <arrow/type.h>
+
+#include <parquet/arrow/schema.h>
+#include <parquet/arrow/writer.h>
+
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "jni/parquet/file_connector.h"
+#include "jni/parquet/hdfs_connector.h"
+
+namespace jni {
+namespace parquet {
+
+/// \brief A Writer instance of one parquet file
+///
+/// This class is used by jni_wrapper to hold a writer handler for
+/// continuous record batch writing.
+class ParquetWriter {
+ public:
+  /// \brief Construction of ParquetWriter
+  /// \param[in] path ParquetWriter will open a different connector according to the file path
+  /// \param[in] schema opening a writable parquet handler requires an Arrow Schema as input
+  ParquetWriter(std::string path, const std::shared_ptr<::arrow::Schema>& schema);
+
+  ~ParquetWriter();
+
+  /// \brief Initialization of ParquetWriter
+  /// \param[in] use_hdfs3 option used by HdfsConnector
+  /// \param[in] replication option used by HdfsConnector
+  ::arrow::Status Initialize(bool use_hdfs3 = true, int replication = 1);
+
+  /// \brief Write the next record batch
+  /// \param[in] num_rows row count of this RecordBatch
+  /// \param[in] in_buf_addrs buffer addr list in this RecordBatch
+  /// \param[in] in_buf_sizes buffer size list in this RecordBatch
+  /// \param[in] in_bufs_len buffer list length in this RecordBatch
+  ///
+  /// The RecordBatch will only be written to the cache
+  ::arrow::Status WriteNext(int num_rows, int64_t* in_buf_addrs, int64_t* in_buf_sizes,
+                            int in_bufs_len);
+
+  /// \brief Write the next record batch
+  /// \param[in] rb the next RecordBatch
+  ///
+  /// The RecordBatch will only be written to the cache
+  ::arrow::Status WriteNext(const std::shared_ptr<::arrow::RecordBatch>& rb);
+
+  /// \brief Flush cached RecordBatches as one table
+  ::arrow::Status Flush();
+
+ private:
+  ::arrow::MemoryPool* pool_;
+  Connector* connector_;
+  std::mutex thread_mtx_;
+  std::unique_ptr<::parquet::arrow::FileWriter> arrow_writer_;
+  const std::shared_ptr<::arrow::Schema> schema;
+  std::shared_ptr<::parquet::SchemaDescriptor> schema_description_;
+  std::vector<std::shared_ptr<::arrow::RecordBatch>> record_batch_buffer_list_;
+
+  ::arrow::Status MakeRecordBatch(const std::shared_ptr<::arrow::Schema>& schema,
+                                  int num_rows, int64_t* in_buf_addrs,
+                                  int64_t* in_buf_sizes, int in_bufs_len,
+                                  std::shared_ptr<::arrow::RecordBatch>* batch);
+};
+}  // namespace parquet
+}  // namespace jni
+
+#endif
diff --git a/java/adapter/parquet/CMakeLists.txt b/java/adapter/parquet/CMakeLists.txt
new file mode 100644
index 00000000000..829a93c54e7
--- /dev/null
+++ b/java/adapter/parquet/CMakeLists.txt
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#
+# arrow_parquet_java
+#
+
+# Headers: top level
+
+project(arrow_parquet_java)
+
+# Find java/jni
+include(FindJava)
+include(UseJava)
+include(FindJNI)
+
+message("generating headers to ${JNI_HEADERS_DIR}")
+
+add_jar(
+  arrow_parquet_java
+  src/main/java/org/apache/arrow/adapter/parquet/ArrowBufBuilder.java
+  src/main/java/org/apache/arrow/adapter/parquet/ArrowFieldNodeBuilder.java
+  src/main/java/org/apache/arrow/adapter/parquet/ArrowRecordBatchBuilder.java
+  GENERATE_NATIVE_HEADERS arrow_parquet_java-native
+  DESTINATION ${JNI_HEADERS_DIR}
+)
diff --git a/java/adapter/parquet/pom.xml b/java/adapter/parquet/pom.xml
new file mode 100644
index 00000000000..dcae27d8178
--- /dev/null
+++ b/java/adapter/parquet/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.arrow</groupId>
+    <artifactId>arrow-java-root</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <groupId>org.apache.arrow.parquet</groupId>
+  <artifactId>arrow-parquet</artifactId>
+  <name>Arrow Parquet Adapter</name>
+  <description>(Experimental/Contrib) A JNI wrapper for the C++ Parquet reader implementation.</description>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
+
+  <properties>
+    <arrow.cpp.build.dir>../../../cpp/release-build/release</arrow.cpp.build.dir>
+  </properties>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>${arrow.cpp.build.dir}</directory>
+        <includes>
+          <include>**/libarrow_parquet_jni.*</include>
+        </includes>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <user.timezone>UTC</user.timezone>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/AdaptorReferenceManager.java b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/AdaptorReferenceManager.java
new file mode 100644
index 00000000000..083b2404524
--- /dev/null
+++ b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/AdaptorReferenceManager.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.parquet;
+
+import java.io.IOException;
+import java.lang.UnsupportedOperationException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.OwnershipTransferResult;
+import org.apache.arrow.memory.ReferenceManager;
+import org.apache.arrow.util.Preconditions;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * A simple reference manager implementation for memory allocated by native code.
+ * The underlying memory will be released when the reference count reaches zero.
+ */
+public class AdaptorReferenceManager implements ReferenceManager {
+  private native void nativeRelease(long nativeMemoryHolder);
+
+  private final AtomicInteger bufRefCnt = new AtomicInteger(0);
+  private long nativeMemoryHolder;
+  private int size = 0;
+
+  AdaptorReferenceManager(long nativeMemoryHolder, int size)
+      throws IOException {
+    ParquetJniUtils.getInstance();
+    this.nativeMemoryHolder = nativeMemoryHolder;
+    this.size = size;
+  }
+
+  @Override
+  public int getRefCount() {
+    return bufRefCnt.get();
+  }
+
+  @Override
+  public boolean release() {
+    return release(1);
+  }
+
+  @Override
+  public boolean release(int decrement) {
+    Preconditions.checkState(decrement >= 1,
+        "ref count decrement should be greater than or equal to 1");
+    // decrement the ref count
+    final int refCnt;
+    synchronized (this) {
+      refCnt = bufRefCnt.addAndGet(-decrement);
+      if (refCnt == 0) {
+        // refcount of this reference manager has dropped to 0
+        // release the underlying memory
+        nativeRelease(nativeMemoryHolder);
+      }
+    }
+    // the new ref count should be >= 0
+    Preconditions.checkState(refCnt >= 0, "RefCnt has gone negative");
+    return refCnt == 0;
+  }
+
+  @Override
+  public void retain() {
+    retain(1);
+  }
+
+  @Override
+  public void retain(int increment) {
+    Preconditions.checkArgument(increment > 0, "retain(%d) argument is not positive", increment);
+    bufRefCnt.addAndGet(increment);
+  }
+
+  @Override
+  public ArrowBuf retain(ArrowBuf srcBuffer, BufferAllocator targetAllocator) {
+    retain();
+    return srcBuffer;
+  }
+
+  @Override
+  public ArrowBuf deriveBuffer(ArrowBuf sourceBuffer, int index, int length) {
+    final long derivedBufferAddress = sourceBuffer.memoryAddress() + index;
+
+    // create new ArrowBuf
+    final ArrowBuf derivedBuf = new ArrowBuf(
+        this,
+        null,
+        length,
+        derivedBufferAddress,
+        false);
+
+    return derivedBuf;
+  }
+
+  @Override
+  public OwnershipTransferResult transferOwnership(ArrowBuf sourceBuffer, BufferAllocator targetAllocator) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public BufferAllocator getAllocator() {
+    return null;
+  }
+
+  @Override
+  public int getSize() {
+    return size;
+  }
+
+  @Override
+  public int getAccountedSize() {
+    return 0;
+  }
+}
diff --git a/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ArrowBufBuilder.java b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ArrowBufBuilder.java
new file mode 100644
index 00000000000..cc2a8c723ca
--- /dev/null
+++ b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ArrowBufBuilder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.parquet;
+
+/**
+ * ArrowBufBuilder.
diff --git a/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ArrowBufBuilder.java b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ArrowBufBuilder.java
new file mode 100644
index 00000000000..cc2a8c723ca
--- /dev/null
+++ b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ArrowBufBuilder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.parquet;
+
+/**
+ * Plain holder for the parameters of a native ArrowBuf, filled in by the JNI layer.
+ */
+public class ArrowBufBuilder {
+
+  public long nativeInstanceId;
+  public long memoryAddress;
+  public int size;
+  public long capacity;
+
+  /**
+   * Create an instance of ArrowBufBuilder.
+   * @param nativeInstanceId native ArrowBuf holder id.
+   * @param memoryAddress native ArrowBuf data address.
+   * @param size ArrowBuf size in bytes.
+   * @param capacity ArrowBuf capacity in bytes.
+   */
+  public ArrowBufBuilder(long nativeInstanceId, long memoryAddress, int size, long capacity) {
+    this.memoryAddress = memoryAddress;
+    this.size = size;
+    this.capacity = capacity;
+    this.nativeInstanceId = nativeInstanceId;
+  }
+}
diff --git a/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ArrowFieldNodeBuilder.java b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ArrowFieldNodeBuilder.java
new file mode 100644
index 00000000000..02bee8d5109
--- /dev/null
+++ b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ArrowFieldNodeBuilder.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.parquet;
+
+/**
+ * Plain holder for the parameters of an ArrowFieldNode, filled in by the JNI layer.
+ */
+public class ArrowFieldNodeBuilder {
+
+  public int length;
+  public int nullCount;
+
+  /**
+   * Create an instance of ArrowFieldNodeBuilder.
+   * @param length number of rows in this field.
+   * @param nullCount number of null values in this field.
+   */
+  public ArrowFieldNodeBuilder(int length, int nullCount) {
+    this.length = length;
+    this.nullCount = nullCount;
+  }
+
+}
diff --git a/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ArrowRecordBatchBuilder.java b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ArrowRecordBatchBuilder.java
new file mode 100644
index 00000000000..73c8363049f
--- /dev/null
+++ b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ArrowRecordBatchBuilder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.parquet;
+
+/**
+ * Plain holder for the parameters of an ArrowRecordBatch, filled in by the JNI layer.
+ */
+public class ArrowRecordBatchBuilder {
+
+  public int length;
+  public ArrowFieldNodeBuilder[] nodeBuilders;
+  public ArrowBufBuilder[] bufferBuilders;
+
+  /**
+   * Create an instance wrapping the native parameters of an ArrowRecordBatch.
+   * @param length ArrowRecordBatch row count.
+   * @param nodeBuilders an array holding one ArrowFieldNodeBuilder per field.
+   * @param bufferBuilders an array holding one ArrowBufBuilder per buffer.
+   */
+  public ArrowRecordBatchBuilder(
+      int length, ArrowFieldNodeBuilder[] nodeBuilders, ArrowBufBuilder[] bufferBuilders) {
+    this.length = length;
+    this.nodeBuilders = nodeBuilders;
+    this.bufferBuilders = bufferBuilders;
+  }
+
+}
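Taken together, the three builder classes are a passive mirror of an ArrowRecordBatch: one length, one ArrowFieldNodeBuilder per field, and one ArrowBufBuilder per buffer. A purely illustrative construction with made-up values, involving no native code:

    package org.apache.arrow.adapter.parquet;

    public class BuilderShapeSketch {
      public static void main(String[] args) {
        // One field of 16 rows with no nulls...
        ArrowFieldNodeBuilder[] nodes = {new ArrowFieldNodeBuilder(16, 0)};
        // ...backed by one (hypothetical) native buffer of 64 bytes.
        ArrowBufBuilder[] buffers = {new ArrowBufBuilder(0L, 0L, 64, 64L)};
        ArrowRecordBatchBuilder batchBuilder = new ArrowRecordBatchBuilder(16, nodes, buffers);
        System.out.println(batchBuilder.length); // prints 16
      }
    }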
diff --git a/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ArrowRecordBatchBuilderImpl.java b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ArrowRecordBatchBuilderImpl.java
new file mode 100644
index 00000000000..36f2bfa0f01
--- /dev/null
+++ b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ArrowRecordBatchBuilderImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.parquet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Builds an ArrowRecordBatch from the data returned by the native reader.
+ */
+public class ArrowRecordBatchBuilderImpl {
+
+  private ArrowRecordBatchBuilder recordBatchBuilder;
+
+  /**
+   * Create an ArrowRecordBatchBuilderImpl instance from an ArrowRecordBatchBuilder.
+   * @param recordBatchBuilder ArrowRecordBatchBuilder instance.
+   */
+  public ArrowRecordBatchBuilderImpl(ArrowRecordBatchBuilder recordBatchBuilder) {
+    this.recordBatchBuilder = recordBatchBuilder;
+  }
+
+  /**
+   * Build an ArrowRecordBatch from the wrapped ArrowRecordBatchBuilder instance.
+   * @return the built ArrowRecordBatch, or null if the builder holds no rows.
+   * @throws IOException in case of IO issues.
+   */
+  public ArrowRecordBatch build() throws IOException {
+    if (recordBatchBuilder.length == 0) {
+      return null;
+    }
+
+    List<ArrowFieldNode> nodes = new ArrayList<>();
+    for (ArrowFieldNodeBuilder tmp : recordBatchBuilder.nodeBuilders) {
+      nodes.add(new ArrowFieldNode(tmp.length, tmp.nullCount));
+    }
+
+    List<ArrowBuf> buffers = new ArrayList<>();
+    for (ArrowBufBuilder tmp : recordBatchBuilder.bufferBuilders) {
+      AdaptorReferenceManager referenceManager =
+          new AdaptorReferenceManager(tmp.nativeInstanceId, tmp.size);
+      buffers.add(new ArrowBuf(referenceManager, null, tmp.size, tmp.memoryAddress, false));
+    }
+    return new ArrowRecordBatch(recordBatchBuilder.length, nodes, buffers);
+  }
+
+}
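On the consuming side, the typical pattern is to convert the builder into a real batch and hand it to a VectorLoader. A helper-method sketch, assuming `builder` came from the native reader and `root` was created from the file's schema; the method name is hypothetical:

    import java.io.IOException;

    import org.apache.arrow.vector.VectorLoader;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;

    static void loadIntoRoot(ArrowRecordBatchBuilder builder, VectorSchemaRoot root)
        throws IOException {
      ArrowRecordBatch batch = new ArrowRecordBatchBuilderImpl(builder).build();
      if (batch == null) {
        return; // zero-length builder: nothing left to read
      }
      try {
        new VectorLoader(root).load(batch);
      } finally {
        batch.close(); // releases the JNI-held buffers once the data is loaded
      }
    }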
diff --git a/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetJniUtils.java b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetJniUtils.java
new file mode 100644
index 00000000000..0f4207b04bf
--- /dev/null
+++ b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetJniUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.parquet;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+
+/**
+ * Helper class for JNI-related operations: extracts the arrow_parquet_jni native
+ * library bundled in the jar and loads it exactly once per JVM.
+ */
+class ParquetJniUtils {
+  private static final String LIBRARY_NAME = "arrow_parquet_jni";
+  private static boolean isLoaded = false;
+  private static volatile ParquetJniUtils INSTANCE;
+
+  static ParquetJniUtils getInstance() throws IOException {
+    if (INSTANCE == null) {
+      synchronized (ParquetJniUtils.class) {
+        if (INSTANCE == null) {
+          try {
+            INSTANCE = new ParquetJniUtils();
+          } catch (IllegalAccessException ex) {
+            throw new IOException("Failed to load the arrow_parquet_jni native library.", ex);
+          }
+        }
+      }
+    }
+
+    return INSTANCE;
+  }
+
+  private ParquetJniUtils() throws IOException, IllegalAccessException {
+    loadParquetAdapterLibraryFromJar();
+  }
+
+  static void loadParquetAdapterLibraryFromJar()
+      throws IOException, IllegalAccessException {
+    synchronized (ParquetJniUtils.class) {
+      if (!isLoaded) {
+        final String libraryToLoad = System.mapLibraryName(LIBRARY_NAME);
+        final File libraryFile = moveFileFromJarToTemp(
+            System.getProperty("java.io.tmpdir"), libraryToLoad);
+        System.load(libraryFile.getAbsolutePath());
+        isLoaded = true;
+      }
+    }
+  }
+
+  private static File moveFileFromJarToTemp(final String tmpDir, String libraryToLoad)
+      throws IOException {
+    final File temp = File.createTempFile(tmpDir, libraryToLoad);
+    try (final InputStream is = ParquetJniUtils.class.getClassLoader()
+        .getResourceAsStream(libraryToLoad)) {
+      if (is == null) {
+        throw new FileNotFoundException(libraryToLoad);
+      } else {
+        Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING);
+      }
+    }
+    return temp;
+  }
+}
diff --git a/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetReader.java b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetReader.java
new file mode 100644
index 00000000000..9006b654436
--- /dev/null
+++ b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetReader.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.parquet;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/**
+ * Wrapper for the Parquet reader native API.
+ */
+public class ParquetReader {
+  private ParquetReaderJniWrapper wrapper;
+  private long parquetReaderHandler;
+  private long lastReadLength;
+
+  /**
+   * Create an instance of ParquetReader.
+   * @param wrapper a holder of ParquetReaderJniWrapper.
+   * @param path Parquet file path to read.
+   * @param rowGroupIndices an array indicating which row groups to read.
+   * @param columnIndices an array indicating which columns to read.
+   * @param batchSize how many rows will be read in one batch.
+   * @param useHdfs3 a flag indicating whether the hdfs3 connector should be used.
+   * @throws IOException in case of IO issues.
+   */
+  public ParquetReader(ParquetReaderJniWrapper wrapper, String path, int[] rowGroupIndices,
+                       int[] columnIndices, long batchSize, boolean useHdfs3) throws IOException {
+    this.wrapper = wrapper;
+    parquetReaderHandler = wrapper.openParquetFile(path, rowGroupIndices, columnIndices,
+        batchSize, useHdfs3);
+  }
+
+  /**
+   * Create an instance of ParquetReader; the hdfs3 connector is used by default.
+   * @param wrapper a holder of ParquetReaderJniWrapper.
+   * @param path Parquet file path to read.
+   * @param rowGroupIndices an array indicating which row groups to read.
+   * @param columnIndices an array indicating which columns to read.
+   * @param batchSize how many rows will be read in one batch.
+   * @throws IOException in case of IO issues.
+   */
+  public ParquetReader(ParquetReaderJniWrapper wrapper, String path, int[] rowGroupIndices,
+                       int[] columnIndices, long batchSize) throws IOException {
+    this.wrapper = wrapper;
+    parquetReaderHandler = wrapper.openParquetFile(path, rowGroupIndices, columnIndices,
+        batchSize, true);
+  }
+
+  /**
+   * Create an instance of ParquetReader.
+   * @param wrapper a holder of ParquetReaderJniWrapper.
+   * @param path Parquet file path to read.
+   * @param columnIndices an array indicating which columns to read.
+   * @param startPos a start offset used to select row groups.
+   * @param endPos an end offset used to select row groups.
+   * @param batchSize how many rows will be read in one batch.
+   * @param useHdfs3 a flag indicating whether the hdfs3 connector should be used.
+   * @throws IOException in case of IO issues.
+   */
+  public ParquetReader(ParquetReaderJniWrapper wrapper, String path, int[] columnIndices,
+                       long startPos, long endPos, long batchSize, boolean useHdfs3)
+      throws IOException {
+    this.wrapper = wrapper;
+    parquetReaderHandler = wrapper.openParquetFile(path, columnIndices, startPos, endPos,
+        batchSize, useHdfs3);
+  }
+
+  /**
+   * Create an instance of ParquetReader; the hdfs3 connector is used by default.
+   * @param wrapper a holder of ParquetReaderJniWrapper.
+   * @param path Parquet file path to read.
+   * @param columnIndices an array indicating which columns to read.
+   * @param startPos a start offset used to select row groups.
+   * @param endPos an end offset used to select row groups.
+   * @param batchSize how many rows will be read in one batch.
+   * @throws IOException in case of IO issues.
+   */
+  public ParquetReader(ParquetReaderJniWrapper wrapper, String path, int[] columnIndices,
+                       long startPos, long endPos, long batchSize) throws IOException {
+    this.wrapper = wrapper;
+    parquetReaderHandler = wrapper.openParquetFile(path, columnIndices, startPos, endPos,
+        batchSize, true);
+  }
+
+  /**
+   * Close the native ParquetReader instance.
+   */
+  public void close() {
+    wrapper.closeParquetFile(parquetReaderHandler);
+  }
+
+  /**
+   * Read the next ArrowRecordBatch from the ParquetReader.
+   * @return the ArrowRecordBatch read, or null when the file is exhausted.
+   * @throws IOException in case of IO issues.
+   */
+  public ArrowRecordBatch readNext() throws IOException {
+    ArrowRecordBatch batch = wrapper.readNext(parquetReaderHandler);
+    if (batch == null) {
+      return null;
+    }
+    lastReadLength = batch.getLength();
+    return batch;
+  }
+
+  /**
+   * Get the Arrow Schema from the ParquetReader.
+   * @return the Arrow Schema.
+   * @throws IOException in case of IO issues.
+   */
+  public Schema getSchema() throws IOException {
+    return wrapper.getSchema(parquetReaderHandler);
+  }
+
+  /**
+   * Read the next batch from the ParquetReader and load it into the given root.
+   * @param root the VectorSchemaRoot to load the batch into.
+   * @return the list of loaded FieldVectors, or null when the file is exhausted.
+   * @throws IOException in case of IO issues.
+   */
+  public List<FieldVector> readNextVectors(VectorSchemaRoot root) throws IOException {
+    ArrowRecordBatch batch = readNext();
+    if (batch == null) {
+      return null;
+    }
+    VectorLoader loader = new VectorLoader(root);
+    loader.load(batch);
+    batch.close();
+    return root.getFieldVectors();
+  }
+
+  /**
+   * Get the row count of the last ArrowRecordBatch read.
+   * @return lastReadLength.
+   */
+  public long lastReadLength() {
+    return lastReadLength;
+  }
+}
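A usage sketch for the reader as a whole. The file path is hypothetical, and the arrow_parquet_jni native library must be available on the classpath resources for ParquetJniUtils to load; here columns 0 and 1 of row group 0 are read 4096 rows at a time:

    package org.apache.arrow.adapter.parquet;

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.types.pojo.Schema;

    public class ReadSketch {
      public static void main(String[] args) throws Exception {
        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
          ParquetReaderJniWrapper jni = new ParquetReaderJniWrapper(allocator);
          ParquetReader reader = new ParquetReader(jni, "/tmp/example.parquet",
              new int[]{0}, new int[]{0, 1}, 4096);
          Schema schema = reader.getSchema();
          try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
            while (reader.readNextVectors(root) != null) {
              // consume root here; its contents are overwritten by the next call
            }
          }
          reader.close();
        }
      }
    }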
diff --git a/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetReaderJniWrapper.java b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetReaderJniWrapper.java
new file mode 100644
index 00000000000..378ef8dee6d
--- /dev/null
+++ b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetReaderJniWrapper.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.parquet;
+
+import java.io.IOException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageChannelReader;
+import org.apache.arrow.vector.ipc.message.MessageResult;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
+
+/**
+ * Wrapper for the Parquet reader native API.
+ */
+public class ParquetReaderJniWrapper {
+  private native long nativeOpenParquetReader(String path);
+
+  private native void nativeInitParquetReader(long nativeHandler, int[] columnIndices,
+                                              int[] rowGroupIndices, long batchSize,
+                                              boolean useHdfs3);
+
+  private native void nativeInitParquetReader2(long nativeHandler, int[] columnIndices,
+                                               long startPos, long endPos,
+                                               long batchSize, boolean useHdfs3);
+
+  private native void nativeCloseParquetReader(long nativeHandler);
+
+  private native ArrowRecordBatchBuilder nativeReadNext(long nativeHandler);
+
+  private native byte[] nativeGetSchema(long nativeHandler);
+
+  private BufferAllocator allocator;
+
+  /**
+   * Create an instance of ParquetReaderJniWrapper.
+   * @param allocator a pre-created BufferAllocator.
+   * @throws IOException in case the native library fails to load.
+   */
+  public ParquetReaderJniWrapper(BufferAllocator allocator)
+      throws IOException {
+    ParquetJniUtils.getInstance();
+    this.allocator = allocator;
+  }
+
+  /**
+   * Open a Parquet file for reading.
+   * @param path Parquet file path to read.
+   * @param rowGroupIndices an array indicating which row groups to read.
+   * @param columnIndices an array indicating which columns to read.
+   * @param batchSize how many rows will be read in one batch.
+   * @param useHdfs3 a flag indicating whether the hdfs3 connector should be used.
+   * @return the native ParquetReader handler.
+   * @throws IOException in case of IO issues.
+   */
+  long openParquetFile(String path, int[] rowGroupIndices, int[] columnIndices, long batchSize,
+                       boolean useHdfs3) throws IOException {
+    long nativeHandler = nativeOpenParquetReader(path);
+    nativeInitParquetReader(nativeHandler, columnIndices, rowGroupIndices, batchSize, useHdfs3);
+    return nativeHandler;
+  }
+
+  /**
+   * Open a Parquet file for reading.
+   * @param path Parquet file path to read.
+   * @param columnIndices an array indicating which columns to read.
+   * @param startPos a start offset used to select row groups.
+   * @param endPos an end offset used to select row groups.
+   * @param batchSize how many rows will be read in one batch.
+   * @param useHdfs3 a flag indicating whether the hdfs3 connector should be used.
+   * @return the native ParquetReader handler.
+   * @throws IOException in case of IO issues.
+   */
+  long openParquetFile(String path, int[] columnIndices, long startPos, long endPos,
+                       long batchSize, boolean useHdfs3) throws IOException {
+    long nativeHandler = nativeOpenParquetReader(path);
+    nativeInitParquetReader2(nativeHandler, columnIndices, startPos, endPos, batchSize, useHdfs3);
+    return nativeHandler;
+  }
+
+  /**
+   * Close the native ParquetReader instance.
+   * @param nativeHandler the native ParquetReader handler.
+   */
+  void closeParquetFile(long nativeHandler) {
+    nativeCloseParquetReader(nativeHandler);
+  }
+
+  /**
+   * Get the Arrow Schema from the ParquetReader.
+   * @param nativeHandler the native ParquetReader handler.
+   * @return the Arrow Schema.
+   * @throws IOException in case of IO issues.
+   */
+  Schema getSchema(long nativeHandler) throws IOException {
+    byte[] schemaBytes = nativeGetSchema(nativeHandler);
+
+    try (MessageChannelReader schemaReader =
+        new MessageChannelReader(
+            new ReadChannel(
+                new ByteArrayReadableSeekableByteChannel(schemaBytes)), allocator)) {
+
+      MessageResult result = schemaReader.readNext();
+      if (result == null) {
+        throw new IOException("Unexpected end of input. Missing schema.");
+      }
+
+      return MessageSerializer.deserializeSchema(result.getMessage());
+    }
+  }
+
+  /**
+   * Read the next ArrowRecordBatch from the ParquetReader.
+   * @param nativeHandler the native ParquetReader handler.
+   * @return the ArrowRecordBatch read, or null when the file is exhausted.
+   * @throws IOException in case of IO issues.
+   */
+  ArrowRecordBatch readNext(long nativeHandler) throws IOException {
+    ArrowRecordBatchBuilder recordBatchBuilder = nativeReadNext(nativeHandler);
+    if (recordBatchBuilder == null) {
+      return null;
+    }
+    return new ArrowRecordBatchBuilderImpl(recordBatchBuilder).build();
+  }
+}
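The schema handshake on both wrappers is plain Arrow IPC: the writer serializes a Schema into message bytes for the native side, and getSchema() reverses the process. The round trip can be exercised without any native code; this sketch follows the same calls the wrappers make:

    package org.apache.arrow.adapter.parquet;

    import static java.util.Arrays.asList;

    import java.io.ByteArrayOutputStream;
    import java.nio.channels.Channels;

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.ipc.ReadChannel;
    import org.apache.arrow.vector.ipc.WriteChannel;
    import org.apache.arrow.vector.ipc.message.MessageChannelReader;
    import org.apache.arrow.vector.ipc.message.MessageResult;
    import org.apache.arrow.vector.ipc.message.MessageSerializer;
    import org.apache.arrow.vector.types.pojo.ArrowType;
    import org.apache.arrow.vector.types.pojo.Field;
    import org.apache.arrow.vector.types.pojo.FieldType;
    import org.apache.arrow.vector.types.pojo.Schema;
    import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

    public class SchemaBytesSketch {
      public static void main(String[] args) throws Exception {
        Schema schema = new Schema(asList(new Field("a",
            FieldType.nullable(new ArrowType.Int(32, true)), null)));

        // Schema -> bytes, as openParquetFile() does before crossing into JNI.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), schema);
        byte[] schemaBytes = out.toByteArray();

        // Bytes -> Schema, as getSchema() does with the bytes returned from JNI.
        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             MessageChannelReader reader = new MessageChannelReader(
                 new ReadChannel(new ByteArrayReadableSeekableByteChannel(schemaBytes)),
                 allocator)) {
          MessageResult result = reader.readNext();
          System.out.println(MessageSerializer.deserializeSchema(result.getMessage()));
        }
      }
    }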
diff --git a/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetWriter.java b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetWriter.java
new file mode 100644
index 00000000000..03227f9ae03
--- /dev/null
+++ b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetWriter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.parquet;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/**
+ * Wrapper for the Parquet writer native API.
+ */
+public class ParquetWriter {
+
+  private long parquetWriterHandler;
+  private ParquetWriterJniWrapper wrapper;
+
+  /**
+   * Open a native ParquetWriter instance.
+   * @param wrapper a ParquetWriterJniWrapper instance.
+   * @param path Parquet file path to write.
+   * @param schema the Arrow schema used to initialize the Parquet file.
+   * @param useHdfs3 a flag indicating whether the writer should use the hdfs3 connector.
+   * @param rep replication factor for HDFS writes.
+   * @throws IOException in case of IO issues.
+   */
+  public ParquetWriter(ParquetWriterJniWrapper wrapper, String path, Schema schema,
+                       boolean useHdfs3, int rep) throws IOException {
+    this.wrapper = wrapper;
+    parquetWriterHandler = wrapper.openParquetFile(path, schema, useHdfs3, rep);
+  }
+
+  /**
+   * Open a native ParquetWriter instance with the hdfs3 connector enabled and a
+   * replication factor of 1.
+   * @param wrapper a ParquetWriterJniWrapper instance.
+   * @param path Parquet file path to write.
+   * @param schema the Arrow schema used to initialize the Parquet file.
+   * @throws IOException in case of IO issues.
+   */
+  public ParquetWriter(ParquetWriterJniWrapper wrapper, String path, Schema schema)
+      throws IOException {
+    this.wrapper = wrapper;
+    parquetWriterHandler = wrapper.openParquetFile(path, schema, true, 1);
+  }
+
+  /**
+   * Close the native ParquetWriter instance.
+   * @throws IOException in case of IO issues.
+   */
+  public void close() throws IOException {
+    wrapper.closeParquetFile(parquetWriterHandler);
+  }
+
+  /**
+   * Write the next ArrowRecordBatch to the ParquetWriter.
+   * @param recordBatch the next ArrowRecordBatch to write.
+   * @throws IOException in case of IO issues.
+   */
+  public void writeNext(ArrowRecordBatch recordBatch) throws IOException {
+    wrapper.writeNext(parquetWriterHandler, recordBatch);
+  }
+}
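A matching usage sketch for the writer; `schema` and `batch` are assumed to be prepared by the caller (for instance with a VectorUnloader over a populated VectorSchemaRoot), the path is hypothetical, and the native library must be loadable:

    ParquetWriterJniWrapper jni = new ParquetWriterJniWrapper();
    ParquetWriter writer = new ParquetWriter(jni, "/tmp/example.parquet", schema);
    writer.writeNext(batch); // an ArrowRecordBatch prepared by the caller
    writer.close();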
diff --git a/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetWriterJniWrapper.java b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetWriterJniWrapper.java
new file mode 100644
index 00000000000..c162b431bd2
--- /dev/null
+++ b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetWriterJniWrapper.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.parquet;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.util.List;
+
+import org.apache.arrow.vector.ipc.WriteChannel;
+import org.apache.arrow.vector.ipc.message.ArrowBuffer;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Wrapper for the Parquet writer native API.
+ */
+public class ParquetWriterJniWrapper {
+  private native long nativeOpenParquetWriter(String path, byte[] schemaBytes);
+
+  private native void nativeInitParquetWriter(long nativeHandler, boolean useHdfs3, int rep);
+
+  private native void nativeCloseParquetWriter(long nativeHandler);
+
+  private native void nativeWriteNext(
+      long nativeHandler, int numRows, long[] bufAddrs, long[] bufSizes);
+
+  /**
+   * Create an instance of ParquetWriterJniWrapper.
+   * @throws IOException in case the native library fails to load.
+   */
+  public ParquetWriterJniWrapper() throws IOException {
+    ParquetJniUtils.getInstance();
+  }
+
+  /**
+   * Open a Parquet file for writing.
+   * @param path Parquet file path to write.
+   * @param schema the Arrow schema used to initialize the Parquet file.
+   * @param useHdfs3 a flag indicating whether the writer should use the hdfs3 connector.
+   * @param replication replication factor for HDFS writes.
+   * @return the native instance handler.
+   * @throws IOException in case of IO issues.
+   */
+  long openParquetFile(String path, Schema schema, boolean useHdfs3, int replication)
+      throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), schema);
+    byte[] schemaBytes = out.toByteArray();
+    long nativeHandler = nativeOpenParquetWriter(path, schemaBytes);
+    nativeInitParquetWriter(nativeHandler, useHdfs3, replication);
+    return nativeHandler;
+  }
+
+  /**
+   * Close the native ParquetWriter instance.
+   * @param nativeHandler the native ParquetWriter handler.
+   * @throws IOException in case of IO issues.
+   */
+  void closeParquetFile(long nativeHandler) throws IOException {
+    nativeCloseParquetWriter(nativeHandler);
+  }
+
+  /**
+   * Write the next ArrowRecordBatch to the ParquetWriter.
+   * @param nativeHandler the native ParquetWriter handler.
+   * @param recordBatch the next ArrowRecordBatch to write.
+   * @throws IOException in case of IO issues.
+   */
+  void writeNext(long nativeHandler, ArrowRecordBatch recordBatch) throws IOException {
+    // flatten the ArrowRecordBatch into buffer addresses and layout sizes
+    int numRows = recordBatch.getLength();
+    List<ArrowBuf> buffers = recordBatch.getBuffers();
+    List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
+
+    long[] bufAddrs = new long[buffers.size()];
+    long[] bufSizes = new long[buffers.size()];
+
+    int idx = 0;
+    for (ArrowBuf buf : buffers) {
+      bufAddrs[idx++] = buf.memoryAddress();
+    }
+
+    idx = 0;
+    for (ArrowBuffer bufLayout : buffersLayout) {
+      bufSizes[idx++] = bufLayout.getSize();
+    }
+    nativeWriteNext(nativeHandler, numRows, bufAddrs, bufSizes);
+  }
+}
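What writeNext() flattens is visible from pure Java: every vector contributes its validity and data buffers, and only raw addresses plus layout sizes cross the JNI boundary. A runnable look at those buffers, with no native code involved:

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.IntVector;

    public class BufferAddressSketch {
      public static void main(String[] args) {
        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             IntVector vector = new IntVector("a", allocator)) {
          vector.allocateNew(16);
          for (int i = 0; i < 16; i++) {
            vector.set(i, i);
          }
          vector.setValueCount(16);
          // These are the two addresses a writer-side record batch would pass down.
          System.out.println("validity @ " + vector.getValidityBuffer().memoryAddress());
          System.out.println("data     @ " + vector.getDataBuffer().memoryAddress());
        }
      }
    }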
diff --git a/java/adapter/parquet/src/test/java/org/apache/arrow/adapter/parquet/ParquetReadWriteTest.java b/java/adapter/parquet/src/test/java/org/apache/arrow/adapter/parquet/ParquetReadWriteTest.java
new file mode 100644
index 00000000000..7cf829d0275
--- /dev/null
+++ b/java/adapter/parquet/src/test/java/org/apache/arrow/adapter/parquet/ParquetReadWriteTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.parquet;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import io.netty.buffer.ArrowBuf;
+
+public class ParquetReadWriteTest {
+
+  @Rule
+  public TemporaryFolder testFolder = new TemporaryFolder();
+
+  private BufferAllocator allocator;
+
+  @Before
+  public void init() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void teardown() {
+    allocator.close();
+  }
+
+  @Test
+  public void testParquetReadWrite() throws Exception {
+
+    File testFile = testFolder.newFile("_tmpfile_ParquetWriterReaderTest");
+    String path = testFile.getAbsolutePath();
+
+    int numColumns = 10;
+    int[] rowGroupIndices = {0};
+    int[] columnIndices = new int[numColumns];
+    for (int i = 0; i < numColumns; i++) {
+      columnIndices[i] = i;
+    }
+
+    ParquetReaderJniWrapper readerHandler = new ParquetReaderJniWrapper(allocator);
+    ParquetWriterJniWrapper writerHandler = new ParquetWriterJniWrapper();
+
+    Schema schema = new Schema(asList(
+        field("a", new Int(32, true)),
+        field("b", new Int(32, true)),
+        field("c", new Int(32, true)),
+        field("d", new Int(32, true)),
+        field("e", new Int(32, true)),
+        field("f", new Int(32, true)),
+        field("g", new Int(32, true)),
+        field("h", new Int(32, true)),
+        field("i", new Int(32, true)),
+        field("j", new Int(32, true))
+    ));
+
+    VectorSchemaRoot expectedSchemaRoot = VectorSchemaRoot.create(schema, allocator);
+    for (FieldVector vector : expectedSchemaRoot.getFieldVectors()) {
+      vector.allocateNew();
+      IntVector intVector = (IntVector) vector;
+      for (int i = 0; i < 16; i++) {
+        intVector.set(i, i);
+      }
+      intVector.setValueCount(16);
+    }
+    expectedSchemaRoot.setRowCount(16);
+
+    ArrowRecordBatch batch = createArrowRecordBatch(expectedSchemaRoot);
+    ParquetWriter writer = new ParquetWriter(writerHandler, path, schema);
+    writer.writeNext(batch);
+    releaseArrowRecordBatch(batch);
+    writer.close();
+
+    ParquetReader reader =
+        new ParquetReader(readerHandler, path, rowGroupIndices, columnIndices, 16);
+
+    Schema readSchema = reader.getSchema();
+    assertEquals(schema.toJson(), readSchema.toJson());
+
+    VectorSchemaRoot actualSchemaRoot = VectorSchemaRoot.create(readSchema, allocator);
+    reader.readNextVectors(actualSchemaRoot);
+
+    assertEquals(expectedSchemaRoot.getRowCount(), actualSchemaRoot.getRowCount());
+    assertEquals(expectedSchemaRoot.contentToTSVString(), actualSchemaRoot.contentToTSVString());
+
+    actualSchemaRoot.close();
+    expectedSchemaRoot.close();
+    reader.close();
+    testFile.delete();
+  }
+
+  private static Field field(String name, boolean nullable, ArrowType type, Field... children) {
+    return new Field(name, new FieldType(nullable, type, null, null), asList(children));
+  }
+
+  private static Field field(String name, ArrowType type, Field... children) {
+    return field(name, true, type, children);
+  }
+
+  private ArrowBuf buf(byte[] bytes) {
+    ArrowBuf buffer = allocator.buffer(bytes.length);
+    buffer.writeBytes(bytes);
+    return buffer;
+  }
+
+  private ArrowRecordBatch createArrowRecordBatch(VectorSchemaRoot root) {
+    List<ArrowFieldNode> fieldNodes = new ArrayList<>();
+    List<ArrowBuf> inputData = new ArrayList<>();
+    int numRowsInBatch = root.getRowCount();
+    for (FieldVector inputVector : root.getFieldVectors()) {
+      fieldNodes.add(new ArrowFieldNode(numRowsInBatch, inputVector.getNullCount()));
+      inputData.add(inputVector.getValidityBuffer());
+      inputData.add(inputVector.getDataBuffer());
+    }
+    return new ArrowRecordBatch(numRowsInBatch, fieldNodes, inputData);
+  }
+
+  private void releaseArrowRecordBatch(ArrowRecordBatch recordBatch) {
+    recordBatch.close();
+  }
+
+}
diff --git a/java/pom.xml b/java/pom.xml
index d2171995e67..62eb41fec54 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -693,6 +693,7 @@
     <module>adapter/orc</module>
+    <module>adapter/parquet</module>
     <module>gandiva</module>