
Conversation

@jorgecarleitao
Member

This PR is based on top of #8630 and contains a physical node to perform an inner join in DataFusion.

This is still a draft, but IMO the core design is in place and the two tests already pass.

This is co-authored with @andygrove, who contributed to the design of how to perform this operation in the context of DataFusion (see ARROW-9555 for details).

The API used for the computation of the join at the arrow level is briefly discussed in this document.

There is still a lot to work on, but I thought this would be a good time to have a first round of discussions, and also to gauge timing with respect to the 3.0 release.

There are two main issues being addressed in this PR:

  • How do we perform the join at the partition level: this PR collects all batches from the left, then issues one stream per partition on the right. Each batch on that stream is joined against all the batches from the left (N) via a hash. This allows us to compute the hash of each row only once (first for all of the left, then one by one on the right).

  • How do we build an array from N (left) arrays and a set of indices (matching the hash from the right): this is done using the MutableArrayData being worked on in ARROW-10540 ([Rust] Improve filtering, #8630), which incrementally memcopies slots from each of the N arrays based on the index; see the sketch below. This implementation is useful because it works for all array types and does not require casting anything to Rust native types (i.e. it operates on ArrayData, not on specific implementations).
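
Below is a minimal sketch of this index-based copy, assuming the MutableArrayData API roughly as it later stabilized in the arrow crate (new/extend/freeze); the function and variable names are illustrative, not this PR's actual code:

    use arrow::array::{ArrayData, MutableArrayData};

    /// Gather rows from the same column of N buffered left batches into a
    /// single output array. `indices` are (batch_index, row_index) pairs
    /// produced by the hash lookups on the right side.
    fn gather(left_columns: &[ArrayData], indices: &[(usize, usize)]) -> ArrayData {
        let arrays: Vec<&ArrayData> = left_columns.iter().collect();
        let mut mutable = MutableArrayData::new(arrays, false, indices.len());
        for &(batch, row) in indices {
            // memcopy the single slot [row, row + 1) from batch `batch`
            mutable.extend(batch, row, row + 1);
        }
        mutable.freeze()
    }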

There are still several steps left to have a join in SQL, most notably the whole logical planning, the output_partition logic, the bindings to the SQL and DataFrame APIs, updating the optimizers to handle nodes with two children, and a whole battery of tests.

There is also a natural path to the other joins, as it will be a matter of incorporating the work already in PR #8689, which introduces the option to extend the MutableArrayData with nulls, the operation required for left and right joins.

Member

This is a great start but will limit us to cases where the column names are the same between the two tables. As a follow-on we can expand this to support different names, e.g. customer.id = orders.customer_id.

Member

After thinking about this some more, I think it might be better to address this in the current PR. I would suggest either:

left_keys: Vec<String>,
right_keys: Vec<String>,

or the more general case:

left_keys: Vec<Arc<dyn PhysicalExpr>>,
right_keys: Vec<Arc<dyn PhysicalExpr>>,

It should be possible to perform an equi-join using any deterministic expression, for example:

SELECT a.*, b.* FROM a JOIN b on UPPER(a.name) = UPPER(CONCAT(b.first, ' ', b.last))

Member Author

Agreed. I will have this fixed: I do not see any technical challenge here (apart from the schema changes).

Member Author

@jorgecarleitao Nov 19, 2020

Another option here is to have a planning rule that, whenever a non-column expression (e.g. a.a + 1 = a.b + 2) is passed to the on clause, converts it to a projection on both sides. In terms of compute, AFAIK it is the same: we always need to evaluate the array before computing the hash of each of its rows, and evaluating it in the join or in a preceding projection seems equivalent.
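
A minimal, self-contained sketch of that rule, using toy string-based types (all names here are illustrative, not DataFusion's API): a plain column is joined on directly, while any other expression is appended to that side's projection under a generated name, so the join itself only ever sees columns:

    /// A join key is either a plain column or an arbitrary expression.
    enum Key {
        Column(String),
        Expr(String), // e.g. "a + 1"
    }

    /// Normalize one side's key: expressions are pushed into that side's
    /// projection under a generated name; the join then receives only columns.
    fn normalize_key(key: Key, projection: &mut Vec<String>, join_columns: &mut Vec<String>) {
        match key {
            Key::Column(name) => join_columns.push(name),
            Key::Expr(expr) => {
                let name = format!("__join_key_{}", projection.len());
                projection.push(format!("{} AS {}", expr, name));
                join_columns.push(name);
            }
        }
    }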

The rationale for using HashSet<String> was that it is much easier to represent (i.e. serialize and compare) as a partition key, which means that when we introduce hash joins on partitioned keys, it may be easier to tackle.

The other reason is that this has a much clearer separation of concerns between "evaluating an expression" => projection and "joining two plans" => join.

Member

Yes, this is an implicit projection that happens on the join expressions. This is usually not represented as a separate step in a query plan, in my experience at least, and is handled within the join operator itself. I agree that this would be equivalent in compute to having a separate projection in the plan.

I would be fine if we just want to support join on columns for this PR and look at supporting any expression as a separate issue. We can always work around the limitation by doing explicit projections before the join.

Contributor

For joins, the column = column case, the so-called equi-join, is so common and critical that I think it is important to make it super fast and thus special-cased. The more general expression case, while supported, is much less important than fast equi-joins, I think.

I have seen both approaches (the join handles generic expressions, or the planner arranges to have only equi-joins). For OUTER joins there is no choice but to do the evaluation in the join operator itself.

Member

When I tried to implement this, my approach was to store the materialized left-hand side in an Arc<Mutex<_>> and my plan was to use logic like this:

if partition == 0 {
    // materialize the left input and store it in the Arc<Mutex<_>>
} else {
    // wait for the left input to be materialized by partition 0
}

Member

I would be fine with us doing this as a follow-up and getting the expensive version of this functionally correct before we optimize too much.

Contributor

I agree with this suggestion.

Member Author

I will investigate this execute limitation. I have an idea about it: I think that we should change async execute to return a vector of streams (one stream per partition). I.e. instead of

let stream_0 = node.execute(0).await?;
let stream_1 = node.execute(1).await?;
...

we use

let streams = node.execute().await?;
let stream_0 = streams[0];
let stream_1 = streams[1];
...

by doing this, we can make node.execute()'s implementation compute the left side and share it across all streams (HashJoinStream) without mutating the node (and IMO it is good that the node stays immutable).

IMO we may not need an Arc<Mutex<_>>, as that data is immutable: an Arc should be enough to share it across the streams (but again, I need to check this).
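
A minimal sketch of that sharing, with a hypothetical HashJoinStream shape (the map type and field names are made up for illustration): because the build side is never mutated after construction, a plain Arc suffices, with no Mutex:

    use std::collections::HashMap;
    use std::sync::Arc;
    use arrow::record_batch::RecordBatch;

    /// hash of the key columns -> (left batch index, row index) candidates
    type JoinHashMap = HashMap<u64, Vec<(usize, usize)>>;

    struct HashJoinStream {
        /// The materialized build side, computed once and shared read-only.
        left: Arc<(Vec<RecordBatch>, JoinHashMap)>,
        // per-partition right-side stream state elided
    }

    /// Build the left side once, then hand an Arc clone to each partition's stream.
    fn make_streams(left: (Vec<RecordBatch>, JoinHashMap), partitions: usize) -> Vec<HashJoinStream> {
        let left = Arc::new(left);
        (0..partitions)
            .map(|_| HashJoinStream { left: Arc::clone(&left) })
            .collect()
    }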

Member

@andygrove Nov 19, 2020

It would be more standard to have all the left fields followed by all the right fields. Also, the new schema should have fully qualified names that include the table/alias prefix.

For example, if we join table 'a' with fields id and name to table 'b' that also has fields id and name, the output schema would be a.id, a.name, b.id, b.name.

It is quite possible that we are lacking support currently for being able to reference these qualified field names, so we will need to deal with that before we can fully integrate this new join operator into DataFusion.
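
A minimal sketch of that schema convention, assuming arrow's Schema/Field API (the function and alias parameters are illustrative): all left fields come first, then all right fields, each qualified with its table/alias prefix:

    use arrow::datatypes::{Field, Schema};

    /// Build the join output schema: left fields, then right fields, each
    /// renamed to `alias.field`.
    fn join_schema(left: &Schema, right: &Schema, left_alias: &str, right_alias: &str) -> Schema {
        let fields = left
            .fields()
            .iter()
            .map(|f| (left_alias, f))
            .chain(right.fields().iter().map(|f| (right_alias, f)))
            .map(|(alias, f)| {
                Field::new(&format!("{}.{}", alias, f.name()), f.data_type().clone(), f.is_nullable())
            })
            .collect();
        Schema::new(fields)
    }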

Member Author

@jorgecarleitao Nov 19, 2020

Well spotted.

Since expressions may now contain columns from multiple Schemas/RecordBatches, I think that we need to modify all signatures of PhysicalExpr to receive multiple Schemas and RecordBatches.

I agree that this is a blocker. I will work on that and leave this PR for now, as we can't progress any further here.

(This is a major change)

Member

I don't understand why we would want to do that. Each operator has a single output schema. The join operator creates a new schema derived from the input schemas.

Member

The output schema of the join would simply have fields named "a.id", "a.name", "b.id", "b.name" based on the previous example.

Member

@jorgecarleitao Just wanted to make sure you saw this ☝️

Member

I think you're right. This is simpler and forces the user to add aliases as required to avoid conflicts.

Member

I do think we need to support the case where the join key columns have different names in each table, though. Here is an example from one of the simplest TPC-H queries.

FROM
    orders,
    lineitem
WHERE
    o_orderkey = l_orderkey

Member Author

@jorgecarleitao Nov 19, 2020

Good point. I will prototype to double-check that it works well with the schema validation.

Just to check whether we agree on some cases:

# Case 1: raises an error (there would be two columns named `l_orderkey`)
df.columns = ['a', 'l_orderkey']
df1.columns = ['o_orderkey', 'l_orderkey']
df.join(df1, on=df['l_orderkey'] == df1['o_orderkey'])

# Case 2: Ok, columns = ['l_orderkey', 'b', 'o_orderkey', 'c']
# (order: left then right; different key names => both are returned)
df.columns = ['l_orderkey', 'b']
df1.columns = ['o_orderkey', 'c']
df.join(df1, on=df['l_orderkey'] == df1['o_orderkey'])

# Case 3: Ok, columns = ['a', 'o_orderkey', 'c']
# (order: left then right; same key name => only one is returned)
df.columns = ['a', 'o_orderkey']
df1.columns = ['o_orderkey', 'c']
df.join(df1, on=df['o_orderkey'] == df1['o_orderkey'])

Contributor

I don't think we should error (long term) if the two inputs have the same column names. It is fine for an initial implementation, but I think we should automatically create aliases to disambiguate such outputs long term.

Member Author

@alamb, it is difficult to argue against no arguments :)

My argument is based on the principle of least surprise: every time we disambiguate, we surprise the user by mutating the finely crafted plan that they created.

Note that this does not happen in SQL, as in SQL all names are disambiguated by the table qualifier (e.g. SELECT a.b, b.b ...)

Member

This is correct. The output partitioning is the same as the partitioning of the right-hand input.

Member Author

Thanks for confirming! I was not really sure about this :)

Member

It makes sense for us to add a HashPartitioning variant here, but this isn't required for implementing the hash join operator since we aren't changing partitioning.
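
For reference, a sketch of what that variant might look like on the Partitioning enum (the Hash variant and its shape are hypothetical here; the other variants mirror what DataFusion exposes):

    use std::sync::Arc;
    use datafusion::physical_plan::PhysicalExpr; // module path may vary by version

    /// Output partitioning of a physical operator (illustrative sketch).
    enum Partitioning {
        /// Batches distributed round-robin across partitions
        RoundRobinBatch(usize),
        /// Hypothetical variant: rows routed by the hash of these key expressions
        Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
        /// Partition count known, distribution unknown
        UnknownPartitioning(usize),
    }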

Member Author

You are right: I was working on the repartition and the join at the same time at some point, and this slipped through the cracks.

@andygrove
Member

I did a first pass review and this is looking really great @jorgecarleitao. I left comments for the things that I think need to be addressed in this PR. I will be able to help out with plumbing this through from SQL/DataFrame and query planner. This is going to allow us to support most of the TPC-H queries so I am really excited to see this get merged 🚀

@jorgecarleitao
Member Author

Thanks a lot, @andygrove. I will be focusing on the physical plans alone, so any help on the SQL/DataFrame and logical plans would be really appreciated.

Contributor

@alamb left a comment

TL;DR: Thank you very much @jorgecarleitao ❤️ ❤️ -- I haven't reviewed all of this code carefully, but given how large this PR is and how important this functionality is, I suggest we get this PR merged as quickly as possible and then iterate on it afterwards.

Joins in particular can get arbitrarily complicated I think, so just getting something in and iterating is key.

I suspect as we start plumbing joins through the planner we'll find things we want to change, but I don't think we are likely to get all that right if we try to guess it up front.

Some other thoughts:

  • Joins typically use the terms "inner" and "outer" for their two input relations. I would describe this operator as a classic "inner join" where the semantics of the two inputs are identical (though performance characteristics are not).

  • In a hash join such as this, the terms from the literature are the "build" input (the side that you build the hash table from) and the "probe" input (the side you test for membership in the hash table).

FWIW I am used to seeing the build side on the right, and the probe side on the left, but that is just convention.

"this pr collects all batches from the left, and then issues a stream per part on the right."

This is the standard hash join approach and I think it is a good initial implementation. There are more sophisticated strategies that re-partition both the left and right inputs so you can build multiple disjoint hash tables, but that doesn't always go faster and is significantly more complicated to implement.

"most notably the whole logical planning, the output_partition logic, the bindings to SQL and DataFrame API"

I can make time to help with this -- it is directly related to some of the longer-term things we want to do in IOx, and I have some experience in doing it.

Contributor

"left, build side -- built into a hash table"

Member Author

@alamb thanks a lot for the review. Really great knowledge and terminology about this; I changed the wording to match.

My largest concern (which I think you also had) was that this PR was changing the filter code. I removed all those changes to keep this PR focused on the join, and to avoid forcing a change to the filter just because we wanted this.

I agree with your comment on the other PR that we may want to keep the filter code as is. Making MutableArrayData accept multiple arrays makes the filter even slower, which IMO further strengthens that idea.

Contributor

"/// left (build) side which gets hashed"

Contributor

"how --> now"

Contributor

This might be easier code to write if the on expression were of the form:

equality_expressions: Vec<(Expr, Expr)>
other_expressions: Vec<Expr>

Where the planner identifies equality expressions and breaks them up -- again, I can help write this code.
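
A minimal sketch of that split, with Expr and Operator standing in for DataFusion's logical expression types (import paths and exact variant shapes vary by version, so treat this as illustrative): conjunctions are walked recursively, `left = right` pairs feed equality_expressions, and everything else becomes a residual filter:

    /// Recursively split an ON predicate into equi-join pairs and residual
    /// filter expressions.
    fn split_on(predicate: Expr, equality: &mut Vec<(Expr, Expr)>, other: &mut Vec<Expr>) {
        match predicate {
            // `a = b` contributes an equi-join pair
            Expr::BinaryExpr { left, op: Operator::Eq, right } => equality.push((*left, *right)),
            // `x AND y` splits into both sides
            Expr::BinaryExpr { left, op: Operator::And, right } => {
                split_on(*left, equality, other);
                split_on(*right, equality, other);
            }
            // anything else is evaluated as a join filter
            e => other.push(e),
        }
    }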

@jorgecarleitao
Member Author

Hi, thanks a lot for the feedback @andygrove and @alamb. I have changed this PR in the following ways:

  1. Removed all changes with respect to the filter and kept only the MutableArrayData construct for the join, to avoid mixing the changes.
  2. Made "on" be Vec<(String, String)>.

The tests are not passing, but I will work on them over the weekend.

Member

I'm confused here and I think this code might not be doing what you intended. left_missing contains all the left fields except for the fields referenced in the on clause, so this function will always fail unless both tables have only join keys and no other columns.

Member

Most of the tests pass with this code:

    // column names referenced on the left side of the `on` pairs
    let on_left = &on.iter().map(|on| on.0.to_string()).collect::<HashSet<_>>();
    // left-side columns that appear among the join keys
    let left_keys = left
        .iter()
        .filter(|f| on_left.contains(*f))
        .map(|s| s.clone())
        .collect::<HashSet<_>>();
    // join keys with no matching column on the left
    let left_missing = on_left.difference(&left_keys).collect::<HashSet<_>>();

    // the same check for the right side of the `on` pairs
    let on_right = &on.iter().map(|on| on.1.to_string()).collect::<HashSet<_>>();
    let right_keys = right
        .iter()
        .filter(|f| on_right.contains(*f))
        .map(|s| s.clone())
        .collect::<HashSet<_>>();
    let right_missing = on_right.difference(&right_keys).collect::<HashSet<_>>();

    if !left_missing.is_empty() || !right_missing.is_empty() {
        return Err(DataFusionError::Plan(format!(
                "The left or right side of the join does not have columns \"on\": \nMissing on the left: {:?}\nMissing on the right: {:?}",
                left_missing,
                right_missing,
            )));
    };

Member Author

I am sorry for the confusion: pushed without testing 🤦

I think that this is ready now for the second round. Main changes are described above.

@jorgecarleitao marked this pull request as ready for review November 20, 2020 20:32
@jorgecarleitao
Member Author

I've added a couple more tests verifying that the parts of the join are correct. I promoted this PR out of draft, as I am now confident that at least we would not be merging buggy code ^_^

@andygrove
Member

I will be reviewing this tomorrow (Saturday). Thanks for all the work on this, @jorgecarleitao.

Member

@andygrove left a comment

LGTM. I think we should merge this, then get the DataFrame.join PR reviewed and merged, and then we can implement some of the TPC-H queries that have joins so we can run some comparisons with other query engines to see if that finds any issues.

@andygrove closed this in 34ffc93 Nov 21, 2020
Contributor

@alamb left a comment

This is Epic 🥇 for @jorgecarleitao
