Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,8 @@ class WriteQueue {

ARROW_ASSIGN_OR_RAISE(
writer_, write_options.format()->MakeWriter(std::move(destination), schema_,
write_options.file_write_options));
write_options.file_write_options,
{write_options.filesystem, path}));
return Status::OK();
}

Expand All @@ -445,15 +446,15 @@ struct WriteState {
std::unordered_map<std::string, std::unique_ptr<WriteQueue>> queues;
};

Status WriteNextBatch(WriteState& state, const std::shared_ptr<Fragment>& fragment,
Status WriteNextBatch(WriteState* state, const std::shared_ptr<Fragment>& fragment,
std::shared_ptr<RecordBatch> batch) {
ARROW_ASSIGN_OR_RAISE(auto groups, state.write_options.partitioning->Partition(batch));
ARROW_ASSIGN_OR_RAISE(auto groups, state->write_options.partitioning->Partition(batch));
batch.reset(); // drop to hopefully conserve memory

if (groups.batches.size() > static_cast<size_t>(state.write_options.max_partitions)) {
if (groups.batches.size() > static_cast<size_t>(state->write_options.max_partitions)) {
return Status::Invalid("Fragment would be written into ", groups.batches.size(),
" partitions. This exceeds the maximum of ",
state.write_options.max_partitions);
state->write_options.max_partitions);
}

std::unordered_set<WriteQueue*> need_flushed;
Expand All @@ -462,20 +463,20 @@ Status WriteNextBatch(WriteState& state, const std::shared_ptr<Fragment>& fragme
and_(std::move(groups.expressions[i]), fragment->partition_expression());
auto batch = std::move(groups.batches[i]);

ARROW_ASSIGN_OR_RAISE(auto part,
state.write_options.partitioning->Format(partition_expression));
ARROW_ASSIGN_OR_RAISE(
auto part, state->write_options.partitioning->Format(partition_expression));

WriteQueue* queue;
{
// lookup the queue to which batch should be appended
auto queues_lock = state.mutex.Lock();
auto queues_lock = state->mutex.Lock();

queue = internal::GetOrInsertGenerated(
&state.queues, std::move(part),
&state->queues, std::move(part),
[&](const std::string& emplaced_part) {
// lookup in `queues` also failed,
// generate a new WriteQueue
size_t queue_index = state.queues.size() - 1;
size_t queue_index = state->queues.size() - 1;

return internal::make_unique<WriteQueue>(emplaced_part, queue_index,
batch->schema());
Expand All @@ -489,12 +490,12 @@ Status WriteNextBatch(WriteState& state, const std::shared_ptr<Fragment>& fragme

// flush all touched WriteQueues
for (auto queue : need_flushed) {
RETURN_NOT_OK(queue->Flush(state.write_options));
RETURN_NOT_OK(queue->Flush(state->write_options));
}
return Status::OK();
}

Status WriteInternal(const ScanOptions& scan_options, WriteState& state,
Status WriteInternal(const ScanOptions& scan_options, WriteState* state,
ScanTaskVector scan_tasks) {
// Store a mapping from partitions (represened by their formatted partition expressions)
// to a WriteQueue which flushes batches into that partition's output file. In principle
Expand Down Expand Up @@ -544,7 +545,7 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
#pragma warning(disable : 4996)
#endif

// TODO: (ARROW-11782/ARROW-12288) Remove calls to Scan()
// TODO(ARROW-11782/ARROW-12288) Remove calls to Scan()
ARROW_ASSIGN_OR_RAISE(auto scan_task_it, scanner->Scan());
ARROW_ASSIGN_OR_RAISE(ScanTaskVector scan_tasks, scan_task_it.ToVector());

Expand All @@ -555,11 +556,14 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
#endif

WriteState state(write_options);
RETURN_NOT_OK(WriteInternal(*scanner->options(), state, std::move(scan_tasks)));
RETURN_NOT_OK(WriteInternal(*scanner->options(), &state, std::move(scan_tasks)));

auto task_group = scanner->options()->TaskGroup();
for (const auto& part_queue : state.queues) {
task_group->Append([&] { return part_queue.second->writer()->Finish(); });
task_group->Append([&] {
RETURN_NOT_OK(write_options.writer_pre_finish(part_queue.second->writer().get()));
return part_queue.second->writer()->Finish();
});
}
return task_group->Finish();
}
Expand Down
17 changes: 14 additions & 3 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileForma
/// \brief Create a writer for this format.
virtual Result<std::shared_ptr<FileWriter>> MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options) const = 0;
std::shared_ptr<FileWriteOptions> options,
fs::FileLocator destination_locator) const = 0;

/// \brief Get default write options for this format.
virtual std::shared_ptr<FileWriteOptions> DefaultWriteOptions() = 0;
Expand Down Expand Up @@ -313,19 +314,23 @@ class ARROW_DS_EXPORT FileWriter {
const std::shared_ptr<FileFormat>& format() const { return options_->format(); }
const std::shared_ptr<Schema>& schema() const { return schema_; }
const std::shared_ptr<FileWriteOptions>& options() const { return options_; }
const fs::FileLocator& destination() const { return destination_locator_; }

protected:
FileWriter(std::shared_ptr<Schema> schema, std::shared_ptr<FileWriteOptions> options,
std::shared_ptr<io::OutputStream> destination)
std::shared_ptr<io::OutputStream> destination,
fs::FileLocator destination_locator)
: schema_(std::move(schema)),
options_(std::move(options)),
destination_(destination) {}
destination_(std::move(destination)),
destination_locator_(std::move(destination_locator)) {}

virtual Status FinishInternal() = 0;

std::shared_ptr<Schema> schema_;
std::shared_ptr<FileWriteOptions> options_;
std::shared_ptr<io::OutputStream> destination_;
fs::FileLocator destination_locator_;
};

/// \brief Options for writing a dataset.
Expand All @@ -349,6 +354,12 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
/// {i} will be replaced by an auto incremented integer.
std::string basename_template;

/// Callback to be invoked against all FileWriters before
/// they are finalized with FileWriter::Finish().
std::function<Status(FileWriter*)> writer_pre_finish = [](FileWriter*) {
return Status::OK();
};

const std::shared_ptr<FileFormat>& format() const {
return file_write_options->format();
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/dataset/file_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {

Result<std::shared_ptr<FileWriter>> MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options) const override {
std::shared_ptr<FileWriteOptions> options,
fs::FileLocator destination_locator) const override {
return Status::NotImplemented("writing fragment of CsvFileFormat");
}

Expand Down
11 changes: 7 additions & 4 deletions cpp/src/arrow/dataset/file_ipc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ std::shared_ptr<FileWriteOptions> IpcFileFormat::DefaultWriteOptions() {

Result<std::shared_ptr<FileWriter>> IpcFileFormat::MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options) const {
std::shared_ptr<FileWriteOptions> options,
fs::FileLocator destination_locator) const {
if (!Equals(*options->format())) {
return Status::TypeError("Mismatching format/write options.");
}
Expand All @@ -274,14 +275,16 @@ Result<std::shared_ptr<FileWriter>> IpcFileFormat::MakeWriter(

return std::shared_ptr<FileWriter>(
new IpcFileWriter(std::move(destination), std::move(writer), std::move(schema),
std::move(ipc_options)));
std::move(ipc_options), std::move(destination_locator)));
}

IpcFileWriter::IpcFileWriter(std::shared_ptr<io::OutputStream> destination,
std::shared_ptr<ipc::RecordBatchWriter> writer,
std::shared_ptr<Schema> schema,
std::shared_ptr<IpcFileWriteOptions> options)
: FileWriter(std::move(schema), std::move(options), std::move(destination)),
std::shared_ptr<IpcFileWriteOptions> options,
fs::FileLocator destination_locator)
: FileWriter(std::move(schema), std::move(options), std::move(destination),
std::move(destination_locator)),
batch_writer_(std::move(writer)) {}

Status IpcFileWriter::Write(const std::shared_ptr<RecordBatch>& batch) {
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/dataset/file_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {

Result<std::shared_ptr<FileWriter>> MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options) const override;
std::shared_ptr<FileWriteOptions> options,
fs::FileLocator destination_locator) const override;

std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override;
};
Expand Down Expand Up @@ -107,7 +108,8 @@ class ARROW_DS_EXPORT IpcFileWriter : public FileWriter {
IpcFileWriter(std::shared_ptr<io::OutputStream> destination,
std::shared_ptr<ipc::RecordBatchWriter> writer,
std::shared_ptr<Schema> schema,
std::shared_ptr<IpcFileWriteOptions> options);
std::shared_ptr<IpcFileWriteOptions> options,
fs::FileLocator destination_locator);

Status FinishInternal() override;

Expand Down
16 changes: 10 additions & 6 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class ParquetScanTask : public ScanTask {
reader_(std::move(reader)),
pre_buffer_once_(std::move(pre_buffer_once)),
pre_buffer_row_groups_(std::move(pre_buffer_row_groups)),
io_context_(io_context),
io_context_(std::move(io_context)),
cache_options_(cache_options) {}

Result<RecordBatchIterator> Execute() override {
Expand Down Expand Up @@ -521,7 +521,8 @@ std::shared_ptr<FileWriteOptions> ParquetFileFormat::DefaultWriteOptions() {

Result<std::shared_ptr<FileWriter>> ParquetFileFormat::MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options) const {
std::shared_ptr<FileWriteOptions> options,
fs::FileLocator destination_locator) const {
if (!Equals(*options->format())) {
return Status::TypeError("Mismatching format/write options");
}
Expand All @@ -533,14 +534,17 @@ Result<std::shared_ptr<FileWriter>> ParquetFileFormat::MakeWriter(
*schema, default_memory_pool(), destination, parquet_options->writer_properties,
parquet_options->arrow_writer_properties, &parquet_writer));

return std::shared_ptr<FileWriter>(new ParquetFileWriter(
std::move(destination), std::move(parquet_writer), std::move(parquet_options)));
return std::shared_ptr<FileWriter>(
new ParquetFileWriter(std::move(destination), std::move(parquet_writer),
std::move(parquet_options), std::move(destination_locator)));
}

ParquetFileWriter::ParquetFileWriter(std::shared_ptr<io::OutputStream> destination,
std::shared_ptr<parquet::arrow::FileWriter> writer,
std::shared_ptr<ParquetFileWriteOptions> options)
: FileWriter(writer->schema(), std::move(options), std::move(destination)),
std::shared_ptr<ParquetFileWriteOptions> options,
fs::FileLocator destination_locator)
: FileWriter(writer->schema(), std::move(options), std::move(destination),
std::move(destination_locator)),
parquet_writer_(std::move(writer)) {}

Status ParquetFileWriter::Write(const std::shared_ptr<RecordBatch>& batch) {
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {

Result<std::shared_ptr<FileWriter>> MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options) const override;
std::shared_ptr<FileWriteOptions> options,
fs::FileLocator destination_locator) const override;

std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override;
};
Expand Down Expand Up @@ -252,7 +253,8 @@ class ARROW_DS_EXPORT ParquetFileWriter : public FileWriter {
private:
ParquetFileWriter(std::shared_ptr<io::OutputStream> destination,
std::shared_ptr<parquet::arrow::FileWriter> writer,
std::shared_ptr<ParquetFileWriteOptions> options);
std::shared_ptr<ParquetFileWriteOptions> options,
fs::FileLocator destination_locator);

Status FinishInternal() override;

Expand Down
19 changes: 10 additions & 9 deletions cpp/src/arrow/dataset/file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,23 @@ constexpr int kNumScanTasks = 2;
constexpr int kBatchesPerScanTask = 2;
constexpr int kRowsPerBatch = 1024;
class MockFileFormat : public FileFormat {
virtual std::string type_name() const { return "mock"; }
virtual bool Equals(const FileFormat& other) const { return false; }
virtual Result<bool> IsSupported(const FileSource& source) const { return true; }
virtual Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const {
std::string type_name() const override { return "mock"; }
bool Equals(const FileFormat& other) const override { return false; }
Result<bool> IsSupported(const FileSource& source) const override { return true; }
Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override {
return Status::NotImplemented("Not needed for test");
}
virtual Result<std::shared_ptr<FileWriter>> MakeWriter(
Result<std::shared_ptr<FileWriter>> MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options) const {
std::shared_ptr<FileWriteOptions> options,
fs::FileLocator destination_locator) const override {
return Status::NotImplemented("Not needed for test");
}
virtual std::shared_ptr<FileWriteOptions> DefaultWriteOptions() { return nullptr; }
std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override { return nullptr; }

virtual Result<ScanTaskIterator> ScanFile(
Result<ScanTaskIterator> ScanFile(
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& file) const {
const std::shared_ptr<FileFragment>& file) const override {
auto sch = schema({field("i32", int32())});
ScanTaskVector scan_tasks;
for (int i = 0; i < kNumScanTasks; i++) {
Expand Down
21 changes: 17 additions & 4 deletions cpp/src/arrow/dataset/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ class FileFormatFixtureMixin : public ::testing::Test {
EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink());

if (!options) options = format->DefaultWriteOptions();
EXPECT_OK_AND_ASSIGN(auto writer, format->MakeWriter(sink, schema, options));
EXPECT_OK_AND_ASSIGN(auto writer, format->MakeWriter(sink, schema, options, {}));
ARROW_EXPECT_OK(writer->Write(GetRecordBatchReader(schema).get()));
ARROW_EXPECT_OK(writer->Finish());
EXPECT_OK_AND_ASSIGN(auto written, sink->Finish());
Expand Down Expand Up @@ -688,7 +688,8 @@ class DummyFileFormat : public FileFormat {

Result<std::shared_ptr<FileWriter>> MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options) const override {
std::shared_ptr<FileWriteOptions> options,
fs::FileLocator destination_locator) const override {
return Status::NotImplemented("writing fragment of DummyFileFormat");
}

Expand Down Expand Up @@ -736,7 +737,8 @@ class JSONRecordBatchFileFormat : public FileFormat {

Result<std::shared_ptr<FileWriter>> MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options) const override {
std::shared_ptr<FileWriteOptions> options,
fs::FileLocator destination_locator) const override {
return Status::NotImplemented("writing fragment of JSONRecordBatchFileFormat");
}

Expand Down Expand Up @@ -1023,8 +1025,12 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
void SetWriteOptions(std::shared_ptr<FileWriteOptions> file_write_options) {
write_options_.file_write_options = file_write_options;
write_options_.filesystem = fs_;
write_options_.base_dir = "new_root/";
write_options_.base_dir = "/new_root/";
write_options_.basename_template = "dat_{i}";
write_options_.writer_pre_finish = [this](FileWriter* writer) {
visited_paths_.push_back(writer->destination().path);
return Status::OK();
};
}

void DoWrite(std::shared_ptr<Partitioning> desired_partitioning) {
Expand Down Expand Up @@ -1176,11 +1182,17 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
for (const auto& file_contents : expected_files_) {
expected_paths.insert(file_contents.first);
}

// expect the written filesystem to contain precisely the paths we expected
for (auto path : checked_pointer_cast<FileSystemDataset>(written_)->files()) {
actual_paths.insert(std::move(path));
}
EXPECT_THAT(actual_paths, testing::UnorderedElementsAreArray(expected_paths));

// Additionally, the writer producing each written file was visited and its path
// collected. That should match the expected paths as well
EXPECT_THAT(visited_paths_, testing::UnorderedElementsAreArray(expected_paths));

ASSERT_OK_AND_ASSIGN(auto written_fragments_it, written_->GetFragments());
for (auto maybe_fragment : written_fragments_it) {
ASSERT_OK_AND_ASSIGN(auto fragment, maybe_fragment);
Expand Down Expand Up @@ -1223,6 +1235,7 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
PathAndContent expected_files_;
std::shared_ptr<Schema> expected_physical_schema_;
std::shared_ptr<Dataset> written_;
std::vector<std::string> visited_paths_;
FileSystemDatasetWriteOptions write_options_;
std::shared_ptr<ScanOptions> scan_options_;
};
Expand Down