diff --git a/build.gradle b/build.gradle
index 765a7fe8203e..52d25bc33b51 100644
--- a/build.gradle
+++ b/build.gradle
@@ -379,7 +379,6 @@ project(':iceberg-core') {
     implementation libs.jackson.databind
     implementation libs.caffeine
     implementation libs.roaringbitmap
-    implementation libs.failsafe
     compileOnly(libs.hadoop3.client) {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.slf4j', module: 'slf4j-log4j12'
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
index f8860c42d7e1..f533f2c87f22 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
@@ -21,13 +21,10 @@
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.RemovalListener;
-import dev.failsafe.Failsafe;
-import dev.failsafe.FailsafeException;
-import dev.failsafe.RetryPolicy;
-import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
@@ -50,6 +47,7 @@
 import org.apache.iceberg.rest.responses.FetchPlanningResultResponse;
 import org.apache.iceberg.rest.responses.PlanTableScanResponse;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -228,71 +226,49 @@ private FileIO scanFileIO(List storageCredentials) {
   }
 
   private CloseableIterable fetchPlanningResult() {
-    RetryPolicy retryPolicy =
-        RetryPolicy.builder()
-            .handleResultIf(response -> response.planStatus() == PlanStatus.SUBMITTED)
-            .withBackoff(
-                Duration.ofMillis(MIN_SLEEP_MS), Duration.ofMillis(MAX_SLEEP_MS), SCALE_FACTOR)
-            .withJitter(0.1) // Add jitter up to 10% of the calculated delay
-            .withMaxAttempts(MAX_ATTEMPTS)
-            .withMaxDuration(Duration.ofMillis(MAX_WAIT_TIME_MS))
-            .onFailedAttempt(
-                e -> {
-                  // Log when a retry occurs
-                  LOG.debug(
-                      "Plan {} still SUBMITTED (Attempt {}/{}). Previous attempt took {} ms.",
-                      planId,
-                      e.getAttemptCount(),
-                      MAX_ATTEMPTS,
-                      e.getElapsedAttemptTime().toMillis());
-                })
-            .onFailure(
-                e -> {
-                  LOG.warn(
-                      "Polling for plan {} failed due to: {}",
-                      planId,
-                      e.getException().getMessage());
-                  cleanupPlanResources();
-                })
-            .build();
+    // holder for the completed response, which is produced inside the polling lambda
+    AtomicReference<FetchPlanningResultResponse> result = new AtomicReference<>();
+    Tasks.foreach(planId)
+        .exponentialBackoff(MIN_SLEEP_MS, MAX_SLEEP_MS, MAX_WAIT_TIME_MS, SCALE_FACTOR)
+        // retry(n) allows n + 1 attempts in total; keep the total at MAX_ATTEMPTS
+        .retry(MAX_ATTEMPTS - 1)
+        .onlyRetryOn(NotCompleteException.class)
+        .onFailure(
+            (id, err) -> {
+              LOG.warn("Planning failed for plan ID: {}", id, err);
+              cleanupPlanResources();
+            })
+        .throwFailureWhenFinished()
+        .run(
+            id -> {
+              FetchPlanningResultResponse response =
+                  client.get(
+                      resourcePaths.plan(tableIdentifier, id),
+                      headers,
+                      FetchPlanningResultResponse.class,
+                      headers,
+                      ErrorHandlers.planErrorHandler(),
+                      parserContext);
 
-    try {
-      FetchPlanningResultResponse response =
-          Failsafe.with(retryPolicy)
-              .get(
-                  () ->
-                      client.get(
-                          resourcePaths.plan(tableIdentifier, planId),
-                          headers,
-                          FetchPlanningResultResponse.class,
-                          headers,
-                          ErrorHandlers.planErrorHandler(),
-                          parserContext));
-      Preconditions.checkState(
-          response.planStatus() == PlanStatus.COMPLETED,
-          "Plan finished with unexpected status %s for planId: %s",
-          response.planStatus(),
-          planId);
+              if (response.planStatus() == PlanStatus.SUBMITTED) {
+                // still planning: the retryable type makes Tasks back off and poll again
+                throw new NotCompleteException(
+                    String.format("Planning is not complete for planId: %s", id));
+              } else if (response.planStatus() != PlanStatus.COMPLETED) {
+                throw new IllegalStateException(
+                    String.format(
+                        "Invalid planStatus: %s for planId: %s", response.planStatus(), id));
+              }
 
-      this.scanFileIO =
-          !response.credentials().isEmpty() ? scanFileIO(response.credentials()) : table().io();
+              result.set(response);
+            });
 
-      return scanTasksIterable(response.planTasks(), response.fileScanTasks());
-    } catch (FailsafeException e) {
-      // FailsafeException is thrown when retries are exhausted (Max Attempts/Duration)
-      // Cleanup is handled by the .onFailure() hook, so we just wrap and rethrow.
-      throw new IllegalStateException(
-          String.format("Polling timed out or exceeded max attempts for planId: %s.", planId), e);
-    } catch (Exception e) {
-      // Catch any immediate non-retryable exceptions (e.g., I/O errors, auth errors)
-      try {
-        cleanupPlanResources();
-      } catch (Exception cancelException) {
-        // Ignore cancellation failures during exception handling
-        e.addSuppressed(cancelException);
-      }
-      throw e;
-    }
+    FetchPlanningResultResponse response = result.get();
+
+    this.scanFileIO =
+        !response.credentials().isEmpty() ? scanFileIO(response.credentials()) : table().io();
+
+    return scanTasksIterable(response.planTasks(), response.fileScanTasks());
   }
 
   private CloseableIterable scanTasksIterable(
@@ -342,4 +318,11 @@ public boolean cancelPlan() {
       return false;
     }
   }
+
+  /** Retryable signal that the plan is still SUBMITTED; thrown by the polling task. */
+  private static class NotCompleteException extends IllegalStateException {
+    NotCompleteException(String message) {
+      super(message);
+    }
+  }
 }