From 2987f7c7f119fa9f2b7ff1b00066b3d861f5eb3c Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 4 Dec 2023 11:37:13 -0500 Subject: [PATCH] GH-38309: [C++] build filesystems as separate modules --- cpp/examples/arrow/CMakeLists.txt | 10 + .../arrow/filesystem_definition_example.cc | 151 +++++++++++++ .../arrow/filesystem_usage_example.cc | 55 +++++ cpp/src/arrow/dataset/partition.cc | 4 +- cpp/src/arrow/dataset/partition_test.cc | 2 +- .../engine/substrait/relation_internal.cc | 4 +- cpp/src/arrow/filesystem/CMakeLists.txt | 4 +- cpp/src/arrow/filesystem/azurefs.h | 16 +- cpp/src/arrow/filesystem/filesystem.cc | 199 +++++++++++++++++- cpp/src/arrow/filesystem/filesystem.h | 162 ++++++++++++-- cpp/src/arrow/filesystem/filesystem_library.h | 39 ++++ cpp/src/arrow/filesystem/gcsfs.cc | 4 +- cpp/src/arrow/filesystem/gcsfs.h | 3 +- cpp/src/arrow/filesystem/hdfs.cc | 2 +- cpp/src/arrow/filesystem/hdfs.h | 21 +- cpp/src/arrow/filesystem/hdfs_test.cc | 2 +- cpp/src/arrow/filesystem/localfs.cc | 18 +- cpp/src/arrow/filesystem/localfs.h | 15 +- cpp/src/arrow/filesystem/localfs_test.cc | 117 +++++++++- cpp/src/arrow/filesystem/mockfs.h | 25 +-- cpp/src/arrow/filesystem/path_util.cc | 2 +- cpp/src/arrow/filesystem/s3fs.cc | 2 +- cpp/src/arrow/filesystem/s3fs.h | 21 +- .../arrow/filesystem/s3fs_narrative_test.cc | 6 +- cpp/src/arrow/filesystem/s3fs_test.cc | 2 +- cpp/src/arrow/filesystem/util_internal.cc | 2 +- cpp/src/arrow/filesystem/util_internal.h | 2 +- cpp/src/arrow/flight/cookie_internal.cc | 4 +- cpp/src/arrow/flight/transport.h | 2 +- .../flight/transport/grpc/grpc_client.cc | 4 +- .../flight/transport/grpc/grpc_server.cc | 11 +- .../arrow/flight/transport/ucx/ucx_client.cc | 9 +- .../arrow/flight/transport/ucx/ucx_server.cc | 3 +- .../flight/transport/ucx/util_internal.cc | 2 +- .../flight/transport/ucx/util_internal.h | 2 +- cpp/src/arrow/flight/transport_server.h | 2 +- cpp/src/arrow/flight/types.cc | 2 +- cpp/src/arrow/flight/types.h | 6 +- cpp/src/arrow/io/hdfs_internal.cc | 124 ++++------- cpp/src/arrow/io/hdfs_internal.h | 14 +- cpp/src/arrow/testing/CMakeLists.txt | 6 + cpp/src/arrow/testing/examplefs.cc | 38 ++++ cpp/src/arrow/util/io_util.cc | 67 +++++- cpp/src/arrow/util/io_util.h | 41 +++- cpp/src/arrow/util/type_fwd.h | 1 + cpp/src/arrow/util/uri.cc | 33 +-- cpp/src/arrow/util/uri.h | 9 +- cpp/src/arrow/util/uri_test.cc | 6 +- cpp/src/arrow/util/visibility.h | 6 + docs/source/cpp/api/filesystem.rst | 16 +- docs/source/cpp/io.rst | 57 ++++- python/pyarrow/src/arrow/python/filesystem.h | 24 ++- 52 files changed, 1100 insertions(+), 279 deletions(-) create mode 100644 cpp/examples/arrow/filesystem_definition_example.cc create mode 100644 cpp/examples/arrow/filesystem_usage_example.cc create mode 100644 cpp/src/arrow/filesystem/filesystem_library.h create mode 100644 cpp/src/arrow/testing/examplefs.cc diff --git a/cpp/examples/arrow/CMakeLists.txt b/cpp/examples/arrow/CMakeLists.txt index a092a31733f..a5b69a5d8b4 100644 --- a/cpp/examples/arrow/CMakeLists.txt +++ b/cpp/examples/arrow/CMakeLists.txt @@ -195,3 +195,13 @@ if(ARROW_GANDIVA) endif() add_arrow_example(gandiva_example EXTRA_LINK_LIBS ${GANDIVA_EXAMPLE_LINK_LIBS}) endif() + +if(ARROW_FILESYSTEM) + add_library(filesystem_definition_example MODULE filesystem_definition_example.cc) + target_link_libraries(filesystem_definition_example ${ARROW_EXAMPLE_LINK_LIBS}) + + add_arrow_example(filesystem_usage_example) + target_compile_definitions(filesystem-usage-example + PUBLIC FILESYSTEM_EXAMPLE_LIBPATH="$" + ) +endif() diff --git a/cpp/examples/arrow/filesystem_definition_example.cc b/cpp/examples/arrow/filesystem_definition_example.cc new file mode 100644 index 00000000000..efe1bd10470 --- /dev/null +++ b/cpp/examples/arrow/filesystem_definition_example.cc @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include + +// Demonstrate registering a user-defined Arrow FileSystem outside +// of the Arrow source tree. + +using arrow::Result; +using arrow::Status; +namespace io = arrow::io; +namespace fs = arrow::fs; + +class ExampleFileSystem : public fs::FileSystem { + public: + explicit ExampleFileSystem(const io::IOContext& io_context) + : fs::FileSystem{io_context} {} + + // This is a mock filesystem whose root directory contains a single file. + // All operations which would mutate will simply raise an error. + static constexpr std::string_view kPath = "example_file"; + static constexpr std::string_view kContents = "hello world"; + static fs::FileInfo info() { + fs::FileInfo info; + info.set_path(std::string{kPath}); + info.set_type(fs::FileType::File); + info.set_size(kContents.size()); + return info; + } + + static Status NotFound(std::string_view path) { + return Status::IOError("Path does not exist '", path, "'"); + } + + static Status NoMutation() { + return Status::IOError("operations which would mutate are not permitted"); + } + + Result PathFromUri(const std::string& uri_string) const override { + ARROW_ASSIGN_OR_RAISE(auto uri, arrow::util::Uri::FromString(uri_string)); + return uri.path(); + } + + std::string type_name() const override { return "example"; } + + bool Equals(const FileSystem& other) const override { + return type_name() == other.type_name(); + } + + /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; + using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; + /// \endcond + + Result GetFileInfo(const std::string& path) override { + if (path == kPath) { + return info(); + } + return NotFound(path); + } + + Result> GetFileInfo(const fs::FileSelector& select) override { + if (select.base_dir == "/" || select.base_dir == "") { + return std::vector{info()}; + } + if (select.allow_not_found) { + return std::vector{}; + } + return NotFound(select.base_dir); + } + + Status CreateDir(const std::string& path, bool recursive) override { + return NoMutation(); + } + + Status DeleteDir(const std::string& path) override { return NoMutation(); } + + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override { + return NoMutation(); + } + + Status DeleteRootDirContents() override { return NoMutation(); } + + Status DeleteFile(const std::string& path) override { return NoMutation(); } + + Status Move(const std::string& src, const std::string& dest) override { + return NoMutation(); + } + + Status CopyFile(const std::string& src, const std::string& dest) override { + return NoMutation(); + } + + Result> OpenInputStream( + const std::string& path) override { + return OpenInputFile(path); + } + + Result> OpenInputFile( + const std::string& path) override { + if (path == kPath) { + return io::BufferReader::FromString(std::string{kContents}); + } + return NotFound(path); + } + + Result> OpenOutputStream( + const std::string& path, + const std::shared_ptr& metadata) override { + return NoMutation(); + } + + Result> OpenAppendStream( + const std::string& path, + const std::shared_ptr& metadata) override { + return NoMutation(); + } +}; + +fs::FileSystemRegistrar kExampleFileSystemModule{ + "example", + [](const arrow::util::Uri& uri, const io::IOContext& io_context, + std::string* out_path) -> Result> { + auto fs = std::make_shared(io_context); + if (out_path) { + ARROW_ASSIGN_OR_RAISE(*out_path, fs->PathFromUri(uri.ToString())); + } + return fs; + }, +}; diff --git a/cpp/examples/arrow/filesystem_usage_example.cc b/cpp/examples/arrow/filesystem_usage_example.cc new file mode 100644 index 00000000000..e674586bc63 --- /dev/null +++ b/cpp/examples/arrow/filesystem_usage_example.cc @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include + +namespace fs = arrow::fs; + +// Demonstrate dynamically loading a user-defined Arrow FileSystem + +arrow::Status Execute() { + ARROW_RETURN_NOT_OK(arrow::fs::LoadFileSystemFactories(FILESYSTEM_EXAMPLE_LIBPATH)); + + std::string uri = "example:///example_file"; + std::cout << "Uri: " << uri << std::endl; + + std::string path; + ARROW_ASSIGN_OR_RAISE(auto fs, arrow::fs::FileSystemFromUri(uri, &path)); + std::cout << "Path: " << path << std::endl; + + fs::FileSelector sel; + sel.base_dir = "/"; + ARROW_ASSIGN_OR_RAISE(auto infos, fs->GetFileInfo(sel)); + + std::cout << "Root directory contains:" << std::endl; + for (const auto& info : infos) { + std::cout << "- " << info << std::endl; + } + return arrow::Status::OK(); +} + +int main() { + auto status = Execute(); + if (!status.ok()) { + std::cerr << "Error occurred : " << status.message() << std::endl; + return EXIT_FAILURE; + } + return EXIT_SUCCESS; +} diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index bf7145ca936..4d5c6e8ce7a 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -53,7 +53,7 @@ namespace dataset { namespace { /// Apply UriUnescape, then ensure the results are valid UTF-8. Result SafeUriUnescape(std::string_view encoded) { - auto decoded = ::arrow::internal::UriUnescape(encoded); + auto decoded = ::arrow::util::UriUnescape(encoded); if (!util::ValidateUTF8(decoded)) { return Status::Invalid("Partition segment was not valid UTF-8 after URL decoding: ", encoded); @@ -755,7 +755,7 @@ Result HivePartitioning::FormatValues( // field_index <-> path nesting relation segments[i] = name + "=" + hive_options_.null_fallback; } else { - segments[i] = name + "=" + arrow::internal::UriEscape(values[i]->ToString()); + segments[i] = name + "=" + arrow::util::UriEscape(values[i]->ToString()); } } diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc index 1b71be15d19..9f0bd7c0be0 100644 --- a/cpp/src/arrow/dataset/partition_test.cc +++ b/cpp/src/arrow/dataset/partition_test.cc @@ -935,7 +935,7 @@ TEST_F(TestPartitioning, WriteHiveWithSlashesInValues) { "experiment/A/f.csv", "experiment/B/f.csv", "experiment/C/k.csv", "experiment/M/i.csv"}; for (auto partition : unique_partitions) { - encoded_paths.push_back("part=" + arrow::internal::UriEscape(partition)); + encoded_paths.push_back("part=" + arrow::util::UriEscape(partition)); } ASSERT_EQ(all_dirs.size(), encoded_paths.size()); diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc index 73f55c27ee8..f15f1a5527b 100644 --- a/cpp/src/arrow/engine/substrait/relation_internal.cc +++ b/cpp/src/arrow/engine/substrait/relation_internal.cc @@ -67,7 +67,7 @@ namespace arrow { using internal::checked_cast; using internal::StartsWith; using internal::ToChars; -using internal::UriFromAbsolutePath; +using util::UriFromAbsolutePath; namespace engine { @@ -463,7 +463,7 @@ Result FromProto(const substrait::Rel& rel, const ExtensionSet& } // Extract and parse the read relation's source URI - ::arrow::internal::Uri item_uri; + ::arrow::util::Uri item_uri; switch (item.path_type_case()) { case substrait::ReadRel::LocalFiles::FileOrFiles::kUriPath: RETURN_NOT_OK(item_uri.Parse(item.uri_path())); diff --git a/cpp/src/arrow/filesystem/CMakeLists.txt b/cpp/src/arrow/filesystem/CMakeLists.txt index b9ed11e7608..deac04af72f 100644 --- a/cpp/src/arrow/filesystem/CMakeLists.txt +++ b/cpp/src/arrow/filesystem/CMakeLists.txt @@ -26,7 +26,9 @@ add_arrow_test(filesystem-test filesystem_test.cc localfs_test.cc EXTRA_LABELS - filesystem) + filesystem + DEFINITIONS + ARROW_FILESYSTEM_EXAMPLE_LIBPATH="$") if(ARROW_BUILD_BENCHMARKS) add_arrow_benchmark(localfs_benchmark diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h index 2a131e40c05..770f6988189 100644 --- a/cpp/src/arrow/filesystem/azurefs.h +++ b/cpp/src/arrow/filesystem/azurefs.h @@ -196,15 +196,23 @@ class ARROW_EXPORT AzureFileSystem : public FileSystem { bool Equals(const FileSystem& other) const override; + /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; + using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; + /// \endcond + Result GetFileInfo(const std::string& path) override; Result GetFileInfo(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; @@ -246,11 +254,11 @@ class ARROW_EXPORT AzureFileSystem : public FileSystem { Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; }; } // namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index 810e9c179b1..01f166d87fb 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -15,9 +15,13 @@ // specific language governing permissions and limitations // under the License. +#include +#include #include +#include #include +#include "arrow/type_fwd.h" #include "arrow/util/config.h" #include "arrow/filesystem/filesystem.h" @@ -43,19 +47,22 @@ #include "arrow/util/logging.h" #include "arrow/util/macros.h" #include "arrow/util/parallel.h" +#include "arrow/util/string.h" #include "arrow/util/uri.h" #include "arrow/util/vector.h" +#include "arrow/util/visibility.h" #include "arrow/util/windows_fixup.h" namespace arrow { using internal::checked_pointer_cast; using internal::TaskHints; -using internal::Uri; using io::internal::SubmitIO; +using util::Uri; namespace fs { +using arrow::internal::GetEnvVar; using internal::ConcatAbstractPath; using internal::EnsureTrailingSlash; using internal::GetAbstractPathParent; @@ -128,7 +135,7 @@ std::string FileInfo::extension() const { ////////////////////////////////////////////////////////////////////////// // FileSystem default method implementations -FileSystem::~FileSystem() {} +FileSystem::~FileSystem() = default; Result FileSystem::NormalizePath(std::string path) { return path; } @@ -203,6 +210,10 @@ Future<> FileSystem::DeleteDirContentsAsync(const std::string& path, }); } +Future<> FileSystem::DeleteDirContentsAsync(const std::string& path) { + return DeleteDirContentsAsync(path, false); +} + Result> FileSystem::OpenInputStream( const FileInfo& info) { RETURN_NOT_OK(ValidateInputFileInfo(info)); @@ -279,7 +290,7 @@ SubTreeFileSystem::SubTreeFileSystem(const std::string& base_path, base_path_(NormalizeBasePath(base_path, base_fs).ValueOrDie()), base_fs_(base_fs) {} -SubTreeFileSystem::~SubTreeFileSystem() {} +SubTreeFileSystem::~SubTreeFileSystem() = default; Result SubTreeFileSystem::NormalizeBasePath( std::string base_path, const std::shared_ptr& base_fs) { @@ -674,6 +685,157 @@ Status CopyFiles(const std::shared_ptr& source_fs, return CopyFiles(sources, destinations, io_context, chunk_size, use_threads); } +class FileSystemFactoryRegistry { + public: + static FileSystemFactoryRegistry* GetInstance() { + static FileSystemFactoryRegistry registry; + if (registry.merged_into_ != nullptr) { + return registry.merged_into_; + } + return ®istry; + } + + Result FactoryForScheme(const std::string& scheme) { + std::shared_lock lock{mutex_}; + RETURN_NOT_OK(CheckValid()); + + auto it = scheme_to_factory_.find(scheme); + if (it == scheme_to_factory_.end()) return nullptr; + + return it->second.Map([](const auto& r) { return &r.factory; }); + } + + Status MergeInto(FileSystemFactoryRegistry* main_registry) { + std::unique_lock lock{mutex_}; + RETURN_NOT_OK(CheckValid()); + + std::unique_lock main_lock{main_registry->mutex_}; + RETURN_NOT_OK(main_registry->CheckValid()); + + std::vector duplicated_schemes; + for (auto& [scheme, registered] : scheme_to_factory_) { + if (!registered.ok()) { + duplicated_schemes.emplace_back(scheme); + continue; + } + + auto [it, success] = + main_registry->scheme_to_factory_.emplace(std::move(scheme), registered); + if (success) continue; + + duplicated_schemes.emplace_back(it->first); + } + scheme_to_factory_.clear(); + merged_into_ = main_registry; + + if (duplicated_schemes.empty()) return Status::OK(); + return Status::KeyError("Attempted to register ", duplicated_schemes.size(), + " factories for schemes ['", + arrow::internal::JoinStrings(duplicated_schemes, "', '"), + "'] but those schemes were already registered."); + } + + void EnsureFinalized() { + std::unique_lock lock{mutex_}; + if (finalized_) return; + + for (const auto& [_, registered_or_error] : scheme_to_factory_) { + if (!registered_or_error.ok()) continue; + registered_or_error->finalizer(); + } + finalized_ = true; + } + + Status RegisterFactory(std::string scheme, FileSystemFactory factory, + std::function finalizer, bool defer_error) { + std::unique_lock lock{mutex_}; + RETURN_NOT_OK(CheckValid()); + + auto [it, success] = scheme_to_factory_.emplace( + std::move(scheme), Registered{std::move(factory), std::move(finalizer)}); + if (success) { + return Status::OK(); + } + + auto st = Status::KeyError( + "Attempted to register factory for " + "scheme '", + it->first, + "' but that scheme is already " + "registered."); + if (!defer_error) return st; + + it->second = std::move(st); + return Status::OK(); + } + + private: + struct Registered { + FileSystemFactory factory; + std::function finalizer; + }; + + Status CheckValid() { + if (finalized_) { + return Status::Invalid("FileSystem factories were already finalized!"); + } + if (merged_into_ != nullptr) { + return Status::Invalid( + "FileSystem factories were merged into a different registry!"); + } + return Status::OK(); + } + + std::shared_mutex mutex_; + std::unordered_map> scheme_to_factory_; + bool finalized_ = false; + FileSystemFactoryRegistry* merged_into_ = nullptr; +}; + +Status RegisterFileSystemFactory(std::string scheme, FileSystemFactory factory, + std::function finalizer) { + return FileSystemFactoryRegistry::GetInstance()->RegisterFactory( + std::move(scheme), factory, std::move(finalizer), + /*defer_error=*/false); +} + +void EnsureFinalized() { FileSystemFactoryRegistry::GetInstance()->EnsureFinalized(); } + +FileSystemRegistrar::FileSystemRegistrar(std::string scheme, FileSystemFactory factory, + std::function finalizer) { + DCHECK_OK(FileSystemFactoryRegistry::GetInstance()->RegisterFactory( + std::move(scheme), std::move(factory), std::move(finalizer), + /*defer_error=*/true)); +} + +namespace internal { +void* GetFileSystemRegistry() { return FileSystemFactoryRegistry::GetInstance(); } +} // namespace internal + +Status LoadFileSystemFactories(const char* libpath) { + using ::arrow::internal::GetSymbolAs; + using ::arrow::internal::LoadDynamicLibrary; + + ARROW_ASSIGN_OR_RAISE(void* lib, LoadDynamicLibrary(libpath)); + auto* get_instance = + GetSymbolAs(lib, "arrow_filesystem_get_registry").ValueOr(nullptr); + if (get_instance == nullptr) { + // If a third party library is linked such that registry deduplication is not + // necessary (for example if built with `ARROW_BUILD_SHARED`), we do not require that + // library to export arrow_filesystem_get_registry() since that symbol is not used + // except for deduplication. + return Status::OK(); + } + + auto* lib_registry = static_cast(get_instance()); + auto* main_registry = FileSystemFactoryRegistry::GetInstance(); + if (lib_registry != main_registry) { + RETURN_NOT_OK(lib_registry->MergeInto(main_registry)); + } + + return Status::OK(); +} + namespace { Result> FileSystemFromUriReal(const Uri& uri, @@ -682,6 +844,15 @@ Result> FileSystemFromUriReal(const Uri& uri, std::string* out_path) { const auto scheme = uri.scheme(); + { + ARROW_ASSIGN_OR_RAISE( + auto* factory, + FileSystemFactoryRegistry::GetInstance()->FactoryForScheme(scheme)); + if (factory != nullptr) { + return (*factory)(uri, io_context, out_path); + } + } + if (scheme == "file") { std::string path; ARROW_ASSIGN_OR_RAISE(auto options, LocalFileSystemOptions::FromUri(uri, &path)); @@ -695,7 +866,9 @@ Result> FileSystemFromUriReal(const Uri& uri, ARROW_ASSIGN_OR_RAISE(auto options, GcsOptions::FromUri(uri, out_path)); return GcsFileSystem::Make(options, io_context); #else - return Status::NotImplemented("Got GCS URI but Arrow compiled without GCS support"); + return Status::NotImplemented( + "Got GCS URI but Arrow compiled " + "without GCS support"); #endif } @@ -708,7 +881,9 @@ Result> FileSystemFromUriReal(const Uri& uri, ARROW_ASSIGN_OR_RAISE(auto hdfs, HadoopFileSystem::Make(options, io_context)); return hdfs; #else - return Status::NotImplemented("Got HDFS URI but Arrow compiled without HDFS support"); + return Status::NotImplemented( + "Got HDFS URI but Arrow compiled " + "without HDFS support"); #endif } if (scheme == "s3") { @@ -718,13 +893,17 @@ Result> FileSystemFromUriReal(const Uri& uri, ARROW_ASSIGN_OR_RAISE(auto s3fs, S3FileSystem::Make(options, io_context)); return s3fs; #else - return Status::NotImplemented("Got S3 URI but Arrow compiled without S3 support"); + return Status::NotImplemented( + "Got S3 URI but Arrow compiled " + "without S3 support"); #endif } if (scheme == "mock") { - // MockFileSystem does not have an absolute / relative path distinction, - // normalize path by removing leading slash. + // MockFileSystem does not have an + // absolute / relative path distinction, + // normalize path by removing leading + // slash. if (out_path != nullptr) { *out_path = std::string(RemoveLeadingSlash(uri.path())); } @@ -760,8 +939,8 @@ Result> FileSystemFromUriOrPath( if (internal::DetectAbsolutePath(uri_string)) { // Normalize path separators if (out_path != nullptr) { - *out_path = - std::string(RemoveTrailingSlash(ToSlashes(uri_string), /*preserve_root=*/true)); + *out_path = std::string(RemoveTrailingSlash(ToSlashes(uri_string), + /*preserve_root=*/true)); } return std::make_shared(); } diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h index 559f1335f12..272e42256a3 100644 --- a/cpp/src/arrow/filesystem/filesystem.h +++ b/cpp/src/arrow/filesystem/filesystem.h @@ -38,6 +38,8 @@ namespace arrow { namespace fs { +using arrow::util::Uri; + // A system clock time point expressed as a 64-bit (or more) number of // nanoseconds since the epoch. using TimePoint = @@ -156,7 +158,11 @@ struct IterationTraits { namespace fs { /// \brief Abstract file system API -class ARROW_EXPORT FileSystem : public std::enable_shared_from_this { +class ARROW_EXPORT FileSystem + /// \cond false + : public std::enable_shared_from_this +/// \endcond +{ // NOLINT public: virtual ~FileSystem(); @@ -225,7 +231,8 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// Create a directory and subdirectories. /// /// This function succeeds if the directory already exists. - virtual Status CreateDir(const std::string& path, bool recursive = true) = 0; + virtual Status CreateDir(const std::string& path, bool recursive) = 0; + Status CreateDir(const std::string& path) { return CreateDir(path, true); } /// Delete a directory and its contents, recursively. virtual Status DeleteDir(const std::string& path) = 0; @@ -234,12 +241,18 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// /// Like DeleteDir, but doesn't delete the directory itself. /// Passing an empty path ("" or "/") is disallowed, see DeleteRootDirContents. - virtual Status DeleteDirContents(const std::string& path, - bool missing_dir_ok = false) = 0; + virtual Status DeleteDirContents(const std::string& path, bool missing_dir_ok) = 0; + Status DeleteDirContents(const std::string& path) { + return DeleteDirContents(path, false); + } /// Async version of DeleteDirContents. - virtual Future<> DeleteDirContentsAsync(const std::string& path, - bool missing_dir_ok = false); + virtual Future<> DeleteDirContentsAsync(const std::string& path, bool missing_dir_ok); + + /// Async version of DeleteDirContents. + /// + /// This overload allows missing directories. + Future<> DeleteDirContentsAsync(const std::string& path); /// EXPERIMENTAL: Delete the root directory's contents, recursively. /// @@ -272,6 +285,7 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// Open an input stream for sequential reading. virtual Result> OpenInputStream( const std::string& path) = 0; + /// Open an input stream for sequential reading. /// /// This override assumes the given FileInfo validly represents the file's @@ -282,6 +296,7 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// Open an input file for random access reading. virtual Result> OpenInputFile( const std::string& path) = 0; + /// Open an input file for random access reading. /// /// This override assumes the given FileInfo validly represents the file's @@ -293,6 +308,7 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// Async version of OpenInputStream virtual Future> OpenInputStreamAsync( const std::string& path); + /// Async version of OpenInputStream virtual Future> OpenInputStreamAsync( const FileInfo& info); @@ -300,6 +316,7 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// Async version of OpenInputFile virtual Future> OpenInputFileAsync( const std::string& path); + /// Async version of OpenInputFile virtual Future> OpenInputFileAsync( const FileInfo& info); @@ -335,6 +352,9 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this bool default_async_is_sync_ = true; }; +using FileSystemFactory = std::function>( + const Uri& uri, const io::IOContext& io_context, std::string* out_path)>; + /// \brief A FileSystem implementation that delegates to another /// implementation after prepending a fixed base path. /// @@ -361,17 +381,22 @@ class ARROW_EXPORT SubTreeFileSystem : public FileSystem { bool Equals(const FileSystem& other) const override; /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; /// \endcond + Result GetFileInfo(const std::string& path) override; Result GetFileInfo(const FileSelector& select) override; FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -399,13 +424,13 @@ class ARROW_EXPORT SubTreeFileSystem : public FileSystem { Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; protected: - SubTreeFileSystem() {} + SubTreeFileSystem() = default; const std::string base_path_; std::shared_ptr base_fs_; @@ -433,14 +458,21 @@ class ARROW_EXPORT SlowFileSystem : public FileSystem { bool Equals(const FileSystem& other) const override; Result PathFromUri(const std::string& uri_string) const override; + /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; + /// \endcond + Result GetFileInfo(const std::string& path) override; Result GetFileInfo(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -458,16 +490,25 @@ class ARROW_EXPORT SlowFileSystem : public FileSystem { const FileInfo& info) override; Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; protected: std::shared_ptr base_fs_; std::shared_ptr latencies_; }; +/// \brief Ensure all registered filesystem implementations are finalized. +/// +/// Individual finalizers may wait for concurrent calls to finish so as to avoid +/// race conditions. After this function has been called, all filesystem APIs +/// will fail with an error. +/// +/// The user is responsible for synchronization of calls to this function. +void EnsureFinalized(); + /// \defgroup filesystem-factories Functions for creating FileSystem instances /// /// @{ @@ -477,6 +518,8 @@ class ARROW_EXPORT SlowFileSystem : public FileSystem { /// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3", /// "gs" and "gcs". /// +/// Support for other schemes can be added using RegisterFileSystemFactory. +/// /// \param[in] uri a URI-based path, ex: file:///some/local/path /// \param[out] out_path (optional) Path inside the filesystem. /// \return out_fs FileSystem instance. @@ -489,6 +532,8 @@ Result> FileSystemFromUri(const std::string& uri, /// Recognized schemes are "file", "mock", "hdfs", "viewfs", "s3", /// "gs" and "gcs". /// +/// Support for other schemes can be added using RegisterFileSystemFactory. +/// /// \param[in] uri a URI-based path, ex: file:///some/local/path /// \param[in] io_context an IOContext which will be associated with the filesystem /// \param[out] out_path (optional) Path inside the filesystem. @@ -500,6 +545,8 @@ Result> FileSystemFromUri(const std::string& uri, /// \brief Create a new FileSystem by URI /// +/// Support for other schemes can be added using RegisterFileSystemFactory. +/// /// Same as FileSystemFromUri, but in addition also recognize non-URIs /// and treat them as local filesystem paths. Only absolute local filesystem /// paths are allowed. @@ -509,6 +556,8 @@ Result> FileSystemFromUriOrPath( /// \brief Create a new FileSystem by URI with a custom IO context /// +/// Support for other schemes can be added using RegisterFileSystemFactory. +/// /// Same as FileSystemFromUri, but in addition also recognize non-URIs /// and treat them as local filesystem paths. Only absolute local filesystem /// paths are allowed. @@ -519,6 +568,89 @@ Result> FileSystemFromUriOrPath( /// @} +/// \defgroup filesystem-factory-registration Helpers for FileSystem registration +/// +/// @{ + +/// \brief Register a FileSystem factory +/// +/// Support for custom URI schemes can be added by registering a factory +/// for the corresponding FileSystem. +/// +/// \param[in] scheme a Uri scheme which the factory will handle. +/// If a factory has already been registered for a scheme, +/// the new factory will be ignored. +/// \param[in] factory a function which can produce a FileSystem for Uris which match +/// scheme. +/// \param[in] finalizer a function which must be called to finalize the factory before +/// the process exits, or nullptr if no finalization is necessary. +/// \return raises KeyError if a name collision occurs. +ARROW_EXPORT Status RegisterFileSystemFactory(std::string scheme, + FileSystemFactory factory, + std::function finalizer = {}); + +/// \brief Register FileSystem factories from a shared library +/// +/// FileSystem implementations may be housed in separate shared libraries and only +/// registered when the shared library is explicitly loaded. FileSystemRegistrar is +/// provided to simplify definition of such libraries: each instance at namespace scope +/// in the library will register a factory for a scheme. Any library which uses +/// FileSystemRegistrars and which must be dynamically loaded should be loaded using +/// LoadFileSystemFactories(), which will additionally merge registries are if necessary +/// (static linkage to arrow can produce isolated registries). +ARROW_EXPORT Status LoadFileSystemFactories(const char* libpath); + +struct ARROW_EXPORT FileSystemRegistrar { + /// \brief Register a FileSystem factory at load time + /// + /// Support for custom URI schemes can be added by registering a factory for the + /// corresponding FileSystem. An instance of this helper can be defined at namespace + /// scope to cause the factory to be registered at load time. + /// + /// Global constructors will finish execution before main() starts if the registrar is + /// linked into the same binary as main(), or before dlopen()/LoadLibrary() returns if + /// the library in which the registrar is defined is dynamically loaded. + /// + /// \code + /// FileSystemRegistrar kSlowFileSystemModule{ + /// "slowfile", + /// [](const Uri& uri, const io::IOContext& io_context, std::string* out_path) + /// ->Result> { + /// auto local_uri = "file" + uri.ToString().substr(uri.scheme().size()); + /// ARROW_ASSIGN_OR_RAISE(auto base_fs, + /// FileSystemFromUri(local_uri, io_context, out_path)); + /// double average_latency = 1; + /// int32_t seed = 0xDEADBEEF; + /// ARROW_ASSIGN_OR_RAISE(auto params, uri.query_item()); + /// for (const auto& [key, value] : params) { + /// if (key == "average_latency") { + /// average_latency = std::stod(value); + /// } + /// if (key == "seed") { + /// seed = std::stoi(value, nullptr, /*base=*/16); + /// } + /// } + /// return std::make_shared(base_fs, average_latency, seed); + /// })); + /// \endcode + /// + /// \param[in] scheme a Uri scheme which the factory will handle. + /// If a factory has already been registered for a scheme, the + /// new factory will be ignored. + /// \param[in] factory a function which can produce a FileSystem for Uris which match + /// scheme. + /// \param[in] finalizer a function which must be called to finalize the factory before + /// the process exits, or nullptr if no finalization is necessary. + FileSystemRegistrar(std::string scheme, FileSystemFactory factory, + std::function finalizer = {}); +}; + +/// @} + +namespace internal { +ARROW_EXPORT void* GetFileSystemRegistry(); +} // namespace internal + /// \brief Copy files, including from one FileSystem to another /// /// If a source and destination are resident in the same FileSystem FileSystem::CopyFile diff --git a/cpp/src/arrow/filesystem/filesystem_library.h b/cpp/src/arrow/filesystem/filesystem_library.h new file mode 100644 index 00000000000..d610c72237a --- /dev/null +++ b/cpp/src/arrow/filesystem/filesystem_library.h @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/filesystem/filesystem.h" + +namespace arrow::fs { +extern "C" { + +// ARROW_FORCE_EXPORT ensures this function's visibility is +// _declspec(dllexport)/[[gnu::visibility("default")]] even when +// this header is #included by a non-arrow source, as in a third +// party filesystem implementation. +ARROW_FORCE_EXPORT void* arrow_filesystem_get_registry() { + // In the case where libarrow is linked statically both to the executable and to a + // dynamically loaded filesystem implementation library, the library contains a + // duplicate definition of the registry into which the library's instances of + // FileSystemRegistrar insert their factories. This function is made accessible to + // dlsym/GetProcAddress to enable detection of such duplicate registries and merging + // into the registry accessible to the executable. + return internal::GetFileSystemRegistry(); +} +} +} // namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/gcsfs.cc b/cpp/src/arrow/filesystem/gcsfs.cc index d41cb49022c..8eaa923fedd 100644 --- a/cpp/src/arrow/filesystem/gcsfs.cc +++ b/cpp/src/arrow/filesystem/gcsfs.cc @@ -751,7 +751,7 @@ GcsOptions GcsOptions::FromServiceAccountCredentials(const std::string& json_obj return options; } -Result GcsOptions::FromUri(const arrow::internal::Uri& uri, +Result GcsOptions::FromUri(const arrow::util::Uri& uri, std::string* out_path) { const auto bucket = uri.host(); auto path = uri.path(); @@ -815,7 +815,7 @@ Result GcsOptions::FromUri(const arrow::internal::Uri& uri, Result GcsOptions::FromUri(const std::string& uri_string, std::string* out_path) { - arrow::internal::Uri uri; + arrow::util::Uri uri; RETURN_NOT_OK(uri.Parse(uri_string)); return FromUri(uri, out_path); } diff --git a/cpp/src/arrow/filesystem/gcsfs.h b/cpp/src/arrow/filesystem/gcsfs.h index e4a1edfd6bf..f1fbc95bf95 100644 --- a/cpp/src/arrow/filesystem/gcsfs.h +++ b/cpp/src/arrow/filesystem/gcsfs.h @@ -146,8 +146,7 @@ struct ARROW_EXPORT GcsOptions { static GcsOptions FromServiceAccountCredentials(const std::string& json_object); /// Initialize from URIs such as "gs://bucket/object". - static Result FromUri(const arrow::internal::Uri& uri, - std::string* out_path); + static Result FromUri(const arrow::util::Uri& uri, std::string* out_path); static Result FromUri(const std::string& uri, std::string* out_path); }; diff --git a/cpp/src/arrow/filesystem/hdfs.cc b/cpp/src/arrow/filesystem/hdfs.cc index b227aae65d9..d59b2a342d7 100644 --- a/cpp/src/arrow/filesystem/hdfs.cc +++ b/cpp/src/arrow/filesystem/hdfs.cc @@ -35,7 +35,7 @@ namespace arrow { using internal::ErrnoFromStatus; using internal::ParseValue; -using internal::Uri; +using util::Uri; namespace fs { diff --git a/cpp/src/arrow/filesystem/hdfs.h b/cpp/src/arrow/filesystem/hdfs.h index 798aac0ea90..25604a39e3a 100644 --- a/cpp/src/arrow/filesystem/hdfs.h +++ b/cpp/src/arrow/filesystem/hdfs.h @@ -25,8 +25,7 @@ #include "arrow/io/hdfs.h" #include "arrow/util/uri.h" -namespace arrow { -namespace fs { +namespace arrow::fs { /// Options for the HDFS implementation. struct ARROW_EXPORT HdfsOptions { @@ -51,7 +50,7 @@ struct ARROW_EXPORT HdfsOptions { bool Equals(const HdfsOptions& other) const; - static Result FromUri(const ::arrow::internal::Uri& uri); + static Result FromUri(const ::arrow::util::Uri& uri); static Result FromUri(const std::string& uri); }; @@ -69,16 +68,21 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem { Result PathFromUri(const std::string& uri_string) const override; /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; /// \endcond + Result GetFileInfo(const std::string& path) override; Result> GetFileInfo(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; @@ -94,10 +98,10 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem { const std::string& path) override; Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; /// Create a HdfsFileSystem instance from the given options. static Result> Make( @@ -110,5 +114,4 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem { std::unique_ptr impl_; }; -} // namespace fs -} // namespace arrow +} // namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/hdfs_test.cc b/cpp/src/arrow/filesystem/hdfs_test.cc index 7ad9e6cd40e..db5cefef374 100644 --- a/cpp/src/arrow/filesystem/hdfs_test.cc +++ b/cpp/src/arrow/filesystem/hdfs_test.cc @@ -34,7 +34,7 @@ namespace arrow { using internal::ErrnoFromStatus; -using internal::Uri; +using util::Uri; namespace fs { diff --git a/cpp/src/arrow/filesystem/localfs.cc b/cpp/src/arrow/filesystem/localfs.cc index 01ac9463791..fbb33fd0086 100644 --- a/cpp/src/arrow/filesystem/localfs.cc +++ b/cpp/src/arrow/filesystem/localfs.cc @@ -24,10 +24,10 @@ #ifdef _WIN32 #include "arrow/util/windows_compatibility.h" #else -#include #include -#include #include +#include +#include #endif #include "arrow/filesystem/filesystem.h" @@ -42,8 +42,7 @@ #include "arrow/util/uri.h" #include "arrow/util/windows_fixup.h" -namespace arrow { -namespace fs { +namespace arrow::fs { using ::arrow::internal::IOErrorFromErrno; #ifdef _WIN32 @@ -217,9 +216,7 @@ Status StatSelector(const PlatformFilename& dir_fn, const FileSelector& select, } // namespace -LocalFileSystemOptions LocalFileSystemOptions::Defaults() { - return LocalFileSystemOptions(); -} +LocalFileSystemOptions LocalFileSystemOptions::Defaults() { return {}; } bool LocalFileSystemOptions::Equals(const LocalFileSystemOptions& other) const { return use_mmap == other.use_mmap && directory_readahead == other.directory_readahead && @@ -227,7 +224,7 @@ bool LocalFileSystemOptions::Equals(const LocalFileSystemOptions& other) const { } Result LocalFileSystemOptions::FromUri( - const ::arrow::internal::Uri& uri, std::string* out_path) { + const ::arrow::util::Uri& uri, std::string* out_path) { if (!uri.username().empty() || !uri.password().empty()) { return Status::Invalid("Unsupported username or password in local URI: '", uri.ToString(), "'"); @@ -260,7 +257,7 @@ LocalFileSystem::LocalFileSystem(const LocalFileSystemOptions& options, const io::IOContext& io_context) : FileSystem(io_context), options_(options) {} -LocalFileSystem::~LocalFileSystem() {} +LocalFileSystem::~LocalFileSystem() = default; Result LocalFileSystem::NormalizePath(std::string path) { return DoNormalizePath(std::move(path)); @@ -689,5 +686,4 @@ Result> LocalFileSystem::OpenAppendStream( return OpenOutputStreamGeneric(path, truncate, append); } -} // namespace fs -} // namespace arrow +} // namespace arrow::fs diff --git a/cpp/src/arrow/filesystem/localfs.h b/cpp/src/arrow/filesystem/localfs.h index 108530c2b27..45a3da317f6 100644 --- a/cpp/src/arrow/filesystem/localfs.h +++ b/cpp/src/arrow/filesystem/localfs.h @@ -62,7 +62,7 @@ struct ARROW_EXPORT LocalFileSystemOptions { bool Equals(const LocalFileSystemOptions& other) const; - static Result FromUri(const ::arrow::internal::Uri& uri, + static Result FromUri(const ::arrow::util::Uri& uri, std::string* out_path); }; @@ -89,16 +89,21 @@ class ARROW_EXPORT LocalFileSystem : public FileSystem { LocalFileSystemOptions options() const { return options_; } /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; /// \endcond + Result GetFileInfo(const std::string& path) override; Result> GetFileInfo(const FileSelector& select) override; FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -113,10 +118,10 @@ class ARROW_EXPORT LocalFileSystem : public FileSystem { const std::string& path) override; Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; protected: LocalFileSystemOptions options_; diff --git a/cpp/src/arrow/filesystem/localfs_test.cc b/cpp/src/arrow/filesystem/localfs_test.cc index 7ce2a569686..f90833a88d1 100644 --- a/cpp/src/arrow/filesystem/localfs_test.cc +++ b/cpp/src/arrow/filesystem/localfs_test.cc @@ -35,14 +35,12 @@ #include "arrow/util/io_util.h" #include "arrow/util/uri.h" -namespace arrow { -namespace fs { -namespace internal { +namespace arrow::fs::internal { using ::arrow::internal::FileDescriptor; using ::arrow::internal::PlatformFilename; using ::arrow::internal::TemporaryDir; -using ::arrow::internal::UriFromAbsolutePath; +using ::arrow::util::UriFromAbsolutePath; class LocalFSTestMixin : public ::testing::Test { public: @@ -83,6 +81,108 @@ Result> FSFromUriOrPath(const std::string& uri, return FileSystemFromUriOrPath(uri, out_path); } +//////////////////////////////////////////////////////////////////////////// +// Registered FileSystemFactory tests + +class SlowFileSystemPublicProps : public SlowFileSystem { + public: + SlowFileSystemPublicProps(std::shared_ptr base_fs, double average_latency, + int32_t seed) + : SlowFileSystem(base_fs, average_latency, seed), + average_latency{average_latency}, + seed{seed} {} + double average_latency; + int32_t seed; +}; + +Result> SlowFileSystemFactory(const Uri& uri, + const io::IOContext& io_context, + std::string* out_path) { + auto local_uri = "file" + uri.ToString().substr(uri.scheme().size()); + ARROW_ASSIGN_OR_RAISE(auto base_fs, FileSystemFromUri(local_uri, io_context, out_path)); + double average_latency = 1; + int32_t seed = 0xDEADBEEF; + ARROW_ASSIGN_OR_RAISE(auto params, uri.query_items()); + for (const auto& [key, value] : params) { + if (key == "average_latency") { + average_latency = std::stod(value); + } + if (key == "seed") { + seed = std::stoi(value, nullptr, /*base=*/16); + } + } + return std::make_shared(base_fs, average_latency, seed); +} +FileSystemRegistrar kSlowFileSystemModule{ + "slowfile", + SlowFileSystemFactory, +}; + +TEST(FileSystemFromUri, LinkedRegisteredFactory) { + // Since the registrar's definition is in this translation unit (which is linked to the + // unit test executable), its factory will be registered be loaded automatically before + // main() is entered. + std::string path; + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("slowfile:///hey/yo", &path)); + EXPECT_EQ(path, "/hey/yo"); + EXPECT_EQ(fs->type_name(), "slow"); + + ASSERT_OK_AND_ASSIGN( + fs, FileSystemFromUri("slowfile:///hey/yo?seed=42&average_latency=0.5", &path)); + EXPECT_EQ(path, "/hey/yo"); + EXPECT_EQ(fs->type_name(), "slow"); + const auto& slow_fs = + ::arrow::internal::checked_cast(*fs); + EXPECT_EQ(slow_fs.seed, 0x42); + EXPECT_EQ(slow_fs.average_latency, 0.5); +} + +TEST(FileSystemFromUri, LoadedRegisteredFactory) { + // Since the registrar's definition is in libarrow_filesystem_example.so, + // its factory will be registered only after the library is dynamically loaded. + std::string path; + EXPECT_THAT(FileSystemFromUri("example:///hey/yo", &path), Raises(StatusCode::Invalid)); + + EXPECT_THAT(LoadFileSystemFactories(ARROW_FILESYSTEM_EXAMPLE_LIBPATH), Ok()); + + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("example:///hey/yo", &path)); + EXPECT_EQ(path, "/hey/yo"); + EXPECT_EQ(fs->type_name(), "local"); +} + +TEST(FileSystemFromUri, RuntimeRegisteredFactory) { + std::string path; + EXPECT_THAT(FileSystemFromUri("slowfile2:///hey/yo", &path), + Raises(StatusCode::Invalid)); + + EXPECT_THAT(RegisterFileSystemFactory("slowfile2", SlowFileSystemFactory), Ok()); + + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("slowfile2:///hey/yo", &path)); + EXPECT_EQ(path, "/hey/yo"); + EXPECT_EQ(fs->type_name(), "slow"); + + EXPECT_THAT( + RegisterFileSystemFactory("slowfile2", SlowFileSystemFactory), + Raises(StatusCode::KeyError, + testing::HasSubstr("Attempted to register factory for scheme 'slowfile2' " + "but that scheme is already registered"))); +} + +FileSystemRegistrar kSegfaultFileSystemModule[]{ + {"segfault", nullptr}, + {"segfault", nullptr}, + {"segfault", nullptr}, +}; +TEST(FileSystemFromUri, LinkedRegisteredFactoryNameCollision) { + // Since multiple registrars are defined in this translation unit which all + // register factories for the 'segfault' scheme, using that scheme in FileSystemFromUri + // is invalidated and raises KeyError. + std::string path; + EXPECT_THAT(FileSystemFromUri("segfault:///hey/yo", &path), + Raises(StatusCode::KeyError)); + // other schemes are not affected by the collision + EXPECT_THAT(FileSystemFromUri("slowfile:///hey/yo", &path), Ok()); +} //////////////////////////////////////////////////////////////////////////// // Misc tests @@ -164,7 +264,7 @@ GENERIC_FS_TEST_FUNCTIONS(TestLocalFSGenericMMap); template class TestLocalFS : public LocalFSTestMixin { public: - void SetUp() { + void SetUp() override { LocalFSTestMixin::SetUp(); path_formatter_ = PathFormatter(); local_path_ = EnsureTrailingSlash(path_formatter_(temp_dir_->path().ToString())); @@ -494,9 +594,4 @@ TYPED_TEST(TestLocalFS, StressGetFileInfoGenerator) { } } -// TODO Should we test backslash paths on Windows? -// SubTreeFileSystem isn't compatible with them. - -} // namespace internal -} // namespace fs -} // namespace arrow +} // namespace arrow::fs::internal diff --git a/cpp/src/arrow/filesystem/mockfs.h b/cpp/src/arrow/filesystem/mockfs.h index 32d06e5910d..5626560e083 100644 --- a/cpp/src/arrow/filesystem/mockfs.h +++ b/cpp/src/arrow/filesystem/mockfs.h @@ -26,9 +26,7 @@ #include "arrow/filesystem/filesystem.h" #include "arrow/util/windows_fixup.h" -namespace arrow { -namespace fs { -namespace internal { +namespace arrow::fs::internal { struct MockDirInfo { std::string full_path; @@ -68,16 +66,21 @@ class ARROW_EXPORT MockFileSystem : public FileSystem { bool Equals(const FileSystem& other) const override; Result PathFromUri(const std::string& uri_string) const override; - // XXX It's not very practical to have to explicitly declare inheritance - // of default overrides. + /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; + /// \endcond + Result GetFileInfo(const std::string& path) override; Result> GetFileInfo(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -92,10 +95,10 @@ class ARROW_EXPORT MockFileSystem : public FileSystem { const std::string& path) override; Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; // Contents-dumping helpers to ease testing. // Output is lexicographically-ordered by full path. @@ -128,6 +131,4 @@ class ARROW_EXPORT MockAsyncFileSystem : public MockFileSystem { FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override; }; -} // namespace internal -} // namespace fs -} // namespace arrow +} // namespace arrow::fs::internal diff --git a/cpp/src/arrow/filesystem/path_util.cc b/cpp/src/arrow/filesystem/path_util.cc index 9c895ae76c7..4e9628d3dc2 100644 --- a/cpp/src/arrow/filesystem/path_util.cc +++ b/cpp/src/arrow/filesystem/path_util.cc @@ -344,7 +344,7 @@ bool IsLikelyUri(std::string_view v) { // with 36 characters. return false; } - return ::arrow::internal::IsValidUriScheme(v.substr(0, pos)); + return ::arrow::util::IsValidUriScheme(v.substr(0, pos)); } struct Globber::Impl { diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 5fefe6b7cb0..f1ffc5ec816 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -132,8 +132,8 @@ namespace arrow { using internal::TaskGroup; using internal::ToChars; -using internal::Uri; using io::internal::SubmitIO; +using util::Uri; namespace fs { diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h index 13a0abde323..82d08bc5ea8 100644 --- a/cpp/src/arrow/filesystem/s3fs.h +++ b/cpp/src/arrow/filesystem/s3fs.h @@ -51,7 +51,7 @@ struct ARROW_EXPORT S3ProxyOptions { /// Initialize from URI such as http://username:password@host:port /// or http://host:port static Result FromUri(const std::string& uri); - static Result FromUri(const ::arrow::internal::Uri& uri); + static Result FromUri(const ::arrow::util::Uri& uri); bool Equals(const S3ProxyOptions& other) const; }; @@ -232,7 +232,7 @@ struct ARROW_EXPORT S3Options { /// generate temporary credentials. static S3Options FromAssumeRoleWithWebIdentity(); - static Result FromUri(const ::arrow::internal::Uri& uri, + static Result FromUri(const ::arrow::util::Uri& uri, std::string* out_path = NULLPTR); static Result FromUri(const std::string& uri, std::string* out_path = NULLPTR); @@ -258,19 +258,24 @@ class ARROW_EXPORT S3FileSystem : public FileSystem { Result PathFromUri(const std::string& uri_string) const override; /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; + using FileSystem::DeleteDirContentsAsync; using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; /// \endcond + Result GetFileInfo(const std::string& path) override; Result> GetFileInfo(const FileSelector& select) override; FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; - Future<> DeleteDirContentsAsync(const std::string& path, - bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; + Future<> DeleteDirContentsAsync(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -312,11 +317,11 @@ class ARROW_EXPORT S3FileSystem : public FileSystem { /// implementing your own background execution strategy. Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; /// Create a S3FileSystem instance from the given options. static Result> Make( diff --git a/cpp/src/arrow/filesystem/s3fs_narrative_test.cc b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc index bbb3c32ee6b..9322beb45ea 100644 --- a/cpp/src/arrow/filesystem/s3fs_narrative_test.cc +++ b/cpp/src/arrow/filesystem/s3fs_narrative_test.cc @@ -48,8 +48,7 @@ DEFINE_string(region, "", "AWS region"); DEFINE_string(endpoint, "", "Endpoint override (e.g. '127.0.0.1:9000')"); DEFINE_string(scheme, "https", "Connection scheme"); -namespace arrow { -namespace fs { +namespace arrow::fs { #define ASSERT_RAISES_PRINT(context_msg, error_type, expr) \ do { \ @@ -247,8 +246,7 @@ void TestMain(int argc, char** argv) { ASSERT_OK(FinalizeS3()); } -} // namespace fs -} // namespace arrow +} // namespace arrow::fs int main(int argc, char** argv) { std::stringstream ss; diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc index ad7aaa1bd43..b581be73502 100644 --- a/cpp/src/arrow/filesystem/s3fs_test.cc +++ b/cpp/src/arrow/filesystem/s3fs_test.cc @@ -75,8 +75,8 @@ namespace fs { using ::arrow::internal::checked_pointer_cast; using ::arrow::internal::PlatformFilename; using ::arrow::internal::ToChars; -using ::arrow::internal::UriEscape; using ::arrow::internal::Zip; +using ::arrow::util::UriEscape; using ::arrow::fs::internal::ConnectRetryStrategy; using ::arrow::fs::internal::ErrorToStatus; diff --git a/cpp/src/arrow/filesystem/util_internal.cc b/cpp/src/arrow/filesystem/util_internal.cc index 8747f9683b9..d69f6c896d0 100644 --- a/cpp/src/arrow/filesystem/util_internal.cc +++ b/cpp/src/arrow/filesystem/util_internal.cc @@ -30,7 +30,7 @@ namespace arrow { using internal::StatusDetailFromErrno; -using internal::Uri; +using util::Uri; namespace fs { namespace internal { diff --git a/cpp/src/arrow/filesystem/util_internal.h b/cpp/src/arrow/filesystem/util_internal.h index 96cc5178a9f..74ddf015432 100644 --- a/cpp/src/arrow/filesystem/util_internal.h +++ b/cpp/src/arrow/filesystem/util_internal.h @@ -28,7 +28,7 @@ #include "arrow/util/visibility.h" namespace arrow { -using internal::Uri; +using util::Uri; namespace fs { namespace internal { diff --git a/cpp/src/arrow/flight/cookie_internal.cc b/cpp/src/arrow/flight/cookie_internal.cc index 921f0173136..8f41106ebce 100644 --- a/cpp/src/arrow/flight/cookie_internal.cc +++ b/cpp/src/arrow/flight/cookie_internal.cc @@ -159,8 +159,8 @@ CookiePair Cookie::ParseCookieAttribute(const std::string& cookie_header_value, } // Key/Value may be URI-encoded. - out_key = arrow::internal::UriUnescape(out_key); - out_value = arrow::internal::UriUnescape(out_value); + out_key = arrow::util::UriUnescape(out_key); + out_value = arrow::util::UriUnescape(out_value); // Strip outer quotes on the value. if (out_value.size() >= 2 && out_value[0] == '"' && diff --git a/cpp/src/arrow/flight/transport.h b/cpp/src/arrow/flight/transport.h index ee7bd017207..4029aa5223d 100644 --- a/cpp/src/arrow/flight/transport.h +++ b/cpp/src/arrow/flight/transport.h @@ -168,7 +168,7 @@ class ARROW_FLIGHT_EXPORT ClientTransport { /// Initialize the client. virtual Status Init(const FlightClientOptions& options, const Location& location, - const arrow::internal::Uri& uri) = 0; + const arrow::util::Uri& uri) = 0; /// Close the client. Once this returns, the client is no longer usable. virtual Status Close() = 0; diff --git a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc index f7612759e8d..f799ba761c4 100644 --- a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc +++ b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc @@ -684,13 +684,13 @@ class GrpcClientImpl : public internal::ClientTransport { } Status Init(const FlightClientOptions& options, const Location& location, - const arrow::internal::Uri& uri) override { + const arrow::util::Uri& uri) override { const std::string& scheme = location.scheme(); std::stringstream grpc_uri; std::shared_ptr<::grpc::ChannelCredentials> creds; if (scheme == kSchemeGrpc || scheme == kSchemeGrpcTcp || scheme == kSchemeGrpcTls) { - grpc_uri << arrow::internal::UriEncodeHost(uri.host()) << ':' << uri.port_text(); + grpc_uri << arrow::util::UriEncodeHost(uri.host()) << ':' << uri.port_text(); if (scheme == kSchemeGrpcTls) { if (options.disable_server_verification) { diff --git a/cpp/src/arrow/flight/transport/grpc/grpc_server.cc b/cpp/src/arrow/flight/transport/grpc/grpc_server.cc index a9780b5eeb7..28fc736aa00 100644 --- a/cpp/src/arrow/flight/transport/grpc/grpc_server.cc +++ b/cpp/src/arrow/flight/transport/grpc/grpc_server.cc @@ -575,8 +575,7 @@ class GrpcServerTransport : public internal::ServerTransport { new GrpcServerTransport(base, std::move(memory_manager))); } - Status Init(const FlightServerOptions& options, - const arrow::internal::Uri& uri) override { + Status Init(const FlightServerOptions& options, const arrow::util::Uri& uri) override { grpc_service_.reset( new GrpcServiceHandler(options.auth_handler, options.middleware, this)); @@ -588,7 +587,7 @@ class GrpcServerTransport : public internal::ServerTransport { int port = 0; if (scheme == kSchemeGrpc || scheme == kSchemeGrpcTcp || scheme == kSchemeGrpcTls) { std::stringstream address; - address << arrow::internal::UriEncodeHost(uri.host()) << ':' << uri.port_text(); + address << arrow::util::UriEncodeHost(uri.host()) << ':' << uri.port_text(); std::shared_ptr<::grpc::ServerCredentials> creds; if (scheme == kSchemeGrpcTls) { @@ -635,12 +634,10 @@ class GrpcServerTransport : public internal::ServerTransport { if (scheme == kSchemeGrpcTls) { ARROW_ASSIGN_OR_RAISE( - location_, - Location::ForGrpcTls(arrow::internal::UriEncodeHost(uri.host()), port)); + location_, Location::ForGrpcTls(arrow::util::UriEncodeHost(uri.host()), port)); } else if (scheme == kSchemeGrpc || scheme == kSchemeGrpcTcp) { ARROW_ASSIGN_OR_RAISE( - location_, - Location::ForGrpcTcp(arrow::internal::UriEncodeHost(uri.host()), port)); + location_, Location::ForGrpcTcp(arrow::util::UriEncodeHost(uri.host()), port)); } return Status::OK(); } diff --git a/cpp/src/arrow/flight/transport/ucx/ucx_client.cc b/cpp/src/arrow/flight/transport/ucx/ucx_client.cc index cd9ddaa85a6..32c2fd776f3 100644 --- a/cpp/src/arrow/flight/transport/ucx/ucx_client.cc +++ b/cpp/src/arrow/flight/transport/ucx/ucx_client.cc @@ -77,7 +77,7 @@ class ClientConnection { ARROW_DEFAULT_MOVE_AND_ASSIGN(ClientConnection); ~ClientConnection() { DCHECK(!driver_) << "Connection was not closed!"; } - Status Init(std::shared_ptr ucp_context, const arrow::internal::Uri& uri) { + Status Init(std::shared_ptr ucp_context, const arrow::util::Uri& uri) { auto status = InitImpl(std::move(ucp_context), uri); // Clean up after-the-fact if we fail to initialize if (!status.ok()) { @@ -91,8 +91,7 @@ class ClientConnection { return status; } - Status InitImpl(std::shared_ptr ucp_context, - const arrow::internal::Uri& uri) { + Status InitImpl(std::shared_ptr ucp_context, const arrow::util::Uri& uri) { { ucs_status_t status; ucp_worker_params_t worker_params; @@ -521,7 +520,7 @@ class UcxClientImpl : public arrow::flight::internal::ClientTransport { } Status Init(const FlightClientOptions& options, const Location& location, - const arrow::internal::Uri& uri) override { + const arrow::util::Uri& uri) override { RETURN_NOT_OK(uri_.Parse(uri.ToString())); { ucp_config_t* ucp_config; @@ -721,7 +720,7 @@ class UcxClientImpl : public arrow::flight::internal::ClientTransport { private: static constexpr size_t kMaxOpenConnections = 3; - arrow::internal::Uri uri_; + arrow::util::Uri uri_; std::shared_ptr ucp_context_; std::mutex connections_mutex_; std::deque connections_; diff --git a/cpp/src/arrow/flight/transport/ucx/ucx_server.cc b/cpp/src/arrow/flight/transport/ucx/ucx_server.cc index b20f8f286e3..cb9c8948ccf 100644 --- a/cpp/src/arrow/flight/transport/ucx/ucx_server.cc +++ b/cpp/src/arrow/flight/transport/ucx/ucx_server.cc @@ -198,8 +198,7 @@ class UcxServerImpl : public arrow::flight::internal::ServerTransport { } } - Status Init(const FlightServerOptions& options, - const arrow::internal::Uri& uri) override { + Status Init(const FlightServerOptions& options, const arrow::util::Uri& uri) override { const auto max_threads = std::max(8, std::thread::hardware_concurrency()); ARROW_ASSIGN_OR_RAISE(rpc_pool_, arrow::internal::ThreadPool::Make(max_threads)); diff --git a/cpp/src/arrow/flight/transport/ucx/util_internal.cc b/cpp/src/arrow/flight/transport/ucx/util_internal.cc index acaa4f58723..2db7d4e2630 100644 --- a/cpp/src/arrow/flight/transport/ucx/util_internal.cc +++ b/cpp/src/arrow/flight/transport/ucx/util_internal.cc @@ -50,7 +50,7 @@ ucs_status_t FlightUcxStatusDetail::Unwrap(const Status& status) { return dynamic_cast(status.detail().get())->status_; } -arrow::Result UriToSockaddr(const arrow::internal::Uri& uri, +arrow::Result UriToSockaddr(const arrow::util::Uri& uri, struct sockaddr_storage* addr) { std::string host = uri.host(); if (host.empty()) { diff --git a/cpp/src/arrow/flight/transport/ucx/util_internal.h b/cpp/src/arrow/flight/transport/ucx/util_internal.h index 84e84ba0711..958868d59d4 100644 --- a/cpp/src/arrow/flight/transport/ucx/util_internal.h +++ b/cpp/src/arrow/flight/transport/ucx/util_internal.h @@ -71,7 +71,7 @@ static inline bool IsIgnorableDisconnectError(ucs_status_t ucs_status) { /// /// \return The length of the sockaddr ARROW_FLIGHT_EXPORT -arrow::Result UriToSockaddr(const arrow::internal::Uri& uri, +arrow::Result UriToSockaddr(const arrow::util::Uri& uri, struct sockaddr_storage* addr); ARROW_FLIGHT_EXPORT diff --git a/cpp/src/arrow/flight/transport_server.h b/cpp/src/arrow/flight/transport_server.h index 51105a89304..8e5fe3e710c 100644 --- a/cpp/src/arrow/flight/transport_server.h +++ b/cpp/src/arrow/flight/transport_server.h @@ -69,7 +69,7 @@ class ARROW_FLIGHT_EXPORT ServerTransport { /// This method should launch the server in a background thread, i.e. it /// should not block. Once this returns, the server should be active. virtual Status Init(const FlightServerOptions& options, - const arrow::internal::Uri& uri) = 0; + const arrow::util::Uri& uri) = 0; /// \brief Shutdown the server. /// /// This should wait for active RPCs to finish. Once this returns, the diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc index a1b799a3a06..39b59f65d9c 100644 --- a/cpp/src/arrow/flight/types.cc +++ b/cpp/src/arrow/flight/types.cc @@ -821,7 +821,7 @@ arrow::Result CloseSessionResult::Deserialize( return out; } -Location::Location() { uri_ = std::make_shared(); } +Location::Location() { uri_ = std::make_shared(); } arrow::Result Location::Parse(const std::string& uri_string) { Location location; diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h index 7b3259c3c3a..b3df8377b8f 100644 --- a/cpp/src/arrow/flight/types.h +++ b/cpp/src/arrow/flight/types.h @@ -52,11 +52,11 @@ class DictionaryMemo; } // namespace ipc -namespace internal { +namespace util { class Uri; -} // namespace internal +} // namespace util namespace flight { @@ -474,7 +474,7 @@ struct ARROW_FLIGHT_EXPORT Location { private: friend class FlightClient; friend class FlightServerBase; - std::shared_ptr uri_; + std::shared_ptr uri_; }; /// \brief A flight ticket and list of locations where the ticket can be diff --git a/cpp/src/arrow/io/hdfs_internal.cc b/cpp/src/arrow/io/hdfs_internal.cc index 4592392b806..5619dd2435a 100644 --- a/cpp/src/arrow/io/hdfs_internal.cc +++ b/cpp/src/arrow/io/hdfs_internal.cc @@ -37,6 +37,7 @@ #include #include #include +#include "arrow/util/basic_decimal.h" #ifndef _WIN32 #include @@ -51,53 +52,27 @@ namespace arrow { using internal::GetEnvVarNative; using internal::PlatformFilename; -#ifdef _WIN32 -using internal::WinErrorMessage; -#endif -namespace io { -namespace internal { +namespace io::internal { namespace { -void* GetLibrarySymbol(LibraryHandle handle, const char* symbol) { - if (handle == NULL) return NULL; -#ifndef _WIN32 - return dlsym(handle, symbol); -#else +template +Status SetSymbol(void* handle, char const* name, T** symbol) { + if (*symbol != nullptr) return Status::OK(); - void* ret = reinterpret_cast(GetProcAddress(handle, symbol)); - if (ret == NULL) { - // logstream(LOG_INFO) << "GetProcAddress error: " - // << get_last_err_str(GetLastError()) << std::endl; + auto maybe_symbol = ::arrow::internal::GetSymbolAs(handle, name); + if (Required || maybe_symbol.ok()) { + ARROW_ASSIGN_OR_RAISE(*symbol, maybe_symbol); } - return ret; -#endif + return Status::OK(); } -#define GET_SYMBOL_REQUIRED(SHIM, SYMBOL_NAME) \ - do { \ - if (!SHIM->SYMBOL_NAME) { \ - *reinterpret_cast(&SHIM->SYMBOL_NAME) = \ - GetLibrarySymbol(SHIM->handle, "" #SYMBOL_NAME); \ - } \ - if (!SHIM->SYMBOL_NAME) \ - return Status::IOError("Getting symbol " #SYMBOL_NAME "failed"); \ - } while (0) - -#define GET_SYMBOL(SHIM, SYMBOL_NAME) \ - if (!SHIM->SYMBOL_NAME) { \ - *reinterpret_cast(&SHIM->SYMBOL_NAME) = \ - GetLibrarySymbol(SHIM->handle, "" #SYMBOL_NAME); \ - } +#define GET_SYMBOL_REQUIRED(SHIM, SYMBOL_NAME) \ + RETURN_NOT_OK(SetSymbol(SHIM->handle, #SYMBOL_NAME, &SHIM->SYMBOL_NAME)); -LibraryHandle libjvm_handle = nullptr; - -// Helper functions for dlopens -Result> get_potential_libjvm_paths(); -Result> get_potential_libhdfs_paths(); -Result try_dlopen(const std::vector& potential_paths, - const char* name); +#define GET_SYMBOL(SHIM, SYMBOL_NAME) \ + DCHECK_OK(SetSymbol(SHIM->handle, #SYMBOL_NAME, &SHIM->SYMBOL_NAME)); Result> MakeFilenameVector( const std::vector& names) { @@ -244,46 +219,18 @@ Result> get_potential_libjvm_paths() { return potential_paths; } -#ifndef _WIN32 -Result try_dlopen(const std::vector& potential_paths, - const char* name) { - std::string error_message = "unknown error"; - LibraryHandle handle; - - for (const auto& p : potential_paths) { - handle = dlopen(p.ToNative().c_str(), RTLD_NOW | RTLD_LOCAL); - - if (handle != NULL) { - return handle; - } else { - const char* err_msg = dlerror(); - if (err_msg != NULL) { - error_message = err_msg; - } - } - } - - return Status::IOError("Unable to load ", name, ": ", error_message); -} - -#else -Result try_dlopen(const std::vector& potential_paths, - const char* name) { - std::string error_message; - LibraryHandle handle; +Result try_dlopen(const std::vector& potential_paths, + std::string name) { + std::string error_message = "Unable to load " + name; for (const auto& p : potential_paths) { - handle = LoadLibraryW(p.ToNative().c_str()); - if (handle != NULL) { - return handle; - } else { - error_message = WinErrorMessage(GetLastError()); - } + auto maybe_handle = arrow::internal::LoadDynamicLibrary(p); + if (maybe_handle.ok()) return *maybe_handle; + error_message += "\n" + maybe_handle.status().message(); } - return Status::IOError("Unable to load ", name, ": ", error_message); + return Status(StatusCode::IOError, std::move(error_message)); } -#endif // _WIN32 LibHdfsShim libhdfs_shim; @@ -335,7 +282,7 @@ Status ConnectLibHdfs(LibHdfsShim** driver) { shim->Initialize(); ARROW_ASSIGN_OR_RAISE(auto libjvm_potential_paths, get_potential_libjvm_paths()); - ARROW_ASSIGN_OR_RAISE(libjvm_handle, try_dlopen(libjvm_potential_paths, "libjvm")); + RETURN_NOT_OK(try_dlopen(libjvm_potential_paths, "libjvm")); ARROW_ASSIGN_OR_RAISE(auto libhdfs_potential_paths, get_potential_libhdfs_paths()); ARROW_ASSIGN_OR_RAISE(shim->handle, try_dlopen(libhdfs_potential_paths, "libhdfs")); @@ -350,7 +297,7 @@ Status ConnectLibHdfs(LibHdfsShim** driver) { /////////////////////////////////////////////////////////////////////////// // HDFS thin wrapper methods -hdfsBuilder* LibHdfsShim::NewBuilder(void) { return this->hdfsNewBuilder(); } +hdfsBuilder* LibHdfsShim::NewBuilder() { return this->hdfsNewBuilder(); } void LibHdfsShim::BuilderSetNameNode(hdfsBuilder* bld, const char* nn) { this->hdfsBuilderSetNameNode(bld, nn); @@ -426,26 +373,29 @@ int LibHdfsShim::Flush(hdfsFS fs, hdfsFile file) { return this->hdfsFlush(fs, fi int LibHdfsShim::Available(hdfsFS fs, hdfsFile file) { GET_SYMBOL(this, hdfsAvailable); - if (this->hdfsAvailable) + if (this->hdfsAvailable) { return this->hdfsAvailable(fs, file); - else + } else { return 0; + } } int LibHdfsShim::Copy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) { GET_SYMBOL(this, hdfsCopy); - if (this->hdfsCopy) + if (this->hdfsCopy) { return this->hdfsCopy(srcFS, src, dstFS, dst); - else + } else { return 0; + } } int LibHdfsShim::Move(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) { GET_SYMBOL(this, hdfsMove); - if (this->hdfsMove) + if (this->hdfsMove) { return this->hdfsMove(srcFS, src, dstFS, dst); - else + } else { return 0; + } } int LibHdfsShim::Delete(hdfsFS fs, const char* path, int recursive) { @@ -454,10 +404,11 @@ int LibHdfsShim::Delete(hdfsFS fs, const char* path, int recursive) { int LibHdfsShim::Rename(hdfsFS fs, const char* oldPath, const char* newPath) { GET_SYMBOL(this, hdfsRename); - if (this->hdfsRename) + if (this->hdfsRename) { return this->hdfsRename(fs, oldPath, newPath); - else + } else { return 0; + } } char* LibHdfsShim::GetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize) { @@ -465,7 +416,7 @@ char* LibHdfsShim::GetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSiz if (this->hdfsGetWorkingDirectory) { return this->hdfsGetWorkingDirectory(fs, buffer, bufferSize); } else { - return NULL; + return nullptr; } } @@ -509,7 +460,7 @@ char*** LibHdfsShim::GetHosts(hdfsFS fs, const char* path, tOffset start, if (this->hdfsGetHosts) { return this->hdfsGetHosts(fs, path, start, length); } else { - return NULL; + return nullptr; } } @@ -551,6 +502,5 @@ int LibHdfsShim::Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime) { } } -} // namespace internal -} // namespace io +} // namespace io::internal } // namespace arrow diff --git a/cpp/src/arrow/io/hdfs_internal.h b/cpp/src/arrow/io/hdfs_internal.h index 590e3a48359..4b6b4884c00 100644 --- a/cpp/src/arrow/io/hdfs_internal.h +++ b/cpp/src/arrow/io/hdfs_internal.h @@ -33,18 +33,11 @@ namespace arrow { class Status; -namespace io { -namespace internal { - -#ifndef _WIN32 -typedef void* LibraryHandle; -#else -typedef HINSTANCE LibraryHandle; -#endif +namespace io::internal { // NOTE(wesm): cpplint does not like use of short and other imprecise C types struct LibHdfsShim { - LibraryHandle handle; + void* handle; hdfsBuilder* (*hdfsNewBuilder)(void); void (*hdfsBuilderSetNameNode)(hdfsBuilder* bld, const char* nn); @@ -217,6 +210,5 @@ struct LibHdfsShim { // TODO(wesm): Remove these exports when we are linking statically ARROW_EXPORT Status ConnectLibHdfs(LibHdfsShim** driver); -} // namespace internal -} // namespace io +} // namespace io::internal } // namespace arrow diff --git a/cpp/src/arrow/testing/CMakeLists.txt b/cpp/src/arrow/testing/CMakeLists.txt index 59825f0bf22..6cf4b2d2b12 100644 --- a/cpp/src/arrow/testing/CMakeLists.txt +++ b/cpp/src/arrow/testing/CMakeLists.txt @@ -20,4 +20,10 @@ arrow_install_all_headers("arrow/testing") if(ARROW_BUILD_TESTS) add_arrow_test(random_test) add_arrow_test(gtest_util_test) + + if(ARROW_FILESYSTEM) + add_library(arrow_filesystem_example MODULE examplefs.cc) + target_link_libraries(arrow_filesystem_example ${ARROW_TEST_LINK_LIBS} + ${ARROW_EXAMPLE_LINK_LIBS}) + endif() endif() diff --git a/cpp/src/arrow/testing/examplefs.cc b/cpp/src/arrow/testing/examplefs.cc new file mode 100644 index 00000000000..d3e7e3b03f6 --- /dev/null +++ b/cpp/src/arrow/testing/examplefs.cc @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/filesystem/filesystem.h" +#include "arrow/filesystem/filesystem_library.h" +#include "arrow/result.h" +#include "arrow/util/uri.h" + +#include + +namespace arrow::fs { + +FileSystemRegistrar kExampleFileSystemModule{ + "example", + [](const Uri& uri, const io::IOContext& io_context, + std::string* out_path) -> Result> { + constexpr std::string_view kScheme = "example"; + EXPECT_EQ(uri.scheme(), kScheme); + auto local_uri = "file" + uri.ToString().substr(kScheme.size()); + return FileSystemFromUri(local_uri, io_context, out_path); + }, +}; + +} // namespace arrow::fs diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index b693336e099..5928ebcb889 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -116,11 +116,13 @@ #include #endif -namespace arrow { - -using internal::checked_cast; +#ifdef _WIN32 +#include +#else +#include +#endif -namespace internal { +namespace arrow::internal { namespace { @@ -2215,5 +2217,58 @@ int64_t GetTotalMemoryBytes() { #endif } -} // namespace internal -} // namespace arrow +Result LoadDynamicLibrary(const char* path) { +#ifdef _WIN32 + ARROW_ASSIGN_OR_RAISE(auto platform_path, PlatformFilename::FromString(path)); + return LoadDynamicLibrary(platform_path); +#else + constexpr int kFlags = + // All undefined symbols in the shared object are resolved before dlopen() returns. + RTLD_NOW + // Symbols defined in this shared object are not made available to + // resolve references in subsequently loaded shared objects. + | RTLD_LOCAL; + if (void* handle = dlopen(path, kFlags)) return handle; + // dlopen(3) man page: "If dlopen() fails for any reason, it returns NULL." + // There is no null-returning non-error condition. + auto* error = dlerror(); + return Status::IOError("dlopen(", path, ") failed: ", error ? error : "unknown error"); +#endif +} + +Result LoadDynamicLibrary(const PlatformFilename& path) { +#ifdef _WIN32 + if (void* handle = LoadLibraryW(path.ToNative().c_str())) { + return handle; + } + // win32 api doc: "If the function fails, the return value is NULL." + // There is no null-returning non-error condition. + return IOErrorFromWinError(GetLastError(), "LoadLibrary(", path.ToString(), ") failed"); +#else + return LoadDynamicLibrary(path.ToNative().c_str()); +#endif +} + +Result GetSymbol(void* handle, const char* name) { + if (handle == nullptr) { + return Status::Invalid("Attempting to retrieve symbol '", name, + "' from null library handle"); + } +#ifdef _WIN32 + if (void* sym = reinterpret_cast( + GetProcAddress(reinterpret_cast(handle), name))) { + return sym; + } + // win32 api doc: "If the function fails, the return value is NULL." + // There is no null-returning non-error condition. + return IOErrorFromWinError(GetLastError(), "GetProcAddress(", name, ") failed."); +#else + if (void* sym = dlsym(handle, name)) return sym; + // dlsym(3) man page: "On failure, they return NULL" + // There is no null-returning non-error condition. + auto* error = dlerror(); + return Status::IOError("dlsym(", name, ") failed: ", error ? error : "unknown error"); +#endif +} + +} // namespace arrow::internal diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h index bba71c0d80a..5f5bbd169e2 100644 --- a/cpp/src/arrow/util/io_util.h +++ b/cpp/src/arrow/util/io_util.h @@ -29,16 +29,16 @@ #include #if ARROW_HAVE_SIGACTION -#include // Needed for struct sigaction +#include // Needed for struct sigaction #endif +#include "arrow/result.h" #include "arrow/status.h" #include "arrow/type_fwd.h" #include "arrow/util/macros.h" #include "arrow/util/windows_fixup.h" -namespace arrow { -namespace internal { +namespace arrow::internal { // NOTE: 8-bit path strings on Windows are encoded using UTF-8. // Using MBCS would fail encoding some paths. @@ -338,7 +338,7 @@ class ARROW_EXPORT TemporaryDir { class ARROW_EXPORT SignalHandler { public: - typedef void (*Callback)(int); + using Callback = void (*)(int); SignalHandler(); explicit SignalHandler(Callback cb); @@ -419,5 +419,34 @@ int64_t GetCurrentRSS(); ARROW_EXPORT int64_t GetTotalMemoryBytes(); -} // namespace internal -} // namespace arrow +/// \brief Load a dynamic library +/// +/// This wraps dlopen() except on Windows, where LoadLibrary() is called. +/// These two platforms handle absolute paths consistently; relative paths +/// or the library's bare name may be handled but inconsistently. +/// +/// \return An opaque handle for the dynamic library, which can be used for +/// subsequent symbol lookup. Nullptr will never be returned; instead +/// an error will be raised. +ARROW_EXPORT Result LoadDynamicLibrary(const PlatformFilename& path); + +/// \brief Load a dynamic library +/// +/// An overload taking null terminated string. +ARROW_EXPORT Result LoadDynamicLibrary(const char* path); + +/// \brief Retrieve a symbol by name from a library handle. +/// +/// This wraps dlsym() except on Windows, where GetProcAddress() is called. +/// +/// \return The address associated with the named symbol. Nullptr will never be +/// returned; instead an error will be raised. +ARROW_EXPORT Result GetSymbol(void* handle, const char* name); + +template +Result GetSymbolAs(void* handle, const char* name) { + ARROW_ASSIGN_OR_RAISE(void* sym, GetSymbol(handle, name)); + return reinterpret_cast(sym); +} + +} // namespace arrow::internal diff --git a/cpp/src/arrow/util/type_fwd.h b/cpp/src/arrow/util/type_fwd.h index 6d904f19b11..3174881f4d0 100644 --- a/cpp/src/arrow/util/type_fwd.h +++ b/cpp/src/arrow/util/type_fwd.h @@ -64,6 +64,7 @@ class AsyncTaskScheduler; class Compressor; class Decompressor; class Codec; +class Uri; } // namespace util } // namespace arrow diff --git a/cpp/src/arrow/util/uri.cc b/cpp/src/arrow/util/uri.cc index b291ee3d7f1..9c0f7f9a596 100644 --- a/cpp/src/arrow/util/uri.cc +++ b/cpp/src/arrow/util/uri.cc @@ -27,8 +27,7 @@ #include "arrow/util/value_parsing.h" #include "arrow/vendored/uriparser/Uri.h" -namespace arrow { -namespace internal { +namespace arrow::util { namespace { @@ -111,7 +110,7 @@ bool IsValidUriScheme(std::string_view s) { } struct Uri::Impl { - Impl() : string_rep_(""), port_(-1) { memset(&uri_, 0, sizeof(uri_)); } + Impl() { memset(&uri_, 0, sizeof(uri_)); } ~Impl() { uriFreeUriMembersA(&uri_); } @@ -133,7 +132,7 @@ struct Uri::Impl { // Keep alive strings that uriparser stores pointers to std::vector data_; std::string string_rep_; - int32_t port_; + int32_t port_ = -1; std::vector path_segments_; bool is_file_uri_; bool is_absolute_path_; @@ -141,7 +140,7 @@ struct Uri::Impl { Uri::Uri() : impl_(new Impl) {} -Uri::~Uri() {} +Uri::~Uri() = default; Uri::Uri(Uri&& u) : impl_(std::move(u.impl_)) {} @@ -169,21 +168,19 @@ int32_t Uri::port() const { return impl_->port_; } std::string Uri::username() const { auto userpass = TextRangeToView(impl_->uri_.userInfo); auto sep_pos = userpass.find_first_of(':'); - if (sep_pos == std::string_view::npos) { - return UriUnescape(userpass); - } else { - return UriUnescape(userpass.substr(0, sep_pos)); + if (sep_pos != std::string_view::npos) { + userpass = userpass.substr(0, sep_pos); } + return UriUnescape(userpass); } std::string Uri::password() const { auto userpass = TextRangeToView(impl_->uri_.userInfo); auto sep_pos = userpass.find_first_of(':'); if (sep_pos == std::string_view::npos) { - return std::string(); - } else { - return UriUnescape(userpass.substr(sep_pos + 1)); + return ""; } + return UriUnescape(userpass.substr(sep_pos + 1)); } std::string Uri::path() const { @@ -301,7 +298,8 @@ Status Uri::Parse(const std::string& uri_string) { auto port_text = TextRangeToView(impl_->uri_.portText); if (port_text.size()) { uint16_t port_num; - if (!ParseValue(port_text.data(), port_text.size(), &port_num)) { + if (!::arrow::internal::ParseValue(port_text.data(), port_text.size(), + &port_num)) { return Status::Invalid("Invalid port number '", port_text, "' in URI '", uri_string, "'"); } @@ -311,6 +309,12 @@ Status Uri::Parse(const std::string& uri_string) { return Status::OK(); } +Result Uri::FromString(const std::string& uri_string) { + Uri uri; + ARROW_RETURN_NOT_OK(uri.Parse(uri_string)); + return uri; +} + Result UriFromAbsolutePath(std::string_view path) { if (path.empty()) { return Status::Invalid( @@ -336,5 +340,4 @@ Result UriFromAbsolutePath(std::string_view path) { return out; } -} // namespace internal -} // namespace arrow +} // namespace arrow::util diff --git a/cpp/src/arrow/util/uri.h b/cpp/src/arrow/util/uri.h index 855a61408da..74dbe924ff2 100644 --- a/cpp/src/arrow/util/uri.h +++ b/cpp/src/arrow/util/uri.h @@ -27,8 +27,7 @@ #include "arrow/type_fwd.h" #include "arrow/util/visibility.h" -namespace arrow { -namespace internal { +namespace arrow::util { /// \brief A parsed URI class ARROW_EXPORT Uri { @@ -86,6 +85,9 @@ class ARROW_EXPORT Uri { /// Factory function to parse a URI from its string representation. Status Parse(const std::string& uri_string); + /// Factory function to parse a URI from its string representation. + static Result FromString(const std::string& uri_string); + private: struct Impl; std::unique_ptr impl_; @@ -114,5 +116,4 @@ bool IsValidUriScheme(std::string_view s); ARROW_EXPORT Result UriFromAbsolutePath(std::string_view path); -} // namespace internal -} // namespace arrow +} // namespace arrow::util diff --git a/cpp/src/arrow/util/uri_test.cc b/cpp/src/arrow/util/uri_test.cc index 4293dc73b01..36e09b1b2e8 100644 --- a/cpp/src/arrow/util/uri_test.cc +++ b/cpp/src/arrow/util/uri_test.cc @@ -26,8 +26,7 @@ #include "arrow/util/logging.h" #include "arrow/util/uri.h" -namespace arrow { -namespace internal { +namespace arrow::util { TEST(UriEscape, Basics) { ASSERT_EQ(UriEscape(""), ""); @@ -371,5 +370,4 @@ TEST(UriFromAbsolutePath, Basics) { #endif } -} // namespace internal -} // namespace arrow +} // namespace arrow::util diff --git a/cpp/src/arrow/util/visibility.h b/cpp/src/arrow/util/visibility.h index b0fd790295b..5e7e34603d9 100644 --- a/cpp/src/arrow/util/visibility.h +++ b/cpp/src/arrow/util/visibility.h @@ -36,6 +36,9 @@ #define ARROW_DLLIMPORT __declspec(dllimport) #endif +// _declspec(dllexport) even when the #included by a non-arrow source +#define ARROW_FORCE_EXPORT ARROW_DLLEXPORT + #ifdef ARROW_STATIC #define ARROW_EXPORT #define ARROW_FRIEND_EXPORT @@ -80,4 +83,7 @@ #define ARROW_FRIEND_EXPORT #define ARROW_TEMPLATE_EXPORT +// [[gnu::visibility("default")]] even when #included by a non-arrow source +#define ARROW_FORCE_EXPORT [[gnu::visibility("default")]] + #endif // Non-Windows diff --git a/docs/source/cpp/api/filesystem.rst b/docs/source/cpp/api/filesystem.rst index 8132af42e24..02b12668327 100644 --- a/docs/source/cpp/api/filesystem.rst +++ b/docs/source/cpp/api/filesystem.rst @@ -19,6 +19,8 @@ Filesystems =========== +.. _cpp-api-filesystems: + Interface ========= @@ -33,12 +35,22 @@ Interface .. doxygenclass:: arrow::fs::FileSystem :members: -High-level factory function -=========================== +.. doxygenfunction:: arrow::fs::EnsureFinalized() + +.. _filesystem-factory-functions: + +High-level factory functions +============================ .. doxygengroup:: filesystem-factories :content-only: +Factory registration functions +============================== + +.. doxygengroup:: filesystem-factory-registration + :content-only: + Concrete implementations ======================== diff --git a/docs/source/cpp/io.rst b/docs/source/cpp/io.rst index 28ab5d783a2..2a05473852c 100644 --- a/docs/source/cpp/io.rst +++ b/docs/source/cpp/io.rst @@ -73,6 +73,9 @@ The :class:`filesystem interface ` allows abstracted access over various data storage backends such as the local filesystem or a S3 bucket. It provides input and output streams as well as directory operations. +.. seealso:: + :ref:`Filesystems API reference `. + The filesystem interface exposes a simplified view of the underlying data storage. Data paths are represented as *abstract paths*, which are ``/``-separated, even on Windows, and shouldn't include special path @@ -81,7 +84,14 @@ underlying storage, are automatically dereferenced. Only basic :class:`metadata ` about file entries, such as the file size and modification time, is made available. -Concrete implementations are available for +Filesystem instances can be constructed from URI strings using one of the +:ref:`FromUri factories `, which dispatch to +implementation-specific factories based on the URI's ``scheme``. Other properties +for the new instance are extracted from the URI's other properties such as the +``hostname``, ``username``, etc. Arrow supports runtime registration of new +filesystems, and provides built-in support for several filesystems. + +Which built-in filesystems are supported is configured at build time and may include :class:`local filesystem access `, :class:`HDFS `, :class:`Amazon S3-compatible storage ` and @@ -91,4 +101,47 @@ Concrete implementations are available for Tasks that use filesystems will typically run on the :ref:`I/O thread pool`. For filesystems that support high levels - of concurrency you may get a benefit from increasing the size of the I/O thread pool. \ No newline at end of file + of concurrency you may get a benefit from increasing the size of the I/O thread pool. + +Defining new filesystems +======================== + +Support for additional URI schemes can be added to the +:ref:`FromUri factories ` +by registering a factory for each new URI scheme with +:func:`~arrow::fs::RegisterFileSystemFactory`. To enable the common case +wherein it is preferred that registration be automatic, an instance of +:class:`~arrow::fs::FileSystemRegistrar` can be defined at namespace +scope, which will register a factory whenever the instance is loaded: + +.. code-block:: cpp + + arrow::fs::FileSystemRegistrar kExampleFileSystemModule{ + "example", + [](const Uri& uri, const io::IOContext& io_context, + std::string* out_path) -> Result> { + EnsureExampleFileSystemInitialized(); + return std::make_shared(); + }, + &EnsureExampleFileSystemFinalized, + }; + +If a filesystem implementation requires initialization before any instances +may be constructed, this should be included in the corresponding factory or +otherwise automatically ensured before the factory is invoked. Likewise if +a filesystem implementation requires tear down before the process ends, this +can be wrapped in a function and registered alongside the factory. All +finalizers will be called by :func:`~arrow::fs::EnsureFinalized`. + +Build complexity can be decreased by compartmentalizing a filesystem +implementation into a separate shared library, which applications may +link or load dynamically. Arrow's built-in filesystem implementations +also follow this pattern. If a shared library containing instances of +:class:`~arrow::fs::FileSystemRegistrar` must be dynamically loaded, +:func:`~arrow::fs::LoadFileSystemFactories` should be used to load it. +If such a library might link statically to arrow, it +should have exactly one of its sources +``#include "arrow/filesystem/filesystem_library.h"`` +in order to ensure the presence of the symbol on which +:func:`~arrow::fs::LoadFileSystemFactories` depends. + diff --git a/python/pyarrow/src/arrow/python/filesystem.h b/python/pyarrow/src/arrow/python/filesystem.h index 003fd5cb805..194b226ac5c 100644 --- a/python/pyarrow/src/arrow/python/filesystem.h +++ b/python/pyarrow/src/arrow/python/filesystem.h @@ -26,9 +26,7 @@ #include "arrow/python/visibility.h" #include "arrow/util/macros.h" -namespace arrow { -namespace py { -namespace fs { +namespace arrow::py::fs { class ARROW_PYTHON_EXPORT PyFileSystemVtable { public: @@ -83,16 +81,24 @@ class ARROW_PYTHON_EXPORT PyFileSystem : public arrow::fs::FileSystem { bool Equals(const FileSystem& other) const override; + /// \cond FALSE + using FileSystem::CreateDir; + using FileSystem::DeleteDirContents; + using FileSystem::GetFileInfo; + using FileSystem::OpenAppendStream; + using FileSystem::OpenOutputStream; + /// \endcond + Result GetFileInfo(const std::string& path) override; Result> GetFileInfo( const std::vector& paths) override; Result> GetFileInfo( const arrow::fs::FileSelector& select) override; - Status CreateDir(const std::string& path, bool recursive = true) override; + Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; - Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override; + Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; @@ -107,10 +113,10 @@ class ARROW_PYTHON_EXPORT PyFileSystem : public arrow::fs::FileSystem { const std::string& path) override; Result> OpenOutputStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, - const std::shared_ptr& metadata = {}) override; + const std::shared_ptr& metadata) override; Result NormalizePath(std::string path) override; @@ -121,6 +127,4 @@ class ARROW_PYTHON_EXPORT PyFileSystem : public arrow::fs::FileSystem { PyFileSystemVtable vtable_; }; -} // namespace fs -} // namespace py -} // namespace arrow +} // namespace arrow::py::fs