Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 53 additions & 23 deletions cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,13 @@ const std::shared_ptr<Schema>& Fragment::schema() const {
return scan_options_->schema();
}

InMemoryFragment::InMemoryFragment(
std::vector<std::shared_ptr<RecordBatch>> record_batches,
std::shared_ptr<ScanOptions> scan_options)
InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches,
std::shared_ptr<ScanOptions> scan_options)
: Fragment(std::move(scan_options)), record_batches_(std::move(record_batches)) {}

InMemoryFragment::InMemoryFragment(
std::vector<std::shared_ptr<RecordBatch>> record_batches,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression)
InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression)
: Fragment(std::move(scan_options), std::move(partition_expression)),
record_batches_(std::move(record_batches)) {}

Expand All @@ -57,7 +55,7 @@ Result<ScanTaskIterator> InMemoryFragment::Scan(std::shared_ptr<ScanContext> con
// RecordBatch -> ScanTask
auto scan_options = scan_options_;
auto fn = [=](std::shared_ptr<RecordBatch> batch) -> std::shared_ptr<ScanTask> {
std::vector<std::shared_ptr<RecordBatch>> batches{batch};
RecordBatchVector batches{batch};
return ::arrow::internal::make_unique<InMemoryScanTask>(
std::move(batches), std::move(scan_options), std::move(context));
};
Expand Down Expand Up @@ -106,20 +104,6 @@ FragmentIterator Dataset::GetFragments(std::shared_ptr<ScanOptions> scan_options
return GetFragmentsImpl(std::move(simplified_scan_options));
}

struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
explicit VectorRecordBatchGenerator(std::vector<std::shared_ptr<RecordBatch>> batches)
: batches_(std::move(batches)) {}

RecordBatchIterator Get() const final { return MakeVectorIterator(batches_); }

std::vector<std::shared_ptr<RecordBatch>> batches_;
};

InMemoryDataset::InMemoryDataset(std::shared_ptr<Schema> schema,
std::vector<std::shared_ptr<RecordBatch>> batches)
: Dataset(std::move(schema)),
get_batches_(new VectorRecordBatchGenerator(std::move(batches))) {}

struct TableRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
explicit TableRecordBatchGenerator(std::shared_ptr<Table> table)
: table_(std::move(table)) {}
Expand All @@ -130,13 +114,59 @@ struct TableRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
return MakeFunctionIterator([reader, table] { return reader->Next(); });
}

bool uniform_schema_guaranteed() const final { return true; }

std::shared_ptr<Table> table_;
};

InMemoryDataset::InMemoryDataset(std::shared_ptr<Table> table)
: Dataset(table->schema()),
get_batches_(new TableRecordBatchGenerator(std::move(table))) {}

struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
explicit VectorRecordBatchGenerator(RecordBatchVector batches)
: batches_(std::move(batches)) {}

RecordBatchIterator Get() const final { return MakeVectorIterator(batches_); }

bool uniform_schema_guaranteed() const final { return true; }

RecordBatchVector batches_;
};

Result<std::shared_ptr<InMemoryDataset>> InMemoryDataset::Make(
RecordBatchVector batches) {
if (batches.empty()) {
return Status::Invalid(
"InMemoryDataset::Make requires at least one batch or an explicit schema");
}
return Make(batches[0]->schema(), std::move(batches));
}

Result<std::shared_ptr<InMemoryDataset>> InMemoryDataset::Make(
std::shared_ptr<Schema> schema, RecordBatchVector batches) {
for (const auto& batch : batches) {
if (!batch->schema()->Equals(*schema)) {
return Status::TypeError("InMemoryDataset::Make requires uniform schemas");
}
}
return std::make_shared<InMemoryDataset>(
std::move(schema),
internal::make_unique<VectorRecordBatchGenerator>(std::move(batches)));
}

Result<std::shared_ptr<InMemoryDataset>> InMemoryDataset::Make(
std::shared_ptr<Fragment> fragment, std::shared_ptr<ScanContext> context) {
ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(context));

TableAggregator aggregator;
RETURN_NOT_OK(
aggregator.AppendFrom(FilterAndProjectScanTask::Wrap(std::move(scan_task_it)),
fragment->scan_options(), context));

return Make(fragment->schema(), std::move(aggregator.batches));
}

FragmentIterator InMemoryDataset::GetFragmentsImpl(
std::shared_ptr<ScanOptions> scan_options) {
auto schema = this->schema();
Expand All @@ -149,7 +179,7 @@ FragmentIterator InMemoryDataset::GetFragmentsImpl(
" which did not match InMemorySource's: ", *schema);
}

std::vector<std::shared_ptr<RecordBatch>> batches;
RecordBatchVector batches;

auto batch_size = scan_options->batch_size;
auto n_batches = BitUtil::CeilDiv(batch->num_rows(), batch_size);
Expand Down
18 changes: 12 additions & 6 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ class ARROW_DS_EXPORT Fragment {
/// RecordBatch.
class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
public:
InMemoryFragment(std::vector<std::shared_ptr<RecordBatch>> record_batches,
InMemoryFragment(RecordBatchVector record_batches,
std::shared_ptr<ScanOptions> scan_options);

InMemoryFragment(std::vector<std::shared_ptr<RecordBatch>> record_batches,
InMemoryFragment(RecordBatchVector record_batches,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression);

Expand All @@ -97,7 +97,7 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
std::string type_name() const override { return "in-memory"; }

protected:
std::vector<std::shared_ptr<RecordBatch>> record_batches_;
RecordBatchVector record_batches_;
};

/// \brief A container of zero or more Fragments. A Dataset acts as a discovery mechanism
Expand Down Expand Up @@ -152,17 +152,23 @@ class ARROW_DS_EXPORT InMemoryDataset : public Dataset {
public:
virtual ~RecordBatchGenerator() = default;
virtual RecordBatchIterator Get() const = 0;
virtual bool uniform_schema_guaranteed() const { return false; }
};

InMemoryDataset(std::shared_ptr<Schema> schema,
std::unique_ptr<RecordBatchGenerator> get_batches)
: Dataset(std::move(schema)), get_batches_(std::move(get_batches)) {}

explicit InMemoryDataset(std::shared_ptr<Table> table);

// Convenience constructor taking a fixed list of batches
InMemoryDataset(std::shared_ptr<Schema> schema,
std::vector<std::shared_ptr<RecordBatch>> batches);
static Result<std::shared_ptr<InMemoryDataset>> Make(RecordBatchVector batches);
static Result<std::shared_ptr<InMemoryDataset>> Make(std::shared_ptr<Schema> schema,
RecordBatchVector batches);

explicit InMemoryDataset(std::shared_ptr<Table> table);
// Convenience constructor which scans a fragment's batches
static Result<std::shared_ptr<InMemoryDataset>> Make(
std::shared_ptr<Fragment> fragment, std::shared_ptr<ScanContext> context);

FragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

Expand Down
99 changes: 99 additions & 0 deletions cpp/src/arrow/dataset/dataset_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@
#pragma once

#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

#include "arrow/dataset/dataset.h"
#include "arrow/dataset/filter.h"
#include "arrow/dataset/scanner.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/record_batch.h"
#include "arrow/scalar.h"
#include "arrow/type.h"
#include "arrow/util/iterator.h"
#include "arrow/util/task_group.h"
#include "arrow/util/thread_pool.h"

namespace arrow {
namespace dataset {
Expand Down Expand Up @@ -64,5 +69,99 @@ inline std::shared_ptr<Schema> SchemaFromColumnNames(
return schema(std::move(columns));
}

inline std::shared_ptr<internal::TaskGroup> MakeTaskGroup(
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<ScanContext>& context) {
return options->use_threads ? internal::TaskGroup::MakeThreaded(context->thread_pool)
: internal::TaskGroup::MakeSerial();
}

static inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it,
const ExpressionEvaluator& evaluator,
const Expression& filter,
MemoryPool* pool) {
return MakeMaybeMapIterator(
[&filter, &evaluator, pool](std::shared_ptr<RecordBatch> in) {
return evaluator.Evaluate(filter, *in, pool).Map([&](compute::Datum selection) {
return evaluator.Filter(selection, in);
});
},
std::move(it));
}

static inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it,
RecordBatchProjector* projector,
MemoryPool* pool) {
return MakeMaybeMapIterator(
[=](std::shared_ptr<RecordBatch> in) { return projector->Project(*in, pool); },
std::move(it));
}

class FilterAndProjectScanTask : public ScanTask {
public:
explicit FilterAndProjectScanTask(std::shared_ptr<ScanTask> task)
: ScanTask(task->options(), task->context()), task_(std::move(task)) {}

Result<RecordBatchIterator> Execute() override {
ARROW_ASSIGN_OR_RAISE(auto it, task_->Execute());
auto filter_it = FilterRecordBatch(std::move(it), *options_->evaluator,
*options_->filter, context_->pool);
return ProjectRecordBatch(std::move(filter_it), &task_->options()->projector,
context_->pool);
}

static ScanTaskIterator Wrap(ScanTaskIterator scan_task_it) {
auto wrap_scan_task =
[](std::shared_ptr<ScanTask> task) -> std::shared_ptr<ScanTask> {
return std::make_shared<FilterAndProjectScanTask>(std::move(task));
};
return MakeMapIterator(wrap_scan_task, std::move(scan_task_it));
}

private:
std::shared_ptr<ScanTask> task_;
};

struct TableAggregator {
void Append(std::shared_ptr<RecordBatch> batch) {
std::lock_guard<std::mutex> lock(m);
batches.emplace_back(std::move(batch));
}

template <typename... Args>
Status AppendFrom(ScanTaskIterator scan_task_it, Args&&... args) {
auto task_group = MakeTaskGroup(std::forward<Args>(args)...);

for (auto maybe_scan_task : scan_task_it) {
ARROW_ASSIGN_OR_RAISE(auto scan_task, std::move(maybe_scan_task));
AppendFrom(std::move(scan_task), task_group.get());
}

// Wait for all tasks to complete, or the first error.
return task_group->Finish();
}

void AppendFrom(std::shared_ptr<ScanTask> scan_task, internal::TaskGroup* task_group) {
task_group->Append([this, scan_task] {
ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
for (auto maybe_batch : batch_it) {
ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch));
Append(std::move(batch));
}

return Status::OK();
});
}

Result<std::shared_ptr<Table>> Finish(const std::shared_ptr<Schema>& schema) {
std::shared_ptr<Table> out;
RETURN_NOT_OK(Table::FromRecordBatches(schema, batches, &out));
return out;
}

std::mutex m;
std::vector<std::shared_ptr<RecordBatch>> batches;
};

} // namespace dataset
} // namespace arrow
23 changes: 13 additions & 10 deletions cpp/src/arrow/dataset/dataset_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ TEST_F(TestInMemoryDataset, GetFragments) {

// It is safe to copy fragment multiple time since Scan() does not consume
// the internal array.
auto dataset = std::make_shared<InMemoryDataset>(
schema_, RecordBatchVector{static_cast<size_t>(kNumberBatches), batch});
ASSERT_OK_AND_ASSIGN(
auto dataset,
InMemoryDataset::Make(
schema_, RecordBatchVector{static_cast<size_t>(kNumberBatches), batch}));

AssertDatasetEquals(reader.get(), dataset.get());
}
Expand All @@ -84,8 +86,10 @@ TEST_F(TestUnionDataset, GetFragments) {
// Creates a complete binary tree of depth kCompleteBinaryTreeDepth where the
// leaves are InMemoryDataset containing kChildPerNode fragments.

auto l1_leaf_dataset = std::make_shared<InMemoryDataset>(
schema_, RecordBatchVector{static_cast<size_t>(kChildPerNode), batch});
ASSERT_OK_AND_ASSIGN(
auto l1_leaf_dataset,
InMemoryDataset::Make(
schema_, RecordBatchVector{static_cast<size_t>(kChildPerNode), batch}));

ASSERT_OK_AND_ASSIGN(
auto l2_leaf_tree_dataset,
Expand Down Expand Up @@ -117,10 +121,9 @@ TEST_F(TestDataset, TrivialScan) {
std::vector<std::shared_ptr<RecordBatch>> batches{static_cast<size_t>(kNumberBatches),
batch};

DatasetVector children = {
std::make_shared<InMemoryDataset>(schema_, batches),
std::make_shared<InMemoryDataset>(schema_, batches),
};
DatasetVector children(2, nullptr);
ASSERT_OK_AND_ASSIGN(children[0], InMemoryDataset::Make(schema_, batches));
ASSERT_OK_AND_ASSIGN(children[1], InMemoryDataset::Make(schema_, batches));

const int64_t total_batches = children.size() * kNumberBatches;
auto reader = ConstantArrayGenerator::Repeat(total_batches, batch);
Expand Down Expand Up @@ -231,7 +234,7 @@ TEST(TestProjector, NonTrivial) {
}

class TestEndToEnd : public TestDataset {
void SetUp() {
void SetUp() override {
bool nullable = false;
SetSchema({
field("region", utf8(), nullable),
Expand Down Expand Up @@ -407,7 +410,7 @@ class TestSchemaUnification : public TestDataset {
public:
using i32 = util::optional<int32_t>;

void SetUp() {
void SetUp() override {
using PathAndContent = std::vector<std::pair<std::string, std::string>>;

// The following test creates 2 sources with divergent but compatible
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/dataset/discovery_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ class MockDatasetFactory : public DatasetFactory {

Result<std::shared_ptr<Dataset>> Finish(
const std::shared_ptr<Schema>& schema) override {
return std::make_shared<InMemoryDataset>(schema,
std::vector<std::shared_ptr<RecordBatch>>{});
return InMemoryDataset::Make(schema, RecordBatchVector{});
}

protected:
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ Result<std::shared_ptr<arrow::io::RandomAccessFile>> FileSource::Open() const {
return std::make_shared<::arrow::io::BufferReader>(buffer());
}

Result<std::shared_ptr<Fragment>> FileFormat::MakeFragment(FileSource source) {
ARROW_ASSIGN_OR_RAISE(auto schema, Inspect(source));
auto options = ScanOptions::Make(std::move(schema));
return MakeFragment(std::move(source), std::move(options), scalar(true));
}

Result<std::shared_ptr<Fragment>> FileFormat::MakeFragment(
FileSource source, std::shared_ptr<ScanOptions> options) {
return MakeFragment(std::move(source), std::move(options), scalar(true));
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileForma

Result<std::shared_ptr<Fragment>> MakeFragment(FileSource source,
std::shared_ptr<ScanOptions> options);

Result<std::shared_ptr<Fragment>> MakeFragment(FileSource source);
};

/// \brief A Fragment that is stored in a file with a known format
Expand Down
Loading