From 53d787e480ac266a92a3afda466e94f9f1aacf1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81kos=20Hadnagy?= Date: Wed, 8 Dec 2021 10:50:54 +0100 Subject: [PATCH 1/2] Add support for RecordBatchReader in CSV writer --- cpp/src/arrow/csv/writer.cc | 12 ++++++++++++ cpp/src/arrow/csv/writer.h | 6 ++++++ cpp/src/arrow/csv/writer_test.cc | 6 ++++++ 3 files changed, 24 insertions(+) diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc index 7b3fcc76a10..b8470708c39 100644 --- a/cpp/src/arrow/csv/writer.cc +++ b/cpp/src/arrow/csv/writer.cc @@ -548,6 +548,18 @@ Status WriteCSV(const RecordBatch& batch, const WriteOptions& options, return writer->Close(); } +Status WriteCSV(const std::shared_ptr& reader, + const WriteOptions& options, arrow::io::OutputStream* output) { + ASSIGN_OR_RAISE(auto writer, MakeCSVWriter(output, reader->schema(), options)); + std::shared_ptr batch; + while (true) { + ARROW_ASSIGN_OR_RAISE(batch, reader->Next()); + if (batch == nullptr) break; + RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); + } + return writer->Close(); +} + ARROW_EXPORT Result> MakeCSVWriter( std::shared_ptr sink, const std::shared_ptr& schema, diff --git a/cpp/src/arrow/csv/writer.h b/cpp/src/arrow/csv/writer.h index 2f1442ae0af..b7670b75212 100644 --- a/cpp/src/arrow/csv/writer.h +++ b/cpp/src/arrow/csv/writer.h @@ -45,6 +45,12 @@ ARROW_EXPORT Status WriteCSV(const Table& table, const WriteOptions& options, /// Experimental ARROW_EXPORT Status WriteCSV(const RecordBatch& batch, const WriteOptions& options, arrow::io::OutputStream* output); +/// \brief Converts batches read through a RecordBatchReader +/// to CSV and writes the results to output. +/// Experimental +ARROW_EXPORT Status WriteCSV(const std::shared_ptr& reader, + const WriteOptions& options, + arrow::io::OutputStream* output); /// \brief Create a new CSV writer. User is responsible for closing the /// actual OutputStream. diff --git a/cpp/src/arrow/csv/writer_test.cc b/cpp/src/arrow/csv/writer_test.cc index e089e14cd54..8f5910a0871 100644 --- a/cpp/src/arrow/csv/writer_test.cc +++ b/cpp/src/arrow/csv/writer_test.cc @@ -259,6 +259,12 @@ TEST_P(TestWriteCSV, TestWrite) { ASSERT_OK_AND_ASSIGN(csv, ToCsvString(*table, options)); EXPECT_EQ(csv, GetParam().expected_output); + // RecordBatchReader should work identically. + ASSERT_OK_AND_ASSIGN(std::shared_ptr reader, + RecordBatchReader::Make({record_batch})); + ASSERT_OK_AND_ASSIGN(csv, ToCsvString(reader, options)); + EXPECT_EQ(csv, GetParam().expected_output); + // The writer should work identically. ASSERT_OK_AND_ASSIGN(csv, ToCsvStringUsingWriter(*table, options)); EXPECT_EQ(csv, GetParam().expected_output); From 3db5f574b1225a08c6e75b424b6fcfc02c08409f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81kos=20Hadnagy?= Date: Wed, 8 Dec 2021 14:50:34 +0100 Subject: [PATCH 2/2] Small refactor --- cpp/src/arrow/csv/writer.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc index b8470708c39..33a02d1258c 100644 --- a/cpp/src/arrow/csv/writer.cc +++ b/cpp/src/arrow/csv/writer.cc @@ -192,7 +192,7 @@ class UnquotedColumnPopulator : public ColumnPopulator { return VisitArrayDataInline( *casted_array_->data(), [&](arrow::util::string_view s) { - ARROW_RETURN_NOT_OK(CheckStringHasNoStructuralChars(s)); + RETURN_NOT_OK(CheckStringHasNoStructuralChars(s)); return valid_function(s); }, null_function); @@ -553,7 +553,7 @@ Status WriteCSV(const std::shared_ptr& reader, ASSIGN_OR_RAISE(auto writer, MakeCSVWriter(output, reader->schema(), options)); std::shared_ptr batch; while (true) { - ARROW_ASSIGN_OR_RAISE(batch, reader->Next()); + ASSIGN_OR_RAISE(batch, reader->Next()); if (batch == nullptr) break; RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); }