[HUDI-6967] Add clearJobStatus api in HoodieEngineContext #9899
String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();

List<Pair<String, HoodieBaseFile>> baseFilesForAllPartitions = HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable);
context.clearJobStatus();
This shouldn't be added. Key range loading has not finished here.
The code after this point does not create a new job, so it seems OK to clear the job status here. WDYT?
}

// Now delete partially written files
context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files: " + config.getTableName());
Why delete this?
This status is overwritten in HoodieTable#deleteInvalidFilesByPartitions, so I just deleted it.
// perform index loop up to get existing location of records
context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName());
taggedRecords = tag(dedupedRecords, context, table);
context.clearJobStatus();
If lazy execution happens afterwards, the job status may not be properly populated. Have you verified all places that this won't happen?
Let me check all the lazily executed paths. For this one, the "Tagging xxx" status will also be applied to deduplicateRecords, but clearing it here will not affect other jobs, so we retain this line.
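To make the lazy-execution concern concrete, here is a minimal Spark-free sketch in plain Java. FakeContext, runJob, tagLazily and tagEagerly are hypothetical names made up for illustration; they are not Hudi or Spark APIs. The sketch only assumes that a job picks up whatever status is set at the moment it actually runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Spark-free simulation; every name here is hypothetical, not Hudi or Spark API. */
final class LazyJobStatusSketch {

  /** Stand-in for an engine context that labels whichever job runs next. */
  static final class FakeContext {
    private String status; // null means "no description set"
    final List<String> jobLog = new ArrayList<>();

    void setJobStatus(String description) { status = description; }

    void clearJobStatus() { status = null; }

    /** Running a job records the description that is current at run time. */
    <T> T runJob(Supplier<T> work) {
      jobLog.add(String.valueOf(status));
      return work.get();
    }
  }

  /** Builds a deferred computation, then clears the status before anything has run. */
  static Supplier<Integer> tagLazily(FakeContext ctx) {
    ctx.setJobStatus("Tagging");
    try {
      return () -> ctx.runJob(() -> 42); // nothing executes yet
    } finally {
      ctx.clearJobStatus(); // runs before the deferred job does
    }
  }

  /** Runs the job inside the try block, so the status is still set while it executes. */
  static Integer tagEagerly(FakeContext ctx) {
    ctx.setJobStatus("Tagging");
    try {
      return ctx.runJob(() -> 42);
    } finally {
      ctx.clearJobStatus();
    }
  }

  public static void main(String[] args) {
    FakeContext ctx = new FakeContext();
    tagLazily(ctx).get(); // the job runs only now, after the finally block
    tagEagerly(ctx);
    System.out.println(ctx.jobLog); // prints [null, Tagging]
  }
}
```

In the lazy variant the job is logged without a description because the finally block ran first, which is the situation the review comment asks to rule out.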
partitionFsPair.getRight().getLeft(), keyGenerator));
} finally {
  context.clearJobStatus();
This method composes a DAG and is triggered by lazy execution.
I will remove the clearJobStatus calls on lazily executed paths in CommitActionExecutor, because the job status will be cleared in the end anyway:
Lines 499 to 505 in 051eb0e
} finally {
  // close the write client in all cases
  val asyncCompactionEnabled = isAsyncCompactionEnabled(writeClient, tableConfig, parameters, jsc.hadoopConfiguration())
  val asyncClusteringEnabled = isAsyncClusteringEnabled(writeClient, parameters)
  if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
    log.info("Closing write client")
    writeClient.close()
}
return recordsAndPendingClusteringFileGroups.getLeft();
} finally {
  context.clearJobStatus();
Could you check here for lazy execution too?
ditto
};
});
} finally {
  engineContext.clearJobStatus();
This may be subject to lazy execution.
Remove this clearJobStatus.
});
});
} finally {
  engineContext.clearJobStatus();
Check this one too for lazy execution.
ditto
new HoodieJsonPayload(genericRecord.toString()));
});
} finally {
  context.clearJobStatus();
This one too.
ditto
new HoodieJsonPayload(genericRecord.toString()));
});
} finally {
  context.clearJobStatus();
This one too.
ditto
executorOutputFs.getConf());
}, parallelism);
} finally {
  context.clearJobStatus();
This should be at the end of the method, correct? context.foreach also triggers Spark stages.
Addressed.
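As a rough illustration of the placement discussed here, the following plain-Java sketch wraps every job-triggering step of a method in a single try/finally, so the status is cleared only once, at the end. FakeContext, runJob, copyThenFinalize and the "Copying files" text are hypothetical stand-ins, not Hudi or Spark APIs, and both steps are assumed to run eagerly.

```java
import java.util.ArrayList;
import java.util.List;

/** Spark-free simulation; every name here is hypothetical, not Hudi or Spark API. */
final class ClearAtEndSketch {

  /** Stand-in for an engine context whose jobs pick up the current status. */
  static final class FakeContext {
    private String status; // null means "no description set"
    final List<String> jobLog = new ArrayList<>();

    void setJobStatus(String description) { status = description; }

    void clearJobStatus() { status = null; }

    String currentStatus() { return status; }

    /** Records the job name together with the status seen while it runs. */
    void runJob(String name) { jobLog.add(name + " -> " + status); }
  }

  /** Both eager steps sit inside one try block; the clear happens once, afterwards. */
  static void copyThenFinalize(FakeContext ctx) {
    ctx.setJobStatus("Copying files");
    try {
      ctx.runJob("map");     // first step that triggers stages
      ctx.runJob("foreach"); // second step, which also triggers stages
    } finally {
      ctx.clearJobStatus();  // cleared even if one of the steps throws
    }
  }

  public static void main(String[] args) {
    FakeContext ctx = new FakeContext();
    copyThenFinalize(ctx);
    System.out.println(ctx.jobLog);          // prints [map -> Copying files, foreach -> Copying files]
    System.out.println(ctx.currentStatus()); // prints null
  }
}
```

Had the finally block closed right after the first step, the second job would have been logged with no status.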
return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
} finally {
  hoodieSparkContext.clearJobStatus();
This one also seems to be lazily executed.
Change Logs
Impact
Fix the incorrect job group and descriptions.
Risk level (write none, low, medium or high below)
None.