Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ SimpleDataFragment::SimpleDataFragment(
std::vector<std::shared_ptr<RecordBatch>> record_batches)
: record_batches_(std::move(record_batches)) {}

Status SimpleDataFragment::Scan(std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) {
Status SimpleDataFragment::Scan(std::unique_ptr<ScanTaskIterator>* out) {
// Make an explicit copy of record_batches_ to ensure Scan can be called
// multiple times.
auto it = MakeVectorIterator(record_batches_);
Expand All @@ -56,8 +55,7 @@ Status Dataset::Make(const std::vector<std::shared_ptr<DataSource>>& sources,
}

Status Dataset::NewScan(std::unique_ptr<ScannerBuilder>* out) {
auto context = std::make_shared<ScanContext>();
out->reset(new ScannerBuilder(this->shared_from_this(), context));
out->reset(new ScannerBuilder(this->shared_from_this()));
return Status::OK();
}

Expand Down
18 changes: 9 additions & 9 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <utility>
#include <vector>

#include "arrow/dataset/scanner.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/util/iterator.h"
Expand All @@ -38,18 +39,17 @@ class ARROW_DS_EXPORT DataFragment {
public:
/// \brief Scan returns an iterator of ScanTasks, each of which yields
/// RecordBatches from this DataFragment.
virtual Status Scan(std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) = 0;
virtual Status Scan(std::unique_ptr<ScanTaskIterator>* out) = 0;

/// \brief Return true if the fragment can benefit from parallel
/// scanning
virtual bool splittable() const = 0;

/// \brief Filtering, schema reconciliation, and partition options to use when
/// \brief Filtering, schema reconciliation, and partition keys to use when
/// scanning this fragment. May be nullptr, which indicates that no filtering
/// or schema reconciliation will be performed and all partitions will be
/// scanned.
virtual std::shared_ptr<ScanOptions> scan_options() const = 0;
virtual const ScanOptions& scan_options() const = 0;

virtual ~DataFragment() = default;
};
Expand All @@ -60,15 +60,15 @@ class ARROW_DS_EXPORT SimpleDataFragment : public DataFragment {
public:
explicit SimpleDataFragment(std::vector<std::shared_ptr<RecordBatch>> record_batches);

Status Scan(std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) override;
Status Scan(std::unique_ptr<ScanTaskIterator>* out) override;

bool splittable() const override { return false; }

std::shared_ptr<ScanOptions> scan_options() const override { return NULLPTR; }
const ScanOptions& scan_options() const override { return scan_options_; }

protected:
std::vector<std::shared_ptr<RecordBatch>> record_batches_;
ScanOptions scan_options_;
};

/// \brief A basic component of a Dataset which yields zero or more
Expand All @@ -79,7 +79,7 @@ class ARROW_DS_EXPORT DataSource {
/// \brief GetFragments returns an iterator of DataFragments. The ScanOptions
/// controls filtering and schema inference.
virtual std::unique_ptr<DataFragmentIterator> GetFragments(
std::shared_ptr<ScanOptions> options) = 0;
const ScanOptions& scan_options) = 0;

virtual std::string type() const = 0;

Expand All @@ -93,7 +93,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource {
: fragments_(std::move(fragments)) {}

std::unique_ptr<DataFragmentIterator> GetFragments(
std::shared_ptr<ScanOptions> options) override {
const ScanOptions& scan_options) override {
return MakeVectorIterator(fragments_);
}

Expand Down
27 changes: 12 additions & 15 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,22 @@ Status FileSource::Open(std::shared_ptr<arrow::io::RandomAccessFile>* out) const
return Status::OK();
}

Status FileBasedDataFragment::Scan(std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) {
return format_->ScanFile(source_, scan_options_, scan_context, out);
Status FileBasedDataFragment::Scan(std::unique_ptr<ScanTaskIterator>* out) {
return format_->ScanFile(source_, scan_options_, out);
}

FileSystemBasedDataSource::FileSystemBasedDataSource(
fs::FileSystem* filesystem, const fs::Selector& selector,
std::shared_ptr<FileFormat> format, std::shared_ptr<ScanOptions> scan_options,
std::vector<fs::FileStats> stats)
FileSystemBasedDataSource::FileSystemBasedDataSource(fs::FileSystem* filesystem,
const fs::Selector& selector,
std::shared_ptr<FileFormat> format,
std::vector<fs::FileStats> stats)
: filesystem_(filesystem),
selector_(std::move(selector)),
format_(std::move(format)),
scan_options_(std::move(scan_options)),
stats_(std::move(stats)) {}

Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem,
const fs::Selector& selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options,
std::unique_ptr<FileSystemBasedDataSource>* out) {
std::vector<fs::FileStats> stats;
RETURN_NOT_OK(filesystem->GetTargetStats(selector, &stats));
Expand All @@ -71,18 +68,18 @@ Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem,
stats.resize(new_end - stats.begin());

out->reset(new FileSystemBasedDataSource(filesystem, selector, std::move(format),
std::move(scan_options), std::move(stats)));
std::move(stats)));
return Status::OK();
}

std::unique_ptr<DataFragmentIterator> FileSystemBasedDataSource::GetFragments(
std::shared_ptr<ScanOptions> options) {
const ScanOptions& scan_options) {
struct Impl : DataFragmentIterator {
Impl(fs::FileSystem* filesystem, std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options, std::vector<fs::FileStats> stats)
const ScanOptions& scan_options, std::vector<fs::FileStats> stats)
: filesystem_(filesystem),
format_(std::move(format)),
scan_options_(std::move(scan_options)),
scan_options_(scan_options),
stats_(std::move(stats)) {}

Status Next(std::shared_ptr<DataFragment>* out) {
Expand All @@ -101,11 +98,11 @@ std::unique_ptr<DataFragmentIterator> FileSystemBasedDataSource::GetFragments(
size_t i_ = 0;
fs::FileSystem* filesystem_;
std::shared_ptr<FileFormat> format_;
std::shared_ptr<ScanOptions> scan_options_;
ScanOptions scan_options_;
std::vector<fs::FileStats> stats_;
};

return internal::make_unique<Impl>(filesystem_, format_, options, stats_);
return internal::make_unique<Impl>(filesystem_, format_, scan_options, stats_);
}

} // namespace dataset
Expand Down
37 changes: 7 additions & 30 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,22 +101,6 @@ class ARROW_DS_EXPORT FileSource {
std::shared_ptr<Buffer> buffer_;
};

/// \brief Base class for file scanning options
class ARROW_DS_EXPORT FileScanOptions : public ScanOptions {
public:
/// \brief The name of the file format this options corresponds to
virtual std::string file_type() const = 0;
};

/// \brief Base class for file writing options
class ARROW_DS_EXPORT FileWriteOptions : public WriteOptions {
public:
virtual ~FileWriteOptions() = default;

/// \brief The name of the file format this options corresponds to
virtual std::string file_type() const = 0;
};

/// \brief Base class for file format implementation
class ARROW_DS_EXPORT FileFormat {
public:
Expand All @@ -128,38 +112,34 @@ class ARROW_DS_EXPORT FileFormat {
virtual bool IsKnownExtension(const std::string& ext) const = 0;

/// \brief Open a file for scanning
virtual Status ScanFile(const FileSource& source,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<ScanContext> scan_context,
virtual Status ScanFile(const FileSource& source, const ScanOptions& scan_options,
std::unique_ptr<ScanTaskIterator>* out) const = 0;

/// \brief Open a fragment
virtual Status MakeFragment(const FileSource& location,
std::shared_ptr<ScanOptions> opts,
virtual Status MakeFragment(const FileSource& location, const ScanOptions& scan_options,
std::unique_ptr<DataFragment>* out) = 0;
};

/// \brief A DataFragment that is stored in a file with a known format
class ARROW_DS_EXPORT FileBasedDataFragment : public DataFragment {
public:
FileBasedDataFragment(const FileSource& source, std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options)
const ScanOptions& scan_options)
: source_(source),
format_(std::move(format)),
scan_options_(std::move(scan_options)) {}

Status Scan(std::shared_ptr<ScanContext> scan_context,
std::unique_ptr<ScanTaskIterator>* out) override;
Status Scan(std::unique_ptr<ScanTaskIterator>* out) override;

const FileSource& source() const { return source_; }
std::shared_ptr<FileFormat> format() const { return format_; }

std::shared_ptr<ScanOptions> scan_options() const override { return scan_options_; }
const ScanOptions& scan_options() const override { return scan_options_; }

protected:
FileSource source_;
std::shared_ptr<FileFormat> format_;
std::shared_ptr<ScanOptions> scan_options_;
ScanOptions scan_options_;
};

/// \brief A DataSource which takes files of one format from a directory
Expand All @@ -171,24 +151,21 @@ class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource {
public:
static Status Make(fs::FileSystem* filesystem, const fs::Selector& selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options,
std::unique_ptr<FileSystemBasedDataSource>* out);

std::string type() const override { return "directory"; }

std::unique_ptr<DataFragmentIterator> GetFragments(
std::shared_ptr<ScanOptions> options) override;
const ScanOptions& scan_options) override;

protected:
FileSystemBasedDataSource(fs::FileSystem* filesystem, const fs::Selector& selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options,
std::vector<fs::FileStats> stats);

fs::FileSystem* filesystem_ = NULLPTR;
fs::Selector selector_;
std::shared_ptr<FileFormat> format_;
std::shared_ptr<ScanOptions> scan_options_;
std::vector<fs::FileStats> stats_;
};

Expand Down
40 changes: 23 additions & 17 deletions cpp/src/arrow/dataset/file_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,6 @@ class FileSystem;

namespace dataset {

class ARROW_DS_EXPORT CsvScanOptions : public FileScanOptions {
public:
std::string file_type() const override;

private:
csv::ParseOptions parse_options_;
csv::ConvertOptions convert_options_;
csv::ReadOptions read_options_;
};

class ARROW_DS_EXPORT CsvWriteOptions : public FileWriteOptions {
public:
std::string file_type() const override;
};

/// \brief A FileFormat implementation that reads from CSV files
class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
public:
Expand All @@ -60,9 +45,30 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
bool IsKnownExtension(const std::string& ext) const override;

/// \brief Open a file for scanning
Status ScanFile(const FileSource& source, std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<ScanContext> scan_context,
Status ScanFile(const FileSource& source, const ScanOptions& scan_options,
std::unique_ptr<ScanTaskIterator>* out) const override;

Status MakeFragment(const FileSource& source, const ScanOptions& scan_options,
std::unique_ptr<DataFragment>* out) override;
};

class ARROW_DS_EXPORT CsvScanOptions : public ScanOptions::FileOptions {
public:
std::shared_ptr<FileFormat> file_format() const override {
return std::make_shared<CsvFileFormat>();
}

private:
csv::ParseOptions parse_options_;
csv::ConvertOptions convert_options_;
csv::ReadOptions read_options_;
};

class ARROW_DS_EXPORT CsvWriteOptions : public FileWriteOptions {
public:
std::shared_ptr<FileFormat> file_format() const override {
return std::make_shared<CsvFileFormat>();
}
};

} // namespace dataset
Expand Down
9 changes: 4 additions & 5 deletions cpp/src/arrow/dataset/file_feather.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
namespace arrow {
namespace dataset {

class ARROW_DS_EXPORT FeatherScanOptions : public FileScanOptions {
class ARROW_DS_EXPORT FeatherScanOptions : public ScanOptions::FileOptions {
public:
std::string file_type() const override;
std::shared_ptr<FileFormat> file_format() const override;
};

class ARROW_DS_EXPORT FeatherWriterOptions : public FileWriteOptions {
public:
std::string file_type() const override;
std::shared_ptr<FileFormat> file_format() const override;
};

/// \brief A FileFormat implementation that reads from Feather (Arrow
Expand All @@ -47,8 +47,7 @@ class ARROW_DS_EXPORT FeatherFileFormat : public FileFormat {
bool IsKnownExtension(const std::string& ext) const override;

/// \brief Open a file for scanning
Status ScanFile(const FileSource& source, std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<ScanContext> scan_context,
Status ScanFile(const FileSource& source, const ScanOptions& scan_options,
std::unique_ptr<ScanTaskIterator>* out) const override;
};

Expand Down
39 changes: 22 additions & 17 deletions cpp/src/arrow/dataset/file_json.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,6 @@
namespace arrow {
namespace dataset {

class ARROW_DS_EXPORT JsonScanOptions : public FileScanOptions {
public:
///
std::string file_type() const override;

private:
json::ParseOptions parse_options_;
json::ReadOptions read_options_;
};

class ARROW_DS_EXPORT JsonWriteOptions : public FileWriteOptions {
public:
std::string file_type() const override;
};

/// \brief A FileFormat implementation that reads from JSON files
class ARROW_DS_EXPORT JsonFileFormat : public FileFormat {
public:
Expand All @@ -52,9 +37,29 @@ class ARROW_DS_EXPORT JsonFileFormat : public FileFormat {
bool IsKnownExtension(const std::string& ext) const override;

/// \brief Open a file for scanning
Status ScanFile(const FileSource& source, std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<ScanContext> scan_context,
Status ScanFile(const FileSource& source, const ScanOptions& scan_options,
std::unique_ptr<ScanTaskIterator>* out) const override;

Status MakeFragment(const FileSource& source, const ScanOptions& scan_options,
std::unique_ptr<DataFragment>* out) override;
};

class ARROW_DS_EXPORT JsonScanOptions : public ScanOptions::FileOptions {
public:
std::shared_ptr<FileFormat> file_format() const override {
return std::make_shared<JsonFileFormat>();
}

private:
json::ParseOptions parse_options_;
json::ReadOptions read_options_;
};

class ARROW_DS_EXPORT JsonWriteOptions : public FileWriteOptions {
public:
std::shared_ptr<FileFormat> file_format() const override {
return std::make_shared<JsonFileFormat>();
}
};

} // namespace dataset
Expand Down
Loading