From f4cebdfce11e3cecf213da41fc88f82e2565d0e1 Mon Sep 17 00:00:00 2001 From: puru Date: Thu, 2 Feb 2023 23:50:14 +0530 Subject: [PATCH 1/2] avoid creating new instants for rollbacks --- .../org/apache/hudi/client/BaseHoodieTableServiceClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index 390bc4b97148a..e4d99661017d8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -734,7 +734,7 @@ protected List getInstantsToRollback(HoodieTableMetaClient metaClient, H @Deprecated public boolean rollback(final String commitInstantTime, Option pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException { LOG.info("Begin rollback of instant " + commitInstantTime); - final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime()); + final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(commitInstantTime); final Timer.Context timerContext = this.metrics.getRollbackCtx(); try { HoodieTable table = createTable(config, hadoopConf); From 2ec476d8ac621dcb87b10cad701f6bb3febac8a4 Mon Sep 17 00:00:00 2001 From: puru Date: Sun, 5 Feb 2023 00:19:51 +0530 Subject: [PATCH 2/2] perform rollback before initiating commit instant --- .../apache/hudi/client/BaseHoodieTableServiceClient.java | 2 +- .../org/apache/hudi/client/BaseHoodieWriteClient.java | 9 +++++++++ .../apache/hudi/utilities/deltastreamer/DeltaSync.java | 5 +---- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index e4d99661017d8..390bc4b97148a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -734,7 +734,7 @@ protected List getInstantsToRollback(HoodieTableMetaClient metaClient, H @Deprecated public boolean rollback(final String commitInstantTime, Option pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException { LOG.info("Begin rollback of instant " + commitInstantTime); - final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(commitInstantTime); + final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime()); final Timer.Context timerContext = this.metrics.getRollbackCtx(); try { HoodieTable table = createTable(config, hadoopConf); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 17956479762e3..7f9f8504ea463 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -811,6 +811,15 @@ public String startCommit(String actionType, HoodieTableMetaClient metaClient) { return instantTime; } + + /** + * Provides a new commit time for a write operation (insert/update/delete/insert_overwrite/insert_overwrite_table) with specified action. + */ + public String startCommit(String actionType) { + HoodieTableMetaClient metaClient = createMetaClient(true); + return startCommit(actionType, metaClient); + } + /** * Provides a new commit time for a write operation (insert/update/delete/insert_overwrite/insert_overwrite_table) without specified action. * @param instantTime Instant time to be generated diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 8c010e9484dc6..2a8ea1a17b507 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -55,7 +55,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CommitUtils; @@ -759,10 +758,8 @@ private String startCommit() { RuntimeException lastException = null; while (retryNum <= maxRetries) { try { - String instantTime = HoodieActiveTimeline.createNewInstantTime(); String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType)); - writeClient.startCommitWithTime(instantTime, commitActionType); - return instantTime; + return writeClient.startCommit(commitActionType); } catch (IllegalArgumentException ie) { lastException = ie; LOG.error("Got error trying to start a new commit. Retrying after sleeping for a sec", ie);