MINOR: add test for StreamsSmokeTestDriver (#6231)
bbejeck merged 14 commits into apache:trunk from vvcephei:test-streams-smoketest
Conversation
Also, add more output for debuggability
Hi @bbejeck @mjsax @ableegoldman @guozhangwang , Please take a look at this when you get the chance. The primary concern is adding the test. It will help us verify changes to the smoke test (such as adding suppression). I've also added some extra output to the smoke test stdout, which will hopefully aid us in diagnosing the flaky tests. Finally, I bundled in some cleanup. It was my intention to do that in a separate PR, but it wound up getting smashed together during refactoring. Please let me know if you'd prefer for me to pull any of these out into a separate request. Thanks, -John
   * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and supplied number of Kafka brokers.
   */
- public class EmbeddedKafkaCluster extends ExternalResource {
+ public class EmbeddedKafkaCluster extends ExternalResource implements AutoCloseable {
Allows using the embedded Kafka cluster with try-with-resources.
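A minimal sketch of what implementing AutoCloseable buys the tests: the cluster can be declared in a try-with-resources header, which guarantees close() runs even when the test body throws. MiniCluster is a hypothetical stand-in here, not the real EmbeddedKafkaCluster.

```java
public class TryWithResourcesDemo {
    // hypothetical stand-in for EmbeddedKafkaCluster
    static class MiniCluster implements AutoCloseable {
        boolean running = true;

        @Override
        public void close() { running = false; } // shut the cluster down
    }

    public static void main(String[] args) {
        MiniCluster ref = null;
        // close() is invoked automatically when the block exits,
        // even if an exception is thrown inside it
        try (MiniCluster cluster = new MiniCluster()) {
            ref = cluster;
        }
        System.out.println(ref.running); // the cluster was closed for us
    }
}
```

The same pattern works for any test resource that must be torn down deterministically.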
  public class SmokeTestClient extends SmokeTestUtil {

      private final Properties streamsProperties;
      private final String name;
Adds a name to the streams instance to help correlate logs and stdout events.
      e.printStackTrace();
  }
  streams.setUncaughtExceptionHandler((t, e) -> {
      System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
Add new outputs with the name, but avoid touching existing lines that are used in the system tests.
To what extent are existing lines used? How difficult would it be to update the tests to avoid duplicate output? For example, if we only grep for the pattern, adding a prefix name should not require any updates to system test code.
Yeah, I've been thinking about this... I agree I should either confirm your suspicion or just ditch the new lines.
I agree; I think that adding a prefix with a space should be fine for the test.
👍 I've removed the unprefixed lines, and am running the system tests to see.
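A quick sketch of why the prefix should be safe, assuming the system tests only grep for the marker substring (as suggested above): a substring match still finds the marker after a "name: " prefix is added.

```java
public class PrefixGrepDemo {
    public static void main(String[] args) {
        String legacy = "SMOKE-TEST-CLIENT-EXCEPTION";
        // hypothetical prefixed form, as produced by the new name prefix
        String prefixed = "streams-1: SMOKE-TEST-CLIENT-EXCEPTION";

        // a substring check (what a grep for the pattern does) matches both
        System.out.println(legacy.contains("SMOKE-TEST-CLIENT-EXCEPTION"));
        System.out.println(prefixed.contains("SMOKE-TEST-CLIENT-EXCEPTION"));
    }
}
```

Only an anchored match (e.g. `^SMOKE-TEST-CLIENT-EXCEPTION`) would break, which is what the system-test run is meant to confirm.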
  import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
  import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;

+ public class SmokeTestDriverTest {
Here's the new test.
- final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
- final List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg");
+ final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
+ final List<TopicPartition> partitions = getAllPartitions(consumer, "data", "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg");
Why do we need to verify the input topic?
I was curious if there was dirty state from a prior run (there was). Also, this enables us to print the input events for the same key so we can analytically verify where the output went wrong.
  final int recordsGenerated = allData.size() * maxRecordsPerKey;
  int recordsProcessed = 0;
  final Map<String, AtomicInteger> processed = Stream.of("data", "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg")
      .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
  }

- private static boolean verifyMin(final Map<String, Integer> map, final Map<String, Set<Integer>> allData, final boolean print) {
+ private static <V> void addEvent(final String key, final HashMap<String, LinkedList<V>> eventsMap, final V value) {
Don't think we need this method. Use Map#computeIfAbsent() instead?
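The suggestion above can be sketched like this: Map#computeIfAbsent creates and inserts the list the first time a key is seen, so the hand-rolled addEvent helper is unnecessary. The map and key names here are illustrative, not taken from the PR.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

public class ComputeIfAbsentDemo {
    public static void main(String[] args) {
        Map<String, LinkedList<Long>> eventsMap = new HashMap<>();

        // instead of addEvent("A", eventsMap, 42L):
        // computeIfAbsent inserts a fresh list on the first call for "A",
        // then returns the existing list on subsequent calls
        eventsMap.computeIfAbsent("A", k -> new LinkedList<>()).add(42L);
        eventsMap.computeIfAbsent("A", k -> new LinkedList<>()).add(43L);

        System.out.println(eventsMap.get("A"));
    }
}
```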
  final HashMap<String, LinkedList<Long>> cntEvents = new HashMap<>();
  final HashMap<String, LinkedList<Double>> avgEvents = new HashMap<>();
  final HashMap<String, LinkedList<Long>> wcntEvents = new HashMap<>();
  final HashMap<String, LinkedList<Long>> taggEvents = new HashMap<>();
Why do we need all output events? Also, if we store all output events, why do we still need the current ones from below? Seems redundant?
Also for debugging. If a key has the wrong value, we can compare the input and output events to see where it went wrong.
Yes, it's redundant with the last result below, I can take it out.
  }

  @Test
  public void shouldWorkWithRebalance() throws InterruptedException, IOException {
I agree, just throw Exception here.
  Thread.sleep(1000);

  // add a new client
  final SmokeTestClient smokeTestClient = new SmokeTestClient("streams" + numClientsCreated++);
nit: add dash -> "streams-" + numClientsCreated++
  }
  for (final SmokeTestClient client : clients) {
      client.close();
  }
Sorry, I don't follow. The first one is async and tells all the instances to stop. The second one blocks until they do stop. Otherwise, we'd be waiting for them to stop one at a time.
Oops. That comment was weird. I meant: Why do we need both loops?
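A sketch of the two-loop shutdown being discussed: the first loop signals every client to stop without blocking, the second waits for each to finish, so all clients shut down in parallel instead of one at a time. Client and its methods are hypothetical stand-ins for SmokeTestClient.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoPhaseShutdownDemo {
    // hypothetical stand-in for SmokeTestClient
    static class Client {
        final Thread worker = new Thread(() -> {
            try {
                Thread.sleep(10_000); // pretend to do work until told to stop
            } catch (InterruptedException ignored) {
                // interrupted: shut down
            }
        });
        Client() { worker.start(); }

        void closeAsync() { worker.interrupt(); }                    // tell it to stop; returns immediately
        void close() throws InterruptedException { worker.join(); }  // block until it has stopped
    }

    public static void main(String[] args) throws InterruptedException {
        List<Client> clients = new ArrayList<>();
        for (int i = 0; i < 3; i++) clients.add(new Client());

        for (Client c : clients) c.closeAsync(); // phase 1: all begin stopping concurrently
        for (Client c : clients) c.close();      // phase 2: wait for each to finish
        System.out.println("all stopped");
    }
}
```

With a single loop calling a blocking close(), total shutdown time would be the sum of the individual shutdown times rather than the maximum.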
Question for the reviewers: We currently base our expectations on the data that we produced (i.e., the producer encodes the expected values in the key itself), and this is what we compare the results against. It seems more resilient to instead consume the input topic and compute the expected results analytically (by finding the max, min, etc.). Doing so would render us blind to input events that got dropped (or otherwise corrupted) by the broker, though. Alternatively, now that we're printing the input events and output events when there's a failure, we can more easily reason about what actually happened. So maybe we should just take that step in this PR and then decide what to do from there? What do you think?
      final int maxRecordsPerKey) {
      return generate(kafka, numKeys, maxRecordsPerKey, true);
  }
Just to simplify the paths. It was used in exactly one place. Might as well just pass the flag from there.
I am fine with this approach.
I am actually a little concerned with this, because we pipe a lot of data through... Not sure if it is feasible (or helpful) to compare all the raw data manually in case of failure. It's like searching for a needle in a haystack.
@mjsax:
Note, it's only the raw data for the relevant key. Is there a better way to approach this that I'm not seeing?

Maybe. Not sure.
Don't think so. Was just raising a question. Have no better suggestion.
  streams4.close();

  System.out.println("shutdown");
  }
Removed the main method. It actually didn't work, and it wasted a bunch of my time debugging it. Its function is now served by the integration test.
  if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
      break;
  }
  break;
The prior logic would keep polling for up to 30 seconds after we pulled as many records as we generated, as long as the verification kept failing.
If we pull more than we generated, it's already a failure, so we might as well fail fast. If you agree, then the only remaining logic was to poll for 30 seconds in .5 second increments. Now that we have long polling in the consumer, we might as well just poll for up to 30 seconds once.
Just one minor point, if we don't have EOS enabled, we could end up pulling duplicates in some tests like the StreamsUpgradeTest::upgrade_downgrade_brokers
This is a good point.
I have noticed in running the test locally that the long-poll does seem slower than our prior code here, so maybe I'll go ahead and revert this part of the diff.
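The prior polling logic described above can be sketched as follows: many short polls with a budget of consecutive empty polls (roughly 60 × 500 ms ≈ 30 s), stopping early once we have pulled as many records as we generated. fetch() here is a hypothetical stub standing in for KafkaConsumer#poll; the constants are illustrative.

```java
import java.time.Duration;

public class PollStrategyDemo {
    static int batchesRemaining = 3; // pretend 3 batches of 10 records are in flight

    // hypothetical stand-in for KafkaConsumer#poll: returns records received
    static int fetch(Duration timeout) { // (timeout is ignored in this stub)
        if (batchesRemaining > 0) {
            batchesRemaining--;
            return 10;
        }
        return 0;
    }

    public static void main(String[] args) {
        final int MAX_RECORD_EMPTY_RETRIES = 60; // 60 * 500ms = ~30s budget
        final int recordsGenerated = 30;
        int received = 0;
        int emptyPolls = 0;

        // prior logic: short polls; give up after enough consecutive empty ones
        while (emptyPolls <= MAX_RECORD_EMPTY_RETRIES) {
            int n = fetch(Duration.ofMillis(500));
            if (n == 0) {
                emptyPolls++;
            } else {
                emptyPolls = 0;
                received += n;
            }
            if (received >= recordsGenerated) {
                break; // got everything we generated (or more, which is itself a failure)
            }
        }
        System.out.println(received);
    }
}
```

The alternative discussed in the thread is a single long poll of up to 30 seconds; the short-poll loop has the advantage of returning as soon as the expected record count is reached, which matters when duplicates (non-EOS runs) can push the count past the target.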
      break;
  default:
      System.out.println("unknown topic: " + record.topic());
  for (final ConsumerRecord<String, byte[]> record : records) {
This refactor is just to eliminate redundancies in the code and hopefully make it easier to read.
- final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
- final List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg");
+ final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
+ final String[] topics = {"data", "echo", "max", "min", "dif", "sum", "cnt", "avg", "tagg"};
note: "wcnt" is removed from the list. We weren't verifying it, and it turns out, we don't produce to it at all. So I guess it's a fossil from a prior refactor.
  }

  @Test
  public void shouldWorkWithRebalance() throws InterruptedException {
A test you can run to make sure the smoke test driver actually works.
Thanks for this @vvcephei, overall looks good. I just have one minor comment.
Additionally, I'll kick off a branch builder for the streams upgrade test, which is failing at the moment; it will be a good test to see if looking through the results is any easier than our current approach.
EDIT: I just saw that is the system test you kicked off. So I'll just wait until it's done and look through the results then.
@bbejeck , the system test job is: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2319/console
I did notice one failure scroll by, and I had the same thought. I'd definitely welcome your thoughts on seeing the output in context.
Java 11 build was healthy, but timed out after 3 hours :( Java 8 had one unrelated test failure:
Retest this, please.
Java 11 passed: https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/2182/
@mjsax I think this PR is ready. Do you mind making another quick pass when you have a chance? Thanks!
  if (!verificationResult.passed()) {
      verificationResult = verifyAll(inputs, events);
  }
  success &= verificationResult.passed();
No, it's only a success if all the checks pass. So I think it's correct as an "and". Does that seem right?
Ack. I guess I mixed up old code and new code. Also, the comment "give it one more try if it's not already passing" is confusing; the interplay between verificationResult.passed() and success seems a little convoluted.
  final Number value;
  switch (record.topic()) {
      case "data": {
          value = intSerde.deserializer().deserialize("", record.value());
We should pass in record.topic() instead of "" into deserialize.
- if (print) {
-     System.out.println("verifying min");
+ case "echo": {
+     value = intSerde.deserializer().deserialize("", record.value());
This is a repetition of the "data" case (similar below). We should put all int/long/double cases together to avoid code duplication, using the case-fall-through pattern.
I actually did that in my follow-up PR. I'll go ahead and pull it into this one.
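A sketch of the case-fall-through pattern suggested above: topics whose values share a deserializer are grouped under consecutive case labels, so the deserialization logic is written once per value type. The topic-to-type grouping here is illustrative, not a claim about the actual SmokeTestDriver topics.

```java
public class FallThroughDemo {
    // returns which deserializer a topic's values would need (illustrative)
    static String deserializerFor(String topic) {
        switch (topic) {
            case "data":
            case "echo":
            case "min":
            case "max":
                // all int-valued topics fall through to one branch
                return "int";
            case "sum":
            case "cnt":
                return "long";
            case "avg":
                return "double";
            default:
                return "unknown";
        }
    }

    public static void main(String[] args) {
        System.out.println(deserializerFor("echo"));
        System.out.println(deserializerFor("cnt"));
    }
}
```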
  private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs,
                                              final Map<String, Map<String, LinkedList<Number>>> events) {
      final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
      final PrintStream resultStream = new PrintStream(byteArrayOutputStream);
Should we use try-with-resource here?
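The try-with-resources suggestion could look like the sketch below: the PrintStream is closed (and therefore flushed) when the block exits, so the byte buffer is complete before it is read. The printf format is illustrative.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class PrintStreamDemo {
    public static void main(String[] args) {
        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        // PrintStream is AutoCloseable, so it qualifies for try-with-resources
        try (PrintStream resultStream = new PrintStream(byteArrayOutputStream)) {
            resultStream.printf("fail: key=%s expected=%s actual=%s%n", "k1", 5, 4);
        } // close() flushes, guaranteeing the buffer holds everything written
        System.out.print(byteArrayOutputStream.toString());
    }
}
```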
  pass = verifyTAgg(resultStream, inputs, events.get("tagg"));
  pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin);
  pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax);
  pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue());
Is it a good idea to convert long to int here? Are we sure it's zero risk? Also, is it future-proof if we change the test data to potentially bigger values that don't fit into ints?
It's not converting long to int. It's reading a Number as an int. Since we populated the Number with an int to begin with, it should be perfectly safe.
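The point above in a minimal sketch: the values were boxed from ints into Numbers, so intValue() just unboxes them losslessly; no long-to-int narrowing is involved. The values are made up for illustration.

```java
public class NumberIntValueDemo {
    public static void main(String[] args) {
        Number max = 42; // autoboxed Integer, held as a Number
        Number min = 7;

        // reading an int-backed Number as an int is lossless
        int dif = max.intValue() - min.intValue();
        System.out.println(dif);
    }
}
```

It would only become risky if the test data were ever changed to values that overflow int, which is the future-proofing question raised above.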
  pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax);
  pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue());
  pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum);
  pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L);
  if (!expected.equals(actual)) {
      resultStream.printf("fail: key=%s %s=%s expected=%s%n\t inputEvents=%s%n\toutputEvents=%s%n",
          key,
          topicName,
Why use topicName here? Should it not be "failed: key=X actual=A expected=B..." instead of "failed: key=X <topicName>=A expected=B..."?
Oh, I was just reproducing the existing log message, which says something like failed: key=X min=A expected=B.
I agree with you. I'll refactor it a bit.
Addressed the comments and pulled in some additional improvements I made while debugging the suppression logic. Kicked off another system test run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2343/
System tests all passed, except for the upgrade test, which is currently broken:
Failures were unrelated broker tests. Retest this, please.
Retest this, please.
More unrelated core test failures for Java 11:
Java 8 passed (https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/19443/)
Retest this, please.
Thanks, @bbejeck!
* ak/trunk: (45 commits)
  KAFKA-7487: DumpLogSegments misreports offset mismatches (apache#5756)
  MINOR: improve JavaDocs about auto-repartitioning in Streams DSL (apache#6269)
  KAFKA-7935: UNSUPPORTED_COMPRESSION_TYPE if ReplicaManager.getLogConfig returns None (apache#6274)
  KAFKA-7895: Fix stream-time reckoning for suppress (apache#6278)
  KAFKA-6569: Move OffsetIndex/TimeIndex logger to companion object (apache#4586)
  MINOR: add log indicating the suppression time (apache#6260)
  MINOR: Make info logs for KafkaConsumer a bit more verbose (apache#6279)
  KAFKA-7758: Reuse KGroupedStream/KGroupedTable with named repartition topics (apache#6265)
  KAFKA-7884; Docs for message.format.version should display valid values (apache#6209)
  MINOR: Save failed test output to build output directory
  MINOR: add test for StreamsSmokeTestDriver (apache#6231)
  MINOR: Fix bugs identified by compiler warnings (apache#6258)
  KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 4] (apache#5433)
  MINOR: fix bypasses in ChangeLogging stores (apache#6266)
  MINOR: Make MockClient#poll() more thread-safe (apache#5942)
  MINOR: drop dbAccessor reference on close (apache#6254)
  KAFKA-7811: Avoid unnecessary lock acquire when KafkaConsumer commits offsets (apache#6119)
  KAFKA-7916: Unify store wrapping code for clarity (apache#6255)
  MINOR: Add missing Alter Operation to Topic supported operations list in AclCommand
  KAFKA-7921: log at error level for missing source topic (apache#6262)
  ...
* MINOR: add test for StreamsSmokeTestDriver

  Also, add more output for debuggability

  * cleanup
  * cleanup
  * refactor
  * refactor
  * remove redundant printlns
  * Update EmbeddedKafkaCluster.java
  * move to integration package
  * replace early-exit on pass
  * use classrule for embedded kafka
  * pull in smoke test improvements from side branch
  * try-with-resources
  * format events instead of printing long lines
  * minor formatting fix

  Reviewers: Matthias J. Sax <mjsax@apache.org>, Bill Bejeck <bbejeck@gmail.com>