From 44518138644388412bf0bb10c6800678b913cb53 Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 11 Jan 2022 10:14:20 -0500 Subject: [PATCH 1/3] ARROW-15300: [C++] Update Skyhook for async dataset interfaces --- cpp/src/skyhook/client/file_skyhook.cc | 110 +++++++++++------------- cpp/src/skyhook/client/file_skyhook.h | 11 ++- cpp/src/skyhook/cls/cls_skyhook_test.cc | 1 + 3 files changed, 56 insertions(+), 66 deletions(-) diff --git a/cpp/src/skyhook/client/file_skyhook.cc b/cpp/src/skyhook/client/file_skyhook.cc index f8b57f441d2..0902a3e8c88 100644 --- a/cpp/src/skyhook/client/file_skyhook.cc +++ b/cpp/src/skyhook/client/file_skyhook.cc @@ -22,63 +22,11 @@ #include "arrow/dataset/file_base.h" #include "arrow/dataset/file_ipc.h" #include "arrow/dataset/file_parquet.h" +#include "arrow/util/checked_cast.h" #include "arrow/util/compression.h" namespace skyhook { -/// A ScanTask to scan a file fragment in Skyhook format. -class SkyhookScanTask : public arrow::dataset::ScanTask { - public: - SkyhookScanTask(std::shared_ptr options, - std::shared_ptr fragment, - arrow::dataset::FileSource source, - std::shared_ptr doa, - skyhook::SkyhookFileType::type file_format, - arrow::compute::Expression partition_expression) - : ScanTask(std::move(options), std::move(fragment)), - source_(std::move(source)), - doa_(std::move(doa)), - file_format_(file_format), - partition_expression_(std::move(partition_expression)) {} - - arrow::Result Execute() override { - /// Retrieve the size of the file using POSIX `stat`. - struct stat st {}; - RETURN_NOT_OK(doa_->Stat(source_.path(), st)); - - /// Create a ScanRequest instance. - skyhook::ScanRequest req; - req.filter_expression = options_->filter; - req.partition_expression = partition_expression_; - req.projection_schema = options_->projected_schema; - req.dataset_schema = options_->dataset_schema; - req.file_size = st.st_size; - req.file_format = file_format_; - - /// Serialize the ScanRequest into a ceph bufferlist. - ceph::bufferlist request; - RETURN_NOT_OK(skyhook::SerializeScanRequest(req, &request)); - - /// Execute the Ceph object class method `scan_op`. - ceph::bufferlist result; - RETURN_NOT_OK(doa_->Exec(st.st_ino, "scan_op", request, result)); - - /// Read RecordBatches from the result bufferlist. Since, this step might use - /// threads for decompressing compressed batches, to avoid running into - /// [ARROW-12597], we switch off threaded decompression to avoid nested threading - /// scenarios when scan tasks are executed in parallel by the CpuThreadPool. - arrow::RecordBatchVector batches; - RETURN_NOT_OK(skyhook::DeserializeTable(result, !options_->use_threads, &batches)); - return arrow::MakeVectorIterator(std::move(batches)); - } - - protected: - arrow::dataset::FileSource source_; - std::shared_ptr doa_; - skyhook::SkyhookFileType::type file_format_; - arrow::compute::Expression partition_expression_; -}; - class SkyhookFileFormat::Impl { public: Impl(std::shared_ptr ctx, std::string file_format) @@ -95,10 +43,13 @@ class SkyhookFileFormat::Impl { return arrow::Status::OK(); } - arrow::Result ScanFile( + arrow::Result ScanBatchesAsync( + const std::shared_ptr& format, const std::shared_ptr& options, const std::shared_ptr& file) const { - /// Make sure client-side filtering and projection is turned off. + // TODO: investigate "true" async version + + // Make sure client-side filtering and projection is turned off. file->apply_compute = false; /// Convert string file format name to Enum. @@ -111,9 +62,46 @@ class SkyhookFileFormat::Impl { return arrow::Status::Invalid("Unsupported file format ", file_format_); } - arrow::dataset::ScanTaskVector v{std::make_shared( - options, file, file->source(), doa_, file_format, file->partition_expression())}; - return arrow::MakeVectorIterator(v); + auto fut = arrow::DeferNotOk(options->io_context.executor()->Submit( + [file, file_format, format, + options]() -> arrow::Result { + auto self = format->impl_.get(); + + /// Retrieve the size of the file using POSIX `stat`. + struct stat st {}; + RETURN_NOT_OK(self->doa_->Stat(file->source().path(), st)); + + /// Create a ScanRequest instance. + skyhook::ScanRequest req; + req.filter_expression = options->filter; + req.partition_expression = file->partition_expression(); + req.projection_schema = options->projected_schema; + req.dataset_schema = options->dataset_schema; + req.file_size = st.st_size; + req.file_format = file_format; + + /// Serialize the ScanRequest into a ceph bufferlist. + ceph::bufferlist request; + RETURN_NOT_OK(skyhook::SerializeScanRequest(req, &request)); + + /// Execute the Ceph object class method `scan_op`. + ceph::bufferlist result; + RETURN_NOT_OK(self->doa_->Exec(st.st_ino, "scan_op", request, result)); + + /// Read RecordBatches from the result bufferlist. Since, this step might use + /// threads for decompressing compressed batches, to avoid running into + /// [ARROW-12597], we switch off threaded decompression to avoid nested + /// threading scenarios when scan tasks are executed in parallel by the + /// CpuThreadPool. + arrow::RecordBatchVector batches; + RETURN_NOT_OK( + skyhook::DeserializeTable(result, !options->use_threads, &batches)); + auto gen = arrow::MakeVectorGenerator(std::move(batches)); + // Keep Ceph client alive + arrow::RecordBatchGenerator gen_with_client = [format, gen]() { return gen(); }; + return gen_with_client; + })); + return arrow::MakeFromFuture(std::move(fut)); } arrow::Result> Inspect( @@ -160,10 +148,12 @@ arrow::Result> SkyhookFileFormat::Inspect( return impl_->Inspect(source); } -arrow::Result SkyhookFileFormat::ScanFile( +arrow::Result SkyhookFileFormat::ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& file) const { - return impl_->ScanFile(options, file); + return impl_->ScanBatchesAsync( + arrow::internal::checked_pointer_cast(shared_from_this()), + options, file); } std::shared_ptr diff --git a/cpp/src/skyhook/client/file_skyhook.h b/cpp/src/skyhook/client/file_skyhook.h index 52a19f5bf3b..bec41fbdfa6 100644 --- a/cpp/src/skyhook/client/file_skyhook.h +++ b/cpp/src/skyhook/client/file_skyhook.h @@ -16,11 +16,14 @@ // under the License. #pragma once -#include "arrow/api.h" +#include +#include + #include "arrow/dataset/file_parquet.h" #include "arrow/dataset/scanner.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" +#include "arrow/type_fwd.h" namespace skyhook { @@ -77,11 +80,7 @@ class SkyhookFileFormat : public arrow::dataset::FileFormat { arrow::Result> Inspect( const arrow::dataset::FileSource& source) const override; - /// \brief Scan a file fragment. - /// \param[in] options The ScanOptions to use. - /// \param[in] file The file fragment to scan. - /// \return An iterator of ScanTasks. - arrow::Result ScanFile( + arrow::Result ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& file) const override; diff --git a/cpp/src/skyhook/cls/cls_skyhook_test.cc b/cpp/src/skyhook/cls/cls_skyhook_test.cc index 461cdd6bc79..ebb1f4e0b2b 100644 --- a/cpp/src/skyhook/cls/cls_skyhook_test.cc +++ b/cpp/src/skyhook/cls/cls_skyhook_test.cc @@ -21,6 +21,7 @@ #include "arrow/dataset/file_base.h" #include "arrow/filesystem/api.h" #include "arrow/io/api.h" +#include "arrow/table.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/checked_cast.h" #include "arrow/util/iterator.h" From 819469a037176828b602dd7428fe04e003480934 Mon Sep 17 00:00:00 2001 From: David Li Date: Tue, 11 Jan 2022 15:11:11 -0500 Subject: [PATCH 2/3] ARROW-15300: [C++] Don't doubly-link Arrow into the tests --- cpp/src/skyhook/CMakeLists.txt | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cpp/src/skyhook/CMakeLists.txt b/cpp/src/skyhook/CMakeLists.txt index 22a414c5f26..992c4674132 100644 --- a/cpp/src/skyhook/CMakeLists.txt +++ b/cpp/src/skyhook/CMakeLists.txt @@ -63,11 +63,12 @@ add_dependencies(cls_skyhook ${ARROW_SKYHOOK_CLS_LIBRARIES}) # define the test builds if(ARROW_TEST_LINKAGE STREQUAL "static") - set(ARROW_SKYHOOK_TEST_LINK_LIBS arrow_dataset_static ${ARROW_TEST_STATIC_LINK_LIBS}) + set(ARROW_SKYHOOK_TEST_LINK_LIBS arrow_skyhook_client_static arrow_dataset_static + ${ARROW_TEST_STATIC_LINK_LIBS}) else() - set(ARROW_SKYHOOK_TEST_LINK_LIBS arrow_dataset_shared ${ARROW_TEST_SHARED_LINK_LIBS}) + set(ARROW_SKYHOOK_TEST_LINK_LIBS arrow_skyhook_client_shared arrow_dataset_shared + ${ARROW_TEST_SHARED_LINK_LIBS}) endif() -list(APPEND ARROW_SKYHOOK_TEST_LINK_LIBS ${ARROW_SKYHOOK_CLIENT_LIBRARIES}) # build the cls and protocol tests add_arrow_test(cls_test From 72d8216e0f093f8d96718202660a41783e652873 Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 12 Jan 2022 13:33:10 -0500 Subject: [PATCH 3/3] ARROW-15300: [C++] Remove apply_compute --- cpp/src/arrow/dataset/dataset.h | 3 --- cpp/src/skyhook/client/file_skyhook.cc | 5 ----- 2 files changed, 8 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 21df820099e..9f4fee52154 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -78,9 +78,6 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this { virtual ~Fragment() = default; - /// \brief Decide whether to apply filters and projections to this Fragment. - bool apply_compute = true; - protected: Fragment() = default; explicit Fragment(compute::Expression partition_expression, diff --git a/cpp/src/skyhook/client/file_skyhook.cc b/cpp/src/skyhook/client/file_skyhook.cc index 0902a3e8c88..cc262803533 100644 --- a/cpp/src/skyhook/client/file_skyhook.cc +++ b/cpp/src/skyhook/client/file_skyhook.cc @@ -47,11 +47,6 @@ class SkyhookFileFormat::Impl { const std::shared_ptr& format, const std::shared_ptr& options, const std::shared_ptr& file) const { - // TODO: investigate "true" async version - - // Make sure client-side filtering and projection is turned off. - file->apply_compute = false; - /// Convert string file format name to Enum. skyhook::SkyhookFileType::type file_format; if (file_format_ == "parquet") {