From 1baede85900f0f63ac8b990a0ab2cf4ae875bf5a Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 11 Feb 2021 19:05:52 +0100 Subject: [PATCH 1/3] ARROW-10420: [C++] Refactor io and filesystem APIs to take an IOContext The `io::IOContext` class allows passing various settings such as the MemoryPool used for allocation and the Executor for async methods. --- cpp/src/arrow/csv/reader.cc | 24 +++---- cpp/src/arrow/csv/reader.h | 9 +-- cpp/src/arrow/csv/reader_test.cc | 4 +- cpp/src/arrow/dataset/discovery.h | 3 +- cpp/src/arrow/dataset/file_base.cc | 9 +++ cpp/src/arrow/dataset/file_base.h | 12 +++- cpp/src/arrow/dataset/file_ipc.cc | 10 +-- cpp/src/arrow/dataset/file_ipc.h | 15 ++-- cpp/src/arrow/dataset/file_parquet.cc | 22 +++--- cpp/src/arrow/dataset/file_parquet.h | 9 +-- cpp/src/arrow/filesystem/filesystem.cc | 83 +++++++++++++++-------- cpp/src/arrow/filesystem/filesystem.h | 24 ++++++- cpp/src/arrow/filesystem/hdfs.cc | 19 +++--- cpp/src/arrow/filesystem/hdfs.h | 5 +- cpp/src/arrow/filesystem/localfs.cc | 19 +++--- cpp/src/arrow/filesystem/localfs.h | 5 +- cpp/src/arrow/filesystem/localfs_test.cc | 9 ++- cpp/src/arrow/filesystem/mockfs.cc | 67 ++++++++++++------ cpp/src/arrow/filesystem/mockfs.h | 8 ++- cpp/src/arrow/filesystem/s3fs.cc | 47 +++++++++---- cpp/src/arrow/filesystem/s3fs.h | 5 +- cpp/src/arrow/filesystem/util_internal.cc | 5 +- cpp/src/arrow/filesystem/util_internal.h | 3 +- cpp/src/arrow/io/caching.cc | 4 +- cpp/src/arrow/io/caching.h | 4 +- cpp/src/arrow/io/file.cc | 2 +- cpp/src/arrow/io/file.h | 2 +- cpp/src/arrow/io/hdfs.cc | 26 ++++--- cpp/src/arrow/io/hdfs.h | 11 ++- cpp/src/arrow/io/interfaces.cc | 34 +++++++--- cpp/src/arrow/io/interfaces.h | 49 ++++++++++--- cpp/src/arrow/io/memory.cc | 2 +- cpp/src/arrow/io/memory.h | 2 +- cpp/src/arrow/io/type_fwd.h | 8 +++ cpp/src/arrow/ipc/type_fwd.h | 3 + cpp/src/arrow/util/parallel.h | 11 +-- cpp/src/parquet/arrow/reader.cc | 4 +- cpp/src/parquet/file_reader.cc | 4 +- cpp/src/parquet/file_reader.h | 2 +- cpp/src/parquet/properties.h | 6 +- r/src/filesystem.cpp | 3 +- 41 files changed, 389 insertions(+), 204 deletions(-) diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index f0fa1f206d3..560382c6d6b 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -40,6 +40,7 @@ #include "arrow/status.h" #include "arrow/table.h" #include "arrow/type.h" +#include "arrow/type_fwd.h" #include "arrow/util/async_generator.h" #include "arrow/util/future.h" #include "arrow/util/iterator.h" @@ -51,19 +52,12 @@ #include "arrow/util/utf8.h" namespace arrow { - -class MemoryPool; - -namespace io { - -class InputStream; - -} // namespace io - namespace csv { using internal::Executor; +namespace { + struct ConversionSchema { struct Column { std::string name; @@ -154,6 +148,7 @@ struct CSVBlock { std::function consume_bytes; }; +} // namespace } // namespace csv template <> @@ -162,6 +157,7 @@ struct IterationTraits { }; namespace csv { +namespace { // The == operator must be defined to be used as T in Iterator bool operator==(const CSVBlock& left, const CSVBlock& right) { @@ -935,17 +931,19 @@ class AsyncThreadedTableReader AsyncGenerator> buffer_generator_; }; +} // namespace + ///////////////////////////////////////////////////////////////////////// // Factory functions Result> TableReader::Make( - MemoryPool* pool, io::AsyncContext async_context, - std::shared_ptr input, const ReadOptions& read_options, - const ParseOptions& parse_options, const ConvertOptions& convert_options) { + MemoryPool* 
pool, io::IOContext io_context, std::shared_ptr input, + const ReadOptions& read_options, const ParseOptions& parse_options, + const ConvertOptions& convert_options) { std::shared_ptr reader; if (read_options.use_threads) { reader = std::make_shared( - pool, input, read_options, parse_options, convert_options, async_context.executor, + pool, input, read_options, parse_options, convert_options, io_context.executor(), internal::GetCpuThreadPool()); } else { reader = std::make_shared(pool, input, read_options, parse_options, diff --git a/cpp/src/arrow/csv/reader.h b/cpp/src/arrow/csv/reader.h index c361fbddce9..8aba3c45a75 100644 --- a/cpp/src/arrow/csv/reader.h +++ b/cpp/src/arrow/csv/reader.h @@ -46,12 +46,9 @@ class ARROW_EXPORT TableReader { virtual Future> ReadAsync() = 0; /// Create a TableReader instance - static Result> Make(MemoryPool* pool, - io::AsyncContext async_context, - std::shared_ptr input, - const ReadOptions&, - const ParseOptions&, - const ConvertOptions&); + static Result> Make( + MemoryPool* pool, io::IOContext io_context, std::shared_ptr input, + const ReadOptions&, const ParseOptions&, const ConvertOptions&); }; /// Experimental diff --git a/cpp/src/arrow/csv/reader_test.cc b/cpp/src/arrow/csv/reader_test.cc index 64010ae481a..a3164930684 100644 --- a/cpp/src/arrow/csv/reader_test.cc +++ b/cpp/src/arrow/csv/reader_test.cc @@ -108,7 +108,7 @@ TableReaderFactory MakeSerialFactory() { auto read_options = ReadOptions::Defaults(); read_options.block_size = 1 << 10; read_options.use_threads = false; - return TableReader::Make(default_memory_pool(), io::AsyncContext(), input_stream, + return TableReader::Make(default_memory_pool(), io::IOContext(), input_stream, read_options, ParseOptions::Defaults(), ConvertOptions::Defaults()); }; @@ -132,7 +132,7 @@ Result MakeAsyncFactory( read_options.use_threads = true; read_options.block_size = 1 << 10; auto table_reader = TableReader::Make( - default_memory_pool(), io::AsyncContext(thread_pool.get()), input_stream, + default_memory_pool(), io::IOContext(thread_pool.get()), input_stream, read_options, ParseOptions::Defaults(), ConvertOptions::Defaults()); return table_reader; }; diff --git a/cpp/src/arrow/dataset/discovery.h b/cpp/src/arrow/dataset/discovery.h index ca3274cc086..94c49ff0b85 100644 --- a/cpp/src/arrow/dataset/discovery.h +++ b/cpp/src/arrow/dataset/discovery.h @@ -30,8 +30,7 @@ #include "arrow/dataset/partition.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" -#include "arrow/filesystem/filesystem.h" -#include "arrow/filesystem/path_forest.h" +#include "arrow/filesystem/type_fwd.h" #include "arrow/result.h" #include "arrow/util/macros.h" #include "arrow/util/variant.h" diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 612c249861c..e468b686af5 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -160,6 +160,13 @@ Status FileWriter::Write(RecordBatchReader* batches) { return Status::OK(); } +Status FileWriter::Finish() { + RETURN_NOT_OK(FinishInternal()); + return destination_->Close(); +} + +namespace { + constexpr util::string_view kIntegerToken = "{i}"; Status ValidateBasenameTemplate(util::string_view basename_template) { @@ -257,6 +264,8 @@ class WriteQueue { std::shared_ptr schema_; }; +} // namespace + Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options, std::shared_ptr scanner) { RETURN_NOT_OK(ValidateBasenameTemplate(write_options.basename_template)); diff --git 
a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 708f7e02054..d058ac2c077 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -268,18 +268,24 @@ class ARROW_DS_EXPORT FileWriter { Status Write(RecordBatchReader* batches); - virtual Status Finish() = 0; + Status Finish(); const std::shared_ptr& format() const { return options_->format(); } const std::shared_ptr& schema() const { return schema_; } const std::shared_ptr& options() const { return options_; } protected: - FileWriter(std::shared_ptr schema, std::shared_ptr options) - : schema_(std::move(schema)), options_(std::move(options)) {} + FileWriter(std::shared_ptr schema, std::shared_ptr options, + std::shared_ptr destination) + : schema_(std::move(schema)), + options_(std::move(options)), + destination_(destination) {} + + virtual Status FinishInternal() = 0; std::shared_ptr schema_; std::shared_ptr options_; + std::shared_ptr destination_; }; struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions { diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index 8bd01218344..b48b8c767cb 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -193,20 +193,22 @@ Result> IpcFileFormat::MakeWriter( ipc_options->metadata)); return std::shared_ptr( - new IpcFileWriter(std::move(writer), std::move(schema), std::move(ipc_options))); + new IpcFileWriter(std::move(destination), std::move(writer), std::move(schema), + std::move(ipc_options))); } -IpcFileWriter::IpcFileWriter(std::shared_ptr writer, +IpcFileWriter::IpcFileWriter(std::shared_ptr destination, + std::shared_ptr writer, std::shared_ptr schema, std::shared_ptr options) - : FileWriter(std::move(schema), std::move(options)), + : FileWriter(std::move(schema), std::move(options), std::move(destination)), batch_writer_(std::move(writer)) {} Status IpcFileWriter::Write(const std::shared_ptr& batch) { return batch_writer_->WriteRecordBatch(*batch); } -Status IpcFileWriter::Finish() { return batch_writer_->Close(); } +Status IpcFileWriter::FinishInternal() { return batch_writer_->Close(); } } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h index 2cdd837430e..35a76060408 100644 --- a/cpp/src/arrow/dataset/file_ipc.h +++ b/cpp/src/arrow/dataset/file_ipc.h @@ -25,15 +25,10 @@ #include "arrow/dataset/file_base.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" +#include "arrow/ipc/type_fwd.h" #include "arrow/result.h" namespace arrow { -namespace ipc { - -class RecordBatchWriter; -struct IpcWriteOptions; - -} // namespace ipc namespace dataset { /// \brief A FileFormat implementation that reads from and writes to Ipc files @@ -82,13 +77,15 @@ class ARROW_DS_EXPORT IpcFileWriter : public FileWriter { public: Status Write(const std::shared_ptr& batch) override; - Status Finish() override; - private: - IpcFileWriter(std::shared_ptr writer, + IpcFileWriter(std::shared_ptr destination, + std::shared_ptr writer, std::shared_ptr schema, std::shared_ptr options); + Status FinishInternal() override; + + std::shared_ptr destination_; std::shared_ptr batch_writer_; friend class IpcFileFormat; diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index c26ad0490ba..05bff2d1f52 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -56,8 +56,7 @@ class ParquetScanTask : public ScanTask { ParquetScanTask(int 
row_group, std::vector column_projection, std::shared_ptr reader, std::shared_ptr pre_buffer_once, - std::vector pre_buffer_row_groups, - arrow::io::AsyncContext async_context, + std::vector pre_buffer_row_groups, arrow::io::IOContext io_context, arrow::io::CacheOptions cache_options, std::shared_ptr options, std::shared_ptr context) @@ -67,7 +66,7 @@ class ParquetScanTask : public ScanTask { reader_(std::move(reader)), pre_buffer_once_(std::move(pre_buffer_once)), pre_buffer_row_groups_(std::move(pre_buffer_row_groups)), - async_context_(async_context), + io_context_(io_context), cache_options_(cache_options) {} Result Execute() override { @@ -106,7 +105,7 @@ class ParquetScanTask : public ScanTask { BEGIN_PARQUET_CATCH_EXCEPTIONS std::call_once(*pre_buffer_once_, [this]() { reader_->parquet_reader()->PreBuffer(pre_buffer_row_groups_, column_projection_, - async_context_, cache_options_); + io_context_, cache_options_); }); END_PARQUET_CATCH_EXCEPTIONS } @@ -121,7 +120,7 @@ class ParquetScanTask : public ScanTask { // to be done. We assume all scan tasks have the same column projection. std::shared_ptr pre_buffer_once_; std::vector pre_buffer_row_groups_; - arrow::io::AsyncContext async_context_; + arrow::io::IOContext io_context_; arrow::io::CacheOptions cache_options_; }; @@ -362,7 +361,7 @@ Result ParquetFileFormat::ScanFile(std::shared_ptr( row_groups[i], column_projection, reader, pre_buffer_once, row_groups, - reader_options.async_context, reader_options.cache_options, options, context); + reader_options.io_context, reader_options.cache_options, options, context); } return MakeVectorIterator(std::move(tasks)); @@ -410,13 +409,14 @@ Result> ParquetFileFormat::MakeWriter( *schema, default_memory_pool(), destination, parquet_options->writer_properties, parquet_options->arrow_writer_properties, &parquet_writer)); - return std::shared_ptr( - new ParquetFileWriter(std::move(parquet_writer), std::move(parquet_options))); + return std::shared_ptr(new ParquetFileWriter( + std::move(destination), std::move(parquet_writer), std::move(parquet_options))); } -ParquetFileWriter::ParquetFileWriter(std::shared_ptr writer, +ParquetFileWriter::ParquetFileWriter(std::shared_ptr destination, + std::shared_ptr writer, std::shared_ptr options) - : FileWriter(writer->schema(), std::move(options)), + : FileWriter(writer->schema(), std::move(options), std::move(destination)), parquet_writer_(std::move(writer)) {} Status ParquetFileWriter::Write(const std::shared_ptr& batch) { @@ -424,7 +424,7 @@ Status ParquetFileWriter::Write(const std::shared_ptr& batch) { return parquet_writer_->WriteTable(*table, batch->num_rows()); } -Status ParquetFileWriter::Finish() { return parquet_writer_->Close(); } +Status ParquetFileWriter::FinishInternal() { return parquet_writer_->Close(); } // // ParquetFileFragment diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 6967ab30669..ed0a6f949d6 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -97,7 +97,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { std::unordered_set dict_columns; bool pre_buffer = false; arrow::io::CacheOptions cache_options = arrow::io::CacheOptions::Defaults(); - arrow::io::AsyncContext async_context; + arrow::io::IOContext io_context; /// @} /// EXPERIMENTAL: Parallelize conversion across columns. 
This option is ignored if a @@ -226,12 +226,13 @@ class ARROW_DS_EXPORT ParquetFileWriter : public FileWriter { Status Write(const std::shared_ptr& batch) override; - Status Finish() override; - private: - ParquetFileWriter(std::shared_ptr writer, + ParquetFileWriter(std::shared_ptr destination, + std::shared_ptr writer, std::shared_ptr options); + Status FinishInternal() override; + std::shared_ptr parquet_writer_; friend class ParquetFileFormat; diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index 6945aa06465..15441e7ae64 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -167,7 +167,9 @@ Result> FileSystem::OpenInputFile( SubTreeFileSystem::SubTreeFileSystem(const std::string& base_path, std::shared_ptr base_fs) - : base_path_(NormalizeBasePath(base_path, base_fs).ValueOrDie()), base_fs_(base_fs) {} + : FileSystem(base_fs->io_context()), + base_path_(NormalizeBasePath(base_path, base_fs).ValueOrDie()), + base_fs_(base_fs) {} SubTreeFileSystem::~SubTreeFileSystem() {} @@ -344,15 +346,19 @@ Result> SubTreeFileSystem::OpenAppendStream( SlowFileSystem::SlowFileSystem(std::shared_ptr base_fs, std::shared_ptr latencies) - : base_fs_(base_fs), latencies_(latencies) {} + : FileSystem(base_fs->io_context()), base_fs_(base_fs), latencies_(latencies) {} SlowFileSystem::SlowFileSystem(std::shared_ptr base_fs, double average_latency) - : base_fs_(base_fs), latencies_(io::LatencyGenerator::Make(average_latency)) {} + : FileSystem(base_fs->io_context()), + base_fs_(base_fs), + latencies_(io::LatencyGenerator::Make(average_latency)) {} SlowFileSystem::SlowFileSystem(std::shared_ptr base_fs, double average_latency, int32_t seed) - : base_fs_(base_fs), latencies_(io::LatencyGenerator::Make(average_latency, seed)) {} + : FileSystem(base_fs->io_context()), + base_fs_(base_fs), + latencies_(io::LatencyGenerator::Make(average_latency, seed)) {} bool SlowFileSystem::Equals(const FileSystem& other) const { return this == &other; } @@ -443,34 +449,37 @@ Result> SlowFileSystem::OpenAppendStream( } Status CopyFiles(const std::vector& sources, - const std::vector& destinations, int64_t chunk_size, - bool use_threads) { + const std::vector& destinations, + const io::IOContext& io_context, int64_t chunk_size, bool use_threads) { if (sources.size() != destinations.size()) { return Status::Invalid("Trying to copy ", sources.size(), " files into ", destinations.size(), " paths."); } - return ::arrow::internal::OptionalParallelFor( - use_threads, static_cast(sources.size()), [&](int i) { - if (sources[i].filesystem->Equals(destinations[i].filesystem)) { - return sources[i].filesystem->CopyFile(sources[i].path, destinations[i].path); - } + auto copy_one_file = [&](int i) { + if (sources[i].filesystem->Equals(destinations[i].filesystem)) { + return sources[i].filesystem->CopyFile(sources[i].path, destinations[i].path); + } + + ARROW_ASSIGN_OR_RAISE(auto source, + sources[i].filesystem->OpenInputStream(sources[i].path)); - ARROW_ASSIGN_OR_RAISE(auto source, - sources[i].filesystem->OpenInputStream(sources[i].path)); + ARROW_ASSIGN_OR_RAISE(auto destination, destinations[i].filesystem->OpenOutputStream( + destinations[i].path)); + RETURN_NOT_OK(internal::CopyStream(source, destination, chunk_size, io_context)); + return destination->Close(); + }; - ARROW_ASSIGN_OR_RAISE( - auto destination, - destinations[i].filesystem->OpenOutputStream(destinations[i].path)); - return internal::CopyStream(source, destination, chunk_size); - }); + 
return ::arrow::internal::OptionalParallelFor( + use_threads, static_cast(sources.size()), std::move(copy_one_file), + io_context.executor()); } Status CopyFiles(const std::shared_ptr& source_fs, const FileSelector& source_sel, const std::shared_ptr& destination_fs, - const std::string& destination_base_dir, int64_t chunk_size, - bool use_threads) { + const std::string& destination_base_dir, const io::IOContext& io_context, + int64_t chunk_size, bool use_threads) { ARROW_ASSIGN_OR_RAISE(auto source_infos, source_fs->GetFileInfo(source_sel)); if (source_infos.empty()) { return Status::OK(); @@ -497,12 +506,14 @@ Status CopyFiles(const std::shared_ptr& source_fs, } } + auto create_one_dir = [&](int i) { return destination_fs->CreateDir(dirs[i]); }; + dirs = internal::MinimalCreateDirSet(std::move(dirs)); RETURN_NOT_OK(::arrow::internal::OptionalParallelFor( - use_threads, static_cast(dirs.size()), - [&](int i) { return destination_fs->CreateDir(dirs[i]); })); + use_threads, static_cast(dirs.size()), std::move(create_one_dir), + io_context.executor())); - return CopyFiles(sources, destinations, chunk_size, use_threads); + return CopyFiles(sources, destinations, io_context, chunk_size, use_threads); } namespace { @@ -526,6 +537,7 @@ Result ParseFileSystemUri(const std::string& uri_string) { Result> FileSystemFromUriReal(const Uri& uri, const std::string& uri_string, + const io::IOContext& io_context, std::string* out_path) { const auto scheme = uri.scheme(); @@ -535,7 +547,7 @@ Result> FileSystemFromUriReal(const Uri& uri, if (out_path != nullptr) { *out_path = path; } - return std::make_shared(options); + return std::make_shared(options, io_context); } if (scheme == "hdfs" || scheme == "viewfs") { #ifdef ARROW_HDFS @@ -543,7 +555,7 @@ Result> FileSystemFromUriReal(const Uri& uri, if (out_path != nullptr) { *out_path = uri.path(); } - ARROW_ASSIGN_OR_RAISE(auto hdfs, HadoopFileSystem::Make(options)); + ARROW_ASSIGN_OR_RAISE(auto hdfs, HadoopFileSystem::Make(options, io_context)); return hdfs; #else return Status::NotImplemented("Got HDFS URI but Arrow compiled without HDFS support"); @@ -553,7 +565,7 @@ Result> FileSystemFromUriReal(const Uri& uri, #ifdef ARROW_S3 RETURN_NOT_OK(EnsureS3Initialized()); ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(uri, out_path)); - ARROW_ASSIGN_OR_RAISE(auto s3fs, S3FileSystem::Make(options)); + ARROW_ASSIGN_OR_RAISE(auto s3fs, S3FileSystem::Make(options, io_context)); return s3fs; #else return Status::NotImplemented("Got S3 URI but Arrow compiled without S3 support"); @@ -566,7 +578,8 @@ Result> FileSystemFromUriReal(const Uri& uri, if (out_path != nullptr) { *out_path = std::string(RemoveLeadingSlash(uri.path())); } - return std::make_shared(internal::CurrentTimePoint()); + return std::make_shared(internal::CurrentTimePoint(), + io_context); } return Status::Invalid("Unrecognized filesystem type in URI: ", uri_string); @@ -576,12 +589,24 @@ Result> FileSystemFromUriReal(const Uri& uri, Result> FileSystemFromUri(const std::string& uri_string, std::string* out_path) { + return FileSystemFromUri(uri_string, io::default_io_context(), out_path); +} + +Result> FileSystemFromUri(const std::string& uri_string, + const io::IOContext& io_context, + std::string* out_path) { ARROW_ASSIGN_OR_RAISE(auto fsuri, ParseFileSystemUri(uri_string)); - return FileSystemFromUriReal(fsuri, uri_string, out_path); + return FileSystemFromUriReal(fsuri, uri_string, io_context, out_path); } Result> FileSystemFromUriOrPath(const std::string& uri_string, std::string* out_path) { 
+ return FileSystemFromUriOrPath(uri_string, io::default_io_context(), out_path); +} + +Result> FileSystemFromUriOrPath( + const std::string& uri_string, const io::IOContext& io_context, + std::string* out_path) { if (internal::DetectAbsolutePath(uri_string)) { // Normalize path separators if (out_path != nullptr) { @@ -589,7 +614,7 @@ Result> FileSystemFromUriOrPath(const std::string& u } return std::make_shared(); } - return FileSystemFromUri(uri_string, out_path); + return FileSystemFromUri(uri_string, io_context, out_path); } Status FileSystemFromUri(const std::string& uri, std::shared_ptr* out_fs, diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h index 9eeb3d86841..fd7f448b9b1 100644 --- a/cpp/src/arrow/filesystem/filesystem.h +++ b/cpp/src/arrow/filesystem/filesystem.h @@ -26,7 +26,7 @@ #include #include "arrow/filesystem/type_fwd.h" -#include "arrow/io/type_fwd.h" +#include "arrow/io/interfaces.h" #include "arrow/type_fwd.h" #include "arrow/util/compare.h" #include "arrow/util/macros.h" @@ -147,6 +147,9 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this virtual std::string type_name() const = 0; + /// EXPERIMENTAL: The IOContext associated with this filesystem. + const io::IOContext& io_context() const { return io_context_; } + /// Normalize path for the given filesystem /// /// The default implementation of this method is a no-op, but subclasses @@ -250,6 +253,12 @@ class ARROW_EXPORT FileSystem : public std::enable_shared_from_this /// If the target doesn't exist, a new empty file is created. virtual Result> OpenAppendStream( const std::string& path) = 0; + + protected: + explicit FileSystem(const io::IOContext& io_context = io::default_io_context()) + : io_context_(io_context) {} + + io::IOContext io_context_; }; /// \brief A FileSystem implementation that delegates to another @@ -382,6 +391,12 @@ ARROW_EXPORT Result> FileSystemFromUri(const std::string& uri, std::string* out_path = NULLPTR); +/// XXX +ARROW_EXPORT +Result> FileSystemFromUri(const std::string& uri, + const io::IOContext& io_context, + std::string* out_path = NULLPTR); + /// \brief Create a new FileSystem by URI /// /// Same as FileSystemFromUri, but in addition also recognize non-URIs @@ -391,6 +406,11 @@ ARROW_EXPORT Result> FileSystemFromUriOrPath( const std::string& uri, std::string* out_path = NULLPTR); +ARROW_EXPORT +Result> FileSystemFromUriOrPath( + const std::string& uri, const io::IOContext& io_context, + std::string* out_path = NULLPTR); + /// @} /// \brief Copy files, including from one FileSystem to another @@ -401,6 +421,7 @@ Result> FileSystemFromUriOrPath( ARROW_EXPORT Status CopyFiles(const std::vector& sources, const std::vector& destinations, + const io::IOContext& io_context = io::default_io_context(), int64_t chunk_size = 1024 * 1024, bool use_threads = true); /// \brief Copy selected files, including from one FileSystem to another @@ -411,6 +432,7 @@ Status CopyFiles(const std::shared_ptr& source_fs, const FileSelector& source_sel, const std::shared_ptr& destination_fs, const std::string& destination_base_dir, + const io::IOContext& io_context = io::default_io_context(), int64_t chunk_size = 1024 * 1024, bool use_threads = true); struct FileSystemGlobalOptions { diff --git a/cpp/src/arrow/filesystem/hdfs.cc b/cpp/src/arrow/filesystem/hdfs.cc index 1841bf1ff6c..2fc549565cd 100644 --- a/cpp/src/arrow/filesystem/hdfs.cc +++ b/cpp/src/arrow/filesystem/hdfs.cc @@ -43,7 +43,8 @@ using internal::RemoveLeadingSlash; class 
HadoopFileSystem::Impl { public: - explicit Impl(HdfsOptions options) : options_(std::move(options)) {} + Impl(HdfsOptions options, const io::IOContext& io_context) + : options_(std::move(options)), io_context_(io_context) {} ~Impl() { Status st = Close(); @@ -205,13 +206,13 @@ class HadoopFileSystem::Impl { Result> OpenInputStream(const std::string& path) { std::shared_ptr file; - RETURN_NOT_OK(client_->OpenReadable(path, &file)); + RETURN_NOT_OK(client_->OpenReadable(path, io_context_, &file)); return file; } Result> OpenInputFile(const std::string& path) { std::shared_ptr file; - RETURN_NOT_OK(client_->OpenReadable(path, &file)); + RETURN_NOT_OK(client_->OpenReadable(path, io_context_, &file)); return file; } @@ -226,7 +227,8 @@ class HadoopFileSystem::Impl { } protected: - HdfsOptions options_; + const HdfsOptions options_; + const io::IOContext io_context_; std::shared_ptr<::arrow::io::HadoopFileSystem> client_; void PathInfoToFileInfo(const io::HdfsPathInfo& info, FileInfo* out) { @@ -393,14 +395,15 @@ Result HdfsOptions::FromUri(const std::string& uri_string) { return FromUri(uri); } -HadoopFileSystem::HadoopFileSystem(const HdfsOptions& options) - : impl_(new Impl{options}) {} +HadoopFileSystem::HadoopFileSystem(const HdfsOptions& options, + const io::IOContext& io_context) + : FileSystem(io_context), impl_(new Impl{options, io_context_}) {} HadoopFileSystem::~HadoopFileSystem() {} Result> HadoopFileSystem::Make( - const HdfsOptions& options) { - std::shared_ptr ptr(new HadoopFileSystem(options)); + const HdfsOptions& options, const io::IOContext& io_context) { + std::shared_ptr ptr(new HadoopFileSystem(options, io_context)); RETURN_NOT_OK(ptr->impl_->Init()); return ptr; } diff --git a/cpp/src/arrow/filesystem/hdfs.h b/cpp/src/arrow/filesystem/hdfs.h index 5f6340e79b3..72cb469b79d 100644 --- a/cpp/src/arrow/filesystem/hdfs.h +++ b/cpp/src/arrow/filesystem/hdfs.h @@ -97,10 +97,11 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem { const std::string& path) override; /// Create a HdfsFileSystem instance from the given options. 
- static Result> Make(const HdfsOptions& options); + static Result> Make( + const HdfsOptions& options, const io::IOContext& = io::default_io_context()); protected: - explicit HadoopFileSystem(const HdfsOptions& options); + HadoopFileSystem(const HdfsOptions& options, const io::IOContext&); class Impl; std::unique_ptr impl_; diff --git a/cpp/src/arrow/filesystem/localfs.cc b/cpp/src/arrow/filesystem/localfs.cc index 88ce46137ec..490bacea413 100644 --- a/cpp/src/arrow/filesystem/localfs.cc +++ b/cpp/src/arrow/filesystem/localfs.cc @@ -264,10 +264,12 @@ Result LocalFileSystemOptions::FromUri( return LocalFileSystemOptions(); } -LocalFileSystem::LocalFileSystem() : options_(LocalFileSystemOptions::Defaults()) {} +LocalFileSystem::LocalFileSystem(const io::IOContext& io_context) + : FileSystem(io_context), options_(LocalFileSystemOptions::Defaults()) {} -LocalFileSystem::LocalFileSystem(const LocalFileSystemOptions& options) - : options_(options) {} +LocalFileSystem::LocalFileSystem(const LocalFileSystemOptions& options, + const io::IOContext& io_context) + : FileSystem(io_context), options_(options) {} LocalFileSystem::~LocalFileSystem() {} @@ -378,7 +380,7 @@ Status LocalFileSystem::CopyFile(const std::string& src, const std::string& dest #else ARROW_ASSIGN_OR_RAISE(auto is, OpenInputStream(src)); ARROW_ASSIGN_OR_RAISE(auto os, OpenOutputStream(dest)); - RETURN_NOT_OK(internal::CopyStream(is, os, 1024 * 1024 /* chunk_size */)); + RETURN_NOT_OK(internal::CopyStream(is, os, 1024 * 1024 /* chunk_size */, io_context())); RETURN_NOT_OK(os->Close()); return is->Close(); #endif @@ -388,11 +390,12 @@ namespace { template Result> OpenInputStreamGeneric( - const std::string& path, const LocalFileSystemOptions& options) { + const std::string& path, const LocalFileSystemOptions& options, + const io::IOContext& io_context) { if (options.use_mmap) { return io::MemoryMappedFile::Open(path, io::FileMode::READ); } else { - return io::ReadableFile::Open(path); + return io::ReadableFile::Open(path, io_context.pool()); } } @@ -400,12 +403,12 @@ Result> OpenInputStreamGeneric( Result> LocalFileSystem::OpenInputStream( const std::string& path) { - return OpenInputStreamGeneric(path, options_); + return OpenInputStreamGeneric(path, options_, io_context()); } Result> LocalFileSystem::OpenInputFile( const std::string& path) { - return OpenInputStreamGeneric(path, options_); + return OpenInputStreamGeneric(path, options_, io_context()); } namespace { diff --git a/cpp/src/arrow/filesystem/localfs.h b/cpp/src/arrow/filesystem/localfs.h index add57c6d266..d660dd36a5d 100644 --- a/cpp/src/arrow/filesystem/localfs.h +++ b/cpp/src/arrow/filesystem/localfs.h @@ -55,8 +55,9 @@ struct ARROW_EXPORT LocalFileSystemOptions { /// followed, except when deleting an entry). 
class ARROW_EXPORT LocalFileSystem : public FileSystem { public: - LocalFileSystem(); - explicit LocalFileSystem(const LocalFileSystemOptions&); + explicit LocalFileSystem(const io::IOContext& = io::default_io_context()); + explicit LocalFileSystem(const LocalFileSystemOptions&, + const io::IOContext& = io::default_io_context()); ~LocalFileSystem() override; std::string type_name() const override { return "local"; } diff --git a/cpp/src/arrow/filesystem/localfs_test.cc b/cpp/src/arrow/filesystem/localfs_test.cc index dbe19a1f46f..e338160951a 100644 --- a/cpp/src/arrow/filesystem/localfs_test.cc +++ b/cpp/src/arrow/filesystem/localfs_test.cc @@ -73,6 +73,11 @@ Result> FSFromUri(const std::string& uri, return FileSystemFromUri(uri, out_path); } +Result> FSFromUriOrPath(const std::string& uri, + std::string* out_path = NULLPTR) { + return FileSystemFromUriOrPath(uri, out_path); +} + //////////////////////////////////////////////////////////////////////////// // Misc tests @@ -192,7 +197,7 @@ class TestLocalFS : public LocalFSTestMixin { } void TestFileSystemFromUriOrPath(const std::string& uri) { - CheckFileSystemFromUriFunc(uri, FileSystemFromUriOrPath); + CheckFileSystemFromUriFunc(uri, FSFromUriOrPath); } template @@ -213,7 +218,7 @@ class TestLocalFS : public LocalFSTestMixin { } void TestLocalUriOrPath(const std::string& uri, const std::string& expected_path) { - CheckLocalUri(uri, expected_path, FileSystemFromUriOrPath); + CheckLocalUri(uri, expected_path, FSFromUriOrPath); } void TestInvalidUri(const std::string& uri) { diff --git a/cpp/src/arrow/filesystem/mockfs.cc b/cpp/src/arrow/filesystem/mockfs.cc index 7cef8ac2b54..294cc85531a 100644 --- a/cpp/src/arrow/filesystem/mockfs.cc +++ b/cpp/src/arrow/filesystem/mockfs.cc @@ -25,12 +25,14 @@ #include #include "arrow/buffer.h" +#include "arrow/buffer_builder.h" #include "arrow/filesystem/mockfs.h" #include "arrow/filesystem/path_util.h" #include "arrow/filesystem/util_internal.h" #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" #include "arrow/util/logging.h" +#include "arrow/util/string_view.h" #include "arrow/util/variant.h" #include "arrow/util/windows_fixup.h" @@ -48,11 +50,19 @@ class Entry; struct File { TimePoint mtime; std::string name; - std::string data; + std::shared_ptr data; File(TimePoint mtime, std::string name) : mtime(mtime), name(std::move(name)) {} - int64_t size() const { return static_cast(data.length()); } + int64_t size() const { return data ? data->size() : 0; } + + explicit operator util::string_view() const { + if (data) { + return util::string_view(*data); + } else { + return ""; + } + } }; struct Directory { @@ -172,13 +182,17 @@ class Entry : public EntryBase { class MockFSOutputStream : public io::OutputStream { public: - explicit MockFSOutputStream(File* file) : file_(file), closed_(false) {} + MockFSOutputStream(File* file, MemoryPool* pool) + : file_(file), builder_(pool), closed_(false) {} ~MockFSOutputStream() override = default; // Implement the OutputStream interface Status Close() override { - closed_ = true; + if (!closed_) { + RETURN_NOT_OK(builder_.Finish(&file_->data)); + closed_ = true; + } return Status::OK(); } @@ -187,8 +201,8 @@ class MockFSOutputStream : public io::OutputStream { // MockFSOutputStream is mainly used for debugging and testing, so // mark an aborted file's contents explicitly. 
std::stringstream ss; - ss << "MockFSOutputStream aborted after " << file_->data.size() << " bytes written"; - file_->data = ss.str(); + ss << "MockFSOutputStream aborted after " << file_->size() << " bytes written"; + file_->data = Buffer::FromString(ss.str()); closed_ = true; } return Status::OK(); @@ -200,19 +214,19 @@ class MockFSOutputStream : public io::OutputStream { if (closed_) { return Status::Invalid("Invalid operation on closed stream"); } - return file_->size(); + return builder_.length(); } Status Write(const void* data, int64_t nbytes) override { if (closed_) { return Status::Invalid("Invalid operation on closed stream"); } - file_->data.append(reinterpret_cast(data), static_cast(nbytes)); - return Status::OK(); + return builder_.Append(data, nbytes); } protected: File* file_; + BufferBuilder builder_; bool closed_; }; @@ -234,12 +248,14 @@ std::ostream& operator<<(std::ostream& os, const MockFileInfo& di) { class MockFileSystem::Impl { public: TimePoint current_time; + MemoryPool* pool; + // The root directory Entry root; std::mutex mutex; - explicit Impl(TimePoint current_time) - : current_time(current_time), root(Directory("", current_time)) {} + Impl(TimePoint current_time, MemoryPool* pool) + : current_time(current_time), pool(pool), root(Directory("", current_time)) {} std::unique_lock lock_guard() { return std::unique_lock(mutex); @@ -333,7 +349,7 @@ class MockFileSystem::Impl { Entry* child = pair.second.get(); if (child->is_file()) { auto& file = child->as_file(); - out->push_back({path + file.name, file.mtime, file.data}); + out->push_back({path + file.name, file.mtime, util::string_view(file)}); } else if (child->is_dir()) { DumpFiles(path, child->as_dir(), out); } @@ -352,18 +368,22 @@ class MockFileSystem::Impl { // Find the file in the parent dir, or create it const auto& name = parts.back(); Entry* child = parent->as_dir().Find(name); + File* file; if (child == nullptr) { child = new Entry(File(current_time, name)); parent->as_dir().AssignEntry(name, std::unique_ptr(child)); + file = &child->as_file(); } else if (child->is_file()) { - child->as_file().mtime = current_time; - if (!append) { - child->as_file().data.clear(); - } + file = &child->as_file(); + file->mtime = current_time; } else { return NotAFile(path); } - return std::make_shared(&child->as_file()); + auto ptr = std::make_shared(file, pool); + if (append && file->data) { + RETURN_NOT_OK(ptr->Write(file->data->data(), file->data->size())); + } + return ptr; } Result> OpenInputReader(const std::string& path) { @@ -377,14 +397,19 @@ class MockFileSystem::Impl { if (!entry->is_file()) { return NotAFile(path); } - return std::make_shared(Buffer::FromString(entry->as_file().data)); + const auto& file = entry->as_file(); + if (file.data) { + return std::make_shared(file.data); + } else { + return std::make_shared(""); + } } }; MockFileSystem::~MockFileSystem() = default; -MockFileSystem::MockFileSystem(TimePoint current_time) { - impl_ = std::unique_ptr(new Impl(current_time)); +MockFileSystem::MockFileSystem(TimePoint current_time, const io::IOContext& io_context) { + impl_ = std::unique_ptr(new Impl(current_time, io_context.pool())); } bool MockFileSystem::Equals(const FileSystem& other) const { return this == &other; } @@ -689,7 +714,7 @@ std::vector MockFileSystem::AllFiles() { return result; } -Status MockFileSystem::CreateFile(const std::string& path, const std::string& contents, +Status MockFileSystem::CreateFile(const std::string& path, util::string_view contents, bool recursive) { auto parent = 
fs::internal::GetAbstractPathParent(path).first; diff --git a/cpp/src/arrow/filesystem/mockfs.h b/cpp/src/arrow/filesystem/mockfs.h index 847b4898ec7..212caf6d7fe 100644 --- a/cpp/src/arrow/filesystem/mockfs.h +++ b/cpp/src/arrow/filesystem/mockfs.h @@ -23,6 +23,7 @@ #include #include "arrow/filesystem/filesystem.h" +#include "arrow/util/string_view.h" #include "arrow/util/windows_fixup.h" namespace arrow { @@ -43,7 +44,7 @@ struct MockDirInfo { struct MockFileInfo { std::string full_path; TimePoint mtime; - std::string data; + util::string_view data; bool operator==(const MockFileInfo& other) const { return mtime == other.mtime && full_path == other.full_path && data == other.data; @@ -58,7 +59,8 @@ struct MockFileInfo { /// and bootstrapping FileSystem-based APIs. class ARROW_EXPORT MockFileSystem : public FileSystem { public: - explicit MockFileSystem(TimePoint current_time); + explicit MockFileSystem(TimePoint current_time, + const io::IOContext& = io::default_io_context()); ~MockFileSystem() override; std::string type_name() const override { return "mock"; } @@ -98,7 +100,7 @@ class ARROW_EXPORT MockFileSystem : public FileSystem { std::vector AllFiles(); // Create a File with a content from a string. - Status CreateFile(const std::string& path, const std::string& content, + Status CreateFile(const std::string& path, util::string_view content, bool recursive = true); // Create a MockFileSystem out of (empty) FileInfo. The content of every diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index cc8ae1148e0..d71e8537250 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -631,8 +631,13 @@ Result GetObjectRange(Aws::S3::S3Client* client, class ObjectInputFile final : public io::RandomAccessFile { public: ObjectInputFile(std::shared_ptr fs, Aws::S3::S3Client* client, - const S3Path& path, int64_t size = kNoSize) - : fs_(std::move(fs)), client_(client), path_(path), content_length_(size) {} + const io::IOContext& io_context, const S3Path& path, + int64_t size = kNoSize) + : fs_(std::move(fs)), + client_(client), + io_context_(io_context), + path_(path), + content_length_(size) {} Status Init() { // Issue a HEAD Object to get the content-length and ensure any @@ -735,7 +740,7 @@ class ObjectInputFile final : public io::RandomAccessFile { // No need to allocate more than the remaining number of bytes nbytes = std::min(nbytes, content_length_ - position); - ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes)); + ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, io_context_.pool())); if (nbytes > 0) { ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(position, nbytes, buf->mutable_data())); @@ -760,7 +765,9 @@ class ObjectInputFile final : public io::RandomAccessFile { protected: std::shared_ptr fs_; // Owner of S3Client Aws::S3::S3Client* client_; + const io::IOContext io_context_; S3Path path_; + bool closed_ = false; int64_t pos_ = 0; int64_t content_length_ = kNoSize; @@ -779,8 +786,13 @@ class ObjectOutputStream final : public io::OutputStream { public: ObjectOutputStream(std::shared_ptr fs, Aws::S3::S3Client* client, - const S3Path& path, const S3Options& options) - : fs_(std::move(fs)), client_(client), path_(path), options_(options) {} + const io::IOContext& io_context, const S3Path& path, + const S3Options& options) + : fs_(std::move(fs)), + client_(client), + io_context_(io_context), + path_(path), + options_(options) {} ~ObjectOutputStream() override { // For compliance with the rest of the 
IO stack, Close rather than Abort, @@ -910,8 +922,9 @@ class ObjectOutputStream final : public io::OutputStream { } // Can't upload data on its own, need to buffer it if (!current_part_) { - ARROW_ASSIGN_OR_RAISE(current_part_, - io::BufferOutputStream::Create(part_upload_threshold_)); + ARROW_ASSIGN_OR_RAISE( + current_part_, + io::BufferOutputStream::Create(part_upload_threshold_, io_context_.pool())); current_part_size_ = 0; } RETURN_NOT_OK(current_part_->Write(data, nbytes)); @@ -974,7 +987,7 @@ class ObjectOutputStream final : public io::OutputStream { // If the data isn't owned, make an immutable copy for the lifetime of the closure if (owned_buffer == nullptr) { - ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes)); + ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool())); memcpy(owned_buffer->mutable_data(), data, nbytes); } else { DCHECK_EQ(data, owned_buffer->data()); @@ -1048,8 +1061,10 @@ class ObjectOutputStream final : public io::OutputStream { protected: std::shared_ptr fs_; // Owner of S3Client Aws::S3::S3Client* client_; + const io::IOContext io_context_; S3Path path_; const S3Options& options_; + Aws::String upload_id_; bool closed_ = true; int64_t pos_ = 0; @@ -1586,8 +1601,8 @@ class S3FileSystem::Impl { ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); RETURN_NOT_OK(ValidateFilePath(path)); - auto ptr = - std::make_shared(fs->shared_from_this(), client_.get(), path); + auto ptr = std::make_shared(fs->shared_from_this(), client_.get(), + fs->io_context(), path); RETURN_NOT_OK(ptr->Init()); return ptr; } @@ -1605,20 +1620,22 @@ class S3FileSystem::Impl { RETURN_NOT_OK(ValidateFilePath(path)); auto ptr = std::make_shared(fs->shared_from_this(), client_.get(), - path, info.size()); + fs->io_context(), path, info.size()); RETURN_NOT_OK(ptr->Init()); return ptr; } }; -S3FileSystem::S3FileSystem(const S3Options& options) : impl_(new Impl{options}) {} +S3FileSystem::S3FileSystem(const S3Options& options, const io::IOContext& io_context) + : FileSystem(io_context), impl_(new Impl{options}) {} S3FileSystem::~S3FileSystem() {} -Result> S3FileSystem::Make(const S3Options& options) { +Result> S3FileSystem::Make( + const S3Options& options, const io::IOContext& io_context) { RETURN_NOT_OK(CheckS3Initialized()); - std::shared_ptr ptr(new S3FileSystem(options)); + std::shared_ptr ptr(new S3FileSystem(options, io_context)); RETURN_NOT_OK(ptr->impl_->Init()); return ptr; } @@ -1890,7 +1907,7 @@ Result> S3FileSystem::OpenOutputStream( RETURN_NOT_OK(ValidateFilePath(path)); auto ptr = std::make_shared( - shared_from_this(), impl_->client_.get(), path, impl_->options()); + shared_from_this(), impl_->client_.get(), io_context(), path, impl_->options()); RETURN_NOT_OK(ptr->Init()); return ptr; } diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h index bd8f1fadbb9..ac384fcba71 100644 --- a/cpp/src/arrow/filesystem/s3fs.h +++ b/cpp/src/arrow/filesystem/s3fs.h @@ -199,10 +199,11 @@ class ARROW_EXPORT S3FileSystem : public FileSystem { const std::string& path) override; /// Create a S3FileSystem instance from the given options. 
- static Result> Make(const S3Options& options); + static Result> Make( + const S3Options& options, const io::IOContext& = io::default_io_context()); protected: - explicit S3FileSystem(const S3Options& options); + explicit S3FileSystem(const S3Options& options, const io::IOContext&); class Impl; std::unique_ptr impl_; diff --git a/cpp/src/arrow/filesystem/util_internal.cc b/cpp/src/arrow/filesystem/util_internal.cc index a9c6a1c2120..8f86707375d 100644 --- a/cpp/src/arrow/filesystem/util_internal.cc +++ b/cpp/src/arrow/filesystem/util_internal.cc @@ -31,8 +31,9 @@ TimePoint CurrentTimePoint() { } Status CopyStream(const std::shared_ptr& src, - const std::shared_ptr& dest, int64_t chunk_size) { - ARROW_ASSIGN_OR_RAISE(auto chunk, AllocateBuffer(chunk_size)); + const std::shared_ptr& dest, int64_t chunk_size, + const io::IOContext& io_context) { + ARROW_ASSIGN_OR_RAISE(auto chunk, AllocateBuffer(chunk_size, io_context.pool())); while (true) { ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, diff --git a/cpp/src/arrow/filesystem/util_internal.h b/cpp/src/arrow/filesystem/util_internal.h index ffcb2b2a82e..915c8d03d46 100644 --- a/cpp/src/arrow/filesystem/util_internal.h +++ b/cpp/src/arrow/filesystem/util_internal.h @@ -34,7 +34,8 @@ TimePoint CurrentTimePoint(); ARROW_EXPORT Status CopyStream(const std::shared_ptr& src, - const std::shared_ptr& dest, int64_t chunk_size); + const std::shared_ptr& dest, int64_t chunk_size, + const io::IOContext& io_context); ARROW_EXPORT Status PathNotFound(const std::string& path); diff --git a/cpp/src/arrow/io/caching.cc b/cpp/src/arrow/io/caching.cc index a306ca7d286..31a426dffd7 100644 --- a/cpp/src/arrow/io/caching.cc +++ b/cpp/src/arrow/io/caching.cc @@ -131,7 +131,7 @@ struct RangeCacheEntry { struct ReadRangeCache::Impl { std::shared_ptr file; - AsyncContext ctx; + IOContext ctx; CacheOptions options; // Ordered by offset (so as to find a matching region by binary search) @@ -150,7 +150,7 @@ struct ReadRangeCache::Impl { } }; -ReadRangeCache::ReadRangeCache(std::shared_ptr file, AsyncContext ctx, +ReadRangeCache::ReadRangeCache(std::shared_ptr file, IOContext ctx, CacheOptions options) : impl_(new Impl()) { impl_->file = std::move(file); diff --git a/cpp/src/arrow/io/caching.h b/cpp/src/arrow/io/caching.h index fd2a652369e..089c1c554dc 100644 --- a/cpp/src/arrow/io/caching.h +++ b/cpp/src/arrow/io/caching.h @@ -82,11 +82,11 @@ class ARROW_EXPORT ReadRangeCache { static constexpr int64_t kDefaultRangeSizeLimit = 32 * 1024 * 1024; /// Construct a read cache with default - explicit ReadRangeCache(std::shared_ptr file, AsyncContext ctx) + explicit ReadRangeCache(std::shared_ptr file, IOContext ctx) : ReadRangeCache(file, std::move(ctx), CacheOptions::Defaults()) {} /// Construct a read cache with given options - explicit ReadRangeCache(std::shared_ptr file, AsyncContext ctx, + explicit ReadRangeCache(std::shared_ptr file, IOContext ctx, CacheOptions options); ~ReadRangeCache(); diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc index dbc2af89ea0..8a4976db4aa 100644 --- a/cpp/src/arrow/io/file.cc +++ b/cpp/src/arrow/io/file.cc @@ -699,7 +699,7 @@ Result> MemoryMappedFile::Read(int64_t nbytes) { return buffer; } -Future> MemoryMappedFile::ReadAsync(const AsyncContext&, +Future> MemoryMappedFile::ReadAsync(const IOContext&, int64_t position, int64_t nbytes) { return Future>::MakeFinished(ReadAt(position, nbytes)); diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h index 87bb8b9f81c..50d4f2c4dfc 100644 --- a/cpp/src/arrow/io/file.h +++ 
b/cpp/src/arrow/io/file.h @@ -185,7 +185,7 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface { Result ReadAt(int64_t position, int64_t nbytes, void* out) override; // Synchronous ReadAsync override - Future> ReadAsync(const AsyncContext&, int64_t position, + Future> ReadAsync(const IOContext&, int64_t position, int64_t nbytes) override; Status WillNeed(const std::vector& ranges) override; diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index f4c391d3de1..af91b35ed3c 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -222,11 +222,8 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { int32_t buffer_size_; }; -HdfsReadableFile::HdfsReadableFile(MemoryPool* pool) { - if (pool == nullptr) { - pool = default_memory_pool(); - } - impl_.reset(new HdfsReadableFileImpl(pool)); +HdfsReadableFile::HdfsReadableFile(const io::IOContext& io_context) { + impl_.reset(new HdfsReadableFileImpl(io_context.pool())); } HdfsReadableFile::~HdfsReadableFile() { DCHECK_OK(impl_->Close()); } @@ -498,6 +495,7 @@ class HadoopFileSystem::HadoopFileSystemImpl { } Status OpenReadable(const std::string& path, int32_t buffer_size, + const io::IOContext& io_context, std::shared_ptr* file) { hdfsFile handle = driver_->OpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0); @@ -508,7 +506,7 @@ class HadoopFileSystem::HadoopFileSystemImpl { } // std::make_shared does not work with private ctors - *file = std::shared_ptr(new HdfsReadableFile()); + *file = std::shared_ptr(new HdfsReadableFile(io_context)); (*file)->impl_->set_members(path, driver_, fs_, handle); (*file)->impl_->set_buffer_size(buffer_size); @@ -627,12 +625,24 @@ Status HadoopFileSystem::ListDirectory(const std::string& path, Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size, std::shared_ptr* file) { - return impl_->OpenReadable(path, buffer_size, file); + return impl_->OpenReadable(path, buffer_size, io::default_io_context(), file); +} + +Status HadoopFileSystem::OpenReadable(const std::string& path, + std::shared_ptr* file) { + return OpenReadable(path, kDefaultHdfsBufferSize, io::default_io_context(), file); +} + +Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size, + const io::IOContext& io_context, + std::shared_ptr* file) { + return impl_->OpenReadable(path, buffer_size, io_context, file); } Status HadoopFileSystem::OpenReadable(const std::string& path, + const io::IOContext& io_context, std::shared_ptr* file) { - return OpenReadable(path, kDefaultHdfsBufferSize, file); + return OpenReadable(path, kDefaultHdfsBufferSize, io_context, file); } Status HadoopFileSystem::OpenWritable(const std::string& path, bool append, diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h index f91dfb618e6..21b0cd8a282 100644 --- a/cpp/src/arrow/io/hdfs.h +++ b/cpp/src/arrow/io/hdfs.h @@ -184,8 +184,15 @@ class ARROW_EXPORT HadoopFileSystem : public FileSystem { Status OpenReadable(const std::string& path, int32_t buffer_size, std::shared_ptr* file); + Status OpenReadable(const std::string& path, int32_t buffer_size, + const io::IOContext& io_context, + std::shared_ptr* file); + Status OpenReadable(const std::string& path, std::shared_ptr* file); + Status OpenReadable(const std::string& path, const io::IOContext& io_context, + std::shared_ptr* file); + // FileMode::WRITE options // @param path complete file path // @param buffer_size 0 by default @@ -228,10 +235,8 @@ class ARROW_EXPORT HdfsReadableFile : public 
RandomAccessFile { Result Tell() const override; Result GetSize() override; - void set_memory_pool(MemoryPool* pool); - private: - explicit HdfsReadableFile(MemoryPool* pool = NULLPTR); + explicit HdfsReadableFile(const io::IOContext&); class ARROW_NO_EXPORT HdfsReadableFileImpl; std::unique_ptr impl_; diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc index 309d487c52c..22abbb27bce 100644 --- a/cpp/src/arrow/io/interfaces.cc +++ b/cpp/src/arrow/io/interfaces.cc @@ -46,18 +46,22 @@ using internal::ThreadPool; namespace io { -AsyncContext::AsyncContext() : AsyncContext(internal::GetIOThreadPool()) {} +static IOContext g_default_io_context{}; -AsyncContext::AsyncContext(Executor* executor) : executor(executor) {} +IOContext::IOContext(MemoryPool* pool) : IOContext(pool, internal::GetIOThreadPool()) {} + +const IOContext& default_io_context() { return g_default_io_context; } FileInterface::~FileInterface() = default; Status FileInterface::Abort() { return Close(); } +namespace { + class InputStreamBlockIterator { public: InputStreamBlockIterator(std::shared_ptr stream, int64_t block_size) - : stream_(stream), block_size_(block_size) {} + : stream_(std::move(stream)), block_size_(block_size) {} Result> Next() { if (done_) { @@ -81,6 +85,10 @@ class InputStreamBlockIterator { bool done_ = false; }; +} // namespace + +const IOContext& Readable::io_context() const { return g_default_io_context; } + Status InputStream::Advance(int64_t nbytes) { return Read(nbytes).status(); } Result InputStream::Peek(int64_t ARROW_ARG_UNUSED(nbytes)) { @@ -98,14 +106,13 @@ Result>> MakeInputStreamIterator( return Iterator>(InputStreamBlockIterator(stream, block_size)); } -struct RandomAccessFile::RandomAccessFileImpl { +struct RandomAccessFile::Impl { std::mutex lock_; }; RandomAccessFile::~RandomAccessFile() = default; -RandomAccessFile::RandomAccessFile() - : interface_impl_(new RandomAccessFile::RandomAccessFileImpl()) {} +RandomAccessFile::RandomAccessFile() : interface_impl_(new Impl()) {} Result RandomAccessFile::ReadAt(int64_t position, int64_t nbytes, void* out) { std::lock_guard lock(interface_impl_->lock_); @@ -121,25 +128,30 @@ Result> RandomAccessFile::ReadAt(int64_t position, } // Default ReadAsync() implementation: simply issue the read on the context's executor -Future> RandomAccessFile::ReadAsync(const AsyncContext& ctx, +Future> RandomAccessFile::ReadAsync(const IOContext& ctx, int64_t position, int64_t nbytes) { auto self = shared_from_this(); TaskHints hints; hints.io_size = nbytes; - hints.external_id = ctx.external_id; - return DeferNotOk(ctx.executor->Submit(std::move(hints), [self, position, nbytes] { + hints.external_id = ctx.external_id(); + return DeferNotOk(ctx.executor()->Submit(std::move(hints), [self, position, nbytes] { return self->ReadAt(position, nbytes); })); } +Future> RandomAccessFile::ReadAsync(int64_t position, + int64_t nbytes) { + return ReadAsync(io_context(), position, nbytes); +} + // Default WillNeed() implementation: no-op Status RandomAccessFile::WillNeed(const std::vector& ranges) { return Status::OK(); } -Status Writable::Write(const std::string& data) { - return Write(data.c_str(), static_cast(data.size())); +Status Writable::Write(util::string_view data) { + return Write(data.data(), static_cast(data.size())); } Status Writable::Write(const std::shared_ptr& data) { diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index b5a1f1220f6..3b6d9c6477a 100644 --- a/cpp/src/arrow/io/interfaces.h +++ 
b/cpp/src/arrow/io/interfaces.h @@ -49,16 +49,36 @@ struct ReadRange { }; // EXPERIMENTAL -struct ARROW_EXPORT AsyncContext { - ::arrow::internal::Executor* executor; +struct ARROW_EXPORT IOContext { + // No specified executor: will use a global IO thread pool + IOContext() : IOContext(default_memory_pool()) {} + + // No specified executor: will use a global IO thread pool + explicit IOContext(MemoryPool* pool); + + explicit IOContext(MemoryPool* pool, ::arrow::internal::Executor* executor, + int64_t external_id = -1) + : pool_(pool), executor_(executor), external_id_(external_id) {} + + explicit IOContext(::arrow::internal::Executor* executor, int64_t external_id = -1) + : pool_(default_memory_pool()), executor_(executor), external_id_(external_id) {} + + MemoryPool* pool() const { return pool_; } + + ::arrow::internal::Executor* executor() const { return executor_; } + // An application-specific ID, forwarded to executor task submissions - int64_t external_id = -1; + int64_t external_id() const { return external_id_; } - // Set `executor` to a global IO-specific thread pool. - AsyncContext(); - explicit AsyncContext(::arrow::internal::Executor* executor); + private: + MemoryPool* pool_; + ::arrow::internal::Executor* executor_; + int64_t external_id_; }; +// Deprecated name (renamed to IOContext in 4.0.0) +using AsyncContext = IOContext; + class ARROW_EXPORT FileInterface { public: virtual ~FileInterface() = 0; @@ -127,7 +147,7 @@ class ARROW_EXPORT Writable { /// \brief Flush buffered bytes, if any virtual Status Flush(); - Status Write(const std::string& data); + Status Write(util::string_view data); }; class ARROW_EXPORT Readable { @@ -148,6 +168,12 @@ class ARROW_EXPORT Readable { /// In some cases (e.g. a memory-mapped file), this method may avoid a /// memory copy. virtual Result> Read(int64_t nbytes) = 0; + + /// EXPERIMENTAL: The IOContext associated with this file. + /// + /// By default, this is the same as default_io_context(), but it may be + /// overriden by subclasses. + virtual const IOContext& io_context() const; }; class ARROW_EXPORT OutputStream : virtual public FileInterface, public Writable { @@ -234,9 +260,12 @@ class ARROW_EXPORT RandomAccessFile virtual Result> ReadAt(int64_t position, int64_t nbytes); /// EXPERIMENTAL: Read data asynchronously. - virtual Future> ReadAsync(const AsyncContext&, int64_t position, + virtual Future> ReadAsync(const IOContext&, int64_t position, int64_t nbytes); + /// EXPERIMENTAL: Read data asynchronously, using the file's IOContext. + Future> ReadAsync(int64_t position, int64_t nbytes); + /// EXPERIMENTAL: Inform that the given ranges may be read soon. /// /// Some implementations might arrange to prefetch some of the data. 
@@ -248,8 +277,8 @@ class ARROW_EXPORT RandomAccessFile RandomAccessFile(); private: - struct ARROW_NO_EXPORT RandomAccessFileImpl; - std::unique_ptr interface_impl_; + struct ARROW_NO_EXPORT Impl; + std::unique_ptr interface_impl_; }; class ARROW_EXPORT WritableFile : public OutputStream, public Seekable { diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index 1ac435ab642..a953c8f28a7 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -320,7 +320,7 @@ Status BufferReader::WillNeed(const std::vector& ranges) { return st; } -Future> BufferReader::ReadAsync(const AsyncContext&, +Future> BufferReader::ReadAsync(const IOContext&, int64_t position, int64_t nbytes) { return Future>::MakeFinished(DoReadAt(position, nbytes)); diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index 075398a180b..bfebe9945f8 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -160,7 +160,7 @@ class ARROW_EXPORT BufferReader std::shared_ptr buffer() const { return buffer_; } // Synchronous ReadAsync override - Future> ReadAsync(const AsyncContext&, int64_t position, + Future> ReadAsync(const IOContext&, int64_t position, int64_t nbytes) override; Status WillNeed(const std::vector& ranges) override; diff --git a/cpp/src/arrow/io/type_fwd.h b/cpp/src/arrow/io/type_fwd.h index 130ced9db67..041b825c988 100644 --- a/cpp/src/arrow/io/type_fwd.h +++ b/cpp/src/arrow/io/type_fwd.h @@ -17,6 +17,8 @@ #pragma once +#include "arrow/util/visibility.h" + namespace arrow { namespace io { @@ -24,6 +26,12 @@ struct FileMode { enum type { READ, WRITE, READWRITE }; }; +struct IOContext; + +/// EXPERIMENTAL: convenience global singleton for default IOContext settings +ARROW_EXPORT +const IOContext& default_io_context(); + class FileInterface; class Seekable; class Writable; diff --git a/cpp/src/arrow/ipc/type_fwd.h b/cpp/src/arrow/ipc/type_fwd.h index bef9776c6a0..d3f5c5b82e4 100644 --- a/cpp/src/arrow/ipc/type_fwd.h +++ b/cpp/src/arrow/ipc/type_fwd.h @@ -47,6 +47,9 @@ enum class MessageType { SPARSE_TENSOR }; +struct IpcReadOptions; +struct IpcWriteOptions; + class MessageReader; class RecordBatchStreamReader; diff --git a/cpp/src/arrow/util/parallel.h b/cpp/src/arrow/util/parallel.h index e2c87a534a6..e56a71b91af 100644 --- a/cpp/src/arrow/util/parallel.h +++ b/cpp/src/arrow/util/parallel.h @@ -30,12 +30,12 @@ namespace internal { // arguments between 0 and `num_tasks - 1`, on an arbitrary number of threads. template -Status ParallelFor(int num_tasks, FUNCTION&& func) { - auto pool = internal::GetCpuThreadPool(); +Status ParallelFor(int num_tasks, FUNCTION&& func, + Executor* executor = internal::GetCpuThreadPool()) { std::vector> futures(num_tasks); for (int i = 0; i < num_tasks; ++i) { - ARROW_ASSIGN_OR_RAISE(futures[i], pool->Submit(func, i)); + ARROW_ASSIGN_OR_RAISE(futures[i], executor->Submit(func, i)); } auto st = Status::OK(); for (auto& fut : futures) { @@ -49,9 +49,10 @@ Status ParallelFor(int num_tasks, FUNCTION&& func) { // depending on the input boolean. 
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index e784d391016..1e66d5c52c0 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -890,7 +890,7 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
   if (reader_properties_.pre_buffer()) {
     // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
     BEGIN_PARQUET_CATCH_EXCEPTIONS
-    reader_->PreBuffer(row_groups, column_indices, reader_properties_.async_context(),
+    reader_->PreBuffer(row_groups, column_indices, reader_properties_.io_context(),
                        reader_properties_.cache_options());
     END_PARQUET_CATCH_EXCEPTIONS
   }
@@ -990,7 +990,7 @@ Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
   if (reader_properties_.pre_buffer()) {
     BEGIN_PARQUET_CATCH_EXCEPTIONS
     parquet_reader()->PreBuffer(row_groups, column_indices,
-                                reader_properties_.async_context(),
+                                reader_properties_.io_context(),
                                 reader_properties_.cache_options());
     END_PARQUET_CATCH_EXCEPTIONS
   }
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index 39ef337c3eb..730f5b9fb9b 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -252,7 +252,7 @@ class SerializedFile : public ParquetFileReader::Contents {
 
   void PreBuffer(const std::vector<int>& row_groups,
                  const std::vector<int>& column_indices,
-                 const ::arrow::io::AsyncContext& ctx,
+                 const ::arrow::io::IOContext& ctx,
                  const ::arrow::io::CacheOptions& options) {
     cached_source_ =
         std::make_shared<::arrow::io::internal::ReadRangeCache>(source_, ctx, options);
@@ -595,7 +595,7 @@ std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
 
 void ParquetFileReader::PreBuffer(const std::vector<int>& row_groups,
                                   const std::vector<int>& column_indices,
-                                  const ::arrow::io::AsyncContext& ctx,
+                                  const ::arrow::io::IOContext& ctx,
                                   const ::arrow::io::CacheOptions& options) {
   // Access private methods here
   SerializedFile* file =
diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h
index 79af3cd2b35..12c28783291 100644
--- a/cpp/src/parquet/file_reader.h
+++ b/cpp/src/parquet/file_reader.h
@@ -138,7 +138,7 @@ class PARQUET_EXPORT ParquetFileReader {
   /// only one row group at a time may be useful.
   void PreBuffer(const std::vector<int>& row_groups,
                  const std::vector<int>& column_indices,
-                 const ::arrow::io::AsyncContext& ctx,
+                 const ::arrow::io::IOContext& ctx,
                  const ::arrow::io::CacheOptions& options);
 
  private:
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index f0422012122..be17f447a38 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -616,16 +616,16 @@ class PARQUET_EXPORT ArrowReaderProperties {
   ::arrow::io::CacheOptions cache_options() const { return cache_options_; }
 
   /// Set execution context for read coalescing.
-  void set_async_context(::arrow::io::AsyncContext ctx) { async_context_ = ctx; }
+  void set_io_context(const ::arrow::io::IOContext& ctx) { io_context_ = ctx; }
 
-  ::arrow::io::AsyncContext async_context() const { return async_context_; }
+  const ::arrow::io::IOContext& io_context() const { return io_context_; }
 
  private:
  bool use_threads_;
  std::unordered_set<int> read_dict_indices_;
  int64_t batch_size_;
  bool pre_buffer_;
-  ::arrow::io::AsyncContext async_context_;
+  ::arrow::io::IOContext io_context_;
  ::arrow::io::CacheOptions cache_options_;
 };
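A sketch of how these properties combine when opening a Parquet file with pre-buffering (the builder-based construction is one possible path; `infile` is assumed to be an already opened RandomAccessFile):

    #include "arrow/io/caching.h"
    #include "arrow/io/interfaces.h"
    #include "parquet/arrow/reader.h"
    #include "parquet/properties.h"

    arrow::Status OpenPreBuffered(std::shared_ptr<arrow::io::RandomAccessFile> infile,
                                  std::unique_ptr<parquet::arrow::FileReader>* out) {
      parquet::ArrowReaderProperties props;
      props.set_pre_buffer(true);
      // Coalesced reads will be issued and allocated through this context.
      props.set_io_context(arrow::io::IOContext(arrow::default_memory_pool()));
      props.set_cache_options(arrow::io::CacheOptions::Defaults());

      parquet::arrow::FileReaderBuilder builder;
      ARROW_RETURN_NOT_OK(builder.Open(std::move(infile)));
      return builder.properties(props)->Build(out);
    }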
diff --git a/r/src/filesystem.cpp b/r/src/filesystem.cpp
index 066e5b540f2..7dcf85dca78 100644
--- a/r/src/filesystem.cpp
+++ b/r/src/filesystem.cpp
@@ -23,6 +23,7 @@
 #include
 
 namespace fs = ::arrow::fs;
+namespace io = ::arrow::io;
 
 namespace cpp11 {
@@ -268,7 +269,7 @@ void fs___CopyFiles(const std::shared_ptr<fs::FileSystem>& source_fs,
                     const std::string& destination_base_dir,
                     int64_t chunk_size = 1024 * 1024, bool use_threads = true) {
   StopIfNotOk(fs::CopyFiles(source_fs, *source_sel, destination_fs, destination_base_dir,
-                            chunk_size, use_threads));
+                            io::IOContext{}, chunk_size, use_threads));
 }
 
 #endif

From d750f4ca6970cb448418ff949e71979c8f92d19c Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Thu, 25 Feb 2021 15:10:43 +0100
Subject: [PATCH 2/3] Fix reversed CPU and IO executor in CSV reader

---
 cpp/src/arrow/csv/reader.cc | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc
index 560382c6d6b..278acffbde4 100644
--- a/cpp/src/arrow/csv/reader.cc
+++ b/cpp/src/arrow/csv/reader.cc
@@ -942,9 +942,11 @@ Result<std::shared_ptr<TableReader>> TableReader::Make(
     const ConvertOptions& convert_options) {
   std::shared_ptr<TableReader> reader;
   if (read_options.use_threads) {
-    reader = std::make_shared<AsyncThreadedTableReader>(
-        pool, input, read_options, parse_options, convert_options, io_context.executor(),
-        internal::GetCpuThreadPool());
+    auto cpu_executor = internal::GetCpuThreadPool();
+    auto io_executor = io_context.executor();
+    reader = std::make_shared<AsyncThreadedTableReader>(pool, input, read_options,
+                                                        parse_options, convert_options,
+                                                        cpu_executor, io_executor);
   } else {
     reader = std::make_shared<SerialTableReader>(pool, input, read_options, parse_options,
                                                  convert_options);
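The distinction matters for callers that hand the reader a dedicated IO executor; a sketch of such a call (pool size and options are arbitrary):

    #include "arrow/csv/api.h"
    #include "arrow/io/interfaces.h"
    #include "arrow/util/thread_pool.h"

    arrow::Result<std::shared_ptr<arrow::Table>> ReadCsvTable(
        std::shared_ptr<arrow::io::InputStream> input) {
      // Blocking reads go to this pool; parsing and conversion stay on the
      // global CPU thread pool inside the reader.
      ARROW_ASSIGN_OR_RAISE(auto io_pool, arrow::internal::ThreadPool::Make(4));
      arrow::io::IOContext io_context(arrow::default_memory_pool(), io_pool.get());
      auto read_options = arrow::csv::ReadOptions::Defaults();
      read_options.use_threads = true;
      ARROW_ASSIGN_OR_RAISE(
          auto reader,
          arrow::csv::TableReader::Make(arrow::default_memory_pool(), io_context, input,
                                        read_options, arrow::csv::ParseOptions::Defaults(),
                                        arrow::csv::ConvertOptions::Defaults()));
      return reader->Read();
    }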
From da3ece91279509cd21130c7fee24a73e4b81eb4f Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Thu, 25 Feb 2021 20:14:17 +0100
Subject: [PATCH 3/3] Address review comments

---
 c_glib/arrow-glib/reader.cpp          |  3 +--
 cpp/examples/minimal_build/example.cc |  2 +-
 cpp/src/arrow/csv/reader.cc           | 28 +++++++++++++++++++++------
 cpp/src/arrow/csv/reader.h            |  7 +++++++
 cpp/src/arrow/csv/reader_test.cc      | 11 +++++------
 cpp/src/arrow/filesystem/filesystem.h | 14 +++++++++++++-
 cpp/src/arrow/io/interfaces.h         | 12 +++++++++---
 docs/source/cpp/csv.rst               |  6 ++----
 python/pyarrow/_csv.pyx               |  5 ++---
 python/pyarrow/includes/libarrow.pxd  |  9 ++++++---
 r/src/csv.cpp                         |  6 +++---
 r/src/filesystem.cpp                  |  2 +-
 12 files changed, 72 insertions(+), 33 deletions(-)

diff --git a/c_glib/arrow-glib/reader.cpp b/c_glib/arrow-glib/reader.cpp
index 17100e76a3c..db6fa544069 100644
--- a/c_glib/arrow-glib/reader.cpp
+++ b/c_glib/arrow-glib/reader.cpp
@@ -1591,8 +1591,7 @@ garrow_csv_reader_new(GArrowInputStream *input,
   }
 
   auto arrow_reader =
-    arrow::csv::TableReader::Make(arrow::default_memory_pool(),
-                                  arrow::io::AsyncContext(),
+    arrow::csv::TableReader::Make(arrow::io::default_io_context(),
                                   arrow_input,
                                   read_options,
                                   parse_options,
diff --git a/cpp/examples/minimal_build/example.cc b/cpp/examples/minimal_build/example.cc
index 8f58de5777a..e1b5c123a85 100644
--- a/cpp/examples/minimal_build/example.cc
+++ b/cpp/examples/minimal_build/example.cc
@@ -39,7 +39,7 @@ Status RunMain(int argc, char** argv) {
   ARROW_ASSIGN_OR_RAISE(
       auto csv_reader,
       arrow::csv::TableReader::Make(arrow::default_memory_pool(),
-                                    arrow::io::AsyncContext(),
+                                    arrow::io::default_io_context(),
                                     input_file,
                                     arrow::csv::ReadOptions::Defaults(),
                                     arrow::csv::ParseOptions::Defaults(),
diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc
index 278acffbde4..bbba60c79c1 100644
--- a/cpp/src/arrow/csv/reader.cc
+++ b/cpp/src/arrow/csv/reader.cc
@@ -931,12 +931,7 @@ class AsyncThreadedTableReader
   AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_;
 };
 
-}  // namespace
-
-/////////////////////////////////////////////////////////////////////////
-// Factory functions
-
-Result<std::shared_ptr<TableReader>> TableReader::Make(
+Result<std::shared_ptr<TableReader>> MakeTableReader(
     MemoryPool* pool, io::IOContext io_context, std::shared_ptr<io::InputStream> input,
     const ReadOptions& read_options, const ParseOptions& parse_options,
     const ConvertOptions& convert_options) {
@@ -955,6 +950,27 @@ Result<std::shared_ptr<TableReader>> TableReader::Make(
   return reader;
 }
 
+}  // namespace
+
+/////////////////////////////////////////////////////////////////////////
+// Factory functions
+
+Result<std::shared_ptr<TableReader>> TableReader::Make(
+    io::IOContext io_context, std::shared_ptr<io::InputStream> input,
+    const ReadOptions& read_options, const ParseOptions& parse_options,
+    const ConvertOptions& convert_options) {
+  return MakeTableReader(io_context.pool(), io_context, std::move(input), read_options,
+                         parse_options, convert_options);
+}
+
+Result<std::shared_ptr<TableReader>> TableReader::Make(
+    MemoryPool* pool, io::IOContext io_context, std::shared_ptr<io::InputStream> input,
+    const ReadOptions& read_options, const ParseOptions& parse_options,
+    const ConvertOptions& convert_options) {
+  return MakeTableReader(pool, io_context, std::move(input), read_options, parse_options,
+                         convert_options);
+}
+
 Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
     MemoryPool* pool, std::shared_ptr<io::InputStream> input,
     const ReadOptions& read_options, const ParseOptions& parse_options,
diff --git a/cpp/src/arrow/csv/reader.h b/cpp/src/arrow/csv/reader.h
index 8aba3c45a75..b18dc04eb65 100644
--- a/cpp/src/arrow/csv/reader.h
+++ b/cpp/src/arrow/csv/reader.h
@@ -46,6 +46,13 @@ class ARROW_EXPORT TableReader {
   virtual Future<std::shared_ptr<Table>> ReadAsync() = 0;
 
   /// Create a TableReader instance
+  static Result<std::shared_ptr<TableReader>> Make(io::IOContext io_context,
+                                                   std::shared_ptr<io::InputStream> input,
+                                                   const ReadOptions&,
+                                                   const ParseOptions&,
+                                                   const ConvertOptions&);
+
+  ARROW_DEPRECATED("Use MemoryPool-less overload (the IOContext holds a pool already)")
   static Result<std::shared_ptr<TableReader>> Make(
       MemoryPool* pool, io::IOContext io_context, std::shared_ptr<io::InputStream> input,
       const ReadOptions&, const ParseOptions&, const ConvertOptions&);
diff --git a/cpp/src/arrow/csv/reader_test.cc b/cpp/src/arrow/csv/reader_test.cc
index a3164930684..602adf2f2a6 100644
--- a/cpp/src/arrow/csv/reader_test.cc
+++ b/cpp/src/arrow/csv/reader_test.cc
@@ -108,9 +108,8 @@ TableReaderFactory MakeSerialFactory() {
     auto read_options = ReadOptions::Defaults();
     read_options.block_size = 1 << 10;
     read_options.use_threads = false;
-    return TableReader::Make(default_memory_pool(), io::IOContext(), input_stream,
-                             read_options, ParseOptions::Defaults(),
-                             ConvertOptions::Defaults());
+    return TableReader::Make(io::default_io_context(), input_stream, read_options,
+                             ParseOptions::Defaults(), ConvertOptions::Defaults());
   };
 }
 
@@ -131,9 +130,9 @@ Result<TableReaderFactory> MakeAsyncFactory(
     ReadOptions read_options = ReadOptions::Defaults();
     read_options.use_threads = true;
     read_options.block_size = 1 << 10;
-    auto table_reader = TableReader::Make(
-        default_memory_pool(), io::IOContext(thread_pool.get()), input_stream,
-        read_options, ParseOptions::Defaults(), ConvertOptions::Defaults());
+    auto table_reader =
+        TableReader::Make(io::IOContext(thread_pool.get()), input_stream, read_options,
+                          ParseOptions::Defaults(), ConvertOptions::Defaults());
     return table_reader;
   };
 }
diff --git a/cpp/src/arrow/filesystem/filesystem.h b/cpp/src/arrow/filesystem/filesystem.h
index fd7f448b9b1..9d7aca98529 100644
--- a/cpp/src/arrow/filesystem/filesystem.h
+++ b/cpp/src/arrow/filesystem/filesystem.h
@@ -391,7 +391,14 @@ ARROW_EXPORT
 Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri,
                                                       std::string* out_path = NULLPTR);
 
-/// XXX
+/// \brief Create a new FileSystem by URI with a custom IO context
+///
+/// Recognized schemes are "file", "mock", "hdfs" and "s3fs".
+///
+/// \param[in] uri a URI-based path, ex: file:///some/local/path
+/// \param[in] io_context an IOContext which will be associated with the filesystem
+/// \param[out] out_path (optional) Path inside the filesystem.
+/// \return out_fs FileSystem instance.
 ARROW_EXPORT
 Result<std::shared_ptr<FileSystem>> FileSystemFromUri(const std::string& uri,
                                                       const io::IOContext& io_context,
@@ -406,6 +413,11 @@ ARROW_EXPORT
 Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
     const std::string& uri, std::string* out_path = NULLPTR);
 
+/// \brief Create a new FileSystem by URI with a custom IO context
+///
+/// Same as FileSystemFromUri, but in addition also recognize non-URIs
+/// and treat them as local filesystem paths. Only absolute local filesystem
+/// paths are allowed.
 ARROW_EXPORT
 Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
     const std::string& uri, const io::IOContext& io_context,
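For example, a caller could associate a filesystem with a specific context up front (a hypothetical sketch; the URI and external id are illustrative):

    #include "arrow/filesystem/filesystem.h"
    #include "arrow/io/interfaces.h"

    arrow::Result<std::shared_ptr<arrow::fs::FileSystem>> MakeTaggedLocalFs() {
      // The context is associated with the returned filesystem; the external id
      // is forwarded to executor task submissions.
      arrow::io::IOContext ctx(arrow::default_memory_pool(),
                               arrow::io::default_io_context().executor(),
                               /*external_id=*/7);
      std::string path;
      return arrow::fs::FileSystemFromUri("file:///tmp/dataset", ctx, &path);
    }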
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index 3b6d9c6477a..07c01324ea1 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -48,7 +48,12 @@ struct ReadRange {
   }
 };
 
-// EXPERIMENTAL
+/// EXPERIMENTAL: options provider for IO tasks
+///
+/// Includes an Executor (which will be used to execute asynchronous reads),
+/// a MemoryPool (which will be used to allocate buffers when zero copy reads
+/// are not possible), and an external id (in case the executor receives tasks from
+/// multiple sources and must distinguish tasks associated with this IOContext).
 struct ARROW_EXPORT IOContext {
   // No specified executor: will use a global IO thread pool
   IOContext() : IOContext(default_memory_pool()) {}
@@ -76,8 +81,9 @@ struct ARROW_EXPORT IOContext {
   int64_t external_id_;
 };
 
-// Deprecated name (renamed to IOContext in 4.0.0)
-using AsyncContext = IOContext;
+struct ARROW_DEPRECATED("renamed to IOContext in 4.0.0") AsyncContext : public IOContext {
+  using IOContext::IOContext;
+};
 
 class ARROW_EXPORT FileInterface {
  public:
diff --git a/docs/source/cpp/csv.rst b/docs/source/cpp/csv.rst
index 44dc1498f18..f8a508c3f94 100644
--- a/docs/source/cpp/csv.rst
+++ b/docs/source/cpp/csv.rst
@@ -41,8 +41,7 @@ A CSV file is read from a :class:`~arrow::io::InputStream`.
    {
       // ...
-      arrow::MemoryPool* pool = default_memory_pool();
-      arrow::io::AsyncContext async_context;
+      arrow::io::IOContext io_context = arrow::io::default_io_context();
       std::shared_ptr<arrow::io::InputStream> input = ...;
 
       auto read_options = arrow::csv::ReadOptions::Defaults();
@@ -51,8 +50,7 @@
 
       // Instantiate TableReader from input stream and options
       auto maybe_reader =
-        arrow::csv::TableReader::Make(pool,
-                                      async_context,
+        arrow::csv::TableReader::Make(io_context,
                                       input,
                                       read_options,
                                       parse_options,
diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx
index 4068a0b9141..9b88d7d5277 100644
--- a/python/pyarrow/_csv.pyx
+++ b/python/pyarrow/_csv.pyx
@@ -700,7 +700,6 @@ def read_csv(input_file, read_options=None, parse_options=None,
         CCSVConvertOptions c_convert_options
         shared_ptr[CCSVReader] reader
         shared_ptr[CTable] table
-        CAsyncContext c_async_ctx = CAsyncContext()
 
     _get_reader(input_file, read_options, &stream)
     _get_read_options(read_options, &c_read_options)
@@ -708,8 +707,8 @@ def read_csv(input_file, read_options=None, parse_options=None,
     _get_convert_options(convert_options, &c_convert_options)
 
     reader = GetResultValue(CCSVReader.Make(
-        maybe_unbox_memory_pool(memory_pool), c_async_ctx, stream,
-        c_read_options, c_parse_options, c_convert_options))
+        CIOContext(maybe_unbox_memory_pool(memory_pool)),
+        stream, c_read_options, c_parse_options, c_convert_options))
 
     with nogil:
         table = GetResultValue(reader.get().Read())
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index ba3c3ad7d2b..3f9aa8b6881 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1140,8 +1140,11 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
         ObjectType_FILE" arrow::io::ObjectType::FILE"
         ObjectType_DIRECTORY" arrow::io::ObjectType::DIRECTORY"
 
-    cdef cppclass CAsyncContext" arrow::io::AsyncContext":
-        CAsyncContext()
+    cdef cppclass CIOContext" arrow::io::IOContext":
+        CIOContext()
+        CIOContext(CMemoryPool*)
+
+    CIOContext c_default_io_context "arrow::io::default_io_context"()
 
     cdef cppclass FileStatistics:
         int64_t size
@@ -1621,7 +1624,7 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil:
     cdef cppclass CCSVReader" arrow::csv::TableReader":
         @staticmethod
        CResult[shared_ptr[CCSVReader]] Make(
-            CMemoryPool*, CAsyncContext, shared_ptr[CInputStream],
+            CIOContext, shared_ptr[CInputStream],
             CCSVReadOptions, CCSVParseOptions, CCSVConvertOptions)
 
         CResult[shared_ptr[CTable]] Read()
diff --git a/r/src/csv.cpp b/r/src/csv.cpp
index 69b834a6be0..0ce4cd699f8 100644
--- a/r/src/csv.cpp
+++ b/r/src/csv.cpp
@@ -141,9 +141,9 @@ std::shared_ptr<arrow::csv::TableReader> csv___TableReader__Make(
     const std::shared_ptr<arrow::csv::ReadOptions>& read_options,
     const std::shared_ptr<arrow::csv::ParseOptions>& parse_options,
     const std::shared_ptr<arrow::csv::ConvertOptions>& convert_options) {
-  return ValueOrStop(
-      arrow::csv::TableReader::Make(gc_memory_pool(), arrow::io::AsyncContext(), input,
-                                    *read_options, *parse_options, *convert_options));
+  return ValueOrStop(arrow::csv::TableReader::Make(arrow::io::IOContext(gc_memory_pool()),
+                                                   input, *read_options, *parse_options,
+                                                   *convert_options));
 }
 
 // [[arrow::export]]
diff --git a/r/src/filesystem.cpp b/r/src/filesystem.cpp
index 7dcf85dca78..fced8abdd07 100644
--- a/r/src/filesystem.cpp
+++ b/r/src/filesystem.cpp
@@ -269,7 +269,7 @@ void fs___CopyFiles(const std::shared_ptr<fs::FileSystem>& source_fs,
                     const std::string& destination_base_dir,
                     int64_t chunk_size = 1024 * 1024, bool use_threads = true) {
   StopIfNotOk(fs::CopyFiles(source_fs, *source_sel, destination_fs, destination_base_dir,
-                            io::IOContext{}, chunk_size, use_threads));
+                            io::default_io_context(), chunk_size, use_threads));
 }
 
 #endif
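The same explicit-context pattern is available directly from C++; a sketch of the call that the R binding above wraps (filesystem objects and paths are assumed, and the argument order mirrors the binding):

    #include "arrow/filesystem/api.h"

    arrow::Status BackupData(const std::shared_ptr<arrow::fs::FileSystem>& src_fs,
                             const std::shared_ptr<arrow::fs::FileSystem>& dest_fs) {
      arrow::fs::FileSelector sel;
      sel.base_dir = "data";
      sel.recursive = true;
      // The IOContext is passed explicitly; default_io_context() keeps the old behaviour.
      return arrow::fs::CopyFiles(src_fs, sel, dest_fs,
                                  /*destination_base_dir=*/"backup/data",
                                  arrow::io::default_io_context(),
                                  /*chunk_size=*/1024 * 1024, /*use_threads=*/true);
    }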