From f2b7c60b52b92685388c60055eea194f04afe334 Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Wed, 2 Mar 2016 20:49:53 -0800 Subject: [PATCH 1/2] [BEAM-90] TestCountingSource can throw on checkpointing --- .../runners/dataflow/TestCountingSource.java | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java index 181ddcae5bcc..750b2a0558b7 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java @@ -27,6 +27,8 @@ import com.google.cloud.dataflow.sdk.values.KV; import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -46,31 +48,41 @@ */ public class TestCountingSource extends UnboundedSource, TestCountingSource.CounterMark> { + private static final Logger LOG = LoggerFactory.getLogger(TestCountingSource.class); + private static List finalizeTracker; private final int numMessagesPerShard; private final int shardNumber; private final boolean dedup; + private boolean throwOnFirstSnapshot; + private static boolean thrown = false; public static void setFinalizeTracker(List finalizeTracker) { TestCountingSource.finalizeTracker = finalizeTracker; } public TestCountingSource(int numMessagesPerShard) { - this(numMessagesPerShard, 0, false); + this(numMessagesPerShard, 0, false, false); } public TestCountingSource withDedup() { - return new TestCountingSource(numMessagesPerShard, shardNumber, true); + return new TestCountingSource(numMessagesPerShard, shardNumber, true, throwOnFirstSnapshot); } private TestCountingSource withShardNumber(int shardNumber) { - return new TestCountingSource(numMessagesPerShard, shardNumber, dedup); + return new 
TestCountingSource(numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot); + } + + public TestCountingSource withThrowOnFirstSnapshot(boolean throwOnFirstSnapshot) { + return new TestCountingSource(numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot); } - private TestCountingSource(int numMessagesPerShard, int shardNumber, boolean dedup) { + private TestCountingSource( + int numMessagesPerShard, int shardNumber, boolean dedup, boolean throwOnFirstSnapshot) { this.numMessagesPerShard = numMessagesPerShard; this.shardNumber = shardNumber; this.dedup = dedup; + this.throwOnFirstSnapshot = throwOnFirstSnapshot; } public int getShardNumber() { @@ -187,6 +199,11 @@ public Instant getWatermark() { @Override public CheckpointMark getCheckpointMark() { + if (throwOnFirstSnapshot && !thrown) { + thrown = true; + LOG.error("Throwing exception while checkpointing counter"); + throw new RuntimeException("failed during checkpoint"); + } return new CounterMark(current); } From 5dc0c49b15527abffb606c5a7fe84ad7f0ea908a Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Thu, 3 Mar 2016 14:48:14 -0800 Subject: [PATCH 2/2] Address review comments. 
--- .../dataflow/sdk/runners/dataflow/TestCountingSource.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java index 750b2a0558b7..d0863a46f5b2 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/dataflow/TestCountingSource.java @@ -54,7 +54,13 @@ public class TestCountingSource private final int numMessagesPerShard; private final int shardNumber; private final boolean dedup; - private boolean throwOnFirstSnapshot; + private final boolean throwOnFirstSnapshot; + + /** + * We only allow an exception to be thrown from getCheckpointMark + * at most once. This must be static since the entire TestCountingSource + * instance may be re-serialized when the pipeline recovers and retries. + */ private static boolean thrown = false; public static void setFinalizeTracker(List finalizeTracker) {