Skip to content

Conversation

@tustvold
Copy link
Contributor

Which issue does this PR close?

Closes #2306

Rationale for this change

See ticket

What changes are included in this PR?

This modifies CrossJoinExec so that it computes the left side data as part of stream execution, and not part of ExecutionPlan::execute

Are there any user-facing changes?

CrossJoinExec no longer evaluates during plan

&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<ArrowResult<RecordBatch>>> {
let left_result = match &self.left_result {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of arcane, hopefully the future changes to ExecutionPlan as part of #2199 will make writing these sorts of pipelines easier

Ok(left_data) => left_data,
Err(e) => {
return Poll::Ready(Some(Err(ArrowError::ExternalError(
e.to_string().into(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit of a hack as DatafusionError isn't clone-able


// merge all left parts into a single stream
let merge = CoalescePartitionsExec::new(self.left.clone());
let stream = merge.execute(0, context.clone()).await?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually a bug in the old implementation, if part of evaluation errored - it would try it again for the next partition

}

/// Asynchronously collect the result of the left child
async fn load(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the code that used to be evaluated in ExecutionPlan::execute

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe call it

Suggested change
async fn load(
async fn load_left_input(

to make it more explicit what is going on?

let left_result = match &self.left_result {
Some(data) => data,
None => {
let result = ready!(self.left_fut.poll_unpin(cx));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Futures-rs does have a MaybeDone construct, but this seemed simpler to understand

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ready! effectvely calls return Poll::Pending if the left_fut does the same, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM -- thank you @tustvold

I wish we had some way to test this, but I can't think of anything reasonable at this point

}

/// Asynchronously collect the result of the left child
async fn load(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe call it

Suggested change
async fn load(
async fn load_left_input(

to make it more explicit what is going on?

let left_result = match &self.left_result {
Some(data) => data,
None => {
let result = ready!(self.left_fut.poll_unpin(cx));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ready! effectvely calls return Poll::Pending if the left_fut does the same, right?

}
};

if left_data.num_rows() == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure this is correct in the context of outer joins -- even if the left data has no rows, the stream may still produce output...

However, I see the original code did the same, so 🤷

       if left_data.num_rows() == 0 {
            return Ok(Box::pin(MemoryStream::try_new(
                vec![],
                self.schema.clone(),
                None,
            )?));
        }

(it probably only matters for joins that don't have an equality predicate)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I just blindly reproduced the existing behaviour - I presumed this special case was there for a reason

@andygrove andygrove merged commit 8fcf53f into apache:master Apr 22, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

CrossJoin Evaluates In ExecutionPlan::execute

3 participants