ARROW-11802: [Rust][DataFusion] Remove use of crossbeam channels to avoid potential deadlocks #9605
}
Ok(())
});
tokio::task::yield_now().await;
This is the workaround added in #9580; it is now removed in favor of what we think is the fix for the root cause.
channels: Arc<Mutex<Vec<(Sender<MaybeBatch>, Receiver<MaybeBatch>)>>>,
/// Channels for sending batches from input partitions to output partitions.
/// Key is the partition number
channels: Arc<
Note @Dandandan asked about using a HashMap vs some other structure. It is a HashMap for reasons explained by @edrevo here: edrevo@97c256c#r47626396
I didn't follow the answer completely; why do we need to remove it?
I think UnboundedSender is cheap to clone and cheap to keep in memory for the duration of RepartitionStream?
But maybe I would need to play with the code more to see why it is needed per se.
So ok if it's needed, but I just wasn't sure 👍
Thanks @Dandandan
The use is here: https://github.com/apache/arrow/pull/9605/files#diff-b9b79e3b35bc8bfb43040ada3a4382bd0a0017ca1b1e8135be8fb310ff095674R229
Basically this code sets up all input and output channels for all of the partitions and then hands out one receiver at a time in some arbitrary order (depending on the partition argument).
UnboundedReceiver (https://docs.rs/tokio/1.2.0/tokio/sync/mpsc/struct.UnboundedReceiver.html) doesn't implement Clone (as it is multiple producer, single consumer), so each receiver has to be moved out of the structure when it is handed out.
I suspect with some more thought a different structure could be used, but I couldn't convince myself it was a valuable use of time.
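For readers following along, here is a minimal sketch of the hand-out pattern described above. It is not the code in this PR: it uses std::sync::mpsc as a dependency-free stand-in for tokio's unbounded channel (its Receiver is likewise not Clone), and the names Batch, setup_channels, take_receiver and handout_demo are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for the record batch type.
type Batch = Vec<i32>;

/// One (sender, receiver) pair per output partition, keyed by partition number.
type Channels = HashMap<usize, (Sender<Batch>, Receiver<Batch>)>;

/// Creates the channels for every output partition up front.
fn setup_channels(num_output_partitions: usize) -> Channels {
    (0..num_output_partitions)
        .map(|p| (p, channel::<Batch>()))
        .collect()
}

/// Hands out the receiver for `partition`. The entry is removed because the
/// receiver cannot be cloned; asking twice for the same partition gives None.
fn take_receiver(channels: &mut Channels, partition: usize) -> Option<Receiver<Batch>> {
    channels.remove(&partition).map(|(_tx, rx)| rx)
}

/// Sends one batch per partition, then collects them in an arbitrary order.
fn handout_demo() -> Vec<(usize, Batch)> {
    let mut channels = setup_channels(3);

    // Input side: senders are cheap to clone, so clone them before any
    // entry is removed from the map.
    let senders: Vec<(usize, Sender<Batch>)> = channels
        .iter()
        .map(|(p, (tx, _))| (*p, tx.clone()))
        .collect();
    for (p, tx) in &senders {
        tx.send(vec![*p as i32]).unwrap();
    }

    // Output side: partitions may be requested in any order.
    let mut out = Vec::new();
    for p in [2, 0, 1] {
        let rx = take_receiver(&mut channels, p).expect("receiver already taken");
        out.push((p, rx.recv().unwrap()));
    }
    out
}
```

A keyed map makes "take the receiver for partition N, in any order" a single remove call, which is the property the HashMap provides in this sketch.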
edrevo
left a comment
many thanks for following through with this
I checked this code out locally, merged from apache/master, and reran the tests. Everything still passes, so merging in.
I also tested partitioning parquet & reading parquet from datafusion - all worked OK 👍
Rationale
As spotted / articulated by @edrevo #9523 (comment), the intermixing of crossbeam channels (not designed for async and can block task threads) and async code such as DataFusion can lead to deadlock.
At least one of the crossbeam uses predates DataFusion being async (e.g. the one in the parquet reader). The use of crossbeam in the repartition operator in #8982 may have resulted from the re-use of the same pattern.
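To illustrate the failure mode, here is a simplified model rather than DataFusion code. It assumes a single worker that runs tasks one after another, uses std::sync::mpsc in place of crossbeam, and uses a timeout to stand in for what would otherwise be an indefinite hang. The names run_on_one_worker and blocked_consumer_demo are hypothetical.

```rust
use std::sync::mpsc;
use std::time::Duration;

/// Runs the tasks one after another on the current thread, the way a single
/// worker thread would if each task blocked instead of yielding.
fn run_on_one_worker(tasks: Vec<Box<dyn FnOnce() -> String>>) -> Vec<String> {
    tasks.into_iter().map(|task| task()).collect()
}

/// The consumer is scheduled first and blocks on the channel, so the
/// producer queued behind it on the same worker never gets a chance to send.
fn blocked_consumer_demo() -> Vec<String> {
    let (tx, rx) = mpsc::channel::<u32>();

    let consumer: Box<dyn FnOnce() -> String> = Box::new(move || {
        // A plain blocking recv() would wait forever here; the timeout only
        // exists so that this sketch terminates.
        match rx.recv_timeout(Duration::from_millis(50)) {
            Ok(v) => format!("consumer got {}", v),
            Err(_) => "consumer stalled: producer never ran".to_string(),
        }
    });

    let producer: Box<dyn FnOnce() -> String> = Box::new(move || match tx.send(1) {
        Ok(()) => "producer sent".to_string(),
        Err(_) => "producer found receiver gone".to_string(),
    });

    run_on_one_worker(vec![consumer, producer])
}
```

With an async channel the consumer would yield back to the executor while waiting instead of occupying the thread, which lets the producer run.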
Changes
1. Remove crossbeam channels (in RepartitionExec and ParquetExec) and replace with tokio channels (which are designed for single threaded code).
2. Remove the crossbeam dependency entirely.
3. Remove use of the multi_threaded executor in tests (e.g. #[tokio::test(flavor = "multi_thread")]), which can mask hangs.
Kudos / Thanks
This PR incorporates the work of @seddonm1 from #9603 and @edrevo in https://github.com/edrevo/arrow/tree/remove-crossbeam (namely 97c256c4f76b8185311f36a7b27e317588904a3a). A big thanks to both of them for their help in this endeavor.