Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4169106
fix issue : KafkaConsumer cannot jump out of the poll method, and the…
RivenSun2 Sep 19, 2021
cd84dbc
fix this issue for AUTO_PATTERN and AUTO_TOPICS in cleanUpConsumedOff…
RivenSun2 Sep 20, 2021
39ee67a
1. fix NPE issue
RivenSun2 Sep 22, 2021
9aa7bd7
commit changeCodes by showuon review
RivenSun2 Sep 26, 2021
5e09d03
fix package auto-import issue by IDEA Editor
RivenSun2 Sep 26, 2021
8b1382a
commit reviewed codeChange
RivenSun2 Oct 1, 2021
7346e88
commit reviewed codeChange
RivenSun2 Oct 30, 2021
0a77af1
commit test code
RivenSun2 Oct 31, 2021
9e544b2
fix merge conflicts
RivenSun2 Oct 31, 2021
3334bc5
Merge branch 'apache:trunk' into trunk
RivenSun2 Nov 2, 2021
a5f6f81
commit code changes for review
RivenSun2 Nov 4, 2021
12359c2
commit code changes for showuon's review
RivenSun2 Nov 5, 2021
359af7d
commit code changes for showuon's review
RivenSun2 Nov 15, 2021
82b3585
commit code changes for guozhangwang's review
RivenSun2 Dec 23, 2021
a98bd87
fix issue for ConsumerBounceTest testClose case failed
RivenSun2 Dec 24, 2021
efef844
fix issue for KafkaConsumerTest testCase failed
RivenSun2 Dec 28, 2021
12fb32c
fix issue for testCloseManualAssignment case failed
RivenSun2 Dec 28, 2021
996b8d9
LeaveGroupResponse should be prepared conditionally
RivenSun2 Dec 29, 2021
de1580c
fix issue for testInvalidGroupMetadata case
RivenSun2 Dec 29, 2021
587a2bd
Merge branch 'apache:trunk' into trunk
RivenSun2 Jan 5, 2022
c5e7595
Merge branch 'trunk' of https://github.com/apache/kafka into trunk
RivenSun2 Feb 8, 2022
f78969c
branch conflicts resolved
RivenSun2 Feb 8, 2022
e0ab2e5
in order to retrigger jenkins tests
RivenSun2 Feb 9, 2022
8b8c78f
Merge branch 'apache:trunk' into trunk
RivenSun2 Feb 16, 2022
282692f
Merge branch 'apache:trunk' into trunk
RivenSun2 Feb 23, 2022
e7c66e0
Merge branch 'apache:trunk' into trunk
RivenSun2 Feb 24, 2022
77718c0
Merge branch 'apache:trunk' into trunk
RivenSun2 Feb 28, 2022
d7b986e
Merge branch 'apache:trunk' into trunk
RivenSun2 Mar 2, 2022
4ade6dd
KAFKA-13694: When the Broker side processes the ProduceRequest, it pr…
RivenSun2 Mar 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,14 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(batchIndex, message);
}

@Override
public String toString() {
return "RecordError("
+ "batchIndex=" + batchIndex
+ ", message=" + ((message == null) ? "null" : "'" + message + "'")
+ ")";
}
}

public static ProduceResponse parse(ByteBuffer buffer, short version) {
Expand Down
12 changes: 4 additions & 8 deletions core/src/main/scala/kafka/log/LogValidator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import kafka.common.{LongRef, RecordValidationException}
import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec}
import kafka.server.{BrokerTopicStats, RequestLocal}
import kafka.utils.Logging
import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.errors.{CorruptRecordException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record.{AbstractRecords, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
import org.apache.kafka.common.InvalidRecordException
import org.apache.kafka.common.TopicPartition
Expand Down Expand Up @@ -571,13 +571,9 @@ private[log] object LogValidator extends Logging {
private def processRecordErrors(recordErrors: Seq[ApiRecordError]): Unit = {
if (recordErrors.nonEmpty) {
val errors = recordErrors.map(_.recordError)
if (recordErrors.exists(_.apiError == Errors.INVALID_TIMESTAMP)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do we no longer need this special case?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, looks like we need this, because we break 2 test cases after removing it. PR to bring it back to fix failed tests is opened: #11853 . Thanks.

throw new RecordValidationException(new InvalidTimestampException(
"One or more records have been rejected due to invalid timestamp"), errors)
} else {
throw new RecordValidationException(new InvalidRecordException(
"One or more records have been rejected"), errors)
}
throw new RecordValidationException(new InvalidRecordException(
"One or more records have been rejected due to " + errors.size + " record errors " +
"in total, and only showing the first three errors at most: " + errors.asJava.subList(0, math.min(errors.size, 3))), errors)
}
}

Expand Down
9 changes: 5 additions & 4 deletions core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import kafka.message._
import kafka.metrics.KafkaYammerMetrics
import kafka.server.{BrokerTopicStats, RequestLocal}
import kafka.utils.TestUtils.meterCount
import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.errors.{UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
Expand Down Expand Up @@ -1352,7 +1352,7 @@ class LogValidatorTest {
requestLocal = RequestLocal.withThreadConfinedCaching)
)

assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
assertTrue(e.recordErrors.nonEmpty)
assertEquals(e.recordErrors.size, 3)
}
Expand Down Expand Up @@ -1397,8 +1397,9 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
)
// if there is a mix of both regular InvalidRecordException and InvalidTimestampException,
// InvalidTimestampException takes precedence
assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
// InvalidTimestampException is no longer takes precedence. The type of invalidException
// is unified as InvalidRecordException
assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
assertTrue(e.recordErrors.nonEmpty)
assertEquals(6, e.recordErrors.size)
}
Expand Down