
[MINOR] Guard against crashing on invalid key range queries #6521

Merged
bbejeck merged 10 commits into apache:trunk from ableegoldman:GuardKeyRangeQueries
Apr 10, 2019

Conversation

@ableegoldman
Member

Due to KAFKA-8159, Streams will throw an unchecked exception when a caching layer or in-memory underlying store is queried over a range of keys from negative to positive. We should add a check for this, log it, and return an empty iterator (as the RocksDB stores happen to do) rather than crash.
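For illustration, the guard described here can be sketched with a minimal TreeMap-backed store. The class and method names below are hypothetical stand-ins, not the actual Streams code: an inverted range logs a warning and yields an empty iterator instead of throwing.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Minimal sketch (illustrative names, not the actual Streams patch):
// an invalid range is logged and answered with an empty iterator.
public class GuardedRangeStore {
    private final TreeMap<Integer, String> store = new TreeMap<>();

    public void put(final int key, final String value) {
        store.put(key, value);
    }

    public Iterator<Map.Entry<Integer, String>> range(final int from, final int to) {
        if (from > to) {
            // Guard: don't crash on an inverted range, just return nothing.
            System.err.println("WARN: Returning empty iterator for range query with invalid range.");
            return Collections.emptyIterator();
        }
        return store.subMap(from, true, to, true).entrySet().iterator();
    }

    public static void main(final String[] args) {
        final GuardedRangeStore s = new GuardedRangeStore();
        s.put(1, "a");
        System.out.println(s.range(5, 3).hasNext()); // false: inverted range, empty iterator
        System.out.println(s.range(0, 2).hasNext()); // true: key 1 falls in [0, 2]
    }
}
```

The key design point mirrors the PR: consistency across store types, with a logged warning rather than an unchecked exception.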

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@ableegoldman ableegoldman changed the title Guard against crashing on invalid key range queries [MINOR] Guard against crashing on invalid key range queries Mar 29, 2019
@ableegoldman
Member Author

Streams should at least be consistent across store types in its handling of invalid range queries, and I felt it was better to log the error and return nothing than to throw an exception. However, maybe silently returning "incorrect" results is worse than crashing and alerting users to the issue... WDYT?

Contributor

@guozhangwang guozhangwang left a comment


One meta comment: should we add documentation similar to https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java#L71 to indicate that the objects' logical ordering and the serialized bytes' lexicographic ordering need to be consistent as well?

@ableegoldman
Member Author

This ordering only needs to be enforced for IQ, correct?

@guozhangwang
Contributor

This ordering only needs to be enforced for IQ, correct?

I think it should be applied universally: whenever you call a put, the object is serialized to bytes, so the ordering is already fixed at that point -- if the serialized bytes are not ordered consistently with the object order, then a follow-up range fetch, whether from IQ or from the processor, will return wrong results.
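To make the ordering concern concrete, here is a small self-contained demonstration (not Streams code; the class and helper names are illustrative) of why signed numbers break the agreement between object ordering and lexicographic byte ordering -- this is exactly what makes a query like range(-1, 1) look inverted at the byte level.

```java
import java.nio.ByteBuffer;

// Demonstrates that two's-complement serialization of signed ints does NOT
// preserve ordering under lexicographic (unsigned) byte comparison.
public class SignedBytesOrdering {

    // Lexicographic comparison of byte arrays, treating each byte as unsigned --
    // the ordering a byte-keyed store applies to its serialized keys.
    static int compareUnsigned(final byte[] a, final byte[] b) {
        final int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            final int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // Big-endian two's-complement serialization of an int.
    static byte[] serialize(final int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    public static void main(final String[] args) {
        // -1 serializes to FF FF FF FF, which sorts AFTER 00 00 00 01 (i.e. 1)
        // under unsigned byte comparison, so the byte range for range(-1, 1)
        // is inverted even though the object range is perfectly valid.
        System.out.println(compareUnsigned(serialize(-1), serialize(1)) > 0); // true
    }
}
```

This is why the serializer's byte order must agree with the object's logical order for range fetches to be correct, whether issued from IQ or from a processor.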

Member

@bbejeck bbejeck left a comment


Thanks @ableegoldman. Overall looks good to me I just have a minor comment regarding the logging level.

Member


nit: since this represents an invalid range maybe this could be a WARN?

Member Author


ack, good point

Member


ditto

Member


ditto here and below

@bbejeck bbejeck added the streams label Apr 4, 2019
@ableegoldman ableegoldman force-pushed the GuardKeyRangeQueries branch from 1e4286f to ac27e85 on April 4, 2019 22:33
Member

@bbejeck bbejeck left a comment


Thanks for the update @ableegoldman LGTM.

@bbejeck
Member

bbejeck commented Apr 5, 2019

call for second review any of @guozhangwang @mjsax @vvcephei @cadonna

Member

@cadonna cadonna left a comment


Hi @ableegoldman,

Are there unit tests in place to verify the changes in this PR?

For the rest, I have just a couple of nits.

    @Override
    public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
                                                 final Bytes to) {
        // Make sure this is a valid query
Member


nit: I would remove the comment here (and in all occurrences below), because the code itself is clear enough about what it does. Maybe rename from and to to fromKey and toKey (or similar) to make it even clearer. Renaming would also apply to some of the changes below.

Member Author


Ack

                                                 final Bytes to) {
        // Make sure this is a valid query
        if (from.compareTo(to) > 0) {
            LOG.warn("Returning empty iterator for range query with invalid range: keyFrom > keyTo.");
Member


nit: I would avoid writing variable names (i.e., keyFrom and keyTo) in a log message, because they are hard to keep consistent with the code (as you can see here).

Member Author


Ack


    @Test
    public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
        store.range(-1, 1);
Member


You can use org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender to assert the correct log message

Member Author


Ah thanks, will add to tests

@ableegoldman
Member Author

retest this, please

LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();

store.range(-1, 1);
Member


Could you add a check to verify that the returned iterator is empty, something along the lines of assertThat(iterator.hasNext(), is(false))?

Could you also add a test for a range query where the start key is equal to the end key? Such a unit test ensures correct behaviour for this special case.

nit: I would rename the test to shouldReturnEmptyIteratorForRangeQueryWithInvalidKeyRange. Correct me, if I am wrong, but I think the empty iterator and the invalid key range are the points here, not the negative starting key. I would even change the range from (-1, 1) to (5, 3). It took me a bit to understand why (-1, 1) is an invalid range.

These comments apply also to the unit tests below.
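The two cases requested above can be sketched as follows. This is an illustrative, self-contained version (plain Java rather than the actual JUnit test, and with hypothetical names), showing that an inverted range yields an empty iterator while an equal-endpoint range is valid and yields its single matching entry.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative sketch of the reviewed edge cases (not the actual Streams test).
public class RangeQueryEdgeCases {

    // Guarded range query over a sorted map: inverted ranges return nothing.
    static Iterator<String> range(final NavigableMap<Integer, String> store,
                                  final int from,
                                  final int to) {
        if (from > to) {
            return Collections.emptyIterator();
        }
        return store.subMap(from, true, to, true).values().iterator();
    }

    public static void main(final String[] args) {
        final NavigableMap<Integer, String> store = new TreeMap<>();
        store.put(3, "x");

        System.out.println(range(store, 5, 3).hasNext()); // false: inverted range is empty
        System.out.println(range(store, 3, 3).hasNext()); // true: from == to includes key 3
    }
}
```

The from == to case matters because both endpoints are inclusive, so a single-key range must not be rejected by the guard.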

Member Author


Ack re: verifying the returned iterator is empty and adding unit tests for equal start/end keys.

Regarding your third point, this patch is mostly aimed at the bug in https://issues.apache.org/jira/browse/KAFKA-8159, which went undiscovered for a while because there were no tests of range queries with a negative key. I actually think it's fair to say we make no guarantees about what will happen if your app makes an invalid query; however, we definitely shouldn't crash on what appears to be a valid query range (i.e., [-1, 1]), which is the key point here.

Member


Fair enough

Member

@cadonna cadonna left a comment


LGTM

@bbejeck
Member

bbejeck commented Apr 9, 2019

Java 8 failed with kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
Java 11 passed

retest this please

@bbejeck
Member

bbejeck commented Apr 9, 2019

Java 8 passed; Java 11 failure unrelated.

retest this please

@bbejeck
Member

bbejeck commented Apr 10, 2019

Java 8 failed (Execution failed for task ':core:integrationTest'); Java 11 passed.

retest this please

@bbejeck bbejeck merged commit 9f5a69a into apache:trunk Apr 10, 2019
@bbejeck
Member

bbejeck commented Apr 10, 2019

Merged 6521 to trunk

ableegoldman added a commit to ableegoldman/kafka that referenced this pull request Apr 10, 2019
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019

Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
@ableegoldman ableegoldman deleted the GuardKeyRangeQueries branch June 26, 2020 22:39