Status: Closed

Commits (34)
754e559  ARROW-9782: [C++][Dataset] More configurable Dataset writing  (bkietz, Sep 28, 2020)
e2c1199  Minimal hacking to get the R tests passing  (nealrichardson, Sep 29, 2020)
9eea1bd  fix Scanner splitting  (bkietz, Sep 30, 2020)
c263f6c  remove debug print()s  (bkietz, Sep 30, 2020)
e70dd9d  don't double unlock std::mutex  (bkietz, Sep 30, 2020)
fa97c52  extract a helper for single-lookup insertion into maps  (bkietz, Oct 1, 2020)
0815d6e  add a python binding for custom parquet write properties  (bkietz, Oct 1, 2020)
2830bd0  remove unused schema parameter  (bkietz, Oct 1, 2020)
9b8b8fc  repair parquet write options in R  (bkietz, Oct 1, 2020)
bf7d392  add a test for writing with a selection  (bkietz, Oct 1, 2020)
733af53  lint  (bkietz, Oct 1, 2020)
0567537  make doc  (bkietz, Oct 1, 2020)
f0da04a  document basename_template parameter  (bkietz, Oct 1, 2020)
18ea1c3  enable on-write filtering of written datasets  (bkietz, Oct 2, 2020)
bca8764  add LockFreeStack  (bkietz, Oct 2, 2020)
1c6db50  extract and unit test string interpolation  (bkietz, Oct 2, 2020)
6eb546e  refactor ::Write() to use explicit WriteQueues  (bkietz, Oct 2, 2020)
ed5ec52  cache queue mapping local to each thread  (bkietz, Oct 2, 2020)
1a541d6  lint, simplify WriteQueue storage, try workaround for atomic::atomic()  (bkietz, Oct 2, 2020)
95e548e  comparator must be const  (bkietz, Oct 3, 2020)
7fd7185  simplify thread local caching  (bkietz, Oct 4, 2020)
dde5eed  simplify: revert local queue lookup caching  (bkietz, Oct 5, 2020)
dfc2291  revert lock_free  (bkietz, Oct 5, 2020)
a3454d9  more exact typing in GetOrInsertGenerated  (bkietz, Oct 5, 2020)
448e04e  move lazy initialization locking into Flush()  (bkietz, Oct 5, 2020)
87d863a  fix comment  (bkietz, Oct 5, 2020)
7db8bf3  address review comments  (bkietz, Oct 6, 2020)
33257a6  add default basename_template for python  (bkietz, Oct 6, 2020)
d46c1af  R code/doc polishing  (nealrichardson, Oct 5, 2020)
7f1255e  Update vignette now that you can filter when writing  (nealrichardson, Oct 5, 2020)
16d9d53  lint fix  (bkietz, Oct 6, 2020)
086f59d  writing without partitioning will create a single file  (bkietz, Oct 6, 2020)
998d760  address review comments  (bkietz, Oct 7, 2020)
5602aa8  correct R doc after dat_{i} -> part-{i}  (bkietz, Oct 7, 2020)
cpp/src/arrow/dataset/file_base.cc: 267 changes (198 additions, 69 deletions)
```diff
@@ -18,6 +18,9 @@
 #include "arrow/dataset/file_base.h"
 
 #include <algorithm>
+#include <deque>
+#include <unordered_map>
+#include <unordered_set>
 #include <vector>
 
 #include "arrow/dataset/dataset_internal.h"
@@ -31,6 +34,10 @@
 #include "arrow/io/memory.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/map.h"
+#include "arrow/util/mutex.h"
+#include "arrow/util/string.h"
 #include "arrow/util/task_group.h"
 
 namespace arrow {
```
```diff
@@ -143,91 +150,213 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl(
   return MakeVectorIterator(std::move(fragments));
 }
 
-struct WriteTask {
-  Status Execute();
+Status FileWriter::Write(RecordBatchReader* batches) {
+  while (true) {
+    ARROW_ASSIGN_OR_RAISE(auto batch, batches->Next());
+    if (batch == nullptr) break;
+    RETURN_NOT_OK(Write(batch));
+  }
+  return Status::OK();
+}
 
-  /// The basename of files written by this WriteTask. Extensions
-  /// are derived from format
-  std::string basename;
+constexpr util::string_view kIntegerToken = "{i}";
 
-  /// The partitioning with which paths will be generated
-  std::shared_ptr<Partitioning> partitioning;
+Status ValidateBasenameTemplate(util::string_view basename_template) {
+  if (basename_template.find(fs::internal::kSep) != util::string_view::npos) {
+    return Status::Invalid("basename_template contained '/'");
+  }
+  size_t token_start = basename_template.find(kIntegerToken);
+  if (token_start == util::string_view::npos) {
+    return Status::Invalid("basename_template did not contain '", kIntegerToken, "'");
+  }
+  return Status::OK();
+}
```
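The `{i}` token is the only substitution the basename template supports: it is replaced with a per-queue index when a writer is opened (see `OpenWriter` below), so every partition directory gets distinctly numbered files. A standalone sketch of the rules `ValidateBasenameTemplate` enforces, using `std::string` in place of `util::string_view` (the helper name `BasenameTemplateOk` is illustrative, not from the PR):

```cpp
#include <iostream>
#include <string>

// Sketch of the basename_template rules: the template must contain "{i}"
// and must not contain a path separator.
bool BasenameTemplateOk(const std::string& tmpl) {
  return tmpl.find('/') == std::string::npos &&
         tmpl.find("{i}") != std::string::npos;
}

int main() {
  std::cout << BasenameTemplateOk("part-{i}.parquet") << "\n";  // 1: valid
  std::cout << BasenameTemplateOk("part.parquet") << "\n";      // 0: missing "{i}"
  std::cout << BasenameTemplateOk("sub/part-{i}") << "\n";      // 0: contains '/'
}
```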

```diff
-  /// The format in which fragments will be written
-  std::shared_ptr<FileFormat> format;
+/// WriteQueue allows batches to be pushed from multiple threads while another thread
+/// flushes some to disk.
+class WriteQueue {
+ public:
+  WriteQueue(std::string partition_expression, size_t index,
+             std::shared_ptr<Schema> schema)
+      : partition_expression_(std::move(partition_expression)),
+        index_(index),
+        schema_(std::move(schema)) {}
+
+  // Push a batch into the writer's queue of pending writes.
+  void Push(std::shared_ptr<RecordBatch> batch) {
+    auto push_lock = push_mutex_.Lock();
+    pending_.push_back(std::move(batch));
+  }
 
-  /// The FileSystem and base directory into which fragments will be written
-  std::shared_ptr<fs::FileSystem> filesystem;
-  std::string base_dir;
+  // Flush all pending batches, or return immediately if another thread is already
+  // flushing this queue.
+  Status Flush(const FileSystemDatasetWriteOptions& write_options) {
+    if (auto writer_lock = writer_mutex_.TryLock()) {
+      if (writer_ == nullptr) {
+        // FileWriters are opened lazily to avoid blocking access to a scan-wide
+        // queue set
+        RETURN_NOT_OK(OpenWriter(write_options));
+      }
+
+      while (true) {
+        std::shared_ptr<RecordBatch> batch;
+        {
+          auto push_lock = push_mutex_.Lock();
+          if (pending_.empty()) {
+            // Ensure the writer_lock is released before the push_lock. Otherwise
+            // another thread might successfully Push() a batch but then fail to
+            // Flush() it since the writer_lock is still held, leaving an unflushed
+            // batch in pending_.
+            writer_lock.Unlock();
+            break;
+          }
+          batch = std::move(pending_.front());
+          pending_.pop_front();
+        }
+        RETURN_NOT_OK(writer_->Write(batch));
+      }
+    }
+    return Status::OK();
+  }
 
-  /// Batches to be written
-  std::shared_ptr<RecordBatchReader> batches;
+  const std::shared_ptr<FileWriter>& writer() const { return writer_; }
 
-  /// An Expression already satisfied by every batch to be written
-  std::shared_ptr<Expression> partition_expression;
-};
+ private:
+  Status OpenWriter(const FileSystemDatasetWriteOptions& write_options) {
+    auto dir = fs::internal::EnsureTrailingSlash(write_options.base_dir) +
+               partition_expression_;
 
-Status WriteTask::Execute() {
-  std::unordered_map<std::string, RecordBatchVector> path_to_batches;
-
-  // TODO(bkietz) these calls to Partition() should be scattered across a TaskGroup
-  for (auto maybe_batch : IteratorFromReader(batches)) {
-    ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
-    ARROW_ASSIGN_OR_RAISE(auto partitioned_batches, partitioning->Partition(batch));
-    for (auto&& partitioned_batch : partitioned_batches) {
-      AndExpression expr(std::move(partitioned_batch.partition_expression),
-                         partition_expression);
-      ARROW_ASSIGN_OR_RAISE(std::string path, partitioning->Format(expr));
-      path = fs::internal::EnsureLeadingSlash(path);
-      path_to_batches[path].push_back(std::move(partitioned_batch.batch));
-    }
-  }
+    auto basename = internal::Replace(write_options.basename_template, kIntegerToken,
+                                      std::to_string(index_));
+    if (!basename) {
+      return Status::Invalid("string interpolation of basename template failed");
+    }
 
-  for (auto&& path_batches : path_to_batches) {
-    auto dir = base_dir + path_batches.first;
-    RETURN_NOT_OK(filesystem->CreateDir(dir, /*recursive=*/true));
+    auto path = fs::internal::ConcatAbstractPath(dir, *basename);
 
-    auto path = fs::internal::ConcatAbstractPath(dir, basename);
-    ARROW_ASSIGN_OR_RAISE(auto destination, filesystem->OpenOutputStream(path));
+    RETURN_NOT_OK(write_options.filesystem->CreateDir(dir));
+    ARROW_ASSIGN_OR_RAISE(auto destination,
+                          write_options.filesystem->OpenOutputStream(path));
 
-    DCHECK(!path_batches.second.empty());
-    ARROW_ASSIGN_OR_RAISE(auto reader,
-                          RecordBatchReader::Make(std::move(path_batches.second)));
-    RETURN_NOT_OK(format->WriteFragment(reader.get(), destination.get()));
-  }
-
-  return Status::OK();
-}
+    ARROW_ASSIGN_OR_RAISE(
+        writer_, write_options.format()->MakeWriter(std::move(destination), schema_,
+                                                    write_options.file_write_options));
+    return Status::OK();
+  }
 
+  util::Mutex writer_mutex_;
+  std::shared_ptr<FileWriter> writer_;
+
+  util::Mutex push_mutex_;
+  std::deque<std::shared_ptr<RecordBatch>> pending_;
 
-Status FileSystemDataset::Write(std::shared_ptr<Schema> schema,
-                                std::shared_ptr<FileFormat> format,
-                                std::shared_ptr<fs::FileSystem> filesystem,
-                                std::string base_dir,
-                                std::shared_ptr<Partitioning> partitioning,
-                                std::shared_ptr<ScanContext> scan_context,
-                                FragmentIterator fragment_it) {
-  auto task_group = scan_context->TaskGroup();
-
-  base_dir = std::string(fs::internal::RemoveTrailingSlash(base_dir));
-
-  int i = 0;
-  for (auto maybe_fragment : fragment_it) {
-    ARROW_ASSIGN_OR_RAISE(auto fragment, maybe_fragment);
-    auto task = std::make_shared<WriteTask>();
-
-    task->basename = "dat_" + std::to_string(i++) + "." + format->type_name();
-    task->partition_expression = fragment->partition_expression();
-    task->format = format;
-    task->filesystem = filesystem;
-    task->base_dir = base_dir;
-    task->partitioning = partitioning;
-
-    // make a record batch reader which yields from a fragment
-    ARROW_ASSIGN_OR_RAISE(task->batches, FragmentRecordBatchReader::Make(
-                                             std::move(fragment), schema, scan_context));
-    task_group->Append([task] { return task->Execute(); });
+  // The (formatted) partition expression to which this queue corresponds
+  std::string partition_expression_;
+
+  size_t index_;
+
+  std::shared_ptr<Schema> schema_;
+};
```
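The `Push()`/`Flush()` pair above is the concurrency core of the new `Write()`: any thread may push, and whichever thread wins the `TryLock` drains the queue, so a failed `TryLock` simply means another thread has already taken responsibility for the pending batches. A minimal self-contained sketch of this try-lock flush pattern, using `std::mutex` and strings in place of `arrow::util::Mutex` and record batches (all names here are illustrative, not from the PR):

```cpp
#include <deque>
#include <iostream>
#include <mutex>
#include <string>

// Sketch of the try-lock flush pattern: any thread may Push(); the thread
// that wins try_lock drains the queue, so no dedicated flusher is needed.
class SketchQueue {
 public:
  void Push(std::string item) {
    std::lock_guard<std::mutex> push_lock(push_mutex_);
    pending_.push_back(std::move(item));
  }

  void Flush() {
    std::unique_lock<std::mutex> writer_lock(writer_mutex_, std::try_to_lock);
    if (!writer_lock.owns_lock()) return;  // another thread is flushing

    while (true) {
      std::string item;
      {
        std::lock_guard<std::mutex> push_lock(push_mutex_);
        if (pending_.empty()) {
          // Release writer_lock before push_lock, mirroring the comment in
          // WriteQueue::Flush: otherwise a concurrent Push() could succeed
          // while its Flush() fails, stranding an unflushed item.
          writer_lock.unlock();
          return;
        }
        item = std::move(pending_.front());
        pending_.pop_front();
      }
      std::cout << "wrote " << item << "\n";  // stand-in for FileWriter::Write
    }
  }

 private:
  std::mutex writer_mutex_, push_mutex_;
  std::deque<std::string> pending_;
};

int main() {
  SketchQueue q;
  q.Push("batch-0");
  q.Push("batch-1");
  q.Flush();
}
```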

```diff
+Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options,
+                                std::shared_ptr<Scanner> scanner) {
+  RETURN_NOT_OK(ValidateBasenameTemplate(write_options.basename_template));
+
+  auto task_group = scanner->context()->TaskGroup();
+
+  // Things we'll un-lazy for the sake of simplicity, with the tradeoff they represent:
+  //
+  // - Fragment iteration. Keeping this lazy would allow us to start
+  //   partitioning/writing any fragments we have before waiting for discovery to
+  //   complete. This isn't currently implemented for FileSystemDataset anyway:
+  //   ARROW-8613
+  //
+  // - ScanTask iteration. Keeping this lazy would save some unnecessary blocking when
+  //   writing Fragments which produce scan tasks slowly. No Fragments do this.
+  //
+  // NB: neither of these will have any impact whatsoever on the common case of writing
+  // an in-memory table to disk.
+  ARROW_ASSIGN_OR_RAISE(FragmentVector fragments, scanner->GetFragments().ToVector());
+  ScanTaskVector scan_tasks;
+  std::vector<const Fragment*> fragment_for_task;
+
+  // Avoid contention with multithreaded readers
+  auto context = std::make_shared<ScanContext>(*scanner->context());
+  context->use_threads = false;
+
+  for (const auto& fragment : fragments) {
```
Review discussion on this loop:

Contributor: Since we already know all fragments (and their expressions), can we avoid all the locking and multi-threading in WriterSet (IIRC, you only need them to create each writer once)? That would heavily simplify all of this.

bkietz (author): In this context fragments are the object of writing rather than the target (so, for example, one might represent an in-memory table which is being copied to disk). Writers are not known ahead of time since they depend on the partitioning, which depends on the set of unique values in a given column, and we discover those only after running GroupBy on an input batch.

bkietz (author): We could do two scans of the input data:

  1. Assemble a list of all unique values in the partition columns of the data, from which we can determine the precise set of writers to open.
  2. Apply groupings to batches, passing the results to pre-opened writers.

This doesn't seem worthwhile to me; scanning the input is potentially expensive, so we should avoid doing it twice. Furthermore, we would still need to coordinate between threads since two input batches might contain rows bound for a single output writer.

```diff
+    auto options = std::make_shared<ScanOptions>(*scanner->options());
+    ARROW_ASSIGN_OR_RAISE(auto scan_task_it,
+                          Scanner(fragment, std::move(options), context).Scan());
+    for (auto maybe_scan_task : scan_task_it) {
+      ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task);
+      scan_tasks.push_back(std::move(scan_task));
+      fragment_for_task.push_back(fragment.get());
+    }
+  }
+
+  // Store a mapping from partitions (represented by their formatted partition
+  // expressions) to a WriteQueue which flushes batches into that partition's output
+  // file. In principle any thread could produce a batch for any partition, so each
+  // task alternates between pushing batches and flushing them to disk.
+  util::Mutex queues_mutex;
+  std::unordered_map<std::string, std::unique_ptr<WriteQueue>> queues;
+
+  auto fragment_for_task_it = fragment_for_task.begin();
+  for (const auto& scan_task : scan_tasks) {
+    const Fragment* fragment = *fragment_for_task_it++;
+
+    task_group->Append([&, scan_task, fragment] {
+      ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute());
+
+      for (auto maybe_batch : batches) {
+        ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
+        ARROW_ASSIGN_OR_RAISE(auto groups,
+                              write_options.partitioning->Partition(batch));
+        batch.reset();  // drop to hopefully conserve memory
+
+        std::unordered_set<WriteQueue*> need_flushed;
+        for (size_t i = 0; i < groups.batches.size(); ++i) {
+          AndExpression partition_expression(std::move(groups.expressions[i]),
+                                             fragment->partition_expression());
+          auto batch = std::move(groups.batches[i]);
+
+          ARROW_ASSIGN_OR_RAISE(
+              auto part, write_options.partitioning->Format(partition_expression));
+
+          WriteQueue* queue;
+          {
+            // look up the queue to which batch should be appended
+            auto queues_lock = queues_mutex.Lock();
+
+            queue = internal::GetOrInsertGenerated(
+                        &queues, std::move(part),
+                        [&](const std::string& emplaced_part) {
+                          // lookup in `queues` failed;
+                          // generate a new WriteQueue
+                          size_t queue_index = queues.size() - 1;
+
+                          return internal::make_unique<WriteQueue>(
+                              emplaced_part, queue_index, batch->schema());
+                        })
+                        ->second.get();
+          }
+
+          queue->Push(std::move(batch));
+          need_flushed.insert(queue);
+        }
+
+        // flush all touched WriteQueues
+        for (auto queue : need_flushed) {
+          RETURN_NOT_OK(queue->Flush(write_options));
+        }
+      }
+
+      return Status::OK();
+    });
+  }
+  RETURN_NOT_OK(task_group->Finish());
+
+  task_group = scanner->context()->TaskGroup();
+  for (const auto& part_queue : queues) {
+    task_group->Append([&] { return part_queue.second->writer()->Finish(); });
+  }
+  return task_group->Finish();
+}
```
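`Write()` resolves each formatted partition expression to its `WriteQueue` through `internal::GetOrInsertGenerated` (extracted in commit fa97c52), so the map is probed only once per batch group. The real helper lives in arrow/util/map.h; the following is only a plausible sketch of its contract, under the assumption that the generator runs only on first insertion and after the entry has been emplaced, which is why the lambda above can compute the new queue's index as `queues.size() - 1`:

```cpp
#include <string>
#include <unordered_map>
#include <utility>

// Sketch of a single-lookup get-or-insert helper in the spirit of
// internal::GetOrInsertGenerated. A default value is emplaced first; only if
// the key was newly inserted is `gen` called to produce the real value. Note
// the new entry already counts toward map->size() when `gen` runs.
template <typename Map, typename Gen>
typename Map::iterator GetOrInsertGenerated(Map* map, typename Map::key_type key,
                                            Gen&& gen) {
  auto emplaced = map->emplace(std::move(key), typename Map::mapped_type{});
  if (emplaced.second) {
    emplaced.first->second = gen(emplaced.first->first);
  }
  return emplaced.first;
}

int main() {
  std::unordered_map<std::string, int> indices;
  auto it = GetOrInsertGenerated(&indices, std::string("year=2020"),
                                 [&](const std::string&) {
                                   return static_cast<int>(indices.size() - 1);
                                 });
  return it->second;  // 0: "year=2020" was the first entry
}
```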

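Taken together, the new entry point replaces the old seven-argument `Write()` with an options struct. A hypothetical call site follows; the field names match this PR, but the snippet is a sketch rather than code from the Arrow test suite, and it assumes `FileFormat::DefaultWriteOptions()` is available for producing `file_write_options`:

```cpp
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/partition.h"
#include "arrow/dataset/scanner.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/status.h"

namespace ds = arrow::dataset;

// Hypothetical usage of the reworked FileSystemDataset::Write API.
arrow::Status WriteExample(std::shared_ptr<ds::Scanner> scanner,
                           std::shared_ptr<arrow::fs::FileSystem> filesystem,
                           std::shared_ptr<ds::Partitioning> partitioning,
                           std::shared_ptr<ds::FileFormat> format) {
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.filesystem = std::move(filesystem);
  write_options.base_dir = "/tmp/flights";
  write_options.basename_template = "part-{i}.parquet";  // "{i}" is required
  write_options.partitioning = std::move(partitioning);  // e.g. hive-style by "year"
  write_options.file_write_options = format->DefaultWriteOptions();

  // One WriteQueue, hence one file, per discovered partition expression, e.g.
  //   /tmp/flights/year=2019/part-0.parquet
  //   /tmp/flights/year=2020/part-1.parquet
  return ds::FileSystemDataset::Write(write_options, std::move(scanner));
}
```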