Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5773,7 +5773,7 @@ public void testClientSideTimeoutAfterFailureToReceiveResponse() throws Exceptio
TestUtils.waitForCondition(() -> {
time.sleep(1);
return disconnectFuture.isDone();
}, 1, 5000, () -> "Timed out waiting for expected disconnect");
}, 5000, 1, () -> "Timed out waiting for expected disconnect");
assertFalse(disconnectFuture.isCompletedExceptionally());
assertFalse(result.future.isDone());
TestUtils.waitForCondition(env.kafkaClient()::hasInFlightRequests,
Expand Down
16 changes: 8 additions & 8 deletions clients/src/test/java/org/apache/kafka/test/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public static void waitForCondition(final TestCondition testCondition, final lon
* avoid transient failures due to slow or overloaded machines.
*/
public static void waitForCondition(final TestCondition testCondition, final long maxWaitMs, Supplier<String> conditionDetailsSupplier) throws InterruptedException {
waitForCondition(testCondition, DEFAULT_POLL_INTERVAL_MS, maxWaitMs, conditionDetailsSupplier);
waitForCondition(testCondition, maxWaitMs, DEFAULT_POLL_INTERVAL_MS, conditionDetailsSupplier);
}

/**
Expand All @@ -310,11 +310,11 @@ public static void waitForCondition(final TestCondition testCondition, final lon
*/
public static void waitForCondition(
final TestCondition testCondition,
final long pollIntervalMs,
final long maxWaitMs,
final long pollIntervalMs,
Supplier<String> conditionDetailsSupplier
) throws InterruptedException {
retryOnExceptionWithTimeout(pollIntervalMs, maxWaitMs, () -> {
retryOnExceptionWithTimeout(maxWaitMs, pollIntervalMs, () -> {
String conditionDetailsSupplied = conditionDetailsSupplier != null ? conditionDetailsSupplier.get() : null;
String conditionDetails = conditionDetailsSupplied != null ? conditionDetailsSupplied : "";
assertTrue(testCondition.conditionMet(),
Expand All @@ -333,7 +333,7 @@ public static void waitForCondition(
*/
public static void retryOnExceptionWithTimeout(final long timeoutMs,
final ValuelessCallable runnable) throws InterruptedException {
retryOnExceptionWithTimeout(DEFAULT_POLL_INTERVAL_MS, timeoutMs, runnable);
retryOnExceptionWithTimeout(timeoutMs, DEFAULT_POLL_INTERVAL_MS, runnable);
}

/**
Expand All @@ -345,21 +345,21 @@ public static void retryOnExceptionWithTimeout(final long timeoutMs,
* @throws InterruptedException if the current thread is interrupted while waiting for {@code runnable} to complete successfully.
*/
public static void retryOnExceptionWithTimeout(final ValuelessCallable runnable) throws InterruptedException {
retryOnExceptionWithTimeout(DEFAULT_POLL_INTERVAL_MS, DEFAULT_MAX_WAIT_MS, runnable);
retryOnExceptionWithTimeout(DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, runnable);
}

/**
* Wait for the given runnable to complete successfully, i.e. throw now {@link Exception}s or
* {@link AssertionError}s, or for the given timeout to expire. If the timeout expires then the
* last exception or assertion failure will be thrown thus providing context for the failure.
*
* @param pollIntervalMs the interval in milliseconds to wait between invoking {@code runnable}.
* @param timeoutMs the total time in milliseconds to wait for {@code runnable} to complete successfully.
* @param pollIntervalMs the interval in milliseconds to wait between invoking {@code runnable}.
* @param runnable the code to attempt to execute successfully.
* @throws InterruptedException if the current thread is interrupted while waiting for {@code runnable} to complete successfully.
*/
public static void retryOnExceptionWithTimeout(final long pollIntervalMs,
final long timeoutMs,
public static void retryOnExceptionWithTimeout(final long timeoutMs,
final long pollIntervalMs,
final ValuelessCallable runnable) throws InterruptedException {
final long expectedEnd = System.currentTimeMillis() + timeoutMs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public QuorumControllerTestEnv(LocalLogManagerTestEnv logEnv,

QuorumController activeController() throws InterruptedException {
AtomicReference<QuorumController> value = new AtomicReference<>(null);
TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
QuorumController activeController = null;
for (QuorumController controller : controllers) {
if (controller.isActive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void testPassLeadership() throws Exception {

private static void waitForLastCommittedOffset(long targetOffset,
LocalLogManager logManager) throws InterruptedException {
TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
MockMetaLogManagerListener listener =
(MockMetaLogManagerListener) logManager.listeners().get(0);
long highestOffset = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ File dir() {

LeaderAndEpoch waitForLeader() throws InterruptedException {
AtomicReference<LeaderAndEpoch> value = new AtomicReference<>(null);
TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
LeaderAndEpoch result = null;
for (LocalLogManager logManager : logManagers) {
LeaderAndEpoch leader = logManager.leaderAndEpoch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {
}

final ReadOnlyKeyValueStore<Integer, Integer> newActiveStore = kafkaStreams1WasFirstActive ? store2 : store1;
TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> {
TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> {
// Assert that after failover we have recovered to the last store write
assertThat(newActiveStore.get(key), is(equalTo(batch1NumMessages - 1)));
});
Expand All @@ -159,7 +159,7 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {
// Assert that all messages in the second batch were processed in a timely manner
assertThat(semaphore.tryAcquire(batch2NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));

TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> {
TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> {
// Assert that the current value in store reflects all messages being processed
assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages - 1)));
});
Expand Down