MINOR: Improve Join integration test coverage, PART I #4331

Closed
guozhangwang wants to merge 23 commits into apache:trunk from guozhangwang:KMinor-join-integration-tests

Contributor

@guozhangwang guozhangwang commented Dec 15, 2017

  1. Rename JoinIntegrationTest to StreamStreamJoinIntegrationTest, which covers only KStream-KStream joins.
  2. Extract AbstractJoinIntegrationTest, the base class for all join integration test classes, parameterized to run with and without caching.
  3. Merge KStreamRepartitionJoinTest.java into StreamStreamJoinIntegrationTest.java, with an augmented stream-stream join.
  4. Add TableTableJoinIntegrationTest with detailed per-step expected results, and remove KTableKTableJoinIntegrationTest.

Findings of the integration test:

  1. Confirmed KAFKA-4309 with caching turned on.
  2. Found bug KAFKA-6398.
  3. Found bug KAFKA-6443.
  4. Found a bug in CachingKeyValueStore: we would flush before putting the record into the underlying store. When the store is read by downstream processors during that flush, this produces incorrect results. Fixed the issue in this PR.
  5. Consider a new optimization described in KAFKA-6286.

Future work, including stream-table joins, will be in other PRs.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)


final V oldValue = sendOldValues ? serdes.valueFrom(underlying.get(entry.key())) : null;

underlying.put(entry.key(), entry.newValue());
Contributor Author

This is intentional as a bug fix. See description.

Member

Wouldn't we want to have an else clause that still performs underlying.put in case flushListener is null?

Contributor Author

Ack, and I think this is the root cause of the jenkins failure.
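The else-clause concern above can be illustrated with a minimal sketch. It assumes a hypothetical `EvictionSketch` class with a plain map standing in for the underlying store; it is not the actual CachingKeyValueStore code. The point is only that the write to the underlying store should not depend on whether a flush listener is registered.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical stand-in for a caching store's eviction path; not Kafka's actual class.
class EvictionSketch {
    final Map<String, String> underlying = new HashMap<>();
    // May stay null when no downstream processor is registered.
    BiConsumer<String, String> flushListener;

    // Called when a dirty entry leaves the cache.
    void onEvict(final String key, final String newValue) {
        // Persist unconditionally, whether or not anyone is listening.
        underlying.put(key, newValue);
        if (flushListener != null) {
            flushListener.accept(key, newValue);
        }
    }

    public static void main(final String[] args) {
        final EvictionSketch store = new EvictionSketch();
        store.onEvict("a", "1"); // no listener registered
        System.out.println(store.underlying.get("a"));
    }
}
```

Writing the put once, outside the null check, is one way to get the same effect as an explicit else branch.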

@guozhangwang changed the title from "[WIP] MINOR: Improve Join integration test coverage, PART I" to "MINOR: Improve Join integration test coverage, PART I" on Jan 11, 2018
@guozhangwang
Contributor Author

@mjsax @bbejeck

@Rule
public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());

@Parameterized.Parameters
Member

IMHO we should consider changing to @Parameterized.Parameters(name = "caching enabled = {0}"), which prints whether caching is enabled rather than just the index of the parameter.

Contributor Author

Good point!
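To see what the suggested name pattern would produce, here is a rough sketch that avoids a JUnit dependency. It assumes that the runner substitutes `{index}` and then formats the remaining placeholders with `java.text.MessageFormat`; the `ParamNameSketch` class and its `render` helper are hypothetical, and the real runner may decorate the result further.

```java
import java.text.MessageFormat;

// Hypothetical helper approximating how a test-name pattern is rendered.
class ParamNameSketch {
    // Replaces {index}, then fills {0}, {1}, ... from the parameter values.
    static String render(final String pattern, final int index, final Object... params) {
        final String withIndex = pattern.replace("{index}", Integer.toString(index));
        return MessageFormat.format(withIndex, params);
    }

    public static void main(final String[] args) {
        // Default-style name: only the position in the parameter list.
        System.out.println(render("{index}", 1, false));
        // Suggested pattern: the caching flag itself appears in the name.
        System.out.println(render("caching enabled = {0}", 0, true));
    }
}
```

With only two parameter sets the index is easy enough to decode, but a descriptive name makes a failing run self-explanatory in CI output.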

void runTest(final List<List<String>> expectedResult, final String storeName) throws Exception {
    assert expectedResult.size() == input.size();

    System.out.println(builder.build().describe());
Member

is this intentional?

TestUtils.waitForCondition(new TestCondition() {
    @Override
    public boolean conditionMet() {
        System.out.println("RESULT: " + finalResultReached.get());
Member

leftover debugging?

Member

bbejeck commented Jan 11, 2018

left some comments.

Member

bbejeck commented Jan 11, 2018

@guozhangwang I think the failure could be related (or maybe a rebase is needed); I can reproduce it locally.

@guozhangwang
Contributor Author

@bbejeck Thanks for the reminder; the leftover debugging was my bad.

@guozhangwang
Contributor Author

retest this please

Member

@bbejeck bbejeck left a comment

Thanks @guozhangwang LGTM

@guozhangwang
Contributor Author

Local runs saved about 40 seconds for the streams unit test suite.

if (flushListener != null) {

    final V oldValue = sendOldValues ? serdes.valueFrom(underlying.get(entry.key())) : null;
    underlying.put(entry.key(), entry.newValue());
Contributor

Is this actually needed? If a downstream processor tries to get this value, won't it get it from the cache? I.e., I don't think an evicted entry is removed from the cache until after the flush has finished.

Contributor Author

Actually it will be removed: by the time listener.apply(entries); is called, the entry is no longer in the cache.

What I originally observed is an issue when caching is turned off. In that case we still go through this code path (which I think should be optimized away in the future anyway): calling put on the store immediately triggers a flush, so the record is processed downstream while it is not yet in the underlying store and no longer in the cache.
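The ordering problem described here can be reproduced in miniature. The sketch below is an assumption-laden toy, not Kafka code: `FlushOrderSketch`, its two maps, and the `forward` method are hypothetical stand-ins for the cache, the underlying store, and a downstream processor that reads the store during the flush callback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical miniature of a caching store; names are illustrative only.
class FlushOrderSketch {
    final Map<String, String> cache = new HashMap<>();
    final Map<String, String> underlying = new HashMap<>();
    final List<String> seenDownstream = new ArrayList<>();
    final boolean putBeforeForward;

    FlushOrderSketch(final boolean putBeforeForward) {
        this.putBeforeForward = putBeforeForward;
    }

    // Reads check the cache first, then fall back to the underlying store.
    String get(final String key) {
        final String cached = cache.get(key);
        return cached != null ? cached : underlying.get(key);
    }

    // Models caching being effectively off: every put is evicted right away.
    void put(final String key, final String value) {
        cache.put(key, value);
        evict(key);
    }

    private void evict(final String key) {
        // The entry leaves the cache before the listener runs.
        final String value = cache.remove(key);
        if (putBeforeForward) {
            underlying.put(key, value);
            forward(key);
        } else {
            forward(key);
            underlying.put(key, value);
        }
    }

    // Stand-in for a downstream processor that reads the store during the callback.
    private void forward(final String key) {
        seenDownstream.add(String.valueOf(get(key)));
    }

    public static void main(final String[] args) {
        final FlushOrderSketch buggy = new FlushOrderSketch(false);
        buggy.put("k", "v1");
        final FlushOrderSketch fixed = new FlushOrderSketch(true);
        fixed.put("k", "v1");
        System.out.println("forward first: " + buggy.seenDownstream);
        System.out.println("put first:     " + fixed.seenDownstream);
    }
}
```

In the forward-first ordering the downstream read finds the key in neither map, which matches the incorrect results described above; putting first makes the value visible during the callback.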


checkResult(OUTPUT_TOPIC, expectedFinalResult, numRecordsExpected);

if (storeName != null)
Contributor

nit: java style

}
}

if (storeName != null)
Contributor

ditto

Contributor

@dguy dguy left a comment

Thanks @guozhangwang. Left a couple of comments, but overall LGTM. Feel free to merge once you've addressed them

assertThat(onlyEntry.value, is(expectedFinalResult));
assertThat(all.hasNext(), is(false));

all.close();
Contributor

this will never be called if one of the assertions fails

Contributor Author

Good point!
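One common way to address this kind of concern is try-with-resources (or an equivalent try/finally), so that close() runs even when an assertion throws. The sketch below uses a hypothetical `TrackingIterator` in place of the store iterator, and it is not necessarily how the PR resolved the comment.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class CloseOnFailureSketch {
    // Hypothetical closeable iterator standing in for a store iterator.
    static final class TrackingIterator implements Iterator<String>, AutoCloseable {
        private final Iterator<String> delegate;
        boolean closed = false;

        TrackingIterator(final List<String> items) {
            this.delegate = items.iterator();
        }

        @Override public boolean hasNext() { return delegate.hasNext(); }
        @Override public String next() { return delegate.next(); }
        @Override public void close() { closed = true; }
    }

    // Checks that the iterator yields exactly one expected value; always closes it.
    static void verifySingleEntry(final TrackingIterator all, final String expected) {
        try (TrackingIterator it = all) {
            if (!it.hasNext() || !expected.equals(it.next())) {
                throw new AssertionError("unexpected entry");
            }
            if (it.hasNext()) {
                throw new AssertionError("more than one entry");
            }
        }
    }

    public static void main(final String[] args) {
        final TrackingIterator all = new TrackingIterator(Arrays.asList("a", "b"));
        try {
            verifySingleEntry(all, "a");
        } catch (final AssertionError failure) {
            System.out.println("assertion failed: " + failure.getMessage());
        }
        System.out.println("closed = " + all.closed);
    }
}
```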

@guozhangwang
Contributor Author

Thanks for your reviews. Merged to trunk.

guozhangwang added a commit that referenced this pull request Feb 13, 2019
1. In the caching layer's flush listener call, we should always write to the underlying store before flushing (see point 4 of #4331 for a detailed explanation). The fix in #4331 only touched KV stores, but it turns out that window and session stores need the same fix.

2. Also apply the optimization that was already in the session store: when the new value bytes and old value bytes are both null (possible e.g. if a put(K, V) is followed by a remove(K) or put(K, null) and both operations only hit the cache), then upon flushing the underlying store does not have this value at all and no intermediate value has been sent downstream either. In this case we can skip both putting a null into the underlying store and calling the flush listener to send `null -> null`.

Modifies corresponding unit tests.

Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
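The `null -> null` optimization in point 2 might look roughly like the following. This is a sketch under assumptions: `NullFlushSketch`, its map-backed store, and the `forwarded` list are hypothetical, and the old value is simply read from the underlying map rather than through Kafka's actual store and serde layers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical flush handler; not the actual caching store implementation.
class NullFlushSketch {
    final Map<String, byte[]> underlying = new HashMap<>();
    // Keys for which a change was sent downstream.
    final List<String> forwarded = new ArrayList<>();

    // Handles one dirty cache entry at flush time.
    void flushEntry(final String key, final byte[] newValue) {
        final byte[] oldValue = underlying.get(key);
        if (newValue == null && oldValue == null) {
            // Never stored and never forwarded: skip the write and the callback.
            return;
        }
        // Write to the underlying store before notifying downstream.
        if (newValue == null) {
            underlying.remove(key);
        } else {
            underlying.put(key, newValue);
        }
        forwarded.add(key);
    }

    public static void main(final String[] args) {
        final NullFlushSketch store = new NullFlushSketch();
        store.flushEntry("k", null);            // put then remove, both absorbed by the cache
        System.out.println(store.forwarded.size());
        store.flushEntry("k", new byte[] {1});  // a real insert is forwarded
        store.flushEntry("k", null);            // a real delete is forwarded
        System.out.println(store.forwarded.size());
    }
}
```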
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
@guozhangwang guozhangwang deleted the KMinor-join-integration-tests branch April 24, 2020 23:47