KAFKA-7243: Add unit integration tests to validate metrics in Kafka Streams #6080
guozhangwang merged 28 commits into apache:trunk
Conversation
Hi @mjsax @guozhangwang, can you check the pull request?
@vvcephei could you take a look when you have time?
vvcephei
left a comment
Hi @khaireddine120,
Thanks for this PR.
When you get the chance, please fill out the PR description, as it will become the commit message when it's merged.
Looking at the ticket, I think the intent was actually to enumerate the expected metrics and make sure they are all actually registered.
I think your strategy to make sure that we de-register all metrics on shutdown works. But after startup, we need to verify each of the metrics we have documented: https://kafka.apache.org/documentation/#kafka_streams_monitoring
Thanks,
-John
Hi @vvcephei, I have 3 questions regarding this test:
Hi @vvcephei, any update?
Hi @khaireddine120, sorry for the delay. Here are my thoughts...
I looked at your updates to the test, and it looks good to me. Thanks!

Hi @vvcephei @guozhangwang,
guozhangwang
left a comment
Thanks for the PR @khaireddine120, I've made a pass on it. LGTM overall.
Thread.sleep(10000);

final List<Metric> listMetricAfterStartingApp = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().contains("stream")).collect(Collectors.toList());
Assert.assertTrue(listMetricAfterStartingApp.size() > 0);
Just checking that there are some metrics containing "stream" is very loose. I'd suggest we:
- have a simple topology that reads from topic1, accesses a store1, and then writes to a sink topic2.
- get the list of metrics from kafkaStreams.metrics(), which contains producer, consumer, admin and streams' own metrics (thread, task, processor node, store, cache), and check that they all exist with the exact number of metrics (once the PR for KIP-414 is in, it should be easy to get the corresponding client id for the different modules).
- close the app, and wait for the metrics to all be removed, with a timeout (see comment below).
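The group-level check in the second bullet can be sketched in isolation. This is a hedged, self-contained illustration, not the PR's code: missingGroups is a hypothetical helper, and the group names below are an assumed subset of the documented Streams metric groups.

```java
import java.util.List;
import java.util.stream.Collectors;

public class MetricGroupCheck {

    // Return every expected metric group that is absent from the registered groups.
    static List<String> missingGroups(final List<String> registeredGroups,
                                      final List<String> expectedGroups) {
        return expectedGroups.stream()
                .filter(group -> !registeredGroups.contains(group))
                .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        // Assumed subset of Streams-level group names (illustrative only).
        final List<String> expected = List.of(
                "stream-metrics", "stream-task-metrics",
                "stream-processor-node-metrics", "stream-record-cache-metrics");
        // In the real test these would come from kafkaStreams.metrics() after startup.
        final List<String> registered = List.of(
                "stream-metrics", "stream-task-metrics",
                "stream-processor-node-metrics", "stream-record-cache-metrics",
                "producer-metrics", "consumer-metrics");
        System.out.println(missingGroups(registered, expected)); // prints []
    }
}
```

The same helper could be run again after closeApplication() with an empty expectation, to assert de-registration.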
closeApplication();

Thread.sleep(10000);
We avoid time-based operations in integration tests, since they usually lead to flaky tests.
Consider using TestUtils.waitForCondition() with a timeout instead.
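The suggestion above boils down to polling with a deadline instead of sleeping a fixed amount. A minimal, self-contained sketch of that idiom (not the actual org.apache.kafka.test.TestUtils implementation; names here are illustrative):

```java
import java.util.function.BooleanSupplier;

public class WaitFor {

    // Poll the condition until it holds or the deadline passes, backing off between checks.
    static void waitForCondition(final BooleanSupplier condition,
                                 final long maxWaitMs,
                                 final String details) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Condition not met within " + maxWaitMs + " ms: " + details);
            }
            Thread.sleep(100L); // back off briefly, then re-check
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final long start = System.currentTimeMillis();
        // The condition becomes true after ~300 ms; the test proceeds as soon as it does,
        // instead of always paying a fixed 10-second sleep.
        waitForCondition(() -> System.currentTimeMillis() - start > 300, 5_000L, "metrics registered");
        System.out.println("condition met");
    }
}
```

The fixed Thread.sleep(10000) either wastes time (metrics registered early) or flakes (metrics registered late); the polling form does neither.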
}

@Test
public void testStreamMetricOfWindowStore() throws Exception {
I was originally thinking about getting the list of all metrics from kafkaStreams.metrics(), which contains producer, consumer, admin and streams' own metrics (thread, task, processor node, store, cache), and checking that they all exist with the exact number of metrics (once the PR for KIP-414 is in, it should be easy to get the corresponding client id for the different modules).
But after reading @vvcephei's comment, I'm convinced that we can defer validating the non-streams embedded clients' metrics, and that actual metric-name validation is better done outside the Streams metrics test. So we'd probably only check that the corresponding groups with clientIds exist rather than checking each metric. I'm fine with the current scope of this PR.
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();

Thread.sleep(10000);
We avoid time-based operations in integration tests, since they usually lead to flaky tests.
Consider using TestUtils.waitForCondition() with a timeout instead.
@guozhangwang the issue is that I have no condition to be sure the metrics have been registered before beginning the test. I tested with 5 sec (not all metrics are registered), then tested with 10 sec and the test never fails. Any suggestions?
What I was suggesting is that we can 1) remove the sleep in the startApplication function, and 2) in the test itself, use TestUtils.waitForCondition() after startApplication for each testXX check, so that if the check is not yet satisfied, instead of failing the test immediately, it will back off a bit and re-execute the check to see if it passes now -- i.e., we run the check continuously rather than as a one-shot.
Of course, this requires you to replace the assertions in testMetricByName with booleans, propagating the error message bottom-up so that it shows exactly which metric name was not found once we've exhausted the wait time and finally decide to fail (otherwise it will just say "failed to meet condition", and the internal information -- which test function's which metric name caused it -- will be lost).
Does that make sense?
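The boolean-plus-error-message refactor could look roughly like this. This is a hedged sketch with hypothetical names and a plain list of metric-name strings standing in for the real Metric objects, not the PR's actual testMetricByName:

```java
import java.util.List;

public class MetricCheck {

    // Instead of asserting, return false and record what failed in a buffer,
    // so that a retry loop (e.g. TestUtils.waitForCondition) can re-run the check.
    static boolean testMetricByName(final List<String> registeredNames,
                                    final String name,
                                    final int expectedCount,
                                    final StringBuilder errorMessage) {
        final long found = registeredNames.stream().filter(name::equals).count();
        if (found != expectedCount) {
            errorMessage.setLength(0); // keep only the latest failure, not stale ones
            errorMessage.append("Metric '").append(name).append("': expected ")
                        .append(expectedCount).append(" occurrence(s) but found ").append(found);
            return false;
        }
        return true;
    }

    public static void main(final String[] args) {
        final StringBuilder error = new StringBuilder();
        final List<String> names = List.of("put-latency-avg", "put-latency-avg");
        System.out.println(testMetricByName(names, "put-latency-avg", 2, error)); // prints true
        System.out.println(testMetricByName(names, "put-latency-max", 2, error)); // prints false
        System.out.println(error);
    }
}
```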
Yes, thank you.
Hi @guozhangwang, can you confirm the fix?

Don't worry, I will squash / merge when needed.
guozhangwang
left a comment
Made another pass, and left a couple minor comments.
Assert.assertNotNull("Metric:'" + m.metricName() + "' must be not null", m.metricValue());
}
} catch (final Throwable e) {
throw e;
Since in the caller we will capture the throwable and swallow it by returning false, would the actual error message be lost?
I put a logger, but checkstyle refuses it. Should I find a way to log the error, or ignore it?
@khaireddine120 I thought about it again, and now I realize adding a logger is not ideal: the check may fail a couple of times within the timeout before it succeeds, and each assert failure would still log a record before the eventual success.
So how about doing this?
- Create an error string variable (initialized as null) which will be passed to the testXX function, which will in turn pass it to testMetricByName as an additional parameter. Then inside testMetricByName, replace the assertion with the size / not-null conditions directly; if a check fails, modify the passed-in string reference with the necessary information, such as which metric name has an unexpected count, or which name is null.
- This string can then be passed as the second parameter of waitForCondition; that parameter is actually a Supplier<String> which is evaluated lazily, so if the condition is indeed not met, the evaluation will take whatever the current string reference holds to construct the final message.
WDYT?
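The lazy-message idea can be demonstrated with a small self-contained sketch. The names are illustrative stand-ins, not the real org.apache.kafka.test.TestUtils: the point is only that the Supplier<String> is evaluated once, on final failure, so it reflects the last recorded check result.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

public class LazyMessageWait {

    static void waitForCondition(final BooleanSupplier condition,
                                 final long maxWaitMs,
                                 final Supplier<String> details) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                // The supplier is evaluated only here, on the final failure.
                throw new AssertionError(details.get());
            }
            Thread.sleep(50L);
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final StringBuilder errorMessage = new StringBuilder();
        try {
            waitForCondition(() -> {
                errorMessage.setLength(0);
                errorMessage.append("Metric 'put-latency-avg' not found"); // hypothetical failure detail
                return false; // condition never satisfied in this demo
            }, 200L, errorMessage::toString);
        } catch (final AssertionError e) {
            System.out.println(e.getMessage()); // the last recorded failure, not a generic message
        }
    }
}
```

Passing errorMessage::toString as the supplier is what keeps intermediate failures from being logged while still surfacing the final, specific one.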
It makes sense, thanks.
for (final Metric m : metrics) {
Assert.assertNotNull("Metric:'" + m.metricName() + "' must be not null", m.metricValue());
}
} catch (final Throwable e) {
Do we need to capture-and-rethrow here? Since the caller will always capture and swallow, this seems unnecessary to me.
@khaireddine120 Please ping me whenever it is ready for review again.
Of course
Hi @guozhangwang, can you take a look at the last update?
Hmm.. the current approach will still be printing multiple error messages for previous runs, right? Does that make sense?
I will recheck this case.
Ah I see. Yeah
retest this please

retest this please

LGTM. @vvcephei could you take another look as well?

retest this please
vvcephei
left a comment
Hey @khaireddine120, I just have a couple of small remarks...
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
}

@Before
Yes, good catch :)
.collect(Collectors.toList());
testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
Do I read this correctly: it's verifying that we have 0 metrics registered for PUT_IF_ABSENT_LATENCY_AVG?
That's what I found after starting the app. I don't know the required "given" to get this type of metric :)
For window / session stores, there's no putIfAbsent function, and hence no such metrics would be registered.
Should I remove it, or leave the test asserting 0?
}
}

private boolean testStoreMetricByType(final String storeType, final StringBuilder errorMessage) {
nit: rename to testStoreMetricKeyValueByType
guozhangwang
left a comment
@khaireddine120 could you address the comments and ping me again?
retest this please

Hi @guozhangwang, can you recheck the pull request?
guozhangwang
left a comment
Thanks @khaireddine120 !
@guozhangwang @vvcephei, thanks for your help, guys.
* apache/trunk: (23 commits)
  KAFKA-7986: Distinguish logging from different ZooKeeperClient instances (apache#6493)
  KAFKA-8102: Add an interval-based Trogdor transaction generator (apache#6444)
  MINOR: Fix misspelling in protocol documentation
  KAFKA-8150: Fix bugs in handling null arrays in generated RPC code (apache#6489)
  KAFKA-8014: Extend Connect integration tests to add and remove workers dynamically (apache#6342)
  MINOR: Remove line for testing repartition topic name (apache#6488)
  MINOR: add MacOS requirement to Streams docs
  MINOR: fix message protocol help text for ElectPreferredLeadersResult (apache#6479)
  MINOR: list-topics should not require topic param
  MINOR: Clean up ThreadCacheTest (apache#6485)
  MINOR: Avoid unnecessary collection copy in MetadataCache (apache#6397)
  KAFKA-8142: Fix NPE for nulls in Headers (apache#6484)
  KAFKA-7243: Add unit integration tests to validate metrics in Kafka Streams (apache#6080)
  MINOR: Add verification step for Streams archetype to Jenkins build (apache#6431)
  KAFKA-7819: Improve RoundTripWorker (apache#6187)
  KAFKA-7989: RequestQuotaTest should wait for quota config change before running tests (apache#6482)
  KAFKA-8098: Fix Flaky Test testConsumerGroups
  KAFKA-6958: Add new NamedOperation interface to enforce consistency in naming operations (apache#6409)
  MINOR: capture result timestamps in Kafka Streams DSL tests (apache#6447)
  MINOR: updated names for deprecated streams constants (apache#6466)
  ...
KAFKA-7243: Add unit integration tests to validate metrics in Kafka Streams (apache#6080)

The goal of this task is to implement an integration test for the Kafka Streams metrics. We have to check 2 things:
1. After the streams application is started, all metrics from the different levels (thread, task, processor, store, cache) are correctly created and report recorded values.
2. When the streams application is shut down, all metrics are correctly de-registered and removed.

Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>