From 64983b8e4dd0349c97dddde8f463ab07e9abc7e7 Mon Sep 17 00:00:00 2001 From: Chendi Xue Date: Mon, 28 Oct 2019 16:18:47 +0800 Subject: [PATCH 1/3] [C++]Add filesystem_utils_test and hdfs_test Signed-off-by: Chendi Xue --- cpp/src/arrow/filesystem/CMakeLists.txt | 13 ++ .../arrow/filesystem/filesystem_utils_test.cc | 116 ++++++++++++ cpp/src/arrow/filesystem/hdfs_test.cc | 169 ++++++++++++++++++ 3 files changed, 298 insertions(+) create mode 100644 cpp/src/arrow/filesystem/filesystem_utils_test.cc create mode 100644 cpp/src/arrow/filesystem/hdfs_test.cc diff --git a/cpp/src/arrow/filesystem/CMakeLists.txt b/cpp/src/arrow/filesystem/CMakeLists.txt index efb78055a9a..fbd86fcd4d5 100644 --- a/cpp/src/arrow/filesystem/CMakeLists.txt +++ b/cpp/src/arrow/filesystem/CMakeLists.txt @@ -18,7 +18,16 @@ # Headers: top level arrow_install_all_headers("arrow/filesystem") +if(ARROW_HDFS) + add_definitions(-DARROW_HDFS) +endif() + +if(ARROW_S3) + add_definitions(-DARROW_S3) +endif() + add_arrow_test(filesystem_test) +add_arrow_test(filesystem_utils_test) add_arrow_test(localfs_test) add_arrow_test(path_tree_test) @@ -32,3 +41,7 @@ if(ARROW_S3) add_dependencies(arrow-tests arrow-s3fs-narrative-test) endif() endif() + +if(ARROW_HDFS) + add_arrow_test(hdfs_test) +endif() diff --git a/cpp/src/arrow/filesystem/filesystem_utils_test.cc b/cpp/src/arrow/filesystem/filesystem_utils_test.cc new file mode 100644 index 00000000000..211ce9aa8aa --- /dev/null +++ b/cpp/src/arrow/filesystem/filesystem_utils_test.cc @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include + +#include + +#include "arrow/filesystem/filesystem_utils.h" +#include "arrow/filesystem/path_util.h" +#include "arrow/filesystem/test_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/io_util.h" +#include "arrow/util/uri.h" +#ifdef ARROW_HDFS +#include "arrow/io/hdfs.h" +#endif + +namespace arrow { +namespace fs { + +#define SOME_DATA_SIZE 9 + +struct CommonPathFormatter { + std::string operator()(const arrow::internal::PlatformFilename& fn) { + return fn.ToString(); + } +}; + +#ifndef _WIN32 +using PathFormatters = ::testing::Types; + +class TestFileSystem : public ::testing::Test { + public: + void SetUp() override {} + + void TestFileReadWrite() { + std::string data = "some data"; + // Test that the right location on disk is accessed + std::shared_ptr stream; + ASSERT_OK(fs_->OpenOutputStream(file_name_, &stream)); + auto data_size = static_cast(data.size()); + ASSERT_OK(stream->Write(data.data(), data_size)); + ASSERT_OK(stream->Close()); + + std::shared_ptr file; + ASSERT_OK(fs_->OpenInputFile(file_name_, &file)); + int64_t file_size; + ASSERT_OK(file->GetSize(&file_size)); + ASSERT_EQ(SOME_DATA_SIZE, file_size); + uint8_t buffer[SOME_DATA_SIZE]; + int64_t bytes_read = 0; + ASSERT_OK(file->Read(SOME_DATA_SIZE, &bytes_read, buffer)); + ASSERT_EQ(0, std::memcmp(buffer, data.c_str(), SOME_DATA_SIZE)); + } + + std::string GetDirectoryName(const std::string& file_name) { + auto dir_file = internal::GetAbstractPathParent(file_name); + return dir_file.first; + } + + 
void TestCreateDir() { ASSERT_OK(fs_->CreateDir(GetDirectoryName(file_name_))); } + + void TestDeleteDir() { ASSERT_OK(fs_->DeleteDir(GetDirectoryName(file_name_))); } + + protected: + std::shared_ptr fs_; + std::string file_name_; +}; + +template +class TestLocalFileSystem : public TestFileSystem { + public: + void SetUp() override { + ASSERT_OK(arrow::internal::TemporaryDir::Make("test-localfs-", &temp_dir_)); + std::string full_path = PathFormatter()(temp_dir_->path()); + full_path.append("/AB/abc"); + arrow::internal::Uri uri; + ASSERT_OK(uri.Parse(full_path)); + file_name_ = uri.path(); + ASSERT_OK(factory::MakeFileSystem(full_path, &fs_)); + } + + protected: + std::unique_ptr temp_dir_; +}; + +TYPED_TEST_CASE(TestLocalFileSystem, PathFormatters); + +TYPED_TEST(TestLocalFileSystem, MakeFileSystemLocal) { + this->TestCreateDir(); + this->TestFileReadWrite(); + this->TestDeleteDir(); +} +#endif + +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/hdfs_test.cc b/cpp/src/arrow/filesystem/hdfs_test.cc new file mode 100644 index 00000000000..8e4dff81421 --- /dev/null +++ b/cpp/src/arrow/filesystem/hdfs_test.cc @@ -0,0 +1,169 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include + +#include + +#include "arrow/filesystem/hdfs.h" +#include "arrow/filesystem/test_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/io_util.h" + +namespace arrow { +namespace fs { +namespace internal { + +#ifndef _WIN32 + +#define SOME_DATA_SIZE 9 + +using HdfsDriver = arrow::io::HdfsDriver; + +struct JNIDriver { + static HdfsDriver type; +}; + +struct PivotalDriver { + static HdfsDriver type; +}; + +template +class TestHadoopFileSystem : public ::testing::Test { + public: + void SetUp() override { + const char* host = std::getenv("ARROW_HDFS_TEST_HOST"); + const char* port = std::getenv("ARROW_HDFS_TEST_PORT"); + const char* user = std::getenv("ARROW_HDFS_TEST_USER"); + + std::string hdfs_host = host == nullptr ? "localhost" : std::string(host); + int hdfs_port = port == nullptr ? 20500 : atoi(port); + std::string hdfs_user = user == nullptr ? "root" : std::string(user); + + bool use_hdfs3; + if (DRIVER::type == HdfsDriver::LIBHDFS) { + use_hdfs3 = false; + } else { + use_hdfs3 = true; + } + + HdfsOptions hdfs_options; + hdfs_options.ConfigureEndPoint(hdfs_host, hdfs_port); + hdfs_options.ConfigureHdfsUser(hdfs_user); + hdfs_options.ConfigureHdfsDriver(use_hdfs3); + hdfs_options.ConfigureHdfsReplication(0); + + std::shared_ptr hdfs; + auto status = HadoopFileSystem::Make(hdfs_options, &hdfs); + if (!status.ok()) { + loaded_driver_ = false; + return; + } + loaded_driver_ = true; + fs_ = std::make_shared("", hdfs); + } + + protected: + std::shared_ptr fs_; + bool loaded_driver_ = false; +}; + +HdfsDriver JNIDriver::type = HdfsDriver::LIBHDFS; +HdfsDriver PivotalDriver::type = HdfsDriver::LIBHDFS3; + +typedef ::testing::Types DriverTypes; + +TYPED_TEST_CASE(TestHadoopFileSystem, DriverTypes); + +#define SKIP_IF_NO_DRIVER() \ + if (!this->loaded_driver_) { \ + std::cout << "Driver not loaded, skipping" << std::endl; \ + return; \ + } + +TYPED_TEST(TestHadoopFileSystem, CreateDir) { + SKIP_IF_NO_DRIVER(); + + std::string 
directory_name = "AB"; + ASSERT_OK(this->fs_->CreateDir(directory_name)); +} + +TYPED_TEST(TestHadoopFileSystem, GetTargetStats) { + SKIP_IF_NO_DRIVER(); + + std::string directory_name = "AB"; + FileStats stat; + ASSERT_OK(this->fs_->GetTargetStats(directory_name, &stat)); + AssertFileStats(stat, "AB", FileType::Directory); +} + +TYPED_TEST(TestHadoopFileSystem, MoveDir) { + SKIP_IF_NO_DRIVER(); + + std::string directory_name_src = "AB"; + std::string directory_name_dest = "CD"; + ASSERT_OK(this->fs_->Move(directory_name_src, directory_name_dest)); + FileStats stat; + ASSERT_OK(this->fs_->GetTargetStats(directory_name_dest, &stat)); + AssertFileStats(stat, "CD", FileType::Directory); +} + +TYPED_TEST(TestHadoopFileSystem, WriteReadFile) { + SKIP_IF_NO_DRIVER(); + + std::string file_name = "CD/abc"; + std::string data = "some data"; + std::shared_ptr stream; + ASSERT_OK(this->fs_->OpenOutputStream(file_name, &stream)); + auto data_size = static_cast(data.size()); + ASSERT_OK(stream->Write(data.data(), data_size)); + ASSERT_OK(stream->Close()); + + std::shared_ptr file; + ASSERT_OK(this->fs_->OpenInputFile(file_name, &file)); + int64_t file_size; + ASSERT_OK(file->GetSize(&file_size)); + ASSERT_EQ(SOME_DATA_SIZE, file_size); + uint8_t buffer[SOME_DATA_SIZE]; + int64_t bytes_read = 0; + ASSERT_OK(file->Read(SOME_DATA_SIZE, &bytes_read, buffer)); + ASSERT_EQ(0, std::memcmp(buffer, data.c_str(), SOME_DATA_SIZE)); +} + +TYPED_TEST(TestHadoopFileSystem, DeleteFile) { + SKIP_IF_NO_DRIVER(); + + std::string file_name = "CD/abc"; + ASSERT_OK(this->fs_->DeleteFile(file_name)); + FileStats stat; + ASSERT_TRUE(Status::OK() != this->fs_->GetTargetStats(file_name, &stat)); +} + +TYPED_TEST(TestHadoopFileSystem, DeleteDir) { + SKIP_IF_NO_DRIVER(); + + std::string directory_name = "CD"; + ASSERT_OK(this->fs_->DeleteDir(directory_name)); + FileStats stat; + ASSERT_TRUE(Status::OK() != this->fs_->GetTargetStats(directory_name, &stat)); +} +#endif // _WIN32 + +} // namespace internal 
+} // namespace fs +} // namespace arrow From ff5aa7009bdbb67c5b8c31010c2f9d3af7c95997 Mon Sep 17 00:00:00 2001 From: Chendi Xue Date: Wed, 6 Nov 2019 13:04:34 +0800 Subject: [PATCH 2/3] [C++]Add Filesystem_utils and HadoopFileSystem 1. Make MakeFileSystem as a namespace function 2. Use uri.h to parse path string Signed-off-by: Chendi Xue --- cpp/CMakeLists.txt | 4 + cpp/src/arrow/CMakeLists.txt | 13 + cpp/src/arrow/filesystem/api.h | 10 +- cpp/src/arrow/filesystem/filesystem_utils.cc | 203 +++++++++++++ cpp/src/arrow/filesystem/filesystem_utils.h | 40 +++ cpp/src/arrow/filesystem/hdfs.cc | 298 +++++++++++++++++++ cpp/src/arrow/filesystem/hdfs.h | 105 +++++++ 7 files changed, 669 insertions(+), 4 deletions(-) create mode 100644 cpp/src/arrow/filesystem/filesystem_utils.cc create mode 100644 cpp/src/arrow/filesystem/filesystem_utils.h create mode 100644 cpp/src/arrow/filesystem/hdfs.cc create mode 100644 cpp/src/arrow/filesystem/hdfs.h diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a60c2575f7a..a5893c89b8d 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -286,6 +286,10 @@ if(ARROW_DATASET) set(ARROW_FILESYSTEM ON) endif() +if(ARROW_FILESYSTEM) + set(ARROW_WITH_URIPARSER ON) +endif() + if(MSVC) # ORC doesn't build on windows set(ARROW_ORC OFF) diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 8f70ad7b0b4..8c7015e0639 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -265,8 +265,17 @@ endif() if(ARROW_FILESYSTEM) add_subdirectory(filesystem) + if(ARROW_HDFS) + add_definitions(-DARROW_HDFS) + endif() + + if(ARROW_S3) + add_definitions(-DARROW_S3) + endif() + list(APPEND ARROW_SRCS filesystem/filesystem.cc + filesystem/filesystem_utils.cc filesystem/localfs.cc filesystem/mockfs.cc filesystem/path_tree.cc @@ -277,6 +286,10 @@ if(ARROW_FILESYSTEM) list(APPEND ARROW_SRCS filesystem/s3fs.cc) endif() + if(ARROW_HDFS) + list(APPEND ARROW_SRCS filesystem/hdfs.cc) + endif() + list(APPEND 
ARROW_TESTING_SRCS filesystem/test_util.cc) endif() diff --git a/cpp/src/arrow/filesystem/api.h b/cpp/src/arrow/filesystem/api.h index fd8f566a78e..f224a7c9399 100644 --- a/cpp/src/arrow/filesystem/api.h +++ b/cpp/src/arrow/filesystem/api.h @@ -18,9 +18,11 @@ #ifndef ARROW_FILESYSTEM_API_H #define ARROW_FILESYSTEM_API_H -#include "arrow/filesystem/filesystem.h" // IWYU pragma: export -#include "arrow/filesystem/localfs.h" // IWYU pragma: export -#include "arrow/filesystem/mockfs.h" // IWYU pragma: export -#include "arrow/filesystem/s3fs.h" // IWYU pragma: export +#include "arrow/filesystem/filesystem.h" // IWYU pragma: export +#include "arrow/filesystem/filesystem_utils.h" // IWYU pragma: export +#include "arrow/filesystem/hdfs.h" // IWYU pragma: export +#include "arrow/filesystem/localfs.h" // IWYU pragma: export +#include "arrow/filesystem/mockfs.h" // IWYU pragma: export +#include "arrow/filesystem/s3fs.h" // IWYU pragma: export #endif // ARROW_FILESYSTEM_API_H diff --git a/cpp/src/arrow/filesystem/filesystem_utils.cc b/cpp/src/arrow/filesystem/filesystem_utils.cc new file mode 100644 index 00000000000..cc02bc07ff2 --- /dev/null +++ b/cpp/src/arrow/filesystem/filesystem_utils.cc @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include + +#include "arrow/filesystem/filesystem_utils.h" +#ifdef ARROW_HDFS +#include "arrow/filesystem/hdfs.h" +#endif +#include "arrow/filesystem/localfs.h" +#include "arrow/filesystem/path_util.h" +#include "arrow/util/uri.h" + +namespace arrow { + +namespace fs { + +namespace factory { + +namespace internal { + +class PathInfo { + public: + PathInfo() {} + ~PathInfo() {} + + static Status Make(const std::string& full_path, std::shared_ptr* path_info) { + *path_info = std::make_shared(); + RETURN_NOT_OK((*path_info)->Init(full_path)); + return Status::OK(); + } + + Status Init(const std::string& full_path) { + RETURN_NOT_OK(ParseURI(full_path)); + full_path_ = std::string(full_path); + return Status::OK(); + } + + FileSystemType GetFileSystemType() { return fs_type_; } + + std::string GetHostName() { + auto search = options_.find("host_name"); + if (search == options_.end()) { + return ""; + } + return search->second; + } + + int GetHostPort() { + auto search = options_.find("host_port"); + if (search == options_.end()) { + return -1; + } + std::string port_text = search->second; + return std::stoi(port_text); + } + + std::string GetUser() { + auto search = options_.find("user"); + if (search != options_.end()) { + return search->second; + } + return ""; + } + + bool GetIfUseHdfs3() { + auto search = options_.find("use_hdfs3"); + if (search != options_.end()) { + if (search->second.compare("1") == 0) { + return true; + } else { + return false; + } + } else { + return false; + } + } + + int GetRepsNum() { + auto search = options_.find("replication"); + if (search != options_.end()) { + if (search->second.empty()) { + return 3; + } else { + return std::stoi(search->second); + } + } else { + return 3; + } + } + + private: + FileSystemType GetFileSystemTypeFromString(const std::string& s) { + if (s == "hdfs") { + 
return FileSystemType::HDFS; + } + if (s == "http") { + return FileSystemType::S3; + } + if (s == "https") { + return FileSystemType::S3; + } + if (s == "file") { + return FileSystemType::LOCAL; + } + if (s.empty()) { + return FileSystemType::LOCAL; + } + return FileSystemType::UNKNOWN; + } + + Status ParseURI(const std::string& s) { + arrow::internal::Uri uri; + RETURN_NOT_OK(uri.Parse(s)); + fs_type_ = GetFileSystemTypeFromString(uri.scheme()); + switch (fs_type_) { + case FileSystemType::HDFS: + options_.emplace("host_name", uri.host()); + options_.emplace("host_port", uri.port_text()); + RETURN_NOT_OK(ParseOptions(&uri)); + break; + case FileSystemType::LOCAL: + RETURN_NOT_OK(ParseOptions(&uri)); + break; + case FileSystemType::S3: + return Status::NotImplemented("S3 is not supported yet."); + default: + break; + } + return Status::OK(); + } + + Status ParseOptions(arrow::internal::Uri* uri) { + auto options = uri->query_items(); + for (auto option : *options) { + options_.emplace(option.first, option.second); + } + return Status::OK(); + } + + FileSystemType fs_type_; + std::string full_path_; + std::unordered_map options_; +}; +} // namespace internal + +Status MakeFileSystem(const std::string& full_path, std::shared_ptr* fs) { + std::shared_ptr path_info; + RETURN_NOT_OK(internal::PathInfo::Make(full_path, &path_info)); + FileSystemType fs_type = path_info->GetFileSystemType(); + + switch (fs_type) { +#ifdef ARROW_HDFS + case FileSystemType::HDFS: { + // Init Hdfs FileSystem + HdfsOptions hdfs_options; + hdfs_options.ConfigureEndPoint(path_info->GetHostName(), path_info->GetHostPort()); + hdfs_options.ConfigureHdfsDriver(path_info->GetIfUseHdfs3()); + hdfs_options.ConfigureHdfsReplication(path_info->GetRepsNum()); + hdfs_options.ConfigureHdfsUser(path_info->GetUser()); + + std::shared_ptr hdfs; + RETURN_NOT_OK(HadoopFileSystem::Make(hdfs_options, &hdfs)); + + *fs = std::make_shared("", hdfs); + } break; +#endif + case FileSystemType::LOCAL: { + auto local_fs 
= std::make_shared(); + *fs = std::make_shared("", local_fs); + } break; +#ifdef ARROW_S3 + case FileSystemType::S3: + return Status::NotImplemented("S3 is not supported yet."); +#endif + default: + return Status::NotImplemented("This type of filesystem is not supported yet."); + } + + return Status::OK(); +} +} // namespace factory + +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/filesystem_utils.h b/cpp/src/arrow/filesystem/filesystem_utils.h new file mode 100644 index 00000000000..9e3aef9f77f --- /dev/null +++ b/cpp/src/arrow/filesystem/filesystem_utils.h @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include "arrow/filesystem/filesystem.h" + +namespace arrow { + +namespace fs { + +enum class FileSystemType { HDFS, LOCAL, S3, UNKNOWN }; + +namespace factory { +/// \brief Creates a new FileSystem by path +/// +/// \param[in] full_path a URI-based path, ex: hdfs:///some/path?replication=3 +/// \param[out] fs the newly created FileSystem instance. 
+/// \return Status +ARROW_EXPORT Status MakeFileSystem(const std::string& full_path, + std::shared_ptr* fs); +} // namespace factory +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/hdfs.cc b/cpp/src/arrow/filesystem/hdfs.cc new file mode 100644 index 00000000000..0accd667f84 --- /dev/null +++ b/cpp/src/arrow/filesystem/hdfs.cc @@ -0,0 +1,298 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include + +#ifdef _WIN32 +#include "arrow/util/windows_compatibility.h" +#else +#include +#include +#include +#include +#endif // _WIN32 + +#include + +#include "arrow/filesystem/hdfs.h" +#include "arrow/filesystem/util_internal.h" +#include "arrow/io/hdfs.h" +#include "arrow/io/hdfs_internal.h" +#include "arrow/util/io_util.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace fs { + +class HadoopFileSystem::Impl { + public: + explicit Impl(HdfsOptions options) : options_(std::move(options)) {} + ~Impl() { auto status = Close(); } + + Status Init() { + io::internal::LibHdfsShim* driver_shim; + if (options_.hdfs_config.driver == io::HdfsDriver::LIBHDFS3) { + RETURN_NOT_OK(ConnectLibHdfs3(&driver_shim)); + } else { + RETURN_NOT_OK(ConnectLibHdfs(&driver_shim)); + } + RETURN_NOT_OK(io::HadoopFileSystem::Connect(&options_.hdfs_config, &client_)); + return Status::OK(); + } + + Status Close() { + if (client_) { + RETURN_NOT_OK(client_->Disconnect()); + } + return Status::OK(); + } + + Status GetTargetStats(const std::string& path, FileStats* out) { + io::HdfsPathInfo info; + RETURN_NOT_OK(client_->GetPathInfo(path, &info)); + + out->set_path(path); + if (info.kind == io::ObjectType::DIRECTORY) { + out->set_type(FileType::Directory); + out->set_size(kNoSize); + } else { + out->set_type(FileType::File); + out->set_size(info.size); + } + out->set_mtime(ToTimePoint(info.last_modified_time)); + return Status::OK(); + } + + Status GetTargetStats(const Selector& select, std::vector* out) { + std::vector file_list; + RETURN_NOT_OK(client_->GetChildren(select.base_dir, &file_list)); + for (auto file : file_list) { + FileStats stat; + RETURN_NOT_OK(GetTargetStats(file, &stat)); + out->push_back(stat); + } + return Status::OK(); + } + + Status CreateDir(const std::string& path, bool recursive) { + if (client_->Exists(path)) { + return Status::OK(); + } + RETURN_NOT_OK(client_->MakeDirectory(path)); + return Status::OK(); + } + + Status 
DeleteDir(const std::string& path) { + if (!IsDirectory(path)) { + return Status::IOError("path is not a directory"); + } + if (!client_->Exists(path)) { + return Status::OK(); + } + RETURN_NOT_OK(client_->DeleteDirectory(path)); + return Status::OK(); + } + + Status DeleteDirContents(const std::string& path) { + if (!IsDirectory(path)) { + return Status::IOError("path is not a directory"); + } + std::vector file_list; + RETURN_NOT_OK(client_->GetChildren(path, &file_list)); + for (auto file : file_list) { + RETURN_NOT_OK(client_->Delete(path + file)); + } + return Status::OK(); + } + + Status DeleteFile(const std::string& path) { + if (IsDirectory(path)) { + return Status::IOError("path is a directory"); + } + RETURN_NOT_OK(client_->Delete(path)); + return Status::OK(); + } + + Status Move(const std::string& src, const std::string& dest) { + RETURN_NOT_OK(client_->Rename(src, dest)); + return Status::OK(); + } + + Status CopyFile(const std::string& src, const std::string& dest) { + // TODO + return Status::NotImplemented("HadoopFileSystem::CopyFile is not supported yet"); + } + + Status OpenInputStream(const std::string& path, std::shared_ptr* out) { + // TODO + return Status::NotImplemented( + "HadoopFileSystem::OpenInputStream is not supported yet"); + } + + Status OpenInputFile(const std::string& path, + std::shared_ptr* out) { + std::shared_ptr file; + RETURN_NOT_OK(client_->OpenReadable(path, &file)); + *out = file; + return Status::OK(); + } + + Status OpenOutputStream(const std::string& path, + std::shared_ptr* out) { + bool append = false; + return OpenOutputStreamGeneric(path, append, out); + } + + Status OpenAppendStream(const std::string& path, + std::shared_ptr* out) { + bool append = true; + return OpenOutputStreamGeneric(path, append, out); + } + + protected: + HdfsOptions options_; + std::shared_ptr<::arrow::io::HadoopFileSystem> client_; + + Status OpenOutputStreamGeneric(const std::string& path, bool append, + std::shared_ptr* out) { + 
std::shared_ptr stream; + RETURN_NOT_OK(client_->OpenWritable(path, append, options_.buffer_size, + options_.replication, options_.default_block_size, + &stream)); + *out = stream; + return Status::OK(); + } + + bool IsDirectory(const std::string& path) { + io::HdfsPathInfo info; + Status status = client_->GetPathInfo(path, &info); + if (!status.ok()) { + return false; + } + if (info.kind == io::ObjectType::DIRECTORY) { + return true; + } + return false; + } + + TimePoint ToTimePoint(int secs) { + std::chrono::nanoseconds ns_count(static_cast(secs) * 1000000000); + return TimePoint(std::chrono::duration_cast(ns_count)); + } +}; + +void HdfsOptions::ConfigureEndPoint(const std::string& host, int port) { + hdfs_config.host = host; + hdfs_config.port = port; +} + +void HdfsOptions::ConfigureHdfsDriver(bool use_hdfs3) { + if (use_hdfs3) { + hdfs_config.driver = ::arrow::io::HdfsDriver::LIBHDFS3; + } else { + hdfs_config.driver = ::arrow::io::HdfsDriver::LIBHDFS; + } +} + +void HdfsOptions::ConfigureHdfsUser(const std::string& user_name) { + if (!user_name.empty()) { + hdfs_config.user = user_name; + } +} + +void HdfsOptions::ConfigureHdfsReplication(int16_t replication) { + this->replication = replication; +} + +void HdfsOptions::ConfigureHdfsBufferSize(int32_t buffer_size) { + this->buffer_size = buffer_size; +} + +void HdfsOptions::ConfigureHdfsBlockSize(int64_t default_block_size) { + this->default_block_size = default_block_size; +} + +HadoopFileSystem::HadoopFileSystem(const HdfsOptions& options) + : impl_(new Impl{options}) {} + +HadoopFileSystem::~HadoopFileSystem() {} + +Status HadoopFileSystem::Make(const HdfsOptions& options, + std::shared_ptr* out) { + std::shared_ptr ptr(new HadoopFileSystem(options)); + RETURN_NOT_OK(ptr->impl_->Init()); + *out = std::move(ptr); + return Status::OK(); +} + +Status HadoopFileSystem::GetTargetStats(const std::string& path, FileStats* out) { + return impl_->GetTargetStats(path, out); +} + +Status 
HadoopFileSystem::GetTargetStats(const Selector& select, + std::vector* out) { + return impl_->GetTargetStats(select, out); +} + +Status HadoopFileSystem::CreateDir(const std::string& path, bool recursive) { + return impl_->CreateDir(path, recursive); +} + +Status HadoopFileSystem::DeleteDir(const std::string& path) { + return impl_->DeleteDir(path); +} + +Status HadoopFileSystem::DeleteDirContents(const std::string& path) { + return impl_->DeleteDirContents(path); +} + +Status HadoopFileSystem::DeleteFile(const std::string& path) { + return impl_->DeleteFile(path); +} + +Status HadoopFileSystem::Move(const std::string& src, const std::string& dest) { + return impl_->Move(src, dest); +} + +Status HadoopFileSystem::CopyFile(const std::string& src, const std::string& dest) { + return impl_->CopyFile(src, dest); +} + +Status HadoopFileSystem::OpenInputStream(const std::string& path, + std::shared_ptr* out) { + return impl_->OpenInputStream(path, out); +} + +Status HadoopFileSystem::OpenInputFile(const std::string& path, + std::shared_ptr* out) { + return impl_->OpenInputFile(path, out); +} + +Status HadoopFileSystem::OpenOutputStream(const std::string& path, + std::shared_ptr* out) { + return impl_->OpenOutputStream(path, out); +} + +Status HadoopFileSystem::OpenAppendStream(const std::string& path, + std::shared_ptr* out) { + return impl_->OpenAppendStream(path, out); +} + +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/hdfs.h b/cpp/src/arrow/filesystem/hdfs.h new file mode 100644 index 00000000000..99cd0f736cf --- /dev/null +++ b/cpp/src/arrow/filesystem/hdfs.h @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "arrow/filesystem/filesystem.h" +#include "arrow/io/hdfs.h" + +namespace arrow { +namespace fs { + +/// Options for the HDFS implementation. +struct ARROW_EXPORT HdfsOptions { + HdfsOptions() = default; + ~HdfsOptions() = default; + + /// Hdfs configuration options, contains host, port, driver + io::HdfsConnectionConfig hdfs_config; + + /// Directory and file_name + std::string directory; + std::string file_name; + + /// Used by Hdfs OpenWritable Interface. + int32_t buffer_size = 0; + int16_t replication = 0; + int64_t default_block_size = 0; + + void ConfigureEndPoint(const std::string& host, int port); + /// Be cautious that libhdfs3 is an unmaintained project + void ConfigureHdfsDriver(bool use_hdfs3); + void ConfigureHdfsReplication(int16_t replication); + void ConfigureHdfsUser(const std::string& user_name); + void ConfigureHdfsBufferSize(int32_t buffer_size); + void ConfigureHdfsBlockSize(int64_t default_block_siz); +}; + +/// HDFS-backed FileSystem implementation. +/// +/// implementation notes: +/// - This is a wrapper of arrow/io/hdfs, so we can use FileSystem API to handle hdfs. 
+class ARROW_EXPORT HadoopFileSystem : public FileSystem { + public: + ~HadoopFileSystem() override; + + /// \cond FALSE + using FileSystem::GetTargetStats; + /// \endcond + Status GetTargetStats(const std::string& path, FileStats* out) override; + Status GetTargetStats(const Selector& select, std::vector* out) override; + + Status CreateDir(const std::string& path, bool recursive = true) override; + + Status DeleteDir(const std::string& path) override; + + Status DeleteDirContents(const std::string& path) override; + + Status DeleteFile(const std::string& path) override; + + Status Move(const std::string& src, const std::string& dest) override; + + Status CopyFile(const std::string& src, const std::string& dest) override; + + Status OpenInputStream(const std::string& path, + std::shared_ptr* out) override; + + Status OpenInputFile(const std::string& path, + std::shared_ptr* out) override; + + Status OpenOutputStream(const std::string& path, + std::shared_ptr* out) override; + + Status OpenAppendStream(const std::string& path, + std::shared_ptr* out) override; + + /// Create a HadoopFileSystem instance from the given options. 
+ static Status Make(const HdfsOptions& options, std::shared_ptr* out); + + protected: + explicit HadoopFileSystem(const HdfsOptions& options); + + class Impl; + std::unique_ptr impl_; +}; + +} // namespace fs +} // namespace arrow From 282ec95074924cd18f8ce1c43a375d019ecdf55b Mon Sep 17 00:00:00 2001 From: Chendi Xue Date: Mon, 11 Nov 2019 16:21:20 +0800 Subject: [PATCH 3/3] [C++] issue fixing according to review comments from @pitrou Signed-off-by: Chendi Xue --- cpp/src/arrow/filesystem/CMakeLists.txt | 4 - cpp/src/arrow/filesystem/filesystem_utils.cc | 23 ++-- cpp/src/arrow/filesystem/filesystem_utils.h | 4 - .../arrow/filesystem/filesystem_utils_test.cc | 2 +- cpp/src/arrow/filesystem/hdfs.cc | 66 +++++++-- cpp/src/arrow/filesystem/hdfs.h | 6 +- cpp/src/arrow/filesystem/hdfs_test.cc | 127 ++++++++++++------ 7 files changed, 153 insertions(+), 79 deletions(-) diff --git a/cpp/src/arrow/filesystem/CMakeLists.txt b/cpp/src/arrow/filesystem/CMakeLists.txt index fbd86fcd4d5..8d4f65ac069 100644 --- a/cpp/src/arrow/filesystem/CMakeLists.txt +++ b/cpp/src/arrow/filesystem/CMakeLists.txt @@ -18,10 +18,6 @@ # Headers: top level arrow_install_all_headers("arrow/filesystem") -if(ARROW_HDFS) - add_definitions(-DARROW_HDFS) -endif() - if(ARROW_S3) add_definitions(-DARROW_S3) endif() diff --git a/cpp/src/arrow/filesystem/filesystem_utils.cc b/cpp/src/arrow/filesystem/filesystem_utils.cc index cc02bc07ff2..10697f69630 100644 --- a/cpp/src/arrow/filesystem/filesystem_utils.cc +++ b/cpp/src/arrow/filesystem/filesystem_utils.cc @@ -27,15 +27,16 @@ #endif #include "arrow/filesystem/localfs.h" #include "arrow/filesystem/path_util.h" +#include "arrow/result.h" #include "arrow/util/uri.h" namespace arrow { namespace fs { -namespace factory { +enum class FileSystemType { HDFS, LOCAL, S3, UNKNOWN }; -namespace internal { +namespace { class PathInfo { public: @@ -50,7 +51,6 @@ class PathInfo { Status Init(const std::string& full_path) { RETURN_NOT_OK(ParseURI(full_path)); - full_path_ 
= std::string(full_path); return Status::OK(); } @@ -149,22 +149,21 @@ class PathInfo { } Status ParseOptions(arrow::internal::Uri* uri) { - auto options = uri->query_items(); - for (auto option : *options) { + ARROW_ASSIGN_OR_RAISE(auto options, uri->query_items()); + for (auto option : options) { options_.emplace(option.first, option.second); } return Status::OK(); } FileSystemType fs_type_; - std::string full_path_; std::unordered_map options_; }; -} // namespace internal +} // namespace Status MakeFileSystem(const std::string& full_path, std::shared_ptr* fs) { - std::shared_ptr path_info; - RETURN_NOT_OK(internal::PathInfo::Make(full_path, &path_info)); + std::shared_ptr path_info; + RETURN_NOT_OK(PathInfo::Make(full_path, &path_info)); FileSystemType fs_type = path_info->GetFileSystemType(); switch (fs_type) { @@ -180,12 +179,12 @@ Status MakeFileSystem(const std::string& full_path, std::shared_ptr* std::shared_ptr hdfs; RETURN_NOT_OK(HadoopFileSystem::Make(hdfs_options, &hdfs)); - *fs = std::make_shared("", hdfs); + *fs = hdfs; } break; #endif case FileSystemType::LOCAL: { auto local_fs = std::make_shared(); - *fs = std::make_shared("", local_fs); + *fs = local_fs; } break; #ifdef ARROW_S3 case FileSystemType::S3: @@ -197,7 +196,5 @@ Status MakeFileSystem(const std::string& full_path, std::shared_ptr* return Status::OK(); } -} // namespace factory - } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/filesystem/filesystem_utils.h b/cpp/src/arrow/filesystem/filesystem_utils.h index 9e3aef9f77f..0dd84953ad6 100644 --- a/cpp/src/arrow/filesystem/filesystem_utils.h +++ b/cpp/src/arrow/filesystem/filesystem_utils.h @@ -25,9 +25,6 @@ namespace arrow { namespace fs { -enum class FileSystemType { HDFS, LOCAL, S3, UNKNOWN }; - -namespace factory { /// \brief Creates a new FileSystem by path /// /// \param[in] full_path a URI-based path, ex: hdfs:///some/path?replication=3 @@ -35,6 +32,5 @@ namespace factory { /// \return Status ARROW_EXPORT Status 
MakeFileSystem(const std::string& full_path, std::shared_ptr* fs); -} // namespace factory } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/filesystem/filesystem_utils_test.cc b/cpp/src/arrow/filesystem/filesystem_utils_test.cc index 211ce9aa8aa..a251dda6ccb 100644 --- a/cpp/src/arrow/filesystem/filesystem_utils_test.cc +++ b/cpp/src/arrow/filesystem/filesystem_utils_test.cc @@ -96,7 +96,7 @@ class TestLocalFileSystem : public TestFileSystem { arrow::internal::Uri uri; ASSERT_OK(uri.Parse(full_path)); file_name_ = uri.path(); - ASSERT_OK(factory::MakeFileSystem(full_path, &fs_)); + ASSERT_OK(MakeFileSystem(full_path, &fs_)); } protected: diff --git a/cpp/src/arrow/filesystem/hdfs.cc b/cpp/src/arrow/filesystem/hdfs.cc index 0accd667f84..67869f37044 100644 --- a/cpp/src/arrow/filesystem/hdfs.cc +++ b/cpp/src/arrow/filesystem/hdfs.cc @@ -65,13 +65,17 @@ class HadoopFileSystem::Impl { Status GetTargetStats(const std::string& path, FileStats* out) { io::HdfsPathInfo info; - RETURN_NOT_OK(client_->GetPathInfo(path, &info)); - + auto status = client_->GetPathInfo(path, &info); out->set_path(path); + if (status.IsIOError()) { + out->set_type(FileType::NonExistent); + return Status::OK(); + } + if (info.kind == io::ObjectType::DIRECTORY) { out->set_type(FileType::Directory); out->set_size(kNoSize); - } else { + } else if (info.kind == io::ObjectType::FILE) { out->set_type(FileType::File); out->set_size(info.size); } @@ -79,17 +83,54 @@ class HadoopFileSystem::Impl { return Status::OK(); } - Status GetTargetStats(const Selector& select, std::vector* out) { + Status StatSelector(std::string path, const Selector& select, int nesting_depth, + std::vector* out) { + FileStats stat; + RETURN_NOT_OK(GetTargetStats(path, &stat)); + if (stat.type() == FileType::NonExistent) { + if (select.allow_non_existent) { + return Status::OK(); + } else { + return Status::IOError("Directory does not exist: '", path, "'"); + } + } + + if (nesting_depth > 1 && + (!select.recursive 
|| nesting_depth > select.max_recursion)) { + // if this selector dosn't support recursive or nesting_depth is over max_recursive, + // return without add this filestat into out. + return Status::OK(); + } + + if (stat.type() == FileType::Directory && path != select.base_dir) { + out->push_back(stat); + } else if (stat.type() == FileType::File) { + out->push_back(stat); + return Status::OK(); + } + std::vector file_list; - RETURN_NOT_OK(client_->GetChildren(select.base_dir, &file_list)); + RETURN_NOT_OK(client_->GetChildren(path, &file_list)); for (auto file : file_list) { - FileStats stat; - RETURN_NOT_OK(GetTargetStats(file, &stat)); - out->push_back(stat); + // In hdfs case, file will be absolute path, trim here. + RETURN_NOT_OK(StatSelector(file, select, nesting_depth + 1, out)); } return Status::OK(); } + Status GetTargetStats(const Selector& select, std::vector* out) { + out->clear(); + FileStats stat; + RETURN_NOT_OK(GetTargetStats(select.base_dir, &stat)); + if (stat.type() == FileType::File) { + return Status::Invalid( + "GetTargetStates expects base_dir of selector to be a directory, while '", + select.base_dir, "' is a file"); + } + RETURN_NOT_OK(StatSelector(select.base_dir, select, 0, out)); + return Status::OK(); + } + Status CreateDir(const std::string& path, bool recursive) { if (client_->Exists(path)) { return Status::OK(); @@ -116,7 +157,7 @@ class HadoopFileSystem::Impl { std::vector file_list; RETURN_NOT_OK(client_->GetChildren(path, &file_list)); for (auto file : file_list) { - RETURN_NOT_OK(client_->Delete(path + file)); + RETURN_NOT_OK(client_->Delete(file, true)); } return Status::OK(); } @@ -140,9 +181,10 @@ class HadoopFileSystem::Impl { } Status OpenInputStream(const std::string& path, std::shared_ptr* out) { - // TODO - return Status::NotImplemented( - "HadoopFileSystem::OpenInputStream is not supported yet"); + std::shared_ptr file; + RETURN_NOT_OK(OpenInputFile(path, &file)); + *out = file; + return Status::OK(); } Status 
OpenInputFile(const std::string& path, diff --git a/cpp/src/arrow/filesystem/hdfs.h b/cpp/src/arrow/filesystem/hdfs.h index 99cd0f736cf..0f4753590c4 100644 --- a/cpp/src/arrow/filesystem/hdfs.h +++ b/cpp/src/arrow/filesystem/hdfs.h @@ -35,11 +35,7 @@ struct ARROW_EXPORT HdfsOptions { /// Hdfs configuration options, contains host, port, driver io::HdfsConnectionConfig hdfs_config; - /// Directory and file_name - std::string directory; - std::string file_name; - - /// Used by Hdfs OpenWritale Interface. + /// Used by Hdfs OpenWritable Interface. int32_t buffer_size = 0; int16_t replication = 0; int64_t default_block_size = 0; diff --git a/cpp/src/arrow/filesystem/hdfs_test.cc b/cpp/src/arrow/filesystem/hdfs_test.cc index 8e4dff81421..82e67e359cd 100644 --- a/cpp/src/arrow/filesystem/hdfs_test.cc +++ b/cpp/src/arrow/filesystem/hdfs_test.cc @@ -29,8 +29,6 @@ namespace arrow { namespace fs { namespace internal { -#ifndef _WIN32 - #define SOME_DATA_SIZE 9 using HdfsDriver = arrow::io::HdfsDriver; @@ -71,6 +69,10 @@ class TestHadoopFileSystem : public ::testing::Test { std::shared_ptr hdfs; auto status = HadoopFileSystem::Make(hdfs_options, &hdfs); if (!status.ok()) { + ARROW_LOG(INFO) + << "HadoopFileSystem::Make failed, it is possible when we don't have " + "proper driver on this node, err msg is " + << status.message(); loaded_driver_ = false; return; } @@ -90,43 +92,32 @@ typedef ::testing::Types DriverTypes; TYPED_TEST_CASE(TestHadoopFileSystem, DriverTypes); -#define SKIP_IF_NO_DRIVER() \ - if (!this->loaded_driver_) { \ - std::cout << "Driver not loaded, skipping" << std::endl; \ - return; \ +#define SKIP_IF_NO_DRIVER() \ + if (!this->loaded_driver_) { \ + ARROW_LOG(INFO) << "Driver not loaded, skipping"; \ + return; \ } -TYPED_TEST(TestHadoopFileSystem, CreateDir) { - SKIP_IF_NO_DRIVER(); - - std::string directory_name = "AB"; - ASSERT_OK(this->fs_->CreateDir(directory_name)); -} - -TYPED_TEST(TestHadoopFileSystem, GetTargetStats) { 
+TYPED_TEST(TestHadoopFileSystem, CreateAndDeleteDir) { SKIP_IF_NO_DRIVER(); - std::string directory_name = "AB"; FileStats stat; + std::string directory_name = "/AB"; + ASSERT_OK(this->fs_->CreateDir(directory_name)); ASSERT_OK(this->fs_->GetTargetStats(directory_name, &stat)); - AssertFileStats(stat, "AB", FileType::Directory); -} - -TYPED_TEST(TestHadoopFileSystem, MoveDir) { - SKIP_IF_NO_DRIVER(); + AssertFileStats(stat, directory_name, FileType::Directory); - std::string directory_name_src = "AB"; - std::string directory_name_dest = "CD"; - ASSERT_OK(this->fs_->Move(directory_name_src, directory_name_dest)); - FileStats stat; - ASSERT_OK(this->fs_->GetTargetStats(directory_name_dest, &stat)); - AssertFileStats(stat, "CD", FileType::Directory); + ASSERT_OK(this->fs_->DeleteDir(directory_name)); + ASSERT_OK(this->fs_->GetTargetStats(directory_name, &stat)); + ASSERT_TRUE(stat.type() == FileType::NonExistent); } TYPED_TEST(TestHadoopFileSystem, WriteReadFile) { SKIP_IF_NO_DRIVER(); - std::string file_name = "CD/abc"; + ASSERT_OK(this->fs_->CreateDir("/CD")); + constexpr int kDataSize = 9; + std::string file_name = "/CD/abc"; std::string data = "some data"; std::shared_ptr stream; ASSERT_OK(this->fs_->OpenOutputStream(file_name, &stream)); @@ -138,31 +129,87 @@ TYPED_TEST(TestHadoopFileSystem, WriteReadFile) { ASSERT_OK(this->fs_->OpenInputFile(file_name, &file)); int64_t file_size; ASSERT_OK(file->GetSize(&file_size)); - ASSERT_EQ(SOME_DATA_SIZE, file_size); - uint8_t buffer[SOME_DATA_SIZE]; + ASSERT_EQ(kDataSize, file_size); + uint8_t buffer[kDataSize]; int64_t bytes_read = 0; - ASSERT_OK(file->Read(SOME_DATA_SIZE, &bytes_read, buffer)); - ASSERT_EQ(0, std::memcmp(buffer, data.c_str(), SOME_DATA_SIZE)); + ASSERT_OK(file->Read(kDataSize, &bytes_read, buffer)); + ASSERT_EQ(0, std::memcmp(buffer, data.c_str(), kDataSize)); } -TYPED_TEST(TestHadoopFileSystem, DeleteFile) { +TYPED_TEST(TestHadoopFileSystem, GetTargetStats) { SKIP_IF_NO_DRIVER(); - std::string 
file_name = "CD/abc"; - ASSERT_OK(this->fs_->DeleteFile(file_name)); + std::vector stats; + + ASSERT_OK(this->fs_->CreateDir("/AB")); + ASSERT_OK(this->fs_->CreateDir("/AB/CD")); + ASSERT_OK(this->fs_->CreateDir("/AB/EF")); + ASSERT_OK(this->fs_->CreateDir("/AB/EF/GH")); + + std::shared_ptr stream; + ASSERT_OK(this->fs_->OpenOutputStream("/AB/data", &stream)); + + Selector selector; + selector.base_dir = "/AB"; + selector.recursive = false; + + stats.clear(); + ASSERT_OK(this->fs_->GetTargetStats(selector, &stats)); + ASSERT_EQ(stats.size(), 3); + AssertFileStats(stats[0], "/AB/CD", FileType::Directory); + AssertFileStats(stats[1], "/AB/EF", FileType::Directory); + AssertFileStats(stats[2], "/AB/data", FileType::File); + + selector.recursive = true; + stats.clear(); + ASSERT_OK(this->fs_->GetTargetStats(selector, &stats)); + ASSERT_EQ(stats.size(), 4); + AssertFileStats(stats[0], "/AB/CD", FileType::Directory); + AssertFileStats(stats[1], "/AB/EF", FileType::Directory); + AssertFileStats(stats[2], "/AB/EF/GH", FileType::Directory); + AssertFileStats(stats[3], "/AB/data", FileType::File); + + selector.max_recursion = 1; + stats.clear(); + ASSERT_OK(this->fs_->GetTargetStats(selector, &stats)); + ASSERT_EQ(stats.size(), 3); + AssertFileStats(stats[0], "/AB/CD", FileType::Directory); + AssertFileStats(stats[1], "/AB/EF", FileType::Directory); + AssertFileStats(stats[2], "/AB/data", FileType::File); + + selector.base_dir = "XYZ"; + selector.allow_non_existent = true; + stats.clear(); + ASSERT_OK(this->fs_->GetTargetStats(selector, &stats)); + ASSERT_EQ(stats.size(), 0); + + ASSERT_OK(stream->Close()); + ASSERT_OK(this->fs_->DeleteDirContents("/AB")); + ASSERT_OK(this->fs_->DeleteDir("/AB")); FileStats stat; - ASSERT_TRUE(Status::OK() != this->fs_->GetTargetStats(file_name, &stat)); + ASSERT_OK(this->fs_->GetTargetStats("/AB", &stat)); + ASSERT_TRUE(stat.type() == FileType::NonExistent); } -TYPED_TEST(TestHadoopFileSystem, DeleteDir) { +TYPED_TEST(TestHadoopFileSystem, 
MoveDir) { SKIP_IF_NO_DRIVER(); - std::string directory_name = "CD"; - ASSERT_OK(this->fs_->DeleteDir(directory_name)); FileStats stat; - ASSERT_TRUE(Status::OK() != this->fs_->GetTargetStats(directory_name, &stat)); + std::string directory_name_src = "/AB"; + std::string directory_name_dest = "/CD"; + ASSERT_OK(this->fs_->CreateDir(directory_name_src)); + ASSERT_OK(this->fs_->GetTargetStats(directory_name_src, &stat)); + AssertFileStats(stat, directory_name_src, FileType::Directory); + + // move file + ASSERT_OK(this->fs_->Move(directory_name_src, directory_name_dest)); + ASSERT_OK(this->fs_->GetTargetStats(directory_name_src, &stat)); + ASSERT_TRUE(stat.type() == FileType::NonExistent); + + ASSERT_OK(this->fs_->GetTargetStats(directory_name_dest, &stat)); + AssertFileStats(stat, directory_name_dest, FileType::Directory); + ASSERT_OK(this->fs_->DeleteDir(directory_name_dest)); } -#endif // _WIN32 } // namespace internal } // namespace fs