Description
Describe the enhancement requested
Both ConcurrentQueue and BackpressureConcurrentQueue are used only in AsofJoin and SortedMerge.
Consider ConcurrentQueue as process_ in AsofJoin:
arrow/cpp/src/arrow/acero/asof_join_node.cc
Lines 1104 to 1114 in 0fbf982
    void ProcessThread() {
      for (;;) {
        if (!process_.Pop()) {
          EndFromProcessThread();
          return;
        }
        if (!Process()) {
          return;
        }
      }
    }
and as process_queue in SortedMerge:
arrow/cpp/src/arrow/acero/sorted_merge_node.cc
Lines 586 to 597 in 0fbf982
    void EmitBatches() {
      while (true) {
        // Implementation note: If the queue is empty, we will block here
        if (process_queue.Pop() == kPoisonPill) {
          EndFromProcessThread();
        }
        // Either we're out of data or something went wrong
        if (!PollOnce()) {
          return;
        }
      }
    }
In both cases the code relies on Pop() blocking on an empty queue. This can be confirmed with the current tests by removing cond_.wait from:
arrow/cpp/src/arrow/acero/concurrent_queue_internal.h
Lines 36 to 40 in 0fbf982
    T Pop() {
      std::unique_lock<std::mutex> lock(mutex_);
      cond_.wait(lock, [&] { return !queue_.empty(); });
      return PopUnlocked();
    }
This is all fine as long as we assume Pop() is blocking. But the derived class BackpressureConcurrentQueue breaks that assumption with a non-blocking implementation of Pop(). Even though it is used correctly in both cases (i.e. the queue is tested for emptiness before popping), it makes the code messy.
I think the API should be made more explicit about blocking vs. non-blocking behaviour, so that it is less confusing for future use, and then be exported with ARROW_EXPORT.
Side note: the confusing API could be the reason for this:
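To illustrate what "more explicit" could look like, here is a minimal sketch of a queue whose method names state whether a call may block. ExplicitQueue, WaitAndPop and ConsumeOne are hypothetical names chosen for this example and are not part of Arrow; only the mutex/condition-variable pattern is taken from the Pop() snippet quoted above.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical class, not Arrow's ConcurrentQueue: each method name states
// whether the call may block.
template <typename T>
class ExplicitQueue {
 public:
  void Push(T item) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(std::move(item));
    }
    cond_.notify_one();
  }

  // Blocks until an item is available.
  T WaitAndPop() {
    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock, [&] { return !queue_.empty(); });
    return PopUnlocked();
  }

  // Never blocks; returns std::nullopt if the queue is empty.
  std::optional<T> TryPop() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) return std::nullopt;
    return PopUnlocked();
  }

  bool Empty() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.empty();
  }

 private:
  // Caller must hold mutex_ and have checked that the queue is non-empty.
  T PopUnlocked() {
    T item = std::move(queue_.front());
    queue_.pop();
    return item;
  }

  std::queue<T> queue_;
  mutable std::mutex mutex_;
  std::condition_variable cond_;
};

// Usage: the consumer blocks in WaitAndPop() until the producer pushes.
inline int ConsumeOne() {
  ExplicitQueue<int> q;
  std::thread producer([&] { q.Push(42); });
  int value = q.WaitAndPop();
  producer.join();
  return value;
}
```

With names like these, a derived backpressure queue would not need to redefine a blocking method as a non-blocking one; whether that split fits the existing call sites in AsofJoin and SortedMerge would still need to be checked.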
arrow/cpp/src/arrow/acero/asof_join_node.cc
Lines 641 to 646 in 0fbf982
    have_active_batch &= !queue_.TryPop();
    if (have_active_batch) {
      DCHECK_GT(queue_.Front()->num_rows(), 0);  // empty batches disallowed
      memo_.UpdateTime(GetTime(queue_.Front().get(), time_type_id_, time_col_index_,
                               0));  // time changed
    }
In this context queue_.TryPop() always succeeds, so have_active_batch is always false. That actually saves us from undefined behavior: otherwise queue_.Front() could be called on a queue that the pop has just left empty.
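The flag logic can be reproduced in isolation. The sketch below uses a hypothetical single-threaded TinyQueue and assumes TryPop() returns a std::optional that is truthy when an item was popped. FlagFromRemaining is only one possible reading of the intent, not a proposed fix.

```cpp
#include <cassert>
#include <optional>
#include <queue>

// Hypothetical stand-in for the queue in the snippet; assumes TryPop()
// returns std::optional<T>, which is truthy when an item was popped.
struct TinyQueue {
  std::queue<int> items;

  std::optional<int> TryPop() {
    if (items.empty()) return std::nullopt;
    int v = items.front();
    items.pop();
    return v;
  }

  bool Empty() const { return items.empty(); }
};

// Mirrors the quoted pattern: a successful pop clears the flag, so the
// branch that reads Front() is never taken.
inline bool FlagAfterPop(TinyQueue& q) {
  bool have_active_batch = true;
  have_active_batch &= !q.TryPop();
  return have_active_batch;
}

// One possible alternative (an assumption about the intent): pop, then
// derive the flag from whether anything remains in the queue.
inline bool FlagFromRemaining(TinyQueue& q) {
  q.TryPop();
  return !q.Empty();
}
```

In the first function the flag is false whenever the pop succeeds, regardless of how many items remain, which matches the observation above.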
Component(s)
C++