Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/src/jni/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@
if(ARROW_ORC)
add_subdirectory(orc)
endif()

if(ARROW_PARQUET)
add_subdirectory(parquet)
endif()
56 changes: 56 additions & 0 deletions cpp/src/jni/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

#
# arrow_parquet_jni
#

project(arrow_parquet_jni)

cmake_minimum_required(VERSION 3.11)

find_package(JNI REQUIRED)

add_custom_target(arrow_parquet_jni)

set(JNI_HEADERS_DIR "${CMAKE_CURRENT_BINARY_DIR}/generated")

add_subdirectory(../../../../java/adapter/parquet ./java)

set(ARROW_BUILD_STATIC OFF)

add_arrow_lib(arrow_parquet_jni
BUILD_SHARED
SOURCES
jni_wrapper.cpp
hdfs_connector.cc
file_connector.cc
parquet_reader.cc
parquet_writer.cc
OUTPUTS
ARROW_PARQUET_JNI_LIBRARIES
SHARED_LINK_LIBS
parquet_shared
arrow_shared
EXTRA_INCLUDES
${JNI_HEADERS_DIR}
PRIVATE_INCLUDES
${JNI_INCLUDE_DIRS}
DEPENDENCIES
arrow_parquet_java)

add_dependencies(arrow_parquet_jni ${ARROW_PARQUET_JNI_LIBRARIES})
80 changes: 80 additions & 0 deletions cpp/src/jni/parquet/concurrent_map.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
*/

#ifndef JNI_ID_TO_MODULE_MAP_H
#define JNI_ID_TO_MODULE_MAP_H

#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>

#include "arrow/util/macros.h"

namespace arrow {
namespace jni {

/**
* An utility class that map module id to module pointers.
* @tparam Holder class of the object to hold.
*/
template <typename Holder>
class ConcurrentMap {
public:
ConcurrentMap() : module_id_(init_module_id_) {}

jlong Insert(Holder holder) {
std::lock_guard<std::mutex> lock(mtx_);
jlong result = module_id_++;
map_.insert(std::pair<jlong, Holder>(result, holder));
return result;
}

void Erase(jlong module_id) {
std::lock_guard<std::mutex> lock(mtx_);
map_.erase(module_id);
}

Holder Lookup(jlong module_id) {
std::lock_guard<std::mutex> lock(mtx_);
auto it = map_.find(module_id);
if (it != map_.end()) {
return it->second;
}
return NULLPTR;
}

void Clear() {
std::lock_guard<std::mutex> lock(mtx_);
map_.clear();
}

private:
// Initialize the module id starting value to a number greater than zero
// to allow for easier debugging of uninitialized java variables.
static constexpr int init_module_id_ = 4;

int64_t module_id_;
std::mutex mtx_;
// map from module ids returned to Java and module pointers
std::unordered_map<jlong, Holder> map_;
};

} // namespace jni
} // namespace arrow

#endif // JNI_ID_TO_MODULE_MAP_H
57 changes: 57 additions & 0 deletions cpp/src/jni/parquet/connector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef CONNECTOR_H
#define CONNECTOR_H

#include <arrow/array.h>
#include <arrow/buffer.h>
#include <arrow/io/interfaces.h>
#include <arrow/status.h>
#include <cstring>
#include <iostream>
#include <memory>
#include <string>

namespace jni {
namespace parquet {

/// \brief A base class for Connectors
///
/// This class is used by ParquetReader and ParquetWriter to handle
/// parquet files from different resources, local and hdfs for now using
/// unified API.
class Connector {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please provide class documentation. for this class. It looks like this has nothing to do with Parquet, can it be moved someplace else (maybe under filesystem)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually how is this different then FileSystem (and for instance HdfsFileSystem (https://github.com/apache/arrow/blob/master/cpp/src/arrow/io/hdfs.h#L71)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@emkornfield , the reason of keeping connector classes under jni/parquet is that, I want to make the connector interface unified regardless its connection type(hdfs or local), while I can't find an unified class under arrow/io or arrow/filesystem, for example, hdfsFileSystem are derived from Filesystem while openReadable under file.h are derived from RandomAccessFile, and LocalFileSystem under arrow/filesystem/localfs.h does not have OpenReadable interface.
Since I hope to limit my code change to existing module, so I am figuring as what I did now to add a small wrapper firstly to make the ParquetReader and ParquetWriter functioning, then I can open a new PR to move the connector classes to arrow/filesystem and make it as a baseFileSystem class? Do you think that makes sense to you?

public:
Connector() = default;
virtual ~Connector() {}
std::string GetFileName() { return file_path_; }
virtual ::arrow::Status OpenReadable(bool option) = 0;
virtual ::arrow::Status OpenWritable(bool option, int replication) = 0;
virtual std::shared_ptr<::arrow::io::RandomAccessFile> GetReader() = 0;
virtual std::shared_ptr<::arrow::io::OutputStream> GetWriter() = 0;
virtual void TearDown() = 0;

protected:
std::string file_path_;
std::string dir_path_;
virtual ::arrow::Status Mkdir(std::string path) = 0;
};
} // namespace parquet
} // namespace jni

#endif
92 changes: 92 additions & 0 deletions cpp/src/jni/parquet/file_connector.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "jni/parquet/file_connector.h"

#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <numeric>
#include <sstream> // IWYU pragma: keep
#include <string>
#include <vector>
#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/path_util.h"

namespace jni {
namespace parquet {

FileConnector::FileConnector(std::string path) {
file_path_ = path;
auto path_pair = arrow::fs::internal::GetAbstractPathParent(file_path_);
dir_path_ = path_pair.first;
}

FileConnector::~FileConnector() {}

void FileConnector::TearDown() {
if (file_writer_) {
file_writer_->Close();
}
}

::arrow::Status FileConnector::OpenReadable(bool option) {
::arrow::Status msg = ::arrow::io::ReadableFile::Open(file_path_, &file_reader_);
if (!msg.ok()) {
std::cerr << "Open file failed, file name is " << file_path_ << ", error is : " << msg
<< std::endl;
return msg;
}
return msg;
}

::arrow::Status FileConnector::OpenWritable(bool option, int replication) {
::arrow::Status msg;
if (!dir_path_.empty()) {
msg = Mkdir(dir_path_);
if (!msg.ok()) {
std::cerr << "Mkdir for path failed " << dir_path_ << ", error is : " << msg
<< std::endl;
return msg;
}
}

msg = ::arrow::io::FileOutputStream::Open(file_path_, false, &file_writer_);
if (!msg.ok()) {
std::cerr << "Open file failed, file name is " << file_path_ << ", error is : " << msg
<< std::endl;
return msg;
}
return msg;
}

::arrow::Status FileConnector::Mkdir(std::string path) {
std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs =
std::make_shared<::arrow::fs::LocalFileSystem>();
::arrow::Status msg = local_fs->CreateDir(path);
if (!msg.ok()) {
std::cerr << "Make Directory for " << path << " failed, error is: " << msg
<< std::endl;
return msg;
}
return msg;
}

} // namespace parquet
} // namespace jni
68 changes: 68 additions & 0 deletions cpp/src/jni/parquet/file_connector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef FILE_CONNECTOR_H
#define FILE_CONNECTOR_H

#include <arrow/io/file.h>
#include <memory>
#include <string>
#include "jni/parquet/connector.h"

namespace jni {
namespace parquet {

/// \brief A connector to handle local parquet file
///
/// This class is derived from base class "Connector", used to provide
/// an unified interface for ParquetReader and ParquetWriter to handle
/// files from different sources.
/// Member methods are wrappers of some methods under arrow/io/file.h
class FileConnector : public Connector {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs documentation.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

documentation added, thanks

public:
/// \brief Construction of this class
/// \param[in] path local file path
explicit FileConnector(std::string path);
~FileConnector();

/// \brief Open local parquet file as readable handler
/// \param[in] option a param holder, not used
::arrow::Status OpenReadable(bool option);

/// \brief Open local parquet file as writable handler
/// \param[in] option a param holder, not used
/// \param[in] replication a param holder, not used
::arrow::Status OpenWritable(bool option, int replication);

/// \brief Get reader handler
std::shared_ptr<::arrow::io::RandomAccessFile> GetReader() { return file_reader_; }

/// \brief Get writer handler
std::shared_ptr<::arrow::io::OutputStream> GetWriter() { return file_writer_; }

/// \brief Tear down connection and handlers
void TearDown();

protected:
::arrow::Status Mkdir(std::string path);
std::shared_ptr<::arrow::io::ReadableFile> file_reader_;
std::shared_ptr<::arrow::io::OutputStream> file_writer_;
};
} // namespace parquet
} // namespace jni

#endif
Loading