ARROW-3316: [R] Multi-threaded conversion from R data.frame to Arrow table / record batch #9615
Conversation
```cpp
bool CanExtendParallel(SEXP x, const std::shared_ptr<arrow::DataType>& type) {
  // TODO: identify when it's ok to do things in parallel
  return false;
}
```

Rather than just turning this to `true`, it would probably be better to move the "can this be done in parallel" check to a virtual method of the converter.
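For illustration, here is a hedged sketch of that idea; the class, method, and type checks below are hypothetical, not the PR's actual code:

```cpp
// Hypothetical sketch: parallel eligibility as a virtual method, so each
// converter decides for its own R type. Names are illustrative only.
#include <Rinternals.h>

class RConverter {
 public:
  virtual ~RConverter() = default;
  // Conservative default: convert on the main R thread.
  virtual bool CanExtendParallel(SEXP x) const { return false; }
};

class DoubleConverter : public RConverter {
 public:
  // A plain REALSXP vector with no class attribute can be read without
  // calling back into the R API, so it is safe off the main thread.
  bool CanExtendParallel(SEXP x) const override {
    return TYPEOF(x) == REALSXP && !OBJECT(x);
  }
};
```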
The issue with doing R things in parallel is that you can't, really. Maybe we can have an R-specific mutex:

```cpp
std::mutex& get_r_mutex() {
  static std::mutex m;
  return m;
}
```

that we can lock whenever we do need to call something in the R API, including making a cpp11 vector, hence this wrapper:

```cpp
template <class vector>
class synchronized {
 public:
  synchronized(SEXP x) {
    std::lock_guard<std::mutex> lock(get_r_mutex());
    data_ = new vector(x);
  }

  vector& data() { return *data_; }

  ~synchronized() {
    std::lock_guard<std::mutex> lock(get_r_mutex());
    delete data_;
  }

 private:
  vector* data_;
};
```

so that we can have something like this:

```cpp
// [[arrow::export]]
int parallel_test(int n) {
  auto tasks = arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool());
  SEXP x = PROTECT(Rf_allocVector(REALSXP, 100));
  std::atomic<int> count(0);

  for (int i = 0; i < n; i++) {
    tasks->Append([x, &count] {
      synchronized<cpp11::doubles> dx(x);
      int nx = dx.data().size();
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
      count += nx;
      return arrow::Status::OK();
    });
  }

  auto status = tasks->Finish();
  UNPROTECT(1);
  return count;
}
```

Of course this only makes sure that the construction and destruction of the cpp11 vector happen under the lock.
Force-pushed from 06a4d75 to 28baff4, then from 28baff4 to 6e41082.
Marking this as ready to review. I've changed the approach this week so that it does not need to resort to locking: a first scheduling pass appends the column conversions that can safely run off the main R thread to a threaded task group, and the remaining conversions are then handled serially.
@github-actions crossbow submit -g r
Revision: 299c34f94c61c7017f4a9e32437ddd0d9bbd50ee

Submitted crossbow builds: ursacomputing/crossbow @ actions-362
westonpace left a comment:
Neal asked me to take a look at some of the parallel stuff since I've been working on some parallel code in the C++ code base as well. I think this is a very clever approach. You basically take a quick scheduling pass through all of the data to spawn as much parallel work as you can, and then tackle the rest serially.

One thing I would watch out for with `DelayedExtend` is iterating through the data both before and after the delay, since you will most likely lose your CPU cache between the iterations and be forced to load the data out of RAM twice. I'm pretty sure you are not doing that here, so I don't think it is a problem. Future `DelayedExtend` implementations will need to keep an eye out, though.
I'm going to try and take a bit more of a look tomorrow but here are some initial comments.
r/src/r_to_arrow.cpp (outdated):

```cpp
}

// then wait for the parallel tasks to finish
status &= parallel_tasks->Finish();
```
It would be nice if there were a good way to trigger the `parallel_tasks` to fail early if `status` was not ok here (and we broke out of the serial loop above).
I'm not sure how to do this
Scanning through quickly I just thought "it would be nice" but now thinking about implementing it I think "but it sure would be tricky" 😆
I think you would need to use a stop token of some kind: either `arrow::StopToken` in arrow/util/cancel.h, or an atomic bool of some kind that is set by the serial tasks.

`TaskGroup::MakeThreaded` can take in a stop token as well. That might be easiest:

- Create a `StopSource`
- Get a `StopToken` from the source
- Pass the `StopToken` into `TaskGroup::MakeThreaded`
- After doing the serial work, if there is an error, call `RequestStop` on the source.
That would cancel any conversion tasks that are scheduled but not yet executing. Any tasks currently executing would simply have to run until completion. If you really wanted to make it responsive then you could pass the StopToken into your conversion functions and check it periodically to see if a stop was requested and, if so, bail early.
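For concreteness, a hedged sketch of those four steps, using the names from arrow/util/cancel.h and arrow/util/task_group.h (exact signatures may vary across Arrow versions):

```cpp
#include <arrow/status.h>
#include <arrow/util/cancel.h>
#include <arrow/util/task_group.h>
#include <arrow/util/thread_pool.h>

arrow::Status RunWithCancellation() {
  arrow::StopSource stop_source;                      // 1. create a StopSource
  arrow::StopToken stop_token = stop_source.token();  // 2. get its StopToken

  // 3. the threaded task group observes the token
  auto parallel_tasks = arrow::internal::TaskGroup::MakeThreaded(
      arrow::internal::GetCpuThreadPool(), stop_token);
  parallel_tasks->Append([] { return arrow::Status::OK(); });  // stand-in task

  arrow::Status status = arrow::Status::OK();  // stand-in for the serial work
  if (!status.ok()) {
    stop_source.RequestStop();  // 4. cancel tasks that have not started yet
  }
  return status & parallel_tasks->Finish();
}
```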
r/src/r_to_arrow.cpp (outdated):

```cpp
template <typename Iterator, typename AppendNull, typename AppendValue>
Status VisitVector(Iterator it, int64_t n, AppendNull&& append_null,
                   AppendValue&& append_value) {
  for (R_xlen_t i = 0; i < n; i++, ++it) {
```
Minor: You're pretty close to being able to use a range-based for loop here. I'm not sure how difficult it would be to create an end() pointer and an iterator equality function.
We might revisit when we tackle chunking; iterator equality and `end()` should be straightforward, as the iterator classes we use are just wrappers around either pointers or cpp11 iterators. The only thing is that when we chunk, we might not iterate from start to end.
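As a hedged illustration of the end()/equality idea (this adaptor is hypothetical, and as noted, chunking could use a sub-range rather than the full start-to-end span):

```cpp
#include <cstdint>

// Hypothetical range adaptor: an iterator plus a length, so VisitVector's
// manual loop could become a range-based for. A chunk is just a different
// first/n pair.
template <typename Iterator>
struct VectorRange {
  Iterator first;
  int64_t n;
  Iterator begin() const { return first; }
  Iterator end() const { return first + n; }  // needs operator+ and operator==
};

// usage sketch, e.g. over a raw pointer:
//   for (double value : VectorRange<const double*>{ptr, n}) { ... }
```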
Ok, finally got these benchmarks re-run and this report put together, with a TL;DR for both multi-core and single-core operation. Here's a zip of the report.
Ok, I've added in a run from the commit that this branch is based off of (16a0739), to be a closer comparison, and things are murkier.
I'm a little skeptical that these differences, with the exception of the big change on the data.frames of factor columns, are anything but noise. I don't think there have been any other changes to the data.frame-to-Arrow code between latest master and where this branch is based. For the sake of argument, let's assume that "a little better or a little worse" is really just no change. What surprises me more is that there seems to be only that one improvement: the fannie mae dataset has 31 columns, so with 8 cores, why is performance essentially the same as before / with 1 core?
Absolutely: aside from factors, all of these differences are compatible with being pure noise / no real change. If we don't see any speed-up with any types other than factors, I'm not totally surprised that the naturalistic datasets aren't seeing an improvement, since fannie + nyctaxi, when read in as data.frames, don't result in any factors, and the chi traffic dataset (which starts as a parquet) only has two columns which are factors.
Also, I should have been more careful with my words: that "worse" really should have been "not convincingly better".
Force-pushed from 299c34f to feaf577.
What is the schema of the Fannie Mae data set? Does it have some missing values? Maybe the code goes through this branch:

```cpp
if (arrow::r::can_reuse_memory(x, options.type)) {
  columns[j] = std::make_shared<arrow::ChunkedArray>(
      arrow::r::vec_to_arrow__reuse_memory(x));
}
```

which for now does not benefit from parallelization, and perhaps should, at least when there are some NAs to deal with:

```cpp
// this is only used on some special cases when the arrow Array can just use the memory of
// the R object, via an RBuffer, hence be zero copy
template <int RTYPE, typename RVector, typename Type>
std::shared_ptr<Array> MakeSimpleArray(SEXP x) {
  using value_type = typename arrow::TypeTraits<Type>::ArrayType::value_type;
  RVector vec(x);
  auto n = vec.size();
  auto p_vec_start = reinterpret_cast<const value_type*>(DATAPTR_RO(vec));
  auto p_vec_end = p_vec_start + n;
  std::vector<std::shared_ptr<Buffer>> buffers{nullptr,
                                               std::make_shared<RBuffer<RVector>>(vec)};

  int null_count = 0;
  auto first_na = std::find_if(p_vec_start, p_vec_end, is_NA<value_type>);
  if (first_na < p_vec_end) {
    auto null_bitmap =
        ValueOrStop(AllocateBuffer(BitUtil::BytesForBits(n), gc_memory_pool()));
    internal::FirstTimeBitmapWriter bitmap_writer(null_bitmap->mutable_data(), 0, n);

    // first loop to set all the bits before the first NA
    auto j = std::distance(p_vec_start, first_na);
    int i = 0;
    for (; i < j; i++, bitmap_writer.Next()) {
      bitmap_writer.Set();
    }

    auto p_vec = first_na;
    // then finish
    for (; i < n; i++, bitmap_writer.Next(), ++p_vec) {
      if (is_NA<value_type>(*p_vec)) {
        bitmap_writer.Clear();
        null_count++;
      } else {
        bitmap_writer.Set();
      }
    }
    bitmap_writer.Finish();
    buffers[0] = std::move(null_bitmap);
  }

  auto data = ArrayData::Make(std::make_shared<Type>(), LENGTH(x), std::move(buffers),
                              null_count, 0 /*offset*/);

  // return the right Array class
  return std::make_shared<typename TypeTraits<Type>::ArrayType>(data);
}
```

Looking at this in the next few days.
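To make that concrete: once DATAPTR_RO() has been taken on the main thread, the NA scan above is plain C++, which is why it could move onto worker threads. A minimal, hedged sketch of that idea, with std::isnan standing in for R's actual NA check and std::async standing in for Arrow's thread pool:

```cpp
#include <cmath>
#include <cstdint>
#include <future>

// Count missing values in a double buffer. R encodes NA_real_ as a specific
// NaN payload; std::isnan is a conservative stand-in for this sketch.
static int64_t CountNulls(const double* begin, const double* end) {
  int64_t n = 0;
  for (const double* p = begin; p != end; ++p) n += std::isnan(*p) ? 1 : 0;
  return n;
}

// Split the scan in two: one half on another thread, one half here.
int64_t ParallelNullCount(const double* data, int64_t n) {
  const double* mid = data + n / 2;
  auto left = std::async(std::launch::async, CountNulls, data, mid);
  int64_t right = CountNulls(mid, data + n);
  return left.get() + right;
}
```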
Oh, knowing about missing values is helpful; lemme dig more into that and see if I can replicate performance differences on those.

I also have been digging into differences across types. Factors seem to parallelize really well, so I tried to convert the chitraffic data frame, which is a mix of strings + numerics + 2 factor columns. When I do that (with 12 CPU cores available), the most I'm seeing the CPU get to is ~140%, and even that is only briefly; most of the time the process is at 100%.

I then created a silly version of this dataset where I converted each of the columns into a factor (totally naively, with as.factor()), and converting that takes about half the time, plus the CPU usage peaks at ~300%, though it drops down to 100% and then bumps back up a few times.
Thanks. The special case for zero copy is likely involved here; I'll take a look.
Here's another example of trying a data.frame of strings and not seeing parallelization, but converting those strings to factors and, boom, we get parallelization.
@jonkeane I believe the last commit will improve things. The zero-copy cases are now handled in parallel, as it appears these cases might actually represent some work when dealing with missing values.
Yes! I reran the benchmarks again, comparing the last commit here with the base commit. The naturalistic datasets aren't seeing much (if any) speed-up: they are all within the noise range for variability that we see here. I'm going to dig into those separately and see if I see any funny patterns there that might explain it.
I wonder if the logic for "parallelize what you can, then do the rest in serial" isn't working right. Maybe the natural datasets all have at least one column (string, most likely) that can't be parallel, and instead of parallelizing the integer/double/factor columns and then handling the strings, it just keeps them all serial. |
Yeah, I'm going to try testing that exactly and see if I can duplicate this behavior (probably tomorrow) |
Some drastic improvements for those cases. Looking into strings now, hoping to be able to leverage parallelism there too; it's currently not the case:

```cpp
void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override {
  auto task = [this, values, size]() { return this->Extend(values, size); };

  // TODO: refine this, e.g. extract setup from Extend()
  tasks.Append(false, std::move(task));
}
```
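A hedged sketch of that TODO; PrepareExtend, ExtendPrepared, and StringState are invented names, not this PR's API. The point is to do the R-API-touching setup on the main thread and hand only plain C++ state to the task group:

```cpp
// Hypothetical refinement of the string converter's DelayedExtend():
void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override {
  // main thread: e.g. materialize ALTREP and collect the CHARSXP pointers
  auto state = std::make_shared<StringState>(this->PrepareExtend(values, size));

  // worker thread: append into the builder without touching the R API
  tasks.Append(/*parallel=*/true,
               [this, state] { return this->ExtendPrepared(*state); });
}
```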
Force-pushed from 599efdc to af3d42b.
After all the conversion tasks are done, there is this loop:

```cpp
for (int j = 0; j < num_fields; j++) {
  auto& converter = converters[j];
  if (converter != nullptr) {
    auto maybe_array = converter->ToArray();
    StopIfNotOk(maybe_array.status());
    columns[j] = std::make_shared<arrow::ChunkedArray>(maybe_array.ValueUnsafe());
  }
}
```

I don't think there's any R involved there, so I suppose this could be done in parallel, with some care.
Done. This probably won't have much impact, because I guess by the time the converter gets there, the builder has already done all the work:

```cpp
virtual Result<std::shared_ptr<Array>> ToArray() { return builder_->Finish(); }
```
@westonpace can you have a look at the updated version?
@romainfrancois @westonpace @jonkeane Is this ready to merge? (The rtools35 error is spurious) |
westonpace left a comment:
Really sorry for the delay, totally my mistake (I saw the ping, made a mental note, and then let the mental note get pushed out of my brain).
What you have should work fine. I think you could simplify it but if you wanted to do that in a follow-up that should be fine.
```cpp
// run the delayed tasks now
for (auto& task : delayed_serial_tasks_) {
  status &= std::move(task)();
```
Rather than wrapping all of your tasks in `StoppingTask`, you really only need the `StopSource` as a way to send a signal to `parallel_tasks_`. Everywhere else you could handle stopping logic on your own. So I think you could change this loop to:
```cpp
for (auto& task : delayed_serial_tasks_) {
  status &= std::move(task)();
  if (!status.ok()) {
    stop_source_.RequestStop();
    break;
  }
}
```
...then you can get rid of `StoppingTask`. If an error happens in a parallel task, the `ThreadedTaskGroup` will already take care of stopping everything.
Thanks @westonpace, I'll merge now and make a followup (edit: I made ARROW-12939) |