@@ -151,9 +151,8 @@ public boolean start() throws IOException {
   @Override
   public boolean advance() throws IOException {
     /* Read first record (if any). we need to loop here because :
-     * - (a) some records initially need to be skipped if they are before consumedOffset
-     * - (b) if curBatch is empty, we want to fetch next batch and then advance.
-     * - (c) curBatch is an iterator of iterators. we interleave the records from each.
+     * - (a) if curBatch is empty, we want to fetch next batch and then advance.
+     * - (b) curBatch is an iterator of iterators. we interleave the records from each.
      *       curBatch.next() might return an empty iterator.
      */
     while (true) {
@@ -163,7 +162,7 @@ public boolean advance() throws IOException {

       PartitionState<K, V> pState = curBatch.next();

-      if (!pState.recordIter.hasNext()) { // -- (c)
+      if (!pState.recordIter.hasNext()) { // -- (b)
         pState.recordIter = Collections.emptyIterator(); // drop ref
         curBatch.remove();
         continue;
@@ -173,19 +172,6 @@
       elementsReadBySplit.inc();

       ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
-      long expected = pState.nextOffset;
-      long offset = rawRecord.offset();
-
-      if (offset < expected) { // -- (a)
-        // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
-        // should we check if the offset is way off from consumedOffset (say > 1M)?
-        LOG.warn(
-            "{}: ignoring already consumed offset {} for {}",
-            this,
-            offset,
-            pState.topicPartition);
-        continue;
-      }

       // Apply user deserializers. User deserializers might throw, which will be propagated up
       // and 'curRecord' remains unchanged. The runner should close this reader.
@@ -212,7 +198,7 @@
       int recordSize =
           (rawRecord.key() == null ? 0 : rawRecord.key().length)
               + (rawRecord.value() == null ? 0 : rawRecord.value().length);
-      pState.recordConsumed(offset, recordSize);
+      pState.recordConsumed(rawRecord.offset(), recordSize);
       bytesRead.inc(recordSize);
       bytesReadBySplit.inc(recordSize);
@@ -223,7 +209,7 @@

         kafkaResults.flushBufferedMetrics();
         return true;
-      } else { // -- (b)
+      } else { // -- (a)
         kafkaResults = KafkaSinkMetrics.kafkaMetrics();
         nextBatch();
         kafkaResults.flushBufferedMetrics();
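The loop comment at the top of advance() above describes the core pattern: curBatch behaves like an iterator over per-partition record iterators, records are interleaved across partitions, and empty or exhausted inner iterators are simply dropped. A minimal standalone sketch of that round-robin interleaving, with illustrative names only (this is not the actual KafkaUnboundedReader state):

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Queue;

// Round-robin drain of an "iterator of iterators": take one record from the head
// partition, rotate it to the back, and drop iterators that are empty or exhausted.
public class InterleaveSketch {
  public static void main(String[] args) {
    Queue<Iterator<String>> batches = new ArrayDeque<>();
    batches.add(Arrays.asList("p0-a", "p0-b").iterator());
    batches.add(Collections.emptyIterator()); // an empty partition batch, as noted in the comment
    batches.add(Arrays.asList("p1-a").iterator());

    while (!batches.isEmpty()) {
      Iterator<String> head = batches.poll();
      if (!head.hasNext()) {
        continue; // mirrors the curBatch.remove() branch: discard and move on
      }
      System.out.println(head.next());
      batches.add(head); // rotate so partitions are consumed roughly evenly
    }
  }
}
```

Running this prints p0-a, p1-a, p0-b: one record per partition per pass, with the empty batch silently discarded.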
@@ -588,12 +588,10 @@ public ProcessContinuation processElement(
             topicPartition, Optional.ofNullable(watermarkEstimator.currentWatermark()));
       }

-      long startOffset = tracker.currentRestriction().getFrom();
-      long expectedOffset = startOffset;
+      long expectedOffset = tracker.currentRestriction().getFrom();
       consumer.resume(Collections.singleton(topicPartition));
-      consumer.seek(topicPartition, startOffset);
-      long skippedRecords = 0L;
-      final Stopwatch sw = Stopwatch.createStarted();
+      consumer.seek(topicPartition, expectedOffset);
+      final Stopwatch pollTimer = Stopwatch.createUnstarted();

       final KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
       try {
@@ -602,7 +600,7 @@
         // A consumer will often have prefetches waiting to be returned immediately in which case
         // this timer may contribute more latency than it measures.
         // See https://shipilev.net/blog/2014/nanotrusting-nanotime/ for more information.
-        final Stopwatch pollTimer = Stopwatch.createStarted();
+        pollTimer.reset().start();
        // Fetch the next records.
        final ConsumerRecords<byte[], byte[]> rawRecords =
            consumer.poll(this.consumerPollingTimeout);
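This hunk, together with the earlier one that hoists pollTimer out of the loop, replaces a freshly allocated, already-started Stopwatch per poll with a single reusable instance that is re-armed via reset().start(). A minimal sketch of that Guava Stopwatch reuse pattern, assuming Guava is on the classpath; doPoll() is a placeholder for consumer.poll(...), not a KafkaIO method:

```java
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;

// One Stopwatch instance for the whole loop: reset() rewinds it, start() re-arms it,
// so no new timer object is created per iteration.
public class PollTimerSketch {
  public static void main(String[] args) throws InterruptedException {
    final Stopwatch pollTimer = Stopwatch.createUnstarted();
    for (int i = 0; i < 3; i++) {
      pollTimer.reset().start();   // re-arm the same instance each iteration
      doPoll();                    // the work being timed
      long elapsedMs = pollTimer.elapsed(TimeUnit.MILLISECONDS);
      System.out.println("poll " + i + " took ~" + elapsedMs + " ms");
    }
  }

  private static void doPoll() throws InterruptedException {
    Thread.sleep(10); // stand-in for a short consumer.poll(...)
  }
}
```

As the comment in the hunk notes, the timer itself adds overhead when polls return prefetched records immediately, which is another reason to keep the per-iteration timing code as cheap as possible.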
@@ -627,37 +625,6 @@
           // Visible progress within the consumer polling timeout.
           // Partially or fully claim and process records in this batch.
           for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
-            // If the Kafka consumer returns a record with an offset that is already processed
-            // the record can be safely skipped. This is needed because there is a possibility
-            // that the seek() above fails to move the offset to the desired position. In which
-            // case poll() would return records that are already consumed.
-            if (rawRecord.offset() < startOffset) {
-              // If the start offset is not reached even after skipping the records for 10 seconds
-              // then the processing is stopped with a backoff to give the Kafka server some time
-              // to catch up.
-              if (sw.elapsed().getSeconds() > 10L) {
-                LOG.error(
-                    "The expected offset ({}) was not reached even after"
-                        + " skipping consumed records for 10 seconds. The offset we could"
-                        + " reach was {}. The processing of this bundle will be attempted"
-                        + " at a later time.",
-                    expectedOffset,
-                    rawRecord.offset());
-                consumer.pause(Collections.singleton(topicPartition));
-                return ProcessContinuation.resume()
-                    .withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
-              }
-              skippedRecords++;
-              continue;
-            }
-            if (skippedRecords > 0L) {
-              LOG.warn(
-                  "{} records were skipped due to seek returning an"
-                      + " earlier position than requested position of {}",
-                  skippedRecords,
-                  expectedOffset);
-              skippedRecords = 0L;
-            }
             if (!tracker.tryClaim(rawRecord.offset())) {
               consumer.seek(topicPartition, rawRecord.offset());
               consumer.pause(Collections.singleton(topicPartition));
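With the skip-and-backoff block removed, a record whose offset lies before the restriction's start now reaches tracker.tryClaim() directly. Beam's OffsetRangeTracker rejects such a claim with an IllegalArgumentException (it returns false only for offsets at or past the end of the range), which appears to be the behavior the updated test below asserts. A small standalone illustration of that tracker behavior; this is not code from the PR:

```java
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

// tryClaim() on an offset inside the range succeeds, past the end returns false,
// and before the start throws IllegalArgumentException instead of being skippable.
public class TryClaimSketch {
  public static void main(String[] args) {
    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100L, 103L));
    System.out.println(tracker.tryClaim(100L)); // true: first offset of the range
    System.out.println(tracker.tryClaim(103L)); // false: past the end, restriction is done

    try {
      new OffsetRangeTracker(new OffsetRange(100L, 103L)).tryClaim(99L); // before the start
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```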
@@ -19,6 +19,7 @@

 import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;

 import java.nio.charset.StandardCharsets;
@@ -525,12 +526,9 @@ public void testProcessElementWithEarlierOffset() throws Exception {
         new OffsetRangeTracker(new OffsetRange(startOffset, startOffset + 3));
     KafkaSourceDescriptor descriptor =
         KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null);
-    ProcessContinuation result =
-        dofnInstanceWithBrokenSeek.processElement(descriptor, tracker, null, receiver);
-    assertEquals(ProcessContinuation.stop(), result);
-    assertEquals(
-        createExpectedRecords(descriptor, startOffset, 3, "key", "value"),
-        receiver.getGoodRecords());
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> dofnInstanceWithBrokenSeek.processElement(descriptor, tracker, null, receiver));
   }

   @Test
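The rewritten test now expects processElement to fail fast when the consumer's seek does not take effect and poll() hands back records from an earlier position, rather than skipping those records and returning stop(). As a purely hypothetical illustration of that scenario (this is not the dofnInstanceWithBrokenSeek fixture from the test), a Kafka MockConsumer subclass whose seek() is a no-op keeps returning earlier offsets; exact MockConsumer APIs vary slightly across Kafka client versions:

```java
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

// A consumer whose seek() is silently ignored, so polling resumes from the old position.
public class BrokenSeekConsumerSketch {
  public static void main(String[] args) {
    TopicPartition tp = new TopicPartition("topic", 0);
    MockConsumer<String, String> consumer =
        new MockConsumer<String, String>(OffsetResetStrategy.EARLIEST) {
          @Override
          public synchronized void seek(TopicPartition partition, long offset) {
            // Intentionally do nothing: the requested position is never applied.
          }
        };
    consumer.assign(Collections.singletonList(tp));
    consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
    for (long offset = 0; offset < 3; offset++) {
      consumer.addRecord(new ConsumerRecord<>("topic", 0, offset, "key", "value" + offset));
    }

    consumer.seek(tp, 5L); // the caller asks for offset 5, but the seek is swallowed
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
    records.forEach(r -> System.out.println("polled offset " + r.offset())); // 0, 1, 2
  }
}
```

Offsets 0 through 2 still come back even though offset 5 was requested; under the new code path that mismatch surfaces through the tracker as an IllegalArgumentException instead of being silently skipped.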