kafka-4576: Log segments close to max size break on fetch #2304
huxihx wants to merge 14 commits into apache:trunk from huxihx:kafka4576_FileChannel_read
The Java API does not guarantee that FileChannel.read fills the buffer, even when enough bytes are available in the channel. Because searchOffsetWithSize relies on a single FileChannel.read call, a short read there can cause follower replicas to fail when fetching data from the leader if a large log segment size is configured.
@ijuma @onurkaraman Please review this PR. Thanks.
ijuma left a comment
Thanks for the PR. In addition to the other review comments:
- It would be good to add unit tests for the new method
- FileRecords.readInto also has to be updated.
cc @junrao @hachikuji who may have thoughts on this as well.
     * @throws IllegalArgumentException If invalid arguments were passed
     * @throws IOException If some other I/O error occurs
     */
    public static void readFullyFrom(FileChannel channel, ByteBuffer buffer, long startPosition) throws IOException {
Maybe the name should simply be readFully. And buffer should maybe be destinationBuffer. And startPosition can maybe just be position for consistency with FileChannel.read.
Finally, we need a mechanism to indicate that EOF has been reached. Maybe a boolean is good enough. The usual way of returning an int is a bit weird for the case where some bytes are read and then EOF is reached.
    }
    if (buffer == null) {
        throw new IllegalArgumentException("Must specify the byte buffer for the channel.");
    }
I don't think we need the null checks.
        throw new IllegalArgumentException("Must specify the byte buffer for the channel.");
    }
    if (startPosition < 0) {
        throw new IllegalArgumentException("The file position cannot be negative.");
We should include startPosition in the message.
    if (startPosition < 0) {
        throw new IllegalArgumentException("The file position cannot be negative.");
    }
    final int totalBytesToRead = buffer.capacity();
This is not correct; it should be buffer.remaining(). It seems simpler to just loop while (buffer.hasRemaining()), with an early exit in the case read returns -1.
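To illustrate why `capacity()` is the wrong bound, here is a minimal sketch (the class and method names are hypothetical and not part of the patch). A buffer that already holds data has fewer free bytes than its capacity, so a loop that targets `capacity()` would try to read more than the buffer can accept:

```java
import java.nio.ByteBuffer;

final class RemainingVsCapacity {
    // Returns {capacity, remaining} for a 16-byte buffer that already holds 4 bytes.
    static int[] sizes() {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.putInt(42); // advances the position to 4
        return new int[] {buffer.capacity(), buffer.remaining()};
    }

    public static void main(String[] args) {
        int[] s = sizes();
        System.out.println("capacity=" + s[0] + ", remaining=" + s[1]);
    }
}
```

Running `main` prints `capacity=16, remaining=12`: only 12 more bytes fit, which is the number a read loop should aim for.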
    int bytesRead = channel.read(buffer, currentPosition);
    if (bytesRead == -1) {
        log.warn(String.format("Read incomplete data from the given file channel. Current file channel size: %d, and wanted to read at position: %d",
            channel.size(), currentPosition));
What's the purpose of this warning? It doesn't seem needed.
        totalBytesRead += bytesRead;
        currentPosition += bytesRead;
    }
    buffer.flip(); // Make the buffer ready for read
A bit unclear why we need this. In my mind, readFully should be as close to FileChannel.read as possible with the exception that it attempts to fill the buffer while end of file is not reached.
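As a small illustration of why the flip is better left to the caller, consider the sketch below (hypothetical names; the `put` call merely stands in for a channel read). After a read, the buffer's position marks how many bytes arrived, which is the state `FileChannel.read` leaves behind. After `flip()` the buffer is set up for draining, so doing it inside the utility would take that decision away from the caller.

```java
import java.nio.ByteBuffer;

final class FlipSketch {
    // Returns {position, limit} before the flip, followed by {position, limit} after it.
    static int[] statesAroundFlip() {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.put(new byte[] {1, 2, 3}); // stands in for a read that delivered 3 bytes
        int[] states = new int[4];
        states[0] = buffer.position();
        states[1] = buffer.limit();
        buffer.flip(); // the caller's choice: switch from filling to draining
        states[2] = buffer.position();
        states[3] = buffer.limit();
        return states;
    }
}
```

Before the flip the state is position 3, limit 8; afterwards it is position 0, limit 3.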
Note that there are a number of test failures caused by this change.

Thanks for the comments. Will work on these comments asap.
Addressed Ijuma's comments 1. Refined some variable names 2. Fixed a code bug so that all unit test cases pass
@ijuma Thanks for the comments. Refined the code as per your suggestions. Now all the unit test cases should pass. Since we already have about 7 unit test cases (such as FileRecordsTest.testRead), do you think we still need to add new test cases for this method? Thanks again.
ijuma left a comment
Thanks for the updates. Looks good overall, just a few nits.
    public static void readFully(FileChannel channel, ByteBuffer destinationBuffer, long position) throws IOException {
        if (channel == null) {
            throw new IllegalArgumentException("Cannot read an empty file channel.");
        }
I don't think we need this null check either.
    }

    /**
     * Read data from the channel to the given byte buffer
Maybe add at the end: "until there are no bytes remaining in the buffer or the channel has reached end of stream".
     * @param position The file position at which the transfer is to begin; must be non-negative
     *
     * @throws IllegalArgumentException If invalid arguments were passed
     * @throws IOException If some other I/O error occurs

        throw new IllegalArgumentException("Cannot read an empty file channel.");
    }
    if (position < 0) {
        throw new IllegalArgumentException("The file position cannot be negative, but we got " + position);
Nit: "The file channel position..."
    @@ -113,7 +113,7 @@ public FileChannel channel() {
         * @throws IOException
         */
        public ByteBuffer readInto(ByteBuffer buffer, int position) throws IOException {
Worth clarifying the docs, similar to the readFully documentation.
        throw new IllegalArgumentException("The file position cannot be negative, but we got " + position);
    }
    long currentPosition = position;
    while (destinationBuffer.hasRemaining()) {
I think we probably want a do/while loop here. There should be no difference in behaviour, but it seems to model the problem better (i.e. we first do a read and then we check if there is still space remaining in the buffer). Maybe:
    long currentPosition = position;
    int bytesRead;
    do {
        bytesRead = channel.read(destinationBuffer, currentPosition);
        currentPosition += bytesRead;
    } while (bytesRead != -1 && destinationBuffer.hasRemaining());
…ystic/kafka into kafka4576_FileChannel_read
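Put together as a self-contained sketch, the suggested loop could look like the following. The class name and the demo method are hypothetical, and this is not necessarily the code that was merged. Unlike the snippet above, it only advances the position when bytes were actually read, so a `-1` return never moves it backwards.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ReadFullySketch {
    // Keeps reading until the buffer is full or the channel reports end of file.
    static void readFully(FileChannel channel, ByteBuffer destinationBuffer, long position) throws IOException {
        long currentPosition = position;
        int bytesRead;
        do {
            bytesRead = channel.read(destinationBuffer, currentPosition);
            if (bytesRead > 0) currentPosition += bytesRead;
        } while (bytesRead != -1 && destinationBuffer.hasRemaining());
    }

    // Demo: reads a 10-byte temp file into a 16-byte buffer and returns the unfilled space.
    static int demoRemainingAfterEof() {
        Path file = null;
        try {
            file = Files.createTempFile("readFully", ".bin");
            Files.write(file, new byte[10]);
            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
                ByteBuffer buffer = ByteBuffer.allocate(16);
                readFully(channel, buffer, 0L);
                return buffer.remaining();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (file != null) file.toFile().delete();
        }
    }
}
```

In the demo the file is shorter than the buffer, so the loop stops at end of file with 6 bytes of the buffer still free, and no exception is raised.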
adjusted some comments as per Ijuma's suggestion.
@ijuma already addressed your comments, please take some time to review again. Appreciated.
     * @return The same buffer
     * @throws IOException
     * Read data from the channel to the given byte buffer until there are no bytes remaining in the buffer or
     * the channel has reached end of stream
Hmm, the idea was to extend the existing definition to explain in more detail what happens, not to change it to be the same as readFully.

Sorry, but I don't follow. Do you mean we should modify the comment for the readInto method to explain why we use readFully instead of a naive Channel.read?

The previous comment said "Read log entries into the given buffer". This is more specific than "Read data from the channel to the given byte buffer..." and we should keep that. We could perhaps say:
"Read log entries into the given buffer until there are no bytes remaining in the buffer or the channel has reached end of stream."

Thank you for clarifying this. Already addressed. Please check.
Restored the comments for FileRecords.readInto method and tweaked some expression.
ijuma left a comment
LGTM. Will wait for @hachikuji or @junrao to take a look as well before merging.
The fix LGTM. Would be nice to have some test cases for

Good point about the test, I had asked for it and then forgot to reply to @amethystic's question. I agree with @hachikuji that it would be useful to have a simple test with a mocked

@ijuma Sorry, but I'm not familiar with mocking. Could you give an example of how to get started with that?

@amethystic, if you search for The existing unit tests don't exercise these different paths (since they already passed before your change).
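One dependency-free way to exercise the short-read path is a hand-written stub in place of a mocking library. The sketch below is an illustration under stated assumptions, not the tests from this PR: `ShortReadChannel`, `ShortReadDemo`, and the local `readFully` are hypothetical names, and the stub hands out at most `step` bytes per positional read.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;

// Hypothetical stub: returns at most `step` bytes per positional read, -1 at end of data.
final class ShortReadChannel extends FileChannel {
    private final byte[] data;
    private final int step;
    int reads = 0; // how many positional reads were issued

    ShortReadChannel(byte[] data, int step) {
        this.data = data;
        this.step = step;
    }

    @Override public int read(ByteBuffer dst, long position) {
        reads++;
        if (position >= data.length) return -1;
        int available = data.length - (int) position;
        int n = Math.min(step, Math.min(dst.remaining(), available));
        dst.put(data, (int) position, n);
        return n;
    }

    @Override public long size() { return data.length; }
    @Override protected void implCloseChannel() { }

    // The remaining operations are not needed for this sketch.
    @Override public int read(ByteBuffer dst) { throw new UnsupportedOperationException(); }
    @Override public long read(ByteBuffer[] dsts, int offset, int length) { throw new UnsupportedOperationException(); }
    @Override public int write(ByteBuffer src) { throw new UnsupportedOperationException(); }
    @Override public long write(ByteBuffer[] srcs, int offset, int length) { throw new UnsupportedOperationException(); }
    @Override public int write(ByteBuffer src, long position) { throw new UnsupportedOperationException(); }
    @Override public long position() { throw new UnsupportedOperationException(); }
    @Override public FileChannel position(long newPosition) { throw new UnsupportedOperationException(); }
    @Override public FileChannel truncate(long size) { throw new UnsupportedOperationException(); }
    @Override public void force(boolean metaData) { throw new UnsupportedOperationException(); }
    @Override public long transferTo(long position, long count, WritableByteChannel target) { throw new UnsupportedOperationException(); }
    @Override public long transferFrom(ReadableByteChannel src, long position, long count) { throw new UnsupportedOperationException(); }
    @Override public MappedByteBuffer map(MapMode mode, long position, long size) { throw new UnsupportedOperationException(); }
    @Override public FileLock lock(long position, long size, boolean shared) { throw new UnsupportedOperationException(); }
    @Override public FileLock tryLock(long position, long size, boolean shared) { throw new UnsupportedOperationException(); }
}

final class ShortReadDemo {
    // Same loop shape as discussed in the review: read until full or end of file.
    static void readFully(FileChannel channel, ByteBuffer dst, long position) throws IOException {
        long current = position;
        int bytesRead;
        do {
            bytesRead = channel.read(dst, current);
            if (bytesRead > 0) current += bytesRead;
        } while (bytesRead != -1 && dst.hasRemaining());
    }

    // Fills a 10-byte buffer from a stub that yields 4 bytes at a time; returns the read count.
    static int readsNeeded() {
        byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
        ShortReadChannel channel = new ShortReadChannel(data, 4);
        ByteBuffer buffer = ByteBuffer.allocate(data.length);
        try {
            readFully(channel, buffer, 0L);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (buffer.hasRemaining() || !Arrays.equals(data, buffer.array()))
            throw new IllegalStateException("buffer was not filled with the expected bytes");
        return channel.reads;
    }
}
```

With a step of 4, filling 10 bytes takes three reads (4 + 4 + 2). A single `read` call against the same stub would have left the buffer with only 4 bytes, which is the path the pre-existing tests never hit.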
What readFully must guarantee is that it never leads to a situation where the channel still has something to read but the buffer does not get filled up. As a utility method, readFully should not assume that the channel's EOF is an exceptional case. The enclosing method could take over that responsibility.

@amethystic : I think this issue is in FileRecords.readInto(). In this method, the expectation is that the buffer may not be filled if there are not enough bytes in the channel. So, for that, we probably need a slightly different util function that reads min(buffer capacity, remaining bytes in channel) bytes. In other places, we do expect the buffer will be filled up.
        currentPosition += bytesRead;
    } while (bytesRead != -1 && destinationBuffer.hasRemaining());
    if (bytesRead == -1) {
        throw new IOException(String.format("Reached EOF before filling up the buffer. Current file channel position = %d, buffer remaining = %d",
It would be nice if the calling code could pass a string with context for what is being read. That way we don't lose information like:
    if (buf.hasRemaining())
    - throw new KafkaException("Failed to read magic byte from FileChannel " + channel);
For the case above, we could pass "magic byte" and then the error message would read: "Reached EOF before filling the buffer with magic byte..." or something along those lines.

Thanks for the feedback. Personally, I agree with @junrao's suggestion that we might need another read function to take care of the cases where reaching EOF is expected. As for @ijuma's "magic byte", I am not sure I fully understand how it could be used, but I still think we have put too many responsibilities on readFully, which should do only one thing: READING CHANNEL TO THE BUFFER. So I strongly recommend we refine the enclosing readInto so that all of these cases pass the tests.
What do you think?

I am not sure what your concrete suggestion is. Our aim here is to provide internal utility methods that do the right thing concisely and with good error messages.
I agree that it may make sense to have two methods. One could be readFullyOrFail and the other could simply be readFully maybe. The former would throw the exception on EOF while the other would not.
To clarify my suggestion, I meant that the readFullyOrFail variant could take a string parameter that would be included in the IOException message. It's useful to know that we failed to read "magic string", for example.

Okay, I will combine all the suggestions and submit a fix asap. Thanks for the comments.
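The two-method split discussed in this thread might be sketched as follows. This is an illustration only: the class name, the demo method, and the exact message wording are assumptions, and the merged Kafka code may differ. The strict variant takes a `description` so that the error keeps the context of what was being read.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ReadVariantsSketch {
    // Lenient variant: stops quietly at end of file, possibly leaving the buffer partly empty.
    static void readFully(FileChannel channel, ByteBuffer destinationBuffer, long position) throws IOException {
        long currentPosition = position;
        int bytesRead;
        do {
            bytesRead = channel.read(destinationBuffer, currentPosition);
            if (bytesRead > 0) currentPosition += bytesRead;
        } while (bytesRead != -1 && destinationBuffer.hasRemaining());
    }

    // Strict variant: fails if end of file is reached before the buffer is full.
    static void readFullyOrFail(FileChannel channel, ByteBuffer destinationBuffer, long position,
                                String description) throws IOException {
        int expected = destinationBuffer.remaining();
        readFully(channel, destinationBuffer, position);
        if (destinationBuffer.hasRemaining())
            throw new EOFException(String.format(
                "Reached EOF before filling the buffer with %s: expected %d bytes, got %d",
                description, expected, expected - destinationBuffer.remaining()));
    }

    // Demo: asks for one byte from an empty temp file and returns the resulting error message.
    static String demoErrorMessage() {
        Path file = null;
        try {
            file = Files.createTempFile("readFullyOrFail", ".bin"); // created empty
            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
                readFullyOrFail(channel, ByteBuffer.allocate(1), 0L, "magic byte");
                return "no error";
            }
        } catch (EOFException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (file != null) file.toFile().delete();
        }
    }
}
```

A caller such as readInto, where a partly filled buffer is acceptable, would use the lenient variant; callers that need a complete header or field would use the strict one and pass a short description.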
Addressed Jun's comments 1. Renamed readFully to readFullyOrFail 2. Added a new method 'readFully' that does not throw IOException when encountering EOF 3. Added two unit test cases to test readFully
    channel.read(logHeaderBuffer, position);
    if (logHeaderBuffer.hasRemaining())
        return null;
    Utils.readFullyOrFail(channel, logHeaderBuffer, position);
@junrao, is it OK to throw an exception instead of returning null if we reach EOF before filling the buffer here?
    // Scenario 1: test single read
    Utils.readFullyOrFail(channel, perfectBuffer, 0);
    assertTrue("Buffer should be filled up.", !perfectBuffer.hasRemaining());
    // Scenario 2: test multiple reads
I am not sure what you mean by test multiple reads here. I think we're testing two things here:
- That the first read doesn't cause a side-effect to the channel that could prevent the second read from succeeding.
- That we can read into a smaller buffer than what's in the file channel.
"multiple reads" is a little unclear as it sounds like we are causing the underlying file channel to do multiple reads, which I don't think we are.
    }

    @Test
    public void testReadFullyOrFailThrowIOExceptionForEOF() throws IOException {
Does this add anything over testReadFullyOrFailWithRealFile? If not, we can simply remove it.
    assertTrue("Buffer should be filled up.", !perfectBuffer.hasRemaining());
    // Scenario 2: test multiple reads
    Utils.readFullyOrFail(channel, smallBuffer, 0);
    assertTrue("Buffer should be filled up.", !smallBuffer.hasRemaining());
Maybe we should add one case where position > 0.
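A case with position > 0 could be sketched without any test framework as below. The class and method names are hypothetical, and a temp file with known contents stands in for the test fixture. The sketch also checks the bytes that were read, not just that the buffer is full.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class PositionedReadSketch {
    // Writes bytes 0..9 to a temp file, then reads `length` bytes starting at `position`.
    static byte[] readAt(long position, int length) {
        Path file = null;
        try {
            file = Files.createTempFile("positionedRead", ".bin");
            Files.write(file, new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
            ByteBuffer buffer = ByteBuffer.allocate(length);
            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
                long current = position;
                int bytesRead;
                do {
                    bytesRead = channel.read(buffer, current);
                    if (bytesRead > 0) current += bytesRead;
                } while (bytesRead != -1 && buffer.hasRemaining());
            }
            return buffer.array();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (file != null) file.toFile().delete();
        }
    }
}
```

Reading 4 bytes at position 3 yields the bytes 3, 4, 5, 6. Because the positional `read` does not modify the channel's own position, repeated calls with different positions do not interfere with each other.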
    EasyMock.replay(channelMock);
    Utils.readFully(channelMock, buffer, 0L);
    Assert.assertTrue("The buffer should be filled up", !buffer.hasRemaining());
Seems like assertFalse would be more appropriate here. There are a few cases like that. Also, it would be good to verify the buffer contents.
    }

    @Test
    public void testReadFullyOrFailWithRealFile() throws IOException {
It would be good to verify that the buffer is populated correctly.
    EasyMock.replay(channelMock);
    Utils.readFullyOrFail(channelMock, buffer, 0L);
    Assert.assertTrue("The buffer should be filled up", !buffer.hasRemaining());
It would be good to verify that the buffer contents are correct as well.
            }
        });
        totalMockedBytesRead += mockedBytesRead;
    }
Seems like this is the same in testReadFullyOrFailWithMultiReads. Maybe we can extract it to a helper method.
    }

    @Test
    public void testReadFullyCanFillUpBuffer() throws IOException {
Is this name correct? It seems like the buffer is never filled in this test.
Implementation and documentation improvements
Merged Ijuma's fix and addressed Ijuma's comments.
@ijuma already merged your fix and addressed the comments. Thanks for the feedback and review again:)
junrao left a comment
@amethystic : Thanks for the patch. LGTM. Just a couple of minor comments.
        break;
    }
    // If the remaining byte number is less than a step,
    // directly assign the last mocked bytes read to fix `bufferSize` exactly

     * @param channel File channel containing the data to read from
     * @param destinationBuffer The buffer into which bytes are to be transferred
     * @param position The file position at which the transfer is to begin; it must be non-negative
     * @param description A description of what is being read, this will be included in the EOFException if we
Incomplete sentence after "if we"?
Sorry for the typos. Thanks.
Fixed some typos
`FileChannel.read` may not fill the destination buffer even if there are enough bytes in the channel to do so. Add a couple of utility methods that ensure this and use them from all the relevant places.

Author: huxi <huxi@zhenrongbao.com>
Author: amethystic <huxi_2b@hotmail.com>
Author: Ismael Juma <ismael@juma.me.uk>
Reviewers: Jun Rao <junrao@gmail.com>, Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #2304 from amethystic/kafka4576_FileChannel_read

(cherry picked from commit 337f576)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>
Thanks for the updates, I merged to trunk and 0.10.2 after tweaking the tests a little.