From 2631624ae820df66206d8bdcc2e1d84bedbc696a Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Tue, 26 Apr 2016 16:34:32 -0700 Subject: [PATCH] Don't advance beyond last valid value --- .../runners/dataflow/TestCountingSource.java | 4 ++-- .../dataflow/TestCountingSourceTest.java | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java index 226b3cb1c1e8..10631c2aa6bc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java @@ -173,7 +173,7 @@ public boolean start() { @Override public boolean advance() { - if (current >= numMessagesPerShard) { + if (current >= numMessagesPerShard - 1) { return false; } // If testing dedup, occasionally insert a duplicate value; @@ -181,7 +181,7 @@ public boolean advance() { return true; } current++; - return current < numMessagesPerShard; + return true; } @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSourceTest.java index 9377905c277c..6ba060e8cfef 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSourceTest.java @@ -53,4 +53,22 @@ public void testRespectsCheckpointContract() throws IOException { assertEquals(2L, (long) reader.getCurrent().getValue()); assertFalse(reader.advance()); } + + @Test + public void testCanResumeWithExpandedCount() throws IOException { + TestCountingSource source = new TestCountingSource(1); + PipelineOptions options = PipelineOptionsFactory.create(); + TestCountingSource.CountingSourceReader reader = + source.createReader(options, null /* no checkpoint */); + assertTrue(reader.start()); + assertEquals(0L, (long) reader.getCurrent().getValue()); + assertFalse(reader.advance()); + TestCountingSource.CounterMark checkpoint = reader.getCheckpointMark(); + checkpoint.finalizeCheckpoint(); + source = new TestCountingSource(2); + reader = source.createReader(options, checkpoint); + assertTrue(reader.start()); + assertEquals(1L, (long) reader.getCurrent().getValue()); + assertFalse(reader.advance()); + } }