Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ SimpleDataFragment::SimpleDataFragment(
: record_batches_(std::move(record_batches)) {}

Status SimpleDataFragment::Scan(std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) {
ScanTaskIterator* out) {
// Make an explicit copy of record_batches_ to ensure Scan can be called
// multiple times.
auto it = MakeVectorIterator(record_batches_);
Expand Down
11 changes: 4 additions & 7 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ARROW_DS_EXPORT DataFragment {
/// \brief Scan returns an iterator of ScanTasks, each of which yields
/// RecordBatches from this DataFragment.
virtual Status Scan(std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) = 0;
ScanTaskIterator* out) = 0;

/// \brief Return true if the fragment can benefit from parallel
/// scanning
Expand All @@ -60,8 +60,7 @@ class ARROW_DS_EXPORT SimpleDataFragment : public DataFragment {
public:
explicit SimpleDataFragment(std::vector<std::shared_ptr<RecordBatch>> record_batches);

Status Scan(std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) override;
Status Scan(std::shared_ptr<ScanContext> scan_context, ScanTaskIterator* out) override;

bool splittable() const override { return false; }

Expand All @@ -78,8 +77,7 @@ class ARROW_DS_EXPORT DataSource {
public:
/// \brief GetFragments returns an iterator of DataFragments. The ScanOptions
/// controls filtering and schema inference.
virtual std::unique_ptr<DataFragmentIterator> GetFragments(
std::shared_ptr<ScanOptions> options) = 0;
virtual DataFragmentIterator GetFragments(std::shared_ptr<ScanOptions> options) = 0;

virtual std::string type() const = 0;

Expand All @@ -92,8 +90,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource {
explicit SimpleDataSource(DataFragmentVector fragments)
: fragments_(std::move(fragments)) {}

std::unique_ptr<DataFragmentIterator> GetFragments(
std::shared_ptr<ScanOptions> options) override {
DataFragmentIterator GetFragments(std::shared_ptr<ScanOptions> options) override {
return MakeVectorIterator(fragments_);
}

Expand Down
13 changes: 6 additions & 7 deletions cpp/src/arrow/dataset/dataset_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,15 @@ constexpr int RecordBatchProjector::kNoMatch;
class ProjectedRecordBatchReader : public RecordBatchReader {
public:
static Status Make(MemoryPool* pool, RecordBatchProjector projector,
std::unique_ptr<RecordBatchIterator> wrapped,
std::unique_ptr<RecordBatchIterator>* out) {
out->reset(
new ProjectedRecordBatchReader(pool, std::move(projector), std::move(wrapped)));
RecordBatchIterator wrapped, RecordBatchIterator* out) {
*out = RecordBatchIterator(
ProjectedRecordBatchReader(pool, std::move(projector), std::move(wrapped)));
return Status::OK();
}

Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
std::shared_ptr<RecordBatch> rb;
RETURN_NOT_OK(wrapped_->Next(&rb));
RETURN_NOT_OK(wrapped_.Next(&rb));
if (rb == nullptr) {
*out = nullptr;
return Status::OK();
Expand All @@ -185,11 +184,11 @@ class ProjectedRecordBatchReader : public RecordBatchReader {

private:
ProjectedRecordBatchReader(MemoryPool* pool, RecordBatchProjector projector,
std::unique_ptr<RecordBatchIterator> wrapped)
RecordBatchIterator wrapped)
: projector_(std::move(projector)), wrapped_(std::move(wrapped)) {}

RecordBatchProjector projector_;
std::unique_ptr<RecordBatchIterator> wrapped_;
RecordBatchIterator wrapped_;
};

} // namespace dataset
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Status FileSource::Open(std::shared_ptr<arrow::io::RandomAccessFile>* out) const
}

Status FileBasedDataFragment::Scan(std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) {
ScanTaskIterator* out) {
return format_->ScanFile(source_, scan_options_, scan_context, out);
}

Expand Down Expand Up @@ -75,7 +75,7 @@ Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem,
return Status::OK();
}

std::unique_ptr<DataFragmentIterator> FileSystemBasedDataSource::GetFragments(
DataFragmentIterator FileSystemBasedDataSource::GetFragments(
std::shared_ptr<ScanOptions> options) {
struct Impl : DataFragmentIterator {
Impl(fs::FileSystem* filesystem, std::shared_ptr<FileFormat> format,
Expand Down Expand Up @@ -105,7 +105,7 @@ std::unique_ptr<DataFragmentIterator> FileSystemBasedDataSource::GetFragments(
std::vector<fs::FileStats> stats_;
};

return internal::make_unique<Impl>(filesystem_, format_, options, stats_);
return DataFragmentIterator(Impl(filesystem_, format_, options, stats_));
}

} // namespace dataset
Expand Down
8 changes: 3 additions & 5 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class ARROW_DS_EXPORT FileFormat {
virtual Status ScanFile(const FileSource& source,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) const = 0;
ScanTaskIterator* out) const = 0;

/// \brief Open a fragment
virtual Status MakeFragment(const FileSource& location,
Expand All @@ -148,8 +148,7 @@ class ARROW_DS_EXPORT FileBasedDataFragment : public DataFragment {
format_(std::move(format)),
scan_options_(std::move(scan_options)) {}

Status Scan(std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) override;
Status Scan(std::shared_ptr<ScanContext> scan_context, ScanTaskIterator* out) override;

const FileSource& source() const { return source_; }
std::shared_ptr<FileFormat> format() const { return format_; }
Expand All @@ -176,8 +175,7 @@ class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource {

std::string type() const override { return "directory"; }

std::unique_ptr<DataFragmentIterator> GetFragments(
std::shared_ptr<ScanOptions> options) override;
DataFragmentIterator GetFragments(std::shared_ptr<ScanOptions> options) override;

protected:
FileSystemBasedDataSource(fs::FileSystem* filesystem, const fs::Selector& selector,
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
/// \brief Open a file for scanning
Status ScanFile(const FileSource& source, std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) const override;
ScanTaskIterator* out) const override;
};

} // namespace dataset
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_feather.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ARROW_DS_EXPORT FeatherFileFormat : public FileFormat {
/// \brief Open a file for scanning
Status ScanFile(const FileSource& source, std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) const override;
ScanTaskIterator* out) const override;
};

} // namespace dataset
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_json.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ARROW_DS_EXPORT JsonFileFormat : public FileFormat {
/// \brief Open a file for scanning
Status ScanFile(const FileSource& source, std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) const override;
ScanTaskIterator* out) const override;
};

} // namespace dataset
Expand Down
16 changes: 8 additions & 8 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ class ParquetScanTask : public ScanTask {
return Status::OK();
}

std::unique_ptr<RecordBatchIterator> Scan() override {
return std::move(record_batch_reader_);
RecordBatchIterator Scan() {
return MakePointerIterator(std::move(record_batch_reader_));
}

private:
Expand Down Expand Up @@ -107,11 +107,11 @@ class ParquetRowGroupPartitioner {
int num_row_groups_;
};

class ParquetScanTaskIterator : public ScanTaskIterator {
class ParquetScanTaskIterator {
public:
static Status Make(std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context, ParquetFileReaderPtr reader,
std::unique_ptr<ScanTaskIterator>* out) {
ScanTaskIterator* out) {
auto metadata = reader->metadata();

std::vector<int> columns_projection;
Expand All @@ -121,13 +121,13 @@ class ParquetScanTaskIterator : public ScanTaskIterator {
RETURN_NOT_OK(parquet::arrow::FileReader::Make(context->pool, std::move(reader),
&arrow_reader));

out->reset(new ParquetScanTaskIterator(columns_projection, metadata,
std::move(arrow_reader)));
*out = ScanTaskIterator(
ParquetScanTaskIterator(columns_projection, metadata, std::move(arrow_reader)));

return Status::OK();
}

Status Next(ScanTaskPtr* task) override {
Status Next(ScanTaskPtr* task) {
auto partition = partitionner_.Next();

// Iteration is done.
Expand Down Expand Up @@ -166,7 +166,7 @@ class ParquetScanTaskIterator : public ScanTaskIterator {
Status ParquetFileFormat::ScanFile(const FileSource& source,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) const {
ScanTaskIterator* out) const {
std::shared_ptr<io::RandomAccessFile> input;
RETURN_NOT_OK(source.Open(&input));

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
/// \brief Open a file for scanning
Status ScanFile(const FileSource& source, std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) const override;
ScanTaskIterator* out) const override;

Status MakeFragment(const FileSource& source, std::shared_ptr<ScanOptions> opts,
std::unique_ptr<DataFragment>* out) override;
Expand Down
13 changes: 7 additions & 6 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ Status WriteRecordBatchReader(RecordBatchReader* reader, FileWriter* writer) {
"'");
}

return reader->Visit([&](std::shared_ptr<RecordBatch> batch) -> Status {
return WriteRecordBatch(*batch, writer);
});
return MakePointerIterator(reader).Visit(
[&](std::shared_ptr<RecordBatch> batch) -> Status {
return WriteRecordBatch(*batch, writer);
});
}

Status WriteRecordBatchReader(
Expand Down Expand Up @@ -156,13 +157,13 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReader) {
auto source = GetFileSource(reader.get());
auto fragment = std::make_shared<ParquetFragment>(*source, opts_);

std::unique_ptr<ScanTaskIterator> it;
ScanTaskIterator it;
ASSERT_OK(fragment->Scan(ctx_, &it));
int64_t row_count = 0;

ASSERT_OK(it->Visit([&row_count](std::unique_ptr<ScanTask> task) -> Status {
ASSERT_OK(it.Visit([&row_count](std::unique_ptr<ScanTask> task) -> Status {
auto batch_it = task->Scan();
return batch_it->Visit([&row_count](std::shared_ptr<RecordBatch> batch) -> Status {
return batch_it.Visit([&row_count](std::shared_ptr<RecordBatch> batch) -> Status {
row_count += batch->num_rows();
return Status::OK();
});
Expand Down
11 changes: 4 additions & 7 deletions cpp/src/arrow/dataset/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@ class ARROW_DS_EXPORT Partition : public DataSource {
/// e.g. for the top-level partitioned source container
virtual const PartitionKey* key() const = 0;

virtual std::unique_ptr<DataFragmentIterator> GetFragments(
const Selector& selector) = 0;
virtual DataFragmentIterator GetFragments(const Selector& selector) = 0;
};

/// \brief Simple implementation of Partition, which consists of a
Expand All @@ -176,8 +175,7 @@ class ARROW_DS_EXPORT SimplePartition : public Partition {
const PartitionVector& subpartitions() const { return subpartitions_; }
const DataFragmentVector& data_fragments() const { return data_fragments_; }

std::unique_ptr<DataFragmentIterator> GetFragments(
const FilterVector& filters) override;
DataFragmentIterator GetFragments(const FilterVector& filters) override;

private:
std::unique_ptr<PartitionKey> key_;
Expand All @@ -200,13 +198,12 @@ class ARROW_DS_EXPORT LazyPartition : public Partition {
public:
const PartitionKey* key() const override;

std::unique_ptr<DataFragmentIterator> GetFragments(
const& DataSelector selector) override;
DataFragmentIterator GetFragments(const& DataSelector selector) override;

// TODO(wesm): Iterate over subpartitions

protected:
std::unique_ptr<PartitionIterator> partition_iter_;
PartitionIterator partition_iter_;

// By default, once this source is consumed using GetFragments, it
// cannot be consumed again. By setting this to true, we cache
Expand Down
20 changes: 9 additions & 11 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,20 @@
namespace arrow {
namespace dataset {

std::unique_ptr<RecordBatchIterator> SimpleScanTask::Scan() {
return MakeVectorIterator(record_batches_);
}
RecordBatchIterator SimpleScanTask::Scan() { return MakeVectorIterator(record_batches_); }

/// \brief GetFragmentsIterator transforms a vector<DataSource> in a flattened
/// Iterator<DataFragment>.
static std::unique_ptr<DataFragmentIterator> GetFragmentsIterator(
static DataFragmentIterator GetFragmentsIterator(
const std::vector<std::shared_ptr<DataSource>>& sources,
std::shared_ptr<ScanOptions> options) {
// Iterator<DataSource>
auto sources_it = MakeVectorIterator(sources);

// DataSource -> Iterator<DataFragment>
auto fn = [options](std::shared_ptr<DataSource> source)
-> std::unique_ptr<DataFragmentIterator> { return source->GetFragments(options); };
auto fn = [options](std::shared_ptr<DataSource> source) -> DataFragmentIterator {
return source->GetFragments(options);
};

// Iterator<Iterator<DataFragment>>
auto fragments_it = MakeMapIterator(fn, std::move(sources_it));
Expand All @@ -49,12 +48,11 @@ static std::unique_ptr<DataFragmentIterator> GetFragmentsIterator(

/// \brief GetScanTaskIterator transforms an Iterator<DataFragment> in a
/// flattened Iterator<ScanTask>.
static std::unique_ptr<ScanTaskIterator> GetScanTaskIterator(
std::unique_ptr<DataFragmentIterator> fragments,
std::shared_ptr<ScanContext> context) {
static ScanTaskIterator GetScanTaskIterator(DataFragmentIterator fragments,
std::shared_ptr<ScanContext> context) {
// DataFragment -> ScanTaskIterator
auto fn = [context](std::shared_ptr<DataFragment> fragment,
std::unique_ptr<ScanTaskIterator>* out) -> Status {
ScanTaskIterator* out) -> Status {
return fragment->Scan(context, out);
};

Expand All @@ -65,7 +63,7 @@ static std::unique_ptr<ScanTaskIterator> GetScanTaskIterator(
return MakeFlattenIterator(std::move(maybe_scantask_it));
}

std::unique_ptr<ScanTaskIterator> SimpleScanner::Scan() {
ScanTaskIterator SimpleScanner::Scan() {
// First, transforms DataSources in a flat Iterator<DataFragment>. This
// iterator is lazily constructed, i.e. DataSource::GetFragments is never
// invoked.
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class ARROW_DS_EXPORT ScanTask {
/// \brief Iterate through sequence of materialized record batches
/// resulting from the Scan. Execution semantics encapsulated in the
/// particular ScanTask implementation
virtual std::unique_ptr<RecordBatchIterator> Scan() = 0;
virtual RecordBatchIterator Scan() = 0;

virtual ~ScanTask() = default;
};
Expand All @@ -75,7 +75,7 @@ class ARROW_DS_EXPORT SimpleScanTask : public ScanTask {
explicit SimpleScanTask(std::vector<std::shared_ptr<RecordBatch>> record_batches)
: record_batches_(std::move(record_batches)) {}

std::unique_ptr<RecordBatchIterator> Scan() override;
RecordBatchIterator Scan() override;

protected:
std::vector<std::shared_ptr<RecordBatch>> record_batches_;
Expand All @@ -95,7 +95,7 @@ class ARROW_DS_EXPORT Scanner {
/// \brief The Scan operator returns a stream of ScanTask. The caller is
/// responsible to dispatch/schedule said tasks. Tasks should be safe to run
/// in a concurrent fashion and outlive the iterator.
virtual std::unique_ptr<ScanTaskIterator> Scan() = 0;
virtual ScanTaskIterator Scan() = 0;

virtual ~Scanner() = default;
};
Expand All @@ -122,7 +122,7 @@ class ARROW_DS_EXPORT SimpleScanner : public Scanner {
options_(std::move(options)),
context_(std::move(context)) {}

std::unique_ptr<ScanTaskIterator> Scan() override;
ScanTaskIterator Scan() override;

private:
std::vector<std::shared_ptr<DataSource>> sources_;
Expand Down
Loading