Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ RequestResponseIO<RequestT, ResponseT> withSleeperSupplier(SerializableSupplier<
* need for a {@link SerializableSupplier} instead of setting this directly is that some {@link
* BackOff} implementations, such as {@link FluentBackoff} are not {@link Serializable}.
*/
RequestResponseIO<RequestT, ResponseT> withBackOffSupplier(SerializableSupplier<BackOff> value) {
public RequestResponseIO<RequestT, ResponseT> withBackOffSupplier(
SerializableSupplier<BackOff> value) {
return new RequestResponseIO<>(
rrioConfiguration, callConfiguration.toBuilder().setBackOffSupplier(value).build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.hamcrest.Matchers.greaterThan;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.Coder;
Expand All @@ -40,6 +41,7 @@
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.SerializableSupplier;
Expand Down Expand Up @@ -333,6 +335,36 @@ public void givenCustomBackoff_thenBackoffBehaviorCustom() {
greaterThan(0L));
}

@Test
public void givenBoundedBackoff_thenRetriesStopAfterLimit() {
int maxRetries = 3;
Caller<Request, Response> caller = new CallerImpl(5);
SerializableSupplier<BackOff> boundedBackoffSupplier = () -> new BoundedBackOff(maxRetries);

Result<Response> result =
requests()
.apply(
"rrio",
RequestResponseIO.of(caller, RESPONSE_CODER)
.withBackOffSupplier(boundedBackoffSupplier)
.withMonitoringConfiguration(
Monitoring.builder().setCountCalls(true).setCountFailures(true).build()));

PAssert.that(result.getResponses()).empty();
PAssert.thatSingleton(result.getFailures().apply("CountFailures", Count.globally()))
.isEqualTo(1L);

PipelineResult pipelineResult = pipeline.run();
MetricResults metrics = pipelineResult.metrics();
pipelineResult.waitUntilFinish();

assertThat(
getCounterResult(metrics, Call.class, Monitoring.callCounterNameOf(caller)),
equalTo((long) maxRetries + 1));
assertThat(
getCounterResult(metrics, Call.class, Monitoring.FAILURES_COUNTER_NAME), equalTo(1L));
}

// TODO(damondouglas): Count metrics of caching after https://github.com/apache/beam/issues/29888
// resolves.
@Ignore
Expand Down Expand Up @@ -463,6 +495,29 @@ MetricName getCounterName() {
}
}

private static class BoundedBackOff implements BackOff, Serializable {
private final int maxRetries;
private int retries = 0;

private BoundedBackOff(int maxRetries) {
this.maxRetries = maxRetries;
}

@Override
public void reset() {
retries = 0;
}

@Override
public long nextBackOffMillis() {
if (retries >= maxRetries) {
return BackOff.STOP;
}
retries++;
return 0L;
}
}

private static class CustomBackOffSupplier implements SerializableSupplier<BackOff> {

private final Counter counter = Metrics.counter(CustomBackOffSupplier.class, "custom_counter");
Expand Down
Loading