diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 4680304d5a6c..58b2548900de 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -151,9 +151,8 @@ public boolean start() throws IOException {
   @Override
   public boolean advance() throws IOException {
     /* Read first record (if any). we need to loop here because :
-     *  - (a) some records initially need to be skipped if they are before consumedOffset
-     *  - (b) if curBatch is empty, we want to fetch next batch and then advance.
-     *  - (c) curBatch is an iterator of iterators. we interleave the records from each.
+     *  - (a) if curBatch is empty, we want to fetch next batch and then advance.
+     *  - (b) curBatch is an iterator of iterators. we interleave the records from each.
      *    curBatch.next() might return an empty iterator.
      */
     while (true) {
@@ -163,7 +162,7 @@ public boolean advance() throws IOException {
 
         PartitionState<K, V> pState = curBatch.next();
 
-        if (!pState.recordIter.hasNext()) { // -- (c)
+        if (!pState.recordIter.hasNext()) { // -- (b)
           pState.recordIter = Collections.emptyIterator(); // drop ref
           curBatch.remove();
           continue;
@@ -173,19 +172,6 @@ public boolean advance() throws IOException {
         elementsReadBySplit.inc();
 
         ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
-        long expected = pState.nextOffset;
-        long offset = rawRecord.offset();
-
-        if (offset < expected) { // -- (a)
-          // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
-          // should we check if the offset is way off from consumedOffset (say > 1M)?
-          LOG.warn(
-              "{}: ignoring already consumed offset {} for {}",
-              this,
-              offset,
-              pState.topicPartition);
-          continue;
-        }
 
         // Apply user deserializers. User deserializers might throw, which will be propagated up
         // and 'curRecord' remains unchanged. The runner should close this reader.
@@ -212,7 +198,7 @@ public boolean advance() throws IOException {
         int recordSize =
             (rawRecord.key() == null ? 0 : rawRecord.key().length)
                 + (rawRecord.value() == null ? 0 : rawRecord.value().length);
-        pState.recordConsumed(offset, recordSize);
+        pState.recordConsumed(rawRecord.offset(), recordSize);
         bytesRead.inc(recordSize);
         bytesReadBySplit.inc(recordSize);
 
@@ -223,7 +209,7 @@ public boolean advance() throws IOException {
 
         kafkaResults.flushBufferedMetrics();
         return true;
-      } else { // -- (b)
+      } else { // -- (a)
         kafkaResults = KafkaSinkMetrics.kafkaMetrics();
         nextBatch();
         kafkaResults.flushBufferedMetrics();
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index cd3ad2526d30..70015847e19d 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -588,12 +588,10 @@ public ProcessContinuation processElement(
             topicPartition, Optional.ofNullable(watermarkEstimator.currentWatermark()));
       }
 
-      long startOffset = tracker.currentRestriction().getFrom();
-      long expectedOffset = startOffset;
+      long expectedOffset = tracker.currentRestriction().getFrom();
       consumer.resume(Collections.singleton(topicPartition));
-      consumer.seek(topicPartition, startOffset);
-      long skippedRecords = 0L;
-      final Stopwatch sw = Stopwatch.createStarted();
+      consumer.seek(topicPartition, expectedOffset);
+      final Stopwatch pollTimer = Stopwatch.createUnstarted();
       final KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
 
       try {
@@ -602,7 +600,7 @@ public ProcessContinuation processElement(
           // A consumer will often have prefetches waiting to be returned immediately in which case
           // this timer may contribute more latency than it measures.
           // See https://shipilev.net/blog/2014/nanotrusting-nanotime/ for more information.
-          final Stopwatch pollTimer = Stopwatch.createStarted();
+          pollTimer.reset().start();
           // Fetch the next records.
          final ConsumerRecords<byte[], byte[]> rawRecords =
               consumer.poll(this.consumerPollingTimeout);
@@ -627,37 +625,6 @@ public ProcessContinuation processElement(
           // Visible progress within the consumer polling timeout.
           // Partially or fully claim and process records in this batch.
           for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
-            // If the Kafka consumer returns a record with an offset that is already processed
-            // the record can be safely skipped. This is needed because there is a possibility
-            // that the seek() above fails to move the offset to the desired position. In which
-            // case poll() would return records that are already cnsumed.
-            if (rawRecord.offset() < startOffset) {
-              // If the start offset is not reached even after skipping the records for 10 seconds
-              // then the processing is stopped with a backoff to give the Kakfa server some time
-              // catch up.
-              if (sw.elapsed().getSeconds() > 10L) {
-                LOG.error(
-                    "The expected offset ({}) was not reached even after"
-                        + " skipping consumed records for 10 seconds. The offset we could"
-                        + " reach was {}. The processing of this bundle will be attempted"
-                        + " at a later time.",
-                    expectedOffset,
-                    rawRecord.offset());
-                consumer.pause(Collections.singleton(topicPartition));
-                return ProcessContinuation.resume()
-                    .withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
-              }
-              skippedRecords++;
-              continue;
-            }
-            if (skippedRecords > 0L) {
-              LOG.warn(
-                  "{} records were skipped due to seek returning an"
-                      + " earlier position than requested position of {}",
-                  skippedRecords,
-                  expectedOffset);
-              skippedRecords = 0L;
-            }
             if (!tracker.tryClaim(rawRecord.offset())) {
               consumer.seek(topicPartition, rawRecord.offset());
               consumer.pause(Collections.singleton(topicPartition));
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index 2504188d97e2..4d22b1d6ea96 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -19,6 +19,7 @@
 
 import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 import java.nio.charset.StandardCharsets;
@@ -525,12 +526,9 @@ public void testProcessElementWithEarlierOffset() throws Exception {
         new OffsetRangeTracker(new OffsetRange(startOffset, startOffset + 3));
     KafkaSourceDescriptor descriptor =
         KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null);
-    ProcessContinuation result =
-        dofnInstanceWithBrokenSeek.processElement(descriptor, tracker, null, receiver);
-    assertEquals(ProcessContinuation.stop(), result);
-    assertEquals(
-        createExpectedRecords(descriptor, startOffset, 3, "key", "value"),
-        receiver.getGoodRecords());
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> dofnInstanceWithBrokenSeek.processElement(descriptor, tracker, null, receiver));
   }
 
   @Test
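Note (illustration only, not part of the patch): with the offset-skipping fallback removed, a record polled from before the claimed restriction now surfaces as an error from the restriction tracker instead of being silently skipped, which is what the updated test asserts with assertThrows(IllegalArgumentException.class, ...). The following is a minimal sketch of that behavior against Beam's OffsetRangeTracker used in the test above; the class name and the offsets 100..103 are hypothetical example values, and it assumes tryClaim's precondition checks reject an offset before the range start with an IllegalArgumentException, matching what the test expects.

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

// Hypothetical standalone example; not part of the Beam source tree.
public class OffsetClaimSketch {
  public static void main(String[] args) {
    // Restriction covering offsets [100, 103), mirroring the test's
    // OffsetRange(startOffset, startOffset + 3).
    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100L, 103L));

    // A record from before the range start (what a broken seek would hand back)
    // is rejected by the tracker rather than skipped, so the bundle fails fast
    // instead of hiding the inconsistency.
    try {
      tracker.tryClaim(99L);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }

    // A record at the expected start offset is claimed normally.
    System.out.println(tracker.tryClaim(100L)); // prints "true"
  }
}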