-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-52578][SQL] Add metrics for rows to track case and action in MergeRowsExec #51285
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
0ad95e8 to
d20374e
Compare
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM. This refactoring looks better than before because case class Keep with context is more generic. Thank you, @szehon-ho and @cloud-fan .
Merged to master for Apache Spark 4.1.0.
aokolnychyi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Late +1 with a few minor comments.
| override lazy val metrics: Map[String, SQLMetric] = Map( | ||
| "numTargetRowsCopied" -> SQLMetrics.createMetric(sparkContext, | ||
| "Number of target rows copied unmodified because they did not match any action.")) | ||
| "Number of target rows copied unmodified because they did not match any action"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: It seems like we call it "action" here and "clause" in all other metrics. It would be nice to align.
| } | ||
|
|
||
| // For group based merge, copy is inserted if row matches no other case | ||
| private def incrementCopyMetric(): Unit = longMetric("numTargetRowsCopied") += 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the cost of doing this per each row? I know we implement some tricks for regular writes to update metrics only once per 100 rows. Do we need to worry about it here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh good point. We should do the same here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For that one, I took a closer look. It look like it was from this discussion: #31451 (comment). In that case there was concern to get the metric from the DSV2 connector, to avoid calling currentMetricsValue so many times because the external implementation can be heavy.
In our case, getting the metric is in memory, so it should be quick.
On the sending end, it looks like SQLMetric is an accumulator and updating it just sets an in memory value as well.
So at first glance, I think adding complexity to update the metric per 100 rows may not be worth it. But I may be missing something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That seems to make sense to me. @cloud-fan, do you agree?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea I agree
|
|
||
| case split: SplitExec => | ||
| incrementUpdateMetric(sourcePresent) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: Shall we drop the empty line?
|
|
||
| case _: DiscardExec => | ||
| incrementDeleteMetric(sourcePresent) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: Shall we drop the empty line?
| case Delete => incrementDeleteMetric(sourcePresent) | ||
| case _ => throw new IllegalArgumentException( | ||
| s"Unexpected context for KeepExec: ${keep.context}") | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: Shall we drop the empty line?
|
thanks all for merge and reviews, @aokolnychyi @pan3793 thanks for additional comments, i will work on it in a follow up |
…rge metrics ### What changes were proposed in this pull request? Fix capitalization and extra new lines for new Merge metrics code ### Why are the changes needed? Very minor review comment follow up for #51285 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit test ### Was this patch authored or co-authored using generative AI tooling? No Closes #51486 from szehon-ho/merge_metrics_follow. Authored-by: Szehon Ho <szehon.apache@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

What changes were proposed in this pull request?
Add more metrics to MergeRowsExec: numTargetRowsInserted, numTargetRowsDeleted, numTargetRowsUpdated, numTargetRowsMatchedUpdated, numTargetRowsMatchedDeleted, numTargetRowsNotMatchedBySourceUpdated, numTargetRowsNotMatchedBySourceDeleted
Why are the changes needed?
Help debug-ability of MERGE INTO operation by tracking how many rows fall into each case + action
Does this PR introduce any user-facing change?
No
How was this patch tested?
Add more unit test to MergeIntoTablesSuiteBase
Was this patch authored or co-authored using generative AI tooling?
No