Skip to content
55 changes: 52 additions & 3 deletions cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,55 @@ Result<ScanTaskIterator> InMemoryFragment::Scan(std::shared_ptr<ScanOptions> opt
return MakeMapIterator(fn, std::move(batches_it));
}

Result<RecordBatchGenerator> InMemoryFragment::ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options) {
struct State {
State(std::shared_ptr<InMemoryFragment> fragment, int64_t batch_size)
: fragment(std::move(fragment)),
batch_index(0),
offset(0),
batch_size(batch_size) {}

std::shared_ptr<RecordBatch> Next() {
const auto& next_parent = fragment->record_batches_[batch_index];
if (offset < next_parent->num_rows()) {
auto next = next_parent->Slice(offset, batch_size);
offset += batch_size;
return next;
}
batch_index++;
offset = 0;
return nullptr;
}

bool Finished() { return batch_index >= fragment->record_batches_.size(); }

std::shared_ptr<InMemoryFragment> fragment;
std::size_t batch_index;
int64_t offset;
int64_t batch_size;
};

struct Generator {
Generator(std::shared_ptr<InMemoryFragment> fragment, int64_t batch_size)
: state(std::make_shared<State>(std::move(fragment), batch_size)) {}

Future<std::shared_ptr<RecordBatch>> operator()() {
while (!state->Finished()) {
auto next = state->Next();
if (next) {
return Future<std::shared_ptr<RecordBatch>>::MakeFinished(std::move(next));
}
}
return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
}

std::shared_ptr<State> state;
};
return Generator(internal::checked_pointer_cast<InMemoryFragment>(shared_from_this()),
options->batch_size);
}

Dataset::Dataset(std::shared_ptr<Schema> schema, Expression partition_expression)
: schema_(std::move(schema)),
partition_expression_(std::move(partition_expression)) {}
Expand Down Expand Up @@ -189,11 +238,11 @@ Result<FragmentIterator> InMemoryDataset::GetFragmentsImpl(Expression) {
" which did not match InMemorySource's: ", *schema);
}

RecordBatchVector batches{batch};
return std::make_shared<InMemoryFragment>(std::move(batches));
return std::make_shared<InMemoryFragment>(RecordBatchVector{std::move(batch)});
};

return MakeMaybeMapIterator(std::move(create_fragment), get_batches_->Get());
auto batches_it = get_batches_->Get();
return MakeMaybeMapIterator(std::move(create_fragment), std::move(batches_it));
}

Result<std::shared_ptr<UnionDataset>> UnionDataset::Make(std::shared_ptr<Schema> schema,
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
namespace arrow {
namespace dataset {

using RecordBatchGenerator = std::function<Future<std::shared_ptr<RecordBatch>>()>;

/// \brief A granular piece of a Dataset, such as an individual file.
///
/// A Fragment can be read/scanned separately from other fragments. It yields a
Expand Down Expand Up @@ -64,6 +66,10 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this<Fragment> {
/// To receive a record batch stream which is fully filtered and projected, use Scanner.
virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) = 0;

/// An asynchronous version of Scan
virtual Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options) = 0;

virtual std::string type_name() const = 0;
virtual std::string ToString() const { return type_name(); }

Expand Down Expand Up @@ -113,6 +119,8 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
explicit InMemoryFragment(RecordBatchVector record_batches, Expression = literal(true));

Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override;
Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options) override;

std::string type_name() const override { return "in-memory"; }

Expand Down
59 changes: 58 additions & 1 deletion cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,70 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
std::move(partition_expression), std::move(physical_schema)));
}

// TODO(ARROW-12355[CSV], ARROW-11772[IPC], ARROW-11843[Parquet]) The following
// implementation of ScanBatchesAsync is both ugly and terribly ineffecient. Each of the
// formats should provide their own efficient implementation.
Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& scan_options,
const std::shared_ptr<FileFragment>& file) {
ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
struct State {
State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it)
: scan_options(std::move(scan_options)),
scan_task_it(std::move(scan_task_it)),
current_rb_it(),
finished(false) {}

std::shared_ptr<ScanOptions> scan_options;
ScanTaskIterator scan_task_it;
RecordBatchIterator current_rb_it;
bool finished;
};
struct Generator {
Future<std::shared_ptr<RecordBatch>> operator()() {
while (!state->finished) {
if (!state->current_rb_it) {
RETURN_NOT_OK(PumpScanTask());
if (state->finished) {
return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
}
}
ARROW_ASSIGN_OR_RAISE(auto next_batch, state->current_rb_it.Next());
if (IsIterationEnd(next_batch)) {
state->current_rb_it = RecordBatchIterator();
} else {
return Future<std::shared_ptr<RecordBatch>>::MakeFinished(next_batch);
}
}
return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
}
Status PumpScanTask() {
ARROW_ASSIGN_OR_RAISE(auto next_task, state->scan_task_it.Next());
if (IsIterationEnd(next_task)) {
state->finished = true;
} else {
ARROW_ASSIGN_OR_RAISE(state->current_rb_it, next_task->Execute());
}
return Status::OK();
}
std::shared_ptr<State> state;
};
return Generator{std::make_shared<State>(scan_options, std::move(scan_task_it))};
}

Result<std::shared_ptr<Schema>> FileFragment::ReadPhysicalSchemaImpl() {
return format_->Inspect(source_);
}

Result<ScanTaskIterator> FileFragment::Scan(std::shared_ptr<ScanOptions> options) {
auto self = std::dynamic_pointer_cast<FileFragment>(shared_from_this());
return format_->ScanFile(std::move(options), self);
return format_->ScanFile(options, self);
}

Result<RecordBatchGenerator> FileFragment::ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options) {
auto self = std::dynamic_pointer_cast<FileFragment>(shared_from_this());
return format_->ScanBatchesAsync(options, self);
}

struct FileSystemDataset::FragmentSubtrees {
Expand Down
8 changes: 7 additions & 1 deletion cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,13 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileForma
/// \brief Open a FileFragment for scanning.
/// May populate lazy properties of the FileFragment.
virtual Result<ScanTaskIterator> ScanFile(
std::shared_ptr<ScanOptions> options,
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& file) const = 0;

virtual Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& file);

/// \brief Open a fragment
virtual Result<std::shared_ptr<FileFragment>> MakeFragment(
FileSource source, Expression partition_expression,
Expand All @@ -178,6 +182,8 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileForma
class ARROW_DS_EXPORT FileFragment : public Fragment {
public:
Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override;
Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options) override;

std::string type_name() const override { return format_->type_name(); }
std::string ToString() const override { return source_.path(); };
Expand Down
5 changes: 2 additions & 3 deletions cpp/src/arrow/dataset/file_csv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,10 @@ Result<std::shared_ptr<Schema>> CsvFileFormat::Inspect(const FileSource& source)
}

Result<ScanTaskIterator> CsvFileFormat::ScanFile(
std::shared_ptr<ScanOptions> options,
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& fragment) const {
auto this_ = checked_pointer_cast<const CsvFileFormat>(shared_from_this());
auto task =
std::make_shared<CsvScanTask>(std::move(this_), std::move(options), fragment);
auto task = std::make_shared<CsvScanTask>(std::move(this_), options, fragment);

return MakeVectorIterator<std::shared_ptr<ScanTask>>({std::move(task)});
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {

/// \brief Open a file for scanning
Result<ScanTaskIterator> ScanFile(
std::shared_ptr<ScanOptions> options,
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& fragment) const override;

Result<std::shared_ptr<FileWriter>> MakeWriter(
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/dataset/file_ipc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ Result<std::shared_ptr<Schema>> IpcFileFormat::Inspect(const FileSource& source)
}

Result<ScanTaskIterator> IpcFileFormat::ScanFile(
std::shared_ptr<ScanOptions> options,
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& fragment) const {
return IpcScanTaskIterator::Make(std::move(options), std::move(fragment));
return IpcScanTaskIterator::Make(options, fragment);
}

//
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {

/// \brief Open a file for scanning
Result<ScanTaskIterator> ScanFile(
std::shared_ptr<ScanOptions> options,
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& fragment) const override;

Result<std::shared_ptr<FileWriter>> MakeWriter(
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ Result<std::unique_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
}

Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
std::shared_ptr<ScanOptions> options,
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& fragment) const {
auto* parquet_fragment = checked_cast<ParquetFileFragment*>(fragment.get());
std::vector<int> row_groups;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {

/// \brief Open a file for scanning
Result<ScanTaskIterator> ScanFile(
std::shared_ptr<ScanOptions> options,
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& file) const override;

using FileFormat::MakeFragment;
Expand Down
46 changes: 46 additions & 0 deletions cpp/src/arrow/dataset/file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/test_util.h"
#include "arrow/status.h"
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/io_util.h"

Expand Down Expand Up @@ -82,6 +83,51 @@ TEST(FileSource, BufferBased) {
ASSERT_EQ(source1.buffer(), source3.buffer());
}

constexpr int kNumScanTasks = 2;
constexpr int kBatchesPerScanTask = 2;
constexpr int kRowsPerBatch = 1024;
class MockFileFormat : public FileFormat {
virtual std::string type_name() const { return "mock"; }
virtual bool Equals(const FileFormat& other) const { return false; }
virtual Result<bool> IsSupported(const FileSource& source) const { return true; }
virtual Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const {
return Status::NotImplemented("Not needed for test");
}
virtual Result<std::shared_ptr<FileWriter>> MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options) const {
return Status::NotImplemented("Not needed for test");
}
virtual std::shared_ptr<FileWriteOptions> DefaultWriteOptions() { return nullptr; }

virtual Result<ScanTaskIterator> ScanFile(
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& file) const {
auto sch = schema({field("i32", int32())});
ScanTaskVector scan_tasks;
for (int i = 0; i < kNumScanTasks; i++) {
RecordBatchVector batches;
for (int j = 0; j < kBatchesPerScanTask; j++) {
batches.push_back(ConstantArrayGenerator::Zeroes(kRowsPerBatch, sch));
}
scan_tasks.push_back(std::make_shared<InMemoryScanTask>(
std::move(batches), std::make_shared<ScanOptions>(), nullptr));
}
return MakeVectorIterator(std::move(scan_tasks));
}
};

TEST(FileFormat, ScanAsync) {
MockFileFormat format;
auto scan_options = std::make_shared<ScanOptions>();
ASSERT_OK_AND_ASSIGN(auto batch_gen, format.ScanBatchesAsync(scan_options, nullptr));
ASSERT_FINISHES_OK_AND_ASSIGN(auto batches, CollectAsyncGenerator(batch_gen));
ASSERT_EQ(kNumScanTasks * kBatchesPerScanTask, static_cast<int>(batches.size()));
for (int i = 0; i < kNumScanTasks * kBatchesPerScanTask; i++) {
ASSERT_EQ(kRowsPerBatch, batches[i]->num_rows());
}
}

TEST_F(TestFileSystemDataset, Basic) {
MakeDataset({});
AssertFragmentsAreFromPath(*dataset_->GetFragments(), {});
Expand Down
Loading