Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this<Fragment> {

virtual ~Fragment() = default;

/// \brief Decide whether to apply filters and projections to this Fragment.
bool apply_compute = true;

protected:
Fragment() = default;
explicit Fragment(compute::Expression partition_expression,
Expand Down
7 changes: 4 additions & 3 deletions cpp/src/skyhook/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ add_dependencies(cls_skyhook ${ARROW_SKYHOOK_CLS_LIBRARIES})

# define the test builds
if(ARROW_TEST_LINKAGE STREQUAL "static")
set(ARROW_SKYHOOK_TEST_LINK_LIBS arrow_dataset_static ${ARROW_TEST_STATIC_LINK_LIBS})
set(ARROW_SKYHOOK_TEST_LINK_LIBS arrow_skyhook_client_static arrow_dataset_static
${ARROW_TEST_STATIC_LINK_LIBS})
else()
set(ARROW_SKYHOOK_TEST_LINK_LIBS arrow_dataset_shared ${ARROW_TEST_SHARED_LINK_LIBS})
set(ARROW_SKYHOOK_TEST_LINK_LIBS arrow_skyhook_client_shared arrow_dataset_shared
${ARROW_TEST_SHARED_LINK_LIBS})
endif()
list(APPEND ARROW_SKYHOOK_TEST_LINK_LIBS ${ARROW_SKYHOOK_CLIENT_LIBRARIES})

# build the cls and protocol tests
add_arrow_test(cls_test
Expand Down
109 changes: 47 additions & 62 deletions cpp/src/skyhook/client/file_skyhook.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,63 +22,11 @@
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/file_ipc.h"
#include "arrow/dataset/file_parquet.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/compression.h"

namespace skyhook {

/// A ScanTask to scan a file fragment in Skyhook format.
class SkyhookScanTask : public arrow::dataset::ScanTask {
public:
SkyhookScanTask(std::shared_ptr<arrow::dataset::ScanOptions> options,
std::shared_ptr<arrow::dataset::Fragment> fragment,
arrow::dataset::FileSource source,
std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa,
skyhook::SkyhookFileType::type file_format,
arrow::compute::Expression partition_expression)
: ScanTask(std::move(options), std::move(fragment)),
source_(std::move(source)),
doa_(std::move(doa)),
file_format_(file_format),
partition_expression_(std::move(partition_expression)) {}

arrow::Result<arrow::RecordBatchIterator> Execute() override {
/// Retrieve the size of the file using POSIX `stat`.
struct stat st {};
RETURN_NOT_OK(doa_->Stat(source_.path(), st));

/// Create a ScanRequest instance.
skyhook::ScanRequest req;
req.filter_expression = options_->filter;
req.partition_expression = partition_expression_;
req.projection_schema = options_->projected_schema;
req.dataset_schema = options_->dataset_schema;
req.file_size = st.st_size;
req.file_format = file_format_;

/// Serialize the ScanRequest into a ceph bufferlist.
ceph::bufferlist request;
RETURN_NOT_OK(skyhook::SerializeScanRequest(req, &request));

/// Execute the Ceph object class method `scan_op`.
ceph::bufferlist result;
RETURN_NOT_OK(doa_->Exec(st.st_ino, "scan_op", request, result));

/// Read RecordBatches from the result bufferlist. Since, this step might use
/// threads for decompressing compressed batches, to avoid running into
/// [ARROW-12597], we switch off threaded decompression to avoid nested threading
/// scenarios when scan tasks are executed in parallel by the CpuThreadPool.
arrow::RecordBatchVector batches;
RETURN_NOT_OK(skyhook::DeserializeTable(result, !options_->use_threads, &batches));
return arrow::MakeVectorIterator(std::move(batches));
}

protected:
arrow::dataset::FileSource source_;
std::shared_ptr<skyhook::SkyhookDirectObjectAccess> doa_;
skyhook::SkyhookFileType::type file_format_;
arrow::compute::Expression partition_expression_;
};

class SkyhookFileFormat::Impl {
public:
Impl(std::shared_ptr<RadosConnCtx> ctx, std::string file_format)
Expand All @@ -95,12 +43,10 @@ class SkyhookFileFormat::Impl {
return arrow::Status::OK();
}

arrow::Result<arrow::dataset::ScanTaskIterator> ScanFile(
arrow::Result<arrow::RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<const SkyhookFileFormat>& format,
const std::shared_ptr<arrow::dataset::ScanOptions>& options,
const std::shared_ptr<arrow::dataset::FileFragment>& file) const {
/// Make sure client-side filtering and projection is turned off.
file->apply_compute = false;

/// Convert string file format name to Enum.
skyhook::SkyhookFileType::type file_format;
if (file_format_ == "parquet") {
Expand All @@ -111,9 +57,46 @@ class SkyhookFileFormat::Impl {
return arrow::Status::Invalid("Unsupported file format ", file_format_);
}

arrow::dataset::ScanTaskVector v{std::make_shared<SkyhookScanTask>(
options, file, file->source(), doa_, file_format, file->partition_expression())};
return arrow::MakeVectorIterator(v);
auto fut = arrow::DeferNotOk(options->io_context.executor()->Submit(
[file, file_format, format,
options]() -> arrow::Result<arrow::RecordBatchGenerator> {
auto self = format->impl_.get();

/// Retrieve the size of the file using POSIX `stat`.
struct stat st {};
RETURN_NOT_OK(self->doa_->Stat(file->source().path(), st));

/// Create a ScanRequest instance.
skyhook::ScanRequest req;
req.filter_expression = options->filter;
req.partition_expression = file->partition_expression();
req.projection_schema = options->projected_schema;
req.dataset_schema = options->dataset_schema;
req.file_size = st.st_size;
req.file_format = file_format;

/// Serialize the ScanRequest into a ceph bufferlist.
ceph::bufferlist request;
RETURN_NOT_OK(skyhook::SerializeScanRequest(req, &request));

/// Execute the Ceph object class method `scan_op`.
ceph::bufferlist result;
RETURN_NOT_OK(self->doa_->Exec(st.st_ino, "scan_op", request, result));

/// Read RecordBatches from the result bufferlist. Since, this step might use
/// threads for decompressing compressed batches, to avoid running into
/// [ARROW-12597], we switch off threaded decompression to avoid nested
/// threading scenarios when scan tasks are executed in parallel by the
/// CpuThreadPool.
arrow::RecordBatchVector batches;
RETURN_NOT_OK(
skyhook::DeserializeTable(result, !options->use_threads, &batches));
auto gen = arrow::MakeVectorGenerator(std::move(batches));
// Keep Ceph client alive
arrow::RecordBatchGenerator gen_with_client = [format, gen]() { return gen(); };
return gen_with_client;
}));
return arrow::MakeFromFuture(std::move(fut));
}

arrow::Result<std::shared_ptr<arrow::Schema>> Inspect(
Expand Down Expand Up @@ -160,10 +143,12 @@ arrow::Result<std::shared_ptr<arrow::Schema>> SkyhookFileFormat::Inspect(
return impl_->Inspect(source);
}

arrow::Result<arrow::dataset::ScanTaskIterator> SkyhookFileFormat::ScanFile(
arrow::Result<arrow::RecordBatchGenerator> SkyhookFileFormat::ScanBatchesAsync(
const std::shared_ptr<arrow::dataset::ScanOptions>& options,
const std::shared_ptr<arrow::dataset::FileFragment>& file) const {
return impl_->ScanFile(options, file);
return impl_->ScanBatchesAsync(
arrow::internal::checked_pointer_cast<const SkyhookFileFormat>(shared_from_this()),
options, file);
}

std::shared_ptr<arrow::dataset::FileWriteOptions>
Expand Down
11 changes: 5 additions & 6 deletions cpp/src/skyhook/client/file_skyhook.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
// under the License.
#pragma once

#include "arrow/api.h"
#include <memory>
#include <string>

#include "arrow/dataset/file_parquet.h"
#include "arrow/dataset/scanner.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/type_fwd.h"

namespace skyhook {

Expand Down Expand Up @@ -77,11 +80,7 @@ class SkyhookFileFormat : public arrow::dataset::FileFormat {
arrow::Result<std::shared_ptr<arrow::Schema>> Inspect(
const arrow::dataset::FileSource& source) const override;

/// \brief Scan a file fragment.
/// \param[in] options The ScanOptions to use.
/// \param[in] file The file fragment to scan.
/// \return An iterator of ScanTasks.
arrow::Result<arrow::dataset::ScanTaskIterator> ScanFile(
arrow::Result<arrow::RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<arrow::dataset::ScanOptions>& options,
const std::shared_ptr<arrow::dataset::FileFragment>& file) const override;

Expand Down
1 change: 1 addition & 0 deletions cpp/src/skyhook/cls/cls_skyhook_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "arrow/dataset/file_base.h"
#include "arrow/filesystem/api.h"
#include "arrow/io/api.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/iterator.h"
Expand Down