diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 1e47b15ee43..ef044ea3a03 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -18,6 +18,9 @@ #include "arrow/dataset/file_base.h" #include +#include +#include +#include #include #include "arrow/dataset/dataset_internal.h" @@ -31,6 +34,10 @@ #include "arrow/io/memory.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" +#include "arrow/util/make_unique.h" +#include "arrow/util/map.h" +#include "arrow/util/mutex.h" +#include "arrow/util/string.h" #include "arrow/util/task_group.h" namespace arrow { @@ -143,91 +150,213 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl( return MakeVectorIterator(std::move(fragments)); } -struct WriteTask { - Status Execute(); +Status FileWriter::Write(RecordBatchReader* batches) { + while (true) { + ARROW_ASSIGN_OR_RAISE(auto batch, batches->Next()); + if (batch == nullptr) break; + RETURN_NOT_OK(Write(batch)); + } + return Status::OK(); +} - /// The basename of files written by this WriteTask. Extensions - /// are derived from format - std::string basename; +constexpr util::string_view kIntegerToken = "{i}"; - /// The partitioning with which paths will be generated - std::shared_ptr partitioning; +Status ValidateBasenameTemplate(util::string_view basename_template) { + if (basename_template.find(fs::internal::kSep) != util::string_view::npos) { + return Status::Invalid("basename_template contained '/'"); + } + size_t token_start = basename_template.find(kIntegerToken); + if (token_start == util::string_view::npos) { + return Status::Invalid("basename_template did not contain '", kIntegerToken, "'"); + } + return Status::OK(); +} - /// The format in which fragments will be written - std::shared_ptr format; +/// WriteQueue allows batches to be pushed from multiple threads while another thread +/// flushes some to disk. +class WriteQueue { + public: + WriteQueue(std::string partition_expression, size_t index, + std::shared_ptr schema) + : partition_expression_(std::move(partition_expression)), + index_(index), + schema_(std::move(schema)) {} + + // Push a batch into the writer's queue of pending writes. + void Push(std::shared_ptr batch) { + auto push_lock = push_mutex_.Lock(); + pending_.push_back(std::move(batch)); + } - /// The FileSystem and base directory into which fragments will be written - std::shared_ptr filesystem; - std::string base_dir; + // Flush all pending batches, or return immediately if another thread is already + // flushing this queue. + Status Flush(const FileSystemDatasetWriteOptions& write_options) { + if (auto writer_lock = writer_mutex_.TryLock()) { + if (writer_ == nullptr) { + // FileWriters are opened lazily to avoid blocking access to a scan-wide queue set + RETURN_NOT_OK(OpenWriter(write_options)); + } + + while (true) { + std::shared_ptr batch; + { + auto push_lock = push_mutex_.Lock(); + if (pending_.empty()) { + // Ensure the writer_lock is released before the push_lock. Otherwise another + // thread might successfully Push() a batch but then fail to Flush() it since + // the writer_lock is still held, leaving an unflushed batch in pending_. 
+ writer_lock.Unlock(); + break; + } + batch = std::move(pending_.front()); + pending_.pop_front(); + } + RETURN_NOT_OK(writer_->Write(batch)); + } + } + return Status::OK(); + } - /// Batches to be written - std::shared_ptr batches; + const std::shared_ptr& writer() const { return writer_; } - /// An Expression already satisfied by every batch to be written - std::shared_ptr partition_expression; -}; + private: + Status OpenWriter(const FileSystemDatasetWriteOptions& write_options) { + auto dir = + fs::internal::EnsureTrailingSlash(write_options.base_dir) + partition_expression_; -Status WriteTask::Execute() { - std::unordered_map path_to_batches; - - // TODO(bkietz) these calls to Partition() should be scattered across a TaskGroup - for (auto maybe_batch : IteratorFromReader(batches)) { - ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch); - ARROW_ASSIGN_OR_RAISE(auto partitioned_batches, partitioning->Partition(batch)); - for (auto&& partitioned_batch : partitioned_batches) { - AndExpression expr(std::move(partitioned_batch.partition_expression), - partition_expression); - ARROW_ASSIGN_OR_RAISE(std::string path, partitioning->Format(expr)); - path = fs::internal::EnsureLeadingSlash(path); - path_to_batches[path].push_back(std::move(partitioned_batch.batch)); + auto basename = internal::Replace(write_options.basename_template, kIntegerToken, + std::to_string(index_)); + if (!basename) { + return Status::Invalid("string interpolation of basename template failed"); } - } - for (auto&& path_batches : path_to_batches) { - auto dir = base_dir + path_batches.first; - RETURN_NOT_OK(filesystem->CreateDir(dir, /*recursive=*/true)); + auto path = fs::internal::ConcatAbstractPath(dir, *basename); - auto path = fs::internal::ConcatAbstractPath(dir, basename); - ARROW_ASSIGN_OR_RAISE(auto destination, filesystem->OpenOutputStream(path)); + RETURN_NOT_OK(write_options.filesystem->CreateDir(dir)); + ARROW_ASSIGN_OR_RAISE(auto destination, + write_options.filesystem->OpenOutputStream(path)); - DCHECK(!path_batches.second.empty()); - ARROW_ASSIGN_OR_RAISE(auto reader, - RecordBatchReader::Make(std::move(path_batches.second))); - RETURN_NOT_OK(format->WriteFragment(reader.get(), destination.get())); + ARROW_ASSIGN_OR_RAISE( + writer_, write_options.format()->MakeWriter(std::move(destination), schema_, + write_options.file_write_options)); + return Status::OK(); } - return Status::OK(); -} + util::Mutex writer_mutex_; + std::shared_ptr writer_; + + util::Mutex push_mutex_; + std::deque> pending_; -Status FileSystemDataset::Write(std::shared_ptr schema, - std::shared_ptr format, - std::shared_ptr filesystem, - std::string base_dir, - std::shared_ptr partitioning, - std::shared_ptr scan_context, - FragmentIterator fragment_it) { - auto task_group = scan_context->TaskGroup(); - - base_dir = std::string(fs::internal::RemoveTrailingSlash(base_dir)); - - int i = 0; - for (auto maybe_fragment : fragment_it) { - ARROW_ASSIGN_OR_RAISE(auto fragment, maybe_fragment); - auto task = std::make_shared(); - - task->basename = "dat_" + std::to_string(i++) + "." 
+ format->type_name(); - task->partition_expression = fragment->partition_expression(); - task->format = format; - task->filesystem = filesystem; - task->base_dir = base_dir; - task->partitioning = partitioning; - - // make a record batch reader which yields from a fragment - ARROW_ASSIGN_OR_RAISE(task->batches, FragmentRecordBatchReader::Make( - std::move(fragment), schema, scan_context)); - task_group->Append([task] { return task->Execute(); }); + // The (formatted) partition expression to which this queue corresponds + std::string partition_expression_; + + size_t index_; + + std::shared_ptr schema_; +}; + +Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options, + std::shared_ptr scanner) { + RETURN_NOT_OK(ValidateBasenameTemplate(write_options.basename_template)); + + auto task_group = scanner->context()->TaskGroup(); + + // Things we'll un-lazy for the sake of simplicity, with the tradeoff they represent: + // + // - Fragment iteration. Keeping this lazy would allow us to start partitioning/writing + // any fragments we have before waiting for discovery to complete. This isn't + // currently implemented for FileSystemDataset anyway: ARROW-8613 + // + // - ScanTask iteration. Keeping this lazy would save some unnecessary blocking when + // writing Fragments which produce scan tasks slowly. No Fragments do this. + // + // NB: neither of these will have any impact whatsoever on the common case of writing + // an in-memory table to disk. + ARROW_ASSIGN_OR_RAISE(FragmentVector fragments, scanner->GetFragments().ToVector()); + ScanTaskVector scan_tasks; + std::vector fragment_for_task; + + // Avoid contention with multithreaded readers + auto context = std::make_shared(*scanner->context()); + context->use_threads = false; + + for (const auto& fragment : fragments) { + auto options = std::make_shared(*scanner->options()); + ARROW_ASSIGN_OR_RAISE(auto scan_task_it, + Scanner(fragment, std::move(options), context).Scan()); + for (auto maybe_scan_task : scan_task_it) { + ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task); + scan_tasks.push_back(std::move(scan_task)); + fragment_for_task.push_back(fragment.get()); + } } + // Store a mapping from partitions (represened by their formatted partition expressions) + // to a WriteQueue which flushes batches into that partition's output file. In principle + // any thread could produce a batch for any partition, so each task alternates between + // pushing batches and flushing them to disk. 
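(Illustrative aside, not part of the patch: the comment above describes a mutex-guarded get-or-create lookup of a per-partition WriteQueue; in the patch itself this is done with the GetOrInsertGenerated helper added in arrow/util/map.h below. A minimal standalone sketch of the same pattern, using only the standard library and hypothetical names such as Queue and GetOrCreateQueue:)

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

struct Queue {
  explicit Queue(std::string partition) : partition(std::move(partition)) {}
  std::string partition;
};

// Return the queue for `partition`, creating it on first use. All lookups and
// insertions happen under `queues_mutex`, mirroring the queues_mutex/queues
// pair declared just below in the patch.
Queue* GetOrCreateQueue(std::unordered_map<std::string, std::unique_ptr<Queue>>* queues,
                        std::mutex* queues_mutex, const std::string& partition) {
  std::lock_guard<std::mutex> lock(*queues_mutex);
  auto it = queues->find(partition);
  if (it == queues->end()) {
    // First batch seen for this partition: lazily create its queue.
    it = queues->emplace(partition, std::make_unique<Queue>(partition)).first;
  }
  return it->second.get();
}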
+ util::Mutex queues_mutex; + std::unordered_map> queues; + + auto fragment_for_task_it = fragment_for_task.begin(); + for (const auto& scan_task : scan_tasks) { + const Fragment* fragment = *fragment_for_task_it++; + + task_group->Append([&, scan_task, fragment] { + ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute()); + + for (auto maybe_batch : batches) { + ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch); + ARROW_ASSIGN_OR_RAISE(auto groups, write_options.partitioning->Partition(batch)); + batch.reset(); // drop to hopefully conserve memory + + std::unordered_set need_flushed; + for (size_t i = 0; i < groups.batches.size(); ++i) { + AndExpression partition_expression(std::move(groups.expressions[i]), + fragment->partition_expression()); + auto batch = std::move(groups.batches[i]); + + ARROW_ASSIGN_OR_RAISE(auto part, + write_options.partitioning->Format(partition_expression)); + + WriteQueue* queue; + { + // lookup the queue to which batch should be appended + auto queues_lock = queues_mutex.Lock(); + + queue = internal::GetOrInsertGenerated( + &queues, std::move(part), + [&](const std::string& emplaced_part) { + // lookup in `queues` also failed, + // generate a new WriteQueue + size_t queue_index = queues.size() - 1; + + return internal::make_unique( + emplaced_part, queue_index, batch->schema()); + }) + ->second.get(); + } + + queue->Push(std::move(batch)); + need_flushed.insert(queue); + } + + // flush all touched WriteQueues + for (auto queue : need_flushed) { + RETURN_NOT_OK(queue->Flush(write_options)); + } + } + + return Status::OK(); + }); + } + RETURN_NOT_OK(task_group->Finish()); + + task_group = scanner->context()->TaskGroup(); + for (const auto& part_queue : queues) { + task_group->Append([&] { return part_queue.second->writer()->Finish(); }); + } return task_group->Finish(); } diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 957c876de72..ee0df11b32b 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -128,6 +128,8 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this IsSupported(const FileSource& source) const = 0; @@ -151,10 +153,11 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this> MakeFragment( FileSource source, std::shared_ptr physical_schema = NULLPTR); - /// \brief Write a fragment. - /// FIXME(bkietz) make this pure virtual - virtual Status WriteFragment(RecordBatchReader* batches, - io::OutputStream* destination) const = 0; + virtual Result> MakeWriter( + std::shared_ptr destination, std::shared_ptr schema, + std::shared_ptr options) const = 0; + + virtual std::shared_ptr DefaultWriteOptions() = 0; }; /// \brief A Fragment that is stored in a file with a known format @@ -210,19 +213,8 @@ class ARROW_DS_EXPORT FileSystemDataset : public Dataset { std::vector> fragments); /// \brief Write a dataset. - /// - /// \param[in] schema Schema of written dataset. - /// \param[in] format FileFormat with which fragments will be written. - /// \param[in] filesystem FileSystem into which the dataset will be written. - /// \param[in] base_dir Root directory into which the dataset will be written. - /// \param[in] partitioning Partitioning used to generate fragment paths. - /// \param[in] scan_context Resource pool used to scan and write fragments. - /// \param[in] fragments Fragments to be written to disk. 
- static Status Write(std::shared_ptr schema, std::shared_ptr format, - std::shared_ptr filesystem, std::string base_dir, - std::shared_ptr partitioning, - std::shared_ptr scan_context, - FragmentIterator fragments); + static Status Write(const FileSystemDatasetWriteOptions& write_options, + std::shared_ptr scanner); /// \brief Return the type name of the dataset. std::string type_name() const override { return "filesystem"; } @@ -256,5 +248,64 @@ class ARROW_DS_EXPORT FileSystemDataset : public Dataset { std::vector> fragments_; }; +class ARROW_DS_EXPORT FileWriteOptions { + public: + virtual ~FileWriteOptions() = default; + + const std::shared_ptr& format() const { return format_; } + + std::string type_name() const { return format_->type_name(); } + + protected: + explicit FileWriteOptions(std::shared_ptr format) + : format_(std::move(format)) {} + + std::shared_ptr format_; +}; + +class ARROW_DS_EXPORT FileWriter { + public: + virtual ~FileWriter() = default; + + virtual Status Write(const std::shared_ptr& batch) = 0; + + Status Write(RecordBatchReader* batches); + + virtual Status Finish() = 0; + + const std::shared_ptr& format() const { return options_->format(); } + const std::shared_ptr& schema() const { return schema_; } + const std::shared_ptr& options() const { return options_; } + + protected: + FileWriter(std::shared_ptr schema, std::shared_ptr options) + : schema_(std::move(schema)), options_(std::move(options)) {} + + std::shared_ptr schema_; + std::shared_ptr options_; +}; + +struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions { + /// Options for individual fragment writing. + std::shared_ptr file_write_options; + + /// FileSystem into which a dataset will be written. + std::shared_ptr filesystem; + + /// Root directory into which the dataset will be written. + std::string base_dir; + + /// Partitioning used to generate fragment paths. + std::shared_ptr partitioning; + + /// Template string used to generate fragment basenames. + /// {i} will be replaced by an auto incremented integer. 
+ std::string basename_template; + + const std::shared_ptr& format() const { + return file_write_options->format(); + } +}; + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 3b474b428d9..f889b12fddd 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -157,6 +157,22 @@ class CsvScanTask : public ScanTask { FileSource source_; }; +bool CsvFileFormat::Equals(const FileFormat& format) const { + if (type_name() != format.type_name()) return false; + + const auto& other_parse_options = + checked_cast(format).parse_options; + + return parse_options.delimiter == other_parse_options.delimiter && + parse_options.quoting == other_parse_options.quoting && + parse_options.quote_char == other_parse_options.quote_char && + parse_options.double_quote == other_parse_options.double_quote && + parse_options.escaping == other_parse_options.escaping && + parse_options.escape_char == other_parse_options.escape_char && + parse_options.newlines_in_values == other_parse_options.newlines_in_values && + parse_options.ignore_empty_lines == other_parse_options.ignore_empty_lines; +} + Result CsvFileFormat::IsSupported(const FileSource& source) const { RETURN_NOT_OK(source.Open().status()); return OpenReader(source, *this).ok(); diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index 022517f4585..df5593459e3 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -37,6 +37,8 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { std::string type_name() const override { return "csv"; } + bool Equals(const FileFormat& other) const override; + Result IsSupported(const FileSource& source) const override; /// \brief Return the schema of the file if possible. 
@@ -47,9 +49,13 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { std::shared_ptr context, FileFragment* fragment) const override; - Status WriteFragment(RecordBatchReader*, io::OutputStream*) const override { + Result> MakeWriter( + std::shared_ptr destination, std::shared_ptr schema, + std::shared_ptr options) const override { return Status::NotImplemented("writing fragment of CsvFileFormat"); } + + std::shared_ptr DefaultWriteOptions() override { return NULLPTR; } }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index b849295bba5..e15de84b9bf 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -27,9 +27,13 @@ #include "arrow/dataset/scanner.h" #include "arrow/ipc/reader.h" #include "arrow/ipc/writer.h" +#include "arrow/util/checked_cast.h" #include "arrow/util/iterator.h" namespace arrow { + +using internal::checked_pointer_cast; + namespace dataset { static inline ipc::IpcReadOptions default_read_options() { @@ -159,18 +163,44 @@ Result IpcFileFormat::ScanFile(std::shared_ptr op fragment->source()); } -Status IpcFileFormat::WriteFragment(RecordBatchReader* batches, - io::OutputStream* destination) const { - ARROW_ASSIGN_OR_RAISE(auto writer, ipc::MakeFileWriter(destination, batches->schema())); +// +// IpcFileWriter, IpcFileWriteOptions +// - for (;;) { - ARROW_ASSIGN_OR_RAISE(auto batch, batches->Next()); - if (batch == nullptr) break; - RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); +std::shared_ptr IpcFileFormat::DefaultWriteOptions() { + std::shared_ptr options( + new IpcFileWriteOptions(shared_from_this())); + + options->ipc_options = + std::make_shared(ipc::IpcWriteOptions::Defaults()); + return options; +} + +Result> IpcFileFormat::MakeWriter( + std::shared_ptr destination, std::shared_ptr schema, + std::shared_ptr options) const { + if (!Equals(*options->format())) { + return Status::TypeError("Mismatching format/write options."); } - return writer->Close(); + auto ipc_options = checked_pointer_cast(options); + + ARROW_ASSIGN_OR_RAISE(auto writer, ipc::MakeFileWriter(destination, schema)); + return std::shared_ptr( + new IpcFileWriter(std::move(writer), std::move(schema), std::move(ipc_options))); +} + +IpcFileWriter::IpcFileWriter(std::shared_ptr writer, + std::shared_ptr schema, + std::shared_ptr options) + : FileWriter(std::move(schema), std::move(options)), + batch_writer_(std::move(writer)) {} + +Status IpcFileWriter::Write(const std::shared_ptr& batch) { + return batch_writer_->WriteRecordBatch(*batch); } +Status IpcFileWriter::Finish() { return batch_writer_->Close(); } + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h index be11c71a95a..50650bfedb0 100644 --- a/cpp/src/arrow/dataset/file_ipc.h +++ b/cpp/src/arrow/dataset/file_ipc.h @@ -28,6 +28,12 @@ #include "arrow/result.h" namespace arrow { +namespace ipc { + +class RecordBatchWriter; +struct IpcWriteOptions; + +} // namespace ipc namespace dataset { /// \brief A FileFormat implementation that reads from and writes to Ipc files @@ -35,6 +41,10 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat { public: std::string type_name() const override { return "ipc"; } + bool Equals(const FileFormat& other) const override { + return type_name() == other.type_name(); + } + bool splittable() const override { return true; } Result IsSupported(const FileSource& source) const override; @@ -47,8 +57,37 @@ class ARROW_DS_EXPORT IpcFileFormat 
: public FileFormat { std::shared_ptr context, FileFragment* fragment) const override; - Status WriteFragment(RecordBatchReader* batches, - io::OutputStream* destination) const override; + Result> MakeWriter( + std::shared_ptr destination, std::shared_ptr schema, + std::shared_ptr options) const override; + + std::shared_ptr DefaultWriteOptions() override; +}; + +class ARROW_DS_EXPORT IpcFileWriteOptions : public FileWriteOptions { + public: + std::shared_ptr ipc_options; + + protected: + using FileWriteOptions::FileWriteOptions; + + friend class IpcFileFormat; +}; + +class ARROW_DS_EXPORT IpcFileWriter : public FileWriter { + public: + Status Write(const std::shared_ptr& batch) override; + + Status Finish() override; + + private: + IpcFileWriter(std::shared_ptr writer, + std::shared_ptr schema, + std::shared_ptr options); + + std::shared_ptr batch_writer_; + + friend class IpcFileFormat; }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index 41ac5a9b7b0..808bae568b4 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -156,7 +156,10 @@ TEST_F(TestIpcFileFormat, WriteRecordBatchReader) { EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink()); - ASSERT_OK(format_->WriteFragment(reader.get(), sink.get())); + auto options = format_->DefaultWriteOptions(); + EXPECT_OK_AND_ASSIGN(auto writer, format_->MakeWriter(sink, reader->schema(), options)); + ASSERT_OK(writer->Write(reader.get())); + ASSERT_OK(writer->Finish()); EXPECT_OK_AND_ASSIGN(auto written, sink->Finish()); @@ -168,7 +171,9 @@ class TestIpcFileSystemDataset : public testing::Test, public: void SetUp() override { MakeSourceDataset(); - format_ = std::make_shared(); + auto ipc_format = std::make_shared(); + format_ = ipc_format; + SetWriteOptions(ipc_format->DefaultWriteOptions()); } }; diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 29ed18817d9..fe2f64ad526 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -42,6 +42,7 @@ namespace arrow { using internal::checked_cast; +using internal::checked_pointer_cast; namespace dataset { @@ -285,12 +286,20 @@ class ParquetScanTaskIterator { size_t idx_ = 0; }; -ParquetFileFormat::ParquetFileFormat() - : writer_properties(parquet::default_writer_properties()), - arrow_writer_properties(parquet::default_arrow_writer_properties()) {} +bool ParquetFileFormat::Equals(const FileFormat& other) const { + if (other.type_name() != type_name()) return false; -ParquetFileFormat::ParquetFileFormat(const parquet::ReaderProperties& reader_properties) - : ParquetFileFormat() { + const auto& other_reader_options = + checked_cast(other).reader_options; + + // FIXME implement comparison for decryption options + // FIXME extract these to scan time options so comparison is unnecessary + return reader_options.use_buffered_stream == other_reader_options.use_buffered_stream && + reader_options.buffer_size == other_reader_options.buffer_size && + reader_options.dict_columns == other_reader_options.dict_columns; +} + +ParquetFileFormat::ParquetFileFormat(const parquet::ReaderProperties& reader_properties) { reader_options.use_buffered_stream = reader_properties.is_buffered_stream_enabled(); reader_options.buffer_size = reader_properties.buffer_size(); reader_options.file_decryption_properties = @@ -352,28 +361,6 @@ static inline bool RowGroupInfosAreComplete(const std::vector& inf [](const RowGroupInfo& i) { return 
i.HasStatistics(); }); } -Status ParquetFileFormat::WriteFragment(RecordBatchReader* batches, - io::OutputStream* destination) const { - using parquet::arrow::FileWriter; - - std::shared_ptr shared_destination{destination, - [](io::OutputStream*) {}}; - - std::unique_ptr writer; - RETURN_NOT_OK(FileWriter::Open(*batches->schema(), default_memory_pool(), - shared_destination, writer_properties, - arrow_writer_properties, &writer)); - - for (;;) { - ARROW_ASSIGN_OR_RAISE(auto batch, batches->Next()); - if (batch == nullptr) break; - ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches(batch->schema(), {batch})); - RETURN_NOT_OK(writer->WriteTable(*table, batch->num_rows())); - } - - return writer->Close(); -} - Result ParquetFileFormat::ScanFile(std::shared_ptr options, std::shared_ptr context, FileFragment* fragment) const { @@ -440,9 +427,51 @@ Result> ParquetFileFormat::MakeFragment( std::move(physical_schema), {})); } -/// -/// RowGroupInfo -/// +// +// ParquetFileWriter, ParquetFileWriteOptions +// + +std::shared_ptr ParquetFileFormat::DefaultWriteOptions() { + std::shared_ptr options( + new ParquetFileWriteOptions(shared_from_this())); + options->writer_properties = parquet::default_writer_properties(); + options->arrow_writer_properties = parquet::default_arrow_writer_properties(); + return options; +} + +Result> ParquetFileFormat::MakeWriter( + std::shared_ptr destination, std::shared_ptr schema, + std::shared_ptr options) const { + if (!Equals(*options->format())) { + return Status::TypeError("Mismatching format/write options"); + } + + auto parquet_options = checked_pointer_cast(options); + + std::unique_ptr parquet_writer; + RETURN_NOT_OK(parquet::arrow::FileWriter::Open( + *schema, default_memory_pool(), destination, parquet_options->writer_properties, + parquet_options->arrow_writer_properties, &parquet_writer)); + + return std::shared_ptr( + new ParquetFileWriter(std::move(parquet_writer), std::move(parquet_options))); +} + +ParquetFileWriter::ParquetFileWriter(std::shared_ptr writer, + std::shared_ptr options) + : FileWriter(writer->schema(), std::move(options)), + parquet_writer_(std::move(writer)) {} + +Status ParquetFileWriter::Write(const std::shared_ptr& batch) { + ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches(batch->schema(), {batch})); + return parquet_writer_->WriteTable(*table, batch->num_rows()); +} + +Status ParquetFileWriter::Finish() { return parquet_writer_->Close(); } + +// +// RowGroupInfo +// std::vector RowGroupInfo::FromIdentifiers(const std::vector ids) { std::vector results; @@ -497,9 +526,9 @@ bool RowGroupInfo::Satisfy(const Expression& predicate) const { return !HasStatistics() || predicate.IsSatisfiableWith(statistics_expression_); } -/// -/// ParquetFileFragment -/// +// +// ParquetFileFragment +// ParquetFileFragment::ParquetFileFragment(FileSource source, std::shared_ptr format, @@ -602,9 +631,9 @@ Result> ParquetFileFragment::FilterRowGroups( return row_groups; } -/// -/// ParquetDatasetFactory -/// +// +// ParquetDatasetFactory +// ParquetDatasetFactory::ParquetDatasetFactory( std::shared_ptr filesystem, std::shared_ptr format, diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 939fdc53687..25235128257 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -41,11 +41,13 @@ class FileEncryptionProperties; class ReaderProperties; class ArrowReaderProperties; + class WriterProperties; class ArrowWriterProperties; namespace arrow { class FileReader; 
+class FileWriter; } // namespace arrow } // namespace parquet @@ -57,7 +59,7 @@ class RowGroupInfo; /// \brief A FileFormat implementation that reads from Parquet files class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { public: - ParquetFileFormat(); + ParquetFileFormat() = default; /// Convenience constructor which copies properties from a parquet::ReaderProperties. /// memory_pool will be ignored. @@ -67,6 +69,8 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { bool splittable() const override { return true; } + bool Equals(const FileFormat& other) const override; + struct ReaderOptions { /// \defgroup parquet-file-format-reader-properties properties which correspond to /// members of parquet::ReaderProperties. @@ -99,10 +103,6 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { bool enable_parallel_column_conversion = false; } reader_options; - std::shared_ptr writer_properties; - - std::shared_ptr arrow_writer_properties; - Result IsSupported(const FileSource& source) const override; /// \brief Return the schema of the file if possible. @@ -130,8 +130,11 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { Result> GetReader( const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = NULLPTR) const; - Status WriteFragment(RecordBatchReader* batches, - io::OutputStream* destination) const override; + Result> MakeWriter( + std::shared_ptr destination, std::shared_ptr schema, + std::shared_ptr options) const override; + + std::shared_ptr DefaultWriteOptions() override; }; /// \brief Represents a parquet's RowGroup with extra information. @@ -248,6 +251,37 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { friend class ParquetFileFormat; }; +class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions { + public: + std::shared_ptr writer_properties; + + std::shared_ptr arrow_writer_properties; + + protected: + using FileWriteOptions::FileWriteOptions; + + friend class ParquetFileFormat; +}; + +class ARROW_DS_EXPORT ParquetFileWriter : public FileWriter { + public: + const std::shared_ptr& parquet_writer() const { + return parquet_writer_; + } + + Status Write(const std::shared_ptr& batch) override; + + Status Finish() override; + + private: + ParquetFileWriter(std::shared_ptr writer, + std::shared_ptr options); + + std::shared_ptr parquet_writer_; + + friend class ParquetFileFormat; +}; + struct ParquetFactoryOptions { // Either an explicit Partitioning or a PartitioningFactory to discover one. 
// diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index d0a22e98382..ec47259d30a 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -48,7 +48,6 @@ using parquet::default_writer_properties; using parquet::WriterProperties; using parquet::CreateOutputStream; -using parquet::arrow::FileWriter; using parquet::arrow::WriteTable; using testing::Pointee; @@ -57,7 +56,7 @@ using internal::checked_pointer_cast; class ArrowParquetWriterMixin : public ::testing::Test { public: - Status WriteRecordBatch(const RecordBatch& batch, FileWriter* writer) { + Status WriteRecordBatch(const RecordBatch& batch, parquet::arrow::FileWriter* writer) { auto schema = batch.schema(); auto size = batch.num_rows(); @@ -75,7 +74,8 @@ class ArrowParquetWriterMixin : public ::testing::Test { return Status::OK(); } - Status WriteRecordBatchReader(RecordBatchReader* reader, FileWriter* writer) { + Status WriteRecordBatchReader(RecordBatchReader* reader, + parquet::arrow::FileWriter* writer) { auto schema = reader->schema(); if (!schema->Equals(*writer->schema(), false)) { @@ -96,9 +96,9 @@ class ArrowParquetWriterMixin : public ::testing::Test { const std::shared_ptr& properties = default_writer_properties(), const std::shared_ptr& arrow_properties = default_arrow_writer_properties()) { - std::unique_ptr writer; - RETURN_NOT_OK(FileWriter::Open(*reader->schema(), pool, sink, properties, - arrow_properties, &writer)); + std::unique_ptr writer; + RETURN_NOT_OK(parquet::arrow::FileWriter::Open( + *reader->schema(), pool, sink, properties, arrow_properties, &writer)); RETURN_NOT_OK(WriteRecordBatchReader(reader, writer.get())); return writer->Close(); } @@ -554,7 +554,10 @@ TEST_F(TestParquetFileFormat, WriteRecordBatchReader) { EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink()); - ASSERT_OK(format_->WriteFragment(reader.get(), sink.get())); + auto options = format_->DefaultWriteOptions(); + EXPECT_OK_AND_ASSIGN(auto writer, format_->MakeWriter(sink, reader->schema(), options)); + ASSERT_OK(writer->Write(reader.get())); + ASSERT_OK(writer->Finish()); EXPECT_OK_AND_ASSIGN(auto written, sink->Finish()); @@ -572,16 +575,21 @@ TEST_F(TestParquetFileFormat, WriteRecordBatchReaderCustomOptions) { EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink()); - format_->writer_properties = parquet::WriterProperties::Builder() + auto options = + checked_pointer_cast(format_->DefaultWriteOptions()); + options->writer_properties = parquet::WriterProperties::Builder() .created_by("TestParquetFileFormat") ->disable_statistics() ->build(); - format_->arrow_writer_properties = parquet::ArrowWriterProperties::Builder() + options->arrow_writer_properties = parquet::ArrowWriterProperties::Builder() .coerce_timestamps(coerce_timestamps_to) ->build(); - ASSERT_OK(format_->WriteFragment(reader.get(), sink.get())); + EXPECT_OK_AND_ASSIGN(auto writer, format_->MakeWriter(sink, reader->schema(), options)); + ASSERT_OK(writer->Write(reader.get())); + ASSERT_OK(writer->Finish()); + EXPECT_OK_AND_ASSIGN(auto written, sink->Finish()); EXPECT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(FileSource{written})); @@ -596,7 +604,9 @@ class TestParquetFileSystemDataset : public WriteFileSystemDatasetMixin, void SetUp() override { MakeSourceDataset(); check_metadata_ = false; - format_ = std::make_shared(); + auto parquet_format = std::make_shared(); + format_ = parquet_format; + SetWriteOptions(parquet_format->DefaultWriteOptions()); } }; diff --git 
a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index a47ba849cfc..159e0ac0331 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -69,9 +69,9 @@ std::shared_ptr Partitioning::Default() { " Partitioning"); } - Result> Partition( + Result Partition( const std::shared_ptr& batch) const override { - return std::vector{{batch, scalar(true)}}; + return PartitionedBatches{{batch}, {scalar(true)}}; } }; @@ -139,7 +139,7 @@ inline std::shared_ptr ConjunctionFromGroupingRow(Scalar* row) { return and_(std::move(equality_expressions)); } -Result> KeyValuePartitioning::Partition( +Result KeyValuePartitioning::Partition( const std::shared_ptr& batch) const { FieldVector by_fields; ArrayVector by_columns; @@ -158,7 +158,7 @@ Result> KeyValuePartitioning::Partit if (by_fields.empty()) { // no fields to group by; return the whole batch - return std::vector{{batch, scalar(true)}}; + return PartitionedBatches{{batch}, {scalar(true)}}; } ARROW_ASSIGN_OR_RAISE(auto by, @@ -168,13 +168,13 @@ Result> KeyValuePartitioning::Partit checked_pointer_cast(groupings_and_values->GetFieldByName("groupings")); auto unique_rows = groupings_and_values->GetFieldByName("values"); - ARROW_ASSIGN_OR_RAISE(auto grouped_batches, ApplyGroupings(*groupings, rest)); + PartitionedBatches out; + ARROW_ASSIGN_OR_RAISE(out.batches, ApplyGroupings(*groupings, rest)); + out.expressions.resize(out.batches.size()); - std::vector out(grouped_batches.size()); - for (size_t i = 0; i < out.size(); ++i) { + for (size_t i = 0; i < out.batches.size(); ++i) { ARROW_ASSIGN_OR_RAISE(auto row, unique_rows->GetScalar(i)); - out[i].partition_expression = ConjunctionFromGroupingRow(row.get()); - out[i].batch = std::move(grouped_batches[i]); + out.expressions[i] = ConjunctionFromGroupingRow(row.get()); } return out; } diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index 6228999b41d..165fcfb5248 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -60,12 +60,12 @@ class ARROW_DS_EXPORT Partitioning { virtual std::string type_name() const = 0; /// \brief If the input batch shares any fields with this partitioning, - /// produce slices of the batch which satisfy mutually exclusive Expressions. - struct PartitionedBatch { - std::shared_ptr batch; - std::shared_ptr partition_expression; + /// produce sub-batches which satisfy mutually exclusive Expressions. 
+ struct PartitionedBatches { + RecordBatchVector batches; + ExpressionVector expressions; }; - virtual Result> Partition( + virtual Result Partition( const std::shared_ptr& batch) const = 0; /// \brief Parse a path into a partition expression @@ -133,7 +133,7 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning { static Status SetDefaultValuesFromKeys(const Expression& expr, RecordBatchProjector* projector); - Result> Partition( + Result Partition( const std::shared_ptr& batch) const override; Result> Parse(const std::string& path) const override; @@ -240,7 +240,7 @@ class ARROW_DS_EXPORT FunctionPartitioning : public Partitioning { return Status::NotImplemented("formatting paths from ", type_name(), " Partitioning"); } - Result> Partition( + Result Partition( const std::shared_ptr& batch) const override { return Status::NotImplemented("partitioning batches from ", type_name(), " Partitioning"); diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc index ef92a99a967..e9ea2539e89 100644 --- a/cpp/src/arrow/dataset/partition_test.cc +++ b/cpp/src/arrow/dataset/partition_test.cc @@ -435,7 +435,7 @@ class RangePartitioning : public Partitioning { } Result Format(const Expression&) const override { return ""; } - Result> Partition( + Result Partition( const std::shared_ptr&) const override { return Status::OK(); } diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index fcecf46552a..7251c39bffd 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -199,6 +199,11 @@ class DummyFileFormat : public FileFormat { std::string type_name() const override { return "dummy"; } + bool Equals(const FileFormat& other) const override { + return type_name() == other.type_name() && + schema_->Equals(checked_cast(other).schema_); + } + Result IsSupported(const FileSource& source) const override { return true; } Result> Inspect(const FileSource& source) const override { @@ -212,10 +217,14 @@ class DummyFileFormat : public FileFormat { return MakeEmptyIterator>(); } - Status WriteFragment(RecordBatchReader*, io::OutputStream*) const override { + Result> MakeWriter( + std::shared_ptr destination, std::shared_ptr schema, + std::shared_ptr options) const override { return Status::NotImplemented("writing fragment of DummyFileFormat"); } + std::shared_ptr DefaultWriteOptions() override { return nullptr; } + protected: std::shared_ptr schema_; }; @@ -230,6 +239,8 @@ class JSONRecordBatchFileFormat : public FileFormat { explicit JSONRecordBatchFileFormat(SchemaResolver resolver) : resolver_(std::move(resolver)) {} + bool Equals(const FileFormat& other) const override { return this == &other; } + std::string type_name() const override { return "json_record_batch"; } /// \brief Return true if the given file extension @@ -255,10 +266,14 @@ class JSONRecordBatchFileFormat : public FileFormat { std::move(context)); } - Status WriteFragment(RecordBatchReader*, io::OutputStream*) const override { + Result> MakeWriter( + std::shared_ptr destination, std::shared_ptr schema, + std::shared_ptr options) const override { return Status::NotImplemented("writing fragment of JSONRecordBatchFileFormat"); } + std::shared_ptr DefaultWriteOptions() override { return nullptr; } + protected: SchemaResolver resolver_; }; @@ -560,46 +575,54 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { ASSERT_OK_AND_ASSIGN(auto factory, FileSystemDatasetFactory::Make(fs_, s, source_format, options)); 
ASSERT_OK_AND_ASSIGN(dataset_, factory->Finish()); + + scan_options_ = ScanOptions::Make(source_schema_); } - void TestWriteWithIdenticalPartitioningSchema() { - auto desired_partitioning = std::make_shared( - SchemaFromColumnNames(source_schema_, {"year", "month"})); + void SetWriteOptions(std::shared_ptr file_write_options) { + write_options_.file_write_options = file_write_options; + write_options_.filesystem = fs_; + write_options_.base_dir = "new_root/"; + write_options_.basename_template = "dat_{i}"; + } - ASSERT_OK(FileSystemDataset::Write( - source_schema_, format_, fs_, "new_root/", desired_partitioning, - std::make_shared(), dataset_->GetFragments())); + void DoWrite(std::shared_ptr desired_partitioning) { + write_options_.partitioning = desired_partitioning; + auto scanner = std::make_shared(dataset_, scan_options_, scan_context_); + ASSERT_OK(FileSystemDataset::Write(write_options_, scanner)); + // re-discover the written dataset fs::FileSelector s; s.recursive = true; s.base_dir = "/new_root"; - FileSystemFactoryOptions options; - options.partitioning = desired_partitioning; - ASSERT_OK_AND_ASSIGN(auto factory, - FileSystemDatasetFactory::Make(fs_, s, format_, options)); + FileSystemFactoryOptions factory_options; + factory_options.partitioning = desired_partitioning; + ASSERT_OK_AND_ASSIGN( + auto factory, FileSystemDatasetFactory::Make(fs_, s, format_, factory_options)); ASSERT_OK_AND_ASSIGN(written_, factory->Finish()); + } + + void TestWriteWithIdenticalPartitioningSchema() { + DoWrite(std::make_shared( + SchemaFromColumnNames(source_schema_, {"year", "month"}))); - expected_files_["/new_root/2018/1/dat_0." + format_->type_name()] = R"([ + expected_files_["/new_root/2018/1/dat_0"] = R"([ {"region": "NY", "model": "3", "sales": 742.0, "country": "US"}, {"region": "NY", "model": "S", "sales": 304.125, "country": "US"}, - {"region": "NY", "model": "Y", "sales": 27.5, "country": "US"} - ])"; - expected_files_["/new_root/2018/1/dat_1." + format_->type_name()] = R"([ + {"region": "NY", "model": "Y", "sales": 27.5, "country": "US"}, {"region": "QC", "model": "3", "sales": 512, "country": "CA"}, {"region": "QC", "model": "S", "sales": 978, "country": "CA"}, {"region": "NY", "model": "X", "sales": 136.25, "country": "US"}, {"region": "QC", "model": "X", "sales": 1.0, "country": "CA"}, {"region": "QC", "model": "Y", "sales": 69, "country": "CA"} ])"; - expected_files_["/new_root/2019/1/dat_2." + format_->type_name()] = R"([ + expected_files_["/new_root/2019/1/dat_1"] = R"([ {"region": "CA", "model": "3", "sales": 273.5, "country": "US"}, {"region": "CA", "model": "S", "sales": 13, "country": "US"}, {"region": "CA", "model": "X", "sales": 54, "country": "US"}, {"region": "QC", "model": "S", "sales": 10, "country": "CA"}, - {"region": "CA", "model": "Y", "sales": 21, "country": "US"} - ])"; - expected_files_["/new_root/2019/1/dat_3." 
+ format_->type_name()] = R"([ + {"region": "CA", "model": "Y", "sales": 21, "country": "US"}, {"region": "QC", "model": "3", "sales": 152.25, "country": "CA"}, {"region": "QC", "model": "X", "sales": 42, "country": "CA"}, {"region": "QC", "model": "Y", "sales": 37, "country": "CA"} @@ -611,52 +634,32 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { } void TestWriteWithUnrelatedPartitioningSchema() { - auto desired_partitioning = std::make_shared( - SchemaFromColumnNames(source_schema_, {"country", "region"})); - - ASSERT_OK(FileSystemDataset::Write( - source_schema_, format_, fs_, "new_root/", desired_partitioning, - std::make_shared(), dataset_->GetFragments())); - - fs::FileSelector s; - s.recursive = true; - s.base_dir = "/new_root"; - - FileSystemFactoryOptions options; - options.partitioning = desired_partitioning; - ASSERT_OK_AND_ASSIGN(auto factory, - FileSystemDatasetFactory::Make(fs_, s, format_, options)); - ASSERT_OK_AND_ASSIGN(written_, factory->Finish()); + DoWrite(std::make_shared( + SchemaFromColumnNames(source_schema_, {"country", "region"}))); // XXX first thing a user will be annoyed by: we don't support left // padding the month field with 0. - expected_files_["/new_root/US/NY/dat_0." + format_->type_name()] = R"([ + expected_files_["/new_root/US/NY/dat_0"] = R"([ {"year": 2018, "month": 1, "model": "3", "sales": 742.0}, {"year": 2018, "month": 1, "model": "S", "sales": 304.125}, - {"year": 2018, "month": 1, "model": "Y", "sales": 27.5} - ])"; - expected_files_["/new_root/US/NY/dat_1." + format_->type_name()] = R"([ + {"year": 2018, "month": 1, "model": "Y", "sales": 27.5}, {"year": 2018, "month": 1, "model": "X", "sales": 136.25} ])"; - expected_files_["/new_root/CA/QC/dat_1." + format_->type_name()] = R"([ + expected_files_["/new_root/CA/QC/dat_1"] = R"([ {"year": 2018, "month": 1, "model": "3", "sales": 512}, {"year": 2018, "month": 1, "model": "S", "sales": 978}, {"year": 2018, "month": 1, "model": "X", "sales": 1.0}, - {"year": 2018, "month": 1, "model": "Y", "sales": 69} + {"year": 2018, "month": 1, "model": "Y", "sales": 69}, + {"year": 2019, "month": 1, "model": "S", "sales": 10}, + {"year": 2019, "month": 1, "model": "3", "sales": 152.25}, + {"year": 2019, "month": 1, "model": "X", "sales": 42}, + {"year": 2019, "month": 1, "model": "Y", "sales": 37} ])"; - expected_files_["/new_root/US/CA/dat_2." + format_->type_name()] = R"([ + expected_files_["/new_root/US/CA/dat_2"] = R"([ {"year": 2019, "month": 1, "model": "3", "sales": 273.5}, {"year": 2019, "month": 1, "model": "S", "sales": 13}, {"year": 2019, "month": 1, "model": "X", "sales": 54}, {"year": 2019, "month": 1, "model": "Y", "sales": 21} - ])"; - expected_files_["/new_root/CA/QC/dat_2." + format_->type_name()] = R"([ - {"year": 2019, "month": 1, "model": "S", "sales": 10} - ])"; - expected_files_["/new_root/CA/QC/dat_3." 
+ format_->type_name()] = R"([ - {"year": 2019, "month": 1, "model": "3", "sales": 152.25}, - {"year": 2019, "month": 1, "model": "X", "sales": 42}, - {"year": 2019, "month": 1, "model": "Y", "sales": 37} ])"; expected_physical_schema_ = SchemaFromColumnNames(source_schema_, {"model", "sales", "year", "month"}); @@ -665,49 +668,31 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { } void TestWriteWithSupersetPartitioningSchema() { - auto desired_partitioning = std::make_shared( - SchemaFromColumnNames(source_schema_, {"year", "month", "country", "region"})); - - ASSERT_OK(FileSystemDataset::Write( - source_schema_, format_, fs_, "new_root/", desired_partitioning, - std::make_shared(), dataset_->GetFragments())); - - fs::FileSelector s; - s.recursive = true; - s.base_dir = "/new_root"; - - FileSystemFactoryOptions options; - options.partitioning = desired_partitioning; - ASSERT_OK_AND_ASSIGN(auto factory, - FileSystemDatasetFactory::Make(fs_, s, format_, options)); - ASSERT_OK_AND_ASSIGN(written_, factory->Finish()); + DoWrite(std::make_shared( + SchemaFromColumnNames(source_schema_, {"year", "month", "country", "region"}))); // XXX first thing a user will be annoyed by: we don't support left // padding the month field with 0. - expected_files_["/new_root/2018/1/US/NY/dat_0." + format_->type_name()] = R"([ + expected_files_["/new_root/2018/1/US/NY/dat_0"] = R"([ {"model": "3", "sales": 742.0}, {"model": "S", "sales": 304.125}, - {"model": "Y", "sales": 27.5} - ])"; - expected_files_["/new_root/2018/1/US/NY/dat_1." + format_->type_name()] = R"([ + {"model": "Y", "sales": 27.5}, {"model": "X", "sales": 136.25} ])"; - expected_files_["/new_root/2018/1/CA/QC/dat_1." + format_->type_name()] = R"([ + expected_files_["/new_root/2018/1/CA/QC/dat_1"] = R"([ {"model": "3", "sales": 512}, {"model": "S", "sales": 978}, {"model": "X", "sales": 1.0}, {"model": "Y", "sales": 69} ])"; - expected_files_["/new_root/2019/1/US/CA/dat_2." + format_->type_name()] = R"([ + expected_files_["/new_root/2019/1/US/CA/dat_2"] = R"([ {"model": "3", "sales": 273.5}, {"model": "S", "sales": 13}, {"model": "X", "sales": 54}, {"model": "Y", "sales": 21} ])"; - expected_files_["/new_root/2019/1/CA/QC/dat_2." + format_->type_name()] = R"([ - {"model": "S", "sales": 10} - ])"; - expected_files_["/new_root/2019/1/CA/QC/dat_3." + format_->type_name()] = R"([ + expected_files_["/new_root/2019/1/CA/QC/dat_3"] = R"([ + {"model": "S", "sales": 10}, {"model": "3", "sales": 152.25}, {"model": "X", "sales": 42}, {"model": "Y", "sales": 37} @@ -718,43 +703,23 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { } void TestWriteWithEmptyPartitioningSchema() { - auto desired_partitioning = std::make_shared( - SchemaFromColumnNames(source_schema_, {})); - - ASSERT_OK(FileSystemDataset::Write( - source_schema_, format_, fs_, "new_root/", desired_partitioning, - std::make_shared(), dataset_->GetFragments())); + DoWrite(std::make_shared( + SchemaFromColumnNames(source_schema_, {}))); - fs::FileSelector s; - s.recursive = true; - s.base_dir = "/new_root"; - - FileSystemFactoryOptions options; - options.partitioning = desired_partitioning; - ASSERT_OK_AND_ASSIGN(auto factory, - FileSystemDatasetFactory::Make(fs_, s, format_, options)); - ASSERT_OK_AND_ASSIGN(written_, factory->Finish()); - - expected_files_["/new_root/dat_0." 
+ format_->type_name()] = R"([ + expected_files_["/new_root/dat_0"] = R"([ {"country": "US", "region": "NY", "year": 2018, "month": 1, "model": "3", "sales": 742.0}, {"country": "US", "region": "NY", "year": 2018, "month": 1, "model": "S", "sales": 304.125}, - {"country": "US", "region": "NY", "year": 2018, "month": 1, "model": "Y", "sales": 27.5} - ])"; - expected_files_["/new_root/dat_1." + format_->type_name()] = R"([ + {"country": "US", "region": "NY", "year": 2018, "month": 1, "model": "Y", "sales": 27.5}, {"country": "CA", "region": "QC", "year": 2018, "month": 1, "model": "3", "sales": 512}, {"country": "CA", "region": "QC", "year": 2018, "month": 1, "model": "S", "sales": 978}, {"country": "US", "region": "NY", "year": 2018, "month": 1, "model": "X", "sales": 136.25}, {"country": "CA", "region": "QC", "year": 2018, "month": 1, "model": "X", "sales": 1.0}, - {"country": "CA", "region": "QC", "year": 2018, "month": 1, "model": "Y", "sales": 69} - ])"; - expected_files_["/new_root/dat_2." + format_->type_name()] = R"([ + {"country": "CA", "region": "QC", "year": 2018, "month": 1, "model": "Y", "sales": 69}, {"country": "US", "region": "CA", "year": 2019, "month": 1, "model": "3", "sales": 273.5}, {"country": "US", "region": "CA", "year": 2019, "month": 1, "model": "S", "sales": 13}, {"country": "US", "region": "CA", "year": 2019, "month": 1, "model": "X", "sales": 54}, {"country": "CA", "region": "QC", "year": 2019, "month": 1, "model": "S", "sales": 10}, - {"country": "US", "region": "CA", "year": 2019, "month": 1, "model": "Y", "sales": 21} - ])"; - expected_files_["/new_root/dat_3." + format_->type_name()] = R"([ + {"country": "US", "region": "CA", "year": 2019, "month": 1, "model": "Y", "sales": 21}, {"country": "CA", "region": "QC", "year": 2019, "month": 1, "model": "3", "sales": 152.25}, {"country": "CA", "region": "QC", "year": 2019, "month": 1, "model": "X", "sales": 42}, {"country": "CA", "region": "QC", "year": 2019, "month": 1, "model": "Y", "sales": 37} @@ -765,12 +730,14 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { } void AssertWrittenAsExpected() { - std::vector files; + std::unordered_set expected_paths, actual_paths; for (const auto& file_contents : expected_files_) { - files.push_back(file_contents.first); + expected_paths.insert(file_contents.first); + } + for (auto path : checked_pointer_cast(written_)->files()) { + actual_paths.insert(std::move(path)); } - EXPECT_THAT(checked_pointer_cast(written_)->files(), - testing::UnorderedElementsAreArray(files)); + EXPECT_THAT(actual_paths, testing::UnorderedElementsAreArray(expected_paths)); for (auto maybe_fragment : written_->GetFragments()) { ASSERT_OK_AND_ASSIGN(auto fragment, maybe_fragment); @@ -779,12 +746,13 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { AssertSchemaEqual(*expected_physical_schema_, *actual_physical_schema, check_metadata_); - AssertSchemaEqual(*expected_physical_schema_, *actual_physical_schema); - const auto& path = checked_pointer_cast(fragment)->source().path(); - auto expected_struct = ArrayFromJSON(struct_(expected_physical_schema_->fields()), - {expected_files_[path]}); + auto file_contents = expected_files_.find(path); + if (file_contents == expected_files_.end()) { + // file wasn't expected to be written at all; nothing to compare with + continue; + } ASSERT_OK_AND_ASSIGN(auto scanner, ScannerBuilder(actual_physical_schema, fragment, std::make_shared()) @@ -799,6 +767,9 @@ class WriteFileSystemDatasetMixin : public 
MakeFileSystemDatasetMixin { ASSERT_OK_AND_ASSIGN(actual_struct, batch->ToStructArray()); } + auto expected_struct = ArrayFromJSON(struct_(expected_physical_schema_->fields()), + {file_contents->second}); + AssertArraysEqual(*expected_struct, *actual_struct, /*verbose=*/true); } } @@ -809,6 +780,9 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { PathAndContent expected_files_; std::shared_ptr expected_physical_schema_; std::shared_ptr written_; + FileSystemDatasetWriteOptions write_options_; + std::shared_ptr scan_options_; + std::shared_ptr scan_context_ = std::make_shared(); }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h index b7604f6bddd..e329b072ed7 100644 --- a/cpp/src/arrow/dataset/type_fwd.h +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -48,16 +48,23 @@ using FragmentVector = std::vector>; class FileSource; class FileFormat; class FileFragment; +class FileWriter; +class FileWriteOptions; class FileSystemDataset; +struct FileSystemDatasetWriteOptions; class InMemoryDataset; class CsvFileFormat; class IpcFileFormat; +class IpcFileWriter; +class IpcFileWriteOptions; class ParquetFileFormat; class ParquetFileFragment; +class ParquetFileWriter; +class ParquetFileWriteOptions; class Expression; using ExpressionVector = std::vector>; diff --git a/cpp/src/arrow/util/map.h b/cpp/src/arrow/util/map.h new file mode 100644 index 00000000000..5523909061d --- /dev/null +++ b/cpp/src/arrow/util/map.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/result.h" + +namespace arrow { +namespace internal { + +/// Helper providing single-lookup conditional insertion into std::map or +/// std::unordered_map. If `key` exists in the container, an iterator to that pair +/// will be returned. If `key` does not exist in the container, `gen(key)` will be +/// invoked and its return value inserted. 
+template +auto GetOrInsertGenerated(Map* map, typename Map::key_type key, Gen&& gen) + -> decltype(map->begin()->second = gen(map->begin()->first), map->begin()) { + decltype(gen(map->begin()->first)) placeholder{}; + + auto it_success = map->emplace(std::move(key), std::move(placeholder)); + if (it_success.second) { + // insertion of placeholder succeeded, overwrite it with gen() + const auto& inserted_key = it_success.first->first; + auto* value = &it_success.first->second; + *value = gen(inserted_key); + } + return it_success.first; +} + +template +auto GetOrInsertGenerated(Map* map, typename Map::key_type key, Gen&& gen) + -> Resultbegin()->second = gen(map->begin()->first).ValueOrDie(), + map->begin())> { + decltype(gen(map->begin()->first).ValueOrDie()) placeholder{}; + + auto it_success = map->emplace(std::move(key), std::move(placeholder)); + if (it_success.second) { + // insertion of placeholder succeeded, overwrite it with gen() + const auto& inserted_key = it_success.first->first; + auto* value = &it_success.first->second; + ARROW_ASSIGN_OR_RAISE(*value, gen(inserted_key)); + } + return it_success.first; +} + +} // namespace internal +} // namespace arrow diff --git a/cpp/src/arrow/util/mutex.cc b/cpp/src/arrow/util/mutex.cc index fa900fd164b..7456d7889d8 100644 --- a/cpp/src/arrow/util/mutex.cc +++ b/cpp/src/arrow/util/mutex.cc @@ -19,6 +19,8 @@ #include +#include "arrow/util/logging.h" + namespace arrow { namespace util { @@ -27,9 +29,13 @@ struct Mutex::Impl { }; Mutex::Guard::Guard(Mutex* locked) - : locked_(locked, [](Mutex* locked) { locked->impl_->mutex_.unlock(); }) {} + : locked_(locked, [](Mutex* locked) { + DCHECK(!locked->impl_->mutex_.try_lock()); + locked->impl_->mutex_.unlock(); + }) {} Mutex::Guard Mutex::TryLock() { + DCHECK_NE(impl_, nullptr); if (impl_->mutex_.try_lock()) { return Guard{this}; } @@ -37,6 +43,7 @@ Mutex::Guard Mutex::TryLock() { } Mutex::Guard Mutex::Lock() { + DCHECK_NE(impl_, nullptr); impl_->mutex_.lock(); return Guard{this}; } diff --git a/cpp/src/arrow/util/mutex.h b/cpp/src/arrow/util/mutex.h index a0365b710fb..f4fc64181fb 100644 --- a/cpp/src/arrow/util/mutex.h +++ b/cpp/src/arrow/util/mutex.h @@ -31,9 +31,11 @@ namespace util { class ARROW_EXPORT Mutex { public: Mutex(); + Mutex(Mutex&&) = default; + Mutex& operator=(Mutex&&) = default; /// A Guard is falsy if a lock could not be acquired. 
- class Guard { + class ARROW_EXPORT Guard { public: Guard() : locked_(NULLPTR, [](Mutex* mutex) {}) {} Guard(Guard&&) = default; @@ -41,6 +43,8 @@ class ARROW_EXPORT Mutex { explicit operator bool() const { return bool(locked_); } + void Unlock() { locked_.reset(); } + private: explicit Guard(Mutex* locked); diff --git a/cpp/src/arrow/util/string.cc b/cpp/src/arrow/util/string.cc index b3ed2473724..625086c39b2 100644 --- a/cpp/src/arrow/util/string.cc +++ b/cpp/src/arrow/util/string.cc @@ -144,5 +144,15 @@ std::string AsciiToLower(util::string_view value) { return result; } +util::optional Replace(util::string_view s, util::string_view token, + util::string_view replacement) { + size_t token_start = s.find(token); + if (token_start == std::string::npos) { + return util::nullopt; + } + return s.substr(0, token_start).to_string() + replacement.to_string() + + s.substr(token_start + token.size()).to_string(); +} + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/string.h b/cpp/src/arrow/util/string.h index ea445c71ef4..56a02dfb20b 100644 --- a/cpp/src/arrow/util/string.h +++ b/cpp/src/arrow/util/string.h @@ -20,6 +20,7 @@ #include #include +#include "arrow/util/optional.h" #include "arrow/util/string_view.h" #include "arrow/util/visibility.h" @@ -56,5 +57,11 @@ bool AsciiEqualsCaseInsensitive(util::string_view left, util::string_view right) ARROW_EXPORT std::string AsciiToLower(util::string_view value); +/// \brief Search for the first instance of a token and replace it or return nullopt if +/// the token is not found. +ARROW_EXPORT +util::optional Replace(util::string_view s, util::string_view token, + util::string_view replacement); + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/string_test.cc b/cpp/src/arrow/util/string_test.cc index c56525732b0..d9b3749fdbd 100644 --- a/cpp/src/arrow/util/string_test.cc +++ b/cpp/src/arrow/util/string_test.cc @@ -88,5 +88,19 @@ TEST(ParseHexValue, Invalid) { ASSERT_RAISES(Invalid, ParseHexValue(input.c_str(), &output)); } +TEST(Replace, Basics) { + auto s = Replace("dat_{i}.txt", "{i}", "23"); + EXPECT_TRUE(s); + EXPECT_EQ(*s, "dat_23.txt"); + + // only replace the first occurrence of token + s = Replace("dat_{i}_{i}.txt", "{i}", "23"); + EXPECT_TRUE(s); + EXPECT_EQ(*s, "dat_23_{i}.txt"); + + s = Replace("dat_.txt", "{nope}", "23"); + EXPECT_FALSE(s); +} + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/thread_pool_test.cc b/cpp/src/arrow/util/thread_pool_test.cc index ac0eae78165..a54880b21b1 100644 --- a/cpp/src/arrow/util/thread_pool_test.cc +++ b/cpp/src/arrow/util/thread_pool_test.cc @@ -126,7 +126,7 @@ class AddTester { class TestThreadPool : public ::testing::Test { public: - void TearDown() { + void TearDown() override { fflush(stdout); fflush(stderr); } diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index a8198b66d70..0aec214831b 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -606,6 +606,44 @@ cdef shared_ptr[CExpression] _insert_implicit_casts(Expression filter, ) +cdef class FileWriteOptions(_Weakrefable): + + cdef: + shared_ptr[CFileWriteOptions] wrapped + CFileWriteOptions* options + + def __init__(self): + _forbid_instantiation(self.__class__) + + cdef void init(self, const shared_ptr[CFileWriteOptions]& sp): + self.wrapped = sp + self.options = sp.get() + + @staticmethod + cdef wrap(const shared_ptr[CFileWriteOptions]& sp): + type_name = frombytes(sp.get().type_name()) + + classes = { + 'ipc': 
IpcFileWriteOptions, + 'parquet': ParquetFileWriteOptions, + } + + class_ = classes.get(type_name, None) + if class_ is None: + raise TypeError(type_name) + + cdef FileWriteOptions self = class_.__new__(class_) + self.init(sp) + return self + + @property + def format(self): + return FileFormat.wrap(self.options.format()) + + cdef inline shared_ptr[CFileWriteOptions] unwrap(self): + return self.wrapped + + cdef class FileFormat(_Weakrefable): cdef: @@ -662,6 +700,13 @@ cdef class FileFormat(_Weakrefable): nullptr)) return Fragment.wrap(move(c_fragment)) + def make_write_options(self): + return FileWriteOptions.wrap(self.format.DefaultWriteOptions()) + + @property + def default_extname(self): + return frombytes(self.format.type_name()) + def __eq__(self, other): try: return self.equals(other) @@ -1054,18 +1099,85 @@ cdef class ParquetReadOptions(_Weakrefable): return False +cdef class ParquetFileWriteOptions(FileWriteOptions): + + cdef: + CParquetFileWriteOptions* parquet_options + object _properties + + def update(self, **kwargs): + cdef CParquetFileWriteOptions* opts = self.parquet_options + + arrow_fields = { + "use_deprecated_int96_timestamps", + "coerce_timestamps", + "allow_truncated_timestamps", + } + + update = False + update_arrow = False + for name, value in kwargs.items(): + if name not in self._properties: + raise TypeError("unexpected parquet write option: " + name) + self._properties[name] = value + if name in arrow_fields: + update_arrow = True + else: + update = True + + if update: + opts.writer_properties = _create_writer_properties( + use_dictionary=self._properties["use_dictionary"], + compression=self._properties["compression"], + version=self._properties["version"], + write_statistics=self._properties["write_statistics"], + data_page_size=self._properties["data_page_size"], + compression_level=self._properties["compression_level"], + use_byte_stream_split=( + self._properties["use_byte_stream_split"] + ), + data_page_version=self._properties["data_page_version"], + ) + + if update_arrow: + opts.arrow_writer_properties = _create_arrow_writer_properties( + use_deprecated_int96_timestamps=( + self._properties["use_deprecated_int96_timestamps"] + ), + coerce_timestamps=self._properties["coerce_timestamps"], + allow_truncated_timestamps=( + self._properties["allow_truncated_timestamps"] + ), + writer_engine_version='V2' + ) + + cdef void init(self, const shared_ptr[CFileWriteOptions]& sp): + FileWriteOptions.init(self, sp) + self.parquet_options = sp.get() + self._properties = dict( + use_dictionary=True, + compression="snappy", + version="1.0", + write_statistics=None, + data_page_size=None, + compression_level=None, + use_byte_stream_split=False, + data_page_version="1.0", + use_deprecated_int96_timestamps=False, + coerce_timestamps=None, + allow_truncated_timestamps=False, + ) + + cdef class ParquetFileFormat(FileFormat): cdef: CParquetFileFormat* parquet_format - object _write_options - def __init__(self, read_options=None, dict write_options=None): + def __init__(self, read_options=None): cdef: shared_ptr[CParquetFileFormat] wrapped CParquetFileFormatReaderOptions* options - shared_ptr[ArrowWriterProperties] arrow_properties - shared_ptr[WriterProperties] properties # Read options @@ -1087,33 +1199,6 @@ cdef class ParquetFileFormat(FileFormat): for column in read_options.dictionary_columns: options.dict_columns.insert(tobytes(column)) - # Write options - - self._write_options = {} - if write_options is not None: - self._write_options = write_options - properties = 
_create_writer_properties( - use_dictionary=write_options.get("use_dictionary", True), - compression=write_options.get("compression", "snappy"), - version=write_options.get("version", "1.0"), - write_statistics=write_options.get("write_statistics", None), - data_page_size=write_options.get("data_page_size", None), - compression_level=write_options.get("compression_level", None), - use_byte_stream_split=write_options.get( - "use_byte_stream_split", False), - data_page_version=write_options.get("data_page_version", "1.0") - ) - arrow_properties = _create_arrow_writer_properties( - use_deprecated_int96_timestamps=write_options.get( - "use_deprecated_int96_timestamps", False), - coerce_timestamps=write_options.get("coerce_timestamps", None), - allow_truncated_timestamps=write_options.get( - "allow_truncated_timestamps", False), - writer_engine_version="V2" - ) - wrapped.get().writer_properties = properties - wrapped.get().arrow_writer_properties = arrow_properties - self.init( wrapped) cdef void init(self, const shared_ptr[CFileFormat]& sp): @@ -1124,27 +1209,28 @@ cdef class ParquetFileFormat(FileFormat): def read_options(self): cdef CParquetFileFormatReaderOptions* options options = &self.parquet_format.reader_options - enable = options.enable_parallel_column_conversion return ParquetReadOptions( use_buffered_stream=options.use_buffered_stream, buffer_size=options.buffer_size, dictionary_columns={frombytes(col) for col in options.dict_columns}, - enable_parallel_column_conversion=enable + enable_parallel_column_conversion=( + options.enable_parallel_column_conversion + ) ) - @property - def write_options(self): - return self._write_options + def make_write_options(self, **kwargs): + opts = FileFormat.make_write_options(self) + ( opts).update(**kwargs) + return opts def equals(self, ParquetFileFormat other): return ( - self.read_options.equals(other.read_options) and - self.write_options == other.write_options + self.read_options.equals(other.read_options) ) def __reduce__(self): - return ParquetFileFormat, (self.read_options, self.write_options) + return ParquetFileFormat, (self.read_options, ) def make_fragment(self, file, filesystem=None, Expression partition_expression=None, row_groups=None): @@ -1170,6 +1256,12 @@ cdef class ParquetFileFormat(FileFormat): return Fragment.wrap(move(c_fragment)) +cdef class IpcFileWriteOptions(FileWriteOptions): + + def __init__(self): + _forbid_instantiation(self.__class__) + + cdef class IpcFileFormat(FileFormat): def __init__(self): @@ -1178,6 +1270,10 @@ cdef class IpcFileFormat(FileFormat): def equals(self, IpcFileFormat other): return True + @property + def default_extname(self): + return "feather" + def __reduce__(self): return IpcFileFormat, tuple() @@ -1195,6 +1291,9 @@ cdef class CsvFileFormat(FileFormat): FileFormat.init(self, sp) self.csv_format = sp.get() + def make_write_options(self): + raise NotImplemented("writing CSV datasets") + @property def parse_options(self): return ParseOptions.wrap(self.csv_format.parse_options) @@ -2122,47 +2221,29 @@ def _get_partition_keys(Expression partition_expression): def _filesystemdataset_write( - data, object base_dir, Schema schema not None, - FileFormat format not None, FileSystem filesystem not None, - Partitioning partitioning not None, bint use_threads=True, + data not None, object base_dir not None, str basename_template not None, + Schema schema not None, FileSystem filesystem not None, + Partitioning partitioning not None, + FileWriteOptions file_options not None, bint use_threads, ): """ 
CFileSystemDataset.Write wrapper """ cdef: - c_string c_base_dir - shared_ptr[CSchema] c_schema - shared_ptr[CFileFormat] c_format - shared_ptr[CFileSystem] c_filesystem - shared_ptr[CPartitioning] c_partitioning - shared_ptr[CScanContext] c_context - # to create iterator of InMemory fragments + CFileSystemDatasetWriteOptions c_options + shared_ptr[CScanner] c_scanner vector[shared_ptr[CRecordBatch]] c_batches - shared_ptr[CFragment] c_fragment - vector[shared_ptr[CFragment]] c_fragment_vector - c_base_dir = tobytes(_stringify_path(base_dir)) - c_schema = pyarrow_unwrap_schema(schema) - c_format = format.unwrap() - c_filesystem = filesystem.unwrap() - c_partitioning = partitioning.unwrap() - c_context = _build_scan_context(use_threads=use_threads) + c_options.file_write_options = file_options.unwrap() + c_options.filesystem = filesystem.unwrap() + c_options.base_dir = tobytes(_stringify_path(base_dir)) + c_options.partitioning = partitioning.unwrap() + c_options.basename_template = tobytes(basename_template) if isinstance(data, Dataset): - with nogil: - check_status( - CFileSystemDataset.Write( - c_schema, - c_format, - c_filesystem, - c_base_dir, - c_partitioning, - c_context, - ( data).dataset.GetFragments() - ) - ) + scanner = data._scanner(use_threads=use_threads) else: - # data is list of batches/tables, one element per fragment + # data is list of batches/tables for table in data: if isinstance(table, Table): for batch in table.to_batches(): @@ -2170,20 +2251,11 @@ def _filesystemdataset_write( else: c_batches.push_back(( table).sp_batch) - c_fragment = shared_ptr[CFragment]( - new CInMemoryFragment(c_batches, _true.unwrap())) - c_batches.clear() - c_fragment_vector.push_back(c_fragment) + data = Fragment.wrap(shared_ptr[CFragment]( + new CInMemoryFragment(move(c_batches), _true.unwrap()))) - with nogil: - check_status( - CFileSystemDataset.Write( - c_schema, - c_format, - c_filesystem, - c_base_dir, - c_partitioning, - c_context, - MakeVectorIterator(move(c_fragment_vector)) - ) - ) + scanner = Scanner.from_fragment(data, schema, use_threads=use_threads) + + c_scanner = ( scanner).unwrap() + with nogil: + check_status(CFileSystemDataset.Write(c_options, c_scanner)) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 19f3c549d56..f278e05c007 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -32,13 +32,16 @@ FileSystemDataset, FileSystemDatasetFactory, FileSystemFactoryOptions, + FileWriteOptions, Fragment, HivePartitioning, IpcFileFormat, + IpcFileWriteOptions, ParquetDatasetFactory, ParquetFactoryOptions, ParquetFileFormat, ParquetFileFragment, + ParquetFileWriteOptions, ParquetReadOptions, Partitioning, PartitioningFactory, @@ -694,8 +697,9 @@ def _ensure_write_partitioning(scheme): return scheme -def write_dataset(data, base_dir, format=None, partitioning=None, schema=None, - filesystem=None, use_threads=True): +def write_dataset(data, base_dir, basename_template=None, format=None, + partitioning=None, schema=None, + filesystem=None, file_options=None, use_threads=True): """ Write a dataset to a given format and partitioning. @@ -703,32 +707,34 @@ def write_dataset(data, base_dir, format=None, partitioning=None, schema=None, ---------- data : Dataset, Table/RecordBatch, or list of Table/RecordBatch The data to write. This can be a Dataset instance or - in-memory Arrow data. A Table or RecordBatch is written as a - single fragment (resulting in a single file, or multiple files if - split according to the `partitioning`). 
If you have a Table consisting - of multiple record batches, you can pass ``table.to_batches()`` to - handle each record batch as a separate fragment. + in-memory Arrow data. base_dir : str The root directory where to write the dataset. + basename_template : str, optional + A template string used to generate basenames of written data files. + The token '{i}' will be replaced with an automatically incremented + integer. If not specified, it defaults to + "part-{i}." + format.default_extname format : FileFormat or str The format in which to write the dataset. Currently supported: - "ipc"/"feather". If a FileSystemDataset is being written and `format` - is not specified, it defaults to the same format as the specified - FileSystemDataset. When writing a Table or RecordBatch, this keyword - is required. + "parquet", "ipc"/"feather". If a FileSystemDataset is being written + and `format` is not specified, it defaults to the same format as the + specified FileSystemDataset. When writing a Table or RecordBatch, this + keyword is required. partitioning : Partitioning, optional The partitioning scheme specified with the ``partitioning()`` function. schema : Schema, optional filesystem : FileSystem, optional + file_options : FileWriteOptions, optional + FileFormat specific write options, created using the + ``FileFormat.make_write_options()`` function. use_threads : bool, default True Write files in parallel. If enabled, then maximum parallelism will be used determined by the number of available CPU cores. """ if isinstance(data, Dataset): schema = schema or data.schema - if isinstance(data, FileSystemDataset): - format = format or data.format elif isinstance(data, (pa.Table, pa.RecordBatch)): schema = schema or data.schema data = [data] @@ -740,7 +746,22 @@ def write_dataset(data, base_dir, format=None, partitioning=None, schema=None, "objects are supported." ) - format = _ensure_format(format) + if format is None and isinstance(data, FileSystemDataset): + format = data.format + else: + format = _ensure_format(format) + + if file_options is None: + file_options = format.make_write_options() + + if format != file_options.format: + raise TypeError("Supplied FileWriteOptions have format {}, " + "which doesn't match supplied FileFormat {}".format( + format, file_options)) + + if basename_template is None: + basename_template = "part-{i}." + format.default_extname + partitioning = _ensure_write_partitioning(partitioning) if filesystem is None: @@ -750,5 +771,6 @@ def write_dataset(data, base_dir, format=None, partitioning=None, schema=None, filesystem, _ = _ensure_fs(filesystem) _filesystemdataset_write( - data, base_dir, schema, format, filesystem, partitioning, use_threads, + data, base_dir, basename_template, schema, + filesystem, partitioning, file_options, use_threads, ) diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index a1f7cd4aa4b..8bf8b78b07b 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -211,6 +211,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: # the generated C++ is compiled). CFileSource(...) 
+ cdef cppclass CFileWriteOptions \ + "arrow::dataset::FileWriteOptions": + const shared_ptr[CFileFormat]& format() const + c_string type_name() const + cdef cppclass CFileFormat "arrow::dataset::FileFormat": c_string type_name() const CResult[shared_ptr[CSchema]] Inspect(const CFileSource&) const @@ -218,6 +223,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CFileSource source, shared_ptr[CExpression] partition_expression, shared_ptr[CSchema] physical_schema) + shared_ptr[CFileWriteOptions] DefaultWriteOptions() cdef cppclass CFileFragment "arrow::dataset::FileFragment"( CFragment): @@ -237,6 +243,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: @staticmethod vector[CRowGroupInfo] FromIdentifiers(vector[int]) + cdef cppclass CParquetFileWriteOptions \ + "arrow::dataset::ParquetFileWriteOptions"(CFileWriteOptions): + shared_ptr[WriterProperties] writer_properties + shared_ptr[ArrowWriterProperties] arrow_writer_properties + cdef cppclass CParquetFileFragment "arrow::dataset::ParquetFileFragment"( CFileFragment): const vector[CRowGroupInfo]& row_groups() const @@ -244,6 +255,14 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: shared_ptr[CExpression] predicate) CStatus EnsureCompleteMetadata() + cdef cppclass CFileSystemDatasetWriteOptions \ + "arrow::dataset::FileSystemDatasetWriteOptions": + shared_ptr[CFileWriteOptions] file_write_options + shared_ptr[CFileSystem] filesystem + c_string base_dir + shared_ptr[CPartitioning] partitioning + c_string basename_template + cdef cppclass CFileSystemDataset \ "arrow::dataset::FileSystemDataset"(CDataset): @staticmethod @@ -256,13 +275,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: @staticmethod CStatus Write( - shared_ptr[CSchema] schema, - shared_ptr[CFileFormat] format, - shared_ptr[CFileSystem] filesystem, - c_string base_dir, - shared_ptr[CPartitioning] partitioning, - shared_ptr[CScanContext] scan_context, - CFragmentIterator fragments) + const CFileSystemDatasetWriteOptions& write_options, + shared_ptr[CScanner] scanner) c_string type() vector[c_string] files() @@ -279,14 +293,16 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CParquetFileFormat "arrow::dataset::ParquetFileFormat"( CFileFormat): CParquetFileFormatReaderOptions reader_options - shared_ptr[WriterProperties] writer_properties - shared_ptr[ArrowWriterProperties] arrow_writer_properties CResult[shared_ptr[CFileFragment]] MakeFragment( CFileSource source, shared_ptr[CExpression] partition_expression, vector[CRowGroupInfo] row_groups, shared_ptr[CSchema] physical_schema) + cdef cppclass CIpcFileWriteOptions \ + "arrow::dataset::IpcFileWriteOptions"(CFileWriteOptions): + pass + cdef cppclass CIpcFileFormat "arrow::dataset::IpcFileFormat"( CFileFormat): pass diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 13549746755..8c24c66ea99 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -510,9 +510,6 @@ def test_file_format_pickling(): read_options={ 'use_buffered_stream': True, 'buffer_size': 4096, - }, - write_options={ - 'version': '2.0' } ) ] @@ -2154,9 +2151,8 @@ def _check_dataset_roundtrip(dataset, base_dir, expected_files, base_dir_path=None, partitioning=None): base_dir_path = base_dir_path or base_dir - ds.write_dataset( - dataset, base_dir, format="feather", partitioning=partitioning - ) + ds.write_dataset(dataset, base_dir, 
format="feather", + partitioning=partitioning, use_threads=False) # check that all files are present file_paths = list(base_dir_path.rglob("*")) @@ -2178,18 +2174,18 @@ def test_write_dataset(tempdir): # full string path target = tempdir / 'single-file-target' - expected_files = [target / "dat_0.ipc"] + expected_files = [target / "part-0.feather"] _check_dataset_roundtrip(dataset, str(target), expected_files, target) # pathlib path object target = tempdir / 'single-file-target2' - expected_files = [target / "dat_0.ipc"] + expected_files = [target / "part-0.feather"] _check_dataset_roundtrip(dataset, target, expected_files, target) # TODO # # relative path # target = tempdir / 'single-file-target3' - # expected_files = [target / "dat_0.ipc"] + # expected_files = [target / "part-0.ipc"] # _check_dataset_roundtrip( # dataset, './single-file-target3', expected_files, target) @@ -2200,7 +2196,7 @@ def test_write_dataset(tempdir): dataset = ds.dataset(directory) target = tempdir / 'single-directory-target' - expected_files = [target / "dat_0.ipc", target / "dat_1.ipc"] + expected_files = [target / "part-0.feather"] _check_dataset_roundtrip(dataset, str(target), expected_files, target) @@ -2215,8 +2211,8 @@ def test_write_dataset_partitioned(tempdir): # hive partitioning target = tempdir / 'partitioned-hive-target' expected_paths = [ - target / "part=a", target / "part=a" / "dat_0.ipc", - target / "part=b", target / "part=b" / "dat_1.ipc" + target / "part=a", target / "part=a" / "part-0.feather", + target / "part=b", target / "part=b" / "part-1.feather" ] partitioning_schema = ds.partitioning( pa.schema([("part", pa.string())]), flavor="hive") @@ -2227,8 +2223,8 @@ def test_write_dataset_partitioned(tempdir): # directory partitioning target = tempdir / 'partitioned-dir-target' expected_paths = [ - target / "a", target / "a" / "dat_0.ipc", - target / "b", target / "b" / "dat_1.ipc" + target / "a", target / "a" / "part-0.feather", + target / "b", target / "b" / "part-1.feather" ] partitioning_schema = ds.partitioning( pa.schema([("part", pa.string())])) @@ -2258,11 +2254,6 @@ def test_write_dataset_use_threads(tempdir): use_threads=False ) - # check that all files are the same in both cases - paths1 = [p.relative_to(target1) for p in target1.rglob("*")] - paths2 = [p.relative_to(target2) for p in target2.rglob("*")] - assert set(paths1) == set(paths2) - # check that reading in gives same result result1 = ds.dataset(target1, format="feather", partitioning=partitioning) result2 = ds.dataset(target2, format="feather", partitioning=partitioning) @@ -2276,10 +2267,11 @@ def test_write_table(tempdir): ], names=["f1", "f2", "part"]) base_dir = tempdir / 'single' - ds.write_dataset(table, base_dir, format="feather") + ds.write_dataset(table, base_dir, + basename_template='dat_{i}.arrow', format="feather") # check that all files are present file_paths = list(base_dir.rglob("*")) - expected_paths = [base_dir / "dat_0.ipc"] + expected_paths = [base_dir / "dat_0.arrow"] assert set(file_paths) == set(expected_paths) # check Table roundtrip result = ds.dataset(base_dir, format="ipc").to_table() @@ -2289,12 +2281,13 @@ def test_write_table(tempdir): base_dir = tempdir / 'partitioned' partitioning = ds.partitioning( pa.schema([("part", pa.string())]), flavor="hive") - ds.write_dataset( - table, base_dir, format="feather", partitioning=partitioning) + ds.write_dataset(table, base_dir, format="feather", + basename_template='dat_{i}.arrow', + partitioning=partitioning) file_paths = list(base_dir.rglob("*")) 
expected_paths = [ - base_dir / "part=a", base_dir / "part=a" / "dat_0.ipc", - base_dir / "part=b", base_dir / "part=b" / "dat_0.ipc" + base_dir / "part=a", base_dir / "part=a" / "dat_0.arrow", + base_dir / "part=b", base_dir / "part=b" / "dat_1.arrow" ] assert set(file_paths) == set(expected_paths) result = ds.dataset(base_dir, format="ipc", partitioning=partitioning) @@ -2311,27 +2304,27 @@ def test_write_table_multiple_fragments(tempdir): # Table with multiple batches written as single Fragment by default base_dir = tempdir / 'single' ds.write_dataset(table, base_dir, format="feather") - assert set(base_dir.rglob("*")) == set([base_dir / "dat_0.ipc"]) + assert set(base_dir.rglob("*")) == set([base_dir / "part-0.feather"]) assert ds.dataset(base_dir, format="ipc").to_table().equals(table) # Same for single-element list of Table base_dir = tempdir / 'single-list' ds.write_dataset([table], base_dir, format="feather") - assert set(base_dir.rglob("*")) == set([base_dir / "dat_0.ipc"]) + assert set(base_dir.rglob("*")) == set([base_dir / "part-0.feather"]) assert ds.dataset(base_dir, format="ipc").to_table().equals(table) # Provide list of batches to write multiple fragments base_dir = tempdir / 'multiple' ds.write_dataset(table.to_batches(), base_dir, format="feather") assert set(base_dir.rglob("*")) == set( - [base_dir / "dat_0.ipc", base_dir / "dat_1.ipc"]) + [base_dir / "part-0.feather"]) assert ds.dataset(base_dir, format="ipc").to_table().equals(table) # Provide list of tables to write multiple fragments base_dir = tempdir / 'multiple-table' ds.write_dataset([table, table], base_dir, format="feather") assert set(base_dir.rglob("*")) == set( - [base_dir / "dat_0.ipc", base_dir / "dat_1.ipc"]) + [base_dir / "part-0.feather"]) assert ds.dataset(base_dir, format="ipc").to_table().equals( pa.concat_tables([table]*2) ) @@ -2352,17 +2345,17 @@ def test_write_dataset_parquet(tempdir): ds.write_dataset(table, base_dir, format="parquet") # check that all files are present file_paths = list(base_dir.rglob("*")) - expected_paths = [base_dir / "dat_0.parquet"] + expected_paths = [base_dir / "part-0.parquet"] assert set(file_paths) == set(expected_paths) # check Table roundtrip result = ds.dataset(base_dir, format="parquet").to_table() assert result.equals(table) - # using custom options / format object - + # using custom options for version in ["1.0", "2.0"]: - format = ds.ParquetFileFormat(write_options=dict(version=version)) + format = ds.ParquetFileFormat() + opts = format.make_write_options(version=version) base_dir = tempdir / 'parquet_dataset_version{0}'.format(version) - ds.write_dataset(table, base_dir, format=format) - meta = pq.read_metadata(base_dir / "dat_0.parquet") + ds.write_dataset(table, base_dir, format=format, file_options=opts) + meta = pq.read_metadata(base_dir / "part-0.parquet") assert meta.format_version == version diff --git a/r/NAMESPACE b/r/NAMESPACE index a46f72ea2e6..f33d39244d4 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -22,6 +22,7 @@ S3method(Ops,array_expression) S3method(all,equal.ArrowObject) S3method(as.character,Array) S3method(as.character,ChunkedArray) +S3method(as.character,FileFormat) S3method(as.character,Scalar) S3method(as.data.frame,RecordBatch) S3method(as.data.frame,Table) @@ -74,6 +75,7 @@ S3method(min,ChunkedArray) S3method(names,Dataset) S3method(names,FeatherReader) S3method(names,RecordBatch) +S3method(names,Scanner) S3method(names,ScannerBuilder) S3method(names,Schema) S3method(names,Table) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R 
index c059c761efe..932d95396e7 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -396,12 +396,20 @@ dataset___FileFormat__type_name <- function(format){ .Call(`_arrow_dataset___FileFormat__type_name` , format) } -dataset___ParquetFileFormat__MakeRead <- function(use_buffered_stream, buffer_size, dict_columns){ - .Call(`_arrow_dataset___ParquetFileFormat__MakeRead` , use_buffered_stream, buffer_size, dict_columns) +dataset___FileFormat__DefaultWriteOptions <- function(fmt){ + .Call(`_arrow_dataset___FileFormat__DefaultWriteOptions` , fmt) } -dataset___ParquetFileFormat__MakeWrite <- function(writer_props, arrow_props){ - .Call(`_arrow_dataset___ParquetFileFormat__MakeWrite` , writer_props, arrow_props) +dataset___ParquetFileFormat__Make <- function(use_buffered_stream, buffer_size, dict_columns){ + .Call(`_arrow_dataset___ParquetFileFormat__Make` , use_buffered_stream, buffer_size, dict_columns) +} + +dataset___FileWriteOptions__type_name <- function(options){ + .Call(`_arrow_dataset___FileWriteOptions__type_name` , options) +} + +dataset___ParquetFileWriteOptions__update <- function(options, writer_props, arrow_writer_props){ + invisible(.Call(`_arrow_dataset___ParquetFileWriteOptions__update` , options, writer_props, arrow_writer_props)) } dataset___IpcFileFormat__Make <- function(){ @@ -464,12 +472,16 @@ dataset___Scanner__Scan <- function(scanner){ .Call(`_arrow_dataset___Scanner__Scan` , scanner) } +dataset___Scanner__schema <- function(sc){ + .Call(`_arrow_dataset___Scanner__schema` , sc) +} + dataset___ScanTask__get_batches <- function(scan_task){ .Call(`_arrow_dataset___ScanTask__get_batches` , scan_task) } -dataset___Dataset__Write <- function(ds, schema, format, filesystem, path, partitioning){ - invisible(.Call(`_arrow_dataset___Dataset__Write` , ds, schema, format, filesystem, path, partitioning)) +dataset___Dataset__Write <- function(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner){ + invisible(.Call(`_arrow_dataset___Dataset__Write` , file_write_options, filesystem, base_dir, partitioning, basename_template, scanner)) } shared_ptr_is_null <- function(xp){ diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R index 46e7ec4e649..ec3ac36f9e5 100644 --- a/r/R/dataset-format.R +++ b/r/R/dataset-format.R @@ -84,6 +84,13 @@ FileFormat$create <- function(format, ...) { } } +#' @export +as.character.FileFormat <- function(x, ...) { + out <- x$type + # Slight hack: special case IPC -> feather, otherwise is just the type_name + ifelse(out == "ipc", "feather", out) +} + #' @usage NULL #' @format NULL #' @rdname FileFormat @@ -91,18 +98,9 @@ FileFormat$create <- function(format, ...) 
{ ParquetFileFormat <- R6Class("ParquetFileFormat", inherit = FileFormat) ParquetFileFormat$create <- function(use_buffered_stream = FALSE, buffer_size = 8196, - dict_columns = character(0), - writer_properties = NULL, - arrow_writer_properties = NULL) { - if (is.null(writer_properties) && is.null(arrow_writer_properties)) { - shared_ptr(ParquetFileFormat, dataset___ParquetFileFormat__MakeRead( - use_buffered_stream, buffer_size, dict_columns)) - } else { - writer_properties = writer_properties %||% ParquetWriterProperties$create() - arrow_writer_properties = arrow_writer_properties %||% ParquetArrowWriterProperties$create() - shared_ptr(ParquetFileFormat, dataset___ParquetFileFormat__MakeWrite( - writer_properties, arrow_writer_properties)) - } + dict_columns = character(0)) { + shared_ptr(ParquetFileFormat, dataset___ParquetFileFormat__Make( + use_buffered_stream, buffer_size, dict_columns)) } #' @usage NULL @@ -129,3 +127,30 @@ csv_file_format_parse_options <- function(...) { CsvParseOptions$create(...) } } + +#' Format-specific write options +#' +#' @description +#' A `FileWriteOptions` holds write options specific to a `FileFormat`. +FileWriteOptions <- R6Class("FileWriteOptions", inherit = ArrowObject, + public = list( + update = function(...) { + if (self$type == "parquet") { + dataset___ParquetFileWriteOptions__update(self, + ParquetWriterProperties$create(...), + ParquetArrowWriterProperties$create(...)) + } + invisible(self) + } + ), + active = list( + type = function() dataset___FileWriteOptions__type_name(self) + ) +) +FileWriteOptions$create <- function(format, ...) { + if (!inherits(format, "FileFormat")) { + format <- FileFormat$create(format) + } + options <- shared_ptr(FileWriteOptions, dataset___FileFormat__DefaultWriteOptions(format)) + options$update(...) +} diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R index 4aabd2c2bbf..e9017825782 100644 --- a/r/R/dataset-scan.R +++ b/r/R/dataset-scan.R @@ -57,6 +57,9 @@ Scanner <- R6Class("Scanner", inherit = ArrowObject, public = list( ToTable = function() shared_ptr(Table, dataset___Scanner__ToTable(self)), Scan = function() map(dataset___Scanner__Scan(self), shared_ptr, class = ScanTask) + ), + active = list( + schema = function() shared_ptr(Schema, dataset___Scanner__schema(self)) ) ) Scanner$create <- function(dataset, @@ -65,7 +68,7 @@ Scanner$create <- function(dataset, use_threads = option_use_threads(), batch_size = NULL, ...) { - if (inherits(dataset, "arrow_dplyr_query") && inherits(dataset$.data, "Dataset")) { + if (inherits(dataset, "arrow_dplyr_query")) { return(Scanner$create( dataset$.data, dataset$selected_columns, @@ -74,7 +77,11 @@ Scanner$create <- function(dataset, ... )) } + if (inherits(dataset, c("data.frame", "RecordBatch", "Table"))) { + dataset <- InMemoryDataset$create(dataset) + } assert_is(dataset, "Dataset") + scanner_builder <- dataset$NewScan() if (use_threads) { scanner_builder$UseThreads() @@ -91,6 +98,9 @@ Scanner$create <- function(dataset, scanner_builder$Finish() } +#' @export +names.Scanner <- function(x) names(x$schema) + ScanTask <- R6Class("ScanTask", inherit = ArrowObject, public = list( Execute = function() map(dataset___ScanTask__get_batches(self), shared_ptr, class = RecordBatch) diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R index 8cfd8dec527..abeb0ce4393 100644 --- a/r/R/dataset-write.R +++ b/r/R/dataset-write.R @@ -24,81 +24,64 @@ #' @param dataset [Dataset], [RecordBatch], [Table], `arrow_dplyr_query`, or #' `data.frame`. 
If an `arrow_dplyr_query` or `grouped_df`, #' `schema` and `partitioning` will be taken from the result of any `select()` -#' and `group_by()` operations done on the dataset. Note that `filter()` queries -#' are not currently supported, and `select`-ed columns may not be renamed. -#' @param path string path to a directory to write to (directory will be +#' and `group_by()` operations done on the dataset. `filter()` queries will be +#' applied to restrict written rows. +#' Note that `select()`-ed columns may not be renamed. +#' @param path string path or URI to a directory to write to (directory will be #' created if it does not exist) #' @param format file format to write the dataset to. Currently supported #' formats are "feather" (aka "ipc") and "parquet". Default is to write to the #' same format as `dataset`. -#' @param schema [Schema] containing a subset of columns, possibly reordered, -#' in `dataset`. Default is `dataset$schema`, i.e. all columns. #' @param partitioning `Partitioning` or a character vector of columns to #' use as partition keys (to be written as path segments). Default is to #' use the current `group_by()` columns. +#' @param basename_template string template for the names of files to be written. +#' Must contain `"{i}"`, which will be replaced with an autoincremented +#' integer to generate basenames of datafiles. For example, `"part-{i}.feather"` +#' will yield `"part-0.feather", ...`. #' @param hive_style logical: write partition segments as Hive-style #' (`key1=value1/key2=value2/file.ext`) or as just bare values. Default is `TRUE`. +#' @param filesystem A [FileSystem] where the dataset should be written if it is a +#' string file path; default is the local file system #' @param ... additional format-specific arguments. For available Parquet #' options, see [write_parquet()]. #' @return The input `dataset`, invisibly #' @export write_dataset <- function(dataset, path, - format = dataset$format$type, - schema = dataset$schema, + format = dataset$format, partitioning = dplyr::group_vars(dataset), + basename_template = paste0("part-{i}.", as.character(format)), hive_style = TRUE, + filesystem = NULL, ...) { if (inherits(dataset, "arrow_dplyr_query")) { - force(partitioning) # get the group_vars before we drop the object - # Check for a filter - if (!isTRUE(dataset$filtered_rows)) { - # TODO: - stop("Writing a filtered dataset is not yet supported", call. = FALSE) - } - # Check for a select - if (!identical(dataset$selected_columns, set_names(names(dataset$.data)))) { - # We can select a subset of columns but we can't rename them - if (!setequal(dataset$selected_columns, names(dataset$selected_columns))) { - stop("Renaming columns when writing a dataset is not yet supported", call. = FALSE) - } - dataset <- ensure_group_vars(dataset) - schema <- dataset$.data$schema[dataset$selected_columns] - } - dataset <- dataset$.data - } - if (inherits(dataset, c("data.frame", "RecordBatch", "Table"))) { - force(partitioning) # get the group_vars before we replace the object - if (inherits(dataset, "grouped_df")) { - # Drop the grouping metadata before writing; we've already consumed it - # now to construct `partitioning` and don't want it in the metadata$r - dataset <- dplyr::ungroup(dataset) - } - dataset <- InMemoryDataset$create(dataset) - } - if (!inherits(dataset, "Dataset")) { - stop("'dataset' must be a Dataset, not ", class(dataset)[1], call. 
= FALSE) - } - - if (!inherits(format, "FileFormat")) { - if (identical(format, "parquet")) { - # We have to do some special massaging of properties - writer_props <- ParquetWriterProperties$create(dataset, ...) - arrow_writer_props <- ParquetArrowWriterProperties$create(...) - format <- ParquetFileFormat$create(writer_properties = writer_props, arrow_writer_properties = arrow_writer_props) - } else { - format <- FileFormat$create(format, ...) + # We can select a subset of columns but we can't rename them + if (!all(dataset$selected_columns == names(dataset$selected_columns))) { + stop("Renaming columns when writing a dataset is not yet supported", call. = FALSE) } + # partitioning vars need to be in the `select` schema + dataset <- ensure_group_vars(dataset) + } else if (inherits(dataset, "grouped_df")) { + force(partitioning) + # Drop the grouping metadata before writing; we've already consumed it + # now to construct `partitioning` and don't want it in the metadata$r + dataset <- dplyr::ungroup(dataset) } + scanner <- Scanner$create(dataset) if (!inherits(partitioning, "Partitioning")) { - # TODO: tidyselect? - partition_schema <- dataset$schema[partitioning] + partition_schema <- scanner$schema[partitioning] if (isTRUE(hive_style)) { partitioning <- HivePartitioning$create(partition_schema) } else { partitioning <- DirectoryPartitioning$create(partition_schema) } } - dataset$write(path, format = format, partitioning = partitioning, schema = schema, ...) + + path_and_fs <- get_path_and_filesystem(path, filesystem) + options <- FileWriteOptions$create(format, table = scanner, ...) + + dataset___Dataset__Write(options, path_and_fs$fs, path_and_fs$path, + partitioning, basename_template, scanner) } diff --git a/r/R/dataset.R b/r/R/dataset.R index 7661c33292e..7b1d6609295 100644 --- a/r/R/dataset.R +++ b/r/R/dataset.R @@ -133,9 +133,6 @@ open_dataset <- function(sources, #' may also replace the dataset's schema by using `ds$schema <- new_schema`. #' This method currently supports only adding, removing, or reordering #' fields in the schema: you cannot alter or cast the field types. -#' - `$write(path, filesystem, schema, format, partitioning, ...)`: writes the -#' dataset to `path` in the `format` file format, partitioned by `partitioning`, -#' and invisibly returns `self`. See [write_dataset()]. #' #' `FileSystemDataset` has the following methods: #' - `$files`: Active binding, returns the files of the `FileSystemDataset` @@ -162,12 +159,7 @@ Dataset <- R6Class("Dataset", inherit = ArrowObject, # Start a new scan of the data # @return A [ScannerBuilder] NewScan = function() unique_ptr(ScannerBuilder, dataset___Dataset__NewScan(self)), - ToString = function() self$schema$ToString(), - write = function(path, filesystem = NULL, schema = self$schema, format, partitioning, ...) { - path_and_fs <- get_path_and_filesystem(path, filesystem) - dataset___Dataset__Write(self, schema, format, path_and_fs$fs, path_and_fs$path, partitioning) - invisible(self) - } + ToString = function() self$schema$ToString() ), active = list( schema = function(schema) { diff --git a/r/man/Dataset.Rd b/r/man/Dataset.Rd index 056b7d529e2..3c9a314195d 100644 --- a/r/man/Dataset.Rd +++ b/r/man/Dataset.Rd @@ -61,9 +61,6 @@ A \code{Dataset} has the following methods: may also replace the dataset's schema by using \code{ds$schema <- new_schema}. This method currently supports only adding, removing, or reordering fields in the schema: you cannot alter or cast the field types. 
-\item \verb{$write(path, filesystem, schema, format, partitioning, ...)}: writes the -dataset to \code{path} in the \code{format} file format, partitioned by \code{partitioning}, -and invisibly returns \code{self}. See \code{\link[=write_dataset]{write_dataset()}}. } \code{FileSystemDataset} has the following methods: diff --git a/r/man/FileWriteOptions.Rd b/r/man/FileWriteOptions.Rd new file mode 100644 index 00000000000..661393c8e0d --- /dev/null +++ b/r/man/FileWriteOptions.Rd @@ -0,0 +1,8 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dataset-format.R +\name{FileWriteOptions} +\alias{FileWriteOptions} +\title{Format-specific write options} +\description{ +A \code{FileWriteOptions} holds write options specific to a \code{FileFormat}. +} diff --git a/r/man/write_dataset.Rd b/r/man/write_dataset.Rd index bb020b6ce68..e12c4287266 100644 --- a/r/man/write_dataset.Rd +++ b/r/man/write_dataset.Rd @@ -7,10 +7,11 @@ write_dataset( dataset, path, - format = dataset$format$type, - schema = dataset$schema, + format = dataset$format, partitioning = dplyr::group_vars(dataset), + basename_template = paste0("part-{i}.", as.character(format)), hive_style = TRUE, + filesystem = NULL, ... ) } @@ -18,26 +19,32 @@ write_dataset( \item{dataset}{\link{Dataset}, \link{RecordBatch}, \link{Table}, \code{arrow_dplyr_query}, or \code{data.frame}. If an \code{arrow_dplyr_query} or \code{grouped_df}, \code{schema} and \code{partitioning} will be taken from the result of any \code{select()} -and \code{group_by()} operations done on the dataset. Note that \code{filter()} queries -are not currently supported, and \code{select}-ed columns may not be renamed.} +and \code{group_by()} operations done on the dataset. \code{filter()} queries will be +applied to restrict written rows. +Note that \code{select()}-ed columns may not be renamed.} -\item{path}{string path to a directory to write to (directory will be +\item{path}{string path or URI to a directory to write to (directory will be created if it does not exist)} \item{format}{file format to write the dataset to. Currently supported formats are "feather" (aka "ipc") and "parquet". Default is to write to the same format as \code{dataset}.} -\item{schema}{\link{Schema} containing a subset of columns, possibly reordered, -in \code{dataset}. Default is \code{dataset$schema}, i.e. all columns.} - \item{partitioning}{\code{Partitioning} or a character vector of columns to use as partition keys (to be written as path segments). Default is to use the current \code{group_by()} columns.} +\item{basename_template}{string template for the names of files to be written. +Must contain \code{"{i}"}, which will be replaced with an autoincremented +integer to generate basenames of datafiles. For example, \code{"part-{i}.feather"} +will yield \verb{"part-0.feather", ...}.} + \item{hive_style}{logical: write partition segments as Hive-style (\code{key1=value1/key2=value2/file.ext}) or as just bare values. Default is \code{TRUE}.} +\item{filesystem}{A \link{FileSystem} where the dataset should be written if it is a +string file path; default is the local file system} + \item{...}{additional format-specific arguments. 
For available Parquet options, see \code{\link[=write_parquet]{write_parquet()}}.} } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 999917188b1..1c7ab14e816 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1558,34 +1558,66 @@ extern "C" SEXP _arrow_dataset___FileFormat__type_name(SEXP format_sexp){ // dataset.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr dataset___ParquetFileFormat__MakeRead(bool use_buffered_stream, int64_t buffer_size, cpp11::strings dict_columns); -extern "C" SEXP _arrow_dataset___ParquetFileFormat__MakeRead(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP dict_columns_sexp){ +std::shared_ptr dataset___FileFormat__DefaultWriteOptions(const std::shared_ptr& fmt); +extern "C" SEXP _arrow_dataset___FileFormat__DefaultWriteOptions(SEXP fmt_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type fmt(fmt_sexp); + return cpp11::as_sexp(dataset___FileFormat__DefaultWriteOptions(fmt)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_dataset___FileFormat__DefaultWriteOptions(SEXP fmt_sexp){ + Rf_error("Cannot call dataset___FileFormat__DefaultWriteOptions(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + +// dataset.cpp +#if defined(ARROW_R_WITH_ARROW) +std::shared_ptr dataset___ParquetFileFormat__Make(bool use_buffered_stream, int64_t buffer_size, cpp11::strings dict_columns); +extern "C" SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP dict_columns_sexp){ BEGIN_CPP11 arrow::r::Input::type use_buffered_stream(use_buffered_stream_sexp); arrow::r::Input::type buffer_size(buffer_size_sexp); arrow::r::Input::type dict_columns(dict_columns_sexp); - return cpp11::as_sexp(dataset___ParquetFileFormat__MakeRead(use_buffered_stream, buffer_size, dict_columns)); + return cpp11::as_sexp(dataset___ParquetFileFormat__Make(use_buffered_stream, buffer_size, dict_columns)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP dict_columns_sexp){ + Rf_error("Cannot call dataset___ParquetFileFormat__Make(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + +// dataset.cpp +#if defined(ARROW_R_WITH_ARROW) +std::string dataset___FileWriteOptions__type_name(const std::shared_ptr& options); +extern "C" SEXP _arrow_dataset___FileWriteOptions__type_name(SEXP options_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type options(options_sexp); + return cpp11::as_sexp(dataset___FileWriteOptions__type_name(options)); END_CPP11 } #else -extern "C" SEXP _arrow_dataset___ParquetFileFormat__MakeRead(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP dict_columns_sexp){ - Rf_error("Cannot call dataset___ParquetFileFormat__MakeRead(). Please use arrow::install_arrow() to install required runtime libraries. "); +extern "C" SEXP _arrow_dataset___FileWriteOptions__type_name(SEXP options_sexp){ + Rf_error("Cannot call dataset___FileWriteOptions__type_name(). Please use arrow::install_arrow() to install required runtime libraries. 
"); } #endif // dataset.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr dataset___ParquetFileFormat__MakeWrite(const std::shared_ptr& writer_props, const std::shared_ptr& arrow_props); -extern "C" SEXP _arrow_dataset___ParquetFileFormat__MakeWrite(SEXP writer_props_sexp, SEXP arrow_props_sexp){ +void dataset___ParquetFileWriteOptions__update(const std::shared_ptr& options, const std::shared_ptr& writer_props, const std::shared_ptr& arrow_writer_props); +extern "C" SEXP _arrow_dataset___ParquetFileWriteOptions__update(SEXP options_sexp, SEXP writer_props_sexp, SEXP arrow_writer_props_sexp){ BEGIN_CPP11 + arrow::r::Input&>::type options(options_sexp); arrow::r::Input&>::type writer_props(writer_props_sexp); - arrow::r::Input&>::type arrow_props(arrow_props_sexp); - return cpp11::as_sexp(dataset___ParquetFileFormat__MakeWrite(writer_props, arrow_props)); + arrow::r::Input&>::type arrow_writer_props(arrow_writer_props_sexp); + dataset___ParquetFileWriteOptions__update(options, writer_props, arrow_writer_props); + return R_NilValue; END_CPP11 } #else -extern "C" SEXP _arrow_dataset___ParquetFileFormat__MakeWrite(SEXP writer_props_sexp, SEXP arrow_props_sexp){ - Rf_error("Cannot call dataset___ParquetFileFormat__MakeWrite(). Please use arrow::install_arrow() to install required runtime libraries. "); +extern "C" SEXP _arrow_dataset___ParquetFileWriteOptions__update(SEXP options_sexp, SEXP writer_props_sexp, SEXP arrow_writer_props_sexp){ + Rf_error("Cannot call dataset___ParquetFileWriteOptions__update(). Please use arrow::install_arrow() to install required runtime libraries. "); } #endif @@ -1821,6 +1853,21 @@ extern "C" SEXP _arrow_dataset___Scanner__Scan(SEXP scanner_sexp){ } #endif +// dataset.cpp +#if defined(ARROW_R_WITH_ARROW) +std::shared_ptr dataset___Scanner__schema(const std::shared_ptr& sc); +extern "C" SEXP _arrow_dataset___Scanner__schema(SEXP sc_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type sc(sc_sexp); + return cpp11::as_sexp(dataset___Scanner__schema(sc)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_dataset___Scanner__schema(SEXP sc_sexp){ + Rf_error("Cannot call dataset___Scanner__schema(). Please use arrow::install_arrow() to install required runtime libraries. 
"); +} +#endif + // dataset.cpp #if defined(ARROW_R_WITH_ARROW) std::vector> dataset___ScanTask__get_batches(const std::shared_ptr& scan_task); @@ -1838,21 +1885,21 @@ extern "C" SEXP _arrow_dataset___ScanTask__get_batches(SEXP scan_task_sexp){ // dataset.cpp #if defined(ARROW_R_WITH_ARROW) -void dataset___Dataset__Write(const std::shared_ptr& ds, const std::shared_ptr& schema, const std::shared_ptr& format, const std::shared_ptr& filesystem, std::string path, const std::shared_ptr& partitioning); -extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP ds_sexp, SEXP schema_sexp, SEXP format_sexp, SEXP filesystem_sexp, SEXP path_sexp, SEXP partitioning_sexp){ +void dataset___Dataset__Write(const std::shared_ptr& file_write_options, const std::shared_ptr& filesystem, std::string base_dir, const std::shared_ptr& partitioning, std::string basename_template, const std::shared_ptr& scanner); +extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp){ BEGIN_CPP11 - arrow::r::Input&>::type ds(ds_sexp); - arrow::r::Input&>::type schema(schema_sexp); - arrow::r::Input&>::type format(format_sexp); + arrow::r::Input&>::type file_write_options(file_write_options_sexp); arrow::r::Input&>::type filesystem(filesystem_sexp); - arrow::r::Input::type path(path_sexp); + arrow::r::Input::type base_dir(base_dir_sexp); arrow::r::Input&>::type partitioning(partitioning_sexp); - dataset___Dataset__Write(ds, schema, format, filesystem, path, partitioning); + arrow::r::Input::type basename_template(basename_template_sexp); + arrow::r::Input&>::type scanner(scanner_sexp); + dataset___Dataset__Write(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner); return R_NilValue; END_CPP11 } #else -extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP ds_sexp, SEXP schema_sexp, SEXP format_sexp, SEXP filesystem_sexp, SEXP path_sexp, SEXP partitioning_sexp){ +extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp){ Rf_error("Cannot call dataset___Dataset__Write(). Please use arrow::install_arrow() to install required runtime libraries. 
"); } #endif @@ -6224,8 +6271,10 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___FileSystemDatasetFactory__Make1", (DL_FUNC) &_arrow_dataset___FileSystemDatasetFactory__Make1, 3}, { "_arrow_dataset___FileSystemDatasetFactory__Make3", (DL_FUNC) &_arrow_dataset___FileSystemDatasetFactory__Make3, 4}, { "_arrow_dataset___FileFormat__type_name", (DL_FUNC) &_arrow_dataset___FileFormat__type_name, 1}, - { "_arrow_dataset___ParquetFileFormat__MakeRead", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__MakeRead, 3}, - { "_arrow_dataset___ParquetFileFormat__MakeWrite", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__MakeWrite, 2}, + { "_arrow_dataset___FileFormat__DefaultWriteOptions", (DL_FUNC) &_arrow_dataset___FileFormat__DefaultWriteOptions, 1}, + { "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 3}, + { "_arrow_dataset___FileWriteOptions__type_name", (DL_FUNC) &_arrow_dataset___FileWriteOptions__type_name, 1}, + { "_arrow_dataset___ParquetFileWriteOptions__update", (DL_FUNC) &_arrow_dataset___ParquetFileWriteOptions__update, 3}, { "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC) &_arrow_dataset___IpcFileFormat__Make, 0}, { "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 1}, { "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1}, @@ -6241,6 +6290,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___Scanner__ToTable", (DL_FUNC) &_arrow_dataset___Scanner__ToTable, 1}, { "_arrow_dataset___Scanner__head", (DL_FUNC) &_arrow_dataset___Scanner__head, 2}, { "_arrow_dataset___Scanner__Scan", (DL_FUNC) &_arrow_dataset___Scanner__Scan, 1}, + { "_arrow_dataset___Scanner__schema", (DL_FUNC) &_arrow_dataset___Scanner__schema, 1}, { "_arrow_dataset___ScanTask__get_batches", (DL_FUNC) &_arrow_dataset___ScanTask__get_batches, 1}, { "_arrow_dataset___Dataset__Write", (DL_FUNC) &_arrow_dataset___Dataset__Write, 6}, { "_arrow_shared_ptr_is_null", (DL_FUNC) &_arrow_shared_ptr_is_null, 1}, diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index c041e114d2a..f4c3ed1d994 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -171,7 +171,13 @@ std::string dataset___FileFormat__type_name( } // [[arrow::export]] -std::shared_ptr dataset___ParquetFileFormat__MakeRead( +std::shared_ptr dataset___FileFormat__DefaultWriteOptions( + const std::shared_ptr& fmt) { + return fmt->DefaultWriteOptions(); +} + +// [[arrow::export]] +std::shared_ptr dataset___ParquetFileFormat__Make( bool use_buffered_stream, int64_t buffer_size, cpp11::strings dict_columns) { auto fmt = std::make_shared(); @@ -187,15 +193,18 @@ std::shared_ptr dataset___ParquetFileFormat__MakeRead( } // [[arrow::export]] -std::shared_ptr dataset___ParquetFileFormat__MakeWrite( - const std::shared_ptr& writer_props, - const std::shared_ptr& arrow_props) { - auto fmt = std::make_shared(); - - fmt->writer_properties = writer_props; - fmt->arrow_writer_properties = arrow_props; +std::string dataset___FileWriteOptions__type_name( + const std::shared_ptr& options) { + return options->type_name(); +} - return fmt; +// [[arrow::export]] +void dataset___ParquetFileWriteOptions__update( + const std::shared_ptr& options, + const std::shared_ptr& writer_props, + const std::shared_ptr& arrow_writer_props) { + options->writer_properties = writer_props; + options->arrow_writer_properties = arrow_writer_props; } // [[arrow::export]] @@ -316,6 +325,12 @@ std::vector> dataset___Scanner__Scan( return out; } +// 
[[arrow::export]] +std::shared_ptr dataset___Scanner__schema( + const std::shared_ptr& sc) { + return sc->schema(); +} + // [[arrow::export]] std::vector> dataset___ScanTask__get_batches( const std::shared_ptr& scan_task) { @@ -331,18 +346,18 @@ std::vector> dataset___ScanTask__get_batches } // [[arrow::export]] -void dataset___Dataset__Write(const std::shared_ptr& ds, - const std::shared_ptr& schema, - const std::shared_ptr& format, - const std::shared_ptr& filesystem, - std::string path, - const std::shared_ptr& partitioning) { - auto frags = ds->GetFragments(); - auto ctx = std::make_shared(); - ctx->use_threads = true; - StopIfNotOk(ds::FileSystemDataset::Write(schema, format, filesystem, path, partitioning, - ctx, std::move(frags))); - return; +void dataset___Dataset__Write( + const std::shared_ptr& file_write_options, + const std::shared_ptr& filesystem, std::string base_dir, + const std::shared_ptr& partitioning, std::string basename_template, + const std::shared_ptr& scanner) { + ds::FileSystemDatasetWriteOptions opts; + opts.file_write_options = file_write_options; + opts.filesystem = filesystem; + opts.base_dir = base_dir; + opts.partitioning = partitioning; + opts.basename_template = basename_template; + StopIfNotOk(ds::FileSystemDataset::Write(opts, scanner)); } #endif diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 327d0394771..f33f8ad1def 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -759,6 +759,14 @@ test_that("Writing a dataset: CSV->IPC", { filter(integer > 6) %>% summarize(mean = mean(integer)) ) + + # Check whether "int" is present in the files or just in the dirs + first <- read_feather( + dir(dst_dir, pattern = ".feather$", recursive = TRUE, full.names = TRUE)[1], + as_data_frame = FALSE + ) + # It shouldn't be there + expect_false("int" %in% names(first)) }) test_that("Writing a dataset: Parquet->IPC", { @@ -853,6 +861,18 @@ test_that("Dataset writing: dplyr methods", { collect(new_ds) %>% arrange(int), rbind(df1[c("chr", "dbl", "int")], df2[c("chr", "dbl", "int")]) ) + + # filter to restrict written rows + dst_dir3 <- tempfile() + ds %>% + filter(int == 4) %>% + write_dataset(dst_dir3, format = "feather") + new_ds <- open_dataset(dst_dir3, format = "feather") + + expect_equivalent( + new_ds %>% select(names(df1)) %>% collect(), + df1 %>% filter(int == 4) + ) }) test_that("Dataset writing: non-hive", { @@ -870,7 +890,7 @@ test_that("Dataset writing: no partitioning", { dst_dir <- tempfile() write_dataset(ds, dst_dir, format = "feather", partitioning = NULL) expect_true(dir.exists(dst_dir)) - expect_true(length(dir(dst_dir)) > 1) + expect_true(length(dir(dst_dir)) > 0) }) test_that("Dataset writing: from data.frame", { @@ -965,21 +985,21 @@ test_that("Writing a dataset: Parquet format options", { }) test_that("Dataset writing: unsupported features/input validation", { - expect_error(write_dataset(4), "'dataset' must be a Dataset") + expect_error(write_dataset(4), 'dataset must be a "Dataset"') ds <- open_dataset(hive_dir) - - expect_error( - filter(ds, int == 4) %>% write_dataset(ds), - "Writing a filtered dataset is not yet supported" - ) expect_error( select(ds, integer = int) %>% write_dataset(ds), "Renaming columns when writing a dataset is not yet supported" ) - expect_error( write_dataset(ds, partitioning = c("int", "NOTACOLUMN"), format = "ipc"), 'Invalid field name: "NOTACOLUMN"' ) + expect_error( + write_dataset(ds, tempfile(), basename_template = "something_without_i") + ) + 
expect_error( + write_dataset(ds, tempfile(), basename_template = NULL) + ) }) diff --git a/r/tests/testthat/test-s3-minio.R b/r/tests/testthat/test-s3-minio.R index 63902aac40a..5d9213f4f21 100644 --- a/r/tests/testthat/test-s3-minio.R +++ b/r/tests/testthat/test-s3-minio.R @@ -119,7 +119,7 @@ if (arrow_with_s3() && process_is_running("minio server")) { test_that("write_dataset with fs", { ds <- open_dataset(minio_path("hive_dir"), filesystem = fs) write_dataset(ds, minio_path("new_dataset_dir"), filesystem = fs) - expect_length(fs$GetFileInfo(FileSelector$create(minio_path("new_dataset_dir"))), 2) + expect_length(fs$GetFileInfo(FileSelector$create(minio_path("new_dataset_dir"))), 1) }) test_that("S3FileSystem input validation", { diff --git a/r/vignettes/dataset.Rmd b/r/vignettes/dataset.Rmd index 488f7f4f43b..d9c90261f82 100644 --- a/r/vignettes/dataset.Rmd +++ b/r/vignettes/dataset.Rmd @@ -328,17 +328,12 @@ system("tree nyc-taxi/feather") # feather # ├── payment_type=1 -# │ ├── dat_0.ipc -# │ ├── dat_1.ipc -# │ ├── dat_2.ipc -# │ ├── dat_3.ipc -# │ ├── dat_4.ipc -# │ └── dat_5.ipc +# │ └── part-5.feather # ├── payment_type=2 -# │ ├── dat_0.ipc +# │ └── part-0.feather # ... # └── payment_type=5 -# └── dat_2.ipc +# └── part-2.feather # # 5 directories, 25 files ``` @@ -350,6 +345,16 @@ partitions are because they can be read from the file paths. (To instead write bare values for partition segments, i.e. `1` rather than `payment_type=1`, call `write_dataset()` with `hive_style = FALSE`.) +Perhaps, though, `payment_type == 1` is the only data we care about for our +current work, and we just want to drop the rest and have a smaller working set. +For this, we can `filter()` them out when writing: + +```r +ds %>% + filter(payment_type == 1) %>% + write_dataset("nyc-taxi/feather", format = "feather") +``` + The other thing we can do when writing datasets is select a subset of and/or reorder columns. Suppose we never care about `vendor_id`, and being a string column, it can take up a lot of space when we read it in, so let's drop it: @@ -361,5 +366,5 @@ ds %>% write_dataset("nyc-taxi/feather", format = "feather") ``` -Note that you cannot currently rename columns when writing, nor can you `filter()` -and write a subset of the data. +Note that while you can select a subset of columns, +you cannot currently rename columns when writing.
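
A minimal sketch of how the reworked Python entry point fits together, based on the pyarrow.dataset changes above. The table contents and output directories are illustrative; only `write_dataset`, `format`, and `basename_template` come from the patch.

```python
import pyarrow as pa
import pyarrow.dataset as ds

# Illustrative in-memory data; a Dataset, a Table/RecordBatch, or a list of
# Tables/RecordBatches is accepted.
table = pa.table({"f1": [0, 1, 2], "part": ["a", "a", "b"]})

# With no basename_template, files are named "part-{i}." + format.default_extname,
# e.g. out/part-0.feather here.
ds.write_dataset(table, "out", format="feather")

# A custom template must contain the "{i}" token, which is replaced with an
# auto-incremented integer.
ds.write_dataset(table, "out_custom", format="feather",
                 basename_template="dat_{i}.arrow")
```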
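
Parquet writer settings, which previously went through `ParquetFileFormat(write_options=...)`, are now built from the format with `make_write_options()` and passed separately as `file_options`. A sketch, with the table and output directory assumed:

```python
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

table = pa.table({"f1": [0, 1, 2]})

fmt = ds.ParquetFileFormat()
opts = fmt.make_write_options(version="2.0")  # forwarded to the Parquet writer

ds.write_dataset(table, "parquet_out", format=fmt, file_options=opts)

# The default basename template yields part-0.parquet.
print(pq.read_metadata("parquet_out/part-0.parquet").format_version)  # "2.0"
```

Per the check added in `write_dataset`, supplying `file_options` created from a different format than `format` raises a TypeError.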
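
Partitioned writes use the same `part-{i}` naming, with one incrementing index per partition queue. A hive-style sketch mirroring the tests above; which index lands in which partition directory is an assumption and may depend on scheduling:

```python
import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"f1": [0, 1, 2, 3], "part": ["a", "a", "b", "b"]})
part = ds.partitioning(pa.schema([("part", pa.string())]), flavor="hive")

ds.write_dataset(table, "partitioned", format="feather", partitioning=part)
# Expected layout, per the tests above:
#   partitioned/part=a/part-0.feather
#   partitioned/part=b/part-1.feather

# Round trip: partition keys are recovered from the directory names.
result = ds.dataset("partitioned", format="ipc", partitioning=part).to_table()
```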
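
One behavioral consequence of routing all writes through a Scanner, visible in the updated `test_write_table_multiple_fragments`: a list of Tables or RecordBatches is wrapped in a single `InMemoryFragment`, so it no longer yields one file per list element. A sketch:

```python
import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"f1": [0, 1, 2]})

ds.write_dataset([table, table], "multi", format="feather")
# Previously: multi/dat_0.ipc and multi/dat_1.ipc (one file per element).
# Now: a single multi/part-0.feather containing both copies.
assert ds.dataset("multi", format="ipc").to_table().num_rows == 6
```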