Contributor

@jiangzhx jiangzhx commented Apr 11, 2023

Which issue does this PR close?

It is difficult to express the same logic with the DataFrame API as with SQL.
Therefore, I created this example in the hope that it helps others.

The reason for the difficulty is,

Maybe it's just my personal reason.

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

Contributor

@alamb alamb left a comment

Thank you @jiangzhx. I do not think you are the only one who finds building subqueries challenging; I think the focus so far has been on handling subqueries from SQL rather than from the DataFrame API.

I have some ideas for how to make this better. I'll see what I can do.

ctx.table("t1")
.await?
.filter(
Expr::ScalarSubquery(datafusion_expr::Subquery {

I agree this is 🤮 -- let me see if I can come up with some way to make this easier to construct

),
outer_ref_columns: vec![],
})
.gt(lit(ScalarValue::UInt8(Some(0)))),

Suggested change:
- .gt(lit(ScalarValue::UInt8(Some(0)))),
+ .gt(lit(0u8)),
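
The shorter form works because `lit` is generic over a literal-conversion trait, so a plain `u8` produces the same literal expression as the explicit `ScalarValue`. A minimal, self-contained sketch of that mechanism follows. The `ScalarValue`, `Expr`, `Literal`, and `lit` definitions here are simplified stand-ins written for illustration, not the DataFusion definitions, and they assume the real trait behaves along these lines.

```rust
// Stand-in types for illustration only; these are NOT the DataFusion
// definitions, just enough structure to show the conversion.
#[derive(Debug, Clone, PartialEq)]
pub enum ScalarValue {
    UInt8(Option<u8>),
    Int64(Option<i64>),
}

#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Literal(ScalarValue),
}

/// Anything that can be turned into a literal expression.
pub trait Literal {
    fn lit(&self) -> Expr;
}

impl Literal for ScalarValue {
    fn lit(&self) -> Expr {
        Expr::Literal(self.clone())
    }
}

impl Literal for u8 {
    fn lit(&self) -> Expr {
        // A native u8 maps to the matching ScalarValue variant.
        Expr::Literal(ScalarValue::UInt8(Some(*self)))
    }
}

impl Literal for i64 {
    fn lit(&self) -> Expr {
        Expr::Literal(ScalarValue::Int64(Some(*self)))
    }
}

/// Generic entry point: accepts native values and explicit ScalarValues.
pub fn lit<T: Literal>(value: T) -> Expr {
    value.lit()
}
```

With these stand-ins, `lit(0u8)` and `lit(ScalarValue::UInt8(Some(0)))` compare equal, which is why the suggestion does not change behavior.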

Contributor

@alamb alamb left a comment

Here is one way we could simplify the example: jiangzhx#179 (a PR into this branch)

I do think there

    ctx.table("t1")
        .await?
        .filter(
            scalar_subquery(Arc::new(
                ctx.table("t2")
                    .await?
                    .filter(col("t1.c1").eq(col("t2.c1")))?
                    .aggregate(vec![], vec![avg(col("t2.c2"))])?
                    .select(vec![avg(col("t2.c2"))])?
                    .into_unoptimized_plan(),
            ))
            .gt(lit(0u8)),

By implementing some traits, I think we could remove the `Arc` and the `into_unoptimized_plan` call:

    ctx.table("t1")
        .await?
        .filter(
            scalar_subquery(
                ctx.table("t2")
                    .await?
                    .filter(col("t1.c1").eq(col("t2.c1")))?
                    .aggregate(vec![], vec![avg(col("t2.c2"))])?
                    .select(vec![avg(col("t2.c2"))])?,
            )
            .gt(lit(0u8)),

with something like

pub fn scalar_subquery(subquery: impl IntoSubquery) -> Expr {
...
}

/// Something that can be converted into a plan suitable for a subquery
pub trait IntoSubquery {
  fn into_subquery(self) -> Arc<LogicalPlan>;
}

// and then implement IntoSubquery for `LogicalPlan`, `Arc<LogicalPlan>` and `DataFrame`

Though maybe that is getting too complicated 🤔
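
To make the proposal concrete, here is a minimal, self-contained sketch of how such a trait could be wired up. `LogicalPlan`, `DataFrame`, and `Expr` below are hypothetical placeholder types so the snippet compiles on its own. They are not the DataFusion types, and the real `Expr::ScalarSubquery` wraps a `Subquery` struct (with `outer_ref_columns`) rather than a bare plan.

```rust
use std::sync::Arc;

// Placeholder types (assumptions for illustration, not DataFusion's).
#[derive(Debug, Clone, PartialEq)]
pub struct LogicalPlan {
    pub description: String,
}

#[derive(Debug, Clone)]
pub struct DataFrame {
    plan: LogicalPlan,
}

impl DataFrame {
    pub fn new(plan: LogicalPlan) -> Self {
        Self { plan }
    }

    // Plays the role of `into_unoptimized_plan` in the snippets above.
    pub fn into_unoptimized_plan(self) -> LogicalPlan {
        self.plan
    }
}

#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    // Simplified: holds the plan directly.
    ScalarSubquery(Arc<LogicalPlan>),
}

/// Something that can be converted into a plan suitable for a subquery.
pub trait IntoSubquery {
    fn into_subquery(self) -> Arc<LogicalPlan>;
}

impl IntoSubquery for LogicalPlan {
    fn into_subquery(self) -> Arc<LogicalPlan> {
        Arc::new(self)
    }
}

impl IntoSubquery for Arc<LogicalPlan> {
    fn into_subquery(self) -> Arc<LogicalPlan> {
        self
    }
}

impl IntoSubquery for DataFrame {
    fn into_subquery(self) -> Arc<LogicalPlan> {
        Arc::new(self.into_unoptimized_plan())
    }
}

/// Accepts any of the three inputs, so callers need neither `Arc::new`
/// nor an explicit `into_unoptimized_plan` call.
pub fn scalar_subquery(subquery: impl IntoSubquery) -> Expr {
    Expr::ScalarSubquery(subquery.into_subquery())
}
```

The trade-off is one extra trait in the public API in exchange for shorter call sites; all three input kinds produce the same expression.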

Contributor

alamb commented Apr 12, 2023

I think we can refine this example further, but it is better than what we have at the moment.

Contributor

alamb commented Apr 12, 2023

Thanks again @jiangzhx

@alamb alamb merged commit 0e5f6df into apache:main Apr 12, 2023
korowa pushed a commit to korowa/arrow-datafusion that referenced this pull request Apr 13, 2023
* add an example of using DataFrame to create a subquery

* Simplify expression examples

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>