Skip to content

KAFKA-7549: Old ProduceRequest with zstd compression does not return error to client#5925

Merged
hachikuji merged 5 commits intoapache:trunkfrom
dongjinleekr:feature/KAFKA-7549
Dec 10, 2018
Merged

KAFKA-7549: Old ProduceRequest with zstd compression does not return error to client#5925
hachikuji merged 5 commits intoapache:trunkfrom
dongjinleekr:feature/KAFKA-7549

Conversation

@dongjinleekr
Copy link
Copy Markdown
Contributor

As of current version (2.1.0), zstd-related validations are located in following spots:

  1. ProduceRequest

    • MemoryRecordsBuilder: can't create MemoryRecords with magic < 2 (IllegalArgumentException)
    • ProduceRequest.Builder: can't create ProduceRequest with api version below 7 (InvalidRecordException)
  2. FetchRequest

    • KafkaApis#handleFetchRequest: Returns FetchResponse w/ Errors#UNSUPPORTED_COMPRESSION_TYPE if ...
      1. FetchRequest w/ API version < 10 is delivered to a zstd-compressed topic.
      2. Down-conversion failure: LazyDownConversionRecords#makeNextRecordsUtil#downConvert throws UnsupportedCompressionTypeException.
  3. Etc

    • AbstractLegacyRecordBatch.DeepRecordsIterator: A boilerplate validation for legacy record batches.

In short, there is no broker-side validation for ProduceRequest w/ zstd compressed records. This PR compensates this hole.

There is a reason why this validation can't be located in other class, e.g., LogValidator: it can't see the API version of ProduceRequest. The only method that can check both of CompressionType and API version is KafkaApis#handleProduceRequest; it's why.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Comment thread core/src/main/scala/kafka/server/KafkaApis.scala Outdated
@dongjinleekr
Copy link
Copy Markdown
Contributor Author

Here is the update. Since ProducerRequest#validateRecords modifies the internal state of ProducerRequest (i.e., idempotent and transactional), I first separated that logic into ProducerRequest#setFlags and then turned ProducerRequest#validateRecords into static method. KafkaApis#handleProduceRequest now validates ProducerRequest with ProducerRequest#validateRecords.

+1. Now ProducerRequest#validateRecords throws UnsupportedCompressionTypeException when it violates zstd condition.
+2. This update also includes rebasing upon the latest trunk.

@hachikuji
Copy link
Copy Markdown
Contributor

@dongjinleekr Thanks, I think the fix looks good. I am wondering if it is possible to create a test case in kafka.server.ProduceRequestTest to verify that we receive the expected error code when zstd is used in older versions?

@dongjinleekr
Copy link
Copy Markdown
Contributor Author

@hachikuji No problem. I will complete it by this weekend.

@dongjinleekr
Copy link
Copy Markdown
Contributor Author

Here is the update with rebasing upon the latest trunk. In this update, I added a method to go around the validation logic for unit testing(commit c23db83). I know this approach is a little bit dangerous but could not find any other good alternative.

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, left a minor suggestion. Otherwise looks good.

return build(version, true);
}

public ProduceRequest build(short version, boolean validate) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this can be private and we can expose a buildUnsafe method that sets validate to false. Then we will be less tempted to accidentally use the API.

Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for the patch. @dongjinleekr Note that I went ahead and pushed the minor suggestion that I had.

@dongjinleekr
Copy link
Copy Markdown
Contributor Author

@hachikuji I greatly appreciate your help. 👍

@hachikuji hachikuji merged commit 7a3dffb into apache:trunk Dec 10, 2018
hachikuji pushed a commit that referenced this pull request Dec 10, 2018
…error to client (#5925)

Older versions of the Produce API should return an error if zstd is used. This validation existed, but it was done during request parsing, which means that instead of returning an error code, the broker disconnected. This patch fixes the issue by moving the validation outside of the parsing logic. It also fixes several other record validations which had the same problem.

Reviewers: Jason Gustafson <jason@confluent.io>
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…error to client (apache#5925)

Older versions of the Produce API should return an error if zstd is used. This validation existed, but it was done during request parsing, which means that instead of returning an error code, the broker disconnected. This patch fixes the issue by moving the validation outside of the parsing logic. It also fixes several other record validations which had the same problem.

Reviewers: Jason Gustafson <jason@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants