ARROW-11177: [Java] ArrowMessage failed to parse compressed grpc stream #9147
lidavidm
left a comment
Thanks for the fix! I've added some comments.
Is it possible to add a unit test? (Does CI build against a recent enough gRPC version?)
- int tag = readRawVarint32(stream);
+ int tag = readRawVarint32WithEOFCheck(stream);

  switch (tag) {
Can we explicitly handle -1 (EOF) here?
readRawVarint32 will throw InvalidProtocolBufferException before the tag is returned.
That's not true, though, since readRawVarint32WithEOFCheck has a branch that returns -1.
Ah, actually, now that I look at this, ArrowMessage#frame overall doesn't seem right...it shouldn't be checking stream.available() at all, instead it should be looping and reading the first byte, and handing it to readRawVarint32(int, InputStream) if it's >= 0.
I think that would explain the actual issues you are seeing - we should not rely on available() at all and instead use read(). So it should look like
while (true) {
  int firstByte = stream.read();
  if (firstByte < 0) {
    // EOF
    break;
  }
  int tag = readRawVarint32(firstByte, stream);
  // ...
}
As-is, -1 will get silently ignored and that happens to be correct here, fortunately. But we shouldn't be relying on available() at all.
Hmm, that is similar to the current code. Besides, with the current approach we can return a unified exception message for an abnormal EOF.
The current code is wrong, though. (It would be entirely valid for available() == 0 even when the stream is not at EOF. The JavaDocs state that EOF implies available() == 0, but not the other way around.) And the PR raises different exceptions for EOF depending on the case.
Instead of adding flags to massage the current behavior in different cases, let's write code that adheres to the contract of InputStream.
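To make the contract point concrete, here is a minimal sketch of a stream that still has data to read yet reports available() == 0, because it inherits the base InputStream implementation. The class and method names (AvailableIsNotEof, NoHintStream, countWithAvailable, countWithRead) are hypothetical and not part of Arrow. A loop gated on available() reads nothing, while a loop that treats read() == -1 as the only EOF signal reads everything.

```java
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical demo class; not part of Arrow. */
final class AvailableIsNotEof {

  /** Serves bytes from an array but inherits InputStream#available(), which always returns 0. */
  static final class NoHintStream extends InputStream {
    private final byte[] data;
    private int pos;

    NoHintStream(byte[] data) {
      this.data = data;
    }

    @Override
    public int read() {
      // Returns the next byte as 0..255, or -1 once the array is exhausted (EOF).
      return pos < data.length ? (data[pos++] & 0xFF) : -1;
    }
  }

  /** Counts bytes while available() > 0; stops immediately on streams that report 0. */
  static int countWithAvailable(InputStream in) throws IOException {
    int n = 0;
    while (in.available() > 0 && in.read() >= 0) {
      n++;
    }
    return n;
  }

  /** Counts bytes using read() == -1 as the only EOF signal. */
  static int countWithRead(InputStream in) throws IOException {
    int n = 0;
    while (in.read() >= 0) {
      n++;
    }
    return n;
  }

  public static void main(String[] args) throws IOException {
    byte[] payload = {1, 2, 3};
    System.out.println(countWithAvailable(new NoHintStream(payload))); // prints 0
    System.out.println(countWithRead(new NoHintStream(payload)));      // prints 3
  }
}
```

Both loops see the same three bytes; only the one that relies on read() consumes them.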
java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
Outdated
   * @throws IOException Read first byte failed.
   */
  private static int readRawVarint32WithEOFCheck(InputStream is) throws IOException {
    int firstByte = is.read();
Why can't we check firstByte < 0 for EOF?
What if firstByte < 0 but is.available() > 0?
That would be an incorrect InputStream implementation, since InputStream.read() == -1 is defined to be EOF - there should be no data left.
Furthermore, available() is defined to be an estimate; available() == 0 doesn't mean EOF.
The description of java.io.InputStream#available() indicates it will return {@code 0} when it reaches the end of the input stream.
/**
* Returns an estimate of the number of bytes that can be read (or
* skipped over) from this input stream without blocking by the next
* invocation of a method for this input stream. The next invocation
* might be the same thread or another thread. A single read or skip of this
* many bytes will not block, but may read or skip fewer bytes.
*
* <p> Note that while some implementations of {@code InputStream} will return
* the total number of bytes in the stream, many will not. It is
* never correct to use the return value of this method to allocate
* a buffer intended to hold all data in this stream.
*
* <p> A subclass' implementation of this method may choose to throw an
* {@link IOException} if this input stream has been closed by
* invoking the {@link #close()} method.
*
* <p> The {@code available} method for class {@code InputStream} always
* returns {@code 0}.
*
* <p> This method should be overridden by subclasses.
*
* @return an estimate of the number of bytes that can be read (or skipped
* over) from this input stream without blocking or {@code 0} when
* it reaches the end of the input stream.
* @exception IOException if an I/O error occurs.
*/
public int available() throws IOException {
    return 0;
}
Sorry, my read of this is that EOF implies available == 0, but not the other way around. Otherwise, since the base InputStream always returns 0 for available, that would imply it's always at EOF, which is rather useless, right? And the method definition is about the number of bytes that can be read without blocking, not the number of bytes that can be read overall.
If empirically, that doesn't match the behavior of the InflaterInputStream, that's fine - but let's make it clear in that case so that it doesn't confuse people without that context.
I agree with @lidavidm. Another scenario where zero is perfectly allowable is when no data is buffered but EOF hasn't been reached. @stczwd do you think you will be able to revise the PR to fix the underlying issue?
  }

  private static int readRawVarint32(InputStream is) throws IOException {
    int firstByte = is.read();
It seems here we should be checking firstByte < -1 since readRawVarint32 expects the caller to do EOF-checking - so maybe we don't need a new method, and we should fix this for all callers of this method?
Hm, I have thought about this. This function is also called to read the size, so we should keep the firstByte < -1 check.
I'm not sure what you mean - if is.read() returns -1, the stream is at EOF, so there's never any more data to read.
I also mean, we should consolidate readRawVarint32 and readRawVarint32WithEOFCheck since there's never any reason to proceed if we're at EOF, and furthermore, CodedInputStream#readRawVarint32(int, InputStream) itself doesn't do an EOF check (it assumes the caller has).
readRawVarint32 is called to read both the tag and the byte size, and an exception is still needed when reading the byte size.
If the tag read is -1, it means there is no more data to read.
But if the size read is -1, it means the data is bad and an exception is needed.
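The two positions discussed in this thread can be handled with plain read() calls and no available() polling. The sketch below is an illustration under stated assumptions, not ArrowMessage's code: FrameLoopSketch, readFields, and the simple (tag, size, payload) framing are hypothetical, and the varint decoder is a simplified stand-in for CodedInputStream#readRawVarint32(int, InputStream) that accepts at most five bytes. EOF where a tag would start ends the loop normally; EOF where a size is expected raises an exception.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; the names and framing are illustrative, not ArrowMessage's code. */
final class FrameLoopSketch {

  /** Decodes a base-128 varint whose first byte was already read (simplified: at most 5 bytes). */
  static int readRawVarint32(int firstByte, InputStream in) throws IOException {
    int result = firstByte & 0x7F;
    int current = firstByte;
    for (int shift = 7; (current & 0x80) != 0; shift += 7) {
      if (shift > 28) {
        throw new IOException("malformed varint");
      }
      current = in.read();
      if (current < 0) {
        throw new EOFException("stream ended inside a varint");
      }
      result |= (current & 0x7F) << shift;
    }
    return result;
  }

  /** Reads (tag, size, payload) records until a clean EOF at a tag boundary. */
  static List<byte[]> readFields(InputStream in) throws IOException {
    List<byte[]> payloads = new ArrayList<>();
    while (true) {
      int first = in.read();
      if (first < 0) {
        return payloads; // EOF where a tag would start: normal end of the message
      }
      int tag = readRawVarint32(first, in);

      int sizeFirst = in.read();
      if (sizeFirst < 0) {
        // EOF where a size is required: the data is truncated
        throw new EOFException("stream ended before the size for tag " + tag);
      }
      int size = readRawVarint32(sizeFirst, in);
      if (size < 0) {
        throw new IOException("negative size for tag " + tag);
      }

      byte[] payload = new byte[size];
      int filled = 0;
      while (filled < size) {
        int n = in.read(payload, filled, size - filled);
        if (n < 0) {
          throw new EOFException("stream ended inside a payload");
        }
        filled += n;
      }
      payloads.add(payload);
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] message = {0x0A, 0x02, 'h', 'i'}; // tag 0x0A, size 2, payload "hi"
    System.out.println(readFields(new ByteArrayInputStream(message)).size()); // prints 1
  }
}
```

With this shape a single varint helper serves both callers, and the caller decides whether -1 from the first read() is a normal end or an error.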
d4608a9 to 356c300
@lidavidm @stczwd what is the status of this PR?
I still believe we should not be polling available().
@stczwd did you want to address feedback here?
Sure. I have changed the code to check for -1.
@emkornfield @lidavidm any more comments?
lidavidm
left a comment
Two things:
- Can we add a unit test to ensure the original issue was actually fixed?
- The core of the issue still doesn't seem fixed: the loop inside ArrowMessage#frame is still checking available, which as discussed isn't the right way to determine EOF, especially for a compressed stream, and so we're erroneously terminating early. But without a unit test it's hard to show one way or the other.
@lidavidm Do you want to take this up and finish it, or perhaps one of @lwhite1 @davisusanibar is interested in doing it?
Yes, we should finish this up. I won't have a chance to until at least next week but if David or Larry want to take it up I'm happy to review.
This would still need taking up :-)
Closing because it has been untouched for a while, in case it's still relevant feel free to reopen and move it forward 👍
InvalidProtocolBufferException is thrown in ArrowMessage.frame when gzip compression is used with gRPC.
The reason is that stream.available() still returns 1 after we have read the compressed data. Another read is needed to set InflaterInputStream.reachEOF to true, which makes stream.available() return 0.
Thus we need to check stream.available() again after reading the first byte from the stream.
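The behavior described here can be probed with a small standalone program. This is only a sketch: GzipAvailableDemo, gzip, and probe are made-up names, and the value available() reports right after the last data byte may differ between JDK versions, so the code records it rather than assuming it. What the InputStream contract does guarantee is that read() returns -1 at EOF, and the InflaterInputStream Javadoc states that available() returns 0 once EOF has been reached.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical probe; not part of Arrow or gRPC. */
final class GzipAvailableDemo {

  /** Compresses the given bytes into a single gzip member. */
  static byte[] gzip(byte[] raw) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
      gz.write(raw);
    }
    return out.toByteArray();
  }

  /**
   * Reads exactly raw.length decompressed bytes, then returns
   * {available() before the next read, the next read(), available() after it}.
   */
  static int[] probe(byte[] raw) throws IOException {
    try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gzip(raw)))) {
      for (int i = 0; i < raw.length; i++) {
        if (in.read() < 0) {
          throw new IOException("unexpected EOF after " + i + " bytes");
        }
      }
      int before = in.available(); // JDK-dependent: may still be 1 although no data is left
      int next = in.read();        // -1: this read is what establishes EOF
      int after = in.available();  // 0 once EOF has been observed
      return new int[] {before, next, after};
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(Arrays.toString(probe("hello".getBytes(StandardCharsets.UTF_8))));
  }
}
```

If the first element comes back as 1 on a given JDK, that matches the situation reported in this PR: a loop gated on available() attempts one more tag read even though only EOF remains.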