diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 660d052fd2d..898914994c3 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -118,7 +118,6 @@ set(ARROW_SRCS io/compressed.cc io/file.cc io/hdfs.cc - io/hdfs_internal.cc io/interfaces.cc io/memory.cc io/readahead.cc @@ -266,6 +265,8 @@ if(ARROW_FILESYSTEM) list(APPEND ARROW_SRCS filesystem/filesystem.cc + filesystem/hdfs.cc + filesystem/hdfs_internal.cc filesystem/localfs.cc filesystem/mockfs.cc filesystem/path_tree.cc diff --git a/cpp/src/arrow/filesystem/CMakeLists.txt b/cpp/src/arrow/filesystem/CMakeLists.txt index efb78055a9a..ba703ae99cf 100644 --- a/cpp/src/arrow/filesystem/CMakeLists.txt +++ b/cpp/src/arrow/filesystem/CMakeLists.txt @@ -32,3 +32,7 @@ if(ARROW_S3) add_dependencies(arrow-tests arrow-s3fs-narrative-test) endif() endif() + +if(ARROW_HDFS) + add_arrow_test(hdfs_test NO_VALGRIND) +endif() diff --git a/cpp/src/arrow/filesystem/hdfs.cc b/cpp/src/arrow/filesystem/hdfs.cc new file mode 100644 index 00000000000..ddc83211352 --- /dev/null +++ b/cpp/src/arrow/filesystem/hdfs.cc @@ -0,0 +1,496 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/buffer.h" +#include "arrow/filesystem/hdfs.h" +#include "arrow/filesystem/hdfs_internal.h" +#include "arrow/filesystem/util_internal.h" +#include "arrow/io/hdfs.h" +#include "arrow/io/interfaces.h" +#include "arrow/memory_pool.h" +#include "arrow/status.h" +#include "arrow/util/logging.h" + +using std::size_t; + +namespace arrow { +namespace fs { + +using internal::CopyStream; +using internal::HdfsOutputStream; +using internal::HdfsReadableFile; + +namespace { + +std::string TranslateErrno(int error_code) { + std::stringstream ss; + ss << error_code << " (" << strerror(error_code) << ")"; + if (error_code == 255) { + // Unknown error can occur if the host is correct but the port is not + ss << " Please check that you are connecting to the correct HDFS RPC port"; + } + return ss.str(); +} + +} // namespace + +#define CHECK_FAILURE(RETURN_VALUE, WHAT) \ + do { \ + if (RETURN_VALUE == -1) { \ + return Status::IOError("HDFS ", WHAT, " failed, errno: ", TranslateErrno(errno)); \ + } \ + } while (0) + +static constexpr int kDefaultHdfsBufferSize = 1 << 16; + +// ---------------------------------------------------------------------- +// File reading + +namespace { + +Status GetPathInfoFailed(const std::string& path) { + std::stringstream ss; + ss << "Calling GetPathInfo for " << path << " failed. 
errno: " << TranslateErrno(errno); + return Status::IOError(ss.str()); +} + +} // namespace + +// ---------------------------------------------------------------------- +// HDFS client + +// TODO(wesm): this could throw std::bad_alloc in the course of copying strings +// into the path info object +static void SetPathInfo(const hdfsFileInfo* input, io::HdfsPathInfo* out) { + out->kind = + input->mKind == kObjectKindFile ? io::ObjectType::FILE : io::ObjectType::DIRECTORY; + out->name = std::string(input->mName); + out->owner = std::string(input->mOwner); + out->group = std::string(input->mGroup); + + out->last_access_time = static_cast(input->mLastAccess); + out->last_modified_time = static_cast(input->mLastMod); + out->size = static_cast(input->mSize); + + out->replication = input->mReplication; + out->block_size = input->mBlockSize; + + out->permissions = input->mPermissions; +} + +// Private implementation +class HadoopFileSystem::Impl { + public: + Impl() : driver_(NULLPTR), port_(0), fs_(NULLPTR) {} + + Status Connect(const HadoopOptions& config) { + if (config.driver == HadoopDriver::LIBHDFS3) { + RETURN_NOT_OK(ConnectLibHdfs3(&driver_)); + } else { + RETURN_NOT_OK(ConnectLibHdfs(&driver_)); + } + + // connect to HDFS with the builder object + hdfsBuilder* builder = driver_->NewBuilder(); + if (!config.host.empty()) { + driver_->BuilderSetNameNode(builder, config.host.c_str()); + } + driver_->BuilderSetNameNodePort(builder, static_cast(config.port)); + if (!config.user.empty()) { + driver_->BuilderSetUserName(builder, config.user.c_str()); + } + if (!config.kerb_ticket.empty()) { + driver_->BuilderSetKerbTicketCachePath(builder, config.kerb_ticket.c_str()); + } + + for (auto& kv : config.extra_conf) { + int ret = driver_->BuilderConfSetStr(builder, kv.first.c_str(), kv.second.c_str()); + CHECK_FAILURE(ret, "confsetstr"); + } + + driver_->BuilderSetForceNewInstance(builder); + fs_ = driver_->BuilderConnect(builder); + + if (fs_ == nullptr) { + return Status::IOError("HDFS connection failed"); + } + namenode_host_ = config.host; + port_ = config.port; + user_ = config.user; + kerb_ticket_ = config.kerb_ticket; + + return Status::OK(); + } + + Status MakeDirectory(const std::string& path) { + int ret = driver_->MakeDirectory(fs_, path.c_str()); + CHECK_FAILURE(ret, "create directory"); + return Status::OK(); + } + + Status Delete(const std::string& path, bool recursive) { + int ret = driver_->Delete(fs_, path.c_str(), static_cast(recursive)); + CHECK_FAILURE(ret, "delete"); + return Status::OK(); + } + + Status Disconnect() { + int ret = driver_->Disconnect(fs_); + CHECK_FAILURE(ret, "hdfsFS::Disconnect"); + return Status::OK(); + } + + bool Exists(const std::string& path) { + // hdfsExists does not distinguish between RPC failure and the file not + // existing + int ret = driver_->Exists(fs_, path.c_str()); + return ret == 0; + } + + Status GetCapacity(int64_t* nbytes) { + tOffset ret = driver_->GetCapacity(fs_); + CHECK_FAILURE(ret, "GetCapacity"); + *nbytes = ret; + return Status::OK(); + } + + Status GetUsed(int64_t* nbytes) { + tOffset ret = driver_->GetUsed(fs_); + CHECK_FAILURE(ret, "GetUsed"); + *nbytes = ret; + return Status::OK(); + } + + Status GetPathInfo(const std::string& path, io::HdfsPathInfo* info) { + hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path.c_str()); + + if (entry == nullptr) { + return GetPathInfoFailed(path); + } + + SetPathInfo(entry, info); + driver_->FreeFileInfo(entry, 1); + + return Status::OK(); + } + + Status GetChildren(const std::string& path, 
std::vector* listing) { + std::vector detailed_listing; + RETURN_NOT_OK(ListDirectory(path, &detailed_listing)); + for (const auto& info : detailed_listing) { + listing->push_back(info.name); + } + return Status::OK(); + } + + Status ListDirectory(const std::string& path, std::vector* listing) { + int num_entries = 0; + errno = 0; + hdfsFileInfo* entries = driver_->ListDirectory(fs_, path.c_str(), &num_entries); + + if (entries == nullptr) { + // If the directory is empty, entries is NULL but errno is 0. Non-zero + // errno indicates error + // + // Note: errno is thread-local + // + // XXX(wesm): ARROW-2300; we found with Hadoop 2.6 that libhdfs would set + // errno 2/ENOENT for empty directories. To be more robust to this we + // double check this case + if ((errno == 0) || (errno == ENOENT && Exists(path))) { + num_entries = 0; + } else { + return Status::IOError("HDFS list directory failed, errno: ", + TranslateErrno(errno)); + } + } + + // Allocate additional space for elements + int vec_offset = static_cast(listing->size()); + listing->resize(vec_offset + num_entries); + + for (int i = 0; i < num_entries; ++i) { + SetPathInfo(entries + i, &(*listing)[vec_offset + i]); + } + + // Free libhdfs file info + driver_->FreeFileInfo(entries, num_entries); + + return Status::OK(); + } + + Status OpenReadable(const std::string& path, int32_t buffer_size, + std::shared_ptr* file) { + hdfsFile handle = driver_->OpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0); + + if (handle == nullptr) { + const char* msg = !Exists(path) ? "HDFS file does not exist: " + : "HDFS path exists, but opening file failed: "; + return Status::IOError(msg, path); + } + + *file = std::make_shared(path, driver_, fs_, handle, buffer_size); + return Status::OK(); + } + + Status OpenWritable(const std::string& path, bool append, int32_t buffer_size, + int16_t replication, int64_t default_block_size, + std::shared_ptr* file) { + int flags = O_WRONLY; + if (append) flags |= O_APPEND; + + hdfsFile handle = + driver_->OpenFile(fs_, path.c_str(), flags, buffer_size, replication, + static_cast(default_block_size)); + + if (handle == nullptr) { + return Status::IOError("Unable to open file ", path); + } + + *file = std::make_shared(path, driver_, fs_, handle); + return Status::OK(); + } + + Status Rename(const std::string& src, const std::string& dst) { + int ret = driver_->Rename(fs_, src.c_str(), dst.c_str()); + CHECK_FAILURE(ret, "Rename"); + return Status::OK(); + } + + Status Chmod(const std::string& path, int mode) { + int ret = driver_->Chmod(fs_, path.c_str(), static_cast(mode)); // NOLINT + CHECK_FAILURE(ret, "Chmod"); + return Status::OK(); + } + + Status Chown(const std::string& path, const char* owner, const char* group) { + int ret = driver_->Chown(fs_, path.c_str(), owner, group); + CHECK_FAILURE(ret, "Chown"); + return Status::OK(); + } + + private: + internal::LibHdfsShim* driver_; + + std::string namenode_host_; + std::string user_; + int port_; + std::string kerb_ticket_; + + hdfsFS fs_; +}; + +// ---------------------------------------------------------------------- +// Public API for HDFSClient + +HadoopFileSystem::HadoopFileSystem() { impl_.reset(new Impl()); } + +Status HadoopFileSystem::Connect(const HadoopOptions& options, + std::shared_ptr* fs) { + // ctor is private, make_shared will not work + *fs = std::shared_ptr(new HadoopFileSystem()); + + RETURN_NOT_OK((*fs)->impl_->Connect(options)); + return Status::OK(); +} + +Status HadoopFileSystem::Disconnect() { return impl_->Disconnect(); } + +Status 
HadoopFileSystem::GetTargetStats(const std::string& path, FileStats* out) {
+  out->set_path(path);
+  if (!impl_->Exists(path)) {
+    out->set_type(FileType::NonExistent);
+    return Status::OK();
+  }
+  io::HdfsPathInfo info;
+  RETURN_NOT_OK(impl_->GetPathInfo(path, &info));
+  out->set_size(info.size);
+  out->set_type(info.kind == io::ObjectType::FILE ? FileType::File : FileType::Directory);
+  return Status::OK();
+}
+
+inline Selector SetBaseDir(const Selector& selector, std::string d) {
+  Selector out;
+  out.base_dir = std::move(d);
+  out.allow_non_existent = selector.allow_non_existent;
+  out.recursive = selector.recursive;
+  out.max_recursion = selector.max_recursion;
+  return out;
+}
+
+Status HadoopFileSystem::GetTargetStats(const Selector& selector,
+                                        std::vector<FileStats>* out) {
+  std::vector<std::string> paths;
+  RETURN_NOT_OK(impl_->GetChildren(selector.base_dir, &paths));
+  std::vector<io::HdfsPathInfo> infos;
+  RETURN_NOT_OK(impl_->ListDirectory(selector.base_dir, &infos));
+  out->reserve(out->size() + infos.size());
+  for (size_t i = 0; i < infos.size(); ++i) {
+    out->emplace_back();
+    out->back().set_path(paths[i]);
+    out->back().set_size(infos[i].size);
+    out->back().set_mtime(TimePoint(std::chrono::duration_cast<TimePoint::duration>(
+        std::chrono::seconds(infos[i].last_modified_time))));
+
+    if (infos[i].kind == io::ObjectType::FILE) {
+      out->back().set_type(FileType::File);
+      continue;
+    }
+
+    out->back().set_type(FileType::Directory);
+    if (selector.recursive) {
+      Selector recursive = SetBaseDir(selector, std::move(paths[i]));
+      RETURN_NOT_OK(GetTargetStats(recursive, out));
+    }
+  }
+  return Status::OK();
+}
+
+Status HadoopFileSystem::CreateDir(const std::string& path, bool recursive) {
+  // XXX hdfsCreateDirectory always creates non existent parents.
+  // Do we need to emit an error if !recursive and non existent parents will be created?
+  return impl_->MakeDirectory(path);
+}
+
+Status HadoopFileSystem::DeleteDir(const std::string& path) {
+  return impl_->Delete(path, true);
+}
+
+Status HadoopFileSystem::DeleteDirContents(const std::string& path) {
+  std::vector<io::HdfsPathInfo> detailed_listing;
+  RETURN_NOT_OK(impl_->ListDirectory(path, &detailed_listing));
+  for (const auto& info : detailed_listing) {
+    RETURN_NOT_OK(impl_->Delete(info.name, true));
+  }
+  return Status::OK();
+}
+
+Status HadoopFileSystem::DeleteFile(const std::string& path) {
+  return impl_->Delete(path, false);
+}
+
+Status HadoopFileSystem::Move(const std::string& src, const std::string& dst) {
+  return impl_->Rename(src, dst);
+}
+
+Status HadoopFileSystem::CopyFile(const std::string& src, const std::string& dst) {
+  std::shared_ptr<io::InputStream> src_file;
+  std::shared_ptr<io::OutputStream> dst_file;
+  RETURN_NOT_OK(OpenInputStream(src, &src_file));
+  RETURN_NOT_OK(OpenOutputStream(dst, &dst_file));
+  return CopyStream(src_file, dst_file, kDefaultHdfsBufferSize);
+}
+
+Status HadoopFileSystem::OpenInputStream(const std::string& path,
+                                         std::shared_ptr<io::InputStream>* file) {
+  std::shared_ptr<io::RandomAccessFile> random_file;
+  RETURN_NOT_OK(impl_->OpenReadable(path, kDefaultHdfsBufferSize, &random_file));
+  *file = std::move(random_file);
+  return Status::OK();
+}
+
+Status HadoopFileSystem::OpenInputFile(const std::string& path,
+                                       std::shared_ptr<io::RandomAccessFile>* file) {
+  return impl_->OpenReadable(path, kDefaultHdfsBufferSize, file);
+}
+
+Status HadoopFileSystem::OpenOutputStream(const std::string& path,
+                                          std::shared_ptr<io::OutputStream>* file) {
+  return OpenWritable(path, false, 0, 0, 0, file);
+}
+
+Status HadoopFileSystem::OpenAppendStream(const std::string& path,
+                                          std::shared_ptr<io::OutputStream>* file) {
+  return OpenWritable(path, true, 0, 0, 0, file);
+}
+
+bool HadoopFileSystem::Exists(const std::string& path) { return impl_->Exists(path); }
+
+Status HadoopFileSystem::GetCapacity(int64_t* nbytes) {
+  return impl_->GetCapacity(nbytes);
+}
+
+Status HadoopFileSystem::GetUsed(int64_t* nbytes) { return impl_->GetUsed(nbytes); }
+
+Status HadoopFileSystem::OpenReadable(const std::string& path,
+                                      std::shared_ptr<io::RandomAccessFile>* file) {
+  return impl_->OpenReadable(path, kDefaultHdfsBufferSize, file);
+}
+
+Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size,
+                                      std::shared_ptr<io::RandomAccessFile>* file) {
+  return impl_->OpenReadable(path, buffer_size, file);
+}
+
+Status HadoopFileSystem::OpenWritable(const std::string& path, bool append,
+                                      int32_t buffer_size, int16_t replication,
+                                      int64_t default_block_size,
+                                      std::shared_ptr<io::OutputStream>* file) {
+  return impl_->OpenWritable(path, append, buffer_size, replication, default_block_size,
+                             file);
+}
+
+Status HadoopFileSystem::OpenWritable(const std::string& path, bool append,
+                                      std::shared_ptr<io::OutputStream>* file) {
+  return OpenWritable(path, append, 0, 0, 0, file);
+}
+
+Status HadoopFileSystem::Chmod(const std::string& path, int mode) {
+  return impl_->Chmod(path, mode);
+}
+
+Status HadoopFileSystem::Chown(const std::string& path, const char* owner,
+                               const char* group) {
+  return impl_->Chown(path, owner, group);
+}
+
+Status HadoopFileSystem::GetPathInfo(const std::string& path, HadoopPathInfo* info) {
+  return impl_->GetPathInfo(path, info);
+}
+
+constexpr ObjectType::type ObjectType::FILE;
+constexpr ObjectType::type ObjectType::DIRECTORY;
+
+// ----------------------------------------------------------------------
+// Allow public API users to check whether we are set up correctly
+
+Status HaveLibHdfs() {
+  internal::LibHdfsShim* driver;
+  return internal::ConnectLibHdfs(&driver);
+}
+
+Status HaveLibHdfs3() {
+  internal::LibHdfsShim* 
driver; + return internal::ConnectLibHdfs3(&driver); +} + +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/filesystem/hdfs.h b/cpp/src/arrow/filesystem/hdfs.h new file mode 100644 index 00000000000..501fdd11960 --- /dev/null +++ b/cpp/src/arrow/filesystem/hdfs.h @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "arrow/filesystem/filesystem.h" +#include "arrow/util/macros.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class Buffer; +class MemoryPool; +class Status; + +namespace fs { + +namespace internal { +class HdfsReadableFile; +class HdfsOutputStream; +} // namespace internal + +enum class HadoopDriver : char { LIBHDFS, LIBHDFS3 }; + +struct HadoopOptions { + std::string host; + int port; + std::string user; + std::string kerb_ticket; + std::unordered_map extra_conf; + HadoopDriver driver; + // TODO(bkietz) add replication, buffer_size, ... +}; + +// TODO(bkietz) delete this in favor of fs::FileType +struct ObjectType { + using type = FileType; + static constexpr type FILE = FileType::File; + static constexpr type DIRECTORY = FileType::Directory; +}; + +struct HadoopPathInfo { + // TODO(bkietz) consolidate with FileStats + ObjectType::type kind; + + std::string name; + std::string owner; + std::string group; + + // Access times in UNIX timestamps (seconds) + int64_t size; + int64_t block_size; + + int32_t last_modified_time; + int32_t last_access_time; + + int16_t replication; + int16_t permissions; +}; + +class ARROW_EXPORT HadoopFileSystem : public FileSystem { + public: + /// Connect to an HDFS cluster given a configuration + /// + /// \param config (in): configuration for connecting + /// \param fs (out): the created client + /// \returns Status + static Status Connect(const HadoopOptions& options, + std::shared_ptr* fs); + + /// Disconnect from cluster + /// + /// \returns Status + Status Disconnect(); + + using FileSystem::GetTargetStats; + Status GetTargetStats(const std::string& path, FileStats* out) override; + Status GetTargetStats(const Selector& select, std::vector* out) override; + + Status CreateDir(const std::string& path, bool recursive = true) override; + + Status DeleteDir(const std::string& path) override; + + Status DeleteDirContents(const std::string& path) override; + + Status DeleteFile(const std::string& path) override; + + Status Move(const std::string& src, const std::string& dest) override; + + Status CopyFile(const std::string& src, const std::string& dest) override; + + Status OpenInputStream(const std::string& path, + std::shared_ptr* out) override; + + Status OpenInputFile(const std::string& path, + std::shared_ptr* out) override; + + Status OpenOutputStream(const std::string& path, + 
std::shared_ptr* out) override; + + Status OpenAppendStream(const std::string& path, + std::shared_ptr* out) override; + + /// \param path (in): absolute HDFS path + /// \returns bool, true if the path exists, false if not (or on error) + bool Exists(const std::string& path); + + /// \param path (in): absolute HDFS path + /// \param info (out) + /// \returns Status + Status GetPathInfo(const std::string& path, HadoopPathInfo* info); + + /// \param nbytes (out): total capacity of the filesystem + /// \returns Status + Status GetCapacity(int64_t* nbytes); + + /// \param nbytes (out): total bytes used of the filesystem + /// \returns Status + Status GetUsed(int64_t* nbytes); + + /// Change + /// + /// \param path file path to change + /// \param owner pass null for no change + /// \param group pass null for no change + Status Chown(const std::string& path, const char* owner, const char* group); + + /// Change path permissions + /// + /// \param path Absolute path in file system + /// \param mode Mode bitset + /// \return Status + Status Chmod(const std::string& path, int mode); + + // TODO(wesm): GetWorkingDirectory, SetWorkingDirectory + + /// Open an HDFS file in READ mode. Returns error + /// status if the file is not found. + /// + /// \param path complete file path + Status OpenReadable(const std::string& path, int32_t buffer_size, + std::shared_ptr* file); + + Status OpenReadable(const std::string& path, + std::shared_ptr* file); + + /// FileMode::WRITE options + /// + /// \param path complete file path + /// \param buffer_size, 0 for default + /// \param replication, 0 for default + /// \param default_block_size, 0 for default + Status OpenWritable(const std::string& path, bool append, int32_t buffer_size, + int16_t replication, int64_t default_block_size, + std::shared_ptr* file); + + Status OpenWritable(const std::string& path, bool append, + std::shared_ptr* file); + + private: + class ARROW_NO_EXPORT Impl; + std::unique_ptr impl_; + + HadoopFileSystem(); + ARROW_DISALLOW_COPY_AND_ASSIGN(HadoopFileSystem); +}; + +Status ARROW_EXPORT HaveLibHdfs(); +Status ARROW_EXPORT HaveLibHdfs3(); + +} // namespace fs +} // namespace arrow diff --git a/cpp/src/arrow/io/hdfs_internal.cc b/cpp/src/arrow/filesystem/hdfs_internal.cc similarity index 61% rename from cpp/src/arrow/io/hdfs_internal.cc rename to cpp/src/arrow/filesystem/hdfs_internal.cc index 0b1bd66e3bc..6de8ece3110 100644 --- a/cpp/src/arrow/io/hdfs_internal.cc +++ b/cpp/src/arrow/filesystem/hdfs_internal.cc @@ -28,7 +28,7 @@ // This software may be modified and distributed under the terms // of the BSD license. See the LICENSE file for details. 
-#include "arrow/io/hdfs_internal.h" +#include "arrow/filesystem/hdfs_internal.h" #include #include @@ -45,16 +45,17 @@ #include // NOLINT #endif +#include "arrow/buffer.h" #include "arrow/status.h" #include "arrow/util/logging.h" namespace arrow { -namespace io { +namespace fs { namespace internal { #ifdef ARROW_WITH_BOOST_FILESYSTEM -namespace fs = boost::filesystem; +namespace bfs = boost::filesystem; #ifndef _WIN32 static void* libjvm_handle = NULL; @@ -66,18 +67,18 @@ static HINSTANCE libjvm_handle = NULL; */ // Helper functions for dlopens -static std::vector get_potential_libjvm_paths(); -static std::vector get_potential_libhdfs_paths(); -static std::vector get_potential_libhdfs3_paths(); -static arrow::Status try_dlopen(std::vector potential_paths, const char* name, +static std::vector get_potential_libjvm_paths(); +static std::vector get_potential_libhdfs_paths(); +static std::vector get_potential_libhdfs3_paths(); +static arrow::Status try_dlopen(std::vector potential_paths, const char* name, #ifndef _WIN32 void*& out_handle); #else HINSTANCE& out_handle); #endif -static std::vector get_potential_libhdfs_paths() { - std::vector libhdfs_potential_paths; +static std::vector get_potential_libhdfs_paths() { + std::vector libhdfs_potential_paths; std::string file_name; // OS-specific file name @@ -90,18 +91,18 @@ static std::vector get_potential_libhdfs_paths() { #endif // Common paths - std::vector search_paths = {fs::path(""), fs::path(".")}; + std::vector search_paths = {bfs::path(""), bfs::path(".")}; // Path from environment variable const char* hadoop_home = std::getenv("HADOOP_HOME"); if (hadoop_home != nullptr) { - auto path = fs::path(hadoop_home) / "lib/native"; + auto path = bfs::path(hadoop_home) / "lib/native"; search_paths.push_back(path); } const char* libhdfs_dir = std::getenv("ARROW_LIBHDFS_DIR"); if (libhdfs_dir != nullptr) { - search_paths.push_back(fs::path(libhdfs_dir)); + search_paths.push_back(bfs::path(libhdfs_dir)); } // All paths with file name @@ -112,8 +113,8 @@ static std::vector get_potential_libhdfs_paths() { return libhdfs_potential_paths; } -static std::vector get_potential_libhdfs3_paths() { - std::vector potential_paths; +static std::vector get_potential_libhdfs3_paths() { + std::vector potential_paths; std::string file_name; // OS-specific file name @@ -126,11 +127,11 @@ static std::vector get_potential_libhdfs3_paths() { #endif // Common paths - std::vector search_paths = {fs::path(""), fs::path(".")}; + std::vector search_paths = {bfs::path(""), bfs::path(".")}; const char* libhdfs3_dir = std::getenv("ARROW_LIBHDFS3_DIR"); if (libhdfs3_dir != nullptr) { - search_paths.push_back(fs::path(libhdfs3_dir)); + search_paths.push_back(bfs::path(libhdfs3_dir)); } // All paths with file name @@ -141,11 +142,11 @@ static std::vector get_potential_libhdfs3_paths() { return potential_paths; } -static std::vector get_potential_libjvm_paths() { - std::vector libjvm_potential_paths; +static std::vector get_potential_libjvm_paths() { + std::vector libjvm_potential_paths; - std::vector search_prefixes; - std::vector search_suffixes; + std::vector search_prefixes; + std::vector search_suffixes; std::string file_name; // From heuristics @@ -197,7 +198,7 @@ static std::vector get_potential_libjvm_paths() { // Generate cross product between search_prefixes, search_suffixes, and file_name for (auto& prefix : search_prefixes) { for (auto& suffix : search_suffixes) { - auto path = (fs::path(prefix) / fs::path(suffix) / fs::path(file_name)); + auto path = 
(bfs::path(prefix) / bfs::path(suffix) / bfs::path(file_name)); libjvm_potential_paths.push_back(path); } } @@ -206,7 +207,7 @@ static std::vector get_potential_libjvm_paths() { } #ifndef _WIN32 -static arrow::Status try_dlopen(std::vector potential_paths, const char* name, +static arrow::Status try_dlopen(std::vector potential_paths, const char* name, void*& out_handle) { std::vector error_messages; @@ -235,7 +236,7 @@ static arrow::Status try_dlopen(std::vector potential_paths, const cha } #else -static arrow::Status try_dlopen(std::vector potential_paths, const char* name, +static arrow::Status try_dlopen(std::vector potential_paths, const char* name, HINSTANCE& out_handle) { std::vector error_messages; @@ -324,29 +325,29 @@ int LibHdfsShim::BuilderConfSetStr(hdfsBuilder* bld, const char* key, const char return this->hdfsBuilderConfSetStr(bld, key, val); } -int LibHdfsShim::Disconnect(hdfsFS fs) { return this->hdfsDisconnect(fs); } +int LibHdfsShim::Disconnect(hdfsFS bfs) { return this->hdfsDisconnect(bfs); } -hdfsFile LibHdfsShim::OpenFile(hdfsFS fs, const char* path, int flags, int bufferSize, +hdfsFile LibHdfsShim::OpenFile(hdfsFS bfs, const char* path, int flags, int bufferSize, short replication, tSize blocksize) { // NOLINT - return this->hdfsOpenFile(fs, path, flags, bufferSize, replication, blocksize); + return this->hdfsOpenFile(bfs, path, flags, bufferSize, replication, blocksize); } -int LibHdfsShim::CloseFile(hdfsFS fs, hdfsFile file) { - return this->hdfsCloseFile(fs, file); +int LibHdfsShim::CloseFile(hdfsFS bfs, hdfsFile file) { + return this->hdfsCloseFile(bfs, file); } -int LibHdfsShim::Exists(hdfsFS fs, const char* path) { - return this->hdfsExists(fs, path); +int LibHdfsShim::Exists(hdfsFS bfs, const char* path) { + return this->hdfsExists(bfs, path); } -int LibHdfsShim::Seek(hdfsFS fs, hdfsFile file, tOffset desiredPos) { - return this->hdfsSeek(fs, file, desiredPos); +int LibHdfsShim::Seek(hdfsFS bfs, hdfsFile file, tOffset desiredPos) { + return this->hdfsSeek(bfs, file, desiredPos); } -tOffset LibHdfsShim::Tell(hdfsFS fs, hdfsFile file) { return this->hdfsTell(fs, file); } +tOffset LibHdfsShim::Tell(hdfsFS bfs, hdfsFile file) { return this->hdfsTell(bfs, file); } -tSize LibHdfsShim::Read(hdfsFS fs, hdfsFile file, void* buffer, tSize length) { - return this->hdfsRead(fs, file, buffer, length); +tSize LibHdfsShim::Read(hdfsFS bfs, hdfsFile file, void* buffer, tSize length) { + return this->hdfsRead(bfs, file, buffer, length); } bool LibHdfsShim::HasPread() { @@ -354,23 +355,23 @@ bool LibHdfsShim::HasPread() { return this->hdfsPread != nullptr; } -tSize LibHdfsShim::Pread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, +tSize LibHdfsShim::Pread(hdfsFS bfs, hdfsFile file, tOffset position, void* buffer, tSize length) { GET_SYMBOL(this, hdfsPread); DCHECK(this->hdfsPread); - return this->hdfsPread(fs, file, position, buffer, length); + return this->hdfsPread(bfs, file, position, buffer, length); } -tSize LibHdfsShim::Write(hdfsFS fs, hdfsFile file, const void* buffer, tSize length) { - return this->hdfsWrite(fs, file, buffer, length); +tSize LibHdfsShim::Write(hdfsFS bfs, hdfsFile file, const void* buffer, tSize length) { + return this->hdfsWrite(bfs, file, buffer, length); } -int LibHdfsShim::Flush(hdfsFS fs, hdfsFile file) { return this->hdfsFlush(fs, file); } +int LibHdfsShim::Flush(hdfsFS bfs, hdfsFile file) { return this->hdfsFlush(bfs, file); } -int LibHdfsShim::Available(hdfsFS fs, hdfsFile file) { +int LibHdfsShim::Available(hdfsFS bfs, hdfsFile 
file) { GET_SYMBOL(this, hdfsAvailable); if (this->hdfsAvailable) - return this->hdfsAvailable(fs, file); + return this->hdfsAvailable(bfs, file); else return 0; } @@ -391,66 +392,66 @@ int LibHdfsShim::Move(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* d return 0; } -int LibHdfsShim::Delete(hdfsFS fs, const char* path, int recursive) { - return this->hdfsDelete(fs, path, recursive); +int LibHdfsShim::Delete(hdfsFS bfs, const char* path, int recursive) { + return this->hdfsDelete(bfs, path, recursive); } -int LibHdfsShim::Rename(hdfsFS fs, const char* oldPath, const char* newPath) { +int LibHdfsShim::Rename(hdfsFS bfs, const char* oldPath, const char* newPath) { GET_SYMBOL(this, hdfsRename); if (this->hdfsRename) - return this->hdfsRename(fs, oldPath, newPath); + return this->hdfsRename(bfs, oldPath, newPath); else return 0; } -char* LibHdfsShim::GetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize) { +char* LibHdfsShim::GetWorkingDirectory(hdfsFS bfs, char* buffer, size_t bufferSize) { GET_SYMBOL(this, hdfsGetWorkingDirectory); if (this->hdfsGetWorkingDirectory) { - return this->hdfsGetWorkingDirectory(fs, buffer, bufferSize); + return this->hdfsGetWorkingDirectory(bfs, buffer, bufferSize); } else { return NULL; } } -int LibHdfsShim::SetWorkingDirectory(hdfsFS fs, const char* path) { +int LibHdfsShim::SetWorkingDirectory(hdfsFS bfs, const char* path) { GET_SYMBOL(this, hdfsSetWorkingDirectory); if (this->hdfsSetWorkingDirectory) { - return this->hdfsSetWorkingDirectory(fs, path); + return this->hdfsSetWorkingDirectory(bfs, path); } else { return 0; } } -int LibHdfsShim::MakeDirectory(hdfsFS fs, const char* path) { - return this->hdfsCreateDirectory(fs, path); +int LibHdfsShim::MakeDirectory(hdfsFS bfs, const char* path) { + return this->hdfsCreateDirectory(bfs, path); } -int LibHdfsShim::SetReplication(hdfsFS fs, const char* path, int16_t replication) { +int LibHdfsShim::SetReplication(hdfsFS bfs, const char* path, int16_t replication) { GET_SYMBOL(this, hdfsSetReplication); if (this->hdfsSetReplication) { - return this->hdfsSetReplication(fs, path, replication); + return this->hdfsSetReplication(bfs, path, replication); } else { return 0; } } -hdfsFileInfo* LibHdfsShim::ListDirectory(hdfsFS fs, const char* path, int* numEntries) { - return this->hdfsListDirectory(fs, path, numEntries); +hdfsFileInfo* LibHdfsShim::ListDirectory(hdfsFS bfs, const char* path, int* numEntries) { + return this->hdfsListDirectory(bfs, path, numEntries); } -hdfsFileInfo* LibHdfsShim::GetPathInfo(hdfsFS fs, const char* path) { - return this->hdfsGetPathInfo(fs, path); +hdfsFileInfo* LibHdfsShim::GetPathInfo(hdfsFS bfs, const char* path) { + return this->hdfsGetPathInfo(bfs, path); } void LibHdfsShim::FreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries) { this->hdfsFreeFileInfo(hdfsFileInfo, numEntries); } -char*** LibHdfsShim::GetHosts(hdfsFS fs, const char* path, tOffset start, +char*** LibHdfsShim::GetHosts(hdfsFS bfs, const char* path, tOffset start, tOffset length) { GET_SYMBOL(this, hdfsGetHosts); if (this->hdfsGetHosts) { - return this->hdfsGetHosts(fs, path, start, length); + return this->hdfsGetHosts(bfs, path, start, length); } else { return NULL; } @@ -463,32 +464,32 @@ void LibHdfsShim::FreeHosts(char*** blockHosts) { } } -tOffset LibHdfsShim::GetDefaultBlockSize(hdfsFS fs) { +tOffset LibHdfsShim::GetDefaultBlockSize(hdfsFS bfs) { GET_SYMBOL(this, hdfsGetDefaultBlockSize); if (this->hdfsGetDefaultBlockSize) { - return this->hdfsGetDefaultBlockSize(fs); + return 
this->hdfsGetDefaultBlockSize(bfs); } else { return 0; } } -tOffset LibHdfsShim::GetCapacity(hdfsFS fs) { return this->hdfsGetCapacity(fs); } +tOffset LibHdfsShim::GetCapacity(hdfsFS bfs) { return this->hdfsGetCapacity(bfs); } -tOffset LibHdfsShim::GetUsed(hdfsFS fs) { return this->hdfsGetUsed(fs); } +tOffset LibHdfsShim::GetUsed(hdfsFS bfs) { return this->hdfsGetUsed(bfs); } -int LibHdfsShim::Chown(hdfsFS fs, const char* path, const char* owner, +int LibHdfsShim::Chown(hdfsFS bfs, const char* path, const char* owner, const char* group) { - return this->hdfsChown(fs, path, owner, group); + return this->hdfsChown(bfs, path, owner, group); } -int LibHdfsShim::Chmod(hdfsFS fs, const char* path, short mode) { // NOLINT - return this->hdfsChmod(fs, path, mode); +int LibHdfsShim::Chmod(hdfsFS bfs, const char* path, short mode) { // NOLINT + return this->hdfsChmod(bfs, path, mode); } -int LibHdfsShim::Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { +int LibHdfsShim::Utime(hdfsFS bfs, const char* path, tTime mtime, tTime atime) { GET_SYMBOL(this, hdfsUtime); if (this->hdfsUtime) { - return this->hdfsUtime(fs, path, mtime, atime); + return this->hdfsUtime(bfs, path, mtime, atime); } else { return 0; } @@ -539,10 +540,10 @@ Status ConnectLibHdfs(LibHdfsShim** driver) { shim->Initialize(); - std::vector libjvm_potential_paths = get_potential_libjvm_paths(); + std::vector libjvm_potential_paths = get_potential_libjvm_paths(); RETURN_NOT_OK(try_dlopen(libjvm_potential_paths, "libjvm", libjvm_handle)); - std::vector libhdfs_potential_paths = get_potential_libhdfs_paths(); + std::vector libhdfs_potential_paths = get_potential_libhdfs_paths(); RETURN_NOT_OK(try_dlopen(libhdfs_potential_paths, "libhdfs", shim->handle)); } else if (shim->handle == nullptr) { return Status::IOError("Prior attempt to load libhdfs failed"); @@ -564,7 +565,7 @@ Status ConnectLibHdfs3(LibHdfsShim** driver) { shim->Initialize(); - std::vector libhdfs3_potential_paths = get_potential_libhdfs3_paths(); + std::vector libhdfs3_potential_paths = get_potential_libhdfs3_paths(); RETURN_NOT_OK(try_dlopen(libhdfs3_potential_paths, "libhdfs3", shim->handle)); } else if (shim->handle == nullptr) { return Status::IOError("Prior attempt to load libhdfs3 failed"); @@ -586,6 +587,147 @@ Status ConnectLibHdfs3(LibHdfsShim** driver) { #endif +Status HdfsAnyFileImpl::DoSeek(int64_t position) { + int ret = driver_->Seek(fs_, file_, position); + CHECK_FAILURE(ret, "seek"); + return Status::OK(); +} + +Status HdfsAnyFileImpl::DoTell(int64_t* offset) const { + int64_t ret = driver_->Tell(fs_, file_); + CHECK_FAILURE(ret, "tell"); + *offset = ret; + return Status::OK(); +} + +HdfsReadableFile::~HdfsReadableFile() { DCHECK_OK(Close()); } + +Status HdfsReadableFile::Close() { + if (is_open_) { + int ret = driver_->CloseFile(fs_, file_); + CHECK_FAILURE(ret, "CloseFile"); + is_open_ = false; + } + return Status::OK(); +} + +Status HdfsReadableFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, + void* buffer) { + tSize ret; + if (driver_->HasPread()) { + ret = driver_->Pread(fs_, file_, static_cast(position), + reinterpret_cast(buffer), static_cast(nbytes)); + } else { + std::lock_guard guard(lock_); + RETURN_NOT_OK(Seek(position)); + return Read(nbytes, bytes_read, buffer); + } + CHECK_FAILURE(ret, "read"); + *bytes_read = ret; + return Status::OK(); +} + +Status HdfsReadableFile::ReadAt(int64_t position, int64_t nbytes, + std::shared_ptr* out) { + std::shared_ptr buffer; + RETURN_NOT_OK(AllocateResizableBuffer(pool_, 
nbytes, &buffer)); + + int64_t bytes_read = 0; + RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buffer->mutable_data())); + + if (bytes_read < nbytes) { + RETURN_NOT_OK(buffer->Resize(bytes_read)); + buffer->ZeroPadding(); + } + + *out = buffer; + return Status::OK(); +} + +Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, void* buffer) { + int64_t total_bytes = 0; + while (total_bytes < nbytes) { + tSize ret = driver_->Read( + fs_, file_, reinterpret_cast(buffer) + total_bytes, + static_cast(std::min(buffer_size_, nbytes - total_bytes))); + CHECK_FAILURE(ret, "read"); + total_bytes += ret; + if (ret == 0) { + break; + } + } + + *bytes_read = total_bytes; + return Status::OK(); +} + +Status HdfsReadableFile::Read(int64_t nbytes, std::shared_ptr* out) { + std::shared_ptr buffer; + RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer)); + + int64_t bytes_read = 0; + RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data())); + if (bytes_read < nbytes) { + RETURN_NOT_OK(buffer->Resize(bytes_read)); + } + + *out = std::move(buffer); + return Status::OK(); +} + +inline Status GetPathInfoFailed(const std::string& path) { + std::stringstream ss; + ss << "Calling GetPathInfo for " << path << " failed. errno: " << TranslateErrno(errno); + return Status::IOError(ss.str()); +} + +Status HdfsReadableFile::GetSize(int64_t* size) { + hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path_.c_str()); + if (entry == nullptr) { + return GetPathInfoFailed(path_); + } + + *size = entry->mSize; + driver_->FreeFileInfo(entry, 1); + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// File writing + +HdfsOutputStream::~HdfsOutputStream() { DCHECK_OK(Close()); } + +Status HdfsOutputStream::Close() { + if (is_open_) { + RETURN_NOT_OK(Flush()); + int ret = driver_->CloseFile(fs_, file_); + CHECK_FAILURE(ret, "CloseFile"); + is_open_ = false; + } + return Status::OK(); +} + +Status HdfsOutputStream::Write(const void* buffer, int64_t nbytes, + int64_t* bytes_written) { + std::lock_guard guard(lock_); + tSize ret = driver_->Write(fs_, file_, reinterpret_cast(buffer), + static_cast(nbytes)); + CHECK_FAILURE(ret, "Write"); + *bytes_written = ret; + return Status::OK(); +} + +Status HdfsOutputStream::Write(const void* buffer, int64_t nbytes) { + int64_t bytes_written_dummy = 0; + return Write(buffer, nbytes, &bytes_written_dummy); +} + +Status HdfsOutputStream::Flush() { + int ret = driver_->Flush(fs_, file_); + CHECK_FAILURE(ret, "Flush"); + return Status::OK(); +} + } // namespace internal -} // namespace io +} // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/io/hdfs_internal.h b/cpp/src/arrow/filesystem/hdfs_internal.h similarity index 67% rename from cpp/src/arrow/io/hdfs_internal.h rename to cpp/src/arrow/filesystem/hdfs_internal.h index 3912f2f1144..2df9d2fedf8 100644 --- a/cpp/src/arrow/io/hdfs_internal.h +++ b/cpp/src/arrow/filesystem/hdfs_internal.h @@ -15,14 +15,17 @@ // specific language governing permissions and limitations // under the License. 
-#ifndef ARROW_IO_HDFS_INTERNAL -#define ARROW_IO_HDFS_INTERNAL +#pragma once #include #include +#include #include +#include "arrow/filesystem/hdfs.h" +#include "arrow/io/interfaces.h" +#include "arrow/memory_pool.h" #include "arrow/util/visibility.h" #include "arrow/util/windows_compatibility.h" // IWYU pragma: keep @@ -34,9 +37,121 @@ namespace arrow { class Status; -namespace io { +namespace fs { namespace internal { +inline std::string TranslateErrno(int error_code) { + return util::StringBuilder( + error_code, " (", strerror(error_code), ")", + // Unknown error can occur if the host is correct but the port is not + error_code == 255 + ? " Please check that you are connecting to the correct HDFS RPC port" + : ""); +} + +#define CHECK_FAILURE(RETURN_VALUE, WHAT) \ + do { \ + if (RETURN_VALUE == -1) { \ + return Status::IOError("HDFS ", WHAT, " failed, errno: ", TranslateErrno(errno)); \ + } \ + } while (0) + +struct LibHdfsShim; + +class HdfsAnyFileImpl { + public: + Status DoSeek(int64_t position); + + Status DoTell(int64_t* offset) const; + + protected: + HdfsAnyFileImpl(const std::string& path, LibHdfsShim* driver, hdfsFS fs, + hdfsFile handle) + : path_(path), driver_(driver), fs_(fs), file_(handle) {} + + std::string path_; + + internal::LibHdfsShim* driver_; + + // For threadsafety + std::mutex lock_; + + // These are pointers in libhdfs, so OK to copy + hdfsFS fs_; + hdfsFile file_; + + bool is_open_ = true; +}; + +class ARROW_EXPORT HdfsReadableFile : public io::RandomAccessFile, + public HdfsAnyFileImpl { + public: + HdfsReadableFile(const std::string& path, LibHdfsShim* driver, hdfsFS fs, + hdfsFile handle, int32_t buffer_size, MemoryPool* pool) + : HdfsAnyFileImpl(path, driver, fs, handle), + pool_(pool), + buffer_size_(buffer_size) {} + + HdfsReadableFile(const std::string& path, LibHdfsShim* driver, hdfsFS fs, + hdfsFile handle, int32_t buffer_size) + : HdfsReadableFile(path, driver, fs, handle, buffer_size, default_memory_pool()) {} + + ~HdfsReadableFile(); + + Status Close() override; + + bool closed() const override { return !is_open_; } + + Status GetSize(int64_t* size) override; + + // NOTE: If you wish to read a particular range of a file in a multithreaded + // context, you may prefer to use ReadAt to avoid locking issues + Status Read(int64_t nbytes, int64_t* bytes_read, void* buffer) override; + + Status Read(int64_t nbytes, std::shared_ptr* out) override; + + Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, + void* buffer) override; + + Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; + + Status Seek(int64_t position) override { return DoSeek(position); } + + Status Tell(int64_t* position) const override { return DoTell(position); } + + private: + MemoryPool* pool_; + int32_t buffer_size_; + + ARROW_DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile); +}; + +// Naming this file OutputStream because it does not support seeking (like the +// WritableFile interface) +class ARROW_EXPORT HdfsOutputStream : public io::OutputStream, public HdfsAnyFileImpl { + public: + HdfsOutputStream(const std::string& path, LibHdfsShim* driver, hdfsFS fs, + hdfsFile handle) + : HdfsAnyFileImpl(path, driver, fs, handle) {} + + ~HdfsOutputStream(); + + Status Close() override; + + bool closed() const override { return !is_open_; } + + using OutputStream::Write; + Status Write(const void* buffer, int64_t nbytes) override; + Status Write(const void* buffer, int64_t nbytes, int64_t* bytes_written); + + Status Flush() override; + + Status 
Tell(int64_t* position) const override { return DoTell(position); } + + private: + ARROW_DISALLOW_COPY_AND_ASSIGN(HdfsOutputStream); +}; + // NOTE(wesm): cpplint does not like use of short and other imprecise C types struct LibHdfsShim { #ifndef _WIN32 @@ -218,7 +333,5 @@ Status ARROW_EXPORT ConnectLibHdfs(LibHdfsShim** driver); Status ARROW_EXPORT ConnectLibHdfs3(LibHdfsShim** driver); } // namespace internal -} // namespace io +} // namespace fs } // namespace arrow - -#endif // ARROW_IO_HDFS_INTERNAL diff --git a/cpp/src/arrow/io/hdfs_test.cc b/cpp/src/arrow/filesystem/hdfs_test.cc similarity index 80% rename from cpp/src/arrow/io/hdfs_test.cc rename to cpp/src/arrow/filesystem/hdfs_test.cc index 2d3b6b95fc9..97b9221397e 100644 --- a/cpp/src/arrow/io/hdfs_test.cc +++ b/cpp/src/arrow/filesystem/hdfs_test.cc @@ -31,15 +31,15 @@ #include // NOLINT #include "arrow/buffer.h" -#include "arrow/io/hdfs.h" -#include "arrow/io/hdfs_internal.h" +#include "arrow/filesystem/hdfs.h" +#include "arrow/filesystem/hdfs_internal.h" #include "arrow/io/interfaces.h" #include "arrow/status.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/util.h" namespace arrow { -namespace io { +namespace fs { std::vector RandomData(int64_t size) { std::vector buffer(size); @@ -48,11 +48,11 @@ std::vector RandomData(int64_t size) { } struct JNIDriver { - static HdfsDriver type; + static constexpr HadoopDriver type = HadoopDriver::LIBHDFS; }; struct PivotalDriver { - static HdfsDriver type; + static constexpr HadoopDriver type = HadoopDriver::LIBHDFS3; }; template @@ -60,15 +60,15 @@ class TestHadoopFileSystem : public ::testing::Test { public: Status MakeScratchDir() { if (client_->Exists(scratch_dir_)) { - RETURN_NOT_OK((client_->Delete(scratch_dir_, true))); + RETURN_NOT_OK((client_->DeleteDir(scratch_dir_))); } - return client_->MakeDirectory(scratch_dir_); + return client_->CreateDir(scratch_dir_); } Status WriteDummyFile(const std::string& path, const uint8_t* buffer, int64_t size, bool append = false, int buffer_size = 0, int16_t replication = 0, int default_block_size = 0) { - std::shared_ptr file; + std::shared_ptr file; RETURN_NOT_OK(client_->OpenWritable(path, append, buffer_size, replication, default_block_size, &file)); @@ -86,7 +86,7 @@ class TestHadoopFileSystem : public ::testing::Test { std::string HdfsAbsPath(const std::string& relpath) { std::stringstream ss; - ss << "hdfs://" << conf_.host << ":" << conf_.port << relpath; + ss << "hdfs://" << opts_.host << ":" << opts_.port << relpath; return ss.str(); } @@ -104,7 +104,7 @@ class TestHadoopFileSystem : public ::testing::Test { Status msg; - if (DRIVER::type == HdfsDriver::LIBHDFS) { + if (DRIVER::type == HadoopDriver::LIBHDFS) { msg = ConnectLibHdfs(&driver_shim); if (!msg.ok()) { std::cout << "Loading libhdfs failed, skipping tests gracefully" << std::endl; @@ -127,24 +127,24 @@ class TestHadoopFileSystem : public ::testing::Test { ASSERT_TRUE(user != nullptr) << "Set ARROW_HDFS_TEST_USER"; - conf_.host = host == nullptr ? "localhost" : host; - conf_.user = user; - conf_.port = port == nullptr ? 20500 : atoi(port); - conf_.driver = DRIVER::type; + opts_.host = host == nullptr ? "localhost" : host; + opts_.user = user; + opts_.port = port == nullptr ? 
20500 : atoi(port); + opts_.driver = DRIVER::type; - ASSERT_OK(HadoopFileSystem::Connect(&conf_, &client_)); + ASSERT_OK(HadoopFileSystem::Connect(opts_, &client_)); } void TearDown() { if (client_) { if (client_->Exists(scratch_dir_)) { - ARROW_EXPECT_OK(client_->Delete(scratch_dir_, true)); + ARROW_EXPECT_OK(client_->DeleteDir(scratch_dir_)); } ARROW_EXPECT_OK(client_->Disconnect()); } } - HdfsConnectionConfig conf_; + HadoopOptions opts_; bool loaded_driver_; // Resources shared amongst unit tests @@ -154,9 +154,7 @@ class TestHadoopFileSystem : public ::testing::Test { template <> std::string TestHadoopFileSystem::HdfsAbsPath(const std::string& relpath) { - std::stringstream ss; - ss << relpath; - return ss.str(); + return relpath; } #define SKIP_IF_NO_DRIVER() \ @@ -165,9 +163,6 @@ std::string TestHadoopFileSystem::HdfsAbsPath(const std::string& return; \ } -HdfsDriver JNIDriver::type = HdfsDriver::LIBHDFS; -HdfsDriver PivotalDriver::type = HdfsDriver::LIBHDFS3; - typedef ::testing::Types DriverTypes; TYPED_TEST_CASE(TestHadoopFileSystem, DriverTypes); @@ -175,7 +170,7 @@ TYPED_TEST(TestHadoopFileSystem, ConnectsAgain) { SKIP_IF_NO_DRIVER(); std::shared_ptr client; - ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &client)); + ASSERT_OK(HadoopFileSystem::Connect(this->opts_, &client)); ASSERT_OK(client->Disconnect()); } @@ -186,33 +181,36 @@ TYPED_TEST(TestHadoopFileSystem, MultipleClients) { std::shared_ptr client1; std::shared_ptr client2; - ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &client1)); - ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &client2)); + ASSERT_OK(HadoopFileSystem::Connect(this->opts_, &client1)); + ASSERT_OK(HadoopFileSystem::Connect(this->opts_, &client2)); ASSERT_OK(client1->Disconnect()); // client2 continues to function after equivalent client1 has shutdown - std::vector listing; - ASSERT_OK(client2->ListDirectory(this->scratch_dir_, &listing)); + std::vector listing; + Selector scratch_dir; + scratch_dir.base_dir = this->scratch_dir_; + ASSERT_OK(client2->GetTargetStats(scratch_dir, &listing)); ASSERT_OK(client2->Disconnect()); } TYPED_TEST(TestHadoopFileSystem, MakeDirectory) { SKIP_IF_NO_DRIVER(); - std::string path = this->ScratchPath("create-directory"); + Selector scratch_dir; + scratch_dir.base_dir = this->ScratchPath("create-directory"); - if (this->client_->Exists(path)) { - ASSERT_OK(this->client_->Delete(path, true)); + if (this->client_->Exists(scratch_dir.base_dir)) { + ASSERT_OK(this->client_->DeleteDir(scratch_dir.base_dir)); } - ASSERT_OK(this->client_->MakeDirectory(path)); - ASSERT_TRUE(this->client_->Exists(path)); - std::vector listing; - ARROW_EXPECT_OK(this->client_->ListDirectory(path, &listing)); + ASSERT_OK(this->client_->CreateDir(scratch_dir.base_dir)); + ASSERT_TRUE(this->client_->Exists(scratch_dir.base_dir)); + std::vector listing; + ARROW_EXPECT_OK(this->client_->GetTargetStats(scratch_dir, &listing)); ASSERT_EQ(0, listing.size()); - ARROW_EXPECT_OK(this->client_->Delete(path, true)); - ASSERT_FALSE(this->client_->Exists(path)); - ASSERT_RAISES(IOError, this->client_->ListDirectory(path, &listing)); + ARROW_EXPECT_OK(this->client_->DeleteDir(scratch_dir.base_dir)); + ASSERT_FALSE(this->client_->Exists(scratch_dir.base_dir)); + ASSERT_RAISES(IOError, this->client_->GetTargetStats(scratch_dir, &listing)); } TYPED_TEST(TestHadoopFileSystem, GetCapacityUsed) { @@ -231,7 +229,7 @@ TYPED_TEST(TestHadoopFileSystem, GetCapacityUsed) { TYPED_TEST(TestHadoopFileSystem, GetPathInfo) { SKIP_IF_NO_DRIVER(); - HdfsPathInfo info; 
+ HadoopPathInfo info; ASSERT_OK(this->MakeScratchDir()); @@ -239,7 +237,7 @@ TYPED_TEST(TestHadoopFileSystem, GetPathInfo) { ASSERT_OK(this->client_->GetPathInfo(this->scratch_dir_, &info)); ASSERT_EQ(ObjectType::DIRECTORY, info.kind); ASSERT_EQ(this->HdfsAbsPath(this->scratch_dir_), info.name); - ASSERT_EQ(this->conf_.user, info.owner); + ASSERT_EQ(this->opts_.user, info.owner); // TODO(wesm): test group, other attrs @@ -254,7 +252,7 @@ TYPED_TEST(TestHadoopFileSystem, GetPathInfo) { ASSERT_EQ(ObjectType::FILE, info.kind); ASSERT_EQ(this->HdfsAbsPath(path), info.name); - ASSERT_EQ(this->conf_.user, info.owner); + ASSERT_EQ(this->opts_.user, info.owner); ASSERT_EQ(size, info.size); } @@ -265,7 +263,7 @@ TYPED_TEST(TestHadoopFileSystem, GetPathInfoNotExist) { ASSERT_OK(this->MakeScratchDir()); auto path = this->ScratchPath("path-does-not-exist"); - HdfsPathInfo info; + HadoopPathInfo info; Status s = this->client_->GetPathInfo(path, &info); ASSERT_TRUE(s.IsIOError()); @@ -289,7 +287,7 @@ TYPED_TEST(TestHadoopFileSystem, AppendToFile) { // now append ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size, true)); - HdfsPathInfo info; + HadoopPathInfo info; ASSERT_OK(this->client_->GetPathInfo(path, &info)); ASSERT_EQ(size * 2, info.size); } @@ -307,29 +305,31 @@ TYPED_TEST(TestHadoopFileSystem, ListDirectory) { ASSERT_OK(this->MakeScratchDir()); ASSERT_OK(this->WriteDummyFile(p1, data.data(), size)); ASSERT_OK(this->WriteDummyFile(p2, data.data(), size / 2)); - ASSERT_OK(this->client_->MakeDirectory(d1)); + ASSERT_OK(this->client_->CreateDir(d1)); - std::vector listing; - ASSERT_OK(this->client_->ListDirectory(this->scratch_dir_, &listing)); + std::vector listing; + Selector scratch_dir; + scratch_dir.base_dir = this->scratch_dir_; + ASSERT_OK(this->client_->GetTargetStats(scratch_dir, &listing)); // Do it again, appends! 
- ASSERT_OK(this->client_->ListDirectory(this->scratch_dir_, &listing)); + ASSERT_OK(this->client_->GetTargetStats(scratch_dir, &listing)); ASSERT_EQ(6, static_cast(listing.size())); // Argh, well, shouldn't expect the listing to be in any particular order for (size_t i = 0; i < listing.size(); ++i) { - const HdfsPathInfo& info = listing[i]; - if (info.name == this->HdfsAbsPath(p1)) { - ASSERT_EQ(ObjectType::FILE, info.kind); - ASSERT_EQ(size, info.size); - } else if (info.name == this->HdfsAbsPath(p2)) { - ASSERT_EQ(ObjectType::FILE, info.kind); - ASSERT_EQ(size / 2, info.size); - } else if (info.name == this->HdfsAbsPath(d1)) { - ASSERT_EQ(ObjectType::DIRECTORY, info.kind); + const auto& info = listing[i]; + if (info.path() == this->HdfsAbsPath(p1)) { + ASSERT_EQ(info.type(), FileType::File); + ASSERT_EQ(info.size(), size); + } else if (info.path() == this->HdfsAbsPath(p2)) { + ASSERT_EQ(info.type(), FileType::File); + ASSERT_EQ(info.size(), size / 2); + } else if (info.path() == this->HdfsAbsPath(d1)) { + ASSERT_EQ(info.type(), FileType::Directory); } else { - FAIL() << "Unexpected path: " << info.name; + FAIL() << "Unexpected path: " << info.path(); } } } @@ -345,7 +345,7 @@ TYPED_TEST(TestHadoopFileSystem, ReadableMethods) { std::vector data = RandomData(size); ASSERT_OK(this->WriteDummyFile(path, data.data(), size)); - std::shared_ptr file; + std::shared_ptr file; ASSERT_OK(this->client_->OpenReadable(path, &file)); // Test GetSize -- move this into its own unit test if ever needed @@ -392,7 +392,7 @@ TYPED_TEST(TestHadoopFileSystem, LargeFile) { std::vector data = RandomData(size); ASSERT_OK(this->WriteDummyFile(path, data.data(), size)); - std::shared_ptr file; + std::shared_ptr file; ASSERT_OK(this->client_->OpenReadable(path, &file)); ASSERT_FALSE(file->closed()); @@ -407,7 +407,7 @@ TYPED_TEST(TestHadoopFileSystem, LargeFile) { ASSERT_EQ(size, bytes_read); // explicit buffer size - std::shared_ptr file2; + std::shared_ptr file2; ASSERT_OK(this->client_->OpenReadable(path, 1 << 18, &file2)); std::shared_ptr buffer2; @@ -418,7 +418,7 @@ TYPED_TEST(TestHadoopFileSystem, LargeFile) { ASSERT_EQ(size, bytes_read); } -TYPED_TEST(TestHadoopFileSystem, RenameFile) { +TYPED_TEST(TestHadoopFileSystem, MoveFile) { SKIP_IF_NO_DRIVER(); ASSERT_OK(this->MakeScratchDir()); @@ -429,7 +429,7 @@ TYPED_TEST(TestHadoopFileSystem, RenameFile) { std::vector data = RandomData(size); ASSERT_OK(this->WriteDummyFile(src_path, data.data(), size)); - ASSERT_OK(this->client_->Rename(src_path, dst_path)); + ASSERT_OK(this->client_->Move(src_path, dst_path)); ASSERT_FALSE(this->client_->Exists(src_path)); ASSERT_TRUE(this->client_->Exists(dst_path)); @@ -447,7 +447,7 @@ TYPED_TEST(TestHadoopFileSystem, ChmodChown) { std::vector data = RandomData(size); ASSERT_OK(this->WriteDummyFile(path, data.data(), size)); - HdfsPathInfo info; + HadoopPathInfo info; ASSERT_OK(this->client_->Chmod(path, mode)); ASSERT_OK(this->client_->GetPathInfo(path, &info)); ASSERT_EQ(mode, info.permissions); @@ -470,7 +470,7 @@ TYPED_TEST(TestHadoopFileSystem, ThreadSafety) { ASSERT_OK(this->WriteDummyFile(src_path, reinterpret_cast(data.c_str()), static_cast(data.size()))); - std::shared_ptr file; + std::shared_ptr file; ASSERT_OK(this->client_->OpenReadable(src_path, &file)); std::atomic correct_count(0); @@ -506,5 +506,5 @@ TYPED_TEST(TestHadoopFileSystem, ThreadSafety) { ASSERT_EQ(niter * 4, correct_count); } -} // namespace io +} // namespace fs } // namespace arrow diff --git a/cpp/src/arrow/io/CMakeLists.txt 
b/cpp/src/arrow/io/CMakeLists.txt index f84c79930f5..c30bc5d93ff 100644 --- a/cpp/src/arrow/io/CMakeLists.txt +++ b/cpp/src/arrow/io/CMakeLists.txt @@ -22,10 +22,6 @@ add_arrow_test(buffered_test PREFIX "arrow-io") add_arrow_test(compressed_test PREFIX "arrow-io") add_arrow_test(file_test PREFIX "arrow-io") -if(ARROW_HDFS) - add_arrow_test(hdfs_test NO_VALGRIND PREFIX "arrow-io") -endif() - add_arrow_test(memory_test PREFIX "arrow-io") add_arrow_test(readahead_test PREFIX "arrow-io") diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index 8845aaf55b9..2208b1bb8cc 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -17,7 +17,6 @@ #include -#include #include #include #include @@ -31,644 +30,14 @@ #include #include "arrow/buffer.h" +#include "arrow/filesystem/hdfs_internal.h" +#include "arrow/filesystem/util_internal.h" #include "arrow/io/hdfs.h" -#include "arrow/io/hdfs_internal.h" #include "arrow/io/interfaces.h" #include "arrow/memory_pool.h" #include "arrow/status.h" #include "arrow/util/logging.h" -using std::size_t; - namespace arrow { -namespace io { - -namespace { - -std::string TranslateErrno(int error_code) { - std::stringstream ss; - ss << error_code << " (" << strerror(error_code) << ")"; - if (error_code == 255) { - // Unknown error can occur if the host is correct but the port is not - ss << " Please check that you are connecting to the correct HDFS RPC port"; - } - return ss.str(); -} - -} // namespace - -#define CHECK_FAILURE(RETURN_VALUE, WHAT) \ - do { \ - if (RETURN_VALUE == -1) { \ - return Status::IOError("HDFS ", WHAT, " failed, errno: ", TranslateErrno(errno)); \ - } \ - } while (0) - -static constexpr int kDefaultHdfsBufferSize = 1 << 16; - -// ---------------------------------------------------------------------- -// File reading - -class HdfsAnyFileImpl { - public: - void set_members(const std::string& path, internal::LibHdfsShim* driver, hdfsFS fs, - hdfsFile handle) { - path_ = path; - driver_ = driver; - fs_ = fs; - file_ = handle; - is_open_ = true; - } - - Status Seek(int64_t position) { - int ret = driver_->Seek(fs_, file_, position); - CHECK_FAILURE(ret, "seek"); - return Status::OK(); - } - - Status Tell(int64_t* offset) { - int64_t ret = driver_->Tell(fs_, file_); - CHECK_FAILURE(ret, "tell"); - *offset = ret; - return Status::OK(); - } - - bool is_open() const { return is_open_; } - - protected: - std::string path_; - - internal::LibHdfsShim* driver_; - - // For threadsafety - std::mutex lock_; - - // These are pointers in libhdfs, so OK to copy - hdfsFS fs_; - hdfsFile file_; - - bool is_open_; -}; - -namespace { - -Status GetPathInfoFailed(const std::string& path) { - std::stringstream ss; - ss << "Calling GetPathInfo for " << path << " failed. 
errno: " << TranslateErrno(errno); - return Status::IOError(ss.str()); -} - -} // namespace - -// Private implementation for read-only files -class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { - public: - explicit HdfsReadableFileImpl(MemoryPool* pool) : pool_(pool) {} - - Status Close() { - if (is_open_) { - int ret = driver_->CloseFile(fs_, file_); - CHECK_FAILURE(ret, "CloseFile"); - is_open_ = false; - } - return Status::OK(); - } - - bool closed() const { return !is_open_; } - - Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, void* buffer) { - tSize ret; - if (driver_->HasPread()) { - ret = driver_->Pread(fs_, file_, static_cast(position), - reinterpret_cast(buffer), static_cast(nbytes)); - } else { - std::lock_guard guard(lock_); - RETURN_NOT_OK(Seek(position)); - return Read(nbytes, bytes_read, buffer); - } - CHECK_FAILURE(ret, "read"); - *bytes_read = ret; - return Status::OK(); - } - - Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) { - std::shared_ptr buffer; - RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer)); - - int64_t bytes_read = 0; - RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buffer->mutable_data())); - - if (bytes_read < nbytes) { - RETURN_NOT_OK(buffer->Resize(bytes_read)); - buffer->ZeroPadding(); - } - - *out = buffer; - return Status::OK(); - } - - Status Read(int64_t nbytes, int64_t* bytes_read, void* buffer) { - int64_t total_bytes = 0; - while (total_bytes < nbytes) { - tSize ret = driver_->Read( - fs_, file_, reinterpret_cast(buffer) + total_bytes, - static_cast(std::min(buffer_size_, nbytes - total_bytes))); - CHECK_FAILURE(ret, "read"); - total_bytes += ret; - if (ret == 0) { - break; - } - } - - *bytes_read = total_bytes; - return Status::OK(); - } - - Status Read(int64_t nbytes, std::shared_ptr* out) { - std::shared_ptr buffer; - RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer)); - - int64_t bytes_read = 0; - RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data())); - if (bytes_read < nbytes) { - RETURN_NOT_OK(buffer->Resize(bytes_read)); - } - - *out = buffer; - return Status::OK(); - } - - Status GetSize(int64_t* size) { - hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path_.c_str()); - if (entry == nullptr) { - return GetPathInfoFailed(path_); - } - - *size = entry->mSize; - driver_->FreeFileInfo(entry, 1); - return Status::OK(); - } - - void set_memory_pool(MemoryPool* pool) { pool_ = pool; } - - void set_buffer_size(int32_t buffer_size) { buffer_size_ = buffer_size; } - - private: - MemoryPool* pool_; - int32_t buffer_size_; -}; - -HdfsReadableFile::HdfsReadableFile(MemoryPool* pool) { - if (pool == nullptr) { - pool = default_memory_pool(); - } - impl_.reset(new HdfsReadableFileImpl(pool)); -} - -HdfsReadableFile::~HdfsReadableFile() { DCHECK_OK(impl_->Close()); } - -Status HdfsReadableFile::Close() { return impl_->Close(); } - -bool HdfsReadableFile::closed() const { return impl_->closed(); } - -Status HdfsReadableFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, - void* buffer) { - return impl_->ReadAt(position, nbytes, bytes_read, buffer); -} - -Status HdfsReadableFile::ReadAt(int64_t position, int64_t nbytes, - std::shared_ptr* out) { - return impl_->ReadAt(position, nbytes, out); -} - -Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, void* buffer) { - return impl_->Read(nbytes, bytes_read, buffer); -} - -Status HdfsReadableFile::Read(int64_t nbytes, std::shared_ptr* buffer) { - return impl_->Read(nbytes, 
buffer); -} - -Status HdfsReadableFile::GetSize(int64_t* size) { return impl_->GetSize(size); } - -Status HdfsReadableFile::Seek(int64_t position) { return impl_->Seek(position); } - -Status HdfsReadableFile::Tell(int64_t* position) const { return impl_->Tell(position); } - -// ---------------------------------------------------------------------- -// File writing - -// Private implementation for writable-only files -class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl { - public: - HdfsOutputStreamImpl() {} - - Status Close() { - if (is_open_) { - RETURN_NOT_OK(Flush()); - int ret = driver_->CloseFile(fs_, file_); - CHECK_FAILURE(ret, "CloseFile"); - is_open_ = false; - } - return Status::OK(); - } - - bool closed() const { return !is_open_; } - - Status Flush() { - int ret = driver_->Flush(fs_, file_); - CHECK_FAILURE(ret, "Flush"); - return Status::OK(); - } - - Status Write(const void* buffer, int64_t nbytes, int64_t* bytes_written) { - std::lock_guard guard(lock_); - tSize ret = driver_->Write(fs_, file_, reinterpret_cast(buffer), - static_cast(nbytes)); - CHECK_FAILURE(ret, "Write"); - *bytes_written = ret; - return Status::OK(); - } -}; - -HdfsOutputStream::HdfsOutputStream() { impl_.reset(new HdfsOutputStreamImpl()); } - -HdfsOutputStream::~HdfsOutputStream() { DCHECK_OK(impl_->Close()); } - -Status HdfsOutputStream::Close() { return impl_->Close(); } - -bool HdfsOutputStream::closed() const { return impl_->closed(); } - -Status HdfsOutputStream::Write(const void* buffer, int64_t nbytes, int64_t* bytes_read) { - return impl_->Write(buffer, nbytes, bytes_read); -} - -Status HdfsOutputStream::Write(const void* buffer, int64_t nbytes) { - int64_t bytes_written_dummy = 0; - return Write(buffer, nbytes, &bytes_written_dummy); -} - -Status HdfsOutputStream::Flush() { return impl_->Flush(); } - -Status HdfsOutputStream::Tell(int64_t* position) const { return impl_->Tell(position); } - -// ---------------------------------------------------------------------- -// HDFS client - -// TODO(wesm): this could throw std::bad_alloc in the course of copying strings -// into the path info object -static void SetPathInfo(const hdfsFileInfo* input, HdfsPathInfo* out) { - out->kind = input->mKind == kObjectKindFile ? 
ObjectType::FILE : ObjectType::DIRECTORY; - out->name = std::string(input->mName); - out->owner = std::string(input->mOwner); - out->group = std::string(input->mGroup); - - out->last_access_time = static_cast(input->mLastAccess); - out->last_modified_time = static_cast(input->mLastMod); - out->size = static_cast(input->mSize); - - out->replication = input->mReplication; - out->block_size = input->mBlockSize; - - out->permissions = input->mPermissions; -} - -// Private implementation -class HadoopFileSystem::HadoopFileSystemImpl { - public: - HadoopFileSystemImpl() : driver_(NULLPTR), port_(0), fs_(NULLPTR) {} - - Status Connect(const HdfsConnectionConfig* config) { - if (config->driver == HdfsDriver::LIBHDFS3) { - RETURN_NOT_OK(ConnectLibHdfs3(&driver_)); - } else { - RETURN_NOT_OK(ConnectLibHdfs(&driver_)); - } - - // connect to HDFS with the builder object - hdfsBuilder* builder = driver_->NewBuilder(); - if (!config->host.empty()) { - driver_->BuilderSetNameNode(builder, config->host.c_str()); - } - driver_->BuilderSetNameNodePort(builder, static_cast(config->port)); - if (!config->user.empty()) { - driver_->BuilderSetUserName(builder, config->user.c_str()); - } - if (!config->kerb_ticket.empty()) { - driver_->BuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str()); - } - - for (auto& kv : config->extra_conf) { - int ret = driver_->BuilderConfSetStr(builder, kv.first.c_str(), kv.second.c_str()); - CHECK_FAILURE(ret, "confsetstr"); - } - - driver_->BuilderSetForceNewInstance(builder); - fs_ = driver_->BuilderConnect(builder); - - if (fs_ == nullptr) { - return Status::IOError("HDFS connection failed"); - } - namenode_host_ = config->host; - port_ = config->port; - user_ = config->user; - kerb_ticket_ = config->kerb_ticket; - - return Status::OK(); - } - - Status MakeDirectory(const std::string& path) { - int ret = driver_->MakeDirectory(fs_, path.c_str()); - CHECK_FAILURE(ret, "create directory"); - return Status::OK(); - } - - Status Delete(const std::string& path, bool recursive) { - int ret = driver_->Delete(fs_, path.c_str(), static_cast(recursive)); - CHECK_FAILURE(ret, "delete"); - return Status::OK(); - } - - Status Disconnect() { - int ret = driver_->Disconnect(fs_); - CHECK_FAILURE(ret, "hdfsFS::Disconnect"); - return Status::OK(); - } - - bool Exists(const std::string& path) { - // hdfsExists does not distinguish between RPC failure and the file not - // existing - int ret = driver_->Exists(fs_, path.c_str()); - return ret == 0; - } - - Status GetCapacity(int64_t* nbytes) { - tOffset ret = driver_->GetCapacity(fs_); - CHECK_FAILURE(ret, "GetCapacity"); - *nbytes = ret; - return Status::OK(); - } - - Status GetUsed(int64_t* nbytes) { - tOffset ret = driver_->GetUsed(fs_); - CHECK_FAILURE(ret, "GetUsed"); - *nbytes = ret; - return Status::OK(); - } - - Status GetPathInfo(const std::string& path, HdfsPathInfo* info) { - hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path.c_str()); - - if (entry == nullptr) { - return GetPathInfoFailed(path); - } - - SetPathInfo(entry, info); - driver_->FreeFileInfo(entry, 1); - - return Status::OK(); - } - - Status Stat(const std::string& path, FileStatistics* stat) { - HdfsPathInfo info; - RETURN_NOT_OK(GetPathInfo(path, &info)); - - stat->size = info.size; - stat->kind = info.kind; - return Status::OK(); - } - - Status GetChildren(const std::string& path, std::vector* listing) { - std::vector detailed_listing; - RETURN_NOT_OK(ListDirectory(path, &detailed_listing)); - for (const auto& info : detailed_listing) { - 
listing->push_back(info.name); - } - return Status::OK(); - } - - Status ListDirectory(const std::string& path, std::vector* listing) { - int num_entries = 0; - errno = 0; - hdfsFileInfo* entries = driver_->ListDirectory(fs_, path.c_str(), &num_entries); - - if (entries == nullptr) { - // If the directory is empty, entries is NULL but errno is 0. Non-zero - // errno indicates error - // - // Note: errno is thread-local - // - // XXX(wesm): ARROW-2300; we found with Hadoop 2.6 that libhdfs would set - // errno 2/ENOENT for empty directories. To be more robust to this we - // double check this case - if ((errno == 0) || (errno == ENOENT && Exists(path))) { - num_entries = 0; - } else { - return Status::IOError("HDFS list directory failed, errno: ", - TranslateErrno(errno)); - } - } - - // Allocate additional space for elements - int vec_offset = static_cast(listing->size()); - listing->resize(vec_offset + num_entries); - - for (int i = 0; i < num_entries; ++i) { - SetPathInfo(entries + i, &(*listing)[vec_offset + i]); - } - - // Free libhdfs file info - driver_->FreeFileInfo(entries, num_entries); - - return Status::OK(); - } - - Status OpenReadable(const std::string& path, int32_t buffer_size, - std::shared_ptr* file) { - hdfsFile handle = driver_->OpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0); - - if (handle == nullptr) { - const char* msg = !Exists(path) ? "HDFS file does not exist: " - : "HDFS path exists, but opening file failed: "; - return Status::IOError(msg, path); - } - - // std::make_shared does not work with private ctors - *file = std::shared_ptr(new HdfsReadableFile()); - (*file)->impl_->set_members(path, driver_, fs_, handle); - (*file)->impl_->set_buffer_size(buffer_size); - - return Status::OK(); - } - - Status OpenWritable(const std::string& path, bool append, int32_t buffer_size, - int16_t replication, int64_t default_block_size, - std::shared_ptr* file) { - int flags = O_WRONLY; - if (append) flags |= O_APPEND; - - hdfsFile handle = - driver_->OpenFile(fs_, path.c_str(), flags, buffer_size, replication, - static_cast(default_block_size)); - - if (handle == nullptr) { - return Status::IOError("Unable to open file ", path); - } - - // std::make_shared does not work with private ctors - *file = std::shared_ptr(new HdfsOutputStream()); - (*file)->impl_->set_members(path, driver_, fs_, handle); - - return Status::OK(); - } - - Status Rename(const std::string& src, const std::string& dst) { - int ret = driver_->Rename(fs_, src.c_str(), dst.c_str()); - CHECK_FAILURE(ret, "Rename"); - return Status::OK(); - } - - Status Chmod(const std::string& path, int mode) { - int ret = driver_->Chmod(fs_, path.c_str(), static_cast(mode)); // NOLINT - CHECK_FAILURE(ret, "Chmod"); - return Status::OK(); - } - - Status Chown(const std::string& path, const char* owner, const char* group) { - int ret = driver_->Chown(fs_, path.c_str(), owner, group); - CHECK_FAILURE(ret, "Chown"); - return Status::OK(); - } - - private: - internal::LibHdfsShim* driver_; - - std::string namenode_host_; - std::string user_; - int port_; - std::string kerb_ticket_; - - hdfsFS fs_; -}; - -// ---------------------------------------------------------------------- -// Public API for HDFSClient - -HadoopFileSystem::HadoopFileSystem() { impl_.reset(new HadoopFileSystemImpl()); } - -HadoopFileSystem::~HadoopFileSystem() {} - -Status HadoopFileSystem::Connect(const HdfsConnectionConfig* config, - std::shared_ptr* fs) { - // ctor is private, make_shared will not work - *fs = std::shared_ptr(new 
HadoopFileSystem()); - - RETURN_NOT_OK((*fs)->impl_->Connect(config)); - return Status::OK(); -} - -Status HadoopFileSystem::MakeDirectory(const std::string& path) { - return impl_->MakeDirectory(path); -} - -Status HadoopFileSystem::Delete(const std::string& path, bool recursive) { - return impl_->Delete(path, recursive); -} - -Status HadoopFileSystem::DeleteDirectory(const std::string& path) { - return Delete(path, true); -} - -Status HadoopFileSystem::Disconnect() { return impl_->Disconnect(); } - -bool HadoopFileSystem::Exists(const std::string& path) { return impl_->Exists(path); } - -Status HadoopFileSystem::GetPathInfo(const std::string& path, HdfsPathInfo* info) { - return impl_->GetPathInfo(path, info); -} - -Status HadoopFileSystem::Stat(const std::string& path, FileStatistics* stat) { - return impl_->Stat(path, stat); -} - -Status HadoopFileSystem::GetCapacity(int64_t* nbytes) { - return impl_->GetCapacity(nbytes); -} - -Status HadoopFileSystem::GetUsed(int64_t* nbytes) { return impl_->GetUsed(nbytes); } - -Status HadoopFileSystem::GetChildren(const std::string& path, - std::vector* listing) { - return impl_->GetChildren(path, listing); -} - -Status HadoopFileSystem::ListDirectory(const std::string& path, - std::vector* listing) { - return impl_->ListDirectory(path, listing); -} - -Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size, - std::shared_ptr* file) { - return impl_->OpenReadable(path, buffer_size, file); -} - -Status HadoopFileSystem::OpenReadable(const std::string& path, - std::shared_ptr* file) { - return OpenReadable(path, kDefaultHdfsBufferSize, file); -} - -Status HadoopFileSystem::OpenWritable(const std::string& path, bool append, - int32_t buffer_size, int16_t replication, - int64_t default_block_size, - std::shared_ptr* file) { - return impl_->OpenWritable(path, append, buffer_size, replication, default_block_size, - file); -} - -Status HadoopFileSystem::OpenWritable(const std::string& path, bool append, - std::shared_ptr* file) { - return OpenWritable(path, append, 0, 0, 0, file); -} - -Status HadoopFileSystem::Chmod(const std::string& path, int mode) { - return impl_->Chmod(path, mode); -} - -Status HadoopFileSystem::Chown(const std::string& path, const char* owner, - const char* group) { - return impl_->Chown(path, owner, group); -} - -Status HadoopFileSystem::Rename(const std::string& src, const std::string& dst) { - return impl_->Rename(src, dst); -} - -// Deprecated in 0.11 - -Status HadoopFileSystem::OpenWriteable(const std::string& path, bool append, - int32_t buffer_size, int16_t replication, - int64_t default_block_size, - std::shared_ptr* file) { - return OpenWritable(path, append, buffer_size, replication, default_block_size, file); -} - -Status HadoopFileSystem::OpenWriteable(const std::string& path, bool append, - std::shared_ptr* file) { - return OpenWritable(path, append, 0, 0, 0, file); -} - -// ---------------------------------------------------------------------- -// Allow public API users to check whether we are set up correctly - -Status HaveLibHdfs() { - internal::LibHdfsShim* driver; - return internal::ConnectLibHdfs(&driver); -} - -Status HaveLibHdfs3() { - internal::LibHdfsShim* driver; - return internal::ConnectLibHdfs3(&driver); -} - -} // namespace io +namespace io {} // namespace io } // namespace arrow diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h index 133142c33ba..a3ce06912c5 100644 --- a/cpp/src/arrow/io/hdfs.h +++ b/cpp/src/arrow/io/hdfs.h @@ -15,8 +15,7 @@ // specific language 
governing permissions and limitations // under the License. -#ifndef ARROW_IO_HDFS -#define ARROW_IO_HDFS +#pragma once #include #include @@ -24,6 +23,8 @@ #include #include +#include "arrow/filesystem/filesystem.h" +#include "arrow/filesystem/hdfs.h" #include "arrow/io/interfaces.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" @@ -36,223 +37,36 @@ class Status; namespace io { -class HdfsReadableFile; -class HdfsOutputStream; +using fs::FileStats; +using fs::FileSystem; +using fs::FileType; +using fs::Selector; +using fs::TimePoint; -struct HdfsPathInfo { - ObjectType::type kind; +using fs::ObjectType; - std::string name; - std::string owner; - std::string group; +using HdfsPathInfo = fs::HadoopPathInfo; +using HdfsDriver = fs::HadoopDriver; +using HdfsConnectionConfig = fs::HadoopOptions; - // Access times in UNIX timestamps (seconds) - int64_t size; - int64_t block_size; - - int32_t last_modified_time; - int32_t last_access_time; - - int16_t replication; - int16_t permissions; -}; - -enum class HdfsDriver : char { LIBHDFS, LIBHDFS3 }; - -struct HdfsConnectionConfig { - std::string host; - int port; - std::string user; - std::string kerb_ticket; - std::unordered_map extra_conf; - HdfsDriver driver; -}; - -class ARROW_EXPORT HadoopFileSystem : public FileSystem { +class ARROW_EXPORT HadoopFileSystem : public fs::HadoopFileSystem { public: ~HadoopFileSystem() override; - // Connect to an HDFS cluster given a configuration - // - // @param config (in): configuration for connecting - // @param fs (out): the created client - // @returns Status - static Status Connect(const HdfsConnectionConfig* config, - std::shared_ptr* fs); - - // Create directory and all parents - // - // @param path (in): absolute HDFS path - // @returns Status - Status MakeDirectory(const std::string& path) override; - - // Delete file or directory - // @param path: absolute path to data - // @param recursive: if path is a directory, delete contents as well - // @returns error status on failure - Status Delete(const std::string& path, bool recursive = false); - - Status DeleteDirectory(const std::string& path) override; - - // Disconnect from cluster - // - // @returns Status - Status Disconnect(); - - // @param path (in): absolute HDFS path - // @returns bool, true if the path exists, false if not (or on error) - bool Exists(const std::string& path); - - // @param path (in): absolute HDFS path - // @param info (out) - // @returns Status - Status GetPathInfo(const std::string& path, HdfsPathInfo* info); - - // @param nbytes (out): total capacity of the filesystem - // @returns Status - Status GetCapacity(int64_t* nbytes); - - // @param nbytes (out): total bytes used of the filesystem - // @returns Status - Status GetUsed(int64_t* nbytes); - - Status GetChildren(const std::string& path, std::vector* listing) override; - - Status ListDirectory(const std::string& path, std::vector* listing); - - /// Change - /// - /// @param path file path to change - /// @param owner pass null for no change - /// @param group pass null for no change - Status Chown(const std::string& path, const char* owner, const char* group); - - /// Change path permissions + /// Connect to an HDFS cluster given a configuration /// - /// \param path Absolute path in file system - /// \param mode Mode bitset - /// \return Status - Status Chmod(const std::string& path, int mode); - - // Move file or directory from source path to destination path within the - // current filesystem - Status Rename(const std::string& src, const std::string& 
dst) override; - - Status Stat(const std::string& path, FileStatistics* stat) override; - - // TODO(wesm): GetWorkingDirectory, SetWorkingDirectory - - // Open an HDFS file in READ mode. Returns error - // status if the file is not found. - // - // @param path complete file path - Status OpenReadable(const std::string& path, int32_t buffer_size, - std::shared_ptr* file); - - Status OpenReadable(const std::string& path, std::shared_ptr* file); - - // FileMode::WRITE options - // @param path complete file path - // @param buffer_size, 0 for default - // @param replication, 0 for default - // @param default_block_size, 0 for default - Status OpenWritable(const std::string& path, bool append, int32_t buffer_size, - int16_t replication, int64_t default_block_size, - std::shared_ptr* file); - - Status OpenWritable(const std::string& path, bool append, - std::shared_ptr* file); - - ARROW_DEPRECATED("Use OpenWritable") - Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size, - int16_t replication, int64_t default_block_size, - std::shared_ptr* file); - - ARROW_DEPRECATED("Use OpenWritable") - Status OpenWriteable(const std::string& path, bool append, - std::shared_ptr* file); - - private: - friend class HdfsReadableFile; - friend class HdfsOutputStream; - - class ARROW_NO_EXPORT HadoopFileSystemImpl; - std::unique_ptr impl_; - - HadoopFileSystem(); - ARROW_DISALLOW_COPY_AND_ASSIGN(HadoopFileSystem); + /// \param config (in): configuration for connecting + /// \param fs (out): the created client + /// \returns Status + static Status Connect(const HdfsConnectionConfig* options, + std::shared_ptr* fs) { + // FIXME(bkietz) delete this soon + std::shared_ptr fs_fs; + RETURN_NOT_OK(fs::HadoopFileSystem::Connect(*options, &fs_fs)); + *fs = std::static_pointer_cast(fs_fs); + return Status::OK(); + } }; -class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile { - public: - ~HdfsReadableFile() override; - - Status Close() override; - - bool closed() const override; - - Status GetSize(int64_t* size) override; - - // NOTE: If you wish to read a particular range of a file in a multithreaded - // context, you may prefer to use ReadAt to avoid locking issues - Status Read(int64_t nbytes, int64_t* bytes_read, void* buffer) override; - - Status Read(int64_t nbytes, std::shared_ptr* out) override; - - Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, - void* buffer) override; - - Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; - - Status Seek(int64_t position) override; - Status Tell(int64_t* position) const override; - - void set_memory_pool(MemoryPool* pool); - - private: - explicit HdfsReadableFile(MemoryPool* pool = NULLPTR); - - class ARROW_NO_EXPORT HdfsReadableFileImpl; - std::unique_ptr impl_; - - friend class HadoopFileSystem::HadoopFileSystemImpl; - - ARROW_DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile); -}; - -// Naming this file OutputStream because it does not support seeking (like the -// WritableFile interface) -class ARROW_EXPORT HdfsOutputStream : public OutputStream { - public: - ~HdfsOutputStream() override; - - Status Close() override; - - bool closed() const override; - - using OutputStream::Write; - Status Write(const void* buffer, int64_t nbytes) override; - Status Write(const void* buffer, int64_t nbytes, int64_t* bytes_written); - - Status Flush() override; - - Status Tell(int64_t* position) const override; - - private: - class ARROW_NO_EXPORT HdfsOutputStreamImpl; - std::unique_ptr impl_; - - friend class 
HadoopFileSystem::HadoopFileSystemImpl; - - HdfsOutputStream(); - - ARROW_DISALLOW_COPY_AND_ASSIGN(HdfsOutputStream); -}; - -Status ARROW_EXPORT HaveLibHdfs(); -Status ARROW_EXPORT HaveLibHdfs3(); - } // namespace io } // namespace arrow - -#endif // ARROW_IO_HDFS diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index fde8ed2789d..da775a104a3 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -35,32 +35,6 @@ struct FileMode { enum type { READ, WRITE, READWRITE }; }; -struct ObjectType { - enum type { FILE, DIRECTORY }; -}; - -struct ARROW_EXPORT FileStatistics { - /// Size of file, -1 if finding length is unsupported - int64_t size; - ObjectType::type kind; -}; - -class ARROW_EXPORT FileSystem { - public: - virtual ~FileSystem() = default; - - virtual Status MakeDirectory(const std::string& path) = 0; - - virtual Status DeleteDirectory(const std::string& path) = 0; - - virtual Status GetChildren(const std::string& path, - std::vector* listing) = 0; - - virtual Status Rename(const std::string& src, const std::string& dst) = 0; - - virtual Status Stat(const std::string& path, FileStatistics* stat) = 0; -}; - class ARROW_EXPORT FileInterface { public: virtual ~FileInterface() = 0; diff --git a/integration/hdfs/Dockerfile b/integration/hdfs/Dockerfile index 71db503436d..6d3cec2fa00 100644 --- a/integration/hdfs/Dockerfile +++ b/integration/hdfs/Dockerfile @@ -23,15 +23,19 @@ ENV JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 \ HADOOP_HOME=/usr/local/hadoop \ HADOOP_OPTS=-Djava.library.path=/usr/local/hadoop/lib/native \ PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin -RUN apt-get update -y && \ - apt-get install -y --no-install-recommends openjdk-8-jdk && \ - apt-get clean && \ - rm -rf /var/lib/apt/lists/* - -RUN wget -q -O hadoop-$HADOOP_VERSION.tar.gz "https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz" && \ - tar -zxf /hadoop-$HADOOP_VERSION.tar.gz && \ - rm /hadoop-$HADOOP_VERSION.tar.gz && \ - mv /hadoop-$HADOOP_VERSION /usr/local/hadoop + +#ADD https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz /hadoop-$HADOOP_VERSION.tar.gz + +COPY integration/hdfs/hadoop-$HADOOP_VERSION.tar.gz /hadoop-$HADOOP_VERSION.tar.gz +RUN tar -zxf /hadoop-$HADOOP_VERSION.tar.gz +RUN rm /hadoop-$HADOOP_VERSION.tar.gz +RUN mv /hadoop-$HADOOP_VERSION $HADOOP_HOME + +RUN apt-get update -y +RUN apt-get install -y --no-install-recommends openjdk-8-jdk +RUN apt-get clean +RUN rm -rf /var/lib/apt/lists/* + COPY integration/hdfs/hdfs-site.xml $HADOOP_HOME/etc/hadoop/ # installing libhdfs3, it needs to be pinned, see ARROW-1465 and ARROW-1445 diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index dc29c10aed9..2fd6af3fb71 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -977,12 +977,6 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil: int64_t block_size int16_t permissions - cdef cppclass HdfsReadableFile(CRandomAccessFile): - pass - - cdef cppclass HdfsOutputStream(COutputStream): - pass - cdef cppclass CHadoopFileSystem \ "arrow::io::HadoopFileSystem"(CIOFileSystem): @staticmethod @@ -1012,12 +1006,12 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil: CStatus Rename(const c_string& src, const c_string& dst) CStatus OpenReadable(const c_string& path, - shared_ptr[HdfsReadableFile]* 
handle)
+                             shared_ptr[CRandomAccessFile]* handle)
 
         CStatus OpenWritable(const c_string& path, c_bool append,
                              int32_t buffer_size, int16_t replication,
                              int64_t default_block_size,
-                             shared_ptr[HdfsOutputStream]* handle)
+                             shared_ptr[COutputStream]* handle)
 
     cdef cppclass CBufferReader \
         " arrow::io::BufferReader"(CRandomAccessFile):
diff --git a/python/pyarrow/io-hdfs.pxi b/python/pyarrow/io-hdfs.pxi
index b224abbead8..58e62403c3a 100644
--- a/python/pyarrow/io-hdfs.pxi
+++ b/python/pyarrow/io-hdfs.pxi
@@ -410,8 +410,8 @@ cdef class HadoopFileSystem:
         cdef int16_t c_replication = replication or 0
         cdef int64_t c_default_block_size = default_block_size or 0
 
-        cdef shared_ptr[HdfsOutputStream] wr_handle
-        cdef shared_ptr[HdfsReadableFile] rd_handle
+        cdef shared_ptr[COutputStream] wr_handle
+        cdef shared_ptr[CRandomAccessFile] rd_handle
 
         if mode in ('wb', 'ab'):
             if mode == 'ab':
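For callers of the old io:: API, the rewritten arrow/io/hdfs.h keeps the legacy spellings as thin aliases over the new filesystem layer (HdfsConnectionConfig = fs::HadoopOptions, HdfsPathInfo = fs::HadoopPathInfo) plus a static Connect that forwards to fs::HadoopFileSystem::Connect. A hedged sketch of that compatibility path, assuming placeholder connection values and using GetPathInfo as exercised by the updated ChmodChown test:

// Sketch only: legacy io:: spellings resolving through the new shim.
// Host, port and the path below are placeholders, not from the patch.
#include <memory>

#include "arrow/io/hdfs.h"
#include "arrow/status.h"

arrow::Status UseLegacyHdfsClient() {
  arrow::io::HdfsConnectionConfig config;  // alias for fs::HadoopOptions
  config.host = "namenode";  // placeholder
  config.port = 8020;        // placeholder

  std::shared_ptr<arrow::io::HadoopFileSystem> client;
  // The static Connect is kept; per the FIXME in the header diff it forwards
  // to fs::HadoopFileSystem::Connect and downcasts the result.
  RETURN_NOT_OK(arrow::io::HadoopFileSystem::Connect(&config, &client));

  arrow::io::HdfsPathInfo info;  // alias for fs::HadoopPathInfo
  RETURN_NOT_OK(client->GetPathInfo("/tmp/example", &info));
  // Old field names (permissions, size, owner, ...) are unchanged.

  return arrow::Status::OK();
}

The pyarrow changes above follow the same pattern: the Cython handles switch from HdfsReadableFile/HdfsOutputStream to the generic CRandomAccessFile/COutputStream, so the Python-side wrapper keeps working against the generic stream interfaces.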