Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -337,6 +338,16 @@ public Instant apply(KV<byte[], Long> input) {
}
}

// Kafka records are read in a separate thread inside the reader. As a result advance() might not
// read any records even from the mock consumer, especially for the first record.
// This is a helper method to loop until we read a record.
private static void advanceOnce(UnboundedReader<?> reader) throws IOException {
while (!reader.advance()) {
// very rarely will there be more than one attempts.
// in case of a bug we might end up looping forever, and test will fail with a timeout.
}
}

@Test
public void testUnboundedSourceCheckpointMark() throws Exception {
int numElements = 85; // 85 to make sure some partitions have more records than other.
Expand All @@ -350,31 +361,38 @@ public void testUnboundedSourceCheckpointMark() throws Exception {

UnboundedReader<KafkaRecord<byte[], Long>> reader = source.createReader(null, null);
final int numToSkip = 3;
// advance once:
assertTrue(reader.start());

// Advance the source numToSkip-1 elements and manually save state.
// advance numToSkip elements
if (!reader.start()) {
advanceOnce(reader);
}

for (long l = 0; l < numToSkip - 1; ++l) {
assertTrue(reader.advance());
advanceOnce(reader);
}

// Confirm that we get the expected element in sequence before checkpointing.

assertEquals(numToSkip - 1, (long) reader.getCurrent().getKV().getValue());
assertEquals(numToSkip - 1, reader.getCurrentTimestamp().getMillis());

// Checkpoint and restart, and confirm that the source continues correctly.
KafkaCheckpointMark mark = CoderUtils.clone(
source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark());
reader = source.createReader(null, mark);
assertTrue(reader.start());

// Confirm that we get the next elements in sequence.
// This also confirms that Reader interleaves records from each partitions by the reader.

if (!reader.start()) {
advanceOnce(reader);
}

for (int i = numToSkip; i < numElements; i++) {
assertEquals(i, (long) reader.getCurrent().getKV().getValue());
assertEquals(i, reader.getCurrentTimestamp().getMillis());
reader.advance();
if ((i + 1) < numElements) {
advanceOnce(reader);
}
}
}
}