Conversation

@singhpk234

What changes were proposed in this pull request?

We should propagate the row count stats in SizeInBytesOnlyStatsPlanVisitor if available. Row counts are propagated from connectors to Spark in the case of v2 tables.

Why are the changes needed?

This can improve stats estimation for v2 tables, since row count is used in several places to estimate sizeInBytes.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Modified existing UTs to match the proposed behavior.

@github-actions github-actions bot added the SQL label Jul 5, 2022
@singhpk234
Author

cc @huaxingao @cloud-fan @wangyum

Member

@wangyum wangyum left a comment


Could you enable spark.sql.cbo.enabled to estimate row count?

def stats: Statistics = statsCache.getOrElse {
  if (conf.cboEnabled) {
    statsCache = Option(BasicStatsPlanVisitor.visit(self))
  } else {
    statsCache = Option(SizeInBytesOnlyStatsPlanVisitor.visit(self))
  }
  statsCache.get
}

@singhpk234
Author

Could you enable spark.sql.cbo.enabled to estimate row count?

Thanks @wangyum, I am aware of the alternate visitor we use with CBO.

I raised this PR considering:

  1. CBO is turned off by default.
  2. We already have rowCount propagated via leaf nodes (DSv2Relation), which is used for estimating output size in SizeInBytesOnlyStatsPlanVisitor:

    override def visitOffset(p: Offset): Statistics = {
      val offset = p.offsetExpr.eval().asInstanceOf[Int]
      val childStats = p.child.stats
      val rowCount: BigInt = childStats.rowCount.map(c => c - offset).map(_.max(0)).getOrElse(0)
      Statistics(
        sizeInBytes = EstimationUtils.getOutputSize(p.output, rowCount, childStats.attributeStats),
        rowCount = Some(rowCount))
    }

  3. ANALYZE is not supported for v2 tables, so apart from row count, IMHO we can't have NDV etc. I am referring to this JIRA: https://issues.apache.org/jira/browse/SPARK-39420
  4. As per my understanding, v1 tables can only pass in sizeInBytes unless they have some stats in the catalog, whereas v2 tables already give both from the relation itself; hence I thought row count is unaccounted for v2 tables:

    catalogTable
      .flatMap(_.stats.map(_.toPlanStats(output, conf.cboEnabled || conf.planStatsEnabled)))
      .getOrElse(Statistics(sizeInBytes = relation.sizeInBytes))
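To make point 4 concrete: a v2 connector can report both dimensions straight from the scan, with no ANALYZE involved. A minimal sketch against the DSv2 SupportsReportStatistics interface (the scan class and its fields are invented for illustration; the interface and its methods are the real API):

```scala
import java.util.OptionalLong

import org.apache.spark.sql.connector.read.{Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.types.StructType

// Hypothetical v2 scan: both sizeInBytes and numRows come from the
// connector itself. This is the rowCount that, per point 2 above,
// reaches the leaf node's stats.
class MyScan(numRecords: Long, avgRowSizeBytes: Long, schema: StructType)
  extends Scan with SupportsReportStatistics {

  override def readSchema(): StructType = schema

  override def estimateStatistics(): Statistics = new Statistics {
    override def sizeInBytes(): OptionalLong = OptionalLong.of(numRecords * avgRowSizeBytes)
    override def numRows(): OptionalLong = OptionalLong.of(numRecords)
  }
}
```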

Are you recommending that this is expected behavior / by design?

@singhpk234 singhpk234 force-pushed the fix/stats_estimation_for_v2_sources branch from 6f683ef to dcaebec Compare July 5, 2022 11:10
@singhpk234
Author

Rebased and regenerated the golden files via:

  • SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly PlanStabilitySuite"
  • SPARK_GENERATE_GOLDEN_FILES=1 SPARK_ANSI_SQL_MODE=true build/sbt "sql/testOnly PlanStabilitySuite"

@singhpk234 singhpk234 force-pushed the fix/stats_estimation_for_v2_sources branch from dcaebec to c5526c2 Compare July 5, 2022 13:53
@singhpk234 singhpk234 force-pushed the fix/stats_estimation_for_v2_sources branch from c5526c2 to 2175a1a Compare July 5, 2022 14:26
@singhpk234 singhpk234 force-pushed the fix/stats_estimation_for_v2_sources branch from 5e5e72c to 7d88612 Compare July 5, 2022 17:18
@wangyum
Member

wangyum commented Jul 6, 2022

I think it's by design. So enabling spark.sql.cbo.enabled is what you want?

@singhpk234
Author

singhpk234 commented Jul 6, 2022

Thanks @wangyum !

So enabling spark.sql.cbo.enabled is what you want?

I believe setting spark.sql.cbo.enabled to true by default could help (what I wanted was for the row count stat, bubbled up from the v2 connector, to be accounted for in default Spark behaviour), but I think it requires some additional effort, since our other defaults, such as auto-BHJ, need to be adjusted accordingly.
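For anyone trying this out, switching to CBO estimation is a matter of flipping existing configs (the threshold value shown is the stock default, quoted only for illustration):

```scala
// Use BasicStatsPlanVisitor, which propagates rowCount and column stats.
spark.conf.set("spark.sql.cbo.enabled", "true")

// Defaults tuned for size-only estimates, such as the auto-broadcast-join
// threshold (10 MB by default), may need revisiting once estimates change.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")
```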

I think it's by design

For my knowledge, can you please point me to some JIRAs? Happy to learn more.

Would love to know your thoughts on the same. Happy to close this as well if we consider this is not a problem at all.

@AmplabJenkins

Can one of the admins verify this patch?

@cloud-fan
Contributor

I'm a bit confused. After this PR, what's the difference between SizeInBytesOnlyStatsPlanVisitor and BasicStatsPlanVisitor?

@singhpk234
Author

After this PR, what's the difference between SizeInBytesOnlyStatsPlanVisitor and BasicStatsPlanVisitor

BasicStatsPlanVisitor additionally uses column stats (NDV / null count / min / max etc.) for estimation, which are generally not passed from the DSv1 / DSv2 relation itself.

As per my understanding, prior to this PR, SizeInBytesOnlyStatsPlanVisitor estimated stats from a single piece of information, sizeInBytes, while BasicStatsPlanVisitor used all three (sizeInBytes, rowCount, and column stats such as min / max / NDV). With this PR, SizeInBytesOnlyStatsPlanVisitor still estimates from a subset, but that subset is now sizeInBytes and rowCount; BasicStatsPlanVisitor continues to use all three.

@cloud-fan
Contributor

cloud-fan commented Jul 8, 2022

Maybe we should rename them to BasicStatsPlanVisitor and AdvancedStatsPlanVisitor. We also need to make sure the updated SizeInBytesOnlyStatsPlanVisitor can propagate row count properly in all cases.

BTW, with CBO off, where do we use row count?

@singhpk234
Author

singhpk234 commented Jul 8, 2022

BTW, with CBO off, where do we use row count?

We use it in places like:

override def visitOffset(p: Offset): Statistics = {
  val offset = p.offsetExpr.eval().asInstanceOf[Int]
  val childStats = p.child.stats
  val rowCount: BigInt = childStats.rowCount.map(c => c - offset).map(_.max(0)).getOrElse(0)
  Statistics(
    sizeInBytes = EstimationUtils.getOutputSize(p.output, rowCount, childStats.attributeStats),
    rowCount = Some(rowCount))
}

where we just multiply row count by row size. We also use it for the Bloom filter, to create bloomFilterAgg. In the v1 scenario, in the case of a logical relation, row count can seep in from catalog stats, but as you correctly pointed out, it has a chance of getting lost in places where we assume we only have sizeInBytes, for example here:

override def default(p: LogicalPlan): Statistics = p match {
  case p: LeafNode => p.computeStats()
  case _: LogicalPlan =>
    Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).filter(_ > 0L).product)
}
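The loss can be modeled in a few lines (self-contained toy, all names invented, not Spark code): an intermediate node that rebuilds Statistics with only sizeInBytes silently drops the rowCount its child carried.

```scala
// Toy model of rowCount loss in a size-only stats visitor.
object RowCountPropagation {
  case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)

  val leaf = Stats(sizeInBytes = 1000, rowCount = Some(100))

  // Pre-PR shape: only sizeInBytes survives the intermediate node.
  def sizeOnly(child: Stats): Stats = Stats(sizeInBytes = child.sizeInBytes)

  // Proposed shape: keep forwarding the child's rowCount.
  def withRowCount(child: Stats): Stats =
    Stats(sizeInBytes = child.sizeInBytes, rowCount = child.rowCount)

  def main(args: Array[String]): Unit = {
    assert(sizeOnly(leaf).rowCount.isEmpty)                   // rowCount lost
    assert(withRowCount(leaf).rowCount.contains(BigInt(100))) // rowCount preserved
  }
}
```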

@cloud-fan
Contributor

OK, I think the idea makes sense. With CBO off, the optimizer/planner only needs size in bytes, but row count is an important statistic for estimating size in bytes, and should be propagated in the stats plan visitor.
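To spell out that dependency: EstimationUtils.getOutputSize derives size in bytes roughly as rowCount times an estimated row width. A simplified, self-contained sketch of that shape (the real implementation derives per-attribute sizes from the schema and column stats; the numbers here are illustrative):

```scala
// Simplified shape of the size-from-row-count estimate: rowCount times
// an estimated row width (8 bytes of row overhead plus per-attribute
// sizes), floored at 1 byte.
object OutputSizeSketch {
  def getOutputSize(rowCount: BigInt, attrSizes: Seq[Int]): BigInt =
    (rowCount * (8 + attrSizes.sum)).max(1)

  def main(args: Array[String]): Unit = {
    // 1000 rows with a LongType (8 bytes) and an IntegerType (4 bytes) column.
    assert(getOutputSize(1000, Seq(8, 4)) == BigInt(20000))
  }
}
```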

@cloud-fan
Contributor

cc @wzhfy @c21 can you take a look first?

Contributor

@c21 c21 left a comment


Thanks @singhpk234 for the work! I have some comments/questions.


override def visitIntersect(p: Intersect): Statistics = fallback(p)

override def visitJoin(p: Join): Statistics = fallback(p)
Contributor


Why do we fall back here instead of using JoinEstimation.estimate?

Author


fallback here would end up calling BasicStatsPlanVisitor.visit(p), which will in turn call BasicStatsPlanVisitor#visitJoin, which is JoinEstimation(p).estimate.getOrElse(default(p)). Hence I added the same.


// v2 sources can bubble-up rowCount, so always propagate.
// Don't propagate attributeStats, since they are not estimated here.
Statistics(sizeInBytes = sizeInBytes, rowCount = p.child.stats.rowCount)
Contributor


I am confused here. The top-level comment says this visitor computes a single dimension for plan stats: size in bytes. But why do we populate rowCount as well here?

Author

@singhpk234 singhpk234 Jul 12, 2022


In this estimator, i.e. visitUnaryNode, we adjust the size by scaling it by (output row size / input row size), but since we don't have much info (in terms of min / max / NDV etc.) to estimate the row count, we just say the node outputs its child's row count, which is mostly true for operators like Project etc.

So we were just computing sizeInBytes and propagating rowCount as-is. Apologies, I forgot to update the comment as per the proposed behaviour.

Should I rephrase it to:

  • estimates size in bytes and row count for plan stats
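That scaling can be sketched as follows (self-contained toy; the real visitor computes the two row sizes from the plan's and child's output attributes):

```scala
// Toy sketch of the visitUnaryNode size adjustment: the child's size is
// scaled by (output row size / input row size), and the child's rowCount
// is propagated unchanged, since a unary node like Project does not
// change the number of rows.
object UnaryNodeStatsSketch {
  case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt])

  def visitUnaryNode(child: Stats, childRowSize: Int, outputRowSize: Int): Stats = {
    val sizeInBytes = (child.sizeInBytes * outputRowSize / childRowSize).max(1)
    Stats(sizeInBytes, child.rowCount) // propagate rowCount as-is
  }

  def main(args: Array[String]): Unit = {
    // A projection keeping half the bytes per row: size halves, rowCount stays.
    val out = visitUnaryNode(Stats(2000, Some(100)), childRowSize = 20, outputRowSize = 10)
    assert(out.sizeInBytes == BigInt(1000))
    assert(out.rowCount.contains(BigInt(100)))
  }
}
```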

Author

@singhpk234 singhpk234 Jul 12, 2022


For DSv2 sources, rowCount can be passed from the relation itself without running ANALYZE; hence the default stats visitor will, post this change, take rowCount into consideration.

+- ReusedExchange (28)

TakeOrderedAndProject (37)
+- * Project (36)
   +- * SortMergeJoin Inner (35)

Is this expected? It looks like a plan regression to me.

@zinking

zinking commented Jul 25, 2022

BTW, with CBO off, where do we use row count?

We use it in places like:

override def visitOffset(p: Offset): Statistics = {
  val offset = p.offsetExpr.eval().asInstanceOf[Int]
  val childStats = p.child.stats
  val rowCount: BigInt = childStats.rowCount.map(c => c - offset).map(_.max(0)).getOrElse(0)
  Statistics(
    sizeInBytes = EstimationUtils.getOutputSize(p.output, rowCount, childStats.attributeStats),
    rowCount = Some(rowCount))
}

where we just multiply row count by row size. We also use it for the Bloom filter, to create bloomFilterAgg. In the v1 scenario, in the case of a logical relation, row count can seep in from catalog stats, but as you correctly pointed out, it has a chance of getting lost in places where we assume we only have sizeInBytes, for example here:

override def default(p: LogicalPlan): Statistics = p match {
  case p: LeafNode => p.computeStats()
  case _: LogicalPlan =>
    Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).filter(_ > 0L).product)
}

I thought these stats are available in AQE, and are more accurate there, though.

@github-actions

github-actions bot commented Nov 3, 2022

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!


6 participants