Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#include <memory>
#include <utility>

#include "arrow/dataset/filter.h"
#include "arrow/dataset/scanner.h"
#include "arrow/util/iterator.h"
#include "arrow/util/stl.h"

namespace arrow {
Expand All @@ -46,11 +48,10 @@ Status SimpleDataFragment::Scan(std::shared_ptr<ScanContext> scan_context,
return Status::OK();
}

Status Dataset::Make(const std::vector<std::shared_ptr<DataSource>>& sources,
const std::shared_ptr<Schema>& schema,
std::shared_ptr<Dataset>* out) {
Status Dataset::Make(std::vector<std::shared_ptr<DataSource>> sources,
std::shared_ptr<Schema> schema, std::shared_ptr<Dataset>* out) {
// TODO: Ensure schema and sources align.
*out = std::make_shared<Dataset>(sources, schema);
*out = std::make_shared<Dataset>(std::move(sources), std::move(schema));

return Status::OK();
}
Expand All @@ -61,5 +62,44 @@ Status Dataset::NewScan(std::unique_ptr<ScannerBuilder>* out) {
return Status::OK();
}

bool DataSource::AssumePartitionExpression(
const std::shared_ptr<ScanOptions>& scan_options,
std::shared_ptr<ScanOptions>* simplified_scan_options) const {
DCHECK_NE(simplified_scan_options, nullptr);
if (scan_options == nullptr) {
// null scan options; no selector to simplify
*simplified_scan_options = scan_options;
return true;
}

auto c = SelectorAssume(scan_options->selector, partition_expression_);
DCHECK_OK(c.status());
auto expr = std::move(c).ValueOrDie();

bool trivial = true;
if (expr->IsNull() || (expr->IsTrivialCondition(&trivial) && !trivial)) {
// selector is not satisfiable; yield no fragments
return false;
}

auto copy = std::make_shared<ScanOptions>(*scan_options);
copy->selector = ExpressionSelector(std::move(expr));
*simplified_scan_options = std::move(copy);
return true;
}

DataFragmentIterator DataSource::GetFragments(std::shared_ptr<ScanOptions> scan_options) {
std::shared_ptr<ScanOptions> simplified_scan_options;
if (!AssumePartitionExpression(scan_options, &simplified_scan_options)) {
return MakeEmptyIterator<std::shared_ptr<DataFragment>>();
}
return GetFragmentsImpl(std::move(simplified_scan_options));
}

DataFragmentIterator SimpleDataSource::GetFragmentsImpl(
std::shared_ptr<ScanOptions> scan_options) {
return MakeVectorIterator(fragments_);
}

} // namespace dataset
} // namespace arrow
40 changes: 29 additions & 11 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/util/iterator.h"
#include "arrow/util/macros.h"

namespace arrow {
namespace dataset {
Expand Down Expand Up @@ -77,11 +77,32 @@ class ARROW_DS_EXPORT DataSource {
public:
/// \brief GetFragments returns an iterator of DataFragments. The ScanOptions
/// controls filtering and schema inference.
virtual DataFragmentIterator GetFragments(std::shared_ptr<ScanOptions> options) = 0;
DataFragmentIterator GetFragments(std::shared_ptr<ScanOptions> options);

/// \brief An expression which evaluates to true for all data viewed by this DataSource.
/// May be null, which indicates no information is available.
const std::shared_ptr<Expression>& partition_expression() const {
return partition_expression_;
}

virtual std::string type() const = 0;

virtual ~DataSource() = default;

protected:
DataSource() = default;
explicit DataSource(std::shared_ptr<Expression> c)
: partition_expression_(std::move(c)) {}

virtual DataFragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) = 0;

/// Mutates a ScanOptions by assuming partition_expression_ holds for all yielded
/// fragments. Returns false if the selector is not satisfiable in this DataSource.
virtual bool AssumePartitionExpression(
const std::shared_ptr<ScanOptions>& scan_options,
std::shared_ptr<ScanOptions>* simplified_scan_options) const;

std::shared_ptr<Expression> partition_expression_;
};

/// \brief A DataSource consisting of a flat sequence of DataFragments
Expand All @@ -90,9 +111,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource {
explicit SimpleDataSource(DataFragmentVector fragments)
: fragments_(std::move(fragments)) {}

DataFragmentIterator GetFragments(std::shared_ptr<ScanOptions> options) override {
return MakeVectorIterator(fragments_);
}
DataFragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

std::string type() const override { return "simple_data_source"; }

Expand All @@ -107,13 +126,12 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
/// WARNING, this constructor is not recommend, use Dataset::Make instead.
/// \param[in] sources one or more input data sources
/// \param[in] schema a known schema to conform to, may be nullptr
explicit Dataset(const std::vector<std::shared_ptr<DataSource>>& sources,
const std::shared_ptr<Schema>& schema)
: schema_(schema), sources_(sources) {}
explicit Dataset(std::vector<std::shared_ptr<DataSource>> sources,
std::shared_ptr<Schema> schema)
: schema_(std::move(schema)), sources_(std::move(sources)) {}

static Status Make(const std::vector<std::shared_ptr<DataSource>>& sources,
const std::shared_ptr<Schema>& schema,
std::shared_ptr<Dataset>* out);
static Status Make(std::vector<std::shared_ptr<DataSource>> sourcs,
std::shared_ptr<Schema> schema, std::shared_ptr<Dataset>* out);

/// \brief Begin to build a new Scan operation against this Dataset
Status NewScan(std::unique_ptr<ScannerBuilder>* out);
Expand Down
31 changes: 23 additions & 8 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
#include <algorithm>
#include <vector>

#include "arrow/dataset/filter.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/util/iterator.h"
#include "arrow/util/stl.h"

namespace arrow {
Expand All @@ -47,18 +49,18 @@ Status FileBasedDataFragment::Scan(std::shared_ptr<ScanContext> scan_context,

FileSystemBasedDataSource::FileSystemBasedDataSource(
fs::FileSystem* filesystem, const fs::Selector& selector,
std::shared_ptr<FileFormat> format, std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<FileFormat> format, std::shared_ptr<Expression> partition_expression,
std::vector<fs::FileStats> stats)
: filesystem_(filesystem),
: DataSource(std::move(partition_expression)),
filesystem_(filesystem),
selector_(std::move(selector)),
format_(std::move(format)),
scan_options_(std::move(scan_options)),
stats_(std::move(stats)) {}

Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem,
const fs::Selector& selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression,
std::unique_ptr<FileSystemBasedDataSource>* out) {
std::vector<fs::FileStats> stats;
RETURN_NOT_OK(filesystem->GetTargetStats(selector, &stats));
Expand All @@ -71,12 +73,25 @@ Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem,
stats.resize(new_end - stats.begin());

out->reset(new FileSystemBasedDataSource(filesystem, selector, std::move(format),
std::move(scan_options), std::move(stats)));
std::move(partition_expression),
std::move(stats)));
return Status::OK();
}

DataFragmentIterator FileSystemBasedDataSource::GetFragments(
std::shared_ptr<ScanOptions> options) {
Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem,
const fs::Selector& selector,
std::shared_ptr<FileFormat> format,
std::unique_ptr<FileSystemBasedDataSource>* out) {
return Make(filesystem, selector, std::move(format), nullptr, out);
}

DataFragmentIterator FileSystemBasedDataSource::GetFragmentsImpl(
std::shared_ptr<ScanOptions> scan_options) {
std::shared_ptr<ScanOptions> simplified_scan_options;
if (!AssumePartitionExpression(scan_options, &simplified_scan_options)) {
return MakeEmptyIterator<std::shared_ptr<DataFragment>>();
}

struct Impl : DataFragmentIterator {
Impl(fs::FileSystem* filesystem, std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options, std::vector<fs::FileStats> stats)
Expand Down Expand Up @@ -105,7 +120,7 @@ DataFragmentIterator FileSystemBasedDataSource::GetFragments(
std::vector<fs::FileStats> stats_;
};

return DataFragmentIterator(Impl(filesystem_, format_, options, stats_));
return DataFragmentIterator(Impl(filesystem_, format_, scan_options, stats_));
}

} // namespace dataset
Expand Down
13 changes: 8 additions & 5 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,23 +170,26 @@ class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource {
public:
static Status Make(fs::FileSystem* filesystem, const fs::Selector& selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options,
std::unique_ptr<FileSystemBasedDataSource>* out);

std::string type() const override { return "directory"; }
static Status Make(fs::FileSystem* filesystem, const fs::Selector& selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<Expression> partition_expression,
std::unique_ptr<FileSystemBasedDataSource>* out);

DataFragmentIterator GetFragments(std::shared_ptr<ScanOptions> options) override;
std::string type() const override { return "directory"; }

protected:
DataFragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

FileSystemBasedDataSource(fs::FileSystem* filesystem, const fs::Selector& selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression,
std::vector<fs::FileStats> stats);

fs::FileSystem* filesystem_ = NULLPTR;
fs::Selector selector_;
std::shared_ptr<FileFormat> format_;
std::shared_ptr<ScanOptions> scan_options_;
std::vector<fs::FileStats> stats_;
};

Expand Down
10 changes: 6 additions & 4 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ using RecordBatchReaderPtr = std::unique_ptr<RecordBatchReader>;
// A set of RowGroup identifiers
using RowGroupSet = std::vector<int>;

// TODO(bkietz) refactor this to use ProjectedRecordBatchReader
class ParquetScanTask : public ScanTask {
public:
static Status Make(RowGroupSet row_groups, const std::vector<int>& columns_projection,
Expand Down Expand Up @@ -128,7 +129,7 @@ class ParquetScanTaskIterator {
}

Status Next(ScanTaskPtr* task) {
auto partition = partitionner_.Next();
auto partition = partitioner_.Next();

// Iteration is done.
if (partition.size() == 0) {
Expand All @@ -145,7 +146,8 @@ class ParquetScanTaskIterator {
static Status InferColumnProjection(const parquet::FileMetaData& metadata,
const std::shared_ptr<ScanOptions>& options,
std::vector<int>* out) {
// TODO(fsaintjacques): Compute intersection _and_ validity
// TODO(fsaintjacques): Compute intersection _and_ validity, could probably reuse
// RecordBatchProjector here
*out = internal::Iota(metadata.num_columns());

return Status::OK();
Expand All @@ -155,11 +157,11 @@ class ParquetScanTaskIterator {
std::shared_ptr<parquet::FileMetaData> metadata,
std::unique_ptr<parquet::arrow::FileReader> reader)
: columns_projection_(columns_projection),
partitionner_(std::move(metadata)),
partitioner_(std::move(metadata)),
reader_(std::move(reader)) {}

std::vector<int> columns_projection_;
ParquetRowGroupPartitioner partitionner_;
ParquetRowGroupPartitioner partitioner_;
std::shared_ptr<parquet::arrow::FileReader> reader_;
};

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,5 +185,9 @@ TEST_F(TestParquetFileSystemBasedDataSource, Recursive) { this->Recursive(); }

TEST_F(TestParquetFileSystemBasedDataSource, DeletedFile) { this->DeletedFile(); }

TEST_F(TestParquetFileSystemBasedDataSource, PredicatePushDown) {
this->PredicatePushDown();
}

} // namespace dataset
} // namespace arrow
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,9 @@ TEST_F(TestDummyFileSystemBasedDataSource, Recursive) { this->Recursive(); }

TEST_F(TestDummyFileSystemBasedDataSource, DeletedFile) { this->DeletedFile(); }

TEST_F(TestDummyFileSystemBasedDataSource, PredicatePushDown) {
this->PredicatePushDown();
}

} // namespace dataset
} // namespace arrow
29 changes: 29 additions & 0 deletions cpp/src/arrow/dataset/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "arrow/compute/context.h"
#include "arrow/compute/kernels/boolean.h"
#include "arrow/compute/kernels/compare.h"
#include "arrow/dataset/dataset.h"
#include "arrow/record_batch.h"
#include "arrow/util/logging.h"
#include "arrow/visitor_inline.h"
Expand Down Expand Up @@ -938,5 +939,33 @@ Result<std::shared_ptr<DataType>> FieldExpression::Validate(const Schema& schema
return null();
}

Result<std::shared_ptr<Expression>> SelectorAssume(
const std::shared_ptr<DataSelector>& selector,
const std::shared_ptr<Expression>& given) {
if (selector == nullptr || selector->filters.size() == 0) {
return ScalarExpression::Make(true);
}

auto get_expression = [](const std::shared_ptr<Filter>& f) {
DCHECK_EQ(f->type(), FilterType::EXPRESSION);
return checked_cast<const ExpressionFilter&>(*f).expression();
};

auto out_expr = get_expression(selector->filters[0]);
for (size_t i = 1; i < selector->filters.size(); ++i) {
out_expr = and_(std::move(out_expr), get_expression(selector->filters[i]));
}

if (given == nullptr) {
return std::move(out_expr);
}
return out_expr->Assume(*given);
}

std::shared_ptr<DataSelector> ExpressionSelector(std::shared_ptr<Expression> e) {
return std::make_shared<DataSelector>(
DataSelector{FilterVector{std::make_shared<ExpressionFilter>(std::move(e))}});
}

} // namespace dataset
} // namespace arrow
8 changes: 8 additions & 0 deletions cpp/src/arrow/dataset/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <string>
#include <utility>

#include "arrow/compute/kernel.h"
#include "arrow/compute/kernels/compare.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
Expand Down Expand Up @@ -402,5 +403,12 @@ inline FieldExpression operator"" _(const char* name, size_t name_length) {
}
} // namespace string_literals

ARROW_DS_EXPORT Result<std::shared_ptr<Expression>> SelectorAssume(
const std::shared_ptr<DataSelector>& selector,
const std::shared_ptr<Expression>& given);

ARROW_DS_EXPORT std::shared_ptr<DataSelector> ExpressionSelector(
std::shared_ptr<Expression> e);

} // namespace dataset
} // namespace arrow
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>

#include "arrow/dataset/dataset.h"
#include "arrow/util/iterator.h"

namespace arrow {
namespace dataset {
Expand Down
Loading