
KAFKA-7944: Improve Suppress test coverage #6382

Merged
guozhangwang merged 6 commits into apache:trunk from vvcephei:KAFKA-7944-suppress-window-test
Mar 12, 2019

Conversation

@vvcephei
Contributor

@vvcephei vvcephei commented Mar 6, 2019

  • add a normal windowed suppress with short windows and a short grace
    period
  • improve the smoke test so that it actually verifies the intended
    conditions

See https://issues.apache.org/jira/browse/KAFKA-7944

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

* add a normal windowed suppress with short windows and a short grace
  period
* improve the smoke test so that it actually verifies the intended
  conditions
Contributor Author

@vvcephei vvcephei left a comment

@guozhangwang @mjsax @bbejeck @ableegoldman ,

In response to a mailing list message reporting that the author still saw a problem with Suppress after the fix, I bumped up the priority of KAFKA-7944. (This test's topology is similar to the reporter's.)

I also took the time to double-check that we weren't getting a false negative (the test passing without performing the intended verification), and learned that we could, for a couple of reasons:

  • we could process the entire data set before the first bounce.
  • sometimes, the node that we bounce hadn't actually processed anything (it was all assigned to the other node).

I also wanted to verify the smoke test app with EOS.

Since I took the time to refactor the smoke_test system test to achieve these goals, and there was a large overlap with the bounce test, I also unified the tests by adding the crash parameter to the smoke test.

I requested a review from you all so that we can get as much scrutiny on this system test as possible. Trying not to create a new flaky test here...
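
The crash-vs-graceful distinction behind that `crash` parameter can be sketched in a few lines (illustrative only; `bounce` is a hypothetical helper, not the ducktape API):

```python
import signal
import subprocess

def bounce(proc, crash):
    """Stop a worker either abruptly (simulating a crash) or gracefully."""
    proc.send_signal(signal.SIGKILL if crash else signal.SIGTERM)
    return proc.wait()

# The same test body can then cover both modes via one parameter.
for crash in (False, True):
    worker = subprocess.Popen(["sleep", "60"])  # stand-in for a Streams node
    code = bounce(worker, crash)
    print("crash" if crash else "clean", code)  # -9 (SIGKILL) or -15 (SIGTERM) on Linux
```

Unifying the two tests this way means the verification logic only has to live in one place.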

final ArrayList<SmokeTestClient> clients = new ArrayList<>();

CLUSTER.createTopics(SmokeTestDriver.topics());
IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, SmokeTestDriver.topics());
Contributor Author

More stable and future-proof test setup code. I needed this because I added a second test here, which I've since removed.

Leaving this change, though, so we won't have to waste time debugging again next time we try to add a test.

public static Collection<Object[]> parameters() {
    return Arrays.asList(new Object[] {false}, new Object[] {true});
}

Contributor Author

unused!

final KafkaStreams streamsClient = new KafkaStreams(build, getStreamsConfig(props));
streamsClient.setStateListener((newState, oldState) -> {
System.out.printf("%s: %s -> %s%n", name, oldState, newState);
System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
Contributor Author

I added this while debugging the smoke test. Printing the time allows us to correlate events across test nodes and with the test logs.

Contributor

Nice find!


streamify(smallWindowSum, "sws-raw");
streamify(smallWindowSum.suppress(untilWindowCloses(BufferConfig.unbounded())), "sws-suppressed");

Contributor Author

Added a "small window sum" stream to the topology, which does a "normal" windowed computation. The exact data production timing is non-deterministic, so we can't verify the exact results, but we can verify that there is exactly one result per windowed key in the suppressed stream.
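
What the suppressed stream guarantees can be seen in a small self-contained simulation (this is not the Streams implementation; the window size, grace period, and sum aggregation are illustrative): results are buffered per window, and each windowed key is emitted exactly once, when stream time passes the window end plus grace.

```python
from collections import defaultdict

WINDOW_MS = 1_000  # illustrative window size
GRACE_MS = 0       # short grace period, as in the new test

def suppress_until_window_closes(events):
    """events: iterable of (key, timestamp_ms, value), in timestamp order.
    Yields one final (key, window_start, sum) per windowed key, once the
    window has closed (stream time >= window end + grace)."""
    buffer = defaultdict(int)  # (key, window_start) -> running sum
    stream_time = 0
    for key, ts, value in events:
        stream_time = max(stream_time, ts)
        window_start = ts - ts % WINDOW_MS
        buffer[(key, window_start)] += value
        # emit every buffered window whose close time has now passed
        for k in sorted(k for k in buffer if k[1] + WINDOW_MS + GRACE_MS <= stream_time):
            yield k[0], k[1], buffer.pop(k)

events = [("a", 100, 1), ("a", 300, 2), ("b", 500, 5), ("a", 1200, 7)]
print(list(suppress_until_window_closes(events)))  # [('a', 0, 3), ('b', 0, 5)]
```

Note that the window starting at 1000 never emits: it is still open when the input ends, which is exactly why the verification can only demand "at most one result per windowed key, and exactly one for closed windows".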


public class SmokeTestDriver extends SmokeTestUtil {
private static final String[] TOPICS = new String[] {
private static final String[] TOPICS = {
Contributor Author

Apparently this is unnecessary now?

processor2.start()
monitor3.wait_until('REBALANCING -> RUNNING',
                    timeout_sec=120,
                    err_msg="Never saw 'REBALANCING -> RUNNING' message " + str(processor2.node.account)
Contributor Author

Again, wait for the node to start. This isn't strictly necessary, since we're just going to wait for "processed" next, but I thought it made the tests more readable and debuggable. For example, when you get a test failure, you'll know whether the node failed to start or whether it started, but didn't process anything.

Contributor

Not a comment: I actually think it is indeed necessary to avoid flakiness, remember @bbejeck once talked about it in an older PR?

Contributor Author

I think that was a different situation, like maybe we stopped and then re-started a node? In that case, you need to grab a new monitor after you stop and before you re-start, to be sure your grep won't match the "started" message from the first time it was running.

Since we're querying the same monitor here in both blocks, we're grepping over the same range of the file. The monitor pins the start point for grepping when you create it. For example, since we create this monitor before we start Streams, it uses "byte 1" as the starting point for all greps, for both REBALANCING -> RUNNING and processed.
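
A toy monitor illustrates that pinning behavior (names and API are illustrative, not ducktape's):

```python
import tempfile

class LogMonitor:
    """Toy version of a ducktape-style log monitor: the start offset is
    pinned when the monitor is created, so every later grep-style check
    scans the same range of the file."""
    def __init__(self, path):
        self.path = path
        with open(path) as f:
            f.seek(0, 2)           # jump to end of file
            self.start = f.tell()  # pinned start of the grep range

    def saw(self, pattern):
        with open(self.path) as f:
            f.seek(self.start)
            return any(pattern in line for line in f)

path = tempfile.mkstemp(suffix=".log")[1]
with open(path, "w") as f:
    f.write("old run: REBALANCING -> RUNNING\n")

monitor = LogMonitor(path)  # created before the node is (re)started
with open(path, "a") as f:
    f.write("REBALANCING -> RUNNING\n")
    f.write("processed 100 records\n")

# Both checks grep the same pinned range; neither matches the old run.
print(monitor.saw("REBALANCING -> RUNNING"), monitor.saw("processed"), monitor.saw("old run"))
```

This is why a monitor created after a restart cannot accidentally match the "started" message from the previous run.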

)

# make sure we're not already done processing (which would invalidate the test)
self.driver.node.account.ssh("! grep 'Result Verification' %s" % self.driver.STDOUT_FILE, allow_fail=False)
Contributor Author

Again, before bouncing the node, make sure the verification isn't already done, which would be pointless.

self.processor2.stop()
self.processor3.stop()
self.processor4.stop()
processor3.stop()
Contributor Author

The test is already over. We can stop the app gracefully.

node = self.driver.node
node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)
if crash and not eos:
    self.driver.node.account.ssh("grep -E 'SUCCESS|PROCESSED-MORE-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False)
Contributor Author

If we crash without EOS, we might process some duplicates (this is the whole point of EOS), so Streams is operating properly if it either does exactly the right thing or processes too many records.
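
The acceptance rule can be written as a predicate (a sketch of the reasoning, not the driver's code; `verification_ok` is a hypothetical name):

```python
def verification_ok(generated, processed, crash, eos):
    """With EOS, or after a clean shutdown, counts must match exactly.
    After a crash without EOS, at-least-once delivery may replay records,
    so processed > generated is still a correct outcome; lost records never are."""
    if crash and not eos:
        return processed >= generated
    return processed == generated

print(verification_ok(100, 103, crash=True, eos=False))  # True: duplicates tolerated
print(verification_ok(100, 103, crash=True, eos=True))   # False: EOS forbids duplicates
print(verification_ok(100, 97, crash=True, eos=False))   # False: lost records are never OK
```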

Contributor

sgtm.

if crash and not eos:
    self.driver.node.account.ssh("grep -E 'SUCCESS|PROCESSED-MORE-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False)
else:
    self.driver.node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)
Contributor Author

In all other cases, we expect to get exactly the right result.

@vvcephei
Contributor Author

vvcephei commented Mar 6, 2019

@bbejeck bbejeck added the streams label Mar 6, 2019
@bbejeck bbejeck requested review from guozhangwang and mjsax March 6, 2019 18:32
@bbejeck
Member

bbejeck commented Mar 6, 2019

\cc @ableegoldman for review

Member

@bbejeck bbejeck left a comment

Thanks for the updated test @vvcephei this LGTM (assuming system tests pass) just left a couple of minor comments.

public static Map<String, Set<Integer>> generate(final String kafka,
                                                 final int numKeys,
                                                 final int maxRecordsPerKey,
                                                 final Duration timeToSpend,
Member

good call!


class StreamsSmokeTestEOSJobRunnerService(StreamsSmokeTestBaseService):
    def __init__(self, test_context, kafka):
        super(StreamsSmokeTestEOSJobRunnerService, self).__init__(test_context, kafka, "process-eos")
Member

I thought there were already existing EOS tests that use the StreamsSmokeTest?


processor2.stop_nodes(not crash)

processor3.start()
Member

Don't we want to at least verify that processor3 starts successfully here?

Contributor Author

We could wait on that, it might make the test more readable. But it's not strictly necessary, since if it doesn't start, then we won't finish processing.

I'll add it for readability.

Contributor Author

Done. While making the change, it occurred to me that it also has the side benefit of making sure that the third processor actually processes some events.

@guozhangwang
Contributor

org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest.initializationError is consistently failing.

}


@SuppressWarnings("DynamicRegexReplaceableByCompiledPattern")
Contributor

Contributor Author

Ah, oops. I'll remove it.

public static Map<String, Set<Integer>> generate(final String kafka,
                                                 final int numKeys,
                                                 final int maxRecordsPerKey,
                                                 final Duration timeToSpend,
Contributor

hey @vvcephei I'm wondering if we can simplify the code further: with timeToSpend now can we just remove autoTerminate?

  1. SmokeTestDriverIntegrationTest.java: we are sending only 1000 * 10 = 10K records, so generation will likely stop even before the second instance can be started, similar to the issue you observed in the system test. I'd suggest we just set timeToSpend to 10 seconds so we have enough time to start up to 10 / 1 (sleep time) = 10 instances.

  2. As for StreamsSmokeTest itself, we only disableAutoTerminate in three of StreamsUpgradeTest cases, since we do not need to verify the number of records sent / received but only check upgrade completed. We can also use 30 seconds (not sure if it is sufficient, but we can get on average how much time those three tests will take from our nightlies), and even if the test completes before that it will call driver.stop to force-stop the driver anyways.
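
The pacing arithmetic behind suggestion 1 can be sketched as follows (the real `generate` signature differs; this helper is illustrative):

```python
def inter_record_delay_ms(num_keys, max_records_per_key, time_to_spend_ms):
    """Spacing that spreads generation of all records evenly over the budget,
    so the driver keeps producing while instances are started and bounced."""
    return time_to_spend_ms / (num_keys * max_records_per_key)

# 10 keys x 1000 records over a 10-second timeToSpend -> 1 ms between records
print(inter_record_delay_ms(10, 1000, 10_000))  # 1.0
```

With the run time bounded by `timeToSpend` rather than by record count, a separate auto-terminate switch becomes unnecessary.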

public static Map<String, Set<Integer>> generate(final String kafka,
                                                 final int numKeys,
                                                 final int maxRecordsPerKey,
                                                 final Duration timeToSpend,
Contributor

Of course we can then remove the whole disableAutoTerminate thing from StreamsSmokeTest.

import time


class StreamsBounceTest(KafkaTest):
Contributor

+1

// this starts the stream processing app
new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
break;
case "process-eos":
Contributor

Can we use StreamsEosTest instead? See my other comment below.


class StreamsSmokeTestEOSJobRunnerService(StreamsSmokeTestBaseService):
    def __init__(self, test_context, kafka):
        super(StreamsSmokeTestEOSJobRunnerService, self).__init__(test_context, kafka, "process-eos")
Contributor

To my other comment above: what's the difference between running StreamsSmokeTestBaseService#process-eos vs. StreamsEosTestBaseService#process? The former uses the StreamsSmokeTest client while the latter uses StreamsEosTest. Can we just consolidate them?

@bbejeck
Member

bbejeck commented Mar 7, 2019

Both the Java 8 and Java 11 builds failed, but the build results were already cleaned up.

retest this please

@vvcephei
Contributor Author

vvcephei commented Mar 7, 2019

@vvcephei
Contributor Author

vvcephei commented Mar 7, 2019

Reported unrelated failure: https://issues.apache.org/jira/browse/KAFKA-7965

Retest this, please.

@vvcephei
Contributor Author

vvcephei commented Mar 8, 2019

test results are unavailable.

Retest this, please.

Comment thread streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java Outdated
@guozhangwang
Contributor

LGTM to merge as-is (replied to @vvcephei 's question about eos test), leave it to @bbejeck for merging

@vvcephei
Contributor Author

vvcephei commented Mar 8, 2019

Thanks for the response, @guozhangwang . I've created https://issues.apache.org/jira/browse/KAFKA-8080 to follow up.

@vvcephei
Contributor Author

test results are unavailable.

Retest this, please.

@guozhangwang guozhangwang merged commit 8e97540 into apache:trunk Mar 12, 2019
@vvcephei
Contributor Author

Thanks, @guozhangwang !

@vvcephei vvcephei deleted the KAFKA-7944-suppress-window-test branch March 12, 2019 19:34
jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* warn-apache-kafka/trunk: (41 commits)
  MINOR: Avoid double null check in KStream#transform() (apache#6429)
  KAFKA-7944: Improve Suppress test coverage (apache#6382)
  KAFKA-3522: add missing guards for TimestampedXxxStore (apache#6356)
  MINOR: Change Trogdor agent's cleanup executor to a cached thread pool (apache#6309)
  KAFKA-7976; Update config before notifying controller of unclean leader update (apache#6426)
  KAFKA-7801: TopicCommand should not be able to alter transaction topic partition count
  KAFKA-8091; Wait for processor shutdown before testing removed listeners (apache#6425)
  MINOR: Update delete topics zk path in assertion error messages
  KAFKA-7939: Fix timing issue in KafkaAdminClientTest.testCreateTopicsRetryBackoff
  KAFKA-7922: Return authorized operations in Metadata request response (KIP-430 Part-2)
  MINOR: Print usage when parse fails during console producer
  MINOR: fix Scala compiler warning (apache#6417)
  KAFKA-7288; Fix check in SelectorTest to wait for no buffered bytes (apache#6415)
  KAFKA-8065: restore original input record timestamp in forward() (apache#6393)
  MINOR: cleanup deprectaion annotations (apache#6290)
  KAFKA-3522: Add TimestampedWindowStore builder/runtime classes (apache#6173)
  KAFKA-8069; Fix early expiration of offsets due to invalid loading of expire timestamp (apache#6401)
  KAFKA-8070: Increase consumer startup timeout in system tests (apache#6405)
  KAFKA-8040: Streams handle initTransactions timeout (apache#6372)
  KAFKA-7980 - Fix timing issue in SocketServerTest.testConnectionRateLimit (apache#6391)
  ...
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
* add a normal windowed suppress with short windows and a short grace
period
* improve the smoke test so that it actually verifies the intended
conditions

See https://issues.apache.org/jira/browse/KAFKA-7944

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
mjsax pushed a commit that referenced this pull request Dec 4, 2025
After #6382, the system test
streams_eos_test.py is redundant. As in
#20718, the verification logic has
already been migrated, so we only need to delete the related system
tests.

Reviewers: Matthias J. Sax <matthias@confluent.io>
shashankhs11 pushed a commit to shashankhs11/kafka that referenced this pull request Dec 15, 2025
After apache#6382, the system test
streams_eos_test.py is redundant. As in
apache#20718, the verification logic has
already been migrated, so we only need to delete the related system
tests.

Reviewers: Matthias J. Sax <matthias@confluent.io>