From 0949d180ff3669afd3c8d457aa10696752e613dd Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 22 Jun 2021 13:28:25 -0400 Subject: [PATCH] ARROW-10440: [C++][Dataset] Visit FileWriters before Finish --- cpp/src/arrow/dataset/file_base.cc | 34 +++++++++++++++------------ cpp/src/arrow/dataset/file_base.h | 17 +++++++++++--- cpp/src/arrow/dataset/file_csv.h | 3 ++- cpp/src/arrow/dataset/file_ipc.cc | 11 +++++---- cpp/src/arrow/dataset/file_ipc.h | 6 +++-- cpp/src/arrow/dataset/file_parquet.cc | 16 ++++++++----- cpp/src/arrow/dataset/file_parquet.h | 6 +++-- cpp/src/arrow/dataset/file_test.cc | 19 ++++++++------- cpp/src/arrow/dataset/test_util.h | 21 +++++++++++++---- 9 files changed, 87 insertions(+), 46 deletions(-) diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index b1cbd63ec61..741071d1703 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -418,7 +418,8 @@ class WriteQueue { ARROW_ASSIGN_OR_RAISE( writer_, write_options.format()->MakeWriter(std::move(destination), schema_, - write_options.file_write_options)); + write_options.file_write_options, + {write_options.filesystem, path})); return Status::OK(); } @@ -445,15 +446,15 @@ struct WriteState { std::unordered_map> queues; }; -Status WriteNextBatch(WriteState& state, const std::shared_ptr& fragment, +Status WriteNextBatch(WriteState* state, const std::shared_ptr& fragment, std::shared_ptr batch) { - ARROW_ASSIGN_OR_RAISE(auto groups, state.write_options.partitioning->Partition(batch)); + ARROW_ASSIGN_OR_RAISE(auto groups, state->write_options.partitioning->Partition(batch)); batch.reset(); // drop to hopefully conserve memory - if (groups.batches.size() > static_cast(state.write_options.max_partitions)) { + if (groups.batches.size() > static_cast(state->write_options.max_partitions)) { return Status::Invalid("Fragment would be written into ", groups.batches.size(), " partitions. This exceeds the maximum of ", - state.write_options.max_partitions); + state->write_options.max_partitions); } std::unordered_set need_flushed; @@ -462,20 +463,20 @@ Status WriteNextBatch(WriteState& state, const std::shared_ptr& fragme and_(std::move(groups.expressions[i]), fragment->partition_expression()); auto batch = std::move(groups.batches[i]); - ARROW_ASSIGN_OR_RAISE(auto part, - state.write_options.partitioning->Format(partition_expression)); + ARROW_ASSIGN_OR_RAISE( + auto part, state->write_options.partitioning->Format(partition_expression)); WriteQueue* queue; { // lookup the queue to which batch should be appended - auto queues_lock = state.mutex.Lock(); + auto queues_lock = state->mutex.Lock(); queue = internal::GetOrInsertGenerated( - &state.queues, std::move(part), + &state->queues, std::move(part), [&](const std::string& emplaced_part) { // lookup in `queues` also failed, // generate a new WriteQueue - size_t queue_index = state.queues.size() - 1; + size_t queue_index = state->queues.size() - 1; return internal::make_unique(emplaced_part, queue_index, batch->schema()); @@ -489,12 +490,12 @@ Status WriteNextBatch(WriteState& state, const std::shared_ptr& fragme // flush all touched WriteQueues for (auto queue : need_flushed) { - RETURN_NOT_OK(queue->Flush(state.write_options)); + RETURN_NOT_OK(queue->Flush(state->write_options)); } return Status::OK(); } -Status WriteInternal(const ScanOptions& scan_options, WriteState& state, +Status WriteInternal(const ScanOptions& scan_options, WriteState* state, ScanTaskVector scan_tasks) { // Store a mapping from partitions (represened by their formatted partition expressions) // to a WriteQueue which flushes batches into that partition's output file. In principle @@ -544,7 +545,7 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio #pragma warning(disable : 4996) #endif - // TODO: (ARROW-11782/ARROW-12288) Remove calls to Scan() + // TODO(ARROW-11782/ARROW-12288) Remove calls to Scan() ARROW_ASSIGN_OR_RAISE(auto scan_task_it, scanner->Scan()); ARROW_ASSIGN_OR_RAISE(ScanTaskVector scan_tasks, scan_task_it.ToVector()); @@ -555,11 +556,14 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio #endif WriteState state(write_options); - RETURN_NOT_OK(WriteInternal(*scanner->options(), state, std::move(scan_tasks))); + RETURN_NOT_OK(WriteInternal(*scanner->options(), &state, std::move(scan_tasks))); auto task_group = scanner->options()->TaskGroup(); for (const auto& part_queue : state.queues) { - task_group->Append([&] { return part_queue.second->writer()->Finish(); }); + task_group->Append([&] { + RETURN_NOT_OK(write_options.writer_pre_finish(part_queue.second->writer().get())); + return part_queue.second->writer()->Finish(); + }); } return task_group->Finish(); } diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index dd47b1226f4..f074e0f81da 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -175,7 +175,8 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this> MakeWriter( std::shared_ptr destination, std::shared_ptr schema, - std::shared_ptr options) const = 0; + std::shared_ptr options, + fs::FileLocator destination_locator) const = 0; /// \brief Get default write options for this format. virtual std::shared_ptr DefaultWriteOptions() = 0; @@ -313,19 +314,23 @@ class ARROW_DS_EXPORT FileWriter { const std::shared_ptr& format() const { return options_->format(); } const std::shared_ptr& schema() const { return schema_; } const std::shared_ptr& options() const { return options_; } + const fs::FileLocator& destination() const { return destination_locator_; } protected: FileWriter(std::shared_ptr schema, std::shared_ptr options, - std::shared_ptr destination) + std::shared_ptr destination, + fs::FileLocator destination_locator) : schema_(std::move(schema)), options_(std::move(options)), - destination_(destination) {} + destination_(std::move(destination)), + destination_locator_(std::move(destination_locator)) {} virtual Status FinishInternal() = 0; std::shared_ptr schema_; std::shared_ptr options_; std::shared_ptr destination_; + fs::FileLocator destination_locator_; }; /// \brief Options for writing a dataset. @@ -349,6 +354,12 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions { /// {i} will be replaced by an auto incremented integer. std::string basename_template; + /// Callback to be invoked against all FileWriters before + /// they are finalized with FileWriter::Finish(). + std::function writer_pre_finish = [](FileWriter*) { + return Status::OK(); + }; + const std::shared_ptr& format() const { return file_write_options->format(); } diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index f6636285c92..a365f7eac2b 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -67,7 +67,8 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { Result> MakeWriter( std::shared_ptr destination, std::shared_ptr schema, - std::shared_ptr options) const override { + std::shared_ptr options, + fs::FileLocator destination_locator) const override { return Status::NotImplemented("writing fragment of CsvFileFormat"); } diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index 2032f03d28f..40f5d3e8e0d 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -258,7 +258,8 @@ std::shared_ptr IpcFileFormat::DefaultWriteOptions() { Result> IpcFileFormat::MakeWriter( std::shared_ptr destination, std::shared_ptr schema, - std::shared_ptr options) const { + std::shared_ptr options, + fs::FileLocator destination_locator) const { if (!Equals(*options->format())) { return Status::TypeError("Mismatching format/write options."); } @@ -274,14 +275,16 @@ Result> IpcFileFormat::MakeWriter( return std::shared_ptr( new IpcFileWriter(std::move(destination), std::move(writer), std::move(schema), - std::move(ipc_options))); + std::move(ipc_options), std::move(destination_locator))); } IpcFileWriter::IpcFileWriter(std::shared_ptr destination, std::shared_ptr writer, std::shared_ptr schema, - std::shared_ptr options) - : FileWriter(std::move(schema), std::move(options), std::move(destination)), + std::shared_ptr options, + fs::FileLocator destination_locator) + : FileWriter(std::move(schema), std::move(options), std::move(destination), + std::move(destination_locator)), batch_writer_(std::move(writer)) {} Status IpcFileWriter::Write(const std::shared_ptr& batch) { diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h index deff26c6f95..ef78515221c 100644 --- a/cpp/src/arrow/dataset/file_ipc.h +++ b/cpp/src/arrow/dataset/file_ipc.h @@ -67,7 +67,8 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat { Result> MakeWriter( std::shared_ptr destination, std::shared_ptr schema, - std::shared_ptr options) const override; + std::shared_ptr options, + fs::FileLocator destination_locator) const override; std::shared_ptr DefaultWriteOptions() override; }; @@ -107,7 +108,8 @@ class ARROW_DS_EXPORT IpcFileWriter : public FileWriter { IpcFileWriter(std::shared_ptr destination, std::shared_ptr writer, std::shared_ptr schema, - std::shared_ptr options); + std::shared_ptr options, + fs::FileLocator destination_locator); Status FinishInternal() override; diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 0ebbd0a5333..edf3e5443ac 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -68,7 +68,7 @@ class ParquetScanTask : public ScanTask { reader_(std::move(reader)), pre_buffer_once_(std::move(pre_buffer_once)), pre_buffer_row_groups_(std::move(pre_buffer_row_groups)), - io_context_(io_context), + io_context_(std::move(io_context)), cache_options_(cache_options) {} Result Execute() override { @@ -521,7 +521,8 @@ std::shared_ptr ParquetFileFormat::DefaultWriteOptions() { Result> ParquetFileFormat::MakeWriter( std::shared_ptr destination, std::shared_ptr schema, - std::shared_ptr options) const { + std::shared_ptr options, + fs::FileLocator destination_locator) const { if (!Equals(*options->format())) { return Status::TypeError("Mismatching format/write options"); } @@ -533,14 +534,17 @@ Result> ParquetFileFormat::MakeWriter( *schema, default_memory_pool(), destination, parquet_options->writer_properties, parquet_options->arrow_writer_properties, &parquet_writer)); - return std::shared_ptr(new ParquetFileWriter( - std::move(destination), std::move(parquet_writer), std::move(parquet_options))); + return std::shared_ptr( + new ParquetFileWriter(std::move(destination), std::move(parquet_writer), + std::move(parquet_options), std::move(destination_locator))); } ParquetFileWriter::ParquetFileWriter(std::shared_ptr destination, std::shared_ptr writer, - std::shared_ptr options) - : FileWriter(writer->schema(), std::move(options), std::move(destination)), + std::shared_ptr options, + fs::FileLocator destination_locator) + : FileWriter(writer->schema(), std::move(options), std::move(destination), + std::move(destination_locator)), parquet_writer_(std::move(writer)) {} Status ParquetFileWriter::Write(const std::shared_ptr& batch) { diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 347f4032046..da4fd58ebbe 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -128,7 +128,8 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { Result> MakeWriter( std::shared_ptr destination, std::shared_ptr schema, - std::shared_ptr options) const override; + std::shared_ptr options, + fs::FileLocator destination_locator) const override; std::shared_ptr DefaultWriteOptions() override; }; @@ -252,7 +253,8 @@ class ARROW_DS_EXPORT ParquetFileWriter : public FileWriter { private: ParquetFileWriter(std::shared_ptr destination, std::shared_ptr writer, - std::shared_ptr options); + std::shared_ptr options, + fs::FileLocator destination_locator); Status FinishInternal() override; diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc index 839b48a0e64..b80d1bb57f0 100644 --- a/cpp/src/arrow/dataset/file_test.cc +++ b/cpp/src/arrow/dataset/file_test.cc @@ -87,22 +87,23 @@ constexpr int kNumScanTasks = 2; constexpr int kBatchesPerScanTask = 2; constexpr int kRowsPerBatch = 1024; class MockFileFormat : public FileFormat { - virtual std::string type_name() const { return "mock"; } - virtual bool Equals(const FileFormat& other) const { return false; } - virtual Result IsSupported(const FileSource& source) const { return true; } - virtual Result> Inspect(const FileSource& source) const { + std::string type_name() const override { return "mock"; } + bool Equals(const FileFormat& other) const override { return false; } + Result IsSupported(const FileSource& source) const override { return true; } + Result> Inspect(const FileSource& source) const override { return Status::NotImplemented("Not needed for test"); } - virtual Result> MakeWriter( + Result> MakeWriter( std::shared_ptr destination, std::shared_ptr schema, - std::shared_ptr options) const { + std::shared_ptr options, + fs::FileLocator destination_locator) const override { return Status::NotImplemented("Not needed for test"); } - virtual std::shared_ptr DefaultWriteOptions() { return nullptr; } + std::shared_ptr DefaultWriteOptions() override { return nullptr; } - virtual Result ScanFile( + Result ScanFile( const std::shared_ptr& options, - const std::shared_ptr& file) const { + const std::shared_ptr& file) const override { auto sch = schema({field("i32", int32())}); ScanTaskVector scan_tasks; for (int i = 0; i < kNumScanTasks; i++) { diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 39223eba35b..fddea355c4b 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -452,7 +452,7 @@ class FileFormatFixtureMixin : public ::testing::Test { EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink()); if (!options) options = format->DefaultWriteOptions(); - EXPECT_OK_AND_ASSIGN(auto writer, format->MakeWriter(sink, schema, options)); + EXPECT_OK_AND_ASSIGN(auto writer, format->MakeWriter(sink, schema, options, {})); ARROW_EXPECT_OK(writer->Write(GetRecordBatchReader(schema).get())); ARROW_EXPECT_OK(writer->Finish()); EXPECT_OK_AND_ASSIGN(auto written, sink->Finish()); @@ -688,7 +688,8 @@ class DummyFileFormat : public FileFormat { Result> MakeWriter( std::shared_ptr destination, std::shared_ptr schema, - std::shared_ptr options) const override { + std::shared_ptr options, + fs::FileLocator destination_locator) const override { return Status::NotImplemented("writing fragment of DummyFileFormat"); } @@ -736,7 +737,8 @@ class JSONRecordBatchFileFormat : public FileFormat { Result> MakeWriter( std::shared_ptr destination, std::shared_ptr schema, - std::shared_ptr options) const override { + std::shared_ptr options, + fs::FileLocator destination_locator) const override { return Status::NotImplemented("writing fragment of JSONRecordBatchFileFormat"); } @@ -1023,8 +1025,12 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { void SetWriteOptions(std::shared_ptr file_write_options) { write_options_.file_write_options = file_write_options; write_options_.filesystem = fs_; - write_options_.base_dir = "new_root/"; + write_options_.base_dir = "/new_root/"; write_options_.basename_template = "dat_{i}"; + write_options_.writer_pre_finish = [this](FileWriter* writer) { + visited_paths_.push_back(writer->destination().path); + return Status::OK(); + }; } void DoWrite(std::shared_ptr desired_partitioning) { @@ -1176,11 +1182,17 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { for (const auto& file_contents : expected_files_) { expected_paths.insert(file_contents.first); } + + // expect the written filesystem to contain precisely the paths we expected for (auto path : checked_pointer_cast(written_)->files()) { actual_paths.insert(std::move(path)); } EXPECT_THAT(actual_paths, testing::UnorderedElementsAreArray(expected_paths)); + // Additionally, the writer producing each written file was visited and its path + // collected. That should match the expected paths as well + EXPECT_THAT(visited_paths_, testing::UnorderedElementsAreArray(expected_paths)); + ASSERT_OK_AND_ASSIGN(auto written_fragments_it, written_->GetFragments()); for (auto maybe_fragment : written_fragments_it) { ASSERT_OK_AND_ASSIGN(auto fragment, maybe_fragment); @@ -1223,6 +1235,7 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { PathAndContent expected_files_; std::shared_ptr expected_physical_schema_; std::shared_ptr written_; + std::vector visited_paths_; FileSystemDatasetWriteOptions write_options_; std::shared_ptr scan_options_; };