Fixup KAFKA-3160: catch decompression errors in constructor #1344
dpkp wants to merge 2 commits into apache:trunk from dpkp:decompress_error_code
Conversation
Thanks for the PR @dpkp. Would it be possible to add a test for this?
```scala
  new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream))
} catch {
  case ioe: IOException =>
    throw new CorruptRecordException(ioe)
```
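The pattern in the snippet above can be sketched as a self-contained example. The factory and exception class below are simplified stand-ins, not Kafka's actual classes: an IOException raised while the factory validates the compression header during construction is re-thrown as CorruptRecordException.

```scala
import java.io.{ByteArrayInputStream, DataInputStream, IOException}

// Simplified stand-in for Kafka's CorruptRecordException.
class CorruptRecordException(cause: Throwable) extends RuntimeException(cause)

// Toy factory: validates a (made-up) one-byte header during construction,
// like the real CompressionFactory, and throws IOException on garbage input.
object CompressionFactory {
  def apply(bytes: Array[Byte]): DataInputStream = {
    if (bytes.isEmpty || bytes(0) != 0x1f.toByte)
      throw new IOException("invalid compression header")
    new DataInputStream(new ByteArrayInputStream(bytes))
  }
}

// The fix: wrap construction so a decompression error surfaces as
// CorruptRecordException rather than escaping as a raw IOException.
def openStream(bytes: Array[Byte]): DataInputStream =
  try CompressionFactory(bytes)
  catch { case ioe: IOException => throw new CorruptRecordException(ioe) }
```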
It's a bit weird to throw an ApiException (CorruptRecordException is a subclass) from here, I think. Let's see what @junrao thinks.
This seems fine since we already throw CorruptRecordException in line 116 in this class.
That's right, but that line was added as part of the LZ4 PR. I think we have to use InvalidMessageException in both places for compatibility with older clients. The scaladoc for InvalidMessageException says:
```scala
 * Indicates that a message failed its checksum and is corrupt
 *
 * InvalidMessageException extends CorruptRecordException for temporary compatibility with the old Scala clients.
 * We want to update the server side code to use and catch the new CorruptRecordException.
 * Because ByteBufferMessageSet.scala and Message.scala are used in both server and client code having
 * InvalidMessageException extend CorruptRecordException allows us to change server code without affecting the client.
```
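The compatibility arrangement that scaladoc describes can be sketched in isolation (class and method names below are illustrative, with simplified constructors): because the old exception extends the new one, server code that catches CorruptRecordException still traps InvalidMessageException thrown by the shared client/server classes.

```scala
// Sketch of the hierarchy described in the scaladoc (constructors simplified).
class CorruptRecordException(msg: String) extends RuntimeException(msg)
class InvalidMessageException(msg: String) extends CorruptRecordException(msg)

// Illustrative server-side handler: it catches only the new exception type...
def handleFetch(body: => Unit): String =
  try { body; "ok" }
  catch { case _: CorruptRecordException => "corrupt" }

// ...yet shared code throwing the old InvalidMessageException is still
// trapped, since it is a subclass of CorruptRecordException.
```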
The next RC is meant to go out this morning, so we need to get this merged ASAP.
77e8e46 to b02502a
Sorry, offline last night / this a.m. I merged in your changes, @ijuma. Thanks!!
LGTM. Thanks @dpkp for catching this in the first place and for merging the PR. Gwen reviewed the PR, so I'll go ahead and merge this as soon as the tests pass.
After testing KAFKA-3160 a bit more, I found that the error code was not being set properly in ProduceResponse. This happened because the validation error is raised in the CompressionFactory constructor, which was not wrapped in a try / catch.

@ijuma @junrao

(This contribution is my original work and I license the work under Apache 2.0.)

Author: Dana Powers <dana.powers@gmail.com>
Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>, Gwen Shapira <cshapi@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1344 from dpkp/decompress_error_code

(cherry picked from commit 4331bf4)

Signed-off-by: Ismael Juma <ismael@juma.me.uk>
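The bug described in the commit message can be reproduced in miniature. If the expression that constructs the stream sits outside the try block, an exception thrown during construction bypasses the handler that would map it to an error code (the class and method names below are illustrative, not Kafka's actual code):

```scala
import java.io.IOException

// Illustrative only: a class that validates its input in the constructor
// and throws, mirroring how CompressionFactory raised errors on construction.
class ValidatingStream(header: Int) {
  if (header != 0x1f) throw new IOException("bad header")
}

// Buggy shape: construction happens before the try is entered, so the
// IOException escapes and no error code is ever recorded.
def buggy(header: Int): String = {
  new ValidatingStream(header) // throws here, outside the try
  try "NONE" catch { case _: IOException => "CORRUPT_MESSAGE" }
}

// Fixed shape (what the PR does): construction inside the try, so the
// error is caught and translated into an error code.
def fixed(header: Int): String =
  try { new ValidatingStream(header); "NONE" }
  catch { case _: IOException => "CORRUPT_MESSAGE" }
```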
Conflicts: GroupCoordinatorIntegrationTest.scala - line 50 SocketServerTest.scala - TestableSocketServer class, removed verifyAcceptorIdlePercent() method, apache#1301 removed pollBlockMs var used in apache#1344 poll() method; testConnectionRateLimit() method changed SocketServer.scala - apache#82, removed SocketServerMetricsGroup var, apache#455 removed idlePercentMeter KafkaApis.scala - apache#50 imports RequestQuotaTest.scala - apache#28 imports