Skip to content

Conversation

@tustvold
Copy link
Contributor

Which issue does this PR close?

Closes #1939.

Rationale for this change

Currently SortExec performs the sort during the query plan

What changes are included in this PR?

This makes the SortExec lazy

Are there any user-facing changes?

SortExec no longer computes values during query planning

Ok(Box::pin(RecordBatchBoxStream::new(
self.schema(),
futures::stream::once(
do_sort(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A nicer fix would be to make ExternalSort stream, or less async, or some combination of the two, but in the short term this is a quick fix

@tustvold tustvold requested a review from yjshen April 21, 2022 18:17
Comment on lines 78 to 79
/// Combines a [`BoxStream`] with [`SchemaRef`] implementing
/// [`RecordBatchStream`] for the combination
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Combines a [`BoxStream`] with [`SchemaRef`] implementing
/// [`RecordBatchStream`] for the combination
/// Combines a [`BoxStream`] (created by calling `boxed` on a stream that produced `RecordBatch`es)
/// with a [`SchemaRef`] implementing [`RecordBatchStream`] for the combination

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the whole boxed stream concept somewhat confusing, so I am trying to leave hints around for my future self

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated it to no longer need a boxed stream, the trade-off is more pinning nonsense 😅 PTAL

@tustvold tustvold requested a review from alamb April 21, 2022 18:31
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks better now (though I will be honest I don't really understand how pin_project works and how the code generated by this will be different than a Boxd stream 🤷

pin_project! {
/// Combines a [`Stream`] with a [`SchemaRef`] implementing
/// [`RecordBatchStream`] for the combination
pub(crate) struct RecordBatchStreamAdapter<S> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@tustvold
Copy link
Contributor Author

It'll have one-less stack allocation per stream, and one less dyn-dispatch per poll. Whether that matters 🤷‍♂️

@alamb
Copy link
Contributor

alamb commented Apr 21, 2022

It'll have one-less stack allocation per stream, and one less dyn-dispatch per poll. Whether that matters 🤷‍♂️

What I was getting at was that I don't know what sort of magic occurs as part of pin_project there may be more allocations I am not aware of. I am not worried, just opining

@tustvold
Copy link
Contributor Author

tustvold commented Apr 21, 2022

It's really just a trick to ensure a given field is consistently either structurally or non-structurally pinned - https://doc.rust-lang.org/nightly/core/pin/index.html#projections-and-structural-pinning.

Unlike say async_trait, there isn't that much wizardry occuring under the hood, the generated code is actually relatively straightforward, well as straightforward as anything to do with pinning 😆

pub(crate) fn project<'__pin>(
    self: ::pin_project_lite::__private::Pin<&'__pin mut Self>,
) -> Projection<'__pin, S> {
    unsafe {
        let Self {
            schema, stream
        } = self.get_unchecked_mut();
        Projection {
            schema: schema,
            stream
            : ::pin_project_lite::__private::Pin::new_unchecked(stream
            ),
        }
    }
}

@alamb
Copy link
Contributor

alamb commented Apr 21, 2022

It's really just a trick to ensure a given field is consistently either s

TIL

@tustvold tustvold merged commit 54b15f6 into apache:master Apr 21, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

SortExec No Longer Streams Correctly

2 participants