diff --git a/cpp/src/jni/CMakeLists.txt b/cpp/src/jni/CMakeLists.txt index 3872d671934..e56ad0c5e10 100644 --- a/cpp/src/jni/CMakeLists.txt +++ b/cpp/src/jni/CMakeLists.txt @@ -22,3 +22,7 @@ if(ARROW_ORC) add_subdirectory(orc) endif() + +if(ARROW_PARQUET) + add_subdirectory(parquet) +endif() diff --git a/cpp/src/jni/orc/concurrent_map.h b/cpp/src/jni/concurrent_map.h similarity index 100% rename from cpp/src/jni/orc/concurrent_map.h rename to cpp/src/jni/concurrent_map.h diff --git a/cpp/src/jni/jni_common.h b/cpp/src/jni/jni_common.h new file mode 100644 index 00000000000..fd394d79d75 --- /dev/null +++ b/cpp/src/jni/jni_common.h @@ -0,0 +1,249 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/io/memory.h" +#include "arrow/ipc/api.h" +#include "jni/concurrent_map.h" + +static jclass io_exception_class; +static jclass illegal_access_exception_class; +static jclass illegal_argument_exception_class; + +static jclass arrow_record_batch_builder_class; +static jmethodID arrow_record_batch_builder_constructor; + +static jclass arrow_field_node_builder_class; +static jmethodID arrow_field_node_builder_constructor; + +static jclass arrowbuf_builder_class; +static jmethodID arrowbuf_builder_constructor; + +static arrow::jni::ConcurrentMap> buffer_holder_; + +jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) { + jclass local_class = env->FindClass(class_name); + jclass global_class = (jclass)env->NewGlobalRef(local_class); + env->DeleteLocalRef(local_class); + if (global_class == nullptr) { + std::string error_message = + "Unable to createGlobalClassReference for" + std::string(class_name); + env->ThrowNew(illegal_access_exception_class, error_message.c_str()); + } + return global_class; +} + +jmethodID GetMethodID(JNIEnv* env, jclass this_class, const char* name, const char* sig) { + jmethodID ret = env->GetMethodID(this_class, name, sig); + if (ret == nullptr) { + std::string error_message = "Unable to find method " + std::string(name) + + " within signature" + std::string(sig); + env->ThrowNew(illegal_access_exception_class, error_message.c_str()); + } + return ret; +} + +void LoadExceptionClassReferences(JNIEnv* env) { + io_exception_class = CreateGlobalClassReference(env, "Ljava/io/IOException;"); + illegal_access_exception_class = + CreateGlobalClassReference(env, "Ljava/lang/IllegalAccessException;"); + illegal_argument_exception_class = + CreateGlobalClassReference(env, "Ljava/lang/IllegalArgumentException;"); +} + +void LoadRecordBatchClassReferences(JNIEnv* env) { + arrow_record_batch_builder_class = CreateGlobalClassReference( + 
env, "Lorg/apache/arrow/adapter/common/ArrowRecordBatchBuilder;"); + arrow_record_batch_builder_constructor = + GetMethodID(env, arrow_record_batch_builder_class, "", + "(I[Lorg/apache/arrow/adapter/common/ArrowFieldNodeBuilder;" + "[Lorg/apache/arrow/adapter/common/ArrowBufBuilder;)V"); + + arrow_field_node_builder_class = CreateGlobalClassReference( + env, "Lorg/apache/arrow/adapter/common/ArrowFieldNodeBuilder;"); + arrow_field_node_builder_constructor = + GetMethodID(env, arrow_field_node_builder_class, "", "(II)V"); + + arrowbuf_builder_class = CreateGlobalClassReference( + env, "Lorg/apache/arrow/adapter/common/ArrowBufBuilder;"); + arrowbuf_builder_constructor = + GetMethodID(env, arrowbuf_builder_class, "", "(JJIJ)V"); +} + +void UnloadExceptionClassReferences(JNIEnv* env) { + env->DeleteGlobalRef(io_exception_class); + env->DeleteGlobalRef(illegal_access_exception_class); + env->DeleteGlobalRef(illegal_argument_exception_class); +} + +void UnloadRecordBatchClassReferences(JNIEnv* env) { + env->DeleteGlobalRef(arrow_field_node_builder_class); + env->DeleteGlobalRef(arrowbuf_builder_class); + env->DeleteGlobalRef(arrow_record_batch_builder_class); + + buffer_holder_.Clear(); +} + +std::string JStringToCString(JNIEnv* env, jstring string) { + int32_t jlen, clen; + clen = env->GetStringUTFLength(string); + jlen = env->GetStringLength(string); + std::vector buffer(clen); + env->GetStringUTFRegion(string, 0, jlen, buffer.data()); + return std::string(buffer.data(), clen); +} + +arrow::Status MakeRecordBatch(const std::shared_ptr& schema, int num_rows, + int64_t* in_buf_addrs, int64_t* in_buf_sizes, + int in_bufs_len, + std::shared_ptr* batch) { + std::vector> arrays; + auto num_fields = schema->num_fields(); + int buf_idx = 0; + int sz_idx = 0; + + for (int i = 0; i < num_fields; i++) { + auto field = schema->field(i); + std::vector> buffers; + + if (buf_idx >= in_bufs_len) { + return arrow::Status::Invalid("insufficient number of in_buf_addrs"); + } + int64_t 
validity_addr = in_buf_addrs[buf_idx++]; + int64_t validity_size = in_buf_sizes[sz_idx++]; + auto validity = std::shared_ptr<::arrow::Buffer>( + new ::arrow::Buffer(reinterpret_cast(validity_addr), validity_size)); + buffers.push_back(validity); + + if (buf_idx >= in_bufs_len) { + return arrow::Status::Invalid("insufficient number of in_buf_addrs"); + } + int64_t value_addr = in_buf_addrs[buf_idx++]; + int64_t value_size = in_buf_sizes[sz_idx++]; + auto data = std::shared_ptr<::arrow::Buffer>( + new ::arrow::Buffer(reinterpret_cast(value_addr), value_size)); + buffers.push_back(data); + + if (arrow::is_binary_like(field->type()->id())) { + if (buf_idx >= in_bufs_len) { + return arrow::Status::Invalid("insufficient number of in_buf_addrs"); + } + + // add offsets buffer for variable-len fields. + int64_t offsets_addr = in_buf_addrs[buf_idx++]; + int64_t offsets_size = in_buf_sizes[sz_idx++]; + auto offsets = std::shared_ptr<::arrow::Buffer>( + new ::arrow::Buffer(reinterpret_cast(offsets_addr), offsets_size)); + buffers.push_back(offsets); + } + + auto array_data = + ::arrow::ArrayData::Make(field->type(), num_rows, std::move(buffers)); + arrays.push_back(array_data); + } + *batch = arrow::RecordBatch::Make(schema, num_rows, arrays); + return arrow::Status::OK(); +} + +jobject MakeRecordBatchBuilder(JNIEnv* env, std::shared_ptr schema, + std::shared_ptr record_batch) { + jobjectArray field_array = + env->NewObjectArray(schema->num_fields(), arrow_field_node_builder_class, nullptr); + + std::vector> buffers; + for (int i = 0; i < schema->num_fields(); ++i) { + auto column = record_batch->column(i); + auto dataArray = column->data(); + jobject field = env->NewObject(arrow_field_node_builder_class, + arrow_field_node_builder_constructor, column->length(), + column->null_count()); + env->SetObjectArrayElement(field_array, i, field); + + for (auto& buffer : dataArray->buffers) { + buffers.push_back(buffer); + } + } + + jobjectArray arrowbuf_builder_array = + 
env->NewObjectArray(buffers.size(), arrowbuf_builder_class, nullptr); + + for (size_t j = 0; j < buffers.size(); ++j) { + auto buffer = buffers[j]; + uint8_t* data = nullptr; + int size = 0; + int64_t capacity = 0; + if (buffer != nullptr) { + data = (uint8_t*)buffer->data(); + size = (int)buffer->size(); + capacity = buffer->capacity(); + } + jobject arrowBufBuilder = + env->NewObject(arrowbuf_builder_class, arrowbuf_builder_constructor, + buffer_holder_.Insert(buffer), data, size, capacity); + env->SetObjectArrayElement(arrowbuf_builder_array, j, arrowBufBuilder); + } + + // create RecordBatch + jobject arrowRecordBatchBuilder = env->NewObject( + arrow_record_batch_builder_class, arrow_record_batch_builder_constructor, + record_batch->num_rows(), field_array, arrowbuf_builder_array); + return arrowRecordBatchBuilder; +} + +jbyteArray ToSchemaByteArray(JNIEnv* env, std::shared_ptr schema) { + arrow::Status status; + std::shared_ptr buffer; + status = arrow::ipc::SerializeSchema(*schema.get(), nullptr, + arrow::default_memory_pool(), &buffer); + if (!status.ok()) { + std::string error_message = + "Unable to convert schema to byte array, err is " + status.message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + } + + jbyteArray out = env->NewByteArray(buffer->size()); + auto src = reinterpret_cast(buffer->data()); + env->SetByteArrayRegion(out, 0, buffer->size(), src); + return out; +} + +arrow::Status FromSchemaByteArray(JNIEnv* env, jbyteArray schemaBytes, + std::shared_ptr* schema) { + arrow::Status status; + arrow::ipc::DictionaryMemo in_memo; + + int schemaBytes_len = env->GetArrayLength(schemaBytes); + jbyte* schemaBytes_data = env->GetByteArrayElements(schemaBytes, 0); + + auto serialized_schema = + std::make_shared((uint8_t*)schemaBytes_data, schemaBytes_len); + arrow::io::BufferReader buf_reader(serialized_schema); + status = arrow::ipc::ReadSchema(&buf_reader, &in_memo, schema); + + env->ReleaseByteArrayElements(schemaBytes, 
schemaBytes_data, JNI_ABORT); + + return status; +} + +void ReleaseBuffer(jlong id) { buffer_holder_.Erase(id); } diff --git a/cpp/src/jni/orc/jni_wrapper.cpp b/cpp/src/jni/orc/jni_wrapper.cpp index f18bacc864d..6ff82b8b5e1 100644 --- a/cpp/src/jni/orc/jni_wrapper.cpp +++ b/cpp/src/jni/orc/jni_wrapper.cpp @@ -29,7 +29,7 @@ #include "org_apache_arrow_adapter_orc_OrcReaderJniWrapper.h" #include "org_apache_arrow_adapter_orc_OrcStripeReaderJniWrapper.h" -#include "./concurrent_map.h" +#include "../concurrent_map.h" using ORCFileReader = arrow::adapters::orc::ORCFileReader; using RecordBatchReader = arrow::RecordBatchReader; diff --git a/cpp/src/jni/parquet/CMakeLists.txt b/cpp/src/jni/parquet/CMakeLists.txt new file mode 100644 index 00000000000..83bc610340c --- /dev/null +++ b/cpp/src/jni/parquet/CMakeLists.txt @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+#
+# arrow_parquet_jni: shared library holding the Parquet adapter's JNI wrapper
+#
+
+cmake_minimum_required(VERSION 3.11)
+
+project(arrow_parquet_jni)
+
+find_package(JNI REQUIRED)
+
+add_custom_target(arrow_parquet_jni)
+
+set(JNI_HEADERS_DIR "${CMAKE_CURRENT_BINARY_DIR}/generated")
+
+add_subdirectory(../../../../java/adapter/common ./java)
+
+set(ARROW_BUILD_STATIC OFF)
+
+add_arrow_lib(arrow_parquet_jni
+              BUILD_SHARED
+              SOURCES
+              jni_wrapper.cc
+              adapter.cc
+              OUTPUTS
+              ARROW_PARQUET_JNI_LIBRARIES
+              SHARED_LINK_LIBS
+              parquet_shared
+              arrow_shared
+              EXTRA_INCLUDES
+              ${JNI_HEADERS_DIR}
+              PRIVATE_INCLUDES
+              ${JNI_INCLUDE_DIRS}
+              DEPENDENCIES
+              arrow_parquet_java)
+
+add_dependencies(arrow_parquet_jni ${ARROW_PARQUET_JNI_LIBRARIES})
diff --git a/cpp/src/jni/parquet/adapter.cc b/cpp/src/jni/parquet/adapter.cc
new file mode 100644
index 00000000000..f10bcd92dea
--- /dev/null
+++ b/cpp/src/jni/parquet/adapter.cc
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#include +#include +#include +#include +#include + +#include "arrow/io/interfaces.h" +#include "arrow/record_batch.h" +#include "arrow/table.h" +#include "jni/parquet/adapter.h" +#include "parquet/arrow/reader.h" +#include "parquet/arrow/schema.h" +#include "parquet/arrow/writer.h" +#include "parquet/file_reader.h" +#include "parquet/file_writer.h" +#include "parquet/schema.h" + +namespace jni { +namespace parquet { +namespace adapters { + +using RecordBatchReader = arrow::RecordBatchReader; +using Table = arrow::Table; + +class ParquetFileReader::Impl { + public: + Impl() = default; + ~Impl() {} + + Status Open(std::shared_ptr& file, MemoryPool* pool, + ::parquet::ArrowReaderProperties properties) { + file_ = file; + RETURN_NOT_OK(::parquet::arrow::FileReader::Make( + pool, ::parquet::ParquetFileReader::Open(file_), properties, &parquet_reader_)); + return Status::OK(); + } + + Status InitRecordBatchReader(const std::vector& column_indices, + const std::vector& row_group_indices) { + RETURN_NOT_OK( + GetRecordBatchReader(row_group_indices, column_indices, &record_batch_reader_)); + return Status::OK(); + } + + Status InitRecordBatchReader(const std::vector& column_indices, int64_t start_pos, + int64_t end_pos) { + std::vector row_group_indices = + GetRowGroupIndices(parquet_reader_->num_row_groups(), start_pos, end_pos); + RETURN_NOT_OK(InitRecordBatchReader(column_indices, row_group_indices)); + return Status::OK(); + } + + Status ReadSchema(std::shared_ptr* out) { + RETURN_NOT_OK(parquet_reader_->GetSchema(out)); + return Status::OK(); + } + + Status ReadNext(std::shared_ptr* out) { + RETURN_NOT_OK(record_batch_reader_->ReadNext(out)); + return Status::OK(); + } + + private: + std::shared_ptr file_; + std::unique_ptr<::parquet::arrow::FileReader> parquet_reader_; + std::shared_ptr record_batch_reader_; + + Status GetRecordBatchReader(const std::vector& row_group_indices, + const std::vector& column_indices, + std::shared_ptr* rb_reader) { + if 
(column_indices.empty()) { + return parquet_reader_->GetRecordBatchReader(row_group_indices, rb_reader); + } else { + return parquet_reader_->GetRecordBatchReader(row_group_indices, column_indices, + rb_reader); + } + } + + std::vector GetRowGroupIndices(int num_row_groups, int64_t start_pos, + int64_t end_pos) { + std::unique_ptr<::parquet::ParquetFileReader> reader = + ::parquet::ParquetFileReader::Open(file_); + std::vector row_group_indices; + int64_t pos = 0; + for (int i = 0; i < num_row_groups; i++) { + if (pos >= start_pos && pos < end_pos) { + row_group_indices.push_back(i); + break; + } + pos += reader->RowGroup(i)->metadata()->total_byte_size(); + } + if (row_group_indices.empty()) { + row_group_indices.push_back(num_row_groups - 1); + } + return row_group_indices; + } +}; + +ParquetFileReader::ParquetFileReader() : impl_(new Impl()) {} + +ParquetFileReader::~ParquetFileReader() {} + +Status ParquetFileReader::Open(std::shared_ptr& file, MemoryPool* pool, + ::parquet::ArrowReaderProperties properties, + std::unique_ptr* reader) { + auto result = std::unique_ptr(new ParquetFileReader()); + RETURN_NOT_OK(result->impl_->Open(file, pool, properties)); + *reader = std::move(result); + return Status::OK(); +} + +Status ParquetFileReader::Open(std::shared_ptr& file, MemoryPool* pool, + std::unique_ptr* reader) { + ::parquet::ArrowReaderProperties properties(true); + return Open(file, pool, properties, reader); +} + +Status ParquetFileReader::InitRecordBatchReader( + const std::vector& column_indices, const std::vector& row_group_indices) { + return impl_->InitRecordBatchReader(column_indices, row_group_indices); +} + +Status ParquetFileReader::InitRecordBatchReader(const std::vector& column_indices, + int64_t start_pos, int64_t end_pos) { + return impl_->InitRecordBatchReader(column_indices, start_pos, end_pos); +} + +Status ParquetFileReader::ReadSchema(std::shared_ptr* out) { + return impl_->ReadSchema(out); +} + +Status 
ParquetFileReader::ReadNext(std::shared_ptr* out) { + return impl_->ReadNext(out); +} + +class ParquetFileWriter::Impl { + public: + Impl() = default; + ~Impl() {} + + Status Open(std::shared_ptr& output_stream, MemoryPool* pool, + std::shared_ptr schema, + std::shared_ptr<::parquet::ArrowWriterProperties> properties) { + output_stream_ = output_stream; + schema_ = schema; + std::shared_ptr<::parquet::schema::GroupNode> parquet_schema; + RETURN_NOT_OK(GetParquetSchema(schema, &parquet_schema)); + RETURN_NOT_OK(::parquet::arrow::FileWriter::Make( + pool, ::parquet::ParquetFileWriter::Open(output_stream, parquet_schema), schema, + properties, &parquet_writer_)); + return Status::OK(); + } + + Status WriteNext(std::shared_ptr in) { + std::lock_guard lck(thread_mtx_); + record_batch_buffer_list_.push_back(in); + return Status::OK(); + } + + Status Flush() { + std::shared_ptr table; + RETURN_NOT_OK(Table::FromRecordBatches(record_batch_buffer_list_, &table)); + RETURN_NOT_OK(parquet_writer_->WriteTable(*table.get(), table->num_rows())); + RETURN_NOT_OK(output_stream_->Flush()); + return Status::OK(); + } + + Status GetSchema(std::shared_ptr* out) { + *out = schema_; + return Status::OK(); + } + + private: + MemoryPool* pool_; + std::shared_ptr schema_; + std::mutex thread_mtx_; + std::shared_ptr output_stream_; + std::unique_ptr<::parquet::arrow::FileWriter> parquet_writer_; + std::vector> record_batch_buffer_list_; + + Status GetParquetSchema(std::shared_ptr schema, + std::shared_ptr<::parquet::schema::GroupNode>* parquet_schema) { + std::shared_ptr<::parquet::WriterProperties> properties = + ::parquet::default_writer_properties(); + std::shared_ptr<::parquet::SchemaDescriptor> schema_description; + RETURN_NOT_OK(::parquet::arrow::ToParquetSchema(schema.get(), *properties.get(), + &schema_description)); + + ::parquet::schema::NodeVector group_node_fields; + for (int i = 0; i < schema_description->group_node()->field_count(); i++) { + 
group_node_fields.push_back(schema_description->group_node()->field(i)); + } + *parquet_schema = std::static_pointer_cast<::parquet::schema::GroupNode>( + ::parquet::schema::GroupNode::Make( + schema_description->schema_root()->name(), + schema_description->schema_root()->repetition(), group_node_fields)); + + return Status::OK(); + } +}; + +ParquetFileWriter::ParquetFileWriter() : impl_(new Impl()) {} +ParquetFileWriter::~ParquetFileWriter() {} + +Status ParquetFileWriter::Open( + std::shared_ptr& output_stream, MemoryPool* pool, + std::shared_ptr schema, + std::shared_ptr<::parquet::ArrowWriterProperties> properties, + std::unique_ptr* writer) { + auto result = std::unique_ptr(new ParquetFileWriter()); + RETURN_NOT_OK(result->impl_->Open(output_stream, pool, schema, properties)); + *writer = std::move(result); + return Status::OK(); +} + +Status ParquetFileWriter::Open(std::shared_ptr& output_stream, + MemoryPool* pool, std::shared_ptr schema, + std::unique_ptr* writer) { + return Open(output_stream, pool, schema, ::parquet::default_arrow_writer_properties(), + writer); +} + +Status ParquetFileWriter::WriteNext(std::shared_ptr in) { + return impl_->WriteNext(in); +} + +Status ParquetFileWriter::Flush() { return impl_->Flush(); } + +Status ParquetFileWriter::GetSchema(std::shared_ptr* out) { + return impl_->GetSchema(out); +} + +} // namespace adapters +} // namespace parquet +} // namespace jni diff --git a/cpp/src/jni/parquet/adapter.h b/cpp/src/jni/parquet/adapter.h new file mode 100644 index 00000000000..39e761b11b6 --- /dev/null +++ b/cpp/src/jni/parquet/adapter.h @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_PARQUET_CONVERTER_H +#define ARROW_PARQUET_CONVERTER_H + +#include +#include +#include + +#include "arrow/io/interfaces.h" +#include "arrow/memory_pool.h" +#include "arrow/record_batch.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/util/visibility.h" +#include "parquet/properties.h" + +namespace jni { + +namespace parquet { + +namespace adapters { + +using MemoryPool = arrow::MemoryPool; +using OutputStream = arrow::io::OutputStream; +using RandomAccessFile = arrow::io::RandomAccessFile; +using RecordBatch = arrow::RecordBatch; +using Schema = arrow::Schema; +using Status = arrow::Status; + +/// \class ParquetFileReader +/// \brief Read an Arrow RecordBatch from an PARQUET file. +class ARROW_EXPORT ParquetFileReader { + public: + ~ParquetFileReader(); + + /// \brief Creates a new PARQUET reader. + /// + /// \param[in] file the data source + /// \param[in] pool a MemoryPool to use for buffer allocations + /// \param[in] properties ArrowReaderProperties + /// \param[out] reader the returned reader object + /// \return Status + static Status Open(std::shared_ptr& file, MemoryPool* pool, + ::parquet::ArrowReaderProperties properties, + std::unique_ptr* reader); + + /// \brief Creates a new PARQUET reader. 
+ /// + /// \param[in] file the data source + /// \param[in] pool a MemoryPool to use for buffer allocations + /// \param[out] reader the returned reader object + /// \return Status + static Status Open(std::shared_ptr& file, MemoryPool* pool, + std::unique_ptr* reader); + + /// \brief Get a record batch iterator with specified row group index and + // column indices. + /// + /// \param[in] column_indices indexes of columns expected to be read + /// \param[in] row_group_indices indexes of row_groups expected to be read + Status InitRecordBatchReader(const std::vector& column_indices, + const std::vector& row_group_indices); + + /// \brief Get a record batch iterator with a start and end pos to indicate + // row groupand column indices. + /// + /// \param[in] column_indices indexes of columns expected to be read + /// \param[in] start_pos start position of row_groups expected to be read + /// \param[in] end_pos end position of row_groups expected to be read + Status InitRecordBatchReader(const std::vector& column_indices, int64_t start_pos, + int64_t end_pos); + + /// \brief Return the schema read from the PARQUET file + /// + /// \param[out] out the returned Schema object + Status ReadSchema(std::shared_ptr* out); + + /// \brief Read a RecordBatch + /// + /// \param[out] out the returned RecordBatch + Status ReadNext(std::shared_ptr* out); + + private: + class Impl; + std::unique_ptr impl_; + ParquetFileReader(); +}; + +/// \class ParquetFileWriter +/// \brief Write an Arrow RecordBatch to an PARQUET file. +class ARROW_EXPORT ParquetFileWriter { + public: + ~ParquetFileWriter(); + + /// \brief Creates a new PARQUET writer. 
+ /// + /// \param[in] output_stream output stream + /// \param[in] pool a MemoryPool to use for buffer allocations + /// \param[in] schema Arrow schema for to be written data + /// \param[out] writer the returned writer object + /// \return Status + static Status Open(std::shared_ptr& output_stream, MemoryPool* pool, + std::shared_ptr schema, + std::unique_ptr* writer); + + /// \brief Creates a new PARQUET writer. + /// + /// \param[in] output_stream output stream + /// \param[in] pool a MemoryPool to use for buffer allocations + /// \param[in] schema Arrow schema for to be written data + /// \param[in] properties ArrowWriterProperties + /// \param[out] writer the returned writer object + /// \return Status + static Status Open(std::shared_ptr& output_stream, MemoryPool* pool, + std::shared_ptr schema, + std::shared_ptr<::parquet::ArrowWriterProperties> properties, + std::unique_ptr* writer); + + /// \brief write a RecordBatch to buffer + /// + /// \param[in] in record batch data to be written + Status WriteNext(std::shared_ptr in); + + /// \brief flush all record batch in buffer to parquet output stream + Status Flush(); + + /// \brief Return the schema read from the PARQUET file + /// + /// \param[out] out the returned Schema object + Status GetSchema(std::shared_ptr* out); + + private: + class Impl; + std::unique_ptr impl_; + ParquetFileWriter(); +}; +} // namespace adapters + +} // namespace parquet + +} // namespace jni + +#endif // ARROW_PARQUET_CONVERTER_H diff --git a/cpp/src/jni/parquet/jni_wrapper.cc b/cpp/src/jni/parquet/jni_wrapper.cc new file mode 100644 index 00000000000..8445462b9da --- /dev/null +++ b/cpp/src/jni/parquet/jni_wrapper.cc @@ -0,0 +1,347 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include + +#include "jni/concurrent_map.h" +#include "jni/jni_common.h" +#include "jni/parquet/adapter.h" + +static jint JNI_VERSION = JNI_VERSION_1_8; + +using FileSystem = arrow::fs::FileSystem; +using ParquetFileReader = jni::parquet::adapters::ParquetFileReader; +using ParquetFileWriter = jni::parquet::adapters::ParquetFileWriter; + +static arrow::jni::ConcurrentMap> reader_holder_; +static arrow::jni::ConcurrentMap> writer_holder_; + +std::shared_ptr GetFileReader(JNIEnv* env, jlong id) { + auto reader = reader_holder_.Lookup(id); + if (!reader) { + std::string error_message = "invalid reader id " + std::to_string(id); + env->ThrowNew(illegal_argument_exception_class, error_message.c_str()); + } + return reader; +} + +std::shared_ptr GetFileWriter(JNIEnv* env, jlong id) { + auto writer = writer_holder_.Lookup(id); + if (!writer) { + std::string error_message = "invalid reader id " + std::to_string(id); + env->ThrowNew(illegal_argument_exception_class, error_message.c_str()); + } + return writer; +} + +#ifdef __cplusplus +extern "C" { +#endif + +jint JNI_OnLoad(JavaVM* vm, void* reserved) { + JNIEnv* env; + if (vm->GetEnv(reinterpret_cast(&env), JNI_VERSION) != JNI_OK) { + return JNI_ERR; + } + + LoadExceptionClassReferences(env); + LoadRecordBatchClassReferences(env); + + return JNI_VERSION; +} + +void 
JNI_OnUnload(JavaVM* vm, void* reserved) { + std::cerr << "JNI_OnUnload" << std::endl; + JNIEnv* env; + vm->GetEnv(reinterpret_cast(&env), JNI_VERSION); + + UnloadExceptionClassReferences(env); + UnloadRecordBatchClassReferences(env); +} + +JNIEXPORT jlong JNICALL +Java_org_apache_arrow_adapter_parquet_ParquetReaderJniWrapper_nativeOpenParquetReader( + JNIEnv* env, jobject obj, jstring path, jlong batch_size) { + arrow::Status status; + std::string cpath = JStringToCString(env, path); + + std::shared_ptr fs; + std::string file_name; + status = arrow::fs::FileSystemFromUri(cpath, &fs, &file_name); + if (!status.ok()) { + std::string error_message = "nativeOpenParquetReader: " + status.message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + } + + std::shared_ptr file; + status = fs->OpenInputFile(file_name).Value(&file); + if (!status.ok()) { + std::string error_message = "nativeOpenParquetReader: " + status.message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + } + + parquet::ArrowReaderProperties properties(true); + properties.set_batch_size(batch_size); + + std::unique_ptr reader; + status = + ParquetFileReader::Open(file, arrow::default_memory_pool(), properties, &reader); + if (!status.ok()) { + std::string error_message = "nativeOpenParquetReader: " + status.message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + } + return reader_holder_.Insert(std::shared_ptr(reader.release())); +} + +JNIEXPORT void JNICALL +Java_org_apache_arrow_adapter_parquet_ParquetReaderJniWrapper_nativeInitParquetReader( + JNIEnv* env, jobject obj, jlong id, jintArray column_indices, + jintArray row_group_indices) { + // Prepare column_indices and row_group_indices from java array. 
+ bool column_indices_need_release = false; + int column_indices_len = env->GetArrayLength(column_indices); + jint* column_indices_ptr = env->GetIntArrayElements(column_indices, 0); + std::vector _column_indices(column_indices_ptr, + column_indices_ptr + column_indices_len); + column_indices_need_release = true; + + bool row_group_indices_need_release = false; + std::vector _row_group_indices = {}; + jint* row_group_indices_ptr; + int row_group_indices_len = env->GetArrayLength(row_group_indices); + if (row_group_indices_len != 0) { + row_group_indices_ptr = env->GetIntArrayElements(row_group_indices, 0); + std::vector rg_indices_tmp(row_group_indices_ptr, + row_group_indices_ptr + row_group_indices_len); + _row_group_indices = rg_indices_tmp; + row_group_indices_need_release = true; + } + + // Call ParquetFileReader init func. + arrow::Status status; + auto reader = GetFileReader(env, id); + status = reader->InitRecordBatchReader(_column_indices, _row_group_indices); + if (!status.ok()) { + std::string error_message = + "nativeInitParquetReader: failed to Initialize, err msg is " + status.message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + } + + if (column_indices_need_release) { + env->ReleaseIntArrayElements(column_indices, column_indices_ptr, JNI_ABORT); + } + if (row_group_indices_need_release) { + env->ReleaseIntArrayElements(row_group_indices, row_group_indices_ptr, JNI_ABORT); + } +} + +JNIEXPORT void JNICALL +Java_org_apache_arrow_adapter_parquet_ParquetReaderJniWrapper_nativeInitParquetReader2( + JNIEnv* env, jobject obj, jlong id, jintArray column_indices, jlong start_pos, + jlong end_pos) { + // Prepare column_indices and row_group_indices from java array. 
+ bool column_indices_need_release = false; + int column_indices_len = env->GetArrayLength(column_indices); + jint* column_indices_ptr = env->GetIntArrayElements(column_indices, 0); + std::vector _column_indices(column_indices_ptr, + column_indices_ptr + column_indices_len); + column_indices_need_release = true; + + // Call ParquetFileReader init func. + arrow::Status status; + auto reader = GetFileReader(env, id); + status = reader->InitRecordBatchReader(_column_indices, start_pos, end_pos); + if (!status.ok()) { + std::string error_message = + "nativeInitParquetReader2: failed to Initialize, err is " + status.message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + } + + if (column_indices_need_release) { + env->ReleaseIntArrayElements(column_indices, column_indices_ptr, JNI_ABORT); + } +} + +JNIEXPORT void JNICALL +Java_org_apache_arrow_adapter_parquet_ParquetReaderJniWrapper_nativeCloseParquetReader( + JNIEnv* env, jobject obj, jlong id) { + reader_holder_.Erase(id); +} + +JNIEXPORT jobject JNICALL +Java_org_apache_arrow_adapter_parquet_ParquetReaderJniWrapper_nativeReadNext(JNIEnv* env, + jobject obj, + jlong id) { + arrow::Status status; + auto reader = GetFileReader(env, id); + + std::shared_ptr record_batch; + status = reader->ReadNext(&record_batch); + if (!status.ok()) { + std::string error_message = + "nativeReadNext: failed to read next batch, err is " + status.message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + } + if (record_batch == nullptr) { + return nullptr; + } + std::shared_ptr schema; + status = reader->ReadSchema(&schema); + if (!status.ok()) { + std::string error_message = + "nativeReadNext: failed to read schema, err is " + status.message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + } + + return MakeRecordBatchBuilder(env, schema, record_batch); +} + +JNIEXPORT jobject JNICALL +Java_org_apache_arrow_adapter_parquet_ParquetReaderJniWrapper_nativeGetSchema(JNIEnv* env, + jobject obj, + 
jlong id) { + arrow::Status status; + auto reader = GetFileReader(env, id); + std::shared_ptr schema; + status = reader->ReadSchema(&schema); + if (!status.ok()) { + std::string error_message = + "nativeGetSchema: failed to read schema, err is " + status.message(); + env->ThrowNew(io_exception_class, error_message.c_str()); return nullptr; /* exception pending: do not serialize a schema that was never read */ + } + + jbyteArray ret = ToSchemaByteArray(env, schema); + if (ret == nullptr) { + std::string error_message = "nativeGetSchema: failed to convert schema to byte array"; + env->ThrowNew(io_exception_class, error_message.c_str()); + } + return ret; +} + +JNIEXPORT void JNICALL +Java_org_apache_arrow_adapter_common_AdaptorReferenceManager_nativeRelease( + JNIEnv* env, jobject this_obj, jlong id) { + ReleaseBuffer(id); +} + +JNIEXPORT jlong JNICALL +Java_org_apache_arrow_adapter_parquet_ParquetWriterJniWrapper_nativeOpenParquetWriter( + JNIEnv* env, jobject obj, jstring path, jbyteArray schemaBytes) { /* Returns the writer_holder_ id of the new writer; on any failure raises IOException and returns 0. */ + arrow::Status status; + std::shared_ptr schema; + status = FromSchemaByteArray(env, schemaBytes, &schema); + if (!status.ok()) { + std::string error_message = + "nativeOpenParquetWriter: failed to readSchema, err is " + status.message(); + env->ThrowNew(io_exception_class, error_message.c_str()); return 0; /* ThrowNew does not unwind; Java discards the value once the exception propagates */ + } + + std::string cpath = JStringToCString(env, path); + + std::shared_ptr fs; + std::string file_name; + status = arrow::fs::FileSystemFromUri(cpath, &fs, &file_name); + if (!status.ok()) { + std::string error_message = "nativeOpenParquetWriter: " + status.message(); + env->ThrowNew(io_exception_class, error_message.c_str()); return 0; /* fs was not set: the dereference below would crash */ + } + + std::shared_ptr sink; + status = fs->OpenOutputStream(file_name).Value(&sink); + if (!status.ok()) { + std::string error_message = "nativeOpenParquetWriter: " + status.message(); + env->ThrowNew(io_exception_class, error_message.c_str()); return 0; /* no sink to hand to the writer */ + } + + std::unique_ptr writer; + status = ParquetFileWriter::Open(sink, arrow::default_memory_pool(), schema, &writer); + if (!status.ok()) { + std::string error_message = 
"nativeOpenParquetWriter: " + status.message(); + env->ThrowNew(io_exception_class, error_message.c_str()); return 0; /* do not register a writer that failed to open */ + } + return writer_holder_.Insert(std::shared_ptr(writer.release())); +} + +JNIEXPORT void JNICALL +Java_org_apache_arrow_adapter_parquet_ParquetWriterJniWrapper_nativeCloseParquetWriter( + JNIEnv* env, jobject obj, jlong id) { + arrow::Status status; + auto writer = GetFileWriter(env, id); + status = writer->Flush(); + if (!status.ok()) { + std::string error_message = + "nativeCloseParquetWriter: failed to Flush, err is " + status.message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + } + writer_holder_.Erase(id); +} + +JNIEXPORT void JNICALL +Java_org_apache_arrow_adapter_parquet_ParquetWriterJniWrapper_nativeWriteNext( + JNIEnv* env, jobject obj, jlong id, jint num_rows, jlongArray bufAddrs, + jlongArray bufSizes) { + // convert input data to record batch + int in_bufs_len = env->GetArrayLength(bufAddrs); + if (in_bufs_len != env->GetArrayLength(bufSizes)) { + std::string error_message = + "nativeWriteNext: mismatch in arraylen of buf_addrs and buf_sizes"; + env->ThrowNew(io_exception_class, error_message.c_str()); return; /* nothing pinned yet, so a plain return is enough */ + } + + jlong* in_buf_addrs = env->GetLongArrayElements(bufAddrs, 0); + jlong* in_buf_sizes = env->GetLongArrayElements(bufSizes, 0); + + arrow::Status status; + auto writer = GetFileWriter(env, id); + + std::shared_ptr schema; + status = writer->GetSchema(&schema); + if (!status.ok()) { + std::string error_message = + "nativeWriteNext: failed to read schema, err is " + status.message(); + env->ThrowNew(io_exception_class, error_message.c_str()); env->ReleaseLongArrayElements(bufAddrs, in_buf_addrs, JNI_ABORT); env->ReleaseLongArrayElements(bufSizes, in_buf_sizes, JNI_ABORT); return; /* release both arrays, then stop: the schema is unusable */ + } + + std::shared_ptr record_batch; + status = MakeRecordBatch(schema, num_rows, in_buf_addrs, in_buf_sizes, in_bufs_len, + &record_batch); + if (!status.ok()) { + std::string error_message = + "nativeWriteNext: failed to get record batch, err is " + status.message(); + env->ThrowNew(io_exception_class, error_message.c_str()); env->ReleaseLongArrayElements(bufAddrs, in_buf_addrs, JNI_ABORT); env->ReleaseLongArrayElements(bufSizes, in_buf_sizes, JNI_ABORT); return; /* release both arrays, then stop: there is no batch to write */ + } + + status = writer->WriteNext(record_batch); + if 
(!status.ok()) { + std::string error_message = + "nativeWriteNext: failed to write next batch, err msg is " + status.message(); + env->ThrowNew(io_exception_class, error_message.c_str()); + } + + env->ReleaseLongArrayElements(bufAddrs, in_buf_addrs, JNI_ABORT); + env->ReleaseLongArrayElements(bufSizes, in_buf_sizes, JNI_ABORT); +} + +#ifdef __cplusplus +} +#endif diff --git a/java/adapter/common/CMakeLists.txt b/java/adapter/common/CMakeLists.txt new file mode 100644 index 00000000000..61a0a0b2dcd --- /dev/null +++ b/java/adapter/common/CMakeLists.txt @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# +# arrow_parquet_java +# + +# Headers: top level + +project(arrow_parquet_java) + +# Find java/jni +include(FindJava) +include(UseJava) +include(FindJNI) + +message("generating headers to ${JNI_HEADERS_DIR}") + +add_jar( + arrow_parquet_java + src/main/java/org/apache/arrow/adapter/common/ArrowBufBuilder.java + src/main/java/org/apache/arrow/adapter/common/ArrowFieldNodeBuilder.java + src/main/java/org/apache/arrow/adapter/common/ArrowRecordBatchBuilder.java + GENERATE_NATIVE_HEADERS arrow_parquet_java-native + DESTINATION ${JNI_HEADERS_DIR} +) diff --git a/java/adapter/common/pom.xml b/java/adapter/common/pom.xml new file mode 100644 index 00000000000..6ce42dfadce --- /dev/null +++ b/java/adapter/common/pom.xml @@ -0,0 +1,40 @@ + + + + + 4.0.0 + + org.apache.arrow + arrow-java-root + 1.0.0-SNAPSHOT + ../../pom.xml + + org.apache.arrow + arrow-adapter-common + Arrow Adapter Common + jar + + + + org.apache.arrow + arrow-memory + ${project.version} + compile + + + org.apache.arrow + arrow-vector + ${project.version} + compile + + + diff --git a/java/adapter/common/src/main/java/org/apache/arrow/adapter/common/AdaptorReferenceManager.java b/java/adapter/common/src/main/java/org/apache/arrow/adapter/common/AdaptorReferenceManager.java new file mode 100644 index 00000000000..3d96051860a --- /dev/null +++ b/java/adapter/common/src/main/java/org/apache/arrow/adapter/common/AdaptorReferenceManager.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.common; + +import java.io.IOException; +import java.lang.UnsupportedOperationException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.OwnershipTransferResult; +import org.apache.arrow.memory.ReferenceManager; +import org.apache.arrow.util.Preconditions; + +import io.netty.buffer.ArrowBuf; + +/** + * A simple reference manager implementation for memory allocated by native code. + * The underlying memory will be released when reference count reach zero. 
+ */ +public class AdaptorReferenceManager implements ReferenceManager { + private native void nativeRelease(long nativeMemoryHolder); + + private final AtomicInteger bufRefCnt = new AtomicInteger(0); + private long nativeMemoryHolder; + private int size = 0; + + AdaptorReferenceManager(long nativeMemoryHolder, int size) + throws IOException { + this.nativeMemoryHolder = nativeMemoryHolder; + this.size = size; + } + + @Override + public int getRefCount() { + return bufRefCnt.get(); + } + + @Override + public boolean release() { + return release(1); + } + + @Override + public boolean release(int decrement) { + Preconditions.checkState(decrement >= 1, + "ref count decrement should be greater than or equal to 1"); + // decrement the ref count + final int refCnt; + synchronized (this) { + refCnt = bufRefCnt.addAndGet(-decrement); + if (refCnt == 0) { + // refcount of this reference manager has dropped to 0 + // release the underlying memory + nativeRelease(nativeMemoryHolder); + } + } + // the new ref count should be >= 0 + Preconditions.checkState(refCnt >= 0, "RefCnt has gone negative"); + return refCnt == 0; + } + + @Override + public void retain() { + retain(1); + } + + @Override + public void retain(int increment) { + Preconditions.checkArgument(increment > 0, "retain(%d) argument is not positive", increment); + bufRefCnt.addAndGet(increment); + } + + @Override + public ArrowBuf retain(ArrowBuf srcBuffer, BufferAllocator targetAllocator) { + retain(); + return srcBuffer; + } + + @Override + public ArrowBuf deriveBuffer(ArrowBuf sourceBuffer, int index, int length) { + final long derivedBufferAddress = sourceBuffer.memoryAddress() + index; + + // create new ArrowBuf + final ArrowBuf derivedBuf = new ArrowBuf( + this, + null, + length, + derivedBufferAddress, + false); + + return derivedBuf; + } + + @Override + public OwnershipTransferResult transferOwnership(ArrowBuf sourceBuffer, BufferAllocator targetAllocator) { + throw new UnsupportedOperationException(); + } + 
+ @Override + public BufferAllocator getAllocator() { + return null; + } + + @Override + public int getSize() { + return size; + } + + @Override + public int getAccountedSize() { + return 0; + } +} diff --git a/java/adapter/common/src/main/java/org/apache/arrow/adapter/common/ArrowBufBuilder.java b/java/adapter/common/src/main/java/org/apache/arrow/adapter/common/ArrowBufBuilder.java new file mode 100644 index 00000000000..49971f3de49 --- /dev/null +++ b/java/adapter/common/src/main/java/org/apache/arrow/adapter/common/ArrowBufBuilder.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.common; + +/** + * ArrowBufBuilder. + */ +public class ArrowBufBuilder { + + public long nativeInstanceId; + public long memoryAddress; + public int size; + public long capacity; + + /** + * Create an instance for ArrowBufBuilder. + * @param nativeInstanceId native ArrowBuf holder. + * @param memoryAddress native ArrowBuf data addr. + * @param size ArrowBuf size. + * @param capacity ArrowBuf rowNums. 
+ */ + public ArrowBufBuilder(long nativeInstanceId, long memoryAddress, int size, long capacity) { + this.memoryAddress = memoryAddress; + this.size = size; + this.capacity = capacity; + this.nativeInstanceId = nativeInstanceId; + } +} diff --git a/java/adapter/common/src/main/java/org/apache/arrow/adapter/common/ArrowFieldNodeBuilder.java b/java/adapter/common/src/main/java/org/apache/arrow/adapter/common/ArrowFieldNodeBuilder.java new file mode 100644 index 00000000000..aee480cb4c5 --- /dev/null +++ b/java/adapter/common/src/main/java/org/apache/arrow/adapter/common/ArrowFieldNodeBuilder.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.common; + +/** + * Wrapper for ArrowFieldNodeBuilder. + */ +public class ArrowFieldNodeBuilder { + + public int length; + public int nullCount; + + /** + * Create an instance of ArrowFieldNodeBuilder. 
+ * @param length numRows in this Field + * @param nullCount numCount in this Fields + */ + public ArrowFieldNodeBuilder(int length, int nullCount) { + this.length = length; + this.nullCount = nullCount; + } + +} + diff --git a/java/adapter/common/src/main/java/org/apache/arrow/adapter/common/ArrowRecordBatchBuilder.java b/java/adapter/common/src/main/java/org/apache/arrow/adapter/common/ArrowRecordBatchBuilder.java new file mode 100644 index 00000000000..37c8c394f6e --- /dev/null +++ b/java/adapter/common/src/main/java/org/apache/arrow/adapter/common/ArrowRecordBatchBuilder.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.common; + +/** + * ArrowRecordBatchBuilder. + */ +public class ArrowRecordBatchBuilder { + + public int length; + public ArrowFieldNodeBuilder[] nodeBuilders; + public ArrowBufBuilder[] bufferBuilders; + + /** + * Create an instance to wrap native parameters for ArrowRecordBatchBuilder. + * @param length ArrowRecordBatch rowNums. + * @param nodeBuilders an Array hold ArrowFieldNodeBuilder. + * @param bufferBuilders an Array hold ArrowBufBuilder. 
+ */ + public ArrowRecordBatchBuilder( + int length, ArrowFieldNodeBuilder[] nodeBuilders, ArrowBufBuilder[] bufferBuilders) { + this.length = length; + this.nodeBuilders = nodeBuilders; + this.bufferBuilders = bufferBuilders; + } + +} diff --git a/java/adapter/common/src/main/java/org/apache/arrow/adapter/common/ArrowRecordBatchBuilderImpl.java b/java/adapter/common/src/main/java/org/apache/arrow/adapter/common/ArrowRecordBatchBuilderImpl.java new file mode 100644 index 00000000000..588673fd046 --- /dev/null +++ b/java/adapter/common/src/main/java/org/apache/arrow/adapter/common/ArrowRecordBatchBuilderImpl.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.common; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; + +import io.netty.buffer.ArrowBuf; + +/** + * ArrowRecordBatchBuilderImpl used to wrap native returned data into an ArrowRecordBatch. 
+ */ +public class ArrowRecordBatchBuilderImpl { + + private int length; + private ArrowRecordBatchBuilder recordBatchBuilder; + + /** + * Create ArrowRecordBatchBuilderImpl instance from ArrowRecordBatchBuilder. + * @param recordBatchBuilder ArrowRecordBatchBuilder instance. + */ + public ArrowRecordBatchBuilderImpl(ArrowRecordBatchBuilder recordBatchBuilder) { + this.recordBatchBuilder = recordBatchBuilder; + } + + /** + * Build ArrowRecordBatch from ArrowRecordBatchBuilder instance. + * @throws IOException throws exception + */ + public ArrowRecordBatch build() throws IOException { + if (recordBatchBuilder.length == 0) { + return null; + } + + List nodes = new ArrayList(); + for (ArrowFieldNodeBuilder tmp : recordBatchBuilder.nodeBuilders) { + nodes.add(new ArrowFieldNode(tmp.length, tmp.nullCount)); + } + + List buffers = new ArrayList(); + for (ArrowBufBuilder tmp : recordBatchBuilder.bufferBuilders) { + AdaptorReferenceManager referenceManager = + new AdaptorReferenceManager(tmp.nativeInstanceId, tmp.size); + buffers.add(new ArrowBuf(referenceManager, null, tmp.size, tmp.memoryAddress, false)); + } + return new ArrowRecordBatch(recordBatchBuilder.length, nodes, buffers); + } + +} diff --git a/java/adapter/parquet/pom.xml b/java/adapter/parquet/pom.xml new file mode 100644 index 00000000000..1d250eaa982 --- /dev/null +++ b/java/adapter/parquet/pom.xml @@ -0,0 +1,72 @@ + + + + + 4.0.0 + + org.apache.arrow + arrow-java-root + 1.0.0-SNAPSHOT + ../../pom.xml + + org.apache.arrow.parquet + arrow-parquet + Arrow Parquet Adapter + (Experimental/Contrib)A JNI wrapper for the C++ Parquet reader implementation. 
+ jar + + + + org.apache.arrow + arrow-adapter-common + ${project.version} + compile + + + org.apache.arrow + arrow-memory + ${project.version} + compile + + + org.apache.arrow + arrow-vector + ${project.version} + compile + + + + + ../../../cpp/release-build/release + + + + + ${arrow.cpp.build.dir} + + **/libarrow_parquet_jni.* + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + UTC + + + + + + diff --git a/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetJniUtils.java b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetJniUtils.java new file mode 100644 index 00000000000..0f4207b04bf --- /dev/null +++ b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetJniUtils.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.parquet; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; + +/** + * Helper class for JNI related operations. 
+ */ +class ParquetJniUtils { + private static final String LIBRARY_NAME = "arrow_parquet_jni"; + private static boolean isLoaded = false; + private static volatile ParquetJniUtils INSTANCE; + + static ParquetJniUtils getInstance() throws IOException { + if (INSTANCE == null) { + synchronized (ParquetJniUtils.class) { + if (INSTANCE == null) { + try { + INSTANCE = new ParquetJniUtils(); + } catch (IllegalAccessException ex) { + throw new IOException("IllegalAccess", ex); /* keep the original cause for diagnosis */ + } + } + } + } + + return INSTANCE; + } + + private ParquetJniUtils() throws IOException, IllegalAccessException { + loadParquetAdapterLibraryFromJar(); + } + + static void loadParquetAdapterLibraryFromJar() + throws IOException, IllegalAccessException { + synchronized (ParquetJniUtils.class) { + if (!isLoaded) { + final String libraryToLoad = System.mapLibraryName(LIBRARY_NAME); + final File libraryFile = moveFileFromJarToTemp( + System.getProperty("java.io.tmpdir"), libraryToLoad); + System.load(libraryFile.getAbsolutePath()); + isLoaded = true; + } + } + } + + private static File moveFileFromJarToTemp(final String tmpDir, String libraryToLoad) + throws IOException { + final File temp = File.createTempFile(LIBRARY_NAME, libraryToLoad, new File(tmpDir)); temp.deleteOnExit(); /* createTempFile takes a name prefix, not a directory; the directory goes in the third argument */ + try (final InputStream is = ParquetJniUtils.class.getClassLoader() + .getResourceAsStream(libraryToLoad)) { + if (is == null) { + throw new FileNotFoundException(libraryToLoad); + } else { + Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING); + } + } + return temp; + } +} diff --git a/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetReader.java b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetReader.java new file mode 100644 index 00000000000..39e6a7f52d7 --- /dev/null +++ b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetReader.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.parquet; + +import java.io.IOException; +import java.util.List; + +import org.apache.arrow.adapter.common.ArrowRecordBatchBuilder; +import org.apache.arrow.adapter.common.ArrowRecordBatchBuilderImpl; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ReadChannel; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.ipc.message.MessageChannelReader; +import org.apache.arrow.vector.ipc.message.MessageResult; +import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel; + +/** Parquet Reader Class. */ +public class ParquetReader implements AutoCloseable { + + /** reference to native reader instance. */ + private long nativeInstanceId; + + /** last readed length of a record batch. */ + private long lastReadLength; + + private BufferAllocator allocator; + private ParquetReaderJniWrapper jniWrapper; + + /** + * Create an instance for ParquetReader. + * + * @param path Parquet Reader File Path. 
+ * @param rowGroupIndices An array to indicate which rowGroup to read. + * @param columnIndices An array to indicate which columns to read. + * @param batchSize number of rows expected to be read in one batch. + * @param allocator A BufferAllocator reference. + * @throws IOException throws io exception in case of native failure. + */ + public ParquetReader( + String path, + int[] rowGroupIndices, + int[] columnIndices, + long batchSize, + BufferAllocator allocator) + throws IOException { + this.jniWrapper = new ParquetReaderJniWrapper(); + this.allocator = allocator; + this.nativeInstanceId = jniWrapper.nativeOpenParquetReader(path, batchSize); + jniWrapper.nativeInitParquetReader(nativeInstanceId, columnIndices, rowGroupIndices); + } + + /** + * Create an instance for ParquetReader. + * + * @param path Parquet Reader File Path. + * @param startPos A start pos to indicate which rowGroup to read. + * @param endPos An end pos indicate which rowGroup to read. + * @param columnIndices An array to indicate which columns to read. + * @param batchSize number of rows expected to be read in one batch. + * @param allocator A BufferAllocator reference. + * @throws IOException throws io exception in case of native failure. + */ + public ParquetReader( + String path, + long startPos, + long endPos, + int[] columnIndices, + long batchSize, + BufferAllocator allocator) + throws IOException { + this.jniWrapper = new ParquetReaderJniWrapper(); + this.allocator = allocator; + this.nativeInstanceId = jniWrapper.nativeOpenParquetReader(path, batchSize); + jniWrapper.nativeInitParquetReader2(nativeInstanceId, columnIndices, startPos, endPos); + } + + /** + * Get Arrow Schema from ParquetReader. 
+ * + * @return Schema of parquet file + * @throws IOException throws io exception in case of native failure + */ + Schema getSchema() throws IOException { + byte[] schemaBytes = jniWrapper.nativeGetSchema(nativeInstanceId); + + try (MessageChannelReader schemaReader = + new MessageChannelReader( + new ReadChannel(new ByteArrayReadableSeekableByteChannel(schemaBytes)), allocator)) { + + MessageResult result = schemaReader.readNext(); + if (result == null) { + throw new IOException("Unexpected end of input. Missing schema."); + } + + return MessageSerializer.deserializeSchema(result.getMessage()); + } + } + + /** + * Read Next ArrowRecordBatch from ParquetReader. + * + * @return the next ArrowRecordBatch, or null when the native reader returns no batch or the batch has no rows + * @throws IOException throws io exception in case of native failure + */ + ArrowRecordBatch readNext() throws IOException { + ArrowRecordBatchBuilder recordBatchBuilder = jniWrapper.nativeReadNext(nativeInstanceId); + if (recordBatchBuilder == null) { + return null; + } + ArrowRecordBatchBuilderImpl recordBatchBuilderImpl = + new ArrowRecordBatchBuilderImpl(recordBatchBuilder); + ArrowRecordBatch batch = recordBatchBuilderImpl.build(); + this.lastReadLength = (batch == null) ? 0 : batch.getLength(); /* build() returns null for a zero-length batch */ + return batch; + } + + /** + * Read Next ValueVectorList from ParquetReader. + * + * @return Next ValueVectorList read from parquet file, or null when readNext() returns null. + * @throws IOException throws io exception in case of native failure + */ + public List readNextVectors(VectorSchemaRoot root) throws IOException { + ArrowRecordBatch batch = readNext(); + if (batch == null) { + return null; + } + VectorLoader loader = new VectorLoader(root); + loader.load(batch); + batch.close(); + return root.getFieldVectors(); + } + + /** + * Get last read ArrowRecordBatch Length. + * + * @return lastReadLength. 
+ */ + public long lastReadLength() { + return lastReadLength; + } + + @Override + public void close() { + jniWrapper.nativeCloseParquetReader(nativeInstanceId); + } +} diff --git a/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetReaderJniWrapper.java b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetReaderJniWrapper.java new file mode 100644 index 00000000000..59b118ffc10 --- /dev/null +++ b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetReaderJniWrapper.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.parquet; + +import java.io.IOException; + +import org.apache.arrow.adapter.common.ArrowRecordBatchBuilder; + +/** + * Declares the native (JNI) entry points of the Parquet reader. Every method either returns or + * takes the long id of a native reader instance. + */ +public class ParquetReaderJniWrapper { + + /** + * Construct a JNI wrapper instance. Calls ParquetJniUtils.getInstance() first (presumably to make + * sure the native library is loaded; confirm against ParquetJniUtils). + */ + ParquetReaderJniWrapper() throws IOException { + ParquetJniUtils.getInstance(); + } + + /** + * Construct a parquet file reader over the target file name.
+ * + * @param path absolute file path of target file + * @param batchSize number of rows in each batch that is read + * @return long id of the parquet reader instance + * @throws IOException in case of any I/O error in native code + */ + public native long nativeOpenParquetReader(String path, long batchSize) throws IOException; + + /** + * Init a parquet file reader by specifying columns and row groups. + * + * @param id id of the parquet reader instance + * @param columnIndices an array of indices indicating which columns to read + * @param rowGroupIndices an array of indices indicating which row groups to read + * @throws IOException in case of any I/O error in native code + */ + public native void nativeInitParquetReader(long id, int[] columnIndices, int[] rowGroupIndices) + throws IOException; + + /** + * Init a parquet file reader by specifying columns and a start/end position instead of explicit + * row group indices. + * + * @param id id of the parquet reader instance + * @param columnIndices an array of indices indicating which columns to read + * @param startPos start position used to select which row groups are read (unit not visible + * here; confirm against the native implementation) + * @param endPos end position used to select which row groups are read (unit not visible here; + * confirm against the native implementation) + * @throws IOException in case of any I/O error in native code + */ + public native void nativeInitParquetReader2( + long id, int[] columnIndices, long startPos, long endPos) throws IOException; + + /** + * Close a parquet file reader. + * + * @param id id of the parquet reader instance + */ + public native void nativeCloseParquetReader(long id); + + /** + * Read next record batch from parquet file reader. 
+ * + * @param id id of the parquet reader instance + * @return the ArrowRecordBatchBuilder produced by native code for the next batch (what is + * returned once no batch is left is not visible here; confirm against the native code) + * @throws IOException in case of any I/O error in native code + */ + public native ArrowRecordBatchBuilder nativeReadNext(long id) throws IOException; + + /** + * Get schema from parquet file reader.
+ * + * @param id id of the parquet reader instance + * @return the schema as a byte array (presumably the serialized Arrow schema; confirm against + * the native implementation) + * @throws IOException in case of any I/O error in native code + */ + public native byte[] nativeGetSchema(long id) throws IOException; +} diff --git a/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetWriter.java b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetWriter.java new file mode 100644 index 00000000000..04ae6930a47 --- /dev/null +++ b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetWriter.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.parquet; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.channels.Channels; +import java.util.List; + +import org.apache.arrow.vector.ipc.WriteChannel; +import org.apache.arrow.vector.ipc.message.ArrowBuffer; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.arrow.vector.types.pojo.Schema; + +import io.netty.buffer.ArrowBuf; + +/** Wrapper for Parquet Writer native API.
*/ +public class ParquetWriter implements AutoCloseable { + + /** Id of the native writer instance, as returned by nativeOpenParquetWriter in the constructor. */ + private long nativeInstanceId; + /** JNI wrapper through which every native writer call in this class is made. */ + private ParquetWriterJniWrapper jniWrapper; + + /** + * Open native ParquetWriter Instance. The schema is serialized with MessageSerializer into a byte + * array, which is handed to native code together with the path. + * + * @param path Parquet File Path to write. + * @param schema arrow schema to initialize Parquet file. + * @throws IOException throws io exception in case of native failure. + */ + public ParquetWriter(String path, Schema schema) + throws IOException { + this.jniWrapper = new ParquetWriterJniWrapper(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), schema); + byte[] schemaBytes = out.toByteArray(); + this.nativeInstanceId = jniWrapper.nativeOpenParquetWriter(path, schemaBytes); + } + + /** + * Write Next ArrowRecordBatch to ParquetWriter. Native code receives the row count, the memory + * address of every buffer and the size recorded in every buffer layout entry. + * + * <p>NOTE(review): bufSizes is sized from getBuffers() but filled from getBuffersLayout(); this + * assumes both lists have the same length - confirm against ArrowRecordBatch. + * + * @param recordBatch next ArrowRecordBatch to write. + * @throws IOException throws exception in case of io issues. 
+ */ + public void writeNext(ArrowRecordBatch recordBatch) throws IOException { + // flatten the ArrowRecordBatch into parallel arrays of buffer addresses and sizes + int numRows = recordBatch.getLength(); + List buffers = recordBatch.getBuffers(); + List buffersLayout = recordBatch.getBuffersLayout(); + + long[] bufAddrs = new long[buffers.size()]; + long[] bufSizes = new long[buffers.size()]; + + int idx = 0; + for (ArrowBuf buf : buffers) { + bufAddrs[idx++] = buf.memoryAddress(); + } + + idx = 0; + for (ArrowBuffer bufLayout : buffersLayout) { + bufSizes[idx++] = bufLayout.getSize(); + } + jniWrapper.nativeWriteNext(nativeInstanceId, numRows, bufAddrs, bufSizes); + } + /** Closes the native writer instance identified by nativeInstanceId. */ + @Override + public void close() throws IOException { + jniWrapper.nativeCloseParquetWriter(nativeInstanceId); + } +} diff --git a/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetWriterJniWrapper.java b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetWriterJniWrapper.java new file mode 100644 index
00000000000..b46c45c2c0c --- /dev/null +++ b/java/adapter/parquet/src/main/java/org/apache/arrow/adapter/parquet/ParquetWriterJniWrapper.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.parquet; + +import java.io.IOException; + +/** + * Declares the native (JNI) entry points of the Parquet writer. Every method either returns or + * takes the long id of a native writer instance. + */ +public class ParquetWriterJniWrapper { + + /** + * Construct a JNI wrapper instance. Calls ParquetJniUtils.getInstance() first (presumably to make + * sure the native library is loaded; confirm against ParquetJniUtils). + */ + public ParquetWriterJniWrapper() throws IOException { + ParquetJniUtils.getInstance(); + } + + /** + * Construct a parquet file writer over the target file name. 
+ * + * @param path absolute file path of target file + * @param schemaBytes a byte array holding the serialized Schema + * @return long id of the parquet writer instance + * @throws IOException in case of any I/O error in native code + */ + public native long nativeOpenParquetWriter(String path, byte[] schemaBytes) throws IOException; + + /** + * Close a parquet file writer. + * + * @param id id of the parquet writer instance + */ + public native void nativeCloseParquetWriter(long id); + + /** + * Write next record batch to parquet file writer.
+ * + * @param id id of the parquet writer instance + * @param numRows number of rows in this batch + * @param bufAddrs an array with the memory address of each buffer of this batch + * @param bufSizes an array with the size of each buffer of this batch + * @throws IOException in case of any I/O error in native code + */ + public native void nativeWriteNext(long id, int numRows, long[] bufAddrs, long[] bufSizes) + throws IOException; +} diff --git a/java/adapter/parquet/src/test/java/org/apache/arrow/adapter/parquet/ParquetReadWriteTest.java b/java/adapter/parquet/src/test/java/org/apache/arrow/adapter/parquet/ParquetReadWriteTest.java new file mode 100644 index 00000000000..34e4bf24b81 --- /dev/null +++ b/java/adapter/parquet/src/test/java/org/apache/arrow/adapter/parquet/ParquetReadWriteTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.arrow.adapter.parquet; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.ArrowType.Int; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import io.netty.buffer.ArrowBuf; + +/** + * Round-trip test for the Parquet adapter: writes one record batch with ParquetWriter and reads it + * back with ParquetReader. + */ +public class ParquetReadWriteTest { + + @Rule public TemporaryFolder testFolder = new TemporaryFolder(); + + private BufferAllocator allocator; + + @Before + public void init() { + allocator = new RootAllocator(Long.MAX_VALUE); + } + + @After + public void teardown() { + allocator.close(); + } + + /** + * Writes 16 rows of ten int32 columns to a temporary Parquet file, reads them back and checks + * that schema, row count and content match what was written. 
+ */ + @Test + public void testParquetReadWrite() throws Exception { + + File testFile = testFolder.newFile("_tmpfile_ParquetWriterReaderTest"); + String path = testFile.getAbsolutePath(); + + int numRows = 16; + int numColumns = 10; + int[] rowGroupIndices = {0}; + int[] columnIndices = new int[numColumns]; + for (int i = 0; i < numColumns; i++) { + columnIndices[i] = i; + } + + Schema schema = + new Schema( + asList( + field("a", new Int(32, true)), + field("b", new Int(32, true)), + field("c", new Int(32, true)), + field("d", new Int(32, true)), + field("e", new Int(32, true)), + field("f", new Int(32, true)), + field("g", new Int(32, true)), + field("h", new Int(32, true)), + field("i", new
Int(32, true)), + field("j", new Int(32, true)))); + + VectorSchemaRoot expectedSchemaRoot = VectorSchemaRoot.create(schema, allocator); + for (FieldVector vector : expectedSchemaRoot.getFieldVectors()) { + vector.allocateNew(); + IntVector intVector = (IntVector) vector; + for (int i = 0; i < numRows; i++) { + intVector.set(i, i); + } + /* Without a value count the vector still reports zero values. */ + intVector.setValueCount(numRows); + } + /* + * The row count has to be set explicitly: createArrowRecordBatch reads root.getRowCount(), + * which would otherwise be 0 and make this test round-trip an empty batch. + */ + expectedSchemaRoot.setRowCount(numRows); + assertEquals(numRows, expectedSchemaRoot.getRowCount()); + + ArrowRecordBatch batch = createArrowRecordBatch(expectedSchemaRoot); + ParquetWriter writer = new ParquetWriter(path, schema); + writer.writeNext(batch); + releaseArrowRecordBatch(batch); + writer.close(); + + ParquetReader reader = + new ParquetReader(path, rowGroupIndices, columnIndices, numRows, allocator); + + Schema readedSchema = reader.getSchema(); + assertEquals(schema.toJson(), readedSchema.toJson()); + + VectorSchemaRoot actualSchemaRoot = VectorSchemaRoot.create(readedSchema, allocator); + reader.readNextVectors(actualSchemaRoot); + + /* JUnit argument order is (expected, actual). */ + assertEquals(expectedSchemaRoot.getRowCount(), actualSchemaRoot.getRowCount()); + assertEquals(expectedSchemaRoot.contentToTSVString(), actualSchemaRoot.contentToTSVString()); + + actualSchemaRoot.close(); + expectedSchemaRoot.close(); + reader.close(); + testFile.delete(); + } + + /** Builds a Field with the given name, nullability, type and children (no dictionary or metadata). */ + private static Field field(String name, boolean nullable, ArrowType type, Field... children) { + return new Field(name, new FieldType(nullable, type, null, null), asList(children)); + } + + /** Builds a nullable Field with the given name, type and children. 
*/ + private static Field field(String name, ArrowType type, Field...
children) { + return field(name, true, type, children); + } + + /** + * Copies the given bytes into a buffer taken from the test allocator. NOTE(review): not + * referenced anywhere in the visible part of this class. + */ + private ArrowBuf buf(byte[] bytes) { + ArrowBuf buffer = allocator.buffer(bytes.length); + buffer.writeBytes(bytes); + return buffer; + } + + /** + * Builds an ArrowRecordBatch from root: one field node per vector plus that vector's validity + * and data buffer, using root.getRowCount() as the batch length. + */ + private ArrowRecordBatch createArrowRecordBatch(VectorSchemaRoot root) { + List fieldNodes = new ArrayList(); + List inputData = new ArrayList(); + int numRowsInBatch = root.getRowCount(); + for (FieldVector inputVector : root.getFieldVectors()) { + fieldNodes.add(new ArrowFieldNode(numRowsInBatch, inputVector.getNullCount())); + inputData.add(inputVector.getValidityBuffer()); + inputData.add(inputVector.getDataBuffer()); + } + return new ArrowRecordBatch(numRowsInBatch, fieldNodes, inputData); + } + + /** Closes the given record batch. */ + private void releaseArrowRecordBatch(ArrowRecordBatch recordBatch) { + recordBatch.close(); + } +} diff --git a/java/pom.xml b/java/pom.xml index 9aaaefac4c8..a8103a42f43 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -693,7 +693,9 @@ arrow-jni + adapter/common adapter/orc + adapter/parquet gandiva