diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a60c2575f7a..a5893c89b8d 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -286,6 +286,10 @@ if(ARROW_DATASET) set(ARROW_FILESYSTEM ON) endif() +if(ARROW_FILESYSTEM) + set(ARROW_WITH_URIPARSER ON) +endif() + if(MSVC) # ORC doesn't build on windows set(ARROW_ORC OFF) diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 8f70ad7b0b4..8c7015e0639 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -265,8 +265,17 @@ endif() if(ARROW_FILESYSTEM) add_subdirectory(filesystem) + if(ARROW_HDFS) + add_definitions(-DARROW_HDFS) + endif() + + if(ARROW_S3) + add_definitions(-DARROW_S3) + endif() + list(APPEND ARROW_SRCS filesystem/filesystem.cc + filesystem/filesystem_utils.cc filesystem/localfs.cc filesystem/mockfs.cc filesystem/path_tree.cc @@ -277,6 +286,10 @@ if(ARROW_FILESYSTEM) list(APPEND ARROW_SRCS filesystem/s3fs.cc) endif() + if(ARROW_HDFS) + list(APPEND ARROW_SRCS filesystem/hdfs.cc) + endif() + list(APPEND ARROW_TESTING_SRCS filesystem/test_util.cc) endif() diff --git a/cpp/src/arrow/filesystem/CMakeLists.txt b/cpp/src/arrow/filesystem/CMakeLists.txt index efb78055a9a..8d4f65ac069 100644 --- a/cpp/src/arrow/filesystem/CMakeLists.txt +++ b/cpp/src/arrow/filesystem/CMakeLists.txt @@ -18,7 +18,12 @@ # Headers: top level arrow_install_all_headers("arrow/filesystem") +if(ARROW_S3) + add_definitions(-DARROW_S3) +endif() + add_arrow_test(filesystem_test) +add_arrow_test(filesystem_utils_test) add_arrow_test(localfs_test) add_arrow_test(path_tree_test) @@ -32,3 +37,7 @@ if(ARROW_S3) add_dependencies(arrow-tests arrow-s3fs-narrative-test) endif() endif() + +if(ARROW_HDFS) + add_arrow_test(hdfs_test) +endif() diff --git a/cpp/src/arrow/filesystem/api.h b/cpp/src/arrow/filesystem/api.h index fd8f566a78e..f224a7c9399 100644 --- a/cpp/src/arrow/filesystem/api.h +++ b/cpp/src/arrow/filesystem/api.h @@ -18,9 +18,11 @@ #ifndef ARROW_FILESYSTEM_API_H #define 
ARROW_FILESYSTEM_API_H -#include "arrow/filesystem/filesystem.h" // IWYU pragma: export -#include "arrow/filesystem/localfs.h" // IWYU pragma: export -#include "arrow/filesystem/mockfs.h" // IWYU pragma: export -#include "arrow/filesystem/s3fs.h" // IWYU pragma: export +#include "arrow/filesystem/filesystem.h" // IWYU pragma: export +#include "arrow/filesystem/filesystem_utils.h" // IWYU pragma: export +#include "arrow/filesystem/hdfs.h" // IWYU pragma: export +#include "arrow/filesystem/localfs.h" // IWYU pragma: export +#include "arrow/filesystem/mockfs.h" // IWYU pragma: export +#include "arrow/filesystem/s3fs.h" // IWYU pragma: export #endif // ARROW_FILESYSTEM_API_H diff --git a/cpp/src/arrow/filesystem/filesystem_utils.cc b/cpp/src/arrow/filesystem/filesystem_utils.cc new file mode 100644 index 00000000000..10697f69630 --- /dev/null +++ b/cpp/src/arrow/filesystem/filesystem_utils.cc @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include +#include +#include + +#include "arrow/filesystem/filesystem_utils.h" +#ifdef ARROW_HDFS +#include "arrow/filesystem/hdfs.h" +#endif +#include "arrow/filesystem/localfs.h" +#include "arrow/filesystem/path_util.h" +#include "arrow/result.h" +#include "arrow/util/uri.h" + +namespace arrow { + +namespace fs { + +enum class FileSystemType { HDFS, LOCAL, S3, UNKNOWN }; + +namespace { + +class PathInfo { + public: + PathInfo() {} + ~PathInfo() {} + + static Status Make(const std::string& full_path, std::shared_ptr* path_info) { + *path_info = std::make_shared(); + RETURN_NOT_OK((*path_info)->Init(full_path)); + return Status::OK(); + } + + Status Init(const std::string& full_path) { + RETURN_NOT_OK(ParseURI(full_path)); + return Status::OK(); + } + + FileSystemType GetFileSystemType() { return fs_type_; } + + std::string GetHostName() { + auto search = options_.find("host_name"); + if (search == options_.end()) { + return ""; + } + return search->second; + } + + int GetHostPort() { + auto search = options_.find("host_port"); + if (search == options_.end()) { + return -1; + } + std::string port_text = search->second; + return std::stoi(port_text); + } + + std::string GetUser() { + auto search = options_.find("user"); + if (search != options_.end()) { + return search->second; + } + return ""; + } + + bool GetIfUseHdfs3() { + auto search = options_.find("use_hdfs3"); + if (search != options_.end()) { + if (search->second.compare("1") == 0) { + return true; + } else { + return false; + } + } else { + return false; + } + } + + int GetRepsNum() { + auto search = options_.find("replication"); + if (search != options_.end()) { + if (search->second.empty()) { + return 3; + } else { + return std::stoi(search->second); + } + } else { + return 3; + } + } + + private: + FileSystemType GetFileSystemTypeFromString(const std::string& s) { + if (s == "hdfs") { + return FileSystemType::HDFS; + } + if (s == "http") { + return FileSystemType::S3; + } + if (s 
== "https") { + return FileSystemType::S3; + } + if (s == "file") { + return FileSystemType::LOCAL; + } + if (s.empty()) { + return FileSystemType::LOCAL; + } + return FileSystemType::UNKNOWN; + } + + Status ParseURI(const std::string& s) { + arrow::internal::Uri uri; + RETURN_NOT_OK(uri.Parse(s)); + fs_type_ = GetFileSystemTypeFromString(uri.scheme()); + switch (fs_type_) { + case FileSystemType::HDFS: + options_.emplace("host_name", uri.host()); + options_.emplace("host_port", uri.port_text()); + RETURN_NOT_OK(ParseOptions(&uri)); + break; + case FileSystemType::LOCAL: + RETURN_NOT_OK(ParseOptions(&uri)); + break; + case FileSystemType::S3: + return Status::NotImplemented("S3 is not supported yet."); + default: + break; + } + return Status::OK(); + } + + Status ParseOptions(arrow::internal::Uri* uri) { + ARROW_ASSIGN_OR_RAISE(auto options, uri->query_items()); + for (auto option : options) { + options_.emplace(option.first, option.second); + } + return Status::OK(); + } + + FileSystemType fs_type_; + std::unordered_map options_; +}; +} // namespace + +Status MakeFileSystem(const std::string& full_path, std::shared_ptr* fs) { + std::shared_ptr path_info; + RETURN_NOT_OK(PathInfo::Make(full_path, &path_info)); + FileSystemType fs_type = path_info->GetFileSystemType(); + + switch (fs_type) { +#ifdef ARROW_HDFS + case FileSystemType::HDFS: { + // Init Hdfs FileSystem + HdfsOptions hdfs_options; + hdfs_options.ConfigureEndPoint(path_info->GetHostName(), path_info->GetHostPort()); + hdfs_options.ConfigureHdfsDriver(path_info->GetIfUseHdfs3()); + hdfs_options.ConfigureHdfsReplication(path_info->GetRepsNum()); + hdfs_options.ConfigureHdfsUser(path_info->GetUser()); + + std::shared_ptr hdfs; + RETURN_NOT_OK(HadoopFileSystem::Make(hdfs_options, &hdfs)); + + *fs = hdfs; + } break; +#endif + case FileSystemType::LOCAL: { + auto local_fs = std::make_shared(); + *fs = local_fs; + } break; +#ifdef ARROW_S3 + case FileSystemType::S3: + return Status::NotImplemented("S3 is not 
supported yet."); +#endif + default: + return Status::NotImplemented("This type of filesystem is not supported yet."); + } + + return Status::OK(); +} +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/filesystem_utils.h b/cpp/src/arrow/filesystem/filesystem_utils.h new file mode 100644 index 00000000000..0dd84953ad6 --- /dev/null +++ b/cpp/src/arrow/filesystem/filesystem_utils.h @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include "arrow/filesystem/filesystem.h" + +namespace arrow { + +namespace fs { + +/// \brief Creates a new FileSystem by path +/// +/// \param[in] full_path a URI-based path, ex: hdfs:///some/path?replication=3 +/// \param[out] fs FileSystemFactory instance. 
+/// \return Status +ARROW_EXPORT Status MakeFileSystem(const std::string& full_path, + std::shared_ptr* fs); +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/filesystem_utils_test.cc b/cpp/src/arrow/filesystem/filesystem_utils_test.cc new file mode 100644 index 00000000000..a251dda6ccb --- /dev/null +++ b/cpp/src/arrow/filesystem/filesystem_utils_test.cc @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include +#include +#include +#include + +#include + +#include "arrow/filesystem/filesystem_utils.h" +#include "arrow/filesystem/path_util.h" +#include "arrow/filesystem/test_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/io_util.h" +#include "arrow/util/uri.h" +#ifdef ARROW_HDFS +#include "arrow/io/hdfs.h" +#endif + +namespace arrow { +namespace fs { + +#define SOME_DATA_SIZE 9 + +struct CommonPathFormatter { + std::string operator()(const arrow::internal::PlatformFilename& fn) { + return fn.ToString(); + } +}; + +#ifndef _WIN32 +using PathFormatters = ::testing::Types; + +class TestFileSystem : public ::testing::Test { + public: + void SetUp() override {} + + void TestFileReadWrite() { + std::string data = "some data"; + // Test that the right location on disk is accessed + std::shared_ptr stream; + ASSERT_OK(fs_->OpenOutputStream(file_name_, &stream)); + auto data_size = static_cast(data.size()); + ASSERT_OK(stream->Write(data.data(), data_size)); + ASSERT_OK(stream->Close()); + + std::shared_ptr file; + ASSERT_OK(fs_->OpenInputFile(file_name_, &file)); + int64_t file_size; + ASSERT_OK(file->GetSize(&file_size)); + ASSERT_EQ(SOME_DATA_SIZE, file_size); + uint8_t buffer[SOME_DATA_SIZE]; + int64_t bytes_read = 0; + ASSERT_OK(file->Read(SOME_DATA_SIZE, &bytes_read, buffer)); + ASSERT_EQ(0, std::memcmp(buffer, data.c_str(), SOME_DATA_SIZE)); + } + + std::string GetDirectoryName(const std::string& file_name) { + auto dir_file = internal::GetAbstractPathParent(file_name); + return dir_file.first; + } + + void TestCreateDir() { ASSERT_OK(fs_->CreateDir(GetDirectoryName(file_name_))); } + + void TestDeleteDir() { ASSERT_OK(fs_->DeleteDir(GetDirectoryName(file_name_))); } + + protected: + std::shared_ptr fs_; + std::string file_name_; +}; + +template +class TestLocalFileSystem : public TestFileSystem { + public: + void SetUp() override { + ASSERT_OK(arrow::internal::TemporaryDir::Make("test-localfs-", &temp_dir_)); + 
std::string full_path = PathFormatter()(temp_dir_->path()); + full_path.append("/AB/abc"); + arrow::internal::Uri uri; + ASSERT_OK(uri.Parse(full_path)); + file_name_ = uri.path(); + ASSERT_OK(MakeFileSystem(full_path, &fs_)); + } + + protected: + std::unique_ptr temp_dir_; +}; + +TYPED_TEST_CASE(TestLocalFileSystem, PathFormatters); + +TYPED_TEST(TestLocalFileSystem, MakeFileSystemLocal) { + this->TestCreateDir(); + this->TestFileReadWrite(); + this->TestDeleteDir(); +} +#endif + +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/hdfs.cc b/cpp/src/arrow/filesystem/hdfs.cc new file mode 100644 index 00000000000..67869f37044 --- /dev/null +++ b/cpp/src/arrow/filesystem/hdfs.cc @@ -0,0 +1,340 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include + +#ifdef _WIN32 +#include "arrow/util/windows_compatibility.h" +#else +#include +#include +#include +#include +#endif // _WIN32 + +#include + +#include "arrow/filesystem/hdfs.h" +#include "arrow/filesystem/util_internal.h" +#include "arrow/io/hdfs.h" +#include "arrow/io/hdfs_internal.h" +#include "arrow/util/io_util.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace fs { + +class HadoopFileSystem::Impl { + public: + explicit Impl(HdfsOptions options) : options_(std::move(options)) {} + ~Impl() { auto status = Close(); } + + Status Init() { + io::internal::LibHdfsShim* driver_shim; + if (options_.hdfs_config.driver == io::HdfsDriver::LIBHDFS3) { + RETURN_NOT_OK(ConnectLibHdfs3(&driver_shim)); + } else { + RETURN_NOT_OK(ConnectLibHdfs(&driver_shim)); + } + RETURN_NOT_OK(io::HadoopFileSystem::Connect(&options_.hdfs_config, &client_)); + return Status::OK(); + } + + Status Close() { + if (client_) { + RETURN_NOT_OK(client_->Disconnect()); + } + return Status::OK(); + } + + Status GetTargetStats(const std::string& path, FileStats* out) { + io::HdfsPathInfo info; + auto status = client_->GetPathInfo(path, &info); + out->set_path(path); + if (status.IsIOError()) { + out->set_type(FileType::NonExistent); + return Status::OK(); + } + + if (info.kind == io::ObjectType::DIRECTORY) { + out->set_type(FileType::Directory); + out->set_size(kNoSize); + } else if (info.kind == io::ObjectType::FILE) { + out->set_type(FileType::File); + out->set_size(info.size); + } + out->set_mtime(ToTimePoint(info.last_modified_time)); + return Status::OK(); + } + + Status StatSelector(std::string path, const Selector& select, int nesting_depth, + std::vector* out) { + FileStats stat; + RETURN_NOT_OK(GetTargetStats(path, &stat)); + if (stat.type() == FileType::NonExistent) { + if (select.allow_non_existent) { + return Status::OK(); + } else { + return Status::IOError("Directory does not exist: '", path, "'"); + } + } + + if (nesting_depth > 1 && + 
(!select.recursive || nesting_depth > select.max_recursion)) { + // if this selector doesn't support recursion, or nesting_depth exceeds max_recursion, + // return without adding this FileStats to out. + return Status::OK(); + } + + if (stat.type() == FileType::Directory && path != select.base_dir) { + out->push_back(stat); + } else if (stat.type() == FileType::File) { + out->push_back(stat); + return Status::OK(); + } + + std::vector file_list; + RETURN_NOT_OK(client_->GetChildren(path, &file_list)); + for (auto file : file_list) { + // In hdfs case, file will be absolute path, trim here. + RETURN_NOT_OK(StatSelector(file, select, nesting_depth + 1, out)); + } + return Status::OK(); + } + + Status GetTargetStats(const Selector& select, std::vector* out) { + out->clear(); + FileStats stat; + RETURN_NOT_OK(GetTargetStats(select.base_dir, &stat)); + if (stat.type() == FileType::File) { + return Status::Invalid( + "GetTargetStates expects base_dir of selector to be a directory, while '", + select.base_dir, "' is a file"); + } + RETURN_NOT_OK(StatSelector(select.base_dir, select, 0, out)); + return Status::OK(); + } + + Status CreateDir(const std::string& path, bool recursive) { + if (client_->Exists(path)) { + return Status::OK(); + } + RETURN_NOT_OK(client_->MakeDirectory(path)); + return Status::OK(); + } + + Status DeleteDir(const std::string& path) { + if (!IsDirectory(path)) { + return Status::IOError("path is not a directory"); + } + if (!client_->Exists(path)) { + return Status::OK(); + } + RETURN_NOT_OK(client_->DeleteDirectory(path)); + return Status::OK(); + } + + Status DeleteDirContents(const std::string& path) { + if (!IsDirectory(path)) { + return Status::IOError("path is not a directory"); + } + std::vector file_list; + RETURN_NOT_OK(client_->GetChildren(path, &file_list)); + for (auto file : file_list) { + RETURN_NOT_OK(client_->Delete(file, true)); + } + return Status::OK(); + } + + Status DeleteFile(const std::string& path) { + if (IsDirectory(path)) { +
return Status::IOError("path is a directory"); + } + RETURN_NOT_OK(client_->Delete(path)); + return Status::OK(); + } + + Status Move(const std::string& src, const std::string& dest) { + RETURN_NOT_OK(client_->Rename(src, dest)); + return Status::OK(); + } + + Status CopyFile(const std::string& src, const std::string& dest) { + // TODO + return Status::NotImplemented("HadoopFileSystem::CopyFile is not supported yet"); + } + + Status OpenInputStream(const std::string& path, std::shared_ptr* out) { + std::shared_ptr file; + RETURN_NOT_OK(OpenInputFile(path, &file)); + *out = file; + return Status::OK(); + } + + Status OpenInputFile(const std::string& path, + std::shared_ptr* out) { + std::shared_ptr file; + RETURN_NOT_OK(client_->OpenReadable(path, &file)); + *out = file; + return Status::OK(); + } + + Status OpenOutputStream(const std::string& path, + std::shared_ptr* out) { + bool append = false; + return OpenOutputStreamGeneric(path, append, out); + } + + Status OpenAppendStream(const std::string& path, + std::shared_ptr* out) { + bool append = true; + return OpenOutputStreamGeneric(path, append, out); + } + + protected: + HdfsOptions options_; + std::shared_ptr<::arrow::io::HadoopFileSystem> client_; + + Status OpenOutputStreamGeneric(const std::string& path, bool append, + std::shared_ptr* out) { + std::shared_ptr stream; + RETURN_NOT_OK(client_->OpenWritable(path, append, options_.buffer_size, + options_.replication, options_.default_block_size, + &stream)); + *out = stream; + return Status::OK(); + } + + bool IsDirectory(const std::string& path) { + io::HdfsPathInfo info; + Status status = client_->GetPathInfo(path, &info); + if (!status.ok()) { + return false; + } + if (info.kind == io::ObjectType::DIRECTORY) { + return true; + } + return false; + } + + TimePoint ToTimePoint(int secs) { + std::chrono::nanoseconds ns_count(static_cast(secs) * 1000000000); + return TimePoint(std::chrono::duration_cast(ns_count)); + } +}; + +void 
HdfsOptions::ConfigureEndPoint(const std::string& host, int port) { + hdfs_config.host = host; + hdfs_config.port = port; +} + +void HdfsOptions::ConfigureHdfsDriver(bool use_hdfs3) { + if (use_hdfs3) { + hdfs_config.driver = ::arrow::io::HdfsDriver::LIBHDFS3; + } else { + hdfs_config.driver = ::arrow::io::HdfsDriver::LIBHDFS; + } +} + +void HdfsOptions::ConfigureHdfsUser(const std::string& user_name) { + if (!user_name.empty()) { + hdfs_config.user = user_name; + } +} + +void HdfsOptions::ConfigureHdfsReplication(int16_t replication) { + this->replication = replication; +} + +void HdfsOptions::ConfigureHdfsBufferSize(int32_t buffer_size) { + this->buffer_size = buffer_size; +} + +void HdfsOptions::ConfigureHdfsBlockSize(int64_t default_block_size) { + this->default_block_size = default_block_size; +} + +HadoopFileSystem::HadoopFileSystem(const HdfsOptions& options) + : impl_(new Impl{options}) {} + +HadoopFileSystem::~HadoopFileSystem() {} + +Status HadoopFileSystem::Make(const HdfsOptions& options, + std::shared_ptr* out) { + std::shared_ptr ptr(new HadoopFileSystem(options)); + RETURN_NOT_OK(ptr->impl_->Init()); + *out = std::move(ptr); + return Status::OK(); +} + +Status HadoopFileSystem::GetTargetStats(const std::string& path, FileStats* out) { + return impl_->GetTargetStats(path, out); +} + +Status HadoopFileSystem::GetTargetStats(const Selector& select, + std::vector* out) { + return impl_->GetTargetStats(select, out); +} + +Status HadoopFileSystem::CreateDir(const std::string& path, bool recursive) { + return impl_->CreateDir(path, recursive); +} + +Status HadoopFileSystem::DeleteDir(const std::string& path) { + return impl_->DeleteDir(path); +} + +Status HadoopFileSystem::DeleteDirContents(const std::string& path) { + return impl_->DeleteDirContents(path); +} + +Status HadoopFileSystem::DeleteFile(const std::string& path) { + return impl_->DeleteFile(path); +} + +Status HadoopFileSystem::Move(const std::string& src, const std::string& dest) { + return 
impl_->Move(src, dest); +} + +Status HadoopFileSystem::CopyFile(const std::string& src, const std::string& dest) { + return impl_->CopyFile(src, dest); +} + +Status HadoopFileSystem::OpenInputStream(const std::string& path, + std::shared_ptr* out) { + return impl_->OpenInputStream(path, out); +} + +Status HadoopFileSystem::OpenInputFile(const std::string& path, + std::shared_ptr* out) { + return impl_->OpenInputFile(path, out); +} + +Status HadoopFileSystem::OpenOutputStream(const std::string& path, + std::shared_ptr* out) { + return impl_->OpenOutputStream(path, out); +} + +Status HadoopFileSystem::OpenAppendStream(const std::string& path, + std::shared_ptr* out) { + return impl_->OpenAppendStream(path, out); +} + +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/hdfs.h b/cpp/src/arrow/filesystem/hdfs.h new file mode 100644 index 00000000000..0f4753590c4 --- /dev/null +++ b/cpp/src/arrow/filesystem/hdfs.h @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "arrow/filesystem/filesystem.h" +#include "arrow/io/hdfs.h" + +namespace arrow { +namespace fs { + +/// Options for the HDFS implementation. 
+struct ARROW_EXPORT HdfsOptions { + HdfsOptions() = default; + ~HdfsOptions() = default; + + /// Hdfs configuration options, contains host, port, driver + io::HdfsConnectionConfig hdfs_config; + + /// Used by Hdfs OpenWritable Interface. + int32_t buffer_size = 0; + int16_t replication = 0; + int64_t default_block_size = 0; + + void ConfigureEndPoint(const std::string& host, int port); + /// Be cautious that libhdfs3 is a unmaintained project + void ConfigureHdfsDriver(bool use_hdfs3); + void ConfigureHdfsReplication(int16_t replication); + void ConfigureHdfsUser(const std::string& user_name); + void ConfigureHdfsBufferSize(int32_t buffer_size); + void ConfigureHdfsBlockSize(int64_t default_block_siz); +}; + +/// HDFS-backed FileSystem implementation. +/// +/// implementation notes: +/// - This is a wrapper of arrow/io/hdfs, so we can use FileSystem API to handle hdfs. +class ARROW_EXPORT HadoopFileSystem : public FileSystem { + public: + ~HadoopFileSystem() override; + + /// \cond FALSE + using FileSystem::GetTargetStats; + /// \endcond + Status GetTargetStats(const std::string& path, FileStats* out) override; + Status GetTargetStats(const Selector& select, std::vector* out) override; + + Status CreateDir(const std::string& path, bool recursive = true) override; + + Status DeleteDir(const std::string& path) override; + + Status DeleteDirContents(const std::string& path) override; + + Status DeleteFile(const std::string& path) override; + + Status Move(const std::string& src, const std::string& dest) override; + + Status CopyFile(const std::string& src, const std::string& dest) override; + + Status OpenInputStream(const std::string& path, + std::shared_ptr* out) override; + + Status OpenInputFile(const std::string& path, + std::shared_ptr* out) override; + + Status OpenOutputStream(const std::string& path, + std::shared_ptr* out) override; + + Status OpenAppendStream(const std::string& path, + std::shared_ptr* out) override; + + /// Create a HdfsFileSystem 
instance from the given options. + static Status Make(const HdfsOptions& options, std::shared_ptr* out); + + protected: + explicit HadoopFileSystem(const HdfsOptions& options); + + class Impl; + std::unique_ptr impl_; +}; + +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/hdfs_test.cc b/cpp/src/arrow/filesystem/hdfs_test.cc new file mode 100644 index 00000000000..82e67e359cd --- /dev/null +++ b/cpp/src/arrow/filesystem/hdfs_test.cc @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include + +#include + +#include "arrow/filesystem/hdfs.h" +#include "arrow/filesystem/test_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/io_util.h" + +namespace arrow { +namespace fs { +namespace internal { + +#define SOME_DATA_SIZE 9 + +using HdfsDriver = arrow::io::HdfsDriver; + +struct JNIDriver { + static HdfsDriver type; +}; + +struct PivotalDriver { + static HdfsDriver type; +}; + +template +class TestHadoopFileSystem : public ::testing::Test { + public: + void SetUp() override { + const char* host = std::getenv("ARROW_HDFS_TEST_HOST"); + const char* port = std::getenv("ARROW_HDFS_TEST_PORT"); + const char* user = std::getenv("ARROW_HDFS_TEST_USER"); + + std::string hdfs_host = host == nullptr ? "localhost" : std::string(host); + int hdfs_port = port == nullptr ? 20500 : atoi(port); + std::string hdfs_user = user == nullptr ? "root" : std::string(user); + + bool use_hdfs3; + if (DRIVER::type == HdfsDriver::LIBHDFS) { + use_hdfs3 = false; + } else { + use_hdfs3 = true; + } + + HdfsOptions hdfs_options; + hdfs_options.ConfigureEndPoint(hdfs_host, hdfs_port); + hdfs_options.ConfigureHdfsUser(hdfs_user); + hdfs_options.ConfigureHdfsDriver(use_hdfs3); + hdfs_options.ConfigureHdfsReplication(0); + + std::shared_ptr hdfs; + auto status = HadoopFileSystem::Make(hdfs_options, &hdfs); + if (!status.ok()) { + ARROW_LOG(INFO) + << "HadoopFileSystem::Make failed, it is possible when we don't have " + "proper driver on this node, err msg is " + << status.message(); + loaded_driver_ = false; + return; + } + loaded_driver_ = true; + fs_ = std::make_shared("", hdfs); + } + + protected: + std::shared_ptr fs_; + bool loaded_driver_ = false; +}; + +HdfsDriver JNIDriver::type = HdfsDriver::LIBHDFS; +HdfsDriver PivotalDriver::type = HdfsDriver::LIBHDFS3; + +typedef ::testing::Types DriverTypes; + +TYPED_TEST_CASE(TestHadoopFileSystem, DriverTypes); + +#define SKIP_IF_NO_DRIVER() \ + if (!this->loaded_driver_) { \ + ARROW_LOG(INFO) << "Driver 
not loaded, skipping"; \ + return; \ + } + +TYPED_TEST(TestHadoopFileSystem, CreateAndDeleteDir) { + SKIP_IF_NO_DRIVER(); + + FileStats stat; + std::string directory_name = "/AB"; + ASSERT_OK(this->fs_->CreateDir(directory_name)); + ASSERT_OK(this->fs_->GetTargetStats(directory_name, &stat)); + AssertFileStats(stat, directory_name, FileType::Directory); + + ASSERT_OK(this->fs_->DeleteDir(directory_name)); + ASSERT_OK(this->fs_->GetTargetStats(directory_name, &stat)); + ASSERT_TRUE(stat.type() == FileType::NonExistent); +} + +TYPED_TEST(TestHadoopFileSystem, WriteReadFile) { + SKIP_IF_NO_DRIVER(); + + ASSERT_OK(this->fs_->CreateDir("/CD")); + constexpr int kDataSize = 9; + std::string file_name = "/CD/abc"; + std::string data = "some data"; + std::shared_ptr stream; + ASSERT_OK(this->fs_->OpenOutputStream(file_name, &stream)); + auto data_size = static_cast(data.size()); + ASSERT_OK(stream->Write(data.data(), data_size)); + ASSERT_OK(stream->Close()); + + std::shared_ptr file; + ASSERT_OK(this->fs_->OpenInputFile(file_name, &file)); + int64_t file_size; + ASSERT_OK(file->GetSize(&file_size)); + ASSERT_EQ(kDataSize, file_size); + uint8_t buffer[kDataSize]; + int64_t bytes_read = 0; + ASSERT_OK(file->Read(kDataSize, &bytes_read, buffer)); + ASSERT_EQ(0, std::memcmp(buffer, data.c_str(), kDataSize)); +} + +TYPED_TEST(TestHadoopFileSystem, GetTargetStats) { + SKIP_IF_NO_DRIVER(); + + std::vector stats; + + ASSERT_OK(this->fs_->CreateDir("/AB")); + ASSERT_OK(this->fs_->CreateDir("/AB/CD")); + ASSERT_OK(this->fs_->CreateDir("/AB/EF")); + ASSERT_OK(this->fs_->CreateDir("/AB/EF/GH")); + + std::shared_ptr stream; + ASSERT_OK(this->fs_->OpenOutputStream("/AB/data", &stream)); + + Selector selector; + selector.base_dir = "/AB"; + selector.recursive = false; + + stats.clear(); + ASSERT_OK(this->fs_->GetTargetStats(selector, &stats)); + ASSERT_EQ(stats.size(), 3); + AssertFileStats(stats[0], "/AB/CD", FileType::Directory); + AssertFileStats(stats[1], "/AB/EF", 
FileType::Directory); + AssertFileStats(stats[2], "/AB/data", FileType::File); + + selector.recursive = true; + stats.clear(); + ASSERT_OK(this->fs_->GetTargetStats(selector, &stats)); + ASSERT_EQ(stats.size(), 4); + AssertFileStats(stats[0], "/AB/CD", FileType::Directory); + AssertFileStats(stats[1], "/AB/EF", FileType::Directory); + AssertFileStats(stats[2], "/AB/EF/GH", FileType::Directory); + AssertFileStats(stats[3], "/AB/data", FileType::File); + + selector.max_recursion = 1; + stats.clear(); + ASSERT_OK(this->fs_->GetTargetStats(selector, &stats)); + ASSERT_EQ(stats.size(), 3); + AssertFileStats(stats[0], "/AB/CD", FileType::Directory); + AssertFileStats(stats[1], "/AB/EF", FileType::Directory); + AssertFileStats(stats[2], "/AB/data", FileType::File); + + selector.base_dir = "XYZ"; + selector.allow_non_existent = true; + stats.clear(); + ASSERT_OK(this->fs_->GetTargetStats(selector, &stats)); + ASSERT_EQ(stats.size(), 0); + + ASSERT_OK(stream->Close()); + ASSERT_OK(this->fs_->DeleteDirContents("/AB")); + ASSERT_OK(this->fs_->DeleteDir("/AB")); + FileStats stat; + ASSERT_OK(this->fs_->GetTargetStats("/AB", &stat)); + ASSERT_TRUE(stat.type() == FileType::NonExistent); +} + +TYPED_TEST(TestHadoopFileSystem, MoveDir) { + SKIP_IF_NO_DRIVER(); + + FileStats stat; + std::string directory_name_src = "/AB"; + std::string directory_name_dest = "/CD"; + ASSERT_OK(this->fs_->CreateDir(directory_name_src)); + ASSERT_OK(this->fs_->GetTargetStats(directory_name_src, &stat)); + AssertFileStats(stat, directory_name_src, FileType::Directory); + + // move file + ASSERT_OK(this->fs_->Move(directory_name_src, directory_name_dest)); + ASSERT_OK(this->fs_->GetTargetStats(directory_name_src, &stat)); + ASSERT_TRUE(stat.type() == FileType::NonExistent); + + ASSERT_OK(this->fs_->GetTargetStats(directory_name_dest, &stat)); + AssertFileStats(stat, directory_name_dest, FileType::Directory); + ASSERT_OK(this->fs_->DeleteDir(directory_name_dest)); +} + +} // namespace internal +} // 
namespace fs +} // namespace arrow