From 0a8bdfb9a487d3736dee08e36e2d20b07a90ddf1 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 25 May 2021 06:17:13 -0700 Subject: [PATCH] MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout` New parameters in overloaded methods should appear later apart from lambdas that should always be last. --- .../clients/admin/KafkaAdminClientTest.java | 2 +- .../java/org/apache/kafka/test/TestUtils.java | 16 ++++++++-------- .../controller/QuorumControllerTestEnv.java | 2 +- .../kafka/metalog/LocalLogManagerTest.java | 2 +- .../kafka/metalog/LocalLogManagerTestEnv.java | 2 +- .../OptimizedKTableIntegrationTest.java | 4 ++-- 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 4cad255ba0241..8c987a12d02fa 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -5773,7 +5773,7 @@ public void testClientSideTimeoutAfterFailureToReceiveResponse() throws Exceptio TestUtils.waitForCondition(() -> { time.sleep(1); return disconnectFuture.isDone(); - }, 1, 5000, () -> "Timed out waiting for expected disconnect"); + }, 5000, 1, () -> "Timed out waiting for expected disconnect"); assertFalse(disconnectFuture.isCompletedExceptionally()); assertFalse(result.future.isDone()); TestUtils.waitForCondition(env.kafkaClient()::hasInFlightRequests, diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 5e34ae8c5b369..3c819befa5fa8 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -298,7 +298,7 @@ public static void waitForCondition(final TestCondition testCondition, final lon * avoid transient failures due to slow or overloaded machines. */ public static void waitForCondition(final TestCondition testCondition, final long maxWaitMs, Supplier conditionDetailsSupplier) throws InterruptedException { - waitForCondition(testCondition, DEFAULT_POLL_INTERVAL_MS, maxWaitMs, conditionDetailsSupplier); + waitForCondition(testCondition, maxWaitMs, DEFAULT_POLL_INTERVAL_MS, conditionDetailsSupplier); } /** @@ -310,11 +310,11 @@ public static void waitForCondition(final TestCondition testCondition, final lon */ public static void waitForCondition( final TestCondition testCondition, - final long pollIntervalMs, final long maxWaitMs, + final long pollIntervalMs, Supplier conditionDetailsSupplier ) throws InterruptedException { - retryOnExceptionWithTimeout(pollIntervalMs, maxWaitMs, () -> { + retryOnExceptionWithTimeout(maxWaitMs, pollIntervalMs, () -> { String conditionDetailsSupplied = conditionDetailsSupplier != null ? conditionDetailsSupplier.get() : null; String conditionDetails = conditionDetailsSupplied != null ? conditionDetailsSupplied : ""; assertTrue(testCondition.conditionMet(), @@ -333,7 +333,7 @@ public static void waitForCondition( */ public static void retryOnExceptionWithTimeout(final long timeoutMs, final ValuelessCallable runnable) throws InterruptedException { - retryOnExceptionWithTimeout(DEFAULT_POLL_INTERVAL_MS, timeoutMs, runnable); + retryOnExceptionWithTimeout(timeoutMs, DEFAULT_POLL_INTERVAL_MS, runnable); } /** @@ -345,7 +345,7 @@ public static void retryOnExceptionWithTimeout(final long timeoutMs, * @throws InterruptedException if the current thread is interrupted while waiting for {@code runnable} to complete successfully. */ public static void retryOnExceptionWithTimeout(final ValuelessCallable runnable) throws InterruptedException { - retryOnExceptionWithTimeout(DEFAULT_POLL_INTERVAL_MS, DEFAULT_MAX_WAIT_MS, runnable); + retryOnExceptionWithTimeout(DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, runnable); } /** @@ -353,13 +353,13 @@ public static void retryOnExceptionWithTimeout(final ValuelessCallable runnable) * {@link AssertionError}s, or for the given timeout to expire. If the timeout expires then the * last exception or assertion failure will be thrown thus providing context for the failure. * - * @param pollIntervalMs the interval in milliseconds to wait between invoking {@code runnable}. * @param timeoutMs the total time in milliseconds to wait for {@code runnable} to complete successfully. + * @param pollIntervalMs the interval in milliseconds to wait between invoking {@code runnable}. * @param runnable the code to attempt to execute successfully. * @throws InterruptedException if the current thread is interrupted while waiting for {@code runnable} to complete successfully. */ - public static void retryOnExceptionWithTimeout(final long pollIntervalMs, - final long timeoutMs, + public static void retryOnExceptionWithTimeout(final long timeoutMs, + final long pollIntervalMs, final ValuelessCallable runnable) throws InterruptedException { final long expectedEnd = System.currentTimeMillis() + timeoutMs; diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java index db3acfba2a72f..da2269974022a 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java @@ -53,7 +53,7 @@ public QuorumControllerTestEnv(LocalLogManagerTestEnv logEnv, QuorumController activeController() throws InterruptedException { AtomicReference value = new AtomicReference<>(null); - TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> { + TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> { QuorumController activeController = null; for (QuorumController controller : controllers) { if (controller.isActive()) { diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java index 4d4e5101da994..7b5e26d79f6c7 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java @@ -96,7 +96,7 @@ public void testPassLeadership() throws Exception { private static void waitForLastCommittedOffset(long targetOffset, LocalLogManager logManager) throws InterruptedException { - TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> { + TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> { MockMetaLogManagerListener listener = (MockMetaLogManagerListener) logManager.listeners().get(0); long highestOffset = -1; diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java index 9282f42237d66..4ff350e41a80a 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java @@ -108,7 +108,7 @@ File dir() { LeaderAndEpoch waitForLeader() throws InterruptedException { AtomicReference value = new AtomicReference<>(null); - TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> { + TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> { LeaderAndEpoch result = null; for (LocalLogManager logManager : logManagers) { LeaderAndEpoch leader = logManager.leaderAndEpoch(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java index 7f7eabb1e1d30..44744cd3e2be3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java @@ -147,7 +147,7 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception { } final ReadOnlyKeyValueStore newActiveStore = kafkaStreams1WasFirstActive ? store2 : store1; - TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> { + TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> { // Assert that after failover we have recovered to the last store write assertThat(newActiveStore.get(key), is(equalTo(batch1NumMessages - 1))); }); @@ -159,7 +159,7 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception { // Assert that all messages in the second batch were processed in a timely manner assertThat(semaphore.tryAcquire(batch2NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); - TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> { + TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> { // Assert that the current value in store reflects all messages being processed assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages - 1))); });