
KAFKA-7652: Part III; Put to underlying before Flush#6191

Merged
guozhangwang merged 23 commits intoapache:trunkfrom
guozhangwang:K7652-caching-session-remove
Feb 13, 2019

Conversation

@guozhangwang
Contributor

@guozhangwang guozhangwang commented Jan 24, 2019

  1. In the caching layer's flush listener call, we should always write to the underlying store before flushing (see point 4 of MINOR: Improve Join integration test coverage, PART I #4331 for a detailed explanation). The fix for #4331 only touched KV stores, but it turns out window and session stores need the same fix.

  2. Also apply the optimization that was already in the session store: when the new value bytes and the old value bytes are both null (possible e.g. if a put(K, V) is followed by a remove(K) or put(K, null) and both operations only hit the cache), then upon flushing the underlying store does not have this value at all, and no intermediate value has been sent downstream either. In this case we can skip both putting a null to the underlying store and calling the flush listener with null -> null.

Modifies corresponding unit tests.
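The two changes above can be sketched with a toy model -- a plain Map standing in for the underlying store and a list standing in for the downstream forward; the names here are illustrative, not the real Kafka Streams API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the caching layer's flush path: write to the underlying
// store BEFORE notifying the flush listener, and skip null -> null entirely.
public class FlushOrderingSketch {
    public static final Map<String, String> underlying = new HashMap<>();
    public static final List<String> forwarded = new ArrayList<>();

    public static void flushEntry(final String key, final String newValue) {
        final String oldValue = underlying.get(key);
        if (newValue == null && oldValue == null) {
            return; // neither cache nor store had a value: skip write and forward
        }
        // 1) put to the underlying store first, so any downstream processor
        //    that reads the store while being forwarded to sees the new value
        if (newValue == null) {
            underlying.remove(key);
        } else {
            underlying.put(key, newValue);
        }
        // 2) only then call the flush listener (forward the new/old pair)
        forwarded.add(key + ": " + newValue + " <- " + oldValue);
    }
}
```

With this ordering, a remove that only ever lived in the cache produces no store write and no forwarded record at all.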

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@guozhangwang
Contributor Author

@mjsax @bbejeck @vvcephei

@mjsax mjsax added the streams label Jan 27, 2019
entry.entry().context().timestamp());
} else {
if (flushListener != null) {
final byte[] newValueBytes = entry.newValue();
Contributor Author

This is an optimization that I did for all three caching stores:

  1. get the new bytes from cache, read the old bytes from underlying store.
  2. if either old / new bytes are not null, go to 3) below; otherwise skip so that we do not need to deserialize.
  3. deserialize to objects, and apply flush listener to downstream.
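The three steps above can be sketched as follows; the String "deserializer" and the method name are hypothetical stand-ins for the real serdes, and a counter shows that the null -> null case costs no deserialization:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: deserialization is deferred until we know the entry is not a
// null -> null no-op, so pure cache tombstones cost no deser work at all.
public class LazyDeserSketch {
    public static final AtomicInteger deserCalls = new AtomicInteger();

    static String deserialize(final byte[] bytes) {
        deserCalls.incrementAndGet(); // count how often we actually deserialize
        return new String(bytes);
    }

    // Returns the forwarded "newValue<-oldValue" string, or null if skipped.
    public static String maybeForward(final byte[] newBytes, final byte[] oldBytes) {
        if (newBytes == null && oldBytes == null) {
            return null; // skip: no deserialization, no forward
        }
        final String newValue = newBytes != null ? deserialize(newBytes) : null;
        final String oldValue = oldBytes != null ? deserialize(oldBytes) : null;
        return newValue + "<-" + oldValue;
    }
}
```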

final AGG newValue = newValueBytes != null ? serdes.valueFrom(newValueBytes) : null;
final AGG oldValue = sendOldValues && oldValueBytes != null ? serdes.valueFrom(oldValueBytes) : null;
// we need to get the old values if needed, and then put to store, and then flush
bytesStore.put(bytesKey, entry.newValue());
Contributor Author

@guozhangwang guozhangwang Jan 31, 2019

This is another optimization I did for the window / session stores: previously we deserialized bytes to Windowed<K> and then serialized it back to Windowed<Bytes> for the underlying put, which is a bit wasteful.

Now it goes bytes -> Windowed<Bytes> -> Windowed<K>, where the first deserialization is always executed while the second is executed only if the new / old bytes are not null. Note that the second deserialization actually only does Bytes -> K; we just wrap with the same window.
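A sketch of this two-stage conversion, using a hypothetical "start@key" composite byte format in place of the real window key schema:

```java
// Sketch of the two-stage key conversion: the raw bytes are always split
// into a (key bytes, window) pair, but the key bytes are only deserialized
// to K when we actually forward; the window value is carried over unchanged.
public class WindowedKeySketch {
    public static final class Windowed<T> {
        public final T key;
        public final long windowStart;
        public Windowed(final T key, final long windowStart) {
            this.key = key;
            this.windowStart = windowStart;
        }
    }

    // Stage 1: cheap, always executed (here: split the "start@key" composite).
    public static Windowed<byte[]> fromBytes(final byte[] composite) {
        final String s = new String(composite);
        final int at = s.indexOf('@');
        return new Windowed<>(s.substring(at + 1).getBytes(),
                              Long.parseLong(s.substring(0, at)));
    }

    // Stage 2: only Bytes -> K; we re-wrap with the same window.
    public static Windowed<String> deserializeKey(final Windowed<byte[]> wb) {
        return new Windowed<>(new String(wb.key), wb.windowStart);
    }
}
```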


// this is an optimization: if this key did not exist in underlying store and also not in the cache,
// we can skip flushing to downstream as well as writing to underlying store
if (newValueBytes != null || oldValueBytes != null) {
Contributor Author

The actual fix here: we need to 1) read the old bytes, 2) put the new bytes to the underlying store, and 3) apply the flush listener.

Previously the order was 1) -> 3) -> 2), which has an issue: if the flushed records trigger downstream access to the store, it does not have the new data yet, which is incorrect. This fix was merged into the KV store some time ago, but we did not do the same for the window / session stores.

kafkaStreams.start();

waitUntilAtLeastNumRecordProcessed(outputTopic, 2);
waitUntilAtLeastNumRecordProcessed(outputTopic, 1);
Contributor Author

This is the effect of the optimization: we will only send down one record now, and the second null record will be omitted.

final KTable<String, Integer> table1 = builder.table(topic1, consumed);

final KTable<String, Integer> table2 = table1.filter(predicate, Materialized.as("anyStoreNameFilter"));
final KTable<String, Integer> table2 = table1.filter(predicate,
Contributor Author

This is needed since, with the optimization, we will omit flushing those unnecessary nulls.

Member

Why not update the expected test output? Or better, have a test for both cases?

Contributor Author

This should be covered in the store test case itself, not the KTableFilter here. I've added a separate test case for this purpose.

Member

Well, I guess the "problem" is that it's unclear what this test is supposed to test. The method name is not very good.

It seems to be about the case that the filter does the right thing based on the input data.

  • testKTable -- table store is not materialized, thus everything is pass-through
  • testQueryableKTable -- table store is materialized (we disable caching to preserve pass-through behavior)

  Similarly below:

  • testNotSendingOldValue (not materialized)
  • testQueryableNotSendingOldValue (preserve pass-through via disabling caching)

Does this sound correct?

Contributor Author

This sounds right to me. Let me rename the test cases a bit to be more clear.

assertNull(store.get(3));
assertEquals("four", store.get(4));
assertEquals("five", store.get(5));
store.flush();
Contributor Author

Ditto here due to the optimization.

Member

Same comment as above.

Contributor Author

For this test case I tried to do that, but there's some trickiness: 1) AbstractKVStoreTest is shared by five tests, and only one of them is a caching store affected by this change; 2) the effect depends on the cache size.

So I've decided to add a separate test in CachingKVStoreTest just for the effectiveness of this optimization.

Member

Again, testPutGetRange is not a good name -- maybe that is what confused me.

Also, if we do the flush() to be able to share code, maybe add a comment about why -- basically, a test should work for caching or non-caching stores (or explicitly state in the test name that it tests one or the other behavior).

store.setFlushListener(cacheFlushListener, true);
store.put(bytesKey("1"), bytesValue("a"));
store.flush();
assertEquals("a", cacheFlushListener.forwarded.get("1").newValue);
Contributor Author

The augmented unit tests here and below are for 1) demonstrating the optimization and 2) demonstrating the bug fix as well.

@guozhangwang
Contributor Author

@bbejeck @vvcephei @ableegoldman @mjsax for reviews.

Contributor

@vvcephei vvcephei left a comment

LGTM @guozhangwang . Thanks!

I left one comment to maybe improve readability, and one question about an additional possible optimization (but maybe out of scope).

Thanks,
-John


// this is an optimization: if this key did not exist in underlying store and also not in the cache,
// we can skip flushing to downstream as well as writing to underlying store
if (newValueBytes != null || oldValueBytes != null) {
Contributor

Can we apply deMorgan's rule and flip the conditional (with an empty body) here?

It seems easier to understand:

if (newValueBytes == null && oldValueBytes == null) {
  // no need to flush or write to underlying store
} else {
  ... the rest of the code
}

In other words, the empty "skip" block is more self-documenting than the preceding comment explaining the algorithm.

Which actually makes me wonder: is there a more general optimization where we can skip deserialization, flush, and write any time the new and old values are identical?

Contributor Author

I think we cannot do this optimization in the general oldBytes == newBytes case, since the timestamp may still be updated for flushListener.apply.

Member

I think an empty then-body would be bad code style.

Contributor Author

@guozhangwang guozhangwang Feb 12, 2019

Hey folks, I'm slightly modifying this logic to NOT always blindly read from the underlying store, but to do something like this:

final byte[] newValueBytes = entry.newValue();
final byte[] oldValueBytes = newValueBytes == null || sendOldValues ? underlying.get(entry.key()) : null;

if (newValueBytes != null || oldValueBytes != null) {
    ....
}

The main motivation is that for session stores the likelihood of newValueBytes == null can be high, while for the other two store types it is low. As a result, for those two types newValueBytes is usually non-null, and if sendOldValues == false we then skip reading the old bytes entirely, since the null -> null skip optimization is doomed not to apply anyway.
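This guarded read can be sketched as follows; the Map-backed store and the method name are hypothetical, and a counter makes the saved reads visible:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the guarded read: the underlying store is consulted only when
// the old value could matter -- i.e. the new value is a tombstone (so we
// must know whether to skip the flush entirely) or sendOldValues is set.
public class GuardedReadSketch {
    public static final Map<String, byte[]> underlying = new HashMap<>();
    public static final AtomicInteger reads = new AtomicInteger();

    static byte[] get(final String key) {
        reads.incrementAndGet(); // count underlying-store reads
        return underlying.get(key);
    }

    public static byte[] oldValueFor(final String key,
                                     final byte[] newValueBytes,
                                     final boolean sendOldValues) {
        return newValueBytes == null || sendOldValues ? get(key) : null;
    }
}
```

The common put-with-new-value, sendOldValues == false case thus never touches the underlying store.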


// this is an optimization: if this key did not exist in underlying store and also not in the cache,
// we can skip flushing to downstream as well as writing to underlying store
if (newValueBytes != null || oldValueBytes != null) {
Member

I think an empty then-body would be bad code style.

final KTable<String, Integer> table1 = builder.table(topic1, consumed);

final KTable<String, Integer> table2 = table1.filter(predicate, Materialized.as("anyStoreNameFilter"));
final KTable<String, Integer> table2 = table1.filter(predicate,
Member

Why not update the expected test output? Or better, have a test for both cases?

assertNull(store.get(3));
assertEquals("four", store.get(4));
assertEquals("five", store.get(5));
store.flush();
Member

Same comment as above.

cachingStore.flush();
cachingStore.remove(a);
cachingStore.flush();
//cachingStore.flush();
Member

remove?

@mjsax
Member

mjsax commented Feb 11, 2019

Failed with checkstyle error:

14:47:08 [ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java:41:8: Unused import - java.io.IOException. [UnusedImports]

@guozhangwang
Contributor Author

Failed with checkstyle error:

Fixed it in my latest commit; did not push since I thought you might have more comments to come.

} else {
if (flushListener != null) {
final byte[] newValueBytes = entry.newValue();
final byte[] oldValueBytes = underlying.get(entry.key());
Member

Because we only flush() if newValueBytes != null || oldValueBytes != null, I think we can actually do the get() only if newValueBytes == null || sendOldValues is true. Same for the other stores.

Thoughts?

Contributor Author

I had the same idea: #6191 (comment)

if (newValueBytes != null || oldValueBytes != null) {
final K key = serdes.keyFrom(entry.key().get());
final V newValue = newValueBytes != null ? serdes.valueFrom(newValueBytes) : null;
final V oldValue = sendOldValues && oldValueBytes != null ? serdes.valueFrom(oldValueBytes) : null;
Member

Do we need to check sendOldValues here?

Contributor Author

Good point; since I've added the check above, I can remove it here. Ditto elsewhere.

Contributor Author

@mjsax Actually, that is not correct: when sendOldValues is false, we should never send old values downstream. So suppose newValueBytes != null and hence we read the underlying store; we still need the check here so that oldValue stays null.

final Windowed<Bytes> bytesKey = SessionKeySchema.from(binaryKey);
if (flushListener != null) {
final byte[] newValueBytes = entry.newValue();
final byte[] oldValueBytes = bytesStore.fetchSession(bytesKey.key(), bytesKey.window().start(), bytesKey.window().end());
Member

As above. Only do fetchSession if newValueBytes == null || sendOldValues is true

if (newValueBytes != null || oldValueBytes != null) {
final Windowed<K> key = SessionKeySchema.from(bytesKey, serdes.keyDeserializer(), topic);
final AGG newValue = newValueBytes != null ? serdes.valueFrom(newValueBytes) : null;
final AGG oldValue = sendOldValues && oldValueBytes != null ? serdes.valueFrom(oldValueBytes) : null;
Member

Do we need to check sendOldValues here?

} finally {
context.setRecordContext(current);
final byte[] newValueBytes = entry.newValue();
final byte[] oldValueBytes = underlying.fetch(key, windowStartTimestamp);
Member

as above

assertNull(store.get(3));
assertEquals("four", store.get(4));
assertEquals("five", store.get(5));
store.flush();
Member

Again, testPutGetRange is not a good name -- maybe that is what confused me.

Also, if we do the flush() to be able to share code, maybe add a comment about why -- basically, a test should work for caching or non-caching stores (or explicitly state in the test name that it tests one or the other behavior).

store.flush();
assertEquals("a", cacheFlushListener.forwarded.get("1").newValue);
assertNull(cacheFlushListener.forwarded.get("1").oldValue);
store.put(bytesKey("1"), bytesValue("b"));
Member

I think we should do a third put, store.put(bytesKey("1"), bytesValue("c")), here and test that the old value is a and the new value is c, to make sure we return the correct old value -- with 2 puts it's unclear whether it's correct (it would be incorrect if oldValue were b).
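The suggested check can be sketched with a toy cache in place of the real CachingKeyValueStore (names hypothetical): after put(a), flush, put(b), put(c), flush, the forwarded old value must be the last flushed value "a", not the overwritten in-cache "b".

```java
import java.util.HashMap;
import java.util.Map;

// Toy write-back cache: the old value forwarded on flush is whatever the
// underlying store held, i.e. the last FLUSHED value, not the last put.
public class ThreePutSketch {
    public static final Map<String, String> underlying = new HashMap<>();
    public static final Map<String, String> cache = new HashMap<>();
    public static String forwardedNew;
    public static String forwardedOld;

    public static void put(final String k, final String v) {
        cache.put(k, v); // overwrites any un-flushed value for k
    }

    public static void flush() {
        for (final Map.Entry<String, String> e : cache.entrySet()) {
            final String old = underlying.get(e.getKey());
            underlying.put(e.getKey(), e.getValue()); // write-through on flush
            forwardedNew = e.getValue();
            forwardedOld = old;
        }
        cache.clear();
    }
}
```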

Contributor Author

Ack.

@@ -176,10 +179,14 @@ public void shouldRemove() {
cachingStore.put(b, "2".getBytes());
cachingStore.flush();
Member

Why do we need this flush if we don't need the one below?

assertEquals("a", cacheListener.forwarded.get(windowedKey).newValue);
assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
cacheListener.forwarded.clear();
cachingStore.put(bytesKey("1"), bytesValue("b"));
Member

as above

cachingStore.flush();
assertEquals("a", cacheListener.forwarded.get(windowedKey).newValue);
assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
cachingStore.put(bytesKey("1"), bytesValue("b"));
Member

as above

Member

@bbejeck bbejeck left a comment

Thanks @guozhangwang, overall looks good to me. I just have one minor question, and I'll take another pass after you update the PR as mentioned in https://github.com/apache/kafka/pull/6191/files#r255764702

// we can skip flushing to downstream as well as writing to underlying store
if (newValueBytes != null || oldValueBytes != null) {
final Windowed<K> windowedKey = WindowKeySchema.fromStoreKey(windowedKeyBytes, serdes.keyDeserializer(), serdes.topic());
final V newValue = newValueBytes != null ? serdes.valueFrom(newValueBytes) : null;
Member

It seems that after setting final Windowed<K> key in the putAndMaybeForward method, the code is more or less the same for all caching stores in question.

Just a thought, but would it be worth extracting the logic from the two similar methods into a method of WrappedStateStore.AbstractStateStore?

Contributor Author

I don't think AbstractStateStore is a good place to share this logic, since it is not only for wrapping caching layers but for other layers as well.

I feel good about keeping these three functions separate for now, mainly because their callees (the fetches) are still different.

@guozhangwang
Contributor Author

Updated per comments.

@mjsax
Member

mjsax commented Feb 13, 2019

Tests failed because trunk was broken. Fixed via #6257

Retest this please. (If it fails again, this needs to be rebased.)

Member

@mjsax mjsax left a comment

LGTM. Feel free to merge after build is green.

@guozhangwang
Contributor Author

Pushed another commit with checkstyle fixes (the junit upgrade caused deprecated assertion APIs).

@guozhangwang guozhangwang merged commit 0a1c269 into apache:trunk Feb 13, 2019
guozhangwang added a commit that referenced this pull request Feb 13, 2019
1. In the caching layer's flush listener call, we should always write to the underlying store before flushing (see point 4 of #4331 for a detailed explanation). The fix for #4331 only touched KV stores, but it turns out window and session stores need the same fix.

2. Also apply the optimization that was already in the session store: when the new value bytes and the old value bytes are both null (possible e.g. if a put(K, V) is followed by a remove(K) or put(K, null) and both operations only hit the cache), then upon flushing the underlying store does not have this value at all, and no intermediate value has been sent downstream either. In this case we can skip both putting a null to the underlying store and calling the flush listener with `null -> null`.

Modifies corresponding unit tests.

Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
@guozhangwang
Contributor Author

Cherry-picked to 2.2 as well.

pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019