Skip to content

KAFKA-9206: throw a KafkaException when encountering CORRUPT_MESSAGE#8111

Merged
hachikuji merged 7 commits intoapache:trunkfrom
agam:AK-9206
Feb 21, 2020
Merged

KAFKA-9206: throw a KafkaException when encountering CORRUPT_MESSAGE#8111
hachikuji merged 7 commits intoapache:trunkfrom
agam:AK-9206

Conversation

@agam
Copy link
Copy Markdown
Contributor

@agam agam commented Feb 13, 2020

  • If the completed fetch has an error code signifying a corrupt message, throw a KafkaException that notes the offset and the topic-partition.

  • Also added a test that triggers the warning and verifies it is thrown.

  • Jira: https://issues.apache.org/jira/browse/KAFKA-9206

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@agam agam requested a review from hachikuji February 13, 2020 19:58
@agam
Copy link
Copy Markdown
Contributor Author

agam commented Feb 13, 2020

Note: I found LogCaptureAppender in the streams module to be useful for verifying log messages, and copied it to the clients module. Let me know if there's some common place I can place it instead.

Comment thread build.gradle Outdated
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed? Supposedly the slf4jlog4j dependency brings this too?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you're doing it because we refer to log4j at compile time now?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's an import for LogCaptureAppender

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the main question here is whether we want to raise this error to the user. If we happen to see message corruption in the fetched data, we raise that to the user, so should this case be different?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. As discussed, will (1) change this to correspond to the current behavior for corruption in fetched data, i.e. throwing a KafkaException that wraps around a CorruptRecordException, and (2) remove the log message expectation.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correction: just a KafkaException (the wrapping around I referred to was the text of the CorruptRecordException, not the exception itself)

@agam agam changed the title KAFKA-9206: log a warning when encountering CORRUPT_MESSAGE KAFKA-9206: throw a KafkaException when encountering CORRUPT_MESSAGE Feb 14, 2020
@agam
Copy link
Copy Markdown
Contributor Author

agam commented Feb 14, 2020

Updated, please take a look.

Copy link
Copy Markdown
Contributor

@soondenana soondenana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i.e. throwing a KafkaException that wraps around a CorruptRecordException

Are you planing to have a CorruptRecordException exception too?

// Trigger the exception.
assertThrows(KafkaException.class, () -> {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords =
fetchedRecords();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: just calling fetchedRecords() should work here, or is compiler not liking it and forcing the return value assignment?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, made the change to discard the return value.

@agam
Copy link
Copy Markdown
Contributor Author

agam commented Feb 14, 2020

i.e. throwing a KafkaException that wraps around a CorruptRecordException

Are you planing to have a CorruptRecordException exception too?

No, clarified with a comment above too, I initially thought we were wrapping the exception in the "data corruption" case, but we're just wrapping its text.

} else if (error == Errors.UNKNOWN_SERVER_ERROR) {
log.warn("Unknown error fetching data for topic-partition {}", tp);
} else if (error == Errors.CORRUPT_MESSAGE) {
throw new KafkaException("Encountered corrupt message when fetching offset "
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we're at it, could we add the fetch offset to the IllegalStateException message below and the UNKNOWN_SERVER_ERROR log message above?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@hachikuji
Copy link
Copy Markdown
Contributor

ok to test

@agam
Copy link
Copy Markdown
Contributor Author

agam commented Feb 18, 2020

Test failure seems unrelated to this change: org.apache.kafka.clients.admin.KafkaAdminClientTest.testDefaultApiTimeoutOverride ?

(will rerun tests once)

@agam
Copy link
Copy Markdown
Contributor Author

agam commented Feb 19, 2020

retest this please

@hachikuji
Copy link
Copy Markdown
Contributor

The test failure is known flaky. I have a fix in #8101.

log.debug("Received unknown leader epoch error in fetch for partition {}", tp);
} else if (error == Errors.UNKNOWN_SERVER_ERROR) {
log.warn("Unknown error fetching data for topic-partition {}", tp);
log.warn("Unknown error fetching data while fetching offset "
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: for log messages, can we use the {} placeholders?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@hachikuji
Copy link
Copy Markdown
Contributor

retest this please

@hachikuji
Copy link
Copy Markdown
Contributor

ok to test

@agam
Copy link
Copy Markdown
Contributor Author

agam commented Feb 20, 2020

Couple of tests that failed in this run but not the previous run, likely flaky:

  • kafka.admin.DescribeConsumerGroupTest.testDescribeGroupMembersWithShortInitializationTimeout
  • kafka.admin.DescribeConsumerGroupTest.testDescribeGroupWithShortInitializationTimeout

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for the PR!

@hachikuji
Copy link
Copy Markdown
Contributor

@agam One other small thing before we merge. Can you add CORRUPT_MESSAGE to the list of error codes in the javadoc for FetchResponse?

@hachikuji
Copy link
Copy Markdown
Contributor

retest this please

@hachikuji
Copy link
Copy Markdown
Contributor

ok to test

@hachikuji
Copy link
Copy Markdown
Contributor

retest this please

@agam
Copy link
Copy Markdown
Contributor Author

agam commented Feb 21, 2020

Two test failures:

  • kafka.admin.DescribeConsumerGroupTest.testDescribeGroupMembersWithShortInitializationTimeout (same as earlier)
  • kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup (newer failure in this run)

@hachikuji hachikuji merged commit 84c4025 into apache:trunk Feb 21, 2020
@agam agam deleted the AK-9206 branch February 21, 2020 19:35
ijuma added a commit to confluentinc/kafka that referenced this pull request Feb 24, 2020
* apache-github/trunk: (23 commits)
  KAFKA-9530; Fix flaky test `testDescribeGroupWithShortInitializationTimeout` (apache#8154)
  HOTFIX: fix NPE in Kafka Streams IQ (apache#8158)
  MINOR: set scala version automatically based on gradle.properties
  KAFKA-9577; SaslClientAuthenticator incorrectly negotiates SASL_HANDSHAKE version (apache#8142)
  KAFKA-9441: Add internal TransactionManager (apache#8105)
  MINOR: Document endpoints for connector topic tracking (KIP-558)
  MINOR: Standby task commit needed when offsets updated (apache#8146)
  KAFKA-9206; Throw KafkaException on CORRUPT_MESSAGE error in Fetch response (apache#8111)
  MINOR: Remove unwanted regexReplace on tests/kafkatest/__init__.py
  KAFKA-9586: Fix errored json filename in ops documentation
  KAFKA-9575: Mention ZooKeeper 3.5.7 upgrade
  KAFKA-9481: Graceful handling TaskMigrated and TaskCorrupted (apache#8058)
  HOTFIX: don't try to remove uninitialized changelogs from assignment & don't prematurely mark task closed (apache#8140)
  MINOR: Fix javadoc at org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion (apache#7337)
  MINOR: Improve EOS example exception handling (apache#8052)
  MINOR: Fix a number of warnings in clients test (apache#8073)
  MINOR: Update shell scripts to support z/OS system (apache#7913)
  MINOR: Wording fix in Streams DSL docs (apache#5692)
  MINOR: Add missing @test annotation to MetadataTest#testMetadataMerge (apache#8141)
  KAFKA-9533: ValueTransform forwards `null` values (apache#8108)
  ...
ijuma added a commit to confluentinc/kafka that referenced this pull request Feb 24, 2020
…etrics-common

* confluent/master: (76 commits)
  KAFKA-9530; Fix flaky test `testDescribeGroupWithShortInitializationTimeout` (apache#8154)
  HOTFIX: fix NPE in Kafka Streams IQ (apache#8158)
  MINOR: set scala version automatically based on gradle.properties
  KAFKA-9577; SaslClientAuthenticator incorrectly negotiates SASL_HANDSHAKE version (apache#8142)
  KAFKA-9441: Add internal TransactionManager (apache#8105)
  MINOR: Document endpoints for connector topic tracking (KIP-558)
  MINOR: Standby task commit needed when offsets updated (apache#8146)
  Changes to migrate to Artifactory (#263)
  KAFKA-9206; Throw KafkaException on CORRUPT_MESSAGE error in Fetch response (apache#8111)
  MINOR: Remove unwanted regexReplace on tests/kafkatest/__init__.py
  KAFKA-9586: Fix errored json filename in ops documentation
  KAFKA-9575: Mention ZooKeeper 3.5.7 upgrade
  KAFKA-9481: Graceful handling TaskMigrated and TaskCorrupted (apache#8058)
  HOTFIX: don't try to remove uninitialized changelogs from assignment & don't prematurely mark task closed (apache#8140)
  MINOR: Fix javadoc at org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion (apache#7337)
  MINOR: Improve EOS example exception handling (apache#8052)
  MINOR: Fix a number of warnings in clients test (apache#8073)
  MINOR: Update shell scripts to support z/OS system (apache#7913)
  MINOR: Wording fix in Streams DSL docs (apache#5692)
  MINOR: Add missing @test annotation to MetadataTest#testMetadataMerge (apache#8141)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants