Spark : Derive Stats From Manifest on the Fly #11615
Conversation
Hi @huaxingao @karuppayya @aokolnychyi @RussellSpitzer, can you help review this PR?
      .tableProperty(TableProperties.DERIVE_STATS_FROM_MANIFEST_ENABLED)
      .defaultValue(TableProperties.DERIVE_STATS_FROM_MANIFEST_ENABLED_DEFAULT)
      .parse();
}
This table-level property takes precedence over the session configuration, so even when the session configuration is turned off, users can derive statistics for just a specific table.
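For context, a minimal sketch of how such a flag could be wired through SparkReadConf is shown below; the session conf key is an assumption for illustration, and the PR's actual precedence handling (table property overriding the session configuration) may differ from the parser's default resolution order.

// Hypothetical sketch only, not the PR's actual code: the session conf key is assumed.
public boolean deriveStatsFromManifestEnabled() {
  return confParser
      .booleanConf()
      .sessionConf("spark.sql.iceberg.derive-stats-from-manifest-enabled") // assumed key
      .tableProperty(TableProperties.DERIVE_STATS_FROM_MANIFEST_ENABLED)
      .defaultValue(TableProperties.DERIVE_STATS_FROM_MANIFEST_ENABLED_DEFAULT)
      .parse();
}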
}

// extract min/max values from the manifests
private Map<Integer, Object> calculateMinMax(
This may have errors if any delete files are present, or if there are any non-file-covering predicates in the query.
I think we may also have issues if column stats for a particular column are not present.
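To make the concern concrete, a minimal guard sketch (hypothetical names, not the PR's code) could bail out of stats derivation whenever deletes are attached to a task or a data file is missing column metrics:

// Hypothetical guard sketch: skip manifest-derived stats when they could be wrong.
private boolean canDeriveStatsFromManifests(List<FileScanTask> tasks) {
  for (FileScanTask task : tasks) {
    // position/equality deletes make manifest-level min/max and null counts unreliable
    if (!task.deletes().isEmpty()) {
      return false;
    }
    DataFile file = task.file();
    // if column metrics were not collected for this file, we cannot derive bounds from it
    if (file.lowerBounds() == null || file.upperBounds() == null || file.nullValueCounts() == null) {
      return false;
    }
  }
  return true;
}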
  return nullCount;
}

private Object toSparkType(Type type, Object value) {
I feel like we must have this in a helper function somewhere; I know we have to do similar tricks with UTF8.
@RussellSpitzer we saw a similar conversion in the BaseReader:
iceberg/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
Line 209 in f2b1b91
return Decimal.apply((BigDecimal) value);
However, it is not extracted into a helper function. And in this case we don't need the logic for strings/binary, since strings are not supported and binary doesn't support min/max.
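For reference, a minimal sketch of such a helper is shown below (hypothetical, not the PR's actual code, covering only the types for which manifest bounds are reported here; decimals are the main case that needs translation, as in BaseReader):

// Hypothetical helper sketch: convert an Iceberg bound value into the Spark-internal
// representation expected by column statistics. Strings/binary are intentionally omitted.
private static Object toSparkValue(org.apache.iceberg.types.Type type, Object value) {
  if (value == null) {
    return null;
  }
  if (type.typeId() == org.apache.iceberg.types.Type.TypeID.DECIMAL) {
    // same conversion BaseReader applies for decimals
    return org.apache.spark.sql.types.Decimal.apply((java.math.BigDecimal) value);
  }
  // ints, longs, floats, doubles, dates (int days) and timestamps (long micros)
  // already match Spark's internal representation
  return value;
}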
RussellSpitzer left a comment:
I have some overall worries about the inaccuracy of our stats reporting here. I know that based on truncation / collection settings we may not be providing accurate stats for all columns, and of course if delete vectors or equality deletes are present the stats will be incorrect.
@huaxingao do you have any thoughts on this? I know you have dealt with similar issues before with the aggregate pushdowns.
@RussellSpitzer, thanks for the review comments, I will address them soon. As per @huaxingao's implementation here, aggregate pushdown is skipped when row-level deletes are detected; I have applied a similar change here as well.
@RussellSpitzer Sorry, I just saw this. I disable aggregate pushdowns if row-level deletes are detected. @saitharun15 Is it possible to reuse some of the aggregate pushdown code to get min/max?
@huaxingao yes, it is possible to reuse the aggregate pushdown logic by using the AggregateEvaluator instead of the current code that aggregates from the manifests. Something along these lines:

List<Expression> expressions = table.schema().columns().stream()
    .map(field -> {
      String colName = field.name();
      // create min, max, and non-null count expressions for this column
      return List.of(
          Expressions.min(colName),
          Expressions.max(colName),
          Expressions.count(colName));
    })
    .flatMap(List::stream) // flatten the per-column lists into a single stream
    .collect(Collectors.toList());

AggregateEvaluator aggregateEvaluator = AggregateEvaluator.create(table.schema(), expressions);
for (FileScanTask task : fileScanTasks) {
  aggregateEvaluator.update(task.file());
}

// get the total row count to compute the number of null rows
long rowsCount = taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum();

// populate the maps with the results
StructLike res = aggregateEvaluator.result();
IntStream.range(0, table.schema().columns().size())
    .forEach(i -> {
      int fieldId = table.schema().columns().get(i).fieldId();
      minValues.put(fieldId, res.get(i * 3, Object.class));
      maxValues.put(fieldId, res.get(i * 3 + 1, Object.class));
      nullCounts.put(fieldId, rowsCount - res.get(i * 3 + 2, Long.class));
    });
Hi @RussellSpitzer @huaxingao, we’ve updated the implementation to use AggregateEvaluator from the aggregate pushdown code. In summary, we prepare a list of min, max, and count expressions for all columns, then update the evaluator with the set of data files; this returns the respective min, max, and count values. For nullCount, we subtract the count returned by the evaluator from the total number of records. Please review the changes. Thanks!
@RussellSpitzer @huaxingao Just a friendly reminder: can you review the changes when you have a chance? Thanks!
Hi @RussellSpitzer, @huaxingao, can you please review the PR once? Thanks!
@huaxingao @RussellSpitzer friendly reminder, can you please review this PR?
Hi @huaxingao, @RussellSpitzer, kindly have a look at the PR and suggest any updates if needed.
Sorry, I've been really busy recently. I'll try to take a look soon.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
Hi @RussellSpitzer, could you please check this PR once?
This PR derives min, max, and null-count statistics on the fly from manifest files and reports them back to Spark.
Currently only NDV is calculated and reported back to the Spark engine, which leads to inaccurate plans on the Spark side since min, max, and nullCount are returned as NULL.
As there is still an ongoing discussion on whether to store stats at the partition level or the table level, even if we calculate them either way there would be an issue, as per this comment in discussion #10791.
These changes enable on-the-fly collection of the stats using a table property or a session conf (disabled by default).
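As a usage sketch (the property and conf key strings below are assumptions for illustration, not the final names), enabling the feature for a single table or for a whole session could look like:

// Hypothetical usage sketch: exact key strings are assumptions.
// Enable stats derivation for one table via a table property:
spark.sql("ALTER TABLE db.tbl SET TBLPROPERTIES ('read.derive-stats-from-manifest.enabled'='true')");
// Or enable it for the whole Spark session via a SQL conf:
spark.conf().set("spark.sql.iceberg.derive-stats-from-manifest-enabled", "true");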
cc @guykhazma @jeesou