MINOR: Added unit tests for ConnectionQuotas#8650
MINOR: Added unit tests for ConnectionQuotas#8650rajinisivaram merged 4 commits intoapache:trunkfrom
Conversation
| executor.submit((() => | ||
| intercept[RuntimeException](connectionQuotas.inc(externalListener.listenerName, externalListener.defaultClientIp, blockedPercentMeters("EXTERNAL")))): Runnable | ||
| ).get(5, TimeUnit.SECONDS) |
There was a problem hiding this comment.
- I think that
interceptdoes not assert that the exception is thrown but return the intercepted exception. It would be better to useassertThrownhere. - Do we really need to use an executor here? It seems that
incshould throw immediately if the listener does not exist.
There was a problem hiding this comment.
intercept also verifies the exception. For example, if the exception thrown is different, the test will fail with an error like this: Expected exception java.lang.IllegalArgumentException to be thrown, but kafka.network.TooManyConnectionsException was thrown. If if no exception is thrown, it will fail like this: Expected exception java.lang.IllegalArgumentException to be thrown, but no exception was thrown. From what I see, assertThrown achieves the same behavior as intercept.
There was a problem hiding this comment.
About using the executor -- you are right that inc should return immediately in this case. I still used the executor in case the behavior changes and inc blocks. I think it's better to protect the test from this scenario
| try { | ||
| // verify there is no limit by accepting 10000 connections as fast as possible | ||
| val numConnections = 10000 | ||
| val futures = listeners.values.map( listener => |
There was a problem hiding this comment.
nit: the space before listener is not needed. i would use curly braces here to stay consistent with the foreach below.
| // verify there is no limit by accepting 10000 connections as fast as possible | ||
| val numConnections = 10000 | ||
| val futures = listeners.values.map( listener => | ||
| executor.submit((() => acceptConnections(connectionQuotas, listener, numConnections)): Runnable) ) |
There was a problem hiding this comment.
nit: the space before the last closing parenthesis is not needed.
| executor.submit((() => acceptConnections(connectionQuotas, listener, numConnections)): Runnable) ) | ||
| futures.foreach(_.get(10, TimeUnit.SECONDS)) | ||
| listeners.values.foreach { listener => | ||
| assertEquals(s"${listener.listenerName.value()}",numConnections, connectionQuotas.get(listener.defaultClientIp)) |
There was a problem hiding this comment.
Number of connections on $listener:to stay consistent with the other tests?- nit: a space is missing after the first coma.
| // calling dec() for an IP for which we didn't call inc() should throw an exception | ||
| intercept[IllegalArgumentException](connectionQuotas.dec(listener.listenerName, unknownHost)) |
There was a problem hiding this comment.
I think that this does not assert as mentioned earlier. Moreover, I think that this deserves its own unit test as it is not really related to the current one.
There was a problem hiding this comment.
agreed, moved to a separate test
| executor.submit((() => acceptConnections(connectionQuotas, listener, listenerMaxConnections)): Runnable) ) | ||
| futures.foreach(_.get(5, TimeUnit.SECONDS)) | ||
| listeners.values.foreach { listener => | ||
| assertEquals(s"${listener.listenerName.value()}", listenerMaxConnections, connectionQuotas.get(listener.defaultClientIp)) |
There was a problem hiding this comment.
Number of connections on $listener: to stay consistent with other tests?
| val futures2 = listeners.values.map( listener => | ||
| executor.submit((() => acceptConnections(connectionQuotas, listener, 1)): Runnable) ) |
| val futures2 = listeners.values.map( listener => | ||
| executor.submit((() => acceptConnections(connectionQuotas, listener, 1)): Runnable) ) | ||
| futures2.foreach { future => | ||
| intercept[TimeoutException](future.get(1, TimeUnit.SECONDS)) |
There was a problem hiding this comment.
see my comment that intercept does assert on both wrong exception or no exception, similar to assertThrown
| connectionQuotas.dec(listener.listenerName, listener.defaultClientIp) | ||
| } | ||
| // all connections should get added | ||
| futures.foreach(_.get(5, TimeUnit.SECONDS)) |
There was a problem hiding this comment.
Could we assert the number of connections after this line?
| assertEquals(s"Number of connections on $externalListener:", maxConnectionsPerIp + 2, connectionQuotas.get(externalListener.defaultClientIp)) | ||
|
|
||
| // connections on the same listener but from a different IP should be accepted | ||
| executor.submit((() => acceptConnections(connectionQuotas, externalListener.listenerName, knownHost, maxConnectionsPerIp, 0)): Runnable).get(5, TimeUnit.SECONDS) |
There was a problem hiding this comment.
A general comment. It would be great if we could break long lines in order to keep them under a reasonable size. I don't know if we have a strict guideline on this but we tend to do it.
|
@dajac Thanks for your very thorough review. I addressed all your comments. |
|
ok to test |
rajinisivaram
left a comment
There was a problem hiding this comment.
@apovzner Thanks for the tests, LGTM
|
there was an odd build failure. retest this please. |
|
It looks like the build couldn't even run tests: |
|
ok to test |
|
Retest this please. |
|
One of the build failures failed due to one of the unit tests added in this PR. It was bug in the test, that was waiting on the wrong future, which had a name similar to another one. I fixed both waiting on the right future and changed variable name so that it is easier to spot. |
|
Retest this please |
|
Retest this please. |
|
JDK 14 build has an unrelated test failure: org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false] It looks like tests passed on other builds, but it failed to record the results. |
|
Test failures unrelated, merging to trunk. |
* 'trunk' of github.com:apache/kafka: KAFKA-9980: Fix bug where alterClientQuotas could not set default client quotas (apache#8658) KAFKA-9780: Deprecate commit records without record metadata (apache#8379) MINOR: Deploy VerifiableClient in constructor to avoid test timeouts (apache#8651) MINOR: Added unit tests for ConnectionQuotas (apache#8650) MINOR: Correct MirrorMaker2 integration test configs for Connect internal topics (apache#8653) KAFKA-9855 - return cached Structs for Schemas with no fields (apache#8472) KAFKA-9950: Construct new ConfigDef for MirrorTaskConfig before defining new properties (apache#8608) KAFKA-8869: Remove task configs for deleted connectors from config snapshot (apache#8444) KAFKA-9409: Supplement immutability of ClusterConfigState class in Connect (apache#7942)
Added ConnectionQuotasTest, a unit test for ConnectionQuotas. We test connection limits functionality in SocketServerTest (max connections per IP), and DynamicConnectionQuotaTest (max broker-wide and per listener connection limits), which is an integration test. It is useful to have unit tests that directly test ConnectionQuotas.
Committer Checklist (excluded from commit message)