cpp/src/arrow/acero/asof_join_node.cc (6 changes: 3 additions & 3 deletions)
@@ -524,7 +524,7 @@ class KeyHasher {
   size_t index_;
   std::vector<col_index_t> indices_;
   std::vector<KeyColumnMetadata> metadata_;
-  const RecordBatch* batch_;
+  std::atomic<const RecordBatch*> batch_;
@icexelloss (Contributor) commented on Jul 7, 2023:
Is the KeyHasher class thread safe now? Since with this change there are two threads using this class, we should clearly document the thread-safety model of this class. (What happens if the hash is invalidated in the middle of processing?)

Reply from the PR author (Contributor):
> Is the KeyHasher class thread safe now?

I think so - see below for why.

> Since with this change there are two threads using this class, we should clearly document the thread-safety model of this class.

This doc describes the supported concurrency.

> What happens if the hash is invalidated in the middle of processing?

Here's what I think happens. The hashes are always computed for the record batch at the front of the queue. The critical (and probably rare) case is when the input-receiving thread gets a record batch while hashes are being computed, at which point it invalidates the key hasher and pushes the record batch to the queue. The important point is that this pushed record batch cannot be at the front of the queue, because the queue is not empty and the push goes to the back. The invalidation can therefore only lead to a recomputation of hashes for the same record batch at the front of the queue.
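To make the described concurrency concrete, here is a minimal, self-contained sketch of the invalidate-then-recompute pattern - not the actual Arrow implementation; `Batch`, `CachingHasher`, and `ComputeHashes` are hypothetical stand-ins for `RecordBatch`, `KeyHasher`, and the real per-column hashing:

```cpp
#include <atomic>
#include <cstdint>
#include <vector>

struct Batch {};  // hypothetical stand-in for arrow::RecordBatch

// Sketch of the two-thread caching discipline: the process thread asks for
// hashes of the batch at the queue front; the input-receiving thread may
// call Invalidate() at any time, just before pushing a new batch.
class CachingHasher {
 public:
  // Input thread: called before a new batch is pushed to the queue.
  void Invalidate() { batch_.store(nullptr, std::memory_order_release); }

  // Process thread: returns cached hashes, recomputing if the cache does
  // not (or no longer) correspond to `batch`.
  const std::vector<uint64_t>& HashesFor(const Batch* batch) {
    while (batch_.load(std::memory_order_acquire) != batch) {
      batch_.store(batch, std::memory_order_release);  // claim, then compute
      hashes_ = ComputeHashes(batch);
      // If Invalidate() raced with the computation, batch_ is nullptr again,
      // the loop condition fails, and we recompute for the same front batch:
      // wasted work in a rare case, but never a stale result.
    }
    return hashes_;
  }

 private:
  static std::vector<uint64_t> ComputeHashes(const Batch*) {
    return {};  // placeholder for the real per-column hashing
  }

  std::atomic<const Batch*> batch_{nullptr};
  std::vector<uint64_t> hashes_;  // written by the process thread only
};
```

Only the process thread writes `hashes_`; the sole shared state is the atomic batch pointer, so a lost race costs at most one redundant recomputation.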

   std::vector<HashType> hashes_;
   LightContext ctx_;
   std::vector<KeyColumnArray> column_arrays_;
@@ -819,7 +819,6 @@ class InputState {
     have_active_batch &= !queue_.TryPop();
     if (have_active_batch) {
       DCHECK_GT(queue_.UnsyncFront()->num_rows(), 0); // empty batches disallowed
-      key_hasher_->Invalidate(); // batch changed - invalidate key hasher's cache
       memo_.UpdateTime(GetTime(queue_.UnsyncFront().get(), 0)); // time changed
     }
   }
@@ -897,7 +896,8 @@ class InputState {

   Status Push(const std::shared_ptr<arrow::RecordBatch>& rb) {
     if (rb->num_rows() > 0) {
-      queue_.Push(rb); // only after above updates - push batch for processing
+      key_hasher_->Invalidate(); // batch changed - invalidate key hasher's cache
+      queue_.Push(rb); // only now push batch for processing
     } else {
       ++batches_processed_; // don't enqueue empty batches, just record as processed
     }
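The ordering in this hunk is the substance of the fix: invalidate first, then publish the batch. A minimal sketch of the producer side under the same hypothetical names (a mutex-guarded `std::queue` stands in for Arrow's internal concurrent queue):

```cpp
#include <atomic>
#include <memory>
#include <mutex>
#include <queue>

struct Batch {};  // hypothetical stand-in for arrow::RecordBatch

struct Hasher {  // cut-down version of the CachingHasher sketched above
  std::atomic<const Batch*> batch{nullptr};
  void Invalidate() { batch.store(nullptr, std::memory_order_release); }
};

// Invalidate BEFORE publishing the batch. If the order were reversed, the
// process thread could observe the new batch at the queue front while the
// hasher still holds a pointer-equal entry for a previously freed batch
// (pointer reuse), and serve stale hashes; invalidating first guarantees a
// recomputation instead.
void Push(Hasher& hasher, std::queue<std::shared_ptr<Batch>>& queue,
          std::mutex& queue_mutex, std::shared_ptr<Batch> rb) {
  hasher.Invalidate();  // batch changed - drop cached hashes first
  std::lock_guard<std::mutex> lock(queue_mutex);
  queue.push(std::move(rb));  // only now make the batch visible
}
```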