From f00517e0771a4ca0a71170b7bd08146ffbf0bee2 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Mon, 25 Apr 2016 11:43:48 -0700 Subject: [PATCH 1/2] fix flaky KafkaIO test --- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 96ffc9859a21..1f533bac3228 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -58,6 +58,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -337,6 +338,18 @@ public Instant apply(KV input) { } } + // Kafka records are read in a separate thread inside the reader. As a result advance() might not + // read any records even from the mock consumer, especially for the first record. + // This is a helper method to loop until we read a record. + private static void advanceOnce(UnboundedReader reader) throws IOException { + int attempts = 0; + while (!reader.advance()) { + attempts++; + // very rarely will there be more than one attempts. + assertTrue("could not advance() even after 1000 attempts.", attempts < 1000); + } + } + @Test public void testUnboundedSourceCheckpointMark() throws Exception { int numElements = 85; // 85 to make sure some partitions have more records than other. @@ -350,16 +363,15 @@ public void testUnboundedSourceCheckpointMark() throws Exception { UnboundedReader> reader = source.createReader(null, null); final int numToSkip = 3; - // advance once: - assertTrue(reader.start()); - // Advance the source numToSkip-1 elements and manually save state. - for (long l = 0; l < numToSkip - 1; ++l) { - assertTrue(reader.advance()); + // advance numToSkip elements + for (long l = 0; l < numToSkip; ++l) { + if (l > 0 || !reader.start()) { + advanceOnce(reader); + } } // Confirm that we get the expected element in sequence before checkpointing. - assertEquals(numToSkip - 1, (long) reader.getCurrent().getKV().getValue()); assertEquals(numToSkip - 1, reader.getCurrentTimestamp().getMillis()); @@ -367,14 +379,15 @@ public void testUnboundedSourceCheckpointMark() throws Exception { KafkaCheckpointMark mark = CoderUtils.clone( source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark()); reader = source.createReader(null, mark); - assertTrue(reader.start()); // Confirm that we get the next elements in sequence. // This also confirms that Reader interleaves records from each partitions by the reader. for (int i = numToSkip; i < numElements; i++) { + if (i > numToSkip || !reader.start()) { + advanceOnce(reader); + } assertEquals(i, (long) reader.getCurrent().getKV().getValue()); assertEquals(i, reader.getCurrentTimestamp().getMillis()); - reader.advance(); } } } From 768fcb7e1e0367c3928766c9a7fbabf7336b171f Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Mon, 25 Apr 2016 12:30:58 -0700 Subject: [PATCH 2/2] review comments : 'remove limit on advance() attempts' + one more --- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 1f533bac3228..f766d73a4aee 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -342,11 +342,9 @@ public Instant apply(KV input) { // read any records even from the mock consumer, especially for the first record. // This is a helper method to loop until we read a record. private static void advanceOnce(UnboundedReader reader) throws IOException { - int attempts = 0; while (!reader.advance()) { - attempts++; // very rarely will there be more than one attempts. - assertTrue("could not advance() even after 1000 attempts.", attempts < 1000); + // in case of a bug we might end up looping forever, and test will fail with a timeout. } } @@ -365,10 +363,12 @@ public void testUnboundedSourceCheckpointMark() throws Exception { final int numToSkip = 3; // advance numToSkip elements - for (long l = 0; l < numToSkip; ++l) { - if (l > 0 || !reader.start()) { - advanceOnce(reader); - } + if (!reader.start()) { + advanceOnce(reader); + } + + for (long l = 0; l < numToSkip - 1; ++l) { + advanceOnce(reader); } // Confirm that we get the expected element in sequence before checkpointing. @@ -382,12 +382,17 @@ public void testUnboundedSourceCheckpointMark() throws Exception { // Confirm that we get the next elements in sequence. // This also confirms that Reader interleaves records from each partitions by the reader. + + if (!reader.start()) { + advanceOnce(reader); + } + for (int i = numToSkip; i < numElements; i++) { - if (i > numToSkip || !reader.start()) { - advanceOnce(reader); - } assertEquals(i, (long) reader.getCurrent().getKV().getValue()); assertEquals(i, reader.getCurrentTimestamp().getMillis()); + if ((i + 1) < numElements) { + advanceOnce(reader); + } } } }