diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 9acd7083437..63c9bab0859 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -408,19 +408,11 @@ arrow_add_object_library(ARROW_IO io/caching.cc io/compressed.cc io/file.cc - io/hdfs.cc - io/hdfs_internal.cc io/interfaces.cc io/memory.cc io/slow.cc io/stdio.cc io/transform.cc) -foreach(ARROW_IO_TARGET ${ARROW_IO_TARGETS}) - target_link_libraries(${ARROW_IO_TARGET} PRIVATE arrow::hadoop) - if(NOT MSVC) - target_link_libraries(${ARROW_IO_TARGET} PRIVATE ${CMAKE_DL_LIBS}) - endif() -endforeach() set(ARROW_MEMORY_POOL_SRCS memory_pool.cc) if(ARROW_JEMALLOC) @@ -906,7 +898,7 @@ if(ARROW_FILESYSTEM) PROPERTIES SKIP_UNITY_BUILD_INCLUSION ON) endif() if(ARROW_HDFS) - list(APPEND ARROW_FILESYSTEM_SRCS filesystem/hdfs.cc) + list(APPEND ARROW_FILESYSTEM_SRCS filesystem/hdfs.cc filesystem/hdfs_internal.cc) endif() if(ARROW_S3) list(APPEND ARROW_FILESYSTEM_SRCS filesystem/s3fs.cc) diff --git a/cpp/src/arrow/filesystem/CMakeLists.txt b/cpp/src/arrow/filesystem/CMakeLists.txt index 5250ed2a887..32f210b7fb0 100644 --- a/cpp/src/arrow/filesystem/CMakeLists.txt +++ b/cpp/src/arrow/filesystem/CMakeLists.txt @@ -143,4 +143,12 @@ endif() if(ARROW_HDFS) add_arrow_test(hdfs_test EXTRA_LABELS filesystem) + add_arrow_test(hdfs_internal_test + NO_VALGRIND + EXTRA_LABELS + filesystem + EXTRA_LINK_LIBS + arrow::hadoop + Boost::filesystem + Boost::system) endif() diff --git a/cpp/src/arrow/filesystem/hdfs.cc b/cpp/src/arrow/filesystem/hdfs.cc index d59b2a342d7..41048c1f512 100644 --- a/cpp/src/arrow/filesystem/hdfs.cc +++ b/cpp/src/arrow/filesystem/hdfs.cc @@ -21,12 +21,11 @@ #include #include "arrow/filesystem/hdfs.h" +#include "arrow/filesystem/hdfs_internal.h" #include "arrow/filesystem/path_util.h" #include "arrow/filesystem/util_internal.h" -#include "arrow/io/hdfs.h" -#include "arrow/io/hdfs_internal.h" +#include "arrow/result.h" #include "arrow/util/checked_cast.h" -#include "arrow/util/io_util.h" #include "arrow/util/logging.h" #include "arrow/util/value_parsing.h" #include "arrow/util/windows_fixup.h" @@ -39,28 +38,119 @@ using util::Uri; namespace fs { +using internal::HdfsOutputStream; +using internal::HdfsPathInfo; +using internal::HdfsReadableFile; + +#define CHECK_FAILURE(RETURN_VALUE, WHAT) \ + do { \ + if (RETURN_VALUE == -1) { \ + return IOErrorFromErrno(errno, "HDFS ", WHAT, " failed"); \ + } \ + } while (0) + +static constexpr int kDefaultHdfsBufferSize = 1 << 16; + using internal::GetAbstractPathParent; using internal::MakeAbstractPathRelative; using internal::RemoveLeadingSlash; +// ---------------------------------------------------------------------- +// HDFS client + +// TODO(wesm): this could throw std::bad_alloc in the course of copying strings +// into the path info object +static void SetPathInfo(const hdfsFileInfo* input, HdfsPathInfo* out) { + out->kind = input->mKind == kObjectKindFile ? 
arrow::fs::FileType::File + : arrow::fs::FileType::Directory; + out->name = std::string(input->mName); + out->owner = std::string(input->mOwner); + out->group = std::string(input->mGroup); + + out->last_access_time = static_cast(input->mLastAccess); + out->last_modified_time = static_cast(input->mLastMod); + out->size = static_cast(input->mSize); + + out->replication = input->mReplication; + out->block_size = input->mBlockSize; + + out->permissions = input->mPermissions; +} + +namespace { + +Status GetPathInfoFailed(const std::string& path) { + return IOErrorFromErrno(errno, "Calling GetPathInfo for '", path, "' failed"); +} + +} // namespace + class HadoopFileSystem::Impl { public: Impl(HdfsOptions options, const io::IOContext& io_context) - : options_(std::move(options)), io_context_(io_context) {} + : options_(std::move(options)), + io_context_(io_context), + driver_(NULLPTR), + port_(0), + client_(NULLPTR) {} ~Impl() { ARROW_WARN_NOT_OK(Close(), "Failed to disconnect hdfs client"); } Status Init() { - io::internal::LibHdfsShim* driver_shim; - RETURN_NOT_OK(ConnectLibHdfs(&driver_shim)); - RETURN_NOT_OK(io::HadoopFileSystem::Connect(&options_.connection_config, &client_)); + const HdfsConnectionConfig* config = &options_.connection_config; + RETURN_NOT_OK(ConnectLibHdfs(&driver_)); + + if (!driver_) { + return Status::Invalid("Failed to initialize HDFS driver"); + } + + // connect to HDFS with the builder object + hdfsBuilder* builder = driver_->NewBuilder(); + if (!config->host.empty()) { + driver_->BuilderSetNameNode(builder, config->host.c_str()); + } + driver_->BuilderSetNameNodePort(builder, static_cast(config->port)); + if (!config->user.empty()) { + driver_->BuilderSetUserName(builder, config->user.c_str()); + } + if (!config->kerb_ticket.empty()) { + driver_->BuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str()); + } + + for (const auto& kv : config->extra_conf) { + int ret = driver_->BuilderConfSetStr(builder, kv.first.c_str(), kv.second.c_str()); + CHECK_FAILURE(ret, "confsetstr"); + } + + driver_->BuilderSetForceNewInstance(builder); + client_ = driver_->BuilderConnect(builder); + + if (client_ == nullptr) { + return Status::IOError("HDFS connection failed"); + } + + namenode_host_ = config->host; + port_ = config->port; + user_ = config->user; + kerb_ticket_ = config->kerb_ticket; + return Status::OK(); } Status Close() { - if (client_) { - RETURN_NOT_OK(client_->Disconnect()); + if (client_ == nullptr) { + return Status::OK(); // already closed } + + if (!driver_) { + return Status::Invalid("driver_ is null in Close()"); + } + + int ret = driver_->Disconnect(client_); + CHECK_FAILURE(ret, "hdfsFS::Disconnect"); + + client_ = nullptr; // prevent double-disconnect + return Status::OK(); } @@ -76,8 +166,8 @@ class HadoopFileSystem::Impl { return Status::Invalid("GetFileInfo must not be passed a URI, got: ", path); } FileInfo info; - io::HdfsPathInfo path_info; - auto status = client_->GetPathInfo(path, &path_info); + HdfsPathInfo path_info; + auto status = GetPathInfoStatus(path, &path_info); info.set_path(path); if (status.IsIOError()) { info.set_type(FileType::NotFound); @@ -88,11 +178,18 @@ class HadoopFileSystem::Impl { return info; } + bool Exists(const std::string& path) { + // hdfsExists does not distinguish between RPC failure and the file not + // existing + int ret = driver_->Exists(client_, path.c_str()); + return ret == 0; + } + Status StatSelector(const std::string& wd, const std::string& path, const FileSelector& select, int nesting_depth, 
                      std::vector<FileInfo>* out) {
-    std::vector<io::HdfsPathInfo> children;
-    Status st = client_->ListDirectory(path, &children);
+    std::vector<HdfsPathInfo> children;
+    Status st = ListDirectory(path, &children);
     if (!st.ok()) {
       if (select.allow_not_found) {
         ARROW_ASSIGN_OR_RAISE(auto info, GetFileInfo(path));
@@ -129,6 +226,15 @@ class HadoopFileSystem::Impl {
     return Status::OK();
   }
 
+  Status GetWorkingDirectory(std::string* out) {
+    char buffer[2048];
+    if (driver_->GetWorkingDirectory(client_, buffer, sizeof(buffer) - 1) == nullptr) {
+      return IOErrorFromErrno(errno, "HDFS GetWorkingDirectory failed");
+    }
+    *out = buffer;
+    return Status::OK();
+  }
+
   Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) {
     // See GetFileInfo(const std::string&) above.
     if (select.base_dir.substr(0, 5) == "hdfs:") {
@@ -143,7 +249,7 @@ class HadoopFileSystem::Impl {
     // If select.base_dir is absolute, we need to trim the "URI authority"
     // portion of the working directory.
     std::string wd;
-    RETURN_NOT_OK(client_->GetWorkingDirectory(&wd));
+    RETURN_NOT_OK(GetWorkingDirectory(&wd));
 
     if (!select.base_dir.empty() && select.base_dir.front() == '/') {
       // base_dir is absolute, only keep the URI authority portion.
@@ -186,24 +292,102 @@ class HadoopFileSystem::Impl {
                              "': parent is not a directory");
       }
     }
-    RETURN_NOT_OK(client_->MakeDirectory(path));
+    RETURN_NOT_OK(MakeDirectory(path));
+    return Status::OK();
+  }
+
+  Status MakeDirectory(const std::string& path) {
+    if (IsDirectory(path)) {
+      return Status::OK();
+    }
+    int ret = driver_->MakeDirectory(client_, path.c_str());
+    CHECK_FAILURE(ret, "create directory");
+    return Status::OK();
+  }
+
+  Status GetPathInfoStatus(const std::string& path, HdfsPathInfo* info) {
+    hdfsFileInfo* entry = driver_->GetPathInfo(client_, path.c_str());
+
+    if (entry == nullptr) {
+      return GetPathInfoFailed(path);
+    }
+
+    SetPathInfo(entry, info);
+    driver_->FreeFileInfo(entry, 1);
+
+    return Status::OK();
+  }
+
+  Status Stat(const std::string& path, arrow::fs::FileInfo* file_info) {
+    HdfsPathInfo info;
+    RETURN_NOT_OK(GetPathInfoStatus(path, &info));
+
+    file_info->set_size(info.size);
+    file_info->set_type(info.kind);
     return Status::OK();
   }
 
   Status CheckForDirectory(const std::string& path, const char* action) {
     // Check existence of path, and that it's a directory
-    io::HdfsPathInfo info;
-    RETURN_NOT_OK(client_->GetPathInfo(path, &info));
-    if (info.kind != io::ObjectType::DIRECTORY) {
+    HdfsPathInfo info;
+    RETURN_NOT_OK(GetPathInfoStatus(path, &info));
+    if (info.kind != FileType::Directory) {
       return Status::IOError("Cannot ", action, " directory '", path,
                              "': not a directory");
     }
     return Status::OK();
   }
 
+  Status GetChildren(const std::string& path, std::vector<std::string>* listing) {
+    std::vector<HdfsPathInfo> detailed_listing;
+    RETURN_NOT_OK(ListDirectory(path, &detailed_listing));
+    for (const auto& info : detailed_listing) {
+      listing->push_back(info.name);
+    }
+    return Status::OK();
+  }
+
+  Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing) {
+    int num_entries = 0;
+    errno = 0;
+    hdfsFileInfo* entries = driver_->ListDirectory(client_, path.c_str(), &num_entries);
+
+    if (entries == nullptr) {
+      // If the directory is empty, entries is NULL but errno is 0. Non-zero
+      // errno indicates error
+      //
+      // Note: errno is thread-local
+      //
+      // XXX(wesm): ARROW-2300; we found with Hadoop 2.6 that libhdfs would set
+      // errno 2/ENOENT for empty directories. To be more robust to this we
+      // double check this case
+      if ((errno == 0) || (errno == ENOENT && Exists(path))) {
+        num_entries = 0;
+      } else {
+        return IOErrorFromErrno(errno, "HDFS list directory failed");
+      }
+    }
+
+    // Allocate additional space for elements
+    int vec_offset = static_cast<int>(listing->size());
+    listing->resize(vec_offset + num_entries);
+
+    for (int i = 0; i < num_entries; ++i) {
+      SetPathInfo(entries + i, &(*listing)[vec_offset + i]);
+    }
+
+    // Free libhdfs file info
+    driver_->FreeFileInfo(entries, num_entries);
+
+    return Status::OK();
+  }
+
+  Status DeleteDirectory(const std::string& path) {
+    return Delete(path, /*recursive=*/true);
+  }
+
   Status DeleteDir(const std::string& path) {
     RETURN_NOT_OK(CheckForDirectory(path, "delete"));
-    return client_->DeleteDirectory(path);
+    return DeleteDirectory(path);
   }
 
   Status DeleteDirContents(const std::string& path, bool missing_dir_ok) {
@@ -216,9 +400,9 @@ class HadoopFileSystem::Impl {
     }
 
     std::vector<std::string> file_list;
-    RETURN_NOT_OK(client_->GetChildren(path, &file_list));
+    RETURN_NOT_OK(GetChildren(path, &file_list));
     for (const auto& file : file_list) {
-      RETURN_NOT_OK(client_->Delete(file, /*recursive=*/true));
+      RETURN_NOT_OK(Delete(file, /*recursive=*/true));
     }
     return Status::OK();
   }
@@ -227,35 +411,126 @@
     if (IsDirectory(path)) {
       return Status::IOError("path is a directory");
     }
-    RETURN_NOT_OK(client_->Delete(path));
+    RETURN_NOT_OK(Delete(path, /*recursive=*/false));
+    return Status::OK();
+  }
+
+  Status Delete(const std::string& path, bool recursive) {
+    int ret = driver_->Delete(client_, path.c_str(), static_cast<int>(recursive));
+    CHECK_FAILURE(ret, "delete");
+    return Status::OK();
+  }
+
+  Status Rename(const std::string& src, const std::string& dst) {
+    int ret = driver_->Rename(client_, src.c_str(), dst.c_str());
+    CHECK_FAILURE(ret, "Rename");
+    return Status::OK();
+  }
 
   Status Move(const std::string& src, const std::string& dest) {
-    auto st = client_->Rename(src, dest);
+    auto st = Rename(src, dest);
     if (st.IsIOError() && IsFile(src) && IsFile(dest)) {
       // Allow file -> file clobber
-      RETURN_NOT_OK(client_->Delete(dest));
-      st = client_->Rename(src, dest);
+      RETURN_NOT_OK(Delete(dest, /*recursive=*/false));
+      st = Rename(src, dest);
     }
     return st;
   }
 
+  Status Copy(const std::string& src, const std::string& dst) {
+    int ret = driver_->Copy(client_, src.c_str(), client_, dst.c_str());
+    CHECK_FAILURE(ret, "Rename");
+    return Status::OK();
+  }
+
   Status CopyFile(const std::string& src, const std::string& dest) {
-    return client_->Copy(src, dest);
+    return Copy(src, dest);
+  }
+
+  Status Chmod(const std::string& path, int mode) {
+    int ret = driver_->Chmod(client_, path.c_str(), static_cast<short>(mode));  // NOLINT
+    CHECK_FAILURE(ret, "Chmod");
+    return Status::OK();
+  }
+
+  Status Chown(const std::string& path, const char* owner, const char* group) {
+    int ret = driver_->Chown(client_, path.c_str(), owner, group);
+    CHECK_FAILURE(ret, "Chown");
+    return Status::OK();
+  }
+
+  Status OpenReadable(const std::string& path, int32_t buffer_size,
+                      const io::IOContext& io_context,
+                      std::shared_ptr<HdfsReadableFile>* file) {
+    errno = 0;
+    hdfsFile handle =
+        driver_->OpenFile(client_, path.c_str(), O_RDONLY, buffer_size, 0, 0);
+
+    if (handle == nullptr) {
+      return IOErrorFromErrno(errno, "Opening HDFS file '", path, "' failed");
+    }
+
+    // std::make_shared does not work with private ctors
+    std::shared_ptr<HdfsReadableFile> file_tmp;
+    ARROW_ASSIGN_OR_RAISE(file_tmp, HdfsReadableFile::Make(path, buffer_size, io_context,
+                                                           driver_, client_, 
handle)); + *file = std::move(file_tmp); + return Status::OK(); + } + + Status OpenReadable(const std::string& path, int32_t buffer_size, + std::shared_ptr* file) { + return OpenReadable(path, buffer_size, io::default_io_context(), file); + } + + Status OpenReadable(const std::string& path, std::shared_ptr* file) { + return OpenReadable(path, kDefaultHdfsBufferSize, io::default_io_context(), file); + } + + Status OpenReadable(const std::string& path, const io::IOContext& io_context, + std::shared_ptr* file) { + return OpenReadable(path, kDefaultHdfsBufferSize, io_context, file); + } + + Status OpenWritable(const std::string& path, bool append, int32_t buffer_size, + int16_t replication, int64_t default_block_size, + std::shared_ptr* file) { + int flags = O_WRONLY; + if (append) flags |= O_APPEND; + + errno = 0; + hdfsFile handle = + driver_->OpenFile(client_, path.c_str(), flags, buffer_size, replication, + static_cast(default_block_size)); + + if (handle == nullptr) { + return IOErrorFromErrno(errno, "Opening HDFS file '", path, "' failed"); + } + + // std::make_shared does not work with private ctors + std::shared_ptr file_tmp; + ARROW_ASSIGN_OR_RAISE( + file_tmp, HdfsOutputStream::Make(path, buffer_size, driver_, client_, handle)); + *file = std::move(file_tmp); + return Status::OK(); + } + + Status OpenWritable(const std::string& path, bool append, + std::shared_ptr* file) { + return OpenWritable(path, append, 0, 0, 0, file); } Result> OpenInputStream(const std::string& path) { ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(path)); - std::shared_ptr file; - RETURN_NOT_OK(client_->OpenReadable(path, io_context_, &file)); + std::shared_ptr file; + RETURN_NOT_OK(OpenReadable(path, io_context_, &file)); return file; } Result> OpenInputFile(const std::string& path) { ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(path)); - std::shared_ptr file; - RETURN_NOT_OK(client_->OpenReadable(path, io_context_, &file)); + std::shared_ptr file; + RETURN_NOT_OK(OpenReadable(path, io_context_, &file)); return file; } @@ -269,16 +544,37 @@ class HadoopFileSystem::Impl { return OpenOutputStreamGeneric(path, append); } + Status GetCapacity(int64_t* nbytes) { + tOffset ret = driver_->GetCapacity(client_); + CHECK_FAILURE(ret, "GetCapacity"); + *nbytes = ret; + return Status::OK(); + } + + Status GetUsed(int64_t* nbytes) { + tOffset ret = driver_->GetUsed(client_); + CHECK_FAILURE(ret, "GetUsed"); + *nbytes = ret; + return Status::OK(); + } + protected: const HdfsOptions options_; const io::IOContext io_context_; - std::shared_ptr<::arrow::io::HadoopFileSystem> client_; + internal::LibHdfsShim* driver_; - void PathInfoToFileInfo(const io::HdfsPathInfo& info, FileInfo* out) { - if (info.kind == io::ObjectType::DIRECTORY) { + std::string namenode_host_; + std::string user_; + int port_; + std::string kerb_ticket_; + + hdfsFS client_; + + void PathInfoToFileInfo(const HdfsPathInfo& info, FileInfo* out) { + if (info.kind == FileType::Directory) { out->set_type(FileType::Directory); out->set_size(kNoSize); - } else if (info.kind == io::ObjectType::FILE) { + } else if (info.kind == FileType::File) { out->set_type(FileType::File); out->set_size(info.size); } @@ -288,25 +584,24 @@ class HadoopFileSystem::Impl { Result> OpenOutputStreamGeneric( const std::string& path, bool append) { ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(path)); - std::shared_ptr stream; - RETURN_NOT_OK(client_->OpenWritable(path, append, options_.buffer_size, - options_.replication, options_.default_block_size, - &stream)); + 
std::shared_ptr stream; + RETURN_NOT_OK(OpenWritable(path, append, options_.buffer_size, options_.replication, + options_.default_block_size, &stream)); return stream; } bool IsDirectory(const std::string& path) { - io::HdfsPathInfo info; - return GetPathInfo(path, &info) && info.kind == io::ObjectType::DIRECTORY; + HdfsPathInfo info; + return GetPathInfo(path, &info) && info.kind == FileType::Directory; } bool IsFile(const std::string& path) { - io::HdfsPathInfo info; - return GetPathInfo(path, &info) && info.kind == io::ObjectType::FILE; + HdfsPathInfo info; + return GetPathInfo(path, &info) && info.kind == FileType::File; } - bool GetPathInfo(const std::string& path, io::HdfsPathInfo* info) { - return client_->GetPathInfo(path, info).ok(); + bool GetPathInfo(const std::string& path, HdfsPathInfo* info) { + return GetPathInfoStatus(path, info).ok(); } TimePoint ToTimePoint(int secs) { @@ -534,5 +829,90 @@ Result> HadoopFileSystem::OpenAppendStream( return impl_->OpenAppendStream(path); } +Status HadoopFileSystem::MakeDirectory(const std::string& path) { + return impl_->MakeDirectory(path); +} + +Status HadoopFileSystem::Delete(const std::string& path, bool recursive) { + return impl_->Delete(path, recursive); +} + +bool HadoopFileSystem::Exists(const std::string& path) { return impl_->Exists(path); } + +Status HadoopFileSystem::GetPathInfoStatus(const std::string& path, HdfsPathInfo* info) { + return impl_->GetPathInfoStatus(path, info); +} + +Status HadoopFileSystem::GetCapacity(int64_t* nbytes) { + return impl_->GetCapacity(nbytes); +} + +Status HadoopFileSystem::GetUsed(int64_t* nbytes) { return impl_->GetUsed(nbytes); } + +Status HadoopFileSystem::ListDirectory(const std::string& path, + std::vector* listing) { + return impl_->ListDirectory(path, listing); +} + +Status HadoopFileSystem::Chmod(const std::string& path, int mode) { + return impl_->Chmod(path, mode); +} + +Status HadoopFileSystem::Chown(const std::string& path, const char* owner, + const char* group) { + return impl_->Chown(path, owner, group); +} + +Status HadoopFileSystem::Rename(const std::string& src, const std::string& dst) { + return impl_->Rename(src, dst); +} + +Status HadoopFileSystem::Copy(const std::string& src, const std::string& dst) { + return impl_->Copy(src, dst); +} + +Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size, + std::shared_ptr* file) { + return impl_->OpenReadable(path, buffer_size, file); +} + +Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size, + const io::IOContext& io_context, + std::shared_ptr* file) { + return impl_->OpenReadable(path, buffer_size, io_context, file); +} + +Status HadoopFileSystem::OpenReadable(const std::string& path, + std::shared_ptr* file) { + return impl_->OpenReadable(path, file); +} + +Status HadoopFileSystem::OpenReadable(const std::string& path, + const io::IOContext& io_context, + std::shared_ptr* file) { + return impl_->OpenReadable(path, io_context, file); +} + +Status HadoopFileSystem::OpenWritable(const std::string& path, bool append, + int32_t buffer_size, int16_t replication, + int64_t default_block_size, + std::shared_ptr* file) { + return impl_->OpenWritable(path, append, buffer_size, replication, default_block_size, + file); +} + +Status HadoopFileSystem::OpenWritable(const std::string& path, bool append, + std::shared_ptr* file) { + return impl_->OpenWritable(path, append, file); +} + +// ---------------------------------------------------------------------- +// Allow public API users to 
check whether we are set up correctly + +Status HaveLibHdfs() { + internal::LibHdfsShim* driver; + return internal::ConnectLibHdfs(&driver); +} + } // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/filesystem/hdfs.h b/cpp/src/arrow/filesystem/hdfs.h index 25604a39e3a..94091549f7b 100644 --- a/cpp/src/arrow/filesystem/hdfs.h +++ b/cpp/src/arrow/filesystem/hdfs.h @@ -22,18 +22,32 @@ #include #include "arrow/filesystem/filesystem.h" -#include "arrow/io/hdfs.h" #include "arrow/util/uri.h" namespace arrow::fs { +namespace internal { +class HdfsReadableFile; +class HdfsOutputStream; + +struct HdfsPathInfo; +} // namespace internal + +struct HdfsConnectionConfig { + std::string host; + int port; + std::string user; + std::string kerb_ticket; + std::unordered_map extra_conf; +}; + /// Options for the HDFS implementation. struct ARROW_EXPORT HdfsOptions { HdfsOptions() = default; ~HdfsOptions() = default; /// Hdfs configuration options, contains host, port, driver - io::HdfsConnectionConfig connection_config; + HdfsConnectionConfig connection_config; /// Used by Hdfs OpenWritable Interface. int32_t buffer_size = 0; @@ -55,9 +69,6 @@ struct ARROW_EXPORT HdfsOptions { }; /// HDFS-backed FileSystem implementation. -/// -/// implementation notes: -/// - This is a wrapper of arrow/io/hdfs, so we can use FileSystem API to handle hdfs. class ARROW_EXPORT HadoopFileSystem : public FileSystem { public: ~HadoopFileSystem() override; @@ -88,10 +99,49 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem { Status DeleteFile(const std::string& path) override; + Status MakeDirectory(const std::string& path); + + bool Exists(const std::string& path); + + Status GetPathInfoStatus(const std::string& path, internal::HdfsPathInfo* info); + + Status ListDirectory(const std::string& path, + std::vector* listing); + + // Delete file or directory + // @param path absolute path to data + // @param recursive if path is a directory, delete contents as well + // @returns error status on failure + Status Delete(const std::string& path, bool recursive = false); + Status Move(const std::string& src, const std::string& dest) override; Status CopyFile(const std::string& src, const std::string& dest) override; + // Move file or directory from source path to destination path within the + // current filesystem + Status Rename(const std::string& src, const std::string& dst); + + Status Copy(const std::string& src, const std::string& dst); + + Status GetCapacity(int64_t* nbytes); + + Status GetUsed(int64_t* nbytes); + + /// Change + /// + /// @param path file path to change + /// @param owner pass null for no change + /// @param group pass null for no change + Status Chown(const std::string& path, const char* owner, const char* group); + + /// Change path permissions + /// + /// \param path Absolute path in file system + /// \param mode Mode bitset + /// \return Status + Status Chmod(const std::string& path, int mode); + Result> OpenInputStream( const std::string& path) override; Result> OpenInputFile( @@ -107,6 +157,35 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem { static Result> Make( const HdfsOptions& options, const io::IOContext& = io::default_io_context()); + // Open an HDFS file in READ mode. Returns error + // status if the file is not found. 
+ // + // @param path complete file path + Status OpenReadable(const std::string& path, int32_t buffer_size, + std::shared_ptr* file); + + Status OpenReadable(const std::string& path, int32_t buffer_size, + const io::IOContext& io_context, + std::shared_ptr* file); + + Status OpenReadable(const std::string& path, + std::shared_ptr* file); + + Status OpenReadable(const std::string& path, const io::IOContext& io_context, + std::shared_ptr* file); + + // FileMode::WRITE options + // @param path complete file path + // @param buffer_size 0 by default + // @param replication 0 by default + // @param default_block_size 0 by default + Status OpenWritable(const std::string& path, bool append, int32_t buffer_size, + int16_t replication, int64_t default_block_size, + std::shared_ptr* file); + + Status OpenWritable(const std::string& path, bool append, + std::shared_ptr* file); + protected: HadoopFileSystem(const HdfsOptions& options, const io::IOContext&); @@ -114,4 +193,6 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem { std::unique_ptr impl_; }; +ARROW_EXPORT Status HaveLibHdfs(); + } // namespace arrow::fs diff --git a/cpp/src/arrow/io/hdfs_internal.cc b/cpp/src/arrow/filesystem/hdfs_internal.cc similarity index 63% rename from cpp/src/arrow/io/hdfs_internal.cc rename to cpp/src/arrow/filesystem/hdfs_internal.cc index 4a88b9a6be6..00c481376f8 100644 --- a/cpp/src/arrow/io/hdfs_internal.cc +++ b/cpp/src/arrow/filesystem/hdfs_internal.cc @@ -28,7 +28,7 @@ // This software may be modified and distributed under the terms // of the BSD license. See the LICENSE file for details. -#include "arrow/io/hdfs_internal.h" +#include "arrow/filesystem/hdfs_internal.h" #include #include @@ -43,6 +43,7 @@ # include #endif +#include "arrow/buffer.h" #include "arrow/result.h" #include "arrow/status.h" #include "arrow/util/io_util.h" @@ -51,12 +52,20 @@ namespace arrow { using internal::GetEnvVarNative; +using internal::IOErrorFromErrno; using internal::PlatformFilename; -namespace io::internal { +namespace fs::internal { namespace { +#define CHECK_FAILURE(RETURN_VALUE, WHAT) \ + do { \ + if (RETURN_VALUE == -1) { \ + return IOErrorFromErrno(errno, "HDFS ", WHAT, " failed"); \ + } \ + } while (0) + template Status SetSymbol(void* handle, char const* name, T** symbol) { if (*symbol != nullptr) return Status::OK(); @@ -502,5 +511,304 @@ int LibHdfsShim::Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { } } -} // namespace io::internal +// ---------------------------------------------------------------------- +// File reading + +class HdfsAnyFileImpl { + public: + void set_members(const std::string& path, internal::LibHdfsShim* driver, hdfsFS fs, + hdfsFile handle) { + path_ = path; + driver_ = driver; + fs_ = fs; + file_ = handle; + is_open_ = true; + } + + Status Seek(int64_t position) { + RETURN_NOT_OK(CheckClosed()); + int ret = driver_->Seek(fs_, file_, position); + CHECK_FAILURE(ret, "seek"); + return Status::OK(); + } + + Result Tell() { + RETURN_NOT_OK(CheckClosed()); + int64_t ret = driver_->Tell(fs_, file_); + CHECK_FAILURE(ret, "tell"); + return ret; + } + + bool is_open() const { return is_open_; } + + protected: + Status CheckClosed() { + if (!is_open_) { + return Status::Invalid("Operation on closed HDFS file"); + } + return Status::OK(); + } + + std::string path_; + + internal::LibHdfsShim* driver_; + + // For threadsafety + std::mutex lock_; + + // These are pointers in libhdfs, so OK to copy + hdfsFS fs_; + hdfsFile file_; + + bool is_open_; +}; + +namespace { + +Status 
GetPathInfoFailed(const std::string& path) { + return IOErrorFromErrno(errno, "Calling GetPathInfo for '", path, "' failed"); +} + +} // namespace + +// Private implementation for read-only files +class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { + public: + explicit HdfsReadableFileImpl(MemoryPool* pool) : pool_(pool) {} + + Status Close() { + if (is_open_) { + // is_open_ must be set to false in the beginning, because the destructor + // attempts to close the stream again, and if the first close fails, then + // the error doesn't get propagated properly and the second close + // initiated by the destructor raises a segfault + is_open_ = false; + int ret = driver_->CloseFile(fs_, file_); + CHECK_FAILURE(ret, "CloseFile"); + } + return Status::OK(); + } + + bool closed() const { return !is_open_; } + + Result ReadAt(int64_t position, int64_t nbytes, uint8_t* buffer) { + RETURN_NOT_OK(CheckClosed()); + if (!driver_->HasPread()) { + std::lock_guard guard(lock_); + RETURN_NOT_OK(Seek(position)); + return Read(nbytes, buffer); + } + + constexpr int64_t kMaxBlockSize = std::numeric_limits::max(); + int64_t total_bytes = 0; + while (nbytes > 0) { + const auto block_size = static_cast(std::min(kMaxBlockSize, nbytes)); + tSize ret = + driver_->Pread(fs_, file_, static_cast(position), buffer, block_size); + CHECK_FAILURE(ret, "read"); + DCHECK_LE(ret, block_size); + if (ret == 0) { + break; // EOF + } + buffer += ret; + total_bytes += ret; + position += ret; + nbytes -= ret; + } + return total_bytes; + } + + Result> ReadAt(int64_t position, int64_t nbytes) { + RETURN_NOT_OK(CheckClosed()); + + ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_)); + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, + ReadAt(position, nbytes, buffer->mutable_data())); + if (bytes_read < nbytes) { + RETURN_NOT_OK(buffer->Resize(bytes_read)); + buffer->ZeroPadding(); + } + // R build with openSUSE155 requires an explicit shared_ptr construction + return std::shared_ptr(std::move(buffer)); + } + + Result Read(int64_t nbytes, void* buffer) { + RETURN_NOT_OK(CheckClosed()); + + int64_t total_bytes = 0; + while (total_bytes < nbytes) { + tSize ret = driver_->Read( + fs_, file_, reinterpret_cast(buffer) + total_bytes, + static_cast(std::min(buffer_size_, nbytes - total_bytes))); + CHECK_FAILURE(ret, "read"); + total_bytes += ret; + if (ret == 0) { + break; + } + } + return total_bytes; + } + + Result> Read(int64_t nbytes) { + RETURN_NOT_OK(CheckClosed()); + + ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_)); + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data())); + if (bytes_read < nbytes) { + RETURN_NOT_OK(buffer->Resize(bytes_read)); + } + // R build with openSUSE155 requires an explicit shared_ptr construction + return std::shared_ptr(std::move(buffer)); + } + + Result GetSize() { + RETURN_NOT_OK(CheckClosed()); + + hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path_.c_str()); + if (entry == nullptr) { + return GetPathInfoFailed(path_); + } + int64_t size = entry->mSize; + driver_->FreeFileInfo(entry, 1); + return size; + } + + void set_memory_pool(MemoryPool* pool) { pool_ = pool; } + + void set_buffer_size(int32_t buffer_size) { buffer_size_ = buffer_size; } + + private: + MemoryPool* pool_; + int32_t buffer_size_; +}; + +HdfsReadableFile::HdfsReadableFile(const io::IOContext& io_context) { + impl_.reset(new HdfsReadableFileImpl(io_context.pool())); +} + +HdfsReadableFile::~HdfsReadableFile() { + ARROW_WARN_NOT_OK(impl_->Close(), 
"Failed to close HdfsReadableFile"); +} + +Status HdfsReadableFile::Close() { return impl_->Close(); } + +bool HdfsReadableFile::closed() const { return impl_->closed(); } + +Result HdfsReadableFile::ReadAt(int64_t position, int64_t nbytes, void* buffer) { + return impl_->ReadAt(position, nbytes, reinterpret_cast(buffer)); +} + +Result> HdfsReadableFile::ReadAt(int64_t position, + int64_t nbytes) { + return impl_->ReadAt(position, nbytes); +} + +Result HdfsReadableFile::Read(int64_t nbytes, void* buffer) { + return impl_->Read(nbytes, buffer); +} + +Result> HdfsReadableFile::Read(int64_t nbytes) { + return impl_->Read(nbytes); +} + +Result HdfsReadableFile::GetSize() { return impl_->GetSize(); } + +Status HdfsReadableFile::Seek(int64_t position) { return impl_->Seek(position); } + +Result HdfsReadableFile::Tell() const { return impl_->Tell(); } + +Result> HdfsReadableFile::Make( + const std::string& path, int32_t buffer_size, const io::IOContext& io_context, + internal::LibHdfsShim* driver, hdfsFS fs, hdfsFile handle) { + // Must use `new` due to private constructor + std::shared_ptr file(new HdfsReadableFile(io_context)); + file->impl_->set_members(path, driver, fs, handle); + file->impl_->set_buffer_size(buffer_size); + return file; +} + +// ---------------------------------------------------------------------- +// File writing + +// Private implementation for writable-only files +class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl { + public: + HdfsOutputStreamImpl() {} + + Status Close() { + if (is_open_) { + // is_open_ must be set to false in the beginning, because the destructor + // attempts to close the stream again, and if the first close fails, then + // the error doesn't get propagated properly and the second close + // initiated by the destructor raises a segfault + is_open_ = false; + RETURN_NOT_OK(FlushInternal()); + int ret = driver_->CloseFile(fs_, file_); + CHECK_FAILURE(ret, "CloseFile"); + } + return Status::OK(); + } + + bool closed() const { return !is_open_; } + + Status Flush() { + RETURN_NOT_OK(CheckClosed()); + + return FlushInternal(); + } + + Status Write(const uint8_t* buffer, int64_t nbytes) { + RETURN_NOT_OK(CheckClosed()); + + constexpr int64_t kMaxBlockSize = std::numeric_limits::max(); + + std::lock_guard guard(lock_); + while (nbytes > 0) { + const auto block_size = static_cast(std::min(kMaxBlockSize, nbytes)); + tSize ret = driver_->Write(fs_, file_, buffer, block_size); + CHECK_FAILURE(ret, "Write"); + DCHECK_LE(ret, block_size); + buffer += ret; + nbytes -= ret; + } + return Status::OK(); + } + + protected: + Status FlushInternal() { + int ret = driver_->Flush(fs_, file_); + CHECK_FAILURE(ret, "Flush"); + return Status::OK(); + } +}; + +HdfsOutputStream::HdfsOutputStream() { impl_.reset(new HdfsOutputStreamImpl()); } + +HdfsOutputStream::~HdfsOutputStream() { + ARROW_WARN_NOT_OK(impl_->Close(), "Failed to close HdfsOutputStream"); +} + +Status HdfsOutputStream::Close() { return impl_->Close(); } + +bool HdfsOutputStream::closed() const { return impl_->closed(); } + +Status HdfsOutputStream::Write(const void* buffer, int64_t nbytes) { + return impl_->Write(reinterpret_cast(buffer), nbytes); +} + +Status HdfsOutputStream::Flush() { return impl_->Flush(); } + +Result HdfsOutputStream::Tell() const { return impl_->Tell(); } + +Result> HdfsOutputStream::Make(const std::string& path, + int32_t buffer_size, + LibHdfsShim* driver, + hdfsFS fs, + hdfsFile handle) { + std::shared_ptr file(new HdfsOutputStream()); + 
file->impl_->set_members(path, driver, fs, handle); + return file; +} + +} // namespace fs::internal } // namespace arrow diff --git a/cpp/src/arrow/io/hdfs_internal.h b/cpp/src/arrow/filesystem/hdfs_internal.h similarity index 72% rename from cpp/src/arrow/io/hdfs_internal.h rename to cpp/src/arrow/filesystem/hdfs_internal.h index 4b6b4884c00..93178402f05 100644 --- a/cpp/src/arrow/io/hdfs_internal.h +++ b/cpp/src/arrow/filesystem/hdfs_internal.h @@ -22,6 +22,11 @@ #include +#include "arrow/filesystem/filesystem.h" +#include "arrow/filesystem/hdfs.h" +#include "arrow/filesystem/type_fwd.h" +#include "arrow/io/interfaces.h" +#include "arrow/util/io_util.h" #include "arrow/util/visibility.h" #include "arrow/util/windows_compatibility.h" // IWYU pragma: keep @@ -33,7 +38,12 @@ namespace arrow { class Status; -namespace io::internal { +using internal::IOErrorFromErrno; + +namespace fs::internal { + +class HdfsReadableFile; +class HdfsOutputStream; // NOTE(wesm): cpplint does not like use of short and other imprecise C types struct LibHdfsShim { @@ -210,5 +220,92 @@ struct LibHdfsShim { // TODO(wesm): Remove these exports when we are linking statically ARROW_EXPORT Status ConnectLibHdfs(LibHdfsShim** driver); -} // namespace io::internal +struct HdfsPathInfo { + arrow::fs::FileType kind; + + std::string name; + std::string owner; + std::string group; + + // Access times in UNIX timestamps (seconds) + int64_t size; + int64_t block_size; + + int32_t last_modified_time; + int32_t last_access_time; + + int16_t replication; + int16_t permissions; +}; + +class ARROW_EXPORT HdfsReadableFile : public io::RandomAccessFile { + public: + ~HdfsReadableFile() override; + + static Result> Make(const std::string& path, + int32_t buffer_size, + const io::IOContext& io_context, + LibHdfsShim* driver, hdfsFS fs, + hdfsFile handle); + + Status Close() override; + + bool closed() const override; + + // NOTE: If you wish to read a particular range of a file in a multithreaded + // context, you may prefer to use ReadAt to avoid locking issues + Result Read(int64_t nbytes, void* out) override; + Result> Read(int64_t nbytes) override; + Result ReadAt(int64_t position, int64_t nbytes, void* out) override; + Result> ReadAt(int64_t position, int64_t nbytes) override; + + Status Seek(int64_t position) override; + Result Tell() const override; + Result GetSize() override; + + private: + explicit HdfsReadableFile(const io::IOContext&); + + class ARROW_NO_EXPORT HdfsReadableFileImpl; + std::unique_ptr impl_; + + friend class arrow::fs::HadoopFileSystem; + + ARROW_DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile); +}; + +// Naming this file OutputStream because it does not support seeking (like the +// WritableFile interface) +class ARROW_EXPORT HdfsOutputStream : public io::OutputStream { + public: + ~HdfsOutputStream() override; + + static Result> Make(const std::string& path, + int32_t buffer_size, + LibHdfsShim* driver, hdfsFS fs, + hdfsFile handle); + + Status Close() override; + + bool closed() const override; + + using OutputStream::Write; + Status Write(const void* buffer, int64_t nbytes) override; + + Status Flush() override; + + Result Tell() const override; + + private: + class ARROW_NO_EXPORT HdfsOutputStreamImpl; + std::unique_ptr impl_; + + friend class arrow::fs::HadoopFileSystem; + + HdfsOutputStream(); + + ARROW_DISALLOW_COPY_AND_ASSIGN(HdfsOutputStream); +}; + +} // namespace fs::internal } // namespace arrow diff --git a/cpp/src/arrow/io/hdfs_test.cc b/cpp/src/arrow/filesystem/hdfs_internal_test.cc similarity 
index 84% rename from cpp/src/arrow/io/hdfs_test.cc rename to cpp/src/arrow/filesystem/hdfs_internal_test.cc index 4f0a280b768..e6f102da705 100644 --- a/cpp/src/arrow/io/hdfs_test.cc +++ b/cpp/src/arrow/filesystem/hdfs_internal_test.cc @@ -30,15 +30,22 @@ #include #include "arrow/buffer.h" -#include "arrow/io/hdfs.h" -#include "arrow/io/hdfs_internal.h" +#include "arrow/filesystem/hdfs_internal.h" +#include "arrow/filesystem/type_fwd.h" #include "arrow/io/interfaces.h" #include "arrow/status.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/util.h" namespace arrow { -namespace io { + +using arrow::fs::HadoopFileSystem; +using arrow::fs::HdfsOptions; + +namespace fs::internal { + +using arrow::fs::internal::HdfsPathInfo; +using arrow::fs::internal::HdfsReadableFile; std::vector RandomData(int64_t size) { std::vector buffer(size); @@ -58,13 +65,13 @@ class TestHadoopFileSystem : public ::testing::Test { Status WriteDummyFile(const std::string& path, const uint8_t* buffer, int64_t size, bool append = false, int buffer_size = 0, int16_t replication = 0, int default_block_size = 0) { - std::shared_ptr file; - RETURN_NOT_OK(client_->OpenWritable(path, append, buffer_size, replication, - default_block_size, &file)); - - RETURN_NOT_OK(file->Write(buffer, size)); - RETURN_NOT_OK(file->Close()); + { + std::shared_ptr file; + RETURN_NOT_OK(client_->OpenWritable(path, append, buffer_size, replication, + default_block_size, &file)); + RETURN_NOT_OK(file->Write(buffer, size)); + } return Status::OK(); } @@ -82,7 +89,7 @@ class TestHadoopFileSystem : public ::testing::Test { // Set up shared state between unit tests void SetUp() { - internal::LibHdfsShim* driver_shim; + LibHdfsShim* driver_shim; client_ = nullptr; scratch_dir_ = @@ -118,7 +125,10 @@ class TestHadoopFileSystem : public ::testing::Test { conf_.user = user; conf_.port = port == nullptr ? 
20500 : atoi(port); - ASSERT_OK(HadoopFileSystem::Connect(&conf_, &client_)); + HdfsOptions options; + options.connection_config = conf_; + io::IOContext io_context = io::default_io_context(); + ASSERT_OK_AND_ASSIGN(client_, HadoopFileSystem::Make(options, io_context)); } void TearDown() { @@ -126,11 +136,10 @@ class TestHadoopFileSystem : public ::testing::Test { if (client_->Exists(scratch_dir_)) { ARROW_EXPECT_OK(client_->Delete(scratch_dir_, true)); } - ARROW_EXPECT_OK(client_->Disconnect()); } } - HdfsConnectionConfig conf_; + arrow::fs::HdfsConnectionConfig conf_; bool loaded_driver_; // Resources shared amongst unit tests @@ -147,8 +156,13 @@ TEST_F(TestHadoopFileSystem, ConnectsAgain) { SKIP_IF_NO_DRIVER(); std::shared_ptr client; - ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &client)); - ASSERT_OK(client->Disconnect()); + + { + HdfsOptions options; + options.connection_config = this->conf_; + io::IOContext io_context = io::default_io_context(); + ASSERT_OK_AND_ASSIGN(client_, HadoopFileSystem::Make(options, io_context)); + } } TEST_F(TestHadoopFileSystem, MultipleClients) { @@ -158,14 +172,18 @@ TEST_F(TestHadoopFileSystem, MultipleClients) { std::shared_ptr client1; std::shared_ptr client2; - ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &client1)); - ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &client2)); - ASSERT_OK(client1->Disconnect()); - // client2 continues to function after equivalent client1 has shutdown - std::vector listing; - ASSERT_OK(client2->ListDirectory(this->scratch_dir_, &listing)); - ASSERT_OK(client2->Disconnect()); + io::IOContext io_context = io::default_io_context(); + HdfsOptions options; + options.connection_config = this->conf_; + { ASSERT_OK_AND_ASSIGN(client1, HadoopFileSystem::Make(options, io_context)); } + { + ASSERT_OK_AND_ASSIGN(client2, HadoopFileSystem::Make(options, io_context)); + + // client2 continues to function after equivalent client1 has shutdown + std::vector listing; + ASSERT_OK(client2->ListDirectory(this->scratch_dir_, &listing)); + } } TEST_F(TestHadoopFileSystem, MakeDirectory) { @@ -200,7 +218,7 @@ TEST_F(TestHadoopFileSystem, GetCapacityUsed) { ASSERT_LT(0, nbytes); } -TEST_F(TestHadoopFileSystem, GetPathInfo) { +TEST_F(TestHadoopFileSystem, GetPathInfoStatus) { SKIP_IF_NO_DRIVER(); HdfsPathInfo info; @@ -208,8 +226,8 @@ TEST_F(TestHadoopFileSystem, GetPathInfo) { ASSERT_OK(this->MakeScratchDir()); // Directory info - ASSERT_OK(this->client_->GetPathInfo(this->scratch_dir_, &info)); - ASSERT_EQ(ObjectType::DIRECTORY, info.kind); + ASSERT_OK(this->client_->GetPathInfoStatus(this->scratch_dir_, &info)); + ASSERT_EQ(arrow::fs::FileType::Directory, info.kind); ASSERT_EQ(this->HdfsAbsPath(this->scratch_dir_), info.name); ASSERT_EQ(this->conf_.user, info.owner); @@ -222,15 +240,15 @@ TEST_F(TestHadoopFileSystem, GetPathInfo) { std::vector buffer = RandomData(size); ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size)); - ASSERT_OK(this->client_->GetPathInfo(path, &info)); + ASSERT_OK(this->client_->GetPathInfoStatus(path, &info)); - ASSERT_EQ(ObjectType::FILE, info.kind); + ASSERT_EQ(arrow::fs::FileType::File, info.kind); ASSERT_EQ(this->HdfsAbsPath(path), info.name); ASSERT_EQ(this->conf_.user, info.owner); ASSERT_EQ(size, info.size); } -TEST_F(TestHadoopFileSystem, GetPathInfoNotExist) { +TEST_F(TestHadoopFileSystem, GetPathInfoStatusNotExist) { // ARROW-2919: Test that the error message is reasonable SKIP_IF_NO_DRIVER(); @@ -238,7 +256,7 @@ TEST_F(TestHadoopFileSystem, GetPathInfoNotExist) { auto path = 
this->ScratchPath("path-does-not-exist"); HdfsPathInfo info; - Status s = this->client_->GetPathInfo(path, &info); + Status s = this->client_->GetPathInfoStatus(path, &info); ASSERT_TRUE(s.IsIOError()); const std::string error_message = s.ToString(); @@ -262,7 +280,7 @@ TEST_F(TestHadoopFileSystem, AppendToFile) { ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size, true)); HdfsPathInfo info; - ASSERT_OK(this->client_->GetPathInfo(path, &info)); + ASSERT_OK(this->client_->GetPathInfoStatus(path, &info)); ASSERT_EQ(size * 2, info.size); } @@ -293,13 +311,13 @@ TEST_F(TestHadoopFileSystem, ListDirectory) { for (size_t i = 0; i < listing.size(); ++i) { const HdfsPathInfo& info = listing[i]; if (info.name == this->HdfsAbsPath(p1)) { - ASSERT_EQ(ObjectType::FILE, info.kind); + ASSERT_EQ(arrow::fs::FileType::File, info.kind); ASSERT_EQ(size, info.size); } else if (info.name == this->HdfsAbsPath(p2)) { - ASSERT_EQ(ObjectType::FILE, info.kind); + ASSERT_EQ(arrow::fs::FileType::File, info.kind); ASSERT_EQ(size / 2, info.size); } else if (info.name == this->HdfsAbsPath(d1)) { - ASSERT_EQ(ObjectType::DIRECTORY, info.kind); + ASSERT_EQ(arrow::fs::FileType::Directory, info.kind); } else { FAIL() << "Unexpected path: " << info.name; } @@ -405,13 +423,13 @@ TEST_F(TestHadoopFileSystem, ChmodChown) { HdfsPathInfo info; ASSERT_OK(this->client_->Chmod(path, mode)); - ASSERT_OK(this->client_->GetPathInfo(path, &info)); + ASSERT_OK(this->client_->GetPathInfoStatus(path, &info)); ASSERT_EQ(mode, info.permissions); std::string owner = "hadoop"; std::string group = "hadoop"; ASSERT_OK(this->client_->Chown(path, owner.c_str(), group.c_str())); - ASSERT_OK(this->client_->GetPathInfo(path, &info)); + ASSERT_OK(this->client_->GetPathInfoStatus(path, &info)); ASSERT_EQ("hadoop", info.owner); ASSERT_EQ("hadoop", info.group); } @@ -462,5 +480,5 @@ TEST_F(TestHadoopFileSystem, ThreadSafety) { ASSERT_EQ(niter * 4, correct_count); } -} // namespace io +} // namespace fs::internal } // namespace arrow diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt index 0398422b053..647f54c9b7f 100644 --- a/cpp/src/arrow/io/CMakeLists.txt +++ b/cpp/src/arrow/io/CMakeLists.txt @@ -21,16 +21,6 @@ add_arrow_test(buffered_test PREFIX "arrow-io") add_arrow_test(compressed_test PREFIX "arrow-io") add_arrow_test(file_test PREFIX "arrow-io") - -if(ARROW_HDFS) - add_arrow_test(hdfs_test - NO_VALGRIND - PREFIX - "arrow-io" - EXTRA_LINK_LIBS - arrow::hadoop) -endif() - add_arrow_test(memory_test PREFIX "arrow-io") add_arrow_benchmark(file_benchmark PREFIX "arrow-io") diff --git a/cpp/src/arrow/io/api.h b/cpp/src/arrow/io/api.h index d55b2c2d55a..3bfde6de452 100644 --- a/cpp/src/arrow/io/api.h +++ b/cpp/src/arrow/io/api.h @@ -20,6 +20,5 @@ #include "arrow/io/buffered.h" #include "arrow/io/compressed.h" #include "arrow/io/file.h" -#include "arrow/io/hdfs.h" #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc deleted file mode 100644 index c092a1ff7bc..00000000000 --- a/cpp/src/arrow/io/hdfs.cc +++ /dev/null @@ -1,718 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "arrow/buffer.h" -#include "arrow/io/hdfs.h" -#include "arrow/io/hdfs_internal.h" -#include "arrow/io/interfaces.h" -#include "arrow/memory_pool.h" -#include "arrow/result.h" -#include "arrow/status.h" -#include "arrow/util/io_util.h" -#include "arrow/util/logging_internal.h" - -using std::size_t; - -namespace arrow { - -using internal::IOErrorFromErrno; - -namespace io { - -#define CHECK_FAILURE(RETURN_VALUE, WHAT) \ - do { \ - if (RETURN_VALUE == -1) { \ - return IOErrorFromErrno(errno, "HDFS ", WHAT, " failed"); \ - } \ - } while (0) - -static constexpr int kDefaultHdfsBufferSize = 1 << 16; - -// ---------------------------------------------------------------------- -// File reading - -class HdfsAnyFileImpl { - public: - void set_members(const std::string& path, internal::LibHdfsShim* driver, hdfsFS fs, - hdfsFile handle) { - path_ = path; - driver_ = driver; - fs_ = fs; - file_ = handle; - is_open_ = true; - } - - Status Seek(int64_t position) { - RETURN_NOT_OK(CheckClosed()); - int ret = driver_->Seek(fs_, file_, position); - CHECK_FAILURE(ret, "seek"); - return Status::OK(); - } - - Result Tell() { - RETURN_NOT_OK(CheckClosed()); - int64_t ret = driver_->Tell(fs_, file_); - CHECK_FAILURE(ret, "tell"); - return ret; - } - - bool is_open() const { return is_open_; } - - protected: - Status CheckClosed() { - if (!is_open_) { - return Status::Invalid("Operation on closed HDFS file"); - } - return Status::OK(); - } - - std::string path_; - - internal::LibHdfsShim* driver_; - - // For threadsafety - std::mutex lock_; - - // These are pointers in libhdfs, so OK to copy - hdfsFS fs_; - hdfsFile file_; - - bool is_open_; -}; - -namespace { - -Status GetPathInfoFailed(const std::string& path) { - return IOErrorFromErrno(errno, "Calling GetPathInfo for '", path, "' failed"); -} - -} // namespace - -// Private implementation for read-only files -class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { - public: - explicit HdfsReadableFileImpl(MemoryPool* pool) : pool_(pool) {} - - Status Close() { - if (is_open_) { - // is_open_ must be set to false in the beginning, because the destructor - // attempts to close the stream again, and if the first close fails, then - // the error doesn't get propagated properly and the second close - // initiated by the destructor raises a segfault - is_open_ = false; - int ret = driver_->CloseFile(fs_, file_); - CHECK_FAILURE(ret, "CloseFile"); - } - return Status::OK(); - } - - bool closed() const { return !is_open_; } - - Result ReadAt(int64_t position, int64_t nbytes, uint8_t* buffer) { - RETURN_NOT_OK(CheckClosed()); - if (!driver_->HasPread()) { - std::lock_guard guard(lock_); - RETURN_NOT_OK(Seek(position)); - return Read(nbytes, buffer); - } - - constexpr int64_t kMaxBlockSize = std::numeric_limits::max(); - int64_t total_bytes = 0; - while (nbytes > 0) { - const auto block_size = static_cast(std::min(kMaxBlockSize, nbytes)); - tSize ret = - driver_->Pread(fs_, file_, static_cast(position), 
buffer, block_size); - CHECK_FAILURE(ret, "read"); - DCHECK_LE(ret, block_size); - if (ret == 0) { - break; // EOF - } - buffer += ret; - total_bytes += ret; - position += ret; - nbytes -= ret; - } - return total_bytes; - } - - Result> ReadAt(int64_t position, int64_t nbytes) { - RETURN_NOT_OK(CheckClosed()); - - ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_)); - ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, - ReadAt(position, nbytes, buffer->mutable_data())); - if (bytes_read < nbytes) { - RETURN_NOT_OK(buffer->Resize(bytes_read)); - buffer->ZeroPadding(); - } - // R build with openSUSE155 requires an explicit shared_ptr construction - return std::shared_ptr(std::move(buffer)); - } - - Result Read(int64_t nbytes, void* buffer) { - RETURN_NOT_OK(CheckClosed()); - - int64_t total_bytes = 0; - while (total_bytes < nbytes) { - tSize ret = driver_->Read( - fs_, file_, reinterpret_cast(buffer) + total_bytes, - static_cast(std::min(buffer_size_, nbytes - total_bytes))); - CHECK_FAILURE(ret, "read"); - total_bytes += ret; - if (ret == 0) { - break; - } - } - return total_bytes; - } - - Result> Read(int64_t nbytes) { - RETURN_NOT_OK(CheckClosed()); - - ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_)); - ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data())); - if (bytes_read < nbytes) { - RETURN_NOT_OK(buffer->Resize(bytes_read)); - } - // R build with openSUSE155 requires an explicit shared_ptr construction - return std::shared_ptr(std::move(buffer)); - } - - Result GetSize() { - RETURN_NOT_OK(CheckClosed()); - - hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path_.c_str()); - if (entry == nullptr) { - return GetPathInfoFailed(path_); - } - int64_t size = entry->mSize; - driver_->FreeFileInfo(entry, 1); - return size; - } - - void set_memory_pool(MemoryPool* pool) { pool_ = pool; } - - void set_buffer_size(int32_t buffer_size) { buffer_size_ = buffer_size; } - - private: - MemoryPool* pool_; - int32_t buffer_size_; -}; - -HdfsReadableFile::HdfsReadableFile(const io::IOContext& io_context) { - impl_.reset(new HdfsReadableFileImpl(io_context.pool())); -} - -HdfsReadableFile::~HdfsReadableFile() { - ARROW_WARN_NOT_OK(impl_->Close(), "Failed to close HdfsReadableFile"); -} - -Status HdfsReadableFile::Close() { return impl_->Close(); } - -bool HdfsReadableFile::closed() const { return impl_->closed(); } - -Result HdfsReadableFile::ReadAt(int64_t position, int64_t nbytes, void* buffer) { - return impl_->ReadAt(position, nbytes, reinterpret_cast(buffer)); -} - -Result> HdfsReadableFile::ReadAt(int64_t position, - int64_t nbytes) { - return impl_->ReadAt(position, nbytes); -} - -Result HdfsReadableFile::Read(int64_t nbytes, void* buffer) { - return impl_->Read(nbytes, buffer); -} - -Result> HdfsReadableFile::Read(int64_t nbytes) { - return impl_->Read(nbytes); -} - -Result HdfsReadableFile::GetSize() { return impl_->GetSize(); } - -Status HdfsReadableFile::Seek(int64_t position) { return impl_->Seek(position); } - -Result HdfsReadableFile::Tell() const { return impl_->Tell(); } - -// ---------------------------------------------------------------------- -// File writing - -// Private implementation for writable-only files -class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl { - public: - HdfsOutputStreamImpl() {} - - Status Close() { - if (is_open_) { - // is_open_ must be set to false in the beginning, because the destructor - // attempts to close the stream again, and if the first close fails, then - // the 
error doesn't get propagated properly and the second close - // initiated by the destructor raises a segfault - is_open_ = false; - RETURN_NOT_OK(FlushInternal()); - int ret = driver_->CloseFile(fs_, file_); - CHECK_FAILURE(ret, "CloseFile"); - } - return Status::OK(); - } - - bool closed() const { return !is_open_; } - - Status Flush() { - RETURN_NOT_OK(CheckClosed()); - - return FlushInternal(); - } - - Status Write(const uint8_t* buffer, int64_t nbytes) { - RETURN_NOT_OK(CheckClosed()); - - constexpr int64_t kMaxBlockSize = std::numeric_limits::max(); - - std::lock_guard guard(lock_); - while (nbytes > 0) { - const auto block_size = static_cast(std::min(kMaxBlockSize, nbytes)); - tSize ret = driver_->Write(fs_, file_, buffer, block_size); - CHECK_FAILURE(ret, "Write"); - DCHECK_LE(ret, block_size); - buffer += ret; - nbytes -= ret; - } - return Status::OK(); - } - - protected: - Status FlushInternal() { - int ret = driver_->Flush(fs_, file_); - CHECK_FAILURE(ret, "Flush"); - return Status::OK(); - } -}; - -HdfsOutputStream::HdfsOutputStream() { impl_.reset(new HdfsOutputStreamImpl()); } - -HdfsOutputStream::~HdfsOutputStream() { - ARROW_WARN_NOT_OK(impl_->Close(), "Failed to close HdfsOutputStream"); -} - -Status HdfsOutputStream::Close() { return impl_->Close(); } - -bool HdfsOutputStream::closed() const { return impl_->closed(); } - -Status HdfsOutputStream::Write(const void* buffer, int64_t nbytes) { - return impl_->Write(reinterpret_cast(buffer), nbytes); -} - -Status HdfsOutputStream::Flush() { return impl_->Flush(); } - -Result HdfsOutputStream::Tell() const { return impl_->Tell(); } - -// ---------------------------------------------------------------------- -// HDFS client - -// TODO(wesm): this could throw std::bad_alloc in the course of copying strings -// into the path info object -static void SetPathInfo(const hdfsFileInfo* input, HdfsPathInfo* out) { - out->kind = input->mKind == kObjectKindFile ? 
ObjectType::FILE : ObjectType::DIRECTORY; - out->name = std::string(input->mName); - out->owner = std::string(input->mOwner); - out->group = std::string(input->mGroup); - - out->last_access_time = static_cast(input->mLastAccess); - out->last_modified_time = static_cast(input->mLastMod); - out->size = static_cast(input->mSize); - - out->replication = input->mReplication; - out->block_size = input->mBlockSize; - - out->permissions = input->mPermissions; -} - -// Private implementation -class HadoopFileSystem::HadoopFileSystemImpl { - public: - HadoopFileSystemImpl() : driver_(NULLPTR), port_(0), fs_(NULLPTR) {} - - Status Connect(const HdfsConnectionConfig* config) { - RETURN_NOT_OK(ConnectLibHdfs(&driver_)); - - // connect to HDFS with the builder object - hdfsBuilder* builder = driver_->NewBuilder(); - if (!config->host.empty()) { - driver_->BuilderSetNameNode(builder, config->host.c_str()); - } - driver_->BuilderSetNameNodePort(builder, static_cast(config->port)); - if (!config->user.empty()) { - driver_->BuilderSetUserName(builder, config->user.c_str()); - } - if (!config->kerb_ticket.empty()) { - driver_->BuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str()); - } - - for (const auto& kv : config->extra_conf) { - int ret = driver_->BuilderConfSetStr(builder, kv.first.c_str(), kv.second.c_str()); - CHECK_FAILURE(ret, "confsetstr"); - } - - driver_->BuilderSetForceNewInstance(builder); - fs_ = driver_->BuilderConnect(builder); - - if (fs_ == nullptr) { - return Status::IOError("HDFS connection failed"); - } - namenode_host_ = config->host; - port_ = config->port; - user_ = config->user; - kerb_ticket_ = config->kerb_ticket; - - return Status::OK(); - } - - Status MakeDirectory(const std::string& path) { - int ret = driver_->MakeDirectory(fs_, path.c_str()); - CHECK_FAILURE(ret, "create directory"); - return Status::OK(); - } - - Status Delete(const std::string& path, bool recursive) { - int ret = driver_->Delete(fs_, path.c_str(), static_cast(recursive)); - CHECK_FAILURE(ret, "delete"); - return Status::OK(); - } - - Status Disconnect() { - int ret = driver_->Disconnect(fs_); - CHECK_FAILURE(ret, "hdfsFS::Disconnect"); - return Status::OK(); - } - - bool Exists(const std::string& path) { - // hdfsExists does not distinguish between RPC failure and the file not - // existing - int ret = driver_->Exists(fs_, path.c_str()); - return ret == 0; - } - - Status GetCapacity(int64_t* nbytes) { - tOffset ret = driver_->GetCapacity(fs_); - CHECK_FAILURE(ret, "GetCapacity"); - *nbytes = ret; - return Status::OK(); - } - - Status GetUsed(int64_t* nbytes) { - tOffset ret = driver_->GetUsed(fs_); - CHECK_FAILURE(ret, "GetUsed"); - *nbytes = ret; - return Status::OK(); - } - - Status GetWorkingDirectory(std::string* out) { - char buffer[2048]; - if (driver_->GetWorkingDirectory(fs_, buffer, sizeof(buffer) - 1) == nullptr) { - return IOErrorFromErrno(errno, "HDFS GetWorkingDirectory failed"); - } - *out = buffer; - return Status::OK(); - } - - Status GetPathInfo(const std::string& path, HdfsPathInfo* info) { - hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path.c_str()); - - if (entry == nullptr) { - return GetPathInfoFailed(path); - } - - SetPathInfo(entry, info); - driver_->FreeFileInfo(entry, 1); - - return Status::OK(); - } - - Status Stat(const std::string& path, FileStatistics* stat) { - HdfsPathInfo info; - RETURN_NOT_OK(GetPathInfo(path, &info)); - - stat->size = info.size; - stat->kind = info.kind; - return Status::OK(); - } - - Status GetChildren(const std::string& path, 
-                     std::vector<std::string>* listing) {
-    std::vector<HdfsPathInfo> detailed_listing;
-    RETURN_NOT_OK(ListDirectory(path, &detailed_listing));
-    for (const auto& info : detailed_listing) {
-      listing->push_back(info.name);
-    }
-    return Status::OK();
-  }
-
-  Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing) {
-    int num_entries = 0;
-    errno = 0;
-    hdfsFileInfo* entries = driver_->ListDirectory(fs_, path.c_str(), &num_entries);
-
-    if (entries == nullptr) {
-      // If the directory is empty, entries is NULL but errno is 0. Non-zero
-      // errno indicates error
-      //
-      // Note: errno is thread-local
-      //
-      // XXX(wesm): ARROW-2300; we found with Hadoop 2.6 that libhdfs would set
-      // errno 2/ENOENT for empty directories. To be more robust to this we
-      // double check this case
-      if ((errno == 0) || (errno == ENOENT && Exists(path))) {
-        num_entries = 0;
-      } else {
-        return IOErrorFromErrno(errno, "HDFS list directory failed");
-      }
-    }
-
-    // Allocate additional space for elements
-    int vec_offset = static_cast<int>(listing->size());
-    listing->resize(vec_offset + num_entries);
-
-    for (int i = 0; i < num_entries; ++i) {
-      SetPathInfo(entries + i, &(*listing)[vec_offset + i]);
-    }
-
-    // Free libhdfs file info
-    driver_->FreeFileInfo(entries, num_entries);
-
-    return Status::OK();
-  }
-
-  Status OpenReadable(const std::string& path, int32_t buffer_size,
-                      const io::IOContext& io_context,
-                      std::shared_ptr<HdfsReadableFile>* file) {
-    errno = 0;
-    hdfsFile handle = driver_->OpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0);
-
-    if (handle == nullptr) {
-      return IOErrorFromErrno(errno, "Opening HDFS file '", path, "' failed");
-    }
-
-    // std::make_shared does not work with private ctors
-    *file = std::shared_ptr<HdfsReadableFile>(new HdfsReadableFile(io_context));
-    (*file)->impl_->set_members(path, driver_, fs_, handle);
-    (*file)->impl_->set_buffer_size(buffer_size);
-
-    return Status::OK();
-  }
-
-  Status OpenWritable(const std::string& path, bool append, int32_t buffer_size,
-                      int16_t replication, int64_t default_block_size,
-                      std::shared_ptr<HdfsOutputStream>* file) {
-    int flags = O_WRONLY;
-    if (append) flags |= O_APPEND;
-
-    errno = 0;
-    hdfsFile handle =
-        driver_->OpenFile(fs_, path.c_str(), flags, buffer_size, replication,
-                          static_cast<tOffset>(default_block_size));
-
-    if (handle == nullptr) {
-      return IOErrorFromErrno(errno, "Opening HDFS file '", path, "' failed");
-    }
-
-    // std::make_shared does not work with private ctors
-    *file = std::shared_ptr<HdfsOutputStream>(new HdfsOutputStream());
-    (*file)->impl_->set_members(path, driver_, fs_, handle);
-
-    return Status::OK();
-  }
-
-  Status Rename(const std::string& src, const std::string& dst) {
-    int ret = driver_->Rename(fs_, src.c_str(), dst.c_str());
-    CHECK_FAILURE(ret, "Rename");
-    return Status::OK();
-  }
-
-  Status Copy(const std::string& src, const std::string& dst) {
-    int ret = driver_->Copy(fs_, src.c_str(), fs_, dst.c_str());
-    CHECK_FAILURE(ret, "Rename");
-    return Status::OK();
-  }
-
-  Status Move(const std::string& src, const std::string& dst) {
-    int ret = driver_->Move(fs_, src.c_str(), fs_, dst.c_str());
-    CHECK_FAILURE(ret, "Rename");
-    return Status::OK();
-  }
-
-  Status Chmod(const std::string& path, int mode) {
-    int ret = driver_->Chmod(fs_, path.c_str(), static_cast<short>(mode));  // NOLINT
-    CHECK_FAILURE(ret, "Chmod");
-    return Status::OK();
-  }
-
-  Status Chown(const std::string& path, const char* owner, const char* group) {
-    int ret = driver_->Chown(fs_, path.c_str(), owner, group);
-    CHECK_FAILURE(ret, "Chown");
-    return Status::OK();
-  }
-
- private:
-  internal::LibHdfsShim* driver_;
-
-  std::string namenode_host_;
-  std::string user_;
-  int port_;
-  std::string kerb_ticket_;
-
-  hdfsFS fs_;
-};
-
-// ----------------------------------------------------------------------
-// Public API for HDFSClient
-
-HadoopFileSystem::HadoopFileSystem() { impl_.reset(new HadoopFileSystemImpl()); }
-
-HadoopFileSystem::~HadoopFileSystem() {}
-
-Status HadoopFileSystem::Connect(const HdfsConnectionConfig* config,
-                                 std::shared_ptr<HadoopFileSystem>* fs) {
-  // ctor is private, make_shared will not work
-  *fs = std::shared_ptr<HadoopFileSystem>(new HadoopFileSystem());
-
-  RETURN_NOT_OK((*fs)->impl_->Connect(config));
-  return Status::OK();
-}
-
-Status HadoopFileSystem::MakeDirectory(const std::string& path) {
-  return impl_->MakeDirectory(path);
-}
-
-Status HadoopFileSystem::Delete(const std::string& path, bool recursive) {
-  return impl_->Delete(path, recursive);
-}
-
-Status HadoopFileSystem::DeleteDirectory(const std::string& path) {
-  return Delete(path, true);
-}
-
-Status HadoopFileSystem::Disconnect() { return impl_->Disconnect(); }
-
-bool HadoopFileSystem::Exists(const std::string& path) { return impl_->Exists(path); }
-
-Status HadoopFileSystem::GetPathInfo(const std::string& path, HdfsPathInfo* info) {
-  return impl_->GetPathInfo(path, info);
-}
-
-Status HadoopFileSystem::Stat(const std::string& path, FileStatistics* stat) {
-  return impl_->Stat(path, stat);
-}
-
-Status HadoopFileSystem::GetCapacity(int64_t* nbytes) {
-  return impl_->GetCapacity(nbytes);
-}
-
-Status HadoopFileSystem::GetUsed(int64_t* nbytes) { return impl_->GetUsed(nbytes); }
-
-Status HadoopFileSystem::GetWorkingDirectory(std::string* out) {
-  return impl_->GetWorkingDirectory(out);
-}
-
-Status HadoopFileSystem::GetChildren(const std::string& path,
-                                     std::vector<std::string>* listing) {
-  return impl_->GetChildren(path, listing);
-}
-
-Status HadoopFileSystem::ListDirectory(const std::string& path,
-                                       std::vector<HdfsPathInfo>* listing) {
-  return impl_->ListDirectory(path, listing);
-}
-
-Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size,
-                                      std::shared_ptr<HdfsReadableFile>* file) {
-  return impl_->OpenReadable(path, buffer_size, io::default_io_context(), file);
-}
-
-Status HadoopFileSystem::OpenReadable(const std::string& path,
-                                      std::shared_ptr<HdfsReadableFile>* file) {
-  return OpenReadable(path, kDefaultHdfsBufferSize, io::default_io_context(), file);
-}
-
-Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size,
-                                      const io::IOContext& io_context,
-                                      std::shared_ptr<HdfsReadableFile>* file) {
-  return impl_->OpenReadable(path, buffer_size, io_context, file);
-}
-
-Status HadoopFileSystem::OpenReadable(const std::string& path,
-                                      const io::IOContext& io_context,
-                                      std::shared_ptr<HdfsReadableFile>* file) {
-  return OpenReadable(path, kDefaultHdfsBufferSize, io_context, file);
-}
-
-Status HadoopFileSystem::OpenWritable(const std::string& path, bool append,
-                                      int32_t buffer_size, int16_t replication,
-                                      int64_t default_block_size,
-                                      std::shared_ptr<HdfsOutputStream>* file) {
-  return impl_->OpenWritable(path, append, buffer_size, replication, default_block_size,
-                             file);
-}
-
-Status HadoopFileSystem::OpenWritable(const std::string& path, bool append,
-                                      std::shared_ptr<HdfsOutputStream>* file) {
-  return OpenWritable(path, append, 0, 0, 0, file);
-}
-
-Status HadoopFileSystem::Chmod(const std::string& path, int mode) {
-  return impl_->Chmod(path, mode);
-}
-
-Status HadoopFileSystem::Chown(const std::string& path, const char* owner,
-                               const char* group) {
-  return impl_->Chown(path, owner, group);
-}
-
-Status HadoopFileSystem::Rename(const std::string& src, const std::string& dst) {
-  return impl_->Rename(src, dst);
-}
-
-Status HadoopFileSystem::Copy(const std::string& src, const std::string& dst) {
-  return impl_->Copy(src, dst);
-}
-
-Status HadoopFileSystem::Move(const std::string& src, const std::string& dst) {
-  return impl_->Move(src, dst);
-}
-
-// ----------------------------------------------------------------------
-// Allow public API users to check whether we are set up correctly
-
-Status HaveLibHdfs() {
-  internal::LibHdfsShim* driver;
-  return internal::ConnectLibHdfs(&driver);
-}
-
-}  // namespace io
-}  // namespace arrow
diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h
deleted file mode 100644
index 46038070ae4..00000000000
--- a/cpp/src/arrow/io/hdfs.h
+++ /dev/null
@@ -1,284 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <cstdint>
-#include <memory>
-#include <string>
-#include <unordered_map>
-#include <vector>
-
-#include "arrow/io/interfaces.h"
-#include "arrow/util/macros.h"
-#include "arrow/util/visibility.h"
-
-namespace arrow {
-
-class Buffer;
-class MemoryPool;
-class Status;
-
-namespace io {
-
-class HdfsReadableFile;
-class HdfsOutputStream;
-
-/// DEPRECATED. Use the FileSystem API in arrow::fs instead.
-struct ObjectType {
-  enum type { FILE, DIRECTORY };
-};
-
-/// DEPRECATED. Use the FileSystem API in arrow::fs instead.
-struct ARROW_EXPORT FileStatistics {
-  /// Size of file, -1 if finding length is unsupported
-  int64_t size;
-  ObjectType::type kind;
-};
-
-class ARROW_EXPORT FileSystem {
- public:
-  virtual ~FileSystem() = default;
-
-  virtual Status MakeDirectory(const std::string& path) = 0;
-
-  virtual Status DeleteDirectory(const std::string& path) = 0;
-
-  virtual Status GetChildren(const std::string& path,
-                             std::vector<std::string>* listing) = 0;
-
-  virtual Status Rename(const std::string& src, const std::string& dst) = 0;
-
-  virtual Status Stat(const std::string& path, FileStatistics* stat) = 0;
-};
-
-struct HdfsPathInfo {
-  ObjectType::type kind;
-
-  std::string name;
-  std::string owner;
-  std::string group;
-
-  // Access times in UNIX timestamps (seconds)
-  int64_t size;
-  int64_t block_size;
-
-  int32_t last_modified_time;
-  int32_t last_access_time;
-
-  int16_t replication;
-  int16_t permissions;
-};
-
-struct HdfsConnectionConfig {
-  std::string host;
-  int port;
-  std::string user;
-  std::string kerb_ticket;
-  std::unordered_map<std::string, std::string> extra_conf;
-};
-
-class ARROW_EXPORT HadoopFileSystem : public FileSystem {
- public:
-  ~HadoopFileSystem() override;
-
-  // Connect to an HDFS cluster given a configuration
-  //
-  // @param config (in): configuration for connecting
-  // @param fs (out): the created client
-  // @returns Status
-  static Status Connect(const HdfsConnectionConfig* config,
-                        std::shared_ptr<HadoopFileSystem>* fs);
-
-  // Create directory and all parents
-  //
-  // @param path (in): absolute HDFS path
-  // @returns Status
-  Status MakeDirectory(const std::string& path) override;
-
-  // Delete file or directory
-  // @param path absolute path to data
-  // @param recursive if path is a directory, delete contents as well
-  // @returns error status on failure
-  Status Delete(const std::string& path, bool recursive = false);
-
-  Status DeleteDirectory(const std::string& path) override;
-
-  // Disconnect from cluster
-  //
-  // @returns Status
-  Status Disconnect();
-
-  // @param path (in): absolute HDFS path
-  // @returns bool, true if the path exists, false if not (or on error)
-  bool Exists(const std::string& path);
-
-  // @param path (in): absolute HDFS path
-  // @param info (out)
-  // @returns Status
-  Status GetPathInfo(const std::string& path, HdfsPathInfo* info);
-
-  // @param nbytes (out): total capacity of the filesystem
-  // @returns Status
-  Status GetCapacity(int64_t* nbytes);
-
-  // @param nbytes (out): total bytes used of the filesystem
-  // @returns Status
-  Status GetUsed(int64_t* nbytes);
-
-  Status GetChildren(const std::string& path, std::vector<std::string>* listing) override;
-
-  /// List directory contents
-  ///
-  /// If path is a relative path, returned values will be absolute paths or URIs
-  /// starting from the current working directory.
-  Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing);
-
-  /// Return the filesystem's current working directory.
-  ///
-  /// The working directory is the base path for all relative paths given to
-  /// other APIs.
-  /// NOTE: this actually returns a URI.
-  Status GetWorkingDirectory(std::string* out);
-
-  /// Change
-  ///
-  /// @param path file path to change
-  /// @param owner pass null for no change
-  /// @param group pass null for no change
-  Status Chown(const std::string& path, const char* owner, const char* group);
-
-  /// Change path permissions
-  ///
-  /// \param path Absolute path in file system
-  /// \param mode Mode bitset
-  /// \return Status
-  Status Chmod(const std::string& path, int mode);
-
-  // Move file or directory from source path to destination path within the
-  // current filesystem
-  Status Rename(const std::string& src, const std::string& dst) override;
-
-  Status Copy(const std::string& src, const std::string& dst);
-
-  Status Move(const std::string& src, const std::string& dst);
-
-  Status Stat(const std::string& path, FileStatistics* stat) override;
-
-  // TODO(wesm): GetWorkingDirectory, SetWorkingDirectory
-
-  // Open an HDFS file in READ mode. Returns error
-  // status if the file is not found.
-  //
-  // @param path complete file path
-  Status OpenReadable(const std::string& path, int32_t buffer_size,
-                      std::shared_ptr<HdfsReadableFile>* file);
-
-  Status OpenReadable(const std::string& path, int32_t buffer_size,
-                      const io::IOContext& io_context,
-                      std::shared_ptr<HdfsReadableFile>* file);
-
-  Status OpenReadable(const std::string& path, std::shared_ptr<HdfsReadableFile>* file);
-
-  Status OpenReadable(const std::string& path, const io::IOContext& io_context,
-                      std::shared_ptr<HdfsReadableFile>* file);
-
-  // FileMode::WRITE options
-  // @param path complete file path
-  // @param buffer_size 0 by default
-  // @param replication 0 by default
-  // @param default_block_size 0 by default
-  Status OpenWritable(const std::string& path, bool append, int32_t buffer_size,
-                      int16_t replication, int64_t default_block_size,
-                      std::shared_ptr<HdfsOutputStream>* file);
-
-  Status OpenWritable(const std::string& path, bool append,
-                      std::shared_ptr<HdfsOutputStream>* file);
-
- private:
-  friend class HdfsReadableFile;
-  friend class HdfsOutputStream;
-
-  class ARROW_NO_EXPORT HadoopFileSystemImpl;
-  std::unique_ptr<HadoopFileSystemImpl> impl_;
-
-  HadoopFileSystem();
-  ARROW_DISALLOW_COPY_AND_ASSIGN(HadoopFileSystem);
-};
-
-class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
- public:
-  ~HdfsReadableFile() override;
-
-  Status Close() override;
-
-  bool closed() const override;
-
-  // NOTE: If you wish to read a particular range of a file in a multithreaded
-  // context, you may prefer to use ReadAt to avoid locking issues
-  Result<int64_t> Read(int64_t nbytes, void* out) override;
-  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override;
-  Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
-  Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override;
-
-  Status Seek(int64_t position) override;
-  Result<int64_t> Tell() const override;
-  Result<int64_t> GetSize() override;
-
- private:
-  explicit HdfsReadableFile(const io::IOContext&);
-
-  class ARROW_NO_EXPORT HdfsReadableFileImpl;
-  std::unique_ptr<HdfsReadableFileImpl> impl_;
-
-  friend class HadoopFileSystem::HadoopFileSystemImpl;
-
-  ARROW_DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile);
-};
-
-// Naming this file OutputStream because it does not support seeking (like the
-// WritableFile interface)
-class ARROW_EXPORT HdfsOutputStream : public OutputStream {
- public:
-  ~HdfsOutputStream() override;
-
-  Status Close() override;
-
-  bool closed() const override;
-
-  using OutputStream::Write;
-  Status Write(const void* buffer, int64_t nbytes) override;
-
-  Status Flush() override;
-
-  Result<int64_t> Tell() const override;
-
- private:
-  class ARROW_NO_EXPORT HdfsOutputStreamImpl;
-  std::unique_ptr<HdfsOutputStreamImpl> impl_;
-
-  friend class HadoopFileSystem::HadoopFileSystemImpl;
-
-  HdfsOutputStream();
-
-  ARROW_DISALLOW_COPY_AND_ASSIGN(HdfsOutputStream);
-};
-
-ARROW_EXPORT Status HaveLibHdfs();
-
-}  // namespace io
-}  // namespace arrow
diff --git a/cpp/src/arrow/meson.build b/cpp/src/arrow/meson.build
index 33a9c6ed40c..19d78d1a841 100644
--- a/cpp/src/arrow/meson.build
+++ b/cpp/src/arrow/meson.build
@@ -84,8 +84,6 @@ arrow_components = {
         'io/caching.cc',
         'io/compressed.cc',
         'io/file.cc',
-        'io/hdfs.cc',
-        'io/hdfs_internal.cc',
         'io/interfaces.cc',
         'io/memory.cc',
         'io/slow.cc',
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index da2fe966475..82e20b5e847 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -111,6 +111,17 @@ def _filesystem_is_available(fs):
     return True
 
 
+def have_libhdfs():
+    """
+    Return true if HDFS (HadoopFileSystem) library is set up correctly.
+    """
+    try:
+        from pyarrow._hdfs import _have_libhdfs  # noqa
+        return _have_libhdfs()
+    except ImportError:
+        return False
+
+
 def show_info():
     """
     Print detailed version and platform information, for error reporting
@@ -260,8 +271,7 @@ def print_entry(label, value):
                          BufferReader, BufferOutputStream, OSFile,
                          MemoryMappedFile, memory_map, create_memory_map,
                          MockOutputStream,
-                         input_stream, output_stream,
-                         have_libhdfs)
+                         input_stream, output_stream)
 
 from pyarrow.lib import (ChunkedArray, RecordBatch, Table, table,
                          concat_arrays, concat_tables, TableGroupBy,
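For context on the __init__.py hunk above: the public have_libhdfs() helper now lives at the top level of pyarrow and delegates to pyarrow._hdfs instead of pyarrow.lib. A minimal usage sketch, assuming a pyarrow build with the HDFS extension; the namenode address is illustrative and not taken from this patch:

    import pyarrow as pa
    from pyarrow import fs

    if pa.have_libhdfs():
        # libhdfs and the surrounding Hadoop/JVM environment were found at runtime
        hdfs = fs.HadoopFileSystem("namenode.example.com", port=8020)
    else:
        print("libhdfs is not available; skipping HDFS-backed I/O")
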
diff --git a/python/pyarrow/_hdfs.pyx b/python/pyarrow/_hdfs.pyx
index 8a9fddee3dd..b03de2fc17e 100644
--- a/python/pyarrow/_hdfs.pyx
+++ b/python/pyarrow/_hdfs.pyx
@@ -23,9 +23,22 @@ from pyarrow.includes.libarrow_fs cimport *
 from pyarrow._fs cimport FileSystem
 
 from pyarrow.lib import frombytes, tobytes
+from pyarrow.lib cimport check_status
 from pyarrow.util import _stringify_path
 
 
+def _have_libhdfs():
+    """
+    Return true if HDFS (HadoopFileSystem) library is set up correctly.
+    """
+    try:
+        with nogil:
+            check_status(HaveLibHdfs())
+        return True
+    except Exception:
+        return False
+
+
 cdef class HadoopFileSystem(FileSystem):
     """
     HDFS backed FileSystem implementation
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 39dc3a77d98..c359a4a84c7 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1585,10 +1585,6 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
         FileMode_WRITE" arrow::io::FileMode::WRITE"
         FileMode_READWRITE" arrow::io::FileMode::READWRITE"
 
-    cdef enum ObjectType" arrow::io::ObjectType::type":
-        ObjectType_FILE" arrow::io::ObjectType::FILE"
-        ObjectType_DIRECTORY" arrow::io::ObjectType::DIRECTORY"
-
     cdef cppclass CIOContext" arrow::io::IOContext":
         CIOContext()
         CIOContext(CStopToken)
@@ -1599,10 +1595,6 @@
     int GetIOThreadPoolCapacity()
     CStatus SetIOThreadPoolCapacity(int threads)
 
-    cdef cppclass FileStatistics:
-        int64_t size
-        ObjectType kind
-
     cdef cppclass FileInterface:
         CStatus Close()
         CResult[int64_t] Tell()
@@ -1670,9 +1662,6 @@
                        WritableFile):
         pass
 
-    cdef cppclass CIOFileSystem" arrow::io::FileSystem":
-        CStatus Stat(const c_string& path, FileStatistics* stat)
-
     cdef cppclass FileOutputStream(COutputStream):
         @staticmethod
         CResult[shared_ptr[COutputStream]] Open(const c_string& path)
@@ -1755,78 +1744,6 @@
                                      CTransformInputStreamVTable vtable,
                                      object method_arg)
 
-    # ----------------------------------------------------------------------
-    # HDFS
-
-    CStatus HaveLibHdfs()
-    CStatus HaveLibHdfs3()
-
-    cdef enum HdfsDriver" arrow::io::HdfsDriver":
-        HdfsDriver_LIBHDFS" arrow::io::HdfsDriver::LIBHDFS"
-        HdfsDriver_LIBHDFS3" arrow::io::HdfsDriver::LIBHDFS3"
-
-    cdef cppclass HdfsConnectionConfig:
-        c_string host
-        int port
-        c_string user
-        c_string kerb_ticket
-        unordered_map[c_string, c_string] extra_conf
-        HdfsDriver driver
-
-    cdef cppclass HdfsPathInfo:
-        ObjectType kind
-        c_string name
-        c_string owner
-        c_string group
-        int32_t last_modified_time
-        int32_t last_access_time
-        int64_t size
-        int16_t replication
-        int64_t block_size
-        int16_t permissions
-
-    cdef cppclass HdfsReadableFile(CRandomAccessFile):
-        pass
-
-    cdef cppclass HdfsOutputStream(COutputStream):
-        pass
-
-    cdef cppclass CIOHadoopFileSystem \
-            "arrow::io::HadoopFileSystem"(CIOFileSystem):
-        @staticmethod
-        CStatus Connect(const HdfsConnectionConfig* config,
-                        shared_ptr[CIOHadoopFileSystem]* client)
-
-        CStatus MakeDirectory(const c_string& path)
-
-        CStatus Delete(const c_string& path, c_bool recursive)
-
-        CStatus Disconnect()
-
-        c_bool Exists(const c_string& path)
-
-        CStatus Chmod(const c_string& path, int mode)
-        CStatus Chown(const c_string& path, const char* owner,
-                      const char* group)
-
-        CStatus GetCapacity(int64_t* nbytes)
-        CStatus GetUsed(int64_t* nbytes)
-
-        CStatus ListDirectory(const c_string& path,
-                              vector[HdfsPathInfo]* listing)
-
-        CStatus GetPathInfo(const c_string& path, HdfsPathInfo* info)
-
-        CStatus Rename(const c_string& src, const c_string& dst)
-
-        CStatus OpenReadable(const c_string& path,
-                             shared_ptr[HdfsReadableFile]* handle)
-
-        CStatus OpenWritable(const c_string& path, c_bool append,
-                             int32_t buffer_size, int16_t replication,
-                             int64_t default_block_size,
-                             shared_ptr[HdfsOutputStream]* handle)
-
     cdef cppclass CBufferReader \
             "arrow::io::BufferReader"(CRandomAccessFile):
         CBufferReader(const shared_ptr[CBuffer]& buffer)
diff --git a/python/pyarrow/includes/libarrow_fs.pxd b/python/pyarrow/includes/libarrow_fs.pxd
index af01c47c8c7..c75b1acc6b4 100644
--- a/python/pyarrow/includes/libarrow_fs.pxd
+++ b/python/pyarrow/includes/libarrow_fs.pxd
@@ -306,6 +306,14 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
                                   const CIOContext& io_context,
                                   int64_t chunk_size, c_bool use_threads)
 
+    CStatus HaveLibHdfs()
+
+    cdef cppclass HdfsConnectionConfig:
+        c_string host
+        int port
+        c_string user
+        c_string kerb_ticket
+        unordered_map[c_string, c_string] extra_conf
 
 # Callbacks for implementing Python filesystems
 # Use typedef to emulate syntax for std::function
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index fd2d4df42cc..71a980632b9 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -32,7 +32,7 @@ import warnings
 from io import BufferedIOBase, IOBase, TextIOBase, UnsupportedOperation
 from queue import Queue, Empty as QueueEmpty
 
-from pyarrow.lib cimport check_status, HaveLibHdfs
+from pyarrow.lib cimport check_status
 from pyarrow.util import _is_path_like, _stringify_path
 
 
@@ -46,18 +46,6 @@ cdef extern from "Python.h":
                                      char *v, Py_ssize_t len) except NULL
 
 
-def have_libhdfs():
-    """
-    Return true if HDFS (HadoopFileSystem) library is set up correctly.
-    """
-    try:
-        with nogil:
-            check_status(HaveLibHdfs())
-        return True
-    except Exception:
-        return False
-
-
 def io_thread_count():
     """
     Return the number of threads to use for I/O operations.
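Since the legacy arrow::io HDFS bindings are deleted outright, callers are expected to go through the arrow::fs layer instead. A rough mapping of the removed calls onto pyarrow.fs.HadoopFileSystem; host, port, and paths below are illustrative and not taken from this patch:

    from pyarrow import fs

    hdfs = fs.HadoopFileSystem("default", port=0)               # was HadoopFileSystem::Connect
    hdfs.create_dir("/tmp/arrow-example")                        # was MakeDirectory
    with hdfs.open_output_stream("/tmp/arrow-example/f") as out:  # was OpenWritable
        out.write(b"hello")
    info = hdfs.get_file_info("/tmp/arrow-example/f")            # was GetPathInfo / Stat
    with hdfs.open_input_file("/tmp/arrow-example/f") as f:       # was OpenReadable
        print(f.read_at(5, 0))
    hdfs.delete_dir("/tmp/arrow-example")                         # was Delete(recursive=true)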