From 56a33c8d028588655b062d86e6d63125be493fe0 Mon Sep 17 00:00:00 2001 From: ffccites <99155080+PDGGK@users.noreply.github.com> Date: Tue, 20 Jan 2026 19:37:19 +0800 Subject: [PATCH] [#37198] Make withBackOffSupplier public to enable bounded retry configuration Users need to configure bounded backoff to prevent infinite retry loops. Making withBackOffSupplier public allows users to set FluentBackoff.DEFAULT.withMaxRetries(n) and control retry behavior. Changes: - Changed withBackOffSupplier() visibility from package-private to public - Added comprehensive integration test with zero-delay BoundedBackOff - Test verifies: responses empty, 1 failure emitted, call count = maxRetries+1 The test uses a serializable BoundedBackOff class with assertions on both PAssert (pipeline outputs) and Metrics (retry counts) to ensure bounded retry behavior works correctly. Fixes #37198 Related to #37176 Co-Authored-By: Claude Sonnet 4.5 --- .../io/requestresponse/RequestResponseIO.java | 3 +- .../RequestResponseIOTest.java | 55 +++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java index 8a04f76b6829..7033bdf40111 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java @@ -229,7 +229,8 @@ RequestResponseIO withSleeperSupplier(SerializableSupplier< * need for a {@link SerializableSupplier} instead of setting this directly is that some {@link * BackOff} implementations, such as {@link FluentBackoff} are not {@link Serializable}. */ - RequestResponseIO withBackOffSupplier(SerializableSupplier value) { + public RequestResponseIO withBackOffSupplier( + SerializableSupplier value) { return new RequestResponseIO<>( rrioConfiguration, callConfiguration.toBuilder().setBackOffSupplier(value).build()); } diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java index cd0b29bab661..5a199225f396 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java @@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.greaterThan; import com.google.auto.value.AutoValue; +import java.io.Serializable; import java.util.List; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.Coder; @@ -40,6 +41,7 @@ import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.sdk.util.SerializableSupplier; @@ -333,6 +335,36 @@ public void givenCustomBackoff_thenBackoffBehaviorCustom() { greaterThan(0L)); } + @Test + public void givenBoundedBackoff_thenRetriesStopAfterLimit() { + int maxRetries = 3; + Caller caller = new CallerImpl(5); + SerializableSupplier boundedBackoffSupplier = () -> new BoundedBackOff(maxRetries); + + Result result = + requests() + .apply( + "rrio", + RequestResponseIO.of(caller, RESPONSE_CODER) + .withBackOffSupplier(boundedBackoffSupplier) + .withMonitoringConfiguration( + Monitoring.builder().setCountCalls(true).setCountFailures(true).build())); + + PAssert.that(result.getResponses()).empty(); + PAssert.thatSingleton(result.getFailures().apply("CountFailures", Count.globally())) + .isEqualTo(1L); + + PipelineResult pipelineResult = pipeline.run(); + MetricResults metrics = pipelineResult.metrics(); + pipelineResult.waitUntilFinish(); + + assertThat( + getCounterResult(metrics, Call.class, Monitoring.callCounterNameOf(caller)), + equalTo((long) maxRetries + 1)); + assertThat( + getCounterResult(metrics, Call.class, Monitoring.FAILURES_COUNTER_NAME), equalTo(1L)); + } + // TODO(damondouglas): Count metrics of caching after https://github.com/apache/beam/issues/29888 // resolves. @Ignore @@ -463,6 +495,29 @@ MetricName getCounterName() { } } + private static class BoundedBackOff implements BackOff, Serializable { + private final int maxRetries; + private int retries = 0; + + private BoundedBackOff(int maxRetries) { + this.maxRetries = maxRetries; + } + + @Override + public void reset() { + retries = 0; + } + + @Override + public long nextBackOffMillis() { + if (retries >= maxRetries) { + return BackOff.STOP; + } + retries++; + return 0L; + } + } + private static class CustomBackOffSupplier implements SerializableSupplier { private final Counter counter = Metrics.counter(CustomBackOffSupplier.class, "custom_counter");