diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 96ffc9859a21..f766d73a4aee 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -58,6 +58,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -337,6 +338,16 @@ public Instant apply(KV input) { } } + // Kafka records are read in a separate thread inside the reader. As a result advance() might not + // read any records even from the mock consumer, especially for the first record. + // This is a helper method to loop until we read a record. + private static void advanceOnce(UnboundedReader reader) throws IOException { + while (!reader.advance()) { + // Very rarely will there be more than one attempt. + // In case of a bug we might end up looping forever, and the test will fail with a timeout. + } + } + @Test public void testUnboundedSourceCheckpointMark() throws Exception { int numElements = 85; // 85 to make sure some partitions have more records than other. @@ -350,16 +361,17 @@ public void testUnboundedSourceCheckpointMark() throws Exception { UnboundedReader> reader = source.createReader(null, null); final int numToSkip = 3; - // advance once: - assertTrue(reader.start()); - // Advance the source numToSkip-1 elements and manually save state. + // Advance numToSkip elements. + if (!reader.start()) { + advanceOnce(reader); + } + for (long l = 0; l < numToSkip - 1; ++l) { - assertTrue(reader.advance()); + advanceOnce(reader); } // Confirm that we get the expected element in sequence before checkpointing. 
- assertEquals(numToSkip - 1, (long) reader.getCurrent().getKV().getValue()); assertEquals(numToSkip - 1, reader.getCurrentTimestamp().getMillis()); @@ -367,14 +379,20 @@ public void testUnboundedSourceCheckpointMark() throws Exception { KafkaCheckpointMark mark = CoderUtils.clone( source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark()); reader = source.createReader(null, mark); - assertTrue(reader.start()); // Confirm that we get the next elements in sequence. // This also confirms that Reader interleaves records from each partitions by the reader. + + if (!reader.start()) { + advanceOnce(reader); + } + for (int i = numToSkip; i < numElements; i++) { assertEquals(i, (long) reader.getCurrent().getKV().getValue()); assertEquals(i, reader.getCurrentTimestamp().getMillis()); - reader.advance(); + if ((i + 1) < numElements) { + advanceOnce(reader); + } } } }