
KAFKA-7652: Part I; Fix SessionStore's findSession(single-key)#6134

Merged
guozhangwang merged 11 commits into apache:trunk from guozhangwang:K7652-session-find
Jan 18, 2019

Conversation

@guozhangwang
Contributor

  1. Let findSessions(final K key) call the underlying bytes store directly, using the more restricted range.

  2. Fix the upper range used for multi-key range queries in the session schema, which was not conservative enough.

  3. Minor: removed the unnecessary private WrappedSessionStoreBytesIterator class, as it was only used in unit tests.

  4. Minor: removed the unnecessary schema#init function by using the direct bytes-to-binary function.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(key, 0, 1000L);
assertEquals(expected, toList(values));

final List<KeyValue<Windowed<String>, Long>> expected2 = Collections.singletonList(KeyValue.pair(a2, 2L));
Contributor Author

@guozhangwang guozhangwang Jan 12, 2019

This added test covers the discovered bug; it fails if fix 1) is not applied.

final byte[] maxSuffix = ByteBuffer.allocate(SUFFIX_SIZE)
.putLong(to)
// start can at most be equal to end
// the end timestamp can be as large as possible as long as it's larger than start time
Contributor Author

This is the fix for 2).

@Override
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
return findSessions(key, key, earliestSessionEndTime, latestSessionStartTime);
final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(
Contributor Author

This is the fix for 1).

Contributor

Maybe this will save someone else from having to dig down and figure out why it's different...

Because of the way the keys are formatted, when we know we're looking for this exact key, we can search essentially from [my-key, start time] through [my-key, end time]. The other method we were delegating to has to scan from [start-key] to [end-key], which (because of the way the keys are structured) cannot also bound the start and end times; it has to iterate over all the RocksDB keys in that range and filter out the ones that fall outside the desired time bound.

Therefore, this change can significantly tighten the range that actually gets scanned in RocksDB.
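A minimal sketch of this point, under simplifying assumptions (a toy [key][endTime][startTime] layout compared byte-wise unsigned, not the real SessionKeySchema/OrderedBytes code):

```java
import java.nio.ByteBuffer;

public class RangeTighteningDemo {

    // toy entry layout: [1-byte key][endTime][startTime], big-endian
    static byte[] entry(byte key, long end, long start) {
        return ByteBuffer.allocate(17).put(key).putLong(end).putLong(start).array();
    }

    // unsigned lexicographic comparison, like RocksDB's default comparator
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        byte key = 1;
        byte[] tooEarly = entry(key, 1, 0); // session ended before earliestEnd = 2

        // single-key scan: the lower bound can encode earliestEnd directly,
        // so this entry is never read from the store at all
        byte[] singleKeyFrom = entry(key, 2, 0);
        if (compare(tooEarly, singleKeyFrom) >= 0) {
            throw new AssertionError();
        }

        // key-range scan: the lower bound can only be the key plus an all-zero
        // suffix, so the same entry falls inside the scan and must be filtered in code
        byte[] rangeFrom = entry(key, 0, 0);
        if (compare(tooEarly, rangeFrom) < 0) {
            throw new AssertionError();
        }
        System.out.println("single-key bounds skip entries the key-range scan must filter");
    }
}
```

In this toy model, the single-key lower bound already excludes sessions that ended too early, while the key-range scan has to read them and filter afterwards.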

@guozhangwang
Contributor Author

@vvcephei @bbejeck @mjsax

@guozhangwang
Contributor Author

guozhangwang commented Jan 12, 2019

Here is the full story:

  1. First of all, the findSessions implementation itself is quite tricky to understand, so to understand the bug we first need to walk through it, as documented in the javadoc for findSessions(key, ...):

Fetch any sessions with the matching key where the session's end is >= earliestSessionEndTime and the session's start is <= latestSessionStartTime

so let's take an example: let's say you have the following sessions for the same key:

[t0, t1]
[t1, t2]
[t1, t5]
[t4, t5]
[t4, t6]
[t5, t6]

And we query findSessions(key, t2, t4): it means any session whose end time is at least t2 AND whose start time is at most t4 should be returned.

The reason for these semantics is that during session aggregation the key task is to find the sessions that "need to be merged together". Suppose your session inactivity gap is one time unit (i.e. one t), and a record with timestamp t3 comes in: you need exactly findSessions(key, t2, t4) to find the sessions to merge into a single session, since because of this record those sessions should no longer be kept apart.

So the above query should return [t1, t2], [t1, t5], [t4, t5], and [t4, t6], but NOT [t0, t1] or [t5, t6].
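These semantics can be sketched in plain Java (a hypothetical helper for illustration, not Kafka's implementation):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FindSessionsSemantics {

    // return sessions (given as {start, end}) whose end >= earliestSessionEndTime
    // and whose start <= latestSessionStartTime
    static List<long[]> findSessions(List<long[]> sessions, long earliestEnd, long latestStart) {
        List<long[]> result = new ArrayList<>();
        for (long[] s : sessions) {
            if (s[1] >= earliestEnd && s[0] <= latestStart) {
                result.add(s);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<long[]> sessions = Arrays.asList(
            new long[]{0, 1}, new long[]{1, 2}, new long[]{1, 5},
            new long[]{4, 5}, new long[]{4, 6}, new long[]{5, 6});

        // a record at t3 with an inactivity gap of one unit needs findSessions(key, t2, t4)
        List<long[]> merged = findSessions(sessions, 2, 4);
        if (merged.size() != 4) {
            throw new AssertionError();
        }
        for (long[] s : merged) {
            if ((s[0] == 0 && s[1] == 1) || (s[0] == 5 && s[1] == 6)) {
                throw new AssertionError(); // [t0,t1] and [t5,t6] must not be returned
            }
        }
        System.out.println("findSessions(key, t2, t4) returns the 4 mergeable sessions");
    }
}
```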

  2. Okay, now here's the trick we use in the session store impl: we store each entry as (key, end-time, start-time); note that we store the end time first, and then the start time.

The key idea is that, in order to support the findSessions(key, t2, t4) query above, with this layout our entries are actually stored as (in byte ordering):

[t1, t0]
[t2, t1] *
[t5, t1] *
[t5, t4] *
[t6, t4] *
[t6, t5]

Note that the entries marked with * are the ones that should be returned.
So the underlying RocksDB query range can be defined as

from: [t2, 0]   -> more generally, [earliestEnd, 0]
to: [MAX, t4]   -> more generally, [MAX, latestStart]

Note this is a conservative range, but still pretty efficient. And this is exactly what we define here:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java#L47-L57
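As a sanity check of the ordering above, this sketch encodes only the (endTime, startTime) suffix for a fixed key and applies the conservative range [t2, 0] .. [MAX, t4] (a simplification for illustration, not the real schema code):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class SuffixOrderDemo {

    // (endTime, startTime) suffix in big-endian bytes, as the session schema orders it
    static byte[] suffix(long end, long start) {
        return ByteBuffer.allocate(16).putLong(end).putLong(start).array();
    }

    // unsigned lexicographic comparison, like RocksDB's default comparator
    static int compare(byte[] a, byte[] b) {
        for (int i = 0; i < 16; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        // sessions as (start, end): [t0,t1] [t1,t2] [t1,t5] [t4,t5] [t4,t6] [t5,t6]
        long[][] sessions = {{0, 1}, {1, 2}, {1, 5}, {4, 5}, {4, 6}, {5, 6}};
        byte[] from = suffix(2, 0);              // [earliestEnd, 0]
        byte[] to = suffix(Long.MAX_VALUE, 4);   // [MAX, latestStart]

        List<String> scanned = new ArrayList<>();
        for (long[] s : sessions) {
            byte[] e = suffix(s[1], s[0]);       // stored as (end, start)
            if (compare(from, e) <= 0 && compare(e, to) <= 0) {
                scanned.add("[t" + s[0] + ",t" + s[1] + "]");
            }
        }
        // [t0,t1] falls below the range entirely; [t5,t6] is scanned (the range
        // is conservative) and must be filtered out afterwards by the start-time check
        if (scanned.size() != 5) {
            throw new AssertionError(scanned.toString());
        }
        System.out.println(scanned);
    }
}
```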

  3. Now for the history of how the bug developed: in 0.11.0 a couple of PRs were merged:

3.a) One added the key-range query (i.e. findSessions(keyFrom, keyTo, ...)) for session stores:

https://github.com/apache/kafka/pull/3027/files#diff-8d2f47dd4b3fa8aa87565fa1f5b01d3fR93

Note the line I highlighted: it delegates findSessions(key, ...) to the new API as findSessions(key, key, ...) instead of directly calling the underlying findSessions. This is the root cause of the bug.

3.b) https://github.com/apache/kafka/pull/2972/files fixed a bug in SessionWindowedAggregateProcessor. The fix itself is fine, but it exposed the bug planted in 3.a) above.

Later we realized that the multi-key range query is actually more complicated than we thought (we discussed this; details here:
https://issues.apache.org/jira/browse/KAFKA-5285)

The end result is that we need to have a much, much, much more conservative range for multi-key ranges, and hence this:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java#L59-L71

In fact, even this range turned out not to be conservative enough, as I found while investigating (this PR fixes that as well).

So this results not only in the performance degradation mentioned in https://issues.apache.org/jira/browse/KAFKA-7652 but, even worse, because the multi-key range is not conservative enough, it also introduces a correctness bug, as reported on the mailing list some time ago. Search for "Streams SessionStore - bug or documentation error" (I originally thought it was a doc bug, but it turns out Tom was reporting a real correctness issue).

@guozhangwang
Contributor Author

cc @twbecker who reported this issue before as well.

@cwildman
Contributor

Thanks for digging into this @guozhangwang and the detailed explanation. One thing I'm not following is that the performance issue in https://issues.apache.org/jira/browse/KAFKA-7652 showed us spending significantly more time iterating entries in the NamedCache. If I'm understanding this fix correctly it should reduce the number of entries that would need to be scanned through the storeIterator, not the number of entries in the cacheIterator. Is there something I'm missing?

It would be great to test out this performance improvement.

@guozhangwang
Contributor Author

guozhangwang commented Jan 14, 2019

@cwildman This is a question I was asking myself as well: among the code changes from 0.10.2.2 to 0.11.0.0, none of the PRs listed above has a direct impact on the session store's caching layer, or at least not on cache access frequency directly (e.g. https://github.com/apache/kafka/pull/2972/files or KIP-155). So the only thing I can conjecture is that, because of the bug hidden in the underlying store which causes findSessions to miss some key entries, the aggregate processor is less effective at merging sessions into a single one, resulting in more scattered sessions instead of fewer, larger ones. Over time this would also cause more fetches in the caching layer (imagine having lots of shorter sessions vs. a few longer sessions: for the former, the findSessions call results in more fetches on both the cache and the store for the same range).

Without the real workload to reproduce 7652 it is hard to tell, so my plan is to get a cherry-pick PR for 0.11.0 as well and let @jonathanpdx try it out.

@twbecker
Contributor

Thanks for the fix @guozhangwang!

@mjsax
Member

mjsax commented Jan 14, 2019

@guozhangwang I just realized that there is a bug in the SessionStore JavaDocs, e.g.:

@param latestSessionStartTime the end timestamp of the latest session to search for

This should be the [start] timestamp... this affects multiple methods. Can we fix them in this PR?

@mjsax mjsax added the streams label Jan 14, 2019
Contributor

@vvcephei vvcephei left a comment


@guozhangwang , This is a great find!

I'm still reading all the related code, but I've left one comment explaining what I think is going on with the (1) fix. Maybe you can correct me if I've gotten it wrong.


Member

@bbejeck bbejeck left a comment


Thanks @guozhangwang, I've made an initial pass; overall this looks good to me. I've left just one comment/question.

What do you think about adding the description at the start of this PR to the documentation, either as javadoc or in our docs?

.putLong(to)
// start can at most be equal to end
// the end timestamp can be as large as possible as long as it's larger than start time
.putLong(Long.MAX_VALUE)
Member


@guozhangwang as I'm reading this PR and getting my head around how this operates, let me rephrase, for my own understanding, the issue that this line fixes.

The core issue is that setting the upper range to (to, to) caused us to use an "endTime" earlier than it should be; since we store session windows as (endTime, startTime), we ended up missing sessions we should retrieve, which caused more issues down the line.

Is that a fair statement?

Contributor Author

That is right. Note that the names here are a bit confusing, since in the semantics of findSessions these are earliestEndTime (named from here) and latestStartTime (named to here). Going back to my single-key range example in #6134 (comment), but extending it a bit to the multi-key range: the current code uses [latestStartTime, latestStartTime] as the max suffix to apply to key-to. This is an issue if latestStartTime is smaller than earliestEndTime: in our aggregation that would never be the case, but other users may legitimately call, e.g., findSessions(100, 1), which would use [1, 1] as the suffix with key-to and hence miss entries.

I'll adjust the existing unit test to expose this bug as well.
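A small sketch of the missed-entry case above, modeling only the 16-byte (endTime, startTime) suffix for a fixed key (a simplification for illustration, not the real store code):

```java
import java.nio.ByteBuffer;

public class OldUpperBoundDemo {

    // (endTime, startTime) suffix in big-endian bytes
    static byte[] suffix(long end, long start) {
        return ByteBuffer.allocate(16).putLong(end).putLong(start).array();
    }

    // unsigned lexicographic comparison, like RocksDB's default comparator
    static int compare(byte[] a, byte[] b) {
        for (int i = 0; i < 16; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        // findSessions(earliestEnd = 100, latestStart = 1): the session
        // [start = 1, end = 200] matches the query semantics
        byte[] session = suffix(200, 1);

        byte[] oldMax = suffix(1, 1);                  // old max suffix: (to, to)
        byte[] fixedMax = suffix(Long.MAX_VALUE, 1);   // fixed max suffix: (MAX, to)

        if (compare(session, oldMax) <= 0) {
            throw new AssertionError(); // the old bound misses this entry
        }
        if (compare(session, fixedMax) > 0) {
            throw new AssertionError(); // the fixed bound covers it
        }
        System.out.println("old (to, to) upper bound misses the session (end=200, start=1)");
    }
}
```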

.putLong(to)
// start can at most be equal to end
// the end timestamp can be as large as possible as long as it's larger than start time
.putLong(Long.MAX_VALUE)
Contributor

@vvcephei vvcephei Jan 16, 2019

Since OrderedBytes#upperRange chops off the key at the first key-byte that's smaller than the first suffix-byte, it seems like setting the "end timestamp" to max long would be counterproductive, since it'll result in chopping off the key at the first non-max-value byte? Maybe it would be better to use the first successor of the "to key"? Actually, we can consider just using https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/util/BytewiseComparator.java#L73

For example, if our search is:

key: 0xf000 0001 to: 0x0000 0000 0000 0000
=>
key: 0xf000 0001 suffix: 0x7fff ffff ffff ffff 0000 0000 0000 0000

Then, upperRange will trim our query to:

0xf 7fff ffff ffff ffff 0000 0000 0000 0000

which is way more permissive than the first successor to the key:

0xf000 0002

(It permits max_int - 2 = 2,147,483,645 more keys)

Please scrutinize my math, I might have made an error.

If there's no error, though, I think it might pay off to inline upperRange here, since the interactions between the methods are so important in determining what key we actually get.
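For anyone following along, here is a toy model of the chopping rule described above (my reading of the OrderedBytes#upperRange behavior, for illustration only; the real implementation may differ):

```java
public class UpperRangeChopDemo {

    // keep leading key bytes that are >= the first suffix byte (unsigned),
    // then append the max suffix
    static byte[] toyUpperRange(byte[] key, byte[] maxSuffix) {
        int threshold = maxSuffix[0] & 0xFF;
        int keep = 0;
        while (keep < key.length && (key[keep] & 0xFF) >= threshold) {
            keep++;
        }
        byte[] result = new byte[keep + maxSuffix.length];
        System.arraycopy(key, 0, result, 0, keep);
        System.arraycopy(maxSuffix, 0, result, keep, maxSuffix.length);
        return result;
    }

    public static void main(String[] args) {
        byte[] key = {(byte) 0xf0, 0x00, 0x00, 0x01};
        byte[] suffix = new byte[16];  // MAX_VALUE then to = 0, big-endian
        suffix[0] = 0x7f;
        for (int i = 1; i < 8; i++) {
            suffix[i] = (byte) 0xff;
        }

        byte[] upper = toyUpperRange(key, suffix);
        // 0x00 < 0x7f, so the key gets chopped after its first byte (0xf0)
        if (upper.length != 17 || upper[0] != (byte) 0xf0 || upper[1] != 0x7f) {
            throw new AssertionError();
        }
        System.out.println("toy upperRange keeps only 0xf0 of the key");
    }
}
```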

final List<Long> rangeResults = new ArrayList<>();
while (rangeIterator.hasNext()) {
rangeResults.add(rangeIterator.next().value);
try (final KeyValueIterator<Windowed<String>, Long> iterator =
Contributor Author

This is the added unit test that calls findSessions with a first parameter larger than the second. Without fix 2) it will fail.

The rest of the changes just wrap the Closeable iterators in try-with-resources; there are no logical changes.


assertThat(upper, equalTo(Bytes.wrap(SessionKeySchema.toBinary(
new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0))))
new Windowed<>(Bytes.wrap(new byte[]{0xA}), new SessionWindow(0, Long.MAX_VALUE))))
Contributor Author

This test needs to be modified because of change 2).

@guozhangwang guozhangwang changed the title KAFKA-7652: Fix SessionStore's findSession(single-key) KAFKA-7652: Part I; Fix SessionStore's findSession(single-key) Jan 17, 2019
@guozhangwang
Contributor Author

cc @ableegoldman as well.

@guozhangwang
Contributor Author

@vvcephei I've thought about the fix in 2) again, but cannot come up with a better upper bound than this very-conservative measure. LMK if you have better ideas.

Contributor

@dguy dguy left a comment


Thanks @guozhangwang, LGTM subject to other people's comments.

verifyWindowedKeyValue(all.next(), new Windowed<>(keyAA, new SessionWindow(0, 0)), "1");
verifyWindowedKeyValue(all.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1");
assertFalse(all.hasNext());

Contributor


nit: extra lines

Member

@bbejeck bbejeck left a comment


Thanks for the fix @guozhangwang LGTM

@guozhangwang guozhangwang merged commit 56139df into apache:trunk Jan 18, 2019
@guozhangwang guozhangwang deleted the K7652-session-find branch January 18, 2019 20:08
guozhangwang added a commit that referenced this pull request Jan 19, 2019
Let findSessions(final K key) call the underlying bytes store directly, using the more restricted range.

Fix the upper range used for multi-key range queries in the session schema, which was not conservative enough.

Minor: removed the unnecessary private WrappedSessionStoreBytesIterator class, as it was only used in unit tests.

Minor: removed the unnecessary schema#init function by using the direct bytes-to-binary function.

Please read the original PR for more detailed explanation of the root cause of the bug.

Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian@confluent.io>, John Roesler <john@confluent.io>
@guozhangwang
Contributor Author

Cherry-picked to 2.1 as well.

abbccdda pushed a commit to abbccdda/kafka that referenced this pull request Jan 24, 2019
jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* ak/trunk:
  MINOR: fix race condition in KafkaStreamsTest (apache#6185)
  KAFKA-4850: Enable bloomfilters (apache#6012)
  MINOR: ducker-ak: add down -f, avoid using a terminal in ducker test
  KAFKA-5117: Stop resolving externalized configs in Connect REST API
  MINOR: Cleanup handling of mixed transactional/idempotent records (apache#6172)
  KAFKA-7844: Use regular subproject for generator to fix *All targets (apache#6182)
  Fix Documentation for cleanup.policy is out of date (apache#6181)
  MINOR: increase timeouts for KafkaStreamsTest (apache#6178)
  MINOR: Rejoin split ssl principal mapping rules (apache#6099)
  MINOR: Handle case where connector status endpoints returns 404 (apache#6176)
  MINOR: Remove unused imports, exceptions, and values (apache#6117)
  KAFKA-3522: Add internal RecordConverter interface (apache#6150)
  Fix Javadoc of KafkaConsumer (apache#6155)
  KAFKA-6455: Extend CacheFlushListener to forward timestamp (apache#6147)
  MINOR: Log partition info when creating new request batch in controller (apache#6145)
  KAFKA-7652: Part I; Fix SessionStore's findSession(single-key) (apache#6134)
  MINOR: Remove the InvalidTopicException handling in InternalTopicManager (apache#6167)
  [KAFKA-7024] Rocksdb state directory should be created before opening the DB (apache#6138)
  MINOR:: Fix typos (apache#6079)
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019