ARROW-9555: [Rust] [DataFusion] Added inner join #7830
Conversation
This reduces:
* the runtime complexity of this operation from O(N*(1 + M)) to O(N*M) (N = number of rows, M = number of aggregations),
* the memory footprint from O(N*M) accumulators to O(M) accumulators,
* the code complexity, via DRY.
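For context, the accumulator change above can be sketched as follows. This is an illustrative stand-alone example, not the actual DataFusion API: one accumulator per aggregate expression (O(M) memory) is updated in a single pass over the N input rows.

```rust
// One accumulator per aggregate expression, updated once per row.
trait Accumulator {
    fn update(&mut self, value: f64);
    fn result(&self) -> f64;
}

struct Sum(f64);
impl Accumulator for Sum {
    fn update(&mut self, value: f64) { self.0 += value; }
    fn result(&self) -> f64 { self.0 }
}

struct Max(f64);
impl Accumulator for Max {
    fn update(&mut self, value: f64) { if value > self.0 { self.0 = value; } }
    fn result(&self) -> f64 { self.0 }
}

// O(N*M) time, but only O(M) accumulators are ever alive.
fn aggregate(rows: &[f64], accs: &mut [Box<dyn Accumulator>]) -> Vec<f64> {
    for &row in rows {
        for acc in accs.iter_mut() {
            acc.update(row);
        }
    }
    accs.iter().map(|a| a.result()).collect()
}

fn main() {
    let rows = [1.0, 4.0, 2.0];
    let mut accs: Vec<Box<dyn Accumulator>> =
        vec![Box::new(Sum(0.0)), Box::new(Max(f64::MIN))];
    assert_eq!(aggregate(&rows, &mut accs), vec![7.0, 4.0]);
}
```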
The gist of the implementation for a given partition is:
```
for left_record in left_records:
    hash_left = build_hash_of_keys(left_record)
    for right_record in right_records:
        hash_right = build_hash_of_keys(right_record)
        indexes = inner_join(hash_left, hash_right)
        yield concat(left_record, right_record)[indexes]
```
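A minimal stand-alone sketch of the block nested-loop scheme above, with plain `Vec`s standing in for `RecordBatch`es (all names here are illustrative, not the actual DataFusion API):

```rust
use std::collections::HashMap;

// A "batch" is a vector of (key, value) rows; a stand-in for a RecordBatch.
type Batch = Vec<(i64, i64)>;

// Block nested-loop inner join: for each left batch, hash its keys once,
// then probe that hash with every row of every right batch.
fn inner_join(left_batches: &[Batch], right_batches: &[Batch]) -> Vec<(i64, i64, i64)> {
    let mut out = Vec::new();
    for left in left_batches {
        // Build a key -> row-indexes map for this left batch.
        let mut hash_left: HashMap<i64, Vec<usize>> = HashMap::new();
        for (i, (key, _)) in left.iter().enumerate() {
            hash_left.entry(*key).or_default().push(i);
        }
        for right in right_batches {
            for (key, rval) in right {
                if let Some(indexes) = hash_left.get(key) {
                    for &i in indexes {
                        // yield concat(left_row, right_row)
                        out.push((*key, left[i].1, *rval));
                    }
                }
            }
        }
    }
    out
}

fn main() {
    let left = vec![vec![(1, 10), (2, 20)]];
    let right = vec![vec![(2, 200), (3, 300)]];
    assert_eq!(inner_join(&left, &right), vec![(2, 20, 200)]);
}
```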
I.e., inefficient.
The implementation is currently sequential, even though it can be trivially distributed, as each RecordBatch is evaluated independently (we still lock the mutex on partition reading, as in other physical plans). Since we have not committed to a distributed computational model, IMO the sequential version is enough for now.
Hi @jorgecarleitao, is this a nested inner loop join? I think we should be implementing a hash join instead.
@andygrove, I am still learning these concepts in detail, so you will need to help me out here. :) AFAIK this is a block nested join, which reduces to the nested inner join if the RecordBatch size is one. I was unsure about the consequences of moving all of the left side to a single partition, and thus took a more parallel approach of not assuming that a single partition fits in memory. Does this make sense? For me, we can implement both; I was just trying to have a parallel version in place, and then allow optimizers to pick between them. I would also be fine with a hash join only.
I think we are going to have to address (re-)partitioning first before we can tackle joins. If the two tables are both partitioned on the join keys and have the same number of partitions, then the joins can happen in parallel across those partitions (at least, for inner joins this is true). However, if we want to implement a hash join without doing that, I would suggest that we load one side (the build side) into memory (single partition) and then stream the other side (the probe side), performing a lookup in the hash table for each row. The probe side can happen in parallel. This is described in more detail here:
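The build/probe scheme suggested above can be sketched as follows. This is a stand-alone illustration under simplifying assumptions (rows as `(key, value)` tuples, the build side fully materialized), not the actual DataFusion implementation:

```rust
use std::collections::HashMap;

// Hash join: load the whole build side into a hash table, then stream the
// probe side, looking each row's key up in the table.
fn hash_join(
    build: &[(i64, i64)],
    probe: impl Iterator<Item = (i64, i64)>,
) -> Vec<(i64, i64, i64)> {
    // Build phase: key -> build-side values (handles duplicate keys).
    let mut table: HashMap<i64, Vec<i64>> = HashMap::new();
    for (key, val) in build {
        table.entry(*key).or_default().push(*val);
    }
    // Probe phase: only reads the table, so it could run in parallel
    // across probe-side partitions.
    let mut out = Vec::new();
    for (key, pval) in probe {
        if let Some(bvals) = table.get(&key) {
            for &bval in bvals {
                out.push((key, bval, pval));
            }
        }
    }
    out
}

fn main() {
    let build = vec![(1, 10), (2, 20), (2, 21)];
    let probe = vec![(2, 200), (3, 300)].into_iter();
    assert_eq!(hash_join(&build, probe), vec![(2, 20, 200), (2, 21, 200)]);
}
```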
I agree with you, @andygrove, that we need to revisit the partitioning before tackling this. Closing.
This PR implements an inner join, e.g. `SELECT a FROM simple1 JOIN simple2 ON a`. I have not run any benchmarks; this is a pure implementation plus some tests. The gist of the implementation of the physical plan for a given partition is shown in the pseudocode above.
This PR is built on top of #7687 and #7796.