Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cpp/src/arrow/csv/reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
#include "arrow/util/thread_pool.h"

namespace arrow {

using RecordBatchGenerator = AsyncGenerator<std::shared_ptr<RecordBatch>>;

namespace csv {
Expand Down
164 changes: 75 additions & 89 deletions cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/scanner.h"
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/vector.h"

namespace arrow {
namespace dataset {
Expand Down Expand Up @@ -56,43 +59,34 @@ Result<std::shared_ptr<Schema>> InMemoryFragment::ReadPhysicalSchemaImpl() {
return physical_schema_;
}

InMemoryFragment::InMemoryFragment(std::shared_ptr<Schema> schema,
RecordBatchVector record_batches,
namespace {
struct VectorIterable {
Result<RecordBatchGenerator> operator()() { return MakeVectorGenerator(batches); }
RecordBatchVector batches;
};
} // namespace

InMemoryFragment::InMemoryFragment(std::shared_ptr<Schema> physical_schema,
RecordBatchIterable get_batches,
Expression partition_expression)
: Fragment(std::move(partition_expression), std::move(schema)),
record_batches_(std::move(record_batches)) {
: Fragment(std::move(partition_expression), std::move(physical_schema)),
get_batches_(std::move(get_batches)) {
DCHECK_NE(physical_schema_, nullptr);
}

InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches,
InMemoryFragment::InMemoryFragment(std::shared_ptr<Schema> physical_schema,
RecordBatchVector batches,
Expression partition_expression)
: Fragment(std::move(partition_expression), /*schema=*/nullptr),
record_batches_(std::move(record_batches)) {
// Order of argument evaluation is undefined, so compute physical_schema here
physical_schema_ = record_batches_.empty() ? schema({}) : record_batches_[0]->schema();
: Fragment(std::move(partition_expression), std::move(physical_schema)),
get_batches_(VectorIterable{std::move(batches)}) {
DCHECK_NE(physical_schema_, nullptr);
}

Result<ScanTaskIterator> InMemoryFragment::Scan(std::shared_ptr<ScanOptions> options) {
// Make an explicit copy of record_batches_ to ensure Scan can be called
// multiple times.
auto batches_it = MakeVectorIterator(record_batches_);

auto batch_size = options->batch_size;
// RecordBatch -> ScanTask
Future<ScanTaskVector> InMemoryFragment::Scan(std::shared_ptr<ScanOptions> options) {
auto self = shared_from_this();
auto fn = [=](std::shared_ptr<RecordBatch> batch) -> std::shared_ptr<ScanTask> {
RecordBatchVector batches;

auto n_batches = BitUtil::CeilDiv(batch->num_rows(), batch_size);
for (int i = 0; i < n_batches; i++) {
batches.push_back(batch->Slice(batch_size * i, batch_size));
}

return ::arrow::internal::make_unique<InMemoryScanTask>(std::move(batches),
std::move(options), self);
};

return MakeMapIterator(fn, std::move(batches_it));
ScanTaskVector scan_tasks{std::make_shared<InMemoryScanTask>(
get_batches_, std::move(options), std::move(self))};
return Future<ScanTaskVector>::MakeFinished(scan_tasks);
}

Dataset::Dataset(std::shared_ptr<Schema> schema, Expression partition_expression)
Expand All @@ -108,92 +102,84 @@ Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan() {
return NewScan(std::make_shared<ScanOptions>());
}

Result<FragmentIterator> Dataset::GetFragments() {
Future<FragmentVector> Dataset::GetFragmentsAsync() const {
ARROW_ASSIGN_OR_RAISE(auto predicate, literal(true).Bind(*schema_));
return GetFragments(std::move(predicate));
return GetFragmentsAsync(std::move(predicate));
}

Result<FragmentIterator> Dataset::GetFragments(Expression predicate) {
Future<FragmentVector> Dataset::GetFragmentsAsync(Expression predicate) const {
ARROW_ASSIGN_OR_RAISE(
predicate, SimplifyWithGuarantee(std::move(predicate), partition_expression_));
return predicate.IsSatisfiable() ? GetFragmentsImpl(std::move(predicate))
: MakeEmptyIterator<std::shared_ptr<Fragment>>();
: FragmentVector{};
}

struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
explicit VectorRecordBatchGenerator(RecordBatchVector batches)
: batches_(std::move(batches)) {}

RecordBatchIterator Get() const final { return MakeVectorIterator(batches_); }
namespace {

RecordBatchVector batches_;
struct TableIterable {
Result<RecordBatchGenerator> operator()() {
auto reader = std::make_shared<TableBatchReader>(*table);
return [reader] { return reader->Next(); };
}
std::shared_ptr<Table> table;
};

InMemoryDataset::InMemoryDataset(std::shared_ptr<Schema> schema,
RecordBatchVector batches)
: Dataset(std::move(schema)),
get_batches_(new VectorRecordBatchGenerator(std::move(batches))) {}

struct TableRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
explicit TableRecordBatchGenerator(std::shared_ptr<Table> table)
: table_(std::move(table)) {}
struct ReaderIterableState {
explicit ReaderIterableState(std::shared_ptr<RecordBatchReader> reader)
: reader(std::move(reader)), consumed(0) {}

RecordBatchIterator Get() const final {
auto reader = std::make_shared<TableBatchReader>(*table_);
auto table = table_;
return MakeFunctionIterator([reader, table] { return reader->Next(); });
std::shared_ptr<RecordBatchReader> reader;
std::atomic<uint8_t> consumed;
};
struct ReaderIterable {
explicit ReaderIterable(std::shared_ptr<RecordBatchReader> reader)
: state(std::make_shared<ReaderIterableState>(std::move(reader))) {}

Result<RecordBatchGenerator> operator()() {
if (state->consumed.fetch_or(1)) {
return Status::Invalid(
"A dataset created from a RecordBatchReader can only be scanned once");
}
auto reader_capture = state->reader;
return [reader_capture] { return reader_capture->Next(); };
}

std::shared_ptr<Table> table_;
std::shared_ptr<ReaderIterableState> state;
};

InMemoryDataset::InMemoryDataset(std::shared_ptr<Table> table)
: Dataset(table->schema()),
get_batches_(new TableRecordBatchGenerator(std::move(table))) {}

struct ReaderRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
explicit ReaderRecordBatchGenerator(std::shared_ptr<RecordBatchReader> reader)
: reader_(std::move(reader)), consumed_(false) {}
} // namespace

RecordBatchIterator Get() const final {
if (consumed_) {
return MakeErrorIterator<std::shared_ptr<RecordBatch>>(Status::Invalid(
"RecordBatchReader-backed InMemoryDataset was already consumed"));
}
consumed_ = true;
auto reader = reader_;
return MakeFunctionIterator([reader] { return reader->Next(); });
}
std::shared_ptr<InMemoryDataset> InMemoryDataset::FromTable(
std::shared_ptr<Table> table) {
auto schema = table->schema();
return std::make_shared<InMemoryDataset>(std::move(schema),
TableIterable{std::move(table)});
}

std::shared_ptr<RecordBatchReader> reader_;
mutable bool consumed_;
};
std::shared_ptr<InMemoryDataset> InMemoryDataset::FromReader(
std::shared_ptr<RecordBatchReader> reader) {
auto schema = reader->schema();
return std::make_shared<InMemoryDataset>(std::move(schema),
ReaderIterable{std::move(reader)});
}

InMemoryDataset::InMemoryDataset(std::shared_ptr<RecordBatchReader> reader)
: Dataset(reader->schema()),
get_batches_(new ReaderRecordBatchGenerator(std::move(reader))) {}
std::shared_ptr<InMemoryDataset> InMemoryDataset::FromBatches(
std::shared_ptr<Schema> schema, RecordBatchVector batches) {
return std::make_shared<InMemoryDataset>(std::move(schema),
VectorIterable{std::move(batches)});
}

Result<std::shared_ptr<Dataset>> InMemoryDataset::ReplaceSchema(
std::shared_ptr<Schema> schema) const {
RETURN_NOT_OK(CheckProjectable(*schema_, *schema));
return std::make_shared<InMemoryDataset>(std::move(schema), get_batches_);
return std::make_shared<InMemoryDataset>(std::move(schema), std::move(get_batches_));
}

Result<FragmentIterator> InMemoryDataset::GetFragmentsImpl(Expression) {
Future<FragmentVector> InMemoryDataset::GetFragmentsImpl(Expression) const {
auto schema = this->schema();

auto create_fragment =
[schema](std::shared_ptr<RecordBatch> batch) -> Result<std::shared_ptr<Fragment>> {
if (!batch->schema()->Equals(schema)) {
return Status::TypeError("yielded batch had schema ", *batch->schema(),
" which did not match InMemorySource's: ", *schema);
}

RecordBatchVector batches{batch};
return std::make_shared<InMemoryFragment>(std::move(batches));
};

return MakeMaybeMapIterator(std::move(create_fragment), get_batches_->Get());
FragmentVector fragments{std::make_shared<InMemoryFragment>(schema, get_batches_)};
return Future<FragmentVector>::MakeFinished(std::move(fragments));
}

Result<std::shared_ptr<UnionDataset>> UnionDataset::Make(std::shared_ptr<Schema> schema,
Expand All @@ -220,7 +206,7 @@ Result<std::shared_ptr<Dataset>> UnionDataset::ReplaceSchema(
new UnionDataset(std::move(schema), std::move(children)));
}

Result<FragmentIterator> UnionDataset::GetFragmentsImpl(Expression predicate) {
Future<FragmentVector> UnionDataset::GetFragmentsImpl(Expression predicate) const {
return GetFragmentsFromDatasets(children_, predicate);
}

Expand Down
70 changes: 39 additions & 31 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,18 @@
#include "arrow/dataset/expression.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/macros.h"
#include "arrow/util/mutex.h"

namespace arrow {
namespace dataset {

using RecordBatchGenerator = AsyncGenerator<std::shared_ptr<RecordBatch>>;
using RecordBatchIterable = std::function<Result<RecordBatchGenerator>()>;

/// \brief A granular piece of a Dataset, such as an individual file.
///
/// A Fragment can be read/scanned separately from other fragments. It yields a
Expand All @@ -52,7 +58,7 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this<Fragment> {
/// The schema is cached after being read once, or may be specified at construction.
Result<std::shared_ptr<Schema>> ReadPhysicalSchema();

/// \brief Scan returns an iterator of ScanTasks, each of which yields
/// \brief Scan returns a generator of ScanTasks, each of which yields
/// RecordBatches from this Fragment.
///
/// Note that batches yielded using this method will not be filtered and may not align
Expand All @@ -62,10 +68,7 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this<Fragment> {
/// columns may be absent if they were not present in this fragment.
///
/// To receive a record batch stream which is fully filtered and projected, use Scanner.
virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) = 0;

/// \brief Return true if the fragment can benefit from parallel scanning.
virtual bool splittable() const = 0;
virtual Future<ScanTaskVector> Scan(std::shared_ptr<ScanOptions> options) = 0;

virtual std::string type_name() const = 0;
virtual std::string ToString() const { return type_name(); }
Expand Down Expand Up @@ -105,20 +108,21 @@ class ARROW_DS_EXPORT FragmentScanOptions {
/// RecordBatch.
class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
public:
InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchVector record_batches,
InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchIterable get_batches,
Expression = literal(true));
explicit InMemoryFragment(RecordBatchVector record_batches, Expression = literal(true));

Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override;
InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchVector batches,
Expression = literal(true));
// explicit InMemoryFragment(RecordBatchIterable get_batches, Expression =
// literal(true));

bool splittable() const override { return false; }
Future<ScanTaskVector> Scan(std::shared_ptr<ScanOptions> options) override;

std::string type_name() const override { return "in-memory"; }

protected:
Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override;

RecordBatchVector record_batches_;
RecordBatchIterable get_batches_;
};

/// \brief A container of zero or more Fragments.
Expand All @@ -133,8 +137,19 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
Result<std::shared_ptr<ScannerBuilder>> NewScan();

/// \brief GetFragments returns an iterator of Fragments given a predicate.
Result<FragmentIterator> GetFragments(Expression predicate);
Result<FragmentIterator> GetFragments();
Future<FragmentVector> GetFragmentsAsync(Expression predicate) const;
Result<FragmentIterator> GetFragments(Expression predicate) const {
auto fut = GetFragmentsAsync(predicate);
fut.Wait();
ARROW_ASSIGN_OR_RAISE(auto fragments_vec, fut.result());
return MakeVectorIterator(fragments_vec);
}
Future<FragmentVector> GetFragmentsAsync() const;
Result<FragmentIterator> GetFragments() const {
auto fut = GetFragmentsAsync();
ARROW_ASSIGN_OR_RAISE(auto fragments_vec, fut.result());
return MakeVectorIterator(fragments_vec);
}

const std::shared_ptr<Schema>& schema() const { return schema_; }

Expand All @@ -159,7 +174,7 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {

Dataset(std::shared_ptr<Schema> schema, Expression partition_expression);

virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) = 0;
virtual Future<FragmentVector> GetFragmentsImpl(Expression predicate) const = 0;

std::shared_ptr<Schema> schema_;
Expression partition_expression_ = literal(true);
Expand All @@ -170,31 +185,24 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
/// The record batches must match the schema provided to the source at construction.
class ARROW_DS_EXPORT InMemoryDataset : public Dataset {
public:
class RecordBatchGenerator {
public:
virtual ~RecordBatchGenerator() = default;
virtual RecordBatchIterator Get() const = 0;
};

InMemoryDataset(std::shared_ptr<Schema> schema,
std::shared_ptr<RecordBatchGenerator> get_batches)
InMemoryDataset(std::shared_ptr<Schema> schema, RecordBatchIterable get_batches)
: Dataset(std::move(schema)), get_batches_(std::move(get_batches)) {}

// Convenience constructor taking a fixed list of batches
InMemoryDataset(std::shared_ptr<Schema> schema, RecordBatchVector batches);

explicit InMemoryDataset(std::shared_ptr<Table> table);
explicit InMemoryDataset(std::shared_ptr<RecordBatchReader> reader);

std::string type_name() const override { return "in-memory"; }

Result<std::shared_ptr<Dataset>> ReplaceSchema(
std::shared_ptr<Schema> schema) const override;

static std::shared_ptr<InMemoryDataset> FromTable(std::shared_ptr<Table> table);
static std::shared_ptr<InMemoryDataset> FromReader(
std::shared_ptr<RecordBatchReader> reader);
static std::shared_ptr<InMemoryDataset> FromBatches(std::shared_ptr<Schema> schema,
RecordBatchVector batches);

protected:
Result<FragmentIterator> GetFragmentsImpl(Expression predicate) override;
Future<FragmentVector> GetFragmentsImpl(Expression predicate) const override;

std::shared_ptr<RecordBatchGenerator> get_batches_;
RecordBatchIterable get_batches_;
};

/// \brief A Dataset wrapping child Datasets.
Expand All @@ -216,7 +224,7 @@ class ARROW_DS_EXPORT UnionDataset : public Dataset {
std::shared_ptr<Schema> schema) const override;

protected:
Result<FragmentIterator> GetFragmentsImpl(Expression predicate) override;
Future<FragmentVector> GetFragmentsImpl(Expression predicate) const override;

explicit UnionDataset(std::shared_ptr<Schema> schema, DatasetVector children)
: Dataset(std::move(schema)), children_(std::move(children)) {}
Expand Down
Loading