diff --git a/ci/scripts/integration_hdfs.sh b/ci/scripts/integration_hdfs.sh index fa63d4742aa..0eb35e73a08 100755 --- a/ci/scripts/integration_hdfs.sh +++ b/ci/scripts/integration_hdfs.sh @@ -30,6 +30,7 @@ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native/ # execute cpp tests pushd ${build_dir} debug/arrow-io-hdfs-test +debug/arrow-hdfs-test popd # cannot use --pyargs with custom arguments like --hdfs or --only-hdfs, because diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index d9faa772d7c..f45db2b66b8 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -294,6 +294,14 @@ endif() if(ARROW_FILESYSTEM) add_subdirectory(filesystem) + if(ARROW_HDFS) + add_definitions(-DARROW_HDFS) + endif() + + if(ARROW_S3) + add_definitions(-DARROW_S3) + endif() + list(APPEND ARROW_SRCS filesystem/filesystem.cc filesystem/localfs.cc @@ -306,6 +314,10 @@ if(ARROW_FILESYSTEM) list(APPEND ARROW_SRCS filesystem/s3fs.cc) endif() + if(ARROW_HDFS) + list(APPEND ARROW_SRCS filesystem/hdfs.cc) + endif() + list(APPEND ARROW_TESTING_SRCS filesystem/test_util.cc) endif() diff --git a/cpp/src/arrow/filesystem/CMakeLists.txt b/cpp/src/arrow/filesystem/CMakeLists.txt index efb78055a9a..b441d1066a6 100644 --- a/cpp/src/arrow/filesystem/CMakeLists.txt +++ b/cpp/src/arrow/filesystem/CMakeLists.txt @@ -32,3 +32,7 @@ if(ARROW_S3) add_dependencies(arrow-tests arrow-s3fs-narrative-test) endif() endif() + +if(ARROW_HDFS) + add_arrow_test(hdfs_test) +endif() diff --git a/cpp/src/arrow/filesystem/api.h b/cpp/src/arrow/filesystem/api.h index fd8f566a78e..65cc3319dff 100644 --- a/cpp/src/arrow/filesystem/api.h +++ b/cpp/src/arrow/filesystem/api.h @@ -19,6 +19,7 @@ #define ARROW_FILESYSTEM_API_H #include "arrow/filesystem/filesystem.h" // IWYU pragma: export +#include "arrow/filesystem/hdfs.h" // IWYU pragma: export #include "arrow/filesystem/localfs.h" // IWYU pragma: export #include "arrow/filesystem/mockfs.h" // IWYU pragma: export #include "arrow/filesystem/s3fs.h" // IWYU pragma: export diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index aa9677e45f2..2573fd71e30 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -18,18 +18,30 @@ #include #include "arrow/filesystem/filesystem.h" +#ifdef ARROW_HDFS +#include "arrow/filesystem/hdfs.h" +#endif +#include "arrow/filesystem/localfs.h" +#include "arrow/filesystem/mockfs.h" #include "arrow/filesystem/path_util.h" +#include "arrow/filesystem/util_internal.h" #include "arrow/io/slow.h" #include "arrow/util/logging.h" #include "arrow/util/macros.h" +#include "arrow/util/uri.h" namespace arrow { + +using internal::Uri; + namespace fs { using internal::ConcatAbstractPath; using internal::EnsureTrailingSlash; using internal::GetAbstractPathParent; using internal::kSep; +using internal::RemoveLeadingSlash; +using internal::RemoveTrailingSlash; std::string ToString(FileType ftype) { switch (ftype) { @@ -328,5 +340,55 @@ Status SlowFileSystem::OpenAppendStream(const std::string& path, return base_fs_->OpenAppendStream(path, out); } +Status FileSystemFromUri(const std::string& uri_string, + std::shared_ptr* out_fs, std::string* out_path) { + Uri uri; + RETURN_NOT_OK(uri.Parse(uri_string)); + if (out_path != nullptr) { + *out_path = std::string(uri.path()); + } + + const auto scheme = uri.scheme(); +#ifdef _WIN32 + if (scheme.size() == 1) { + // Assuming a plain local path starting with a drive letter, e.g "C:/..." + if (out_path != nullptr) { + *out_path = uri_string; + } + *out_fs = std::make_shared(); + return Status::OK(); + } +#endif + if (scheme == "" || scheme == "file") { + *out_fs = std::make_shared(); + return Status::OK(); + } + + if (scheme == "hdfs") { +#ifdef ARROW_HDFS + ARROW_ASSIGN_OR_RAISE(auto options, HdfsOptions::FromUri(uri)); + ARROW_ASSIGN_OR_RAISE(auto hdfs, HadoopFileSystem::Make(options)); + *out_fs = hdfs; + return Status::OK(); +#else + return Status::NotImplemented("Arrow compiled without HDFS support"); +#endif + } + + // Other filesystems below do not have an absolute / relative path distinction, + // normalize path by removing leading slash. + // XXX perhaps each filesystem should have a path normalization method? + if (out_path != nullptr) { + *out_path = std::string(RemoveLeadingSlash(*out_path)); + } + if (scheme == "mock") { + *out_fs = std::make_shared(internal::CurrentTimePoint()); + return Status::OK(); + } + + // TODO add support for S3 URIs + return Status::Invalid("Unrecognized filesystem type in URI: ", uri_string); +} + } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h index 7084f839a69..5064d3c71f2 100644 --- a/cpp/src/arrow/filesystem/filesystem.h +++ b/cpp/src/arrow/filesystem/filesystem.h @@ -25,6 +25,7 @@ #include #include "arrow/status.h" +#include "arrow/util/macros.h" #include "arrow/util/visibility.h" // The Windows API defines macros from *File resolving to either @@ -318,5 +319,18 @@ class ARROW_EXPORT SlowFileSystem : public FileSystem { std::shared_ptr latencies_; }; +/// \brief EXPERIMENTAL: Create a new FileSystem by URI +/// +/// A scheme-less URI is considered a local filesystem path. +/// Recognized schemes are "file", "mock" and "hdfs". +/// +/// \param[in] uri a URI-based path, ex: file:///some/local/path +/// \param[out] out_fs FileSystem instance. +/// \param[out] out_path (optional) Path inside the filesystem. +/// \return Status +ARROW_EXPORT +Status FileSystemFromUri(const std::string& uri, std::shared_ptr* out_fs, + std::string* out_path = NULLPTR); + } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/filesystem/filesystem_test.cc b/cpp/src/arrow/filesystem/filesystem_test.cc index 9f479e99e49..8366b8314c3 100644 --- a/cpp/src/arrow/filesystem/filesystem_test.cc +++ b/cpp/src/arrow/filesystem/filesystem_test.cc @@ -167,6 +167,51 @@ TEST(PathUtil, RemoveTrailingSlash) { ASSERT_EQ("/abc/def", std::string(RemoveTrailingSlash("/abc/def//"))); } +TEST(PathUtil, EnsureLeadingSlash) { + ASSERT_EQ("/", EnsureLeadingSlash("")); + ASSERT_EQ("/", EnsureLeadingSlash("/")); + ASSERT_EQ("/abc", EnsureLeadingSlash("abc")); + ASSERT_EQ("/abc/", EnsureLeadingSlash("abc/")); + ASSERT_EQ("/abc", EnsureLeadingSlash("/abc")); + ASSERT_EQ("/abc/", EnsureLeadingSlash("/abc/")); +} + +TEST(PathUtil, RemoveLeadingSlash) { + ASSERT_EQ("", std::string(RemoveLeadingSlash(""))); + ASSERT_EQ("", std::string(RemoveLeadingSlash("/"))); + ASSERT_EQ("", std::string(RemoveLeadingSlash("//"))); + ASSERT_EQ("abc/def", std::string(RemoveLeadingSlash("abc/def"))); + ASSERT_EQ("abc/def", std::string(RemoveLeadingSlash("/abc/def"))); + ASSERT_EQ("abc/def", std::string(RemoveLeadingSlash("//abc/def"))); + ASSERT_EQ("abc/def/", std::string(RemoveLeadingSlash("abc/def/"))); + ASSERT_EQ("abc/def/", std::string(RemoveLeadingSlash("/abc/def/"))); + ASSERT_EQ("abc/def/", std::string(RemoveLeadingSlash("//abc/def/"))); +} + +TEST(PathUtil, MakeAbstractPathRelative) { + std::string s; + + ASSERT_OK_AND_EQ("", MakeAbstractPathRelative("/", "/")); + ASSERT_OK_AND_EQ("foo/bar", MakeAbstractPathRelative("/", "/foo/bar")); + + ASSERT_OK_AND_EQ("", MakeAbstractPathRelative("/foo", "/foo")); + ASSERT_OK_AND_EQ("", MakeAbstractPathRelative("/foo/", "/foo")); + ASSERT_OK_AND_EQ("", MakeAbstractPathRelative("/foo", "/foo/")); + ASSERT_OK_AND_EQ("", MakeAbstractPathRelative("/foo/", "/foo/")); + + ASSERT_OK_AND_EQ("bar", MakeAbstractPathRelative("/foo", "/foo/bar")); + ASSERT_OK_AND_EQ("bar", MakeAbstractPathRelative("/foo/", "/foo/bar")); + ASSERT_OK_AND_EQ("bar/", MakeAbstractPathRelative("/foo/", "/foo/bar/")); + + // Not relative to base + ASSERT_RAISES(Invalid, MakeAbstractPathRelative("/xxx", "/foo/bar")); + ASSERT_RAISES(Invalid, MakeAbstractPathRelative("/xxx", "/xxxx")); + + // Base is not absolute + ASSERT_RAISES(Invalid, MakeAbstractPathRelative("foo/bar", "foo/bar/baz")); + ASSERT_RAISES(Invalid, MakeAbstractPathRelative("", "foo/bar/baz")); +} + //////////////////////////////////////////////////////////////////////////// // Generic MockFileSystem tests @@ -381,6 +426,28 @@ TEST_F(TestMockFS, Make) { CheckFiles({{"A/a", time_, ""}}); } +TEST_F(TestMockFS, FileSystemFromUri) { + std::string path; + ASSERT_OK(FileSystemFromUri("mock:", &fs_, &path)); + ASSERT_EQ(path, ""); + CheckDirs({}); // Ensures it's a MockFileSystem + ASSERT_OK(FileSystemFromUri("mock:foo/bar", &fs_, &path)); + ASSERT_EQ(path, "foo/bar"); + CheckDirs({}); + ASSERT_OK(FileSystemFromUri("mock:/foo/bar", &fs_, &path)); + ASSERT_EQ(path, "foo/bar"); + CheckDirs({}); + ASSERT_OK(FileSystemFromUri("mock:/foo/bar/?q=xxx", &fs_, &path)); + ASSERT_EQ(path, "foo/bar/"); + CheckDirs({}); + ASSERT_OK(FileSystemFromUri("mock:///foo/bar", &fs_, &path)); + ASSERT_EQ(path, "foo/bar"); + CheckDirs({}); + ASSERT_OK(FileSystemFromUri("mock:///foo/bar?q=zzz", &fs_, &path)); + ASSERT_EQ(path, "foo/bar"); + CheckDirs({}); +} + //////////////////////////////////////////////////////////////////////////// // Concrete SubTreeFileSystem tests diff --git a/cpp/src/arrow/filesystem/hdfs.cc b/cpp/src/arrow/filesystem/hdfs.cc new file mode 100644 index 00000000000..99fba12a93a --- /dev/null +++ b/cpp/src/arrow/filesystem/hdfs.cc @@ -0,0 +1,415 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include + +#include "arrow/filesystem/hdfs.h" +#include "arrow/filesystem/path_util.h" +#include "arrow/io/hdfs.h" +#include "arrow/io/hdfs_internal.h" +#include "arrow/util/logging.h" +#include "arrow/util/parsing.h" + +#ifdef _WIN32 +#ifdef DeleteFile +#undef DeleteFile +#endif +#ifdef CopyFile +#undef CopyFile +#endif +#endif + +namespace arrow { + +using internal::Uri; + +namespace fs { + +using internal::GetAbstractPathParent; +using internal::MakeAbstractPathRelative; +using internal::RemoveLeadingSlash; + +static constexpr int32_t kDefaultHdfsPort = 8020; + +class HadoopFileSystem::Impl { + public: + explicit Impl(HdfsOptions options) : options_(std::move(options)) {} + + ~Impl() { + Status st = Close(); + if (!st.ok()) { + ARROW_LOG(WARNING) << "Failed to disconnect hdfs client: " << st.ToString(); + } + } + + Status Init() { + io::internal::LibHdfsShim* driver_shim; + if (options_.connection_config.driver == io::HdfsDriver::LIBHDFS3) { + RETURN_NOT_OK(ConnectLibHdfs3(&driver_shim)); + } else { + RETURN_NOT_OK(ConnectLibHdfs(&driver_shim)); + } + RETURN_NOT_OK(io::HadoopFileSystem::Connect(&options_.connection_config, &client_)); + return Status::OK(); + } + + Status Close() { + if (client_) { + RETURN_NOT_OK(client_->Disconnect()); + } + return Status::OK(); + } + + Status GetTargetStats(const std::string& path, FileStats* out) { + io::HdfsPathInfo info; + auto status = client_->GetPathInfo(path, &info); + out->set_path(path); + if (status.IsIOError()) { + out->set_type(FileType::NonExistent); + return Status::OK(); + } + + PathInfoToFileStats(info, out); + return Status::OK(); + } + + Status StatSelector(const std::string& wd, const std::string& path, + const Selector& select, int nesting_depth, + std::vector* out) { + std::vector children; + Status st = client_->ListDirectory(path, &children); + if (!st.ok()) { + if (select.allow_non_existent) { + FileStats stat; + RETURN_NOT_OK(GetTargetStats(path, &stat)); + if (stat.type() == FileType::NonExistent) { + return Status::OK(); + } + } + return st; + } + for (const auto& child_info : children) { + // HDFS returns an absolute URI here, need to extract path relative to wd + Uri uri; + RETURN_NOT_OK(uri.Parse(child_info.name)); + std::string child_path = uri.path(); + if (!wd.empty()) { + ARROW_ASSIGN_OR_RAISE(child_path, MakeAbstractPathRelative(wd, child_path)); + } + + FileStats stat; + stat.set_path(child_path); + PathInfoToFileStats(child_info, &stat); + const bool is_dir = stat.type() == FileType::Directory; + out->push_back(std::move(stat)); + if (is_dir && select.recursive && nesting_depth < select.max_recursion) { + RETURN_NOT_OK(StatSelector(wd, child_path, select, nesting_depth + 1, out)); + } + } + return Status::OK(); + } + + Status GetTargetStats(const Selector& select, std::vector* out) { + out->clear(); + + std::string wd; + if (select.base_dir.empty() || select.base_dir.front() != '/') { + // Fetch working directory, because we need to trim it from the start + // of paths returned by ListDirectory as select.base_dir is relative. + RETURN_NOT_OK(client_->GetWorkingDirectory(&wd)); + Uri wd_uri; + RETURN_NOT_OK(wd_uri.Parse(wd)); + wd = wd_uri.path(); + } + + FileStats stat; + RETURN_NOT_OK(GetTargetStats(select.base_dir, &stat)); + if (stat.type() == FileType::File) { + return Status::Invalid( + "GetTargetStates expects base_dir of selector to be a directory, while '", + select.base_dir, "' is a file"); + } + RETURN_NOT_OK(StatSelector(wd, select.base_dir, select, 0, out)); + return Status::OK(); + } + + Status CreateDir(const std::string& path, bool recursive) { + if (IsDirectory(path)) { + return Status::OK(); + } + if (!recursive) { + const auto parent = GetAbstractPathParent(path).first; + if (!parent.empty() && !IsDirectory(parent)) { + return Status::IOError("Cannot create directory '", path, + "': parent is not a directory"); + } + } + RETURN_NOT_OK(client_->MakeDirectory(path)); + return Status::OK(); + } + + Status DeleteDir(const std::string& path) { + if (!IsDirectory(path)) { + return Status::IOError("Cannot delete directory '", path, "': not a directory"); + } + RETURN_NOT_OK(client_->DeleteDirectory(path)); + return Status::OK(); + } + + Status DeleteDirContents(const std::string& path) { + std::vector file_list; + RETURN_NOT_OK(client_->GetChildren(path, &file_list)); + for (auto file : file_list) { + RETURN_NOT_OK(client_->Delete(file, /*recursive=*/true)); + } + return Status::OK(); + } + + Status DeleteFile(const std::string& path) { + if (IsDirectory(path)) { + return Status::IOError("path is a directory"); + } + RETURN_NOT_OK(client_->Delete(path)); + return Status::OK(); + } + + Status Move(const std::string& src, const std::string& dest) { + RETURN_NOT_OK(client_->Rename(src, dest)); + return Status::OK(); + } + + Status CopyFile(const std::string& src, const std::string& dest) { + // TODO implement this (but only if HDFS supports on-server copy) + return Status::NotImplemented("HadoopFileSystem::CopyFile is not supported yet"); + } + + Status OpenInputStream(const std::string& path, std::shared_ptr* out) { + std::shared_ptr file; + RETURN_NOT_OK(client_->OpenReadable(path, &file)); + *out = file; + return Status::OK(); + } + + Status OpenInputFile(const std::string& path, + std::shared_ptr* out) { + std::shared_ptr file; + RETURN_NOT_OK(client_->OpenReadable(path, &file)); + *out = file; + return Status::OK(); + } + + Status OpenOutputStream(const std::string& path, + std::shared_ptr* out) { + bool append = false; + return OpenOutputStreamGeneric(path, append, out); + } + + Status OpenAppendStream(const std::string& path, + std::shared_ptr* out) { + bool append = true; + return OpenOutputStreamGeneric(path, append, out); + } + + protected: + HdfsOptions options_; + std::shared_ptr<::arrow::io::HadoopFileSystem> client_; + + void PathInfoToFileStats(const io::HdfsPathInfo& info, FileStats* out) { + if (info.kind == io::ObjectType::DIRECTORY) { + out->set_type(FileType::Directory); + out->set_size(kNoSize); + } else if (info.kind == io::ObjectType::FILE) { + out->set_type(FileType::File); + out->set_size(info.size); + } + out->set_mtime(ToTimePoint(info.last_modified_time)); + } + + Status OpenOutputStreamGeneric(const std::string& path, bool append, + std::shared_ptr* out) { + std::shared_ptr stream; + RETURN_NOT_OK(client_->OpenWritable(path, append, options_.buffer_size, + options_.replication, options_.default_block_size, + &stream)); + *out = stream; + return Status::OK(); + } + + bool IsDirectory(const std::string& path) { + io::HdfsPathInfo info; + Status status = client_->GetPathInfo(path, &info); + if (!status.ok()) { + return false; + } + if (info.kind == io::ObjectType::DIRECTORY) { + return true; + } + return false; + } + + TimePoint ToTimePoint(int secs) { + std::chrono::nanoseconds ns_count(static_cast(secs) * 1000000000); + return TimePoint(std::chrono::duration_cast(ns_count)); + } +}; + +void HdfsOptions::ConfigureEndPoint(const std::string& host, int port) { + connection_config.host = host; + connection_config.port = port; +} + +void HdfsOptions::ConfigureHdfsDriver(bool use_hdfs3) { + if (use_hdfs3) { + connection_config.driver = ::arrow::io::HdfsDriver::LIBHDFS3; + } else { + connection_config.driver = ::arrow::io::HdfsDriver::LIBHDFS; + } +} + +void HdfsOptions::ConfigureHdfsUser(const std::string& user_name) { + connection_config.user = user_name; +} + +void HdfsOptions::ConfigureHdfsReplication(int16_t replication) { + this->replication = replication; +} + +void HdfsOptions::ConfigureHdfsBufferSize(int32_t buffer_size) { + this->buffer_size = buffer_size; +} + +void HdfsOptions::ConfigureHdfsBlockSize(int64_t default_block_size) { + this->default_block_size = default_block_size; +} + +Result HdfsOptions::FromUri(const Uri& uri) { + HdfsOptions options; + + std::unordered_map options_map; + ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items()); + for (const auto& kv : options_items) { + options_map.emplace(kv.first, kv.second); + } + + const auto port = uri.port(); + if (port == -1) { + options.ConfigureEndPoint(uri.host(), kDefaultHdfsPort); + } else { + options.ConfigureEndPoint(uri.host(), port); + } + + auto it = options_map.find("use_hdfs3"); + if (it != options_map.end()) { + const auto& v = it->second; + if (v == "1") { + options.ConfigureHdfsDriver(true); + } else if (v == "0") { + options.ConfigureHdfsDriver(false); + } else { + return Status::Invalid( + "Invalid value for option 'use_hdfs3' (allowed values are '0' and '1'): '", v, + "'"); + } + } + it = options_map.find("replication"); + if (it != options_map.end()) { + const auto& v = it->second; + ::arrow::internal::StringConverter converter; + int16_t reps; + if (!converter(v.data(), v.size(), &reps)) { + return Status::Invalid("Invalid value for option 'replication': '", v, "'"); + } + options.ConfigureHdfsReplication(reps); + } + it = options_map.find("user"); + if (it != options_map.end()) { + const auto& v = it->second; + options.ConfigureHdfsUser(v); + } + return options; +} + +HadoopFileSystem::HadoopFileSystem(const HdfsOptions& options) + : impl_(new Impl{options}) {} + +HadoopFileSystem::~HadoopFileSystem() {} + +Result> HadoopFileSystem::Make( + const HdfsOptions& options) { + std::shared_ptr ptr(new HadoopFileSystem(options)); + RETURN_NOT_OK(ptr->impl_->Init()); + return ptr; +} + +Status HadoopFileSystem::GetTargetStats(const std::string& path, FileStats* out) { + return impl_->GetTargetStats(path, out); +} + +Status HadoopFileSystem::GetTargetStats(const Selector& select, + std::vector* out) { + return impl_->GetTargetStats(select, out); +} + +Status HadoopFileSystem::CreateDir(const std::string& path, bool recursive) { + return impl_->CreateDir(path, recursive); +} + +Status HadoopFileSystem::DeleteDir(const std::string& path) { + return impl_->DeleteDir(path); +} + +Status HadoopFileSystem::DeleteDirContents(const std::string& path) { + return impl_->DeleteDirContents(path); +} + +Status HadoopFileSystem::DeleteFile(const std::string& path) { + return impl_->DeleteFile(path); +} + +Status HadoopFileSystem::Move(const std::string& src, const std::string& dest) { + return impl_->Move(src, dest); +} + +Status HadoopFileSystem::CopyFile(const std::string& src, const std::string& dest) { + return impl_->CopyFile(src, dest); +} + +Status HadoopFileSystem::OpenInputStream(const std::string& path, + std::shared_ptr* out) { + return impl_->OpenInputStream(path, out); +} + +Status HadoopFileSystem::OpenInputFile(const std::string& path, + std::shared_ptr* out) { + return impl_->OpenInputFile(path, out); +} + +Status HadoopFileSystem::OpenOutputStream(const std::string& path, + std::shared_ptr* out) { + return impl_->OpenOutputStream(path, out); +} + +Status HadoopFileSystem::OpenAppendStream(const std::string& path, + std::shared_ptr* out) { + return impl_->OpenAppendStream(path, out); +} + +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/hdfs.h b/cpp/src/arrow/filesystem/hdfs.h new file mode 100644 index 00000000000..e84868abeb6 --- /dev/null +++ b/cpp/src/arrow/filesystem/hdfs.h @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "arrow/filesystem/filesystem.h" +#include "arrow/io/hdfs.h" +#include "arrow/result.h" +#include "arrow/util/uri.h" + +namespace arrow { +namespace fs { + +/// Options for the HDFS implementation. +struct ARROW_EXPORT HdfsOptions { + HdfsOptions() = default; + ~HdfsOptions() = default; + + /// Hdfs configuration options, contains host, port, driver + io::HdfsConnectionConfig connection_config; + + /// Used by Hdfs OpenWritable Interface. + int32_t buffer_size = 0; + int16_t replication = 3; + int64_t default_block_size = 0; + + void ConfigureEndPoint(const std::string& host, int port); + /// Be cautious that libhdfs3 is a unmaintained project + void ConfigureHdfsDriver(bool use_hdfs3); + void ConfigureHdfsReplication(int16_t replication); + void ConfigureHdfsUser(const std::string& user_name); + void ConfigureHdfsBufferSize(int32_t buffer_size); + void ConfigureHdfsBlockSize(int64_t default_block_size); + + static Result FromUri(const ::arrow::internal::Uri& uri); +}; + +/// HDFS-backed FileSystem implementation. +/// +/// implementation notes: +/// - This is a wrapper of arrow/io/hdfs, so we can use FileSystem API to handle hdfs. +class ARROW_EXPORT HadoopFileSystem : public FileSystem { + public: + ~HadoopFileSystem() override; + + /// \cond FALSE + using FileSystem::GetTargetStats; + /// \endcond + Status GetTargetStats(const std::string& path, FileStats* out) override; + Status GetTargetStats(const Selector& select, std::vector* out) override; + + Status CreateDir(const std::string& path, bool recursive = true) override; + + Status DeleteDir(const std::string& path) override; + + Status DeleteDirContents(const std::string& path) override; + + Status DeleteFile(const std::string& path) override; + + Status Move(const std::string& src, const std::string& dest) override; + + Status CopyFile(const std::string& src, const std::string& dest) override; + + Status OpenInputStream(const std::string& path, + std::shared_ptr* out) override; + + Status OpenInputFile(const std::string& path, + std::shared_ptr* out) override; + + Status OpenOutputStream(const std::string& path, + std::shared_ptr* out) override; + + Status OpenAppendStream(const std::string& path, + std::shared_ptr* out) override; + + /// Create a HdfsFileSystem instance from the given options. + static Result> Make(const HdfsOptions& options); + + protected: + explicit HadoopFileSystem(const HdfsOptions& options); + + class Impl; + std::unique_ptr impl_; +}; + +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/hdfs_test.cc b/cpp/src/arrow/filesystem/hdfs_test.cc new file mode 100644 index 00000000000..66f3e98a334 --- /dev/null +++ b/cpp/src/arrow/filesystem/hdfs_test.cc @@ -0,0 +1,339 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include + +#include + +#include "arrow/filesystem/hdfs.h" +#include "arrow/filesystem/test_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/io_util.h" +#include "arrow/util/uri.h" + +namespace arrow { + +using internal::Uri; +using io::HdfsDriver; + +namespace fs { + +TEST(TestHdfsOptions, FromUri) { + HdfsOptions options; + internal::Uri uri; + + ASSERT_OK(uri.Parse("hdfs://localhost")); + ASSERT_OK_AND_ASSIGN(options, HdfsOptions::FromUri(uri)); + ASSERT_EQ(options.replication, 3); + ASSERT_EQ(options.connection_config.host, "localhost"); + ASSERT_EQ(options.connection_config.port, 8020); + ASSERT_EQ(options.connection_config.user, ""); + ASSERT_EQ(options.connection_config.driver, HdfsDriver::LIBHDFS); + + ASSERT_OK(uri.Parse("hdfs://otherhost:9999/?use_hdfs3=0&replication=2")); + ASSERT_OK_AND_ASSIGN(options, HdfsOptions::FromUri(uri)); + ASSERT_EQ(options.replication, 2); + ASSERT_EQ(options.connection_config.host, "otherhost"); + ASSERT_EQ(options.connection_config.port, 9999); + ASSERT_EQ(options.connection_config.user, ""); + ASSERT_EQ(options.connection_config.driver, HdfsDriver::LIBHDFS); + + ASSERT_OK(uri.Parse("hdfs://otherhost:9999/?use_hdfs3=1&user=stevereich")); + ASSERT_OK_AND_ASSIGN(options, HdfsOptions::FromUri(uri)); + ASSERT_EQ(options.replication, 3); + ASSERT_EQ(options.connection_config.host, "otherhost"); + ASSERT_EQ(options.connection_config.port, 9999); + ASSERT_EQ(options.connection_config.user, "stevereich"); + ASSERT_EQ(options.connection_config.driver, HdfsDriver::LIBHDFS3); +} + +struct JNIDriver { + static HdfsDriver type; +}; + +struct PivotalDriver { + static HdfsDriver type; +}; + +template +class TestHadoopFileSystem : public ::testing::Test { + public: + void SetUp() override { + const char* host = std::getenv("ARROW_HDFS_TEST_HOST"); + const char* port = std::getenv("ARROW_HDFS_TEST_PORT"); + const char* user = std::getenv("ARROW_HDFS_TEST_USER"); + + std::string hdfs_host = host == nullptr ? "localhost" : std::string(host); + int hdfs_port = port == nullptr ? 20500 : atoi(port); + std::string hdfs_user = user == nullptr ? "root" : std::string(user); + + if (DRIVER::type == HdfsDriver::LIBHDFS) { + use_hdfs3_ = false; + } else { + use_hdfs3_ = true; + } + + options_.ConfigureEndPoint(hdfs_host, hdfs_port); + options_.ConfigureHdfsUser(hdfs_user); + options_.ConfigureHdfsDriver(use_hdfs3_); + options_.ConfigureHdfsReplication(0); + + auto result = HadoopFileSystem::Make(options_); + if (!result.ok()) { + ARROW_LOG(INFO) + << "HadoopFileSystem::Make failed, it is possible when we don't have " + "proper driver on this node, err msg is " + << result.status().ToString(); + loaded_driver_ = false; + return; + } + loaded_driver_ = true; + fs_ = std::make_shared("", *result); + } + + void TestFileSystemFromUri() { + std::stringstream ss; + ss << "hdfs://" << options_.connection_config.host << ":" + << options_.connection_config.port << "/" + << "?replication=0&user=" << options_.connection_config.user; + if (use_hdfs3_) { + ss << "&use_hdfs3=1"; + } + + std::shared_ptr uri_fs; + std::string path; + ARROW_LOG(INFO) << "!!! uri = " << ss.str(); + ASSERT_OK(FileSystemFromUri(ss.str(), &uri_fs, &path)); + ASSERT_EQ(path, "/"); + + // Sanity check + ASSERT_OK(uri_fs->CreateDir("AB")); + AssertFileStats(uri_fs.get(), "AB", FileType::Directory); + ASSERT_OK(uri_fs->DeleteDir("AB")); + AssertFileStats(uri_fs.get(), "AB", FileType::NonExistent); + } + + void TestGetTargetStats(const std::string& base_dir) { + std::vector stats; + + ASSERT_OK(fs_->CreateDir(base_dir + "AB")); + ASSERT_OK(fs_->CreateDir(base_dir + "AB/CD")); + ASSERT_OK(fs_->CreateDir(base_dir + "AB/EF")); + ASSERT_OK(fs_->CreateDir(base_dir + "AB/EF/GH")); + ASSERT_OK(fs_->CreateDir(base_dir + "AB/EF/GH/IJ")); + CreateFile(fs_.get(), base_dir + "AB/data", "some data"); + + // With single path + FileStats st; + ASSERT_OK(fs_->GetTargetStats(base_dir + "AB", &st)); + AssertFileStats(st, base_dir + "AB", FileType::Directory); + ASSERT_OK(fs_->GetTargetStats(base_dir + "AB/data", &st)); + AssertFileStats(st, base_dir + "AB/data", FileType::File, 9); + + // With selector + Selector selector; + selector.base_dir = base_dir + "AB"; + selector.recursive = false; + + ASSERT_OK(fs_->GetTargetStats(selector, &stats)); + ASSERT_EQ(stats.size(), 3); + AssertFileStats(stats[0], base_dir + "AB/CD", FileType::Directory); + AssertFileStats(stats[1], base_dir + "AB/EF", FileType::Directory); + AssertFileStats(stats[2], base_dir + "AB/data", FileType::File); + + selector.recursive = true; + ASSERT_OK(fs_->GetTargetStats(selector, &stats)); + ASSERT_EQ(stats.size(), 5); + AssertFileStats(stats[0], base_dir + "AB/CD", FileType::Directory); + AssertFileStats(stats[1], base_dir + "AB/EF", FileType::Directory); + AssertFileStats(stats[2], base_dir + "AB/EF/GH", FileType::Directory); + AssertFileStats(stats[3], base_dir + "AB/EF/GH/IJ", FileType::Directory); + AssertFileStats(stats[4], base_dir + "AB/data", FileType::File, 9); + + selector.max_recursion = 0; + ASSERT_OK(fs_->GetTargetStats(selector, &stats)); + ASSERT_EQ(stats.size(), 3); + AssertFileStats(stats[0], base_dir + "AB/CD", FileType::Directory); + AssertFileStats(stats[1], base_dir + "AB/EF", FileType::Directory); + AssertFileStats(stats[2], base_dir + "AB/data", FileType::File); + + selector.max_recursion = 1; + ASSERT_OK(fs_->GetTargetStats(selector, &stats)); + ASSERT_EQ(stats.size(), 4); + AssertFileStats(stats[0], base_dir + "AB/CD", FileType::Directory); + AssertFileStats(stats[1], base_dir + "AB/EF", FileType::Directory); + AssertFileStats(stats[2], base_dir + "AB/EF/GH", FileType::Directory); + AssertFileStats(stats[3], base_dir + "AB/data", FileType::File); + + selector.base_dir = base_dir + "XYZ"; + selector.allow_non_existent = true; + ASSERT_OK(fs_->GetTargetStats(selector, &stats)); + ASSERT_EQ(stats.size(), 0); + + selector.allow_non_existent = false; + ASSERT_RAISES(IOError, fs_->GetTargetStats(selector, &stats)); + + ASSERT_OK(fs_->DeleteDir(base_dir + "AB")); + AssertFileStats(fs_.get(), base_dir + "AB", FileType::NonExistent); + } + + protected: + std::shared_ptr fs_; + bool use_hdfs3_; + HdfsOptions options_; + bool loaded_driver_ = false; +}; + +HdfsDriver JNIDriver::type = HdfsDriver::LIBHDFS; +HdfsDriver PivotalDriver::type = HdfsDriver::LIBHDFS3; + +typedef ::testing::Types DriverTypes; + +TYPED_TEST_CASE(TestHadoopFileSystem, DriverTypes); + +#define SKIP_IF_NO_DRIVER() \ + if (!this->loaded_driver_) { \ + ARROW_LOG(INFO) << "Driver not loaded, skipping"; \ + return; \ + } + +TYPED_TEST(TestHadoopFileSystem, CreateDirDeleteDir) { + SKIP_IF_NO_DRIVER(); + + // recursive = true + ASSERT_OK(this->fs_->CreateDir("AB/CD")); + CreateFile(this->fs_.get(), "AB/CD/data", "some data"); + AssertFileStats(this->fs_.get(), "AB", FileType::Directory); + AssertFileStats(this->fs_.get(), "AB/CD", FileType::Directory); + AssertFileStats(this->fs_.get(), "AB/CD/data", FileType::File, 9); + + ASSERT_OK(this->fs_->DeleteDir("AB")); + AssertFileStats(this->fs_.get(), "AB", FileType::NonExistent); + + // recursive = false + ASSERT_RAISES(IOError, this->fs_->CreateDir("AB/CD", /*recursive=*/false)); + ASSERT_OK(this->fs_->CreateDir("AB", /*recursive=*/false)); + ASSERT_OK(this->fs_->CreateDir("AB/CD", /*recursive=*/false)); + + ASSERT_OK(this->fs_->DeleteDir("AB")); + AssertFileStats(this->fs_.get(), "AB", FileType::NonExistent); + ASSERT_RAISES(IOError, this->fs_->DeleteDir("AB")); +} + +TYPED_TEST(TestHadoopFileSystem, DeleteDirContents) { + SKIP_IF_NO_DRIVER(); + + ASSERT_OK(this->fs_->CreateDir("AB/CD")); + CreateFile(this->fs_.get(), "AB/CD/data", "some data"); + AssertFileStats(this->fs_.get(), "AB", FileType::Directory); + AssertFileStats(this->fs_.get(), "AB/CD", FileType::Directory); + AssertFileStats(this->fs_.get(), "AB/CD/data", FileType::File, 9); + + ASSERT_OK(this->fs_->DeleteDirContents("AB")); + AssertFileStats(this->fs_.get(), "AB", FileType::Directory); + AssertFileStats(this->fs_.get(), "AB/CD", FileType::NonExistent); + AssertFileStats(this->fs_.get(), "AB/CD/data", FileType::NonExistent); + + ASSERT_OK(this->fs_->DeleteDirContents("AB")); + AssertFileStats(this->fs_.get(), "AB", FileType::Directory); + ASSERT_OK(this->fs_->DeleteDir("AB")); +} + +TYPED_TEST(TestHadoopFileSystem, WriteReadFile) { + SKIP_IF_NO_DRIVER(); + + ASSERT_OK(this->fs_->CreateDir("CD")); + constexpr int kDataSize = 9; + std::string file_name = "CD/abc"; + std::string data = "some data"; + std::shared_ptr stream; + ASSERT_OK(this->fs_->OpenOutputStream(file_name, &stream)); + auto data_size = static_cast(data.size()); + ASSERT_OK(stream->Write(data.data(), data_size)); + ASSERT_OK(stream->Close()); + + std::shared_ptr file; + ASSERT_OK(this->fs_->OpenInputFile(file_name, &file)); + int64_t file_size; + ASSERT_OK(file->GetSize(&file_size)); + ASSERT_EQ(kDataSize, file_size); + uint8_t buffer[kDataSize]; + int64_t bytes_read = 0; + ASSERT_OK(file->Read(kDataSize, &bytes_read, buffer)); + ASSERT_EQ(0, std::memcmp(buffer, data.c_str(), kDataSize)); + + ASSERT_OK(this->fs_->DeleteDir("CD")); +} + +TYPED_TEST(TestHadoopFileSystem, GetTargetStatsRelative) { + // Test GetTargetStats() with relative paths + SKIP_IF_NO_DRIVER(); + + this->TestGetTargetStats(""); +} + +TYPED_TEST(TestHadoopFileSystem, GetTargetStatsAbsolute) { + // Test GetTargetStats() with absolute paths + SKIP_IF_NO_DRIVER(); + + this->TestGetTargetStats("/"); +} + +TYPED_TEST(TestHadoopFileSystem, RelativeVsAbsolutePaths) { + SKIP_IF_NO_DRIVER(); + + // XXX This test assumes the current working directory is not "/" + + ASSERT_OK(this->fs_->CreateDir("AB")); + AssertFileStats(this->fs_.get(), "AB", FileType::Directory); + AssertFileStats(this->fs_.get(), "/AB", FileType::NonExistent); + + ASSERT_OK(this->fs_->CreateDir("/CD")); + AssertFileStats(this->fs_.get(), "/CD", FileType::Directory); + AssertFileStats(this->fs_.get(), "CD", FileType::NonExistent); +} + +TYPED_TEST(TestHadoopFileSystem, MoveDir) { + SKIP_IF_NO_DRIVER(); + + FileStats stat; + std::string directory_name_src = "AB"; + std::string directory_name_dest = "CD"; + ASSERT_OK(this->fs_->CreateDir(directory_name_src)); + ASSERT_OK(this->fs_->GetTargetStats(directory_name_src, &stat)); + AssertFileStats(stat, directory_name_src, FileType::Directory); + + // move file + ASSERT_OK(this->fs_->Move(directory_name_src, directory_name_dest)); + ASSERT_OK(this->fs_->GetTargetStats(directory_name_src, &stat)); + ASSERT_TRUE(stat.type() == FileType::NonExistent); + + ASSERT_OK(this->fs_->GetTargetStats(directory_name_dest, &stat)); + AssertFileStats(stat, directory_name_dest, FileType::Directory); + ASSERT_OK(this->fs_->DeleteDir(directory_name_dest)); +} + +TYPED_TEST(TestHadoopFileSystem, FileSystemFromUri) { + SKIP_IF_NO_DRIVER(); + + this->TestFileSystemFromUri(); +} + +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/localfs_test.cc b/cpp/src/arrow/filesystem/localfs_test.cc index d92caf99c88..75485e60b4a 100644 --- a/cpp/src/arrow/filesystem/localfs_test.cc +++ b/cpp/src/arrow/filesystem/localfs_test.cc @@ -25,7 +25,9 @@ #include "arrow/filesystem/filesystem.h" #include "arrow/filesystem/localfs.h" +#include "arrow/filesystem/path_util.h" #include "arrow/filesystem/test_util.h" +#include "arrow/filesystem/util_internal.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/io_util.h" @@ -36,12 +38,6 @@ namespace internal { using ::arrow::internal::PlatformFilename; using ::arrow::internal::TemporaryDir; -TimePoint CurrentTimePoint() { - auto now = std::chrono::system_clock::now(); - return TimePoint( - std::chrono::duration_cast(now.time_since_epoch())); -} - class LocalFSTestMixin : public ::testing::Test { public: void SetUp() override { ASSERT_OK(TemporaryDir::Make("test-localfs-", &temp_dir_)); } @@ -110,13 +106,35 @@ class TestLocalFS : public LocalFSTestMixin { void SetUp() { LocalFSTestMixin::SetUp(); local_fs_ = std::make_shared(); - auto path = PathFormatter()(temp_dir_->path()); - fs_ = std::make_shared(path, local_fs_); + local_path_ = EnsureTrailingSlash(PathFormatter()(temp_dir_->path())); + fs_ = std::make_shared(local_path_, local_fs_); + } + + void TestFileSystemFromUri(const std::string& uri) { + std::string path; + ASSERT_OK(FileSystemFromUri(uri, &fs_, &path)); + + // Test that the right location on disk is accessed + CreateFile(fs_.get(), local_path_ + "abc", "some data"); + CheckConcreteFile(this->temp_dir_->path().ToString() + "abc", 9); + } + + void CheckConcreteFile(const std::string& path, int64_t expected_size) { + PlatformFilename fn; + int fd; + int64_t size = -1; + ASSERT_OK(PlatformFilename::FromString(path, &fn)); + ASSERT_OK(::arrow::internal::FileOpenReadable(fn, &fd)); + Status st = ::arrow::internal::FileGetSize(fd, &size); + ASSERT_OK(::arrow::internal::FileClose(fd)); + ASSERT_OK(st); + ASSERT_EQ(size, expected_size); } protected: std::shared_ptr local_fs_; std::shared_ptr fs_; + std::string local_path_; }; TYPED_TEST_CASE(TestLocalFS, PathFormatters); @@ -131,16 +149,15 @@ TYPED_TEST(TestLocalFS, CorrectPathExists) { ASSERT_OK(stream->Close()); // Now check the file's existence directly, bypassing the FileSystem abstraction - auto path = this->temp_dir_->path().ToString() + "/abc"; - PlatformFilename fn; - int fd; - int64_t size = -1; - ASSERT_OK(PlatformFilename::FromString(path, &fn)); - ASSERT_OK(::arrow::internal::FileOpenReadable(fn, &fd)); - Status st = ::arrow::internal::FileGetSize(fd, &size); - ASSERT_OK(::arrow::internal::FileClose(fd)); - ASSERT_OK(st); - ASSERT_EQ(size, data_size); + this->CheckConcreteFile(this->temp_dir_->path().ToString() + "abc", data_size); +} + +TYPED_TEST(TestLocalFS, FileSystemFromUriFile) { + this->TestFileSystemFromUri("file:" + this->local_path_); +} + +TYPED_TEST(TestLocalFS, FileSystemFromUriNoScheme) { + this->TestFileSystemFromUri(this->local_path_); } TYPED_TEST(TestLocalFS, DirectoryMTime) { diff --git a/cpp/src/arrow/filesystem/path_util.cc b/cpp/src/arrow/filesystem/path_util.cc index 2ba143e59b9..a5d54924504 100644 --- a/cpp/src/arrow/filesystem/path_util.cc +++ b/cpp/src/arrow/filesystem/path_util.cc @@ -104,16 +104,24 @@ std::string ConcatAbstractPath(const std::string& base, const std::string& stem) } } -std::string EnsureTrailingSlash(const std::string& s) { - if (s.length() > 0 && s.back() != kSep) { +std::string EnsureTrailingSlash(util::string_view v) { + if (v.length() > 0 && v.back() != kSep) { // XXX How about "C:" on Windows? We probably don't want to turn it into "C:/"... // Unless the local filesystem always uses absolute paths - return s + kSep; + return std::string(v) + kSep; } else { - return s; + return std::string(v); } } +std::string EnsureLeadingSlash(util::string_view v) { + if (v.length() == 0 || v.front() != kSep) { + // XXX How about "C:" on Windows? We probably don't want to turn it into "/C:"... + return kSep + std::string(v); + } else { + return std::string(v); + } +} util::string_view RemoveTrailingSlash(util::string_view key) { while (!key.empty() && key.back() == kSep) { key.remove_suffix(1); @@ -121,6 +129,31 @@ util::string_view RemoveTrailingSlash(util::string_view key) { return key; } +util::string_view RemoveLeadingSlash(util::string_view key) { + while (!key.empty() && key.front() == kSep) { + key.remove_prefix(1); + } + return key; +} + +Result MakeAbstractPathRelative(const std::string& base, + const std::string& path) { + if (base.empty() || base.front() != kSep) { + return Status::Invalid("MakeAbstractPathRelative called with non-absolute base '", + base, "'"); + } + auto b = EnsureLeadingSlash(RemoveTrailingSlash(base)); + auto p = util::string_view(path); + if (p.substr(0, b.size()) != util::string_view(b)) { + return Status::Invalid("Path '", path, "' is not relative to '", base, "'"); + } + p = p.substr(b.size()); + if (!p.empty() && p.front() != kSep && b.back() != kSep) { + return Status::Invalid("Path '", path, "' is not relative to '", base, "'"); + } + return std::string(RemoveLeadingSlash(p)); +} + } // namespace internal } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/filesystem/path_util.h b/cpp/src/arrow/filesystem/path_util.h index d90c3510e67..a223f165cc6 100644 --- a/cpp/src/arrow/filesystem/path_util.h +++ b/cpp/src/arrow/filesystem/path_util.h @@ -21,6 +21,7 @@ #include #include +#include "arrow/result.h" #include "arrow/status.h" #include "arrow/util/string_view.h" @@ -54,8 +55,19 @@ Status ValidateAbstractPathParts(const std::vector& parts); ARROW_EXPORT std::string ConcatAbstractPath(const std::string& base, const std::string& stem); +// Make path relative to base, if it starts with base. Otherwise error out. ARROW_EXPORT -std::string EnsureTrailingSlash(const std::string& s); +Result MakeAbstractPathRelative(const std::string& base, + const std::string& path); + +ARROW_EXPORT +std::string EnsureLeadingSlash(util::string_view s); + +ARROW_EXPORT +util::string_view RemoveLeadingSlash(util::string_view s); + +ARROW_EXPORT +std::string EnsureTrailingSlash(util::string_view s); ARROW_EXPORT util::string_view RemoveTrailingSlash(util::string_view s); diff --git a/cpp/src/arrow/filesystem/test_util.cc b/cpp/src/arrow/filesystem/test_util.cc index 6786e3973a4..66c3a1a2674 100644 --- a/cpp/src/arrow/filesystem/test_util.cc +++ b/cpp/src/arrow/filesystem/test_util.cc @@ -25,6 +25,7 @@ #include #include "arrow/filesystem/test_util.h" +#include "arrow/filesystem/util_internal.h" #include "arrow/io/interfaces.h" #include "arrow/testing/gtest_util.h" diff --git a/cpp/src/arrow/filesystem/util_internal.cc b/cpp/src/arrow/filesystem/util_internal.cc index ad3937824db..e4d3dcfa44f 100644 --- a/cpp/src/arrow/filesystem/util_internal.cc +++ b/cpp/src/arrow/filesystem/util_internal.cc @@ -22,6 +22,12 @@ namespace arrow { namespace fs { namespace internal { +TimePoint CurrentTimePoint() { + auto now = std::chrono::system_clock::now(); + return TimePoint( + std::chrono::duration_cast(now.time_since_epoch())); +} + Status CopyStream(const std::shared_ptr& src, const std::shared_ptr& dest, int64_t chunk_size) { std::shared_ptr chunk; diff --git a/cpp/src/arrow/filesystem/util_internal.h b/cpp/src/arrow/filesystem/util_internal.h index eabdad4a6fa..46f99202156 100644 --- a/cpp/src/arrow/filesystem/util_internal.h +++ b/cpp/src/arrow/filesystem/util_internal.h @@ -19,6 +19,7 @@ #include +#include "arrow/filesystem/filesystem.h" #include "arrow/io/interfaces.h" #include "arrow/status.h" #include "arrow/util/visibility.h" @@ -27,6 +28,9 @@ namespace arrow { namespace fs { namespace internal { +ARROW_EXPORT +TimePoint CurrentTimePoint(); + ARROW_EXPORT Status CopyStream(const std::shared_ptr& src, const std::shared_ptr& dest, int64_t chunk_size); diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index 8845aaf55b9..3125848ea61 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -15,9 +15,6 @@ // specific language governing permissions and limitations // under the License. -#include - -#include #include #include #include @@ -416,6 +413,16 @@ class HadoopFileSystem::HadoopFileSystemImpl { return Status::OK(); } + Status GetWorkingDirectory(std::string* out) { + char buffer[2048]; + if (driver_->GetWorkingDirectory(fs_, buffer, sizeof(buffer) - 1) == nullptr) { + return Status::IOError("HDFS GetWorkingDirectory failed, errno: ", + TranslateErrno(errno)); + } + *out = buffer; + return Status::OK(); + } + Status GetPathInfo(const std::string& path, HdfsPathInfo* info) { hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path.c_str()); @@ -597,6 +604,10 @@ Status HadoopFileSystem::GetCapacity(int64_t* nbytes) { Status HadoopFileSystem::GetUsed(int64_t* nbytes) { return impl_->GetUsed(nbytes); } +Status HadoopFileSystem::GetWorkingDirectory(std::string* out) { + return impl_->GetWorkingDirectory(out); +} + Status HadoopFileSystem::GetChildren(const std::string& path, std::vector* listing) { return impl_->GetChildren(path, listing); diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h index 133142c33ba..3cf8f2a5c2b 100644 --- a/cpp/src/arrow/io/hdfs.h +++ b/cpp/src/arrow/io/hdfs.h @@ -66,6 +66,8 @@ struct HdfsConnectionConfig { std::string kerb_ticket; std::unordered_map extra_conf; HdfsDriver driver; + + HdfsConnectionConfig() : driver(HdfsDriver::LIBHDFS) {} }; class ARROW_EXPORT HadoopFileSystem : public FileSystem { @@ -118,8 +120,19 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem { Status GetChildren(const std::string& path, std::vector* listing) override; + /// List directory contents + /// + /// If path is a relative path, returned values will be absolute paths or URIs + /// starting from the current working directory. Status ListDirectory(const std::string& path, std::vector* listing); + /// Return the filesystem's current working directory. + /// + /// The working directory is the base path for all relative paths given to + /// other APIs. + /// NOTE: this actually returns a URI. + Status GetWorkingDirectory(std::string* out); + /// Change /// /// @param path file path to change diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index 7a3cb18595d..81d7266168f 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -134,6 +134,12 @@ inline Status GenericToStatus(const Result& res) { ASSERT_OK_AND_ASSIGN_IMPL(ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), \ lhs, rexpr); +#define ASSERT_OK_AND_EQ(expected, expr) \ + do { \ + ASSERT_OK_AND_ASSIGN(auto _actual, (expr)); \ + ASSERT_EQ(expected, _actual); \ + } while (0) + namespace arrow { // ----------------------------------------------------------------------