Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 34 additions & 58 deletions cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,41 @@
namespace arrow {
namespace dataset {

Fragment::Fragment(std::shared_ptr<ScanOptions> scan_options)
: scan_options_(std::move(scan_options)), partition_expression_(scalar(true)) {}
Fragment::Fragment(std::shared_ptr<Expression> partition_expression)
: partition_expression_(partition_expression ? partition_expression : scalar(true)) {}

const std::shared_ptr<Schema>& Fragment::schema() const {
return scan_options_->schema();
}
Result<std::shared_ptr<Schema>> InMemoryFragment::ReadPhysicalSchema() { return schema_; }

InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches,
std::shared_ptr<ScanOptions> scan_options)
: Fragment(std::move(scan_options)), record_batches_(std::move(record_batches)) {}
InMemoryFragment::InMemoryFragment(std::shared_ptr<Schema> schema,
RecordBatchVector record_batches,
std::shared_ptr<Expression> partition_expression)
: Fragment(std::move(partition_expression)),
schema_(std::move(schema)),
record_batches_(std::move(record_batches)) {}

InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression)
: Fragment(std::move(scan_options), std::move(partition_expression)),
record_batches_(std::move(record_batches)) {}
: InMemoryFragment(record_batches.empty() ? schema({}) : record_batches[0]->schema(),
std::move(record_batches), std::move(partition_expression)) {}

Result<ScanTaskIterator> InMemoryFragment::Scan(std::shared_ptr<ScanContext> context) {
Result<ScanTaskIterator> InMemoryFragment::Scan(std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context) {
// Make an explicit copy of record_batches_ to ensure Scan can be called
// multiple times.
auto batches_it = MakeVectorIterator(record_batches_);

auto batch_size = options->batch_size;
// RecordBatch -> ScanTask
auto scan_options = scan_options_;
auto fn = [=](std::shared_ptr<RecordBatch> batch) -> std::shared_ptr<ScanTask> {
RecordBatchVector batches{batch};
RecordBatchVector batches;

auto n_batches = BitUtil::CeilDiv(batch->num_rows(), batch_size);
for (int i = 0; i < n_batches; i++) {
batches.push_back(batch->Slice(batch_size * i, batch_size));
}

return ::arrow::internal::make_unique<InMemoryScanTask>(
std::move(batches), std::move(scan_options), std::move(context));
std::move(batches), std::move(options), std::move(context));
};

return MakeMapIterator(fn, std::move(batches_it));
Expand All @@ -72,36 +79,15 @@ Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan() {
return NewScan(std::make_shared<ScanContext>());
}

bool Dataset::AssumePartitionExpression(
const std::shared_ptr<ScanOptions>& scan_options,
std::shared_ptr<ScanOptions>* simplified_scan_options) const {
if (partition_expression_ == nullptr) {
if (simplified_scan_options != nullptr) {
*simplified_scan_options = scan_options;
}
return true;
}

auto expr = scan_options->filter->Assume(*partition_expression_);
if (expr->IsNull() || expr->Equals(false)) {
// selector is not satisfiable; yield no fragments
return false;
}
FragmentIterator Dataset::GetFragments() { return GetFragments(scalar(true)); }

if (simplified_scan_options != nullptr) {
auto copy = std::make_shared<ScanOptions>(*scan_options);
copy->filter = std::move(expr);
*simplified_scan_options = std::move(copy);
FragmentIterator Dataset::GetFragments(std::shared_ptr<Expression> predicate) {
if (partition_expression_) {
predicate = predicate->Assume(*partition_expression_);
}
return true;
}

FragmentIterator Dataset::GetFragments(std::shared_ptr<ScanOptions> scan_options) {
std::shared_ptr<ScanOptions> simplified_scan_options;
if (!AssumePartitionExpression(scan_options, &simplified_scan_options)) {
return MakeEmptyIterator<std::shared_ptr<Fragment>>();
}
return GetFragmentsImpl(std::move(simplified_scan_options));
return predicate->IsSatisfiable() ? GetFragmentsImpl(std::move(predicate))
: MakeEmptyIterator<std::shared_ptr<Fragment>>();
}

struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
Expand Down Expand Up @@ -141,28 +127,18 @@ Result<std::shared_ptr<Dataset>> InMemoryDataset::ReplaceSchema(
return std::make_shared<InMemoryDataset>(std::move(schema), get_batches_);
}

FragmentIterator InMemoryDataset::GetFragmentsImpl(
std::shared_ptr<ScanOptions> scan_options) {
FragmentIterator InMemoryDataset::GetFragmentsImpl(std::shared_ptr<Expression>) {
auto schema = this->schema();

auto create_fragment =
[scan_options,
schema](std::shared_ptr<RecordBatch> batch) -> Result<std::shared_ptr<Fragment>> {
[schema](std::shared_ptr<RecordBatch> batch) -> Result<std::shared_ptr<Fragment>> {
if (!batch->schema()->Equals(schema)) {
return Status::TypeError("yielded batch had schema ", *batch->schema(),
" which did not match InMemorySource's: ", *schema);
}

RecordBatchVector batches;

auto batch_size = scan_options->batch_size;
auto n_batches = BitUtil::CeilDiv(batch->num_rows(), batch_size);

for (int i = 0; i < n_batches; i++) {
batches.push_back(batch->Slice(batch_size * i, batch_size));
}

return std::make_shared<InMemoryFragment>(std::move(batches), scan_options);
RecordBatchVector batches{batch};
return std::make_shared<InMemoryFragment>(std::move(batches));
};

return MakeMaybeMapIterator(std::move(create_fragment), get_batches_->Get());
Expand Down Expand Up @@ -192,8 +168,8 @@ Result<std::shared_ptr<Dataset>> UnionDataset::ReplaceSchema(
new UnionDataset(std::move(schema), std::move(children)));
}

FragmentIterator UnionDataset::GetFragmentsImpl(std::shared_ptr<ScanOptions> options) {
return GetFragmentsFromDatasets(children_, options);
FragmentIterator UnionDataset::GetFragmentsImpl(std::shared_ptr<Expression> predicate) {
return GetFragmentsFromDatasets(children_, predicate);
}

} // namespace dataset
Expand Down
76 changes: 38 additions & 38 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,23 @@
namespace arrow {
namespace dataset {

/// \brief A granular piece of a Dataset, such as an individual file, which can be
/// read/scanned separately from other fragments.
/// \brief A granular piece of a Dataset, such as an individual file.
///
/// A Fragment yields a collection of RecordBatch, encapsulated in one or more ScanTasks.
/// A Fragment can be read/scanned separately from other fragments. It yields a
/// collection of RecordBatches when scanned, encapsulated in one or more
/// ScanTasks.
///
/// Note that Fragments have well-defined physical schemas which are reconciled by
/// the Datasets which contain them; these physical schemas may differ from a parent
/// Dataset's schema and the physical schemas of sibling Fragments.
class ARROW_DS_EXPORT Fragment {
public:
/// \brief Return the physical schema of the Fragment.
///
/// The physical schema is also called the writer schema.
/// This method is blocking and may suffer from high latency on slow filesystems.
virtual Result<std::shared_ptr<Schema>> ReadPhysicalSchema() = 0;

/// \brief Scan returns an iterator of ScanTasks, each of which yields
/// RecordBatches from this Fragment.
///
Expand All @@ -46,70 +57,65 @@ class ARROW_DS_EXPORT Fragment {
/// columns may be absent if they were not present in this fragment.
///
/// To receive a record batch stream which is fully filtered and projected, use Scanner.
virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) = 0;
virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context) = 0;

/// \brief Return true if the fragment can benefit from parallel scanning.
virtual bool splittable() const = 0;

virtual std::string type_name() const = 0;

/// \brief Filtering, schema reconciliation, and partition options to use when
/// scanning this fragment.
const std::shared_ptr<ScanOptions>& scan_options() const { return scan_options_; }

const std::shared_ptr<Schema>& schema() const;

virtual ~Fragment() = default;

/// \brief An expression which evaluates to true for all data viewed by this
/// Fragment.
const std::shared_ptr<Expression>& partition_expression() const {
return partition_expression_;
}

protected:
explicit Fragment(std::shared_ptr<ScanOptions> scan_options);
virtual ~Fragment() = default;

Fragment(std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression)
: scan_options_(std::move(scan_options)),
partition_expression_(std::move(partition_expression)) {}
protected:
explicit Fragment(std::shared_ptr<Expression> partition_expression = NULLPTR);

std::shared_ptr<ScanOptions> scan_options_;
std::shared_ptr<Expression> partition_expression_;
};

/// \brief A trivial Fragment that yields ScanTasks out of a fixed set of
/// RecordBatches.
class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
public:
InMemoryFragment(RecordBatchVector record_batches,
std::shared_ptr<ScanOptions> scan_options);
InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchVector record_batches,
std::shared_ptr<Expression> = NULLPTR);
explicit InMemoryFragment(RecordBatchVector record_batches,
std::shared_ptr<Expression> = NULLPTR);

InMemoryFragment(RecordBatchVector record_batches,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression);
Result<std::shared_ptr<Schema>> ReadPhysicalSchema() override;

Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) override;
Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context) override;

bool splittable() const override { return false; }

std::string type_name() const override { return "in-memory"; }

protected:
std::shared_ptr<Schema> schema_;
RecordBatchVector record_batches_;
};

/// \brief A container of zero or more Fragments. A Dataset acts as a discovery mechanism
/// of Fragments and partitions, e.g. files deeply nested in a directory.
/// \brief A container of zero or more Fragments.
///
/// A Dataset acts as a union of Fragments, e.g. files deeply nested in a
/// directory. A Dataset has a schema to which Fragments must align during a
/// scan operation. This is analogous to Avro's reader and writer schema.
class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
public:
/// \brief Begin to build a new Scan operation against this Dataset
Result<std::shared_ptr<ScannerBuilder>> NewScan(std::shared_ptr<ScanContext> context);
Result<std::shared_ptr<ScannerBuilder>> NewScan();

/// \brief GetFragments returns an iterator of Fragments given ScanOptions.
FragmentIterator GetFragments(std::shared_ptr<ScanOptions> options);
/// \brief GetFragments returns an iterator of Fragments given a predicate.
FragmentIterator GetFragments(std::shared_ptr<Expression> predicate);
FragmentIterator GetFragments();

const std::shared_ptr<Schema>& schema() const { return schema_; }

Expand Down Expand Up @@ -138,13 +144,7 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
: schema_(std::move(schema)), partition_expression_(std::move(e)) {}
Dataset() = default;

virtual FragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) = 0;

/// Mutates a ScanOptions by assuming partition_expression_ holds for all yielded
/// fragments. Returns false if the selector is not satisfiable in this Dataset.
virtual bool AssumePartitionExpression(
const std::shared_ptr<ScanOptions>& scan_options,
std::shared_ptr<ScanOptions>* simplified_scan_options) const;
virtual FragmentIterator GetFragmentsImpl(std::shared_ptr<Expression> predicate) = 0;

std::shared_ptr<Schema> schema_;
std::shared_ptr<Expression> partition_expression_;
Expand Down Expand Up @@ -176,7 +176,7 @@ class ARROW_DS_EXPORT InMemoryDataset : public Dataset {
std::shared_ptr<Schema> schema) const override;

protected:
FragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;
FragmentIterator GetFragmentsImpl(std::shared_ptr<Expression> predicate) override;

std::shared_ptr<RecordBatchGenerator> get_batches_;
};
Expand All @@ -200,7 +200,7 @@ class ARROW_DS_EXPORT UnionDataset : public Dataset {
std::shared_ptr<Schema> schema) const override;

protected:
FragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;
FragmentIterator GetFragmentsImpl(std::shared_ptr<Expression> predicate) override;

explicit UnionDataset(std::shared_ptr<Schema> schema, DatasetVector children)
: Dataset(std::move(schema)), children_(std::move(children)) {}
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/dataset/dataset_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ namespace dataset {
/// \brief GetFragmentsFromDatasets transforms a vector<Dataset> into a
/// flattened FragmentIterator.
static inline FragmentIterator GetFragmentsFromDatasets(
const DatasetVector& datasets, std::shared_ptr<ScanOptions> options) {
const DatasetVector& datasets, std::shared_ptr<Expression> predicate) {
// Iterator<Dataset>
auto datasets_it = MakeVectorIterator(datasets);

// Dataset -> Iterator<Fragment>
auto fn = [options](std::shared_ptr<Dataset> dataset) -> FragmentIterator {
return dataset->GetFragments(options);
auto fn = [predicate](std::shared_ptr<Dataset> dataset) -> FragmentIterator {
return dataset->GetFragments(predicate);
};

// Iterator<Iterator<Fragment>>
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/dataset/dataset_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ TEST_F(TestInMemoryFragment, Scan) {
auto reader = ConstantArrayGenerator::Repeat(kNumberBatches, batch);

// Creates an InMemoryFragment of the same repeated batch.
auto fragment =
InMemoryFragment({static_cast<size_t>(kNumberBatches), batch}, options_);
auto fragment = InMemoryFragment({static_cast<size_t>(kNumberBatches), batch});

AssertFragmentEquals(reader.get(), &fragment);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/discovery_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class FileSystemDatasetFactoryTest : public DatasetFactoryTest {
}
options_ = ScanOptions::Make(schema);
ASSERT_OK_AND_ASSIGN(dataset_, factory_->Finish(schema));
AssertFragmentsAreFromPath(dataset_->GetFragments(options_), paths);
AssertFragmentsAreFromPath(dataset_->GetFragments(), paths);
}

protected:
Expand Down
Loading