Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cpp/examples/arrow/dataset-parquet-scan-example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,17 @@ std::shared_ptr<ds::Dataset> GetDatasetFromFile(
std::string file) {
ds::FileSystemFactoryOptions options;
// The factory will try to build a child dataset.
auto factory = ds::FileSystemDatasetFactory::Make(fs, {file}, format, options).ValueOrDie();
auto factory =
ds::FileSystemDatasetFactory::Make(fs, {file}, format, options).ValueOrDie();

// Try to infer a common schema for all files.
auto schema = factory->Inspect(conf.inspect_options).ValueOrDie();
// Caller can optionally decide another schema as long as it is compatible
// with the previous one, e.g. `factory->Finish(compatible_schema)`.
auto child = factory->Finish(conf.finish_options).ValueOrDie();

ds::DatasetVector children{conf.repeat, child};
ds::DatasetVector children;
children.resize(conf.repeat, child);
auto dataset = ds::UnionDataset::Make(std::move(schema), std::move(children));

return dataset.ValueOrDie();
Expand Down
39 changes: 19 additions & 20 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,23 @@ namespace arrow {
namespace dataset {

Result<std::shared_ptr<arrow::io::RandomAccessFile>> FileSource::Open() const {
if (id() == PATH) {
return filesystem()->OpenInputFile(path());
if (filesystem_) {
return filesystem_->OpenInputFile(path_);
}

return std::make_shared<::arrow::io::BufferReader>(buffer());
}

Result<std::shared_ptr<arrow::io::OutputStream>> FileSource::OpenWritable() const {
if (!writable_) {
return Status::Invalid("file source '", path(), "' is not writable");
if (buffer_) {
return std::make_shared<::arrow::io::BufferReader>(buffer_);
}

if (id() == PATH) {
return filesystem()->OpenOutputStream(path());
return custom_open_();
}

Result<std::shared_ptr<arrow::io::OutputStream>> WritableFileSource::Open() const {
if (filesystem_) {
return filesystem_->OpenOutputStream(path_);
}

auto b = internal::checked_pointer_cast<ResizableBuffer>(buffer());
return std::make_shared<::arrow::io::BufferOutputStream>(b);
return std::make_shared<::arrow::io::BufferOutputStream>(buffer_);
}

Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(FileSource source) {
Expand All @@ -66,7 +65,7 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
}

Result<std::shared_ptr<WriteTask>> FileFormat::WriteFragment(
FileSource destination, std::shared_ptr<Fragment> fragment,
WritableFileSource destination, std::shared_ptr<Fragment> fragment,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<ScanContext> scan_context) {
return Status::NotImplemented("writing fragment of format ", type_name());
Expand Down Expand Up @@ -168,15 +167,15 @@ Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Write(
const auto& input_fragment = op.fragment();
FileSource dest(path, filesystem);

ARROW_ASSIGN_OR_RAISE(
auto fragment,
plan.format->MakeFragment(dest, input_fragment->partition_expression()));
fragments.push_back(std::move(fragment));
ARROW_ASSIGN_OR_RAISE(auto write_task,
plan.format->WriteFragment({path, filesystem}, input_fragment,
scan_options, scan_context));
task_group->Append([write_task] { return write_task->Execute(); });

ARROW_ASSIGN_OR_RAISE(
auto write_task,
plan.format->WriteFragment(dest, input_fragment, scan_options, scan_context));
task_group->Append([write_task] { return write_task->Execute(); });
auto fragment, plan.format->MakeFragment(
{path, filesystem}, input_fragment->partition_expression()));
fragments.push_back(std::move(fragment));
}
}

Expand Down
140 changes: 81 additions & 59 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#pragma once

#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
Expand All @@ -44,88 +45,109 @@ namespace dataset {
/// be read like a file
class ARROW_DS_EXPORT FileSource {
public:
// NOTE(kszucs): it'd be better to separate the BufferSource from FileSource
enum SourceKind { PATH, BUFFER };

FileSource(std::string path, std::shared_ptr<fs::FileSystem> filesystem,
Compression::type compression = Compression::UNCOMPRESSED,
bool writable = true)
: source_kind_(PATH),
path_(std::move(path)),
Compression::type compression = Compression::UNCOMPRESSED)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not satisfied with how this class is turning into a Franken-class: it needs static fake properties and has a semi-broken default constructor.

I'd say make FileSource an interface and use inheritance: make Open() virtual, and let path() and filesystem() be specific to one implementation (maybe name them Source, FileSource, BufferSource, ...). We can add a visitor-style accept method for classes that want to touch properties like the path.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that FileSource would benefit from refactoring, but I think that doing so further in this PR (including resorbing WritableFileSource) is not necessary. I'll make a follow-up JIRA.

: path_(std::move(path)),
filesystem_(std::move(filesystem)),
compression_(compression),
writable_(writable) {}
compression_(compression) {}

explicit FileSource(std::shared_ptr<Buffer> buffer,
Compression::type compression = Compression::UNCOMPRESSED)
: source_kind_(BUFFER), buffer_(std::move(buffer)), compression_(compression) {}
: buffer_(std::move(buffer)), compression_(compression) {}

using CustomOpen = std::function<Result<std::shared_ptr<io::RandomAccessFile>>()>;
explicit FileSource(CustomOpen open) : custom_open_(std::move(open)) {}

explicit FileSource(std::shared_ptr<ResizableBuffer> buffer,
using CustomOpenWithCompression =
std::function<Result<std::shared_ptr<io::RandomAccessFile>>(Compression::type)>;
explicit FileSource(CustomOpenWithCompression open_with_compression,
Compression::type compression = Compression::UNCOMPRESSED)
: source_kind_(BUFFER),
buffer_(std::move(buffer)),
compression_(compression),
writable_(true) {}

bool operator==(const FileSource& other) const {
if (id() != other.id()) {
return false;
}
: custom_open_(std::bind(std::move(open_with_compression), compression)),
compression_(compression) {}

if (id() == PATH) {
return path() == other.path() && filesystem() == other.filesystem();
}
explicit FileSource(std::shared_ptr<io::RandomAccessFile> file,
Compression::type compression = Compression::UNCOMPRESSED)
: custom_open_([=] { return ToResult(file); }), compression_(compression) {}

return buffer()->Equals(*other.buffer());
}
FileSource() : custom_open_(CustomOpen{&InvalidOpen}) {}

/// \brief The kind of file, whether stored in a filesystem, memory
/// resident, or other
SourceKind id() const { return source_kind_; }
static std::vector<FileSource> FromPaths(const std::shared_ptr<fs::FileSystem>& fs,
std::vector<std::string> paths) {
std::vector<FileSource> sources;
for (auto&& path : paths) {
sources.emplace_back(std::move(path), fs);
}
return sources;
}

/// \brief Return the type of raw compression on the file, if any
/// \brief Return the type of raw compression on the file, if any.
Compression::type compression() const { return compression_; }

/// \brief Whether the this source may be opened writable
bool writable() const { return writable_; }

/// \brief Return the file path, if any. Only valid when file source
/// type is PATH
/// \brief Return the file path, if any. Only valid when file source wraps a path.
const std::string& path() const {
static std::string buffer_path = "<Buffer>";
return id() == PATH ? path_ : buffer_path;
static std::string custom_open_path = "<Buffer>";
return filesystem_ ? path_ : buffer_ ? buffer_path : custom_open_path;
}

/// \brief Return the filesystem, if any. Only non null when file
/// source type is PATH
const std::shared_ptr<fs::FileSystem>& filesystem() const {
static std::shared_ptr<fs::FileSystem> no_fs = NULLPTR;
return id() == PATH ? filesystem_ : no_fs;
}
/// \brief Return the filesystem, if any. Otherwise returns nullptr
const std::shared_ptr<fs::FileSystem>& filesystem() const { return filesystem_; }

/// \brief Return the buffer containing the file, if any. Only value
/// when file source type is BUFFER
const std::shared_ptr<Buffer>& buffer() const {
static std::shared_ptr<Buffer> path_buffer = NULLPTR;
return id() == BUFFER ? buffer_ : path_buffer;
}
/// \brief Return the buffer containing the file, if any. Otherwise returns nullptr
const std::shared_ptr<Buffer>& buffer() const { return buffer_; }

/// \brief Get a RandomAccessFile which views this file source
Result<std::shared_ptr<arrow::io::RandomAccessFile>> Open() const;

/// \brief Get an OutputStream which wraps this file source
Result<std::shared_ptr<arrow::io::OutputStream>> OpenWritable() const;
Result<std::shared_ptr<io::RandomAccessFile>> Open() const;

private:
SourceKind source_kind_;
static Result<std::shared_ptr<io::RandomAccessFile>> InvalidOpen() {
return Status::Invalid("Called Open() on an uninitialized FileSource");
}

std::string path_;
std::shared_ptr<fs::FileSystem> filesystem_;

std::shared_ptr<Buffer> buffer_;
CustomOpen custom_open_;
Compression::type compression_ = Compression::UNCOMPRESSED;
};

/// \brief The path and filesystem where an actual file is located or a buffer which can
/// be written to like a file
class ARROW_DS_EXPORT WritableFileSource {
public:
WritableFileSource(std::string path, std::shared_ptr<fs::FileSystem> filesystem,
Compression::type compression = Compression::UNCOMPRESSED)
: path_(std::move(path)),
filesystem_(std::move(filesystem)),
compression_(compression) {}

explicit WritableFileSource(std::shared_ptr<ResizableBuffer> buffer,
Compression::type compression = Compression::UNCOMPRESSED)
: buffer_(std::move(buffer)), compression_(compression) {}

/// \brief Return the type of raw compression on the file, if any
Compression::type compression() const { return compression_; }

/// \brief Return the file path, if any. Only valid when file source wraps a path.
const std::string& path() const {
static std::string buffer_path = "<Buffer>";
return filesystem_ ? path_ : buffer_path;
}

/// \brief Return the filesystem, if any. Otherwise returns nullptr
const std::shared_ptr<fs::FileSystem>& filesystem() const { return filesystem_; }

Compression::type compression_;
bool writable_ = false;
/// \brief Return the buffer containing the file, if any. Otherwise returns nullptr
const std::shared_ptr<ResizableBuffer>& buffer() const { return buffer_; }

/// \brief Get an OutputStream which wraps this file source
Result<std::shared_ptr<arrow::io::OutputStream>> Open() const;

private:
std::string path_;
std::shared_ptr<fs::FileSystem> filesystem_;
std::shared_ptr<ResizableBuffer> buffer_;
Compression::type compression_ = Compression::UNCOMPRESSED;
};

/// \brief Base class for file format implementation
Expand Down Expand Up @@ -159,7 +181,7 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileForma
/// \brief Write a fragment. If the parent directory of destination does not exist, it
/// will be created.
virtual Result<std::shared_ptr<WriteTask>> WriteFragment(
FileSource destination, std::shared_ptr<Fragment> fragment,
WritableFileSource destination, std::shared_ptr<Fragment> fragment,
std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> scan_context); // FIXME(bkietz) make this pure virtual
};
Expand Down Expand Up @@ -256,16 +278,16 @@ class ARROW_DS_EXPORT WriteTask {

virtual ~WriteTask() = default;

const FileSource& destination() const;
const WritableFileSource& destination() const;
const std::shared_ptr<FileFormat>& format() const { return format_; }

protected:
WriteTask(FileSource destination, std::shared_ptr<FileFormat> format)
WriteTask(WritableFileSource destination, std::shared_ptr<FileFormat> format)
: destination_(std::move(destination)), format_(std::move(format)) {}

Status CreateDestinationParentDir() const;

FileSource destination_;
WritableFileSource destination_;
std::shared_ptr<FileFormat> format_;
};

Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/dataset/file_ipc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Result<ScanTaskIterator> IpcFileFormat::ScanFile(

class IpcWriteTask : public WriteTask {
public:
IpcWriteTask(FileSource destination, std::shared_ptr<FileFormat> format,
IpcWriteTask(WritableFileSource destination, std::shared_ptr<FileFormat> format,
std::shared_ptr<Fragment> fragment,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<ScanContext> scan_context)
Expand All @@ -174,7 +174,7 @@ class IpcWriteTask : public WriteTask {

auto schema = scan_options_->schema();

ARROW_ASSIGN_OR_RAISE(auto out_stream, destination_.OpenWritable());
ARROW_ASSIGN_OR_RAISE(auto out_stream, destination_.Open());
ARROW_ASSIGN_OR_RAISE(auto writer, ipc::NewFileWriter(out_stream.get(), schema));
ARROW_ASSIGN_OR_RAISE(auto scan_task_it,
fragment_->Scan(scan_options_, scan_context_));
Expand All @@ -200,7 +200,7 @@ class IpcWriteTask : public WriteTask {
};

Result<std::shared_ptr<WriteTask>> IpcFileFormat::WriteFragment(
FileSource destination, std::shared_ptr<Fragment> fragment,
WritableFileSource destination, std::shared_ptr<Fragment> fragment,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<ScanContext> scan_context) {
return std::make_shared<IpcWriteTask>(std::move(destination), shared_from_this(),
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
std::shared_ptr<ScanContext> context) const override;

Result<std::shared_ptr<WriteTask>> WriteFragment(
FileSource destination, std::shared_ptr<Fragment> fragment,
WritableFileSource destination, std::shared_ptr<Fragment> fragment,
std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context) override;
};
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/dataset/file_ipc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ class TestIpcFileFormat : public ArrowIpcWriterMixin {
kBatchRepetitions);
}

Result<FileSource> GetFileSink() {
Result<WritableFileSource> GetFileSink() {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ResizableBuffer> buffer,
AllocateResizableBuffer(0));
return FileSource(std::move(buffer));
return WritableFileSource(std::move(buffer));
}

RecordBatchIterator Batches(ScanTaskIterator scan_task_it) {
Expand Down
14 changes: 8 additions & 6 deletions cpp/src/arrow/dataset/file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@ TEST(FileSource, PathBased) {

ASSERT_EQ(p1, source1.path());
ASSERT_TRUE(localfs->Equals(*source1.filesystem()));
ASSERT_EQ(FileSource::PATH, source1.id());
ASSERT_EQ(Compression::UNCOMPRESSED, source1.compression());

ASSERT_EQ(p2, source2.path());
ASSERT_TRUE(localfs->Equals(*source2.filesystem()));
ASSERT_EQ(FileSource::PATH, source2.id());
ASSERT_EQ(Compression::GZIP, source2.compression());

// Test copy constructor and comparison
FileSource source3 = source1;
ASSERT_EQ(source1, source3);
FileSource source3;
source3 = source1;
ASSERT_EQ(source1.path(), source3.path());
ASSERT_EQ(source1.filesystem(), source3.filesystem());
}

TEST(FileSource, BufferBased) {
Expand All @@ -69,13 +69,15 @@ TEST(FileSource, BufferBased) {
FileSource source1(buf);
FileSource source2(buf, Compression::LZ4);

ASSERT_EQ(FileSource::BUFFER, source1.id());
ASSERT_TRUE(source1.buffer()->Equals(*buf));
ASSERT_EQ(Compression::UNCOMPRESSED, source1.compression());

ASSERT_EQ(FileSource::BUFFER, source2.id());
ASSERT_TRUE(source2.buffer()->Equals(*buf));
ASSERT_EQ(Compression::LZ4, source2.compression());

FileSource source3;
source3 = source1;
ASSERT_EQ(source1.buffer(), source3.buffer());
}

TEST_F(TestFileSystemDataset, Basic) {
Expand Down
Loading