KAFKA-10304: refactor MM2 integration tests #9224
Conversation
Propose TestUtils as the central place to host the common functions used by integration tests.
This is a simple move from MirrorConnectorsIntegrationTest, with the connector class generalized.
This is mostly a copy-paste from MirrorConnectorsIntegrationTest.
Add a check for topic config sync, since the topic created on the primary cluster has a "cleanup.policy" config.
When creating the test-topic-1 topic on the primary cluster, add a topic config. Later on, we check whether the config is synced from the primary to the backup cluster.
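The config-sync check described above can be sketched in plain Java (the real test uses the Kafka Admin client; class and method names here are illustrative): test-topic-1 is created on the primary cluster with a "cleanup.policy" entry, and the assertion later verifies the backup cluster reports the same value once MM2 has synced the topic configuration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the topic-config sync assertion (illustrative names,
// not the actual Kafka test code).
public class TopicConfigSyncSketch {

    // Returns true when the backup cluster carries the same value for the key.
    static boolean isConfigSynced(Map<String, String> primary,
                                  Map<String, String> backup,
                                  String key) {
        return primary.containsKey(key)
                && primary.get(key).equals(backup.get(key));
    }

    public static void main(String[] args) {
        // Config attached at topic creation time on the primary cluster.
        Map<String, String> primaryConfig =
                Collections.singletonMap("cleanup.policy", "compact");
        // What the backup cluster would report after MM2 syncs the config.
        Map<String, String> backupConfig = new HashMap<>(primaryConfig);
        System.out.println(isConfigSynced(primaryConfig, backupConfig, "cleanup.policy"));
    }
}
```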
I'd appreciate any feedback on what to test additionally and how to get closer to a real scenario. Some of the ideas come from https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@ryannedolan @mimaison when you get a chance, I'd appreciate your attention and feedback :) Thanks
@ning2008wisc I've not forgotten this PR, I just haven't had time to do reviews yet :(
I think that would be a good idea. We can try to introduce this test in a follow-up PR.
mimaison
left a comment
Thanks for the updates. I've made another pass.
I've left a bunch of minor comments but I'd like to highlight bigger issues on the following 2 points:
-
SSL tests: Even though MM2 does not do anything special with SSL (nor SASL), I think it's fine to add tests for SSL. However, this should not cause such a large change. In this PR, SSL has leaked into all classes. Instead we want an SSL class that just changes the configuration.
For a good example, see https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala. If you can get it into a similar shape, then it makes sense to cover SSL.
-
testWithBrokerRestart: Testing semantics of connectors is tricky. I don't think the current test does it properly at the moment. Have a look at the comments I left on the test. Maybe we should focus on the other issues first and have another attempt in a different PR. WDYT?
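The configuration-only SSL pattern suggested in the first point could look roughly like this. All class and property names below are illustrative, not the actual Kafka test code; the point is that the base class builds clusters purely from configuration fields, while the SSL variant only sets extra properties.

```java
import java.util.Properties;

// Sketch only: the base class consumes configuration fields without knowing
// whether SSL is enabled; the SSL variant merely sets extra properties.
abstract class MirrorConnectorsBaseSketch {
    protected final Properties primaryBrokerProps = new Properties();
    protected final Properties backupBrokerProps = new Properties();

    // The base class would build both embedded clusters purely from the
    // fields above; no SSL-specific branches are needed here.
    String backupListener() {
        return backupBrokerProps.getProperty("listeners", "PLAINTEXT://localhost:0");
    }
}

// The SSL test only overrides configuration, mirroring SslConsumerTest's shape.
class MirrorConnectorsSslSketch extends MirrorConnectorsBaseSketch {
    MirrorConnectorsSslSketch() {
        backupBrokerProps.put("listeners", "SSL://localhost:0");
        // Hypothetical keystore locations; a real test generates them per run.
        backupBrokerProps.put("ssl.truststore.location", "/tmp/test-truststore.jks");
        backupBrokerProps.put("ssl.keystore.location", "/tmp/test-keystore.jks");
    }
}

public class SslPatternSketch {
    public static void main(String[] args) {
        MirrorConnectorsBaseSketch plain = new MirrorConnectorsBaseSketch() {};
        MirrorConnectorsBaseSketch ssl = new MirrorConnectorsSslSketch();
        System.out.println(plain.backupListener()); // PLAINTEXT://localhost:0
        System.out.println(ssl.backupListener());   // SSL://localhost:0
    }
}
```

With this shape, adding a SASL variant later would again be a configuration-only subclass.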
Why do we have SSL specific methods here? Could we move all the SSL bits into the SSL class?
We have fields for the configurations. So we could set them accordingly (with or without SSL) in each concrete class. Then in the base class, we just use the fields to create the clusters without having to know if it's SSL or not.
Could we move all the SSL bits into the SSL class?
I believe yes
We have fields for the configurations. So we could set them accordingly (with or without SSL) in each concrete class. Then in the base class, we just use the fields to create the clusters without having to know if it's SSL or not.
I agree
By catching Throwable, aren't we silencing the error and not immediately failing the test?
Since this is an integration test, if we hit the catch block, the test should fail immediately, right? If you have suggestions on what to put here, I am happy to follow :)
Just let the Exception flow, that will automatically fail the test
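The difference can be illustrated with a minimal sketch (doWork is a hypothetical stand-in for the test body): catching Throwable makes the method return normally even when the work failed, so a test runner would report a pass, while declaring `throws Exception` lets the failure propagate and fail the test automatically.

```java
// Minimal sketch of the two styles discussed above (doWork is hypothetical).
public class ExceptionFlowSketch {

    static void doWork() throws Exception {
        throw new Exception("boom");
    }

    // Anti-pattern: the failure is swallowed, so a test runner would
    // report this as passing even though doWork failed.
    static boolean swallowed() {
        try {
            doWork();
            return true;
        } catch (Throwable t) {
            return true; // error silenced
        }
    }

    // Preferred: just declare the exception; the runner fails the test
    // automatically when it propagates.
    static void letItFlow() throws Exception {
        doWork();
    }

    public static void main(String[] args) {
        System.out.println(swallowed()); // true, even though doWork threw
        try {
            letItFlow();
        } catch (Exception e) {
            System.out.println("test would fail: " + e.getMessage());
        }
    }
}
```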
Is there a reason we are only enabling SSL in one of the 2 clusters?
Good question. The primary reasons are: (1) I have seen some MM2 use cases that mirror data from an in-house Kafka cluster to a publicly hosted one, for example https://issues.apache.org/jira/browse/KAFKA-10704, so this setup (non-SSL -> SSL) seems to represent a real class of use cases. (2) There are 4 combinations of SSL and non-SSL; adding all of them would be thorough but also verbose. Proving MM2 works with one SSL-enabled cluster should be convincing enough to claim "MM2 can mirror in the SSL case".
We have this field in both concrete classes. Can we move it to the base class instead?
Good point. I am thinking that in the future, if we add a test for MirrorSinkConnector (which would also extend MirrorConnectorsIntegrationBaseTest), it may be more flexible to let each concrete test class define the list of connectors to boot up.
I am also fine with moving CONNECTOR_LIST into the base class now and moving it out later when we have MirrorSinkConnector.
Should this class be abstract?
We can use Collections.singletonMap()
We can use StringDeserializer.class.getName()
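Both small suggestions fit in one sketch. Collections.singletonMap avoids building a mutable HashMap for a single entry, and Class.getName() avoids a hard-coded, typo-prone class name string (String.class stands in below for the Kafka StringDeserializer class, which is not on the classpath of this sketch).

```java
import java.util.Collections;
import java.util.Map;

public class SmallIdiomsSketch {
    public static void main(String[] args) {
        // Collections.singletonMap: an immutable one-entry map, no HashMap needed.
        Map<String, String> topicConfig =
                Collections.singletonMap("cleanup.policy", "compact");

        // Class.getName() instead of a literal like "java.lang.String".
        // In the actual test this would be StringDeserializer.class.getName().
        String deserializer = String.class.getName();

        System.out.println(topicConfig.get("cleanup.policy")); // compact
        System.out.println(deserializer); // java.lang.String
    }
}
```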
This test is not specific to SSL, so why do we have it?
Should all tests in MirrorConnectorsIntegrationTest instead go to the base class so we run them both with and without SSL?
Isn't MM2/Connect using at-least-once by default? i.e., the producer in the runtime can cause duplicates.
Sorry - it is a typo, it should be "at least once". I am removing this test from this PR.
I don't understand what this test is doing.
Why do we need background clients instead of producing upfront and consuming the mirrored data at the end of the test?
It looks like we are testing the primary->backup scenario but we are restarting the backup cluster. The source connector should not interact with the backup cluster.
Why do we need background clients instead of producing upfront and consuming the mirrored data at the end of the test?
I think I answered the above question as the comments in the code. Please see below,
// the purpose of background producer and consumer is to better test the failure case
// to avoid serialization (produce -> broker restart -> consumer) by decoupling the
// producer, embedded kafka and consumer. Since the consumer offsets are stored in
// kafka topic at backup cluster and offset commit is periodic, when kafka broker (at backup)
// restarts, duplicate records will be consumed, meaning "at least once" delivery guarantee
I believe this test has unique value and is closer to a realistic scenario, since the producer -> kafka -> consumer always run on different machines, rather than within one process or thread.
Given the complexity, I am happy to remove this test for now.
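The at-least-once behavior discussed above can be sketched in plain Java (all names illustrative): after a broker restart, the consumer re-reads records since the last periodic offset commit, so the raw consumed count may exceed what was produced, and the test should compare distinct records rather than raw counts.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class AtLeastOnceSketch {

    // Deduplicate consumed records; with at-least-once delivery the raw
    // count may exceed what was produced, but the distinct set must match.
    static Set<String> distinctRecords(List<String> consumed) {
        return new LinkedHashSet<>(consumed);
    }

    public static void main(String[] args) {
        List<String> produced = Arrays.asList("r1", "r2", "r3");
        // After a backup-broker restart, records since the last periodic
        // offset commit are redelivered, producing duplicates.
        List<String> consumed = Arrays.asList("r1", "r2", "r2", "r3", "r3");

        System.out.println(consumed.size() >= produced.size());                  // true
        System.out.println(distinctRecords(consumed).size() == produced.size()); // true
    }
}
```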
Thanks @mimaison for your insightful feedback. Regarding your 2 major concerns, I agree, and I believe there is a feasible solution, especially a very lean SSL test. I will update this PR early next week to address all your major and minor comments.
@mimaison Thanks again for your previous detailed review. I updated the PR to resolve exactly the 2 concerns you raised. I'd appreciate another review!
mimaison
left a comment
Thanks @ning2008wisc for the updates.
It's looking much, much better. There are still a few things to improve before merging, but it's almost there.
Can we use allConfigs.get() to find the configs we want instead of searching for them?
allConfigs is a java.util.Collection object and does not seem to directly support get(): https://docs.oracle.com/javase/8/docs/api/java/util/Collection.html
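If the result really is a plain java.util.Collection of entries, one option is to index it by name once, so each config lookup becomes a map get instead of a linear search. ConfigEntry below is a stand-in for the Kafka admin class of the same name, which is not on this sketch's classpath.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class ConfigLookupSketch {

    // Stand-in for org.apache.kafka.clients.admin.ConfigEntry.
    static final class ConfigEntry {
        final String name;
        final String value;
        ConfigEntry(String name, String value) {
            this.name = name;
            this.value = value;
        }
    }

    // Index the collection once; afterwards every lookup is O(1) by name.
    static Map<String, String> byName(Collection<ConfigEntry> entries) {
        Map<String, String> index = new HashMap<>();
        for (ConfigEntry e : entries) {
            index.put(e.name, e.value);
        }
        return index;
    }

    public static void main(String[] args) {
        Collection<ConfigEntry> allConfigs = Arrays.asList(
                new ConfigEntry("cleanup.policy", "compact"),
                new ConfigEntry("retention.ms", "604800000"));
        Map<String, String> index = byName(allConfigs);
        System.out.println(index.get("cleanup.policy")); // compact
    }
}
```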
Just let the Exception flow, that will automatically fail the test
I think the …
@ning2008wisc, @mimaison: Please see https://issues.apache.org/jira/browse/KAFKA-10811 and #9698 for a required fix to the …
mimaison
left a comment
Thanks for the update @ning2008wisc. You've now addressed the biggest issues. I've left a few more comments suggesting small improvements.
Can you also rebase on trunk to resolve the conflict?
Yes, but the code that created the consumer should close it. If I call waitForConsumingAllRecords(), I'd not expect it to close my consumer instance.
This still needs to be addressed
Hello @mimaison, I addressed all of your comments; please take a final look.
@ning2008wisc, https://issues.apache.org/jira/browse/KAFKA-10811 and #9698 have been merged (to multiple branches).
@ning2008wisc There are a few checkstyle failures. You can run …
@mimaison thanks for pointing that out. Fixed the checkstyle issues; sorry for only running the check for …
mimaison
left a comment
Thanks again for the updates! It looks like we lost some logic that was added recently in MirrorConnectorsIntegrationTest.java.
Can you take a look and make sure we don't lose any logic?
@mimaison thanks so much for all your reviewing effort. I'd really appreciate it if you have time before the end of the year for 1-2 final reviews to merge this PR.
mimaison
left a comment
Thanks @ning2008wisc for all the updates! LGTM
Thanks @mimaison so much for your multiple rounds of valuable feedback and comments, which greatly improved the code quality of this PR.
The main proposals of this PR:
(1) extract the common functions into a base class, MirrorConnectorsIntegrationBaseTest
(2) test in an SSL-enabled cluster