diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java index 8a04f76b6829..7033bdf40111 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RequestResponseIO.java @@ -229,7 +229,8 @@ RequestResponseIO withSleeperSupplier(SerializableSupplier< * need for a {@link SerializableSupplier} instead of setting this directly is that some {@link * BackOff} implementations, such as {@link FluentBackoff} are not {@link Serializable}. */ - RequestResponseIO withBackOffSupplier(SerializableSupplier value) { + public RequestResponseIO withBackOffSupplier( + SerializableSupplier value) { return new RequestResponseIO<>( rrioConfiguration, callConfiguration.toBuilder().setBackOffSupplier(value).build()); } diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java index cd0b29bab661..5a199225f396 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java @@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.greaterThan; import com.google.auto.value.AutoValue; +import java.io.Serializable; import java.util.List; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.Coder; @@ -40,6 +41,7 @@ import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.sdk.util.SerializableSupplier; @@ -333,6 +335,36 @@ public void givenCustomBackoff_thenBackoffBehaviorCustom() { greaterThan(0L)); } + @Test + public void givenBoundedBackoff_thenRetriesStopAfterLimit() { + int maxRetries = 3; + Caller caller = new CallerImpl(5); + SerializableSupplier boundedBackoffSupplier = () -> new BoundedBackOff(maxRetries); + + Result result = + requests() + .apply( + "rrio", + RequestResponseIO.of(caller, RESPONSE_CODER) + .withBackOffSupplier(boundedBackoffSupplier) + .withMonitoringConfiguration( + Monitoring.builder().setCountCalls(true).setCountFailures(true).build())); + + PAssert.that(result.getResponses()).empty(); + PAssert.thatSingleton(result.getFailures().apply("CountFailures", Count.globally())) + .isEqualTo(1L); + + PipelineResult pipelineResult = pipeline.run(); + MetricResults metrics = pipelineResult.metrics(); + pipelineResult.waitUntilFinish(); + + assertThat( + getCounterResult(metrics, Call.class, Monitoring.callCounterNameOf(caller)), + equalTo((long) maxRetries + 1)); + assertThat( + getCounterResult(metrics, Call.class, Monitoring.FAILURES_COUNTER_NAME), equalTo(1L)); + } + // TODO(damondouglas): Count metrics of caching after https://github.com/apache/beam/issues/29888 // resolves. @Ignore @@ -463,6 +495,29 @@ MetricName getCounterName() { } } + private static class BoundedBackOff implements BackOff, Serializable { + private final int maxRetries; + private int retries = 0; + + private BoundedBackOff(int maxRetries) { + this.maxRetries = maxRetries; + } + + @Override + public void reset() { + retries = 0; + } + + @Override + public long nextBackOffMillis() { + if (retries >= maxRetries) { + return BackOff.STOP; + } + retries++; + return 0L; + } + } + private static class CustomBackOffSupplier implements SerializableSupplier { private final Counter counter = Metrics.counter(CustomBackOffSupplier.class, "custom_counter");