Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ project(':iceberg-core') {
implementation libs.jackson.databind
implementation libs.caffeine
implementation libs.roaringbitmap
implementation libs.failsafe
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀

compileOnly(libs.hadoop3.client) {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
Expand Down
106 changes: 40 additions & 66 deletions core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import dev.failsafe.RetryPolicy;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
Expand All @@ -50,6 +47,7 @@
import org.apache.iceberg.rest.responses.FetchPlanningResultResponse;
import org.apache.iceberg.rest.responses.PlanTableScanResponse;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -228,71 +226,45 @@ private FileIO scanFileIO(List<Credential> storageCredentials) {
}

private CloseableIterable<FileScanTask> fetchPlanningResult() {
RetryPolicy<FetchPlanningResultResponse> retryPolicy =
RetryPolicy.<FetchPlanningResultResponse>builder()
.handleResultIf(response -> response.planStatus() == PlanStatus.SUBMITTED)
.withBackoff(
Duration.ofMillis(MIN_SLEEP_MS), Duration.ofMillis(MAX_SLEEP_MS), SCALE_FACTOR)
.withJitter(0.1) // Add jitter up to 10% of the calculated delay
.withMaxAttempts(MAX_ATTEMPTS)
.withMaxDuration(Duration.ofMillis(MAX_WAIT_TIME_MS))
.onFailedAttempt(
e -> {
// Log when a retry occurs
LOG.debug(
"Plan {} still SUBMITTED (Attempt {}/{}). Previous attempt took {} ms.",
planId,
e.getAttemptCount(),
MAX_ATTEMPTS,
e.getElapsedAttemptTime().toMillis());
})
.onFailure(
e -> {
LOG.warn(
"Polling for plan {} failed due to: {}",
planId,
e.getException().getMessage());
cleanupPlanResources();
})
.build();
AtomicReference<FetchPlanningResultResponse> result = new AtomicReference<>();
Tasks.foreach(planId)
.exponentialBackoff(MIN_SLEEP_MS, MAX_SLEEP_MS, MAX_WAIT_TIME_MS, SCALE_FACTOR)
.retry(MAX_ATTEMPTS)
Comment thread
singhpk234 marked this conversation as resolved.
.onlyRetryOn(NotCompleteException.class)
.onFailure(
(id, err) -> {
LOG.warn("Planning failed for plan ID: {}", id, err);
cleanupPlanResources();
})
.throwFailureWhenFinished()
.run(
id -> {
FetchPlanningResultResponse response =
client.get(
resourcePaths.plan(tableIdentifier, id),
headers,
FetchPlanningResultResponse.class,
headers,
ErrorHandlers.planErrorHandler(),
parserContext);

try {
FetchPlanningResultResponse response =
Failsafe.with(retryPolicy)
.get(
() ->
client.get(
resourcePaths.plan(tableIdentifier, planId),
headers,
FetchPlanningResultResponse.class,
headers,
ErrorHandlers.planErrorHandler(),
parserContext));
Preconditions.checkState(
response.planStatus() == PlanStatus.COMPLETED,
"Plan finished with unexpected status %s for planId: %s",
response.planStatus(),
planId);
if (response.planStatus() == PlanStatus.SUBMITTED) {
throw new NotCompleteException();
} else if (response.planStatus() != PlanStatus.COMPLETED) {
throw new IllegalStateException(
String.format(
"Invalid planStatus: %s for planId: %s", response.planStatus(), id));
}

this.scanFileIO =
!response.credentials().isEmpty() ? scanFileIO(response.credentials()) : table().io();
result.set(response);
});

return scanTasksIterable(response.planTasks(), response.fileScanTasks());
} catch (FailsafeException e) {
// FailsafeException is thrown when retries are exhausted (Max Attempts/Duration)
// Cleanup is handled by the .onFailure() hook, so we just wrap and rethrow.
throw new IllegalStateException(
String.format("Polling timed out or exceeded max attempts for planId: %s.", planId), e);
} catch (Exception e) {
// Catch any immediate non-retryable exceptions (e.g., I/O errors, auth errors)
try {
cleanupPlanResources();
} catch (Exception cancelException) {
// Ignore cancellation failures during exception handling
e.addSuppressed(cancelException);
}
throw e;
}
FetchPlanningResultResponse response = result.get();

this.scanFileIO =
!response.credentials().isEmpty() ? scanFileIO(response.credentials()) : table().io();

return scanTasksIterable(response.planTasks(), response.fileScanTasks());
}

private CloseableIterable<FileScanTask> scanTasksIterable(
Expand Down Expand Up @@ -342,4 +314,6 @@ public boolean cancelPlan() {
return false;
}
}

private static class NotCompleteException extends RuntimeException {}
}