Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,7 @@ private boolean waitOnState(final State targetState, final long waitMs) {
synchronized (stateLock) {
long elapsedMs = 0L;
while (state != targetState) {
if (waitMs == 0) {
try {
stateLock.wait();
Comment thread
nizhikov marked this conversation as resolved.
} catch (final InterruptedException e) {
// it is ok: just move on to the next iteration
}
} else if (waitMs > elapsedMs) {
if (waitMs > elapsedMs) {
final long remainingMs = waitMs - elapsedMs;
try {
stateLock.wait(remainingMs);
Expand Down Expand Up @@ -824,17 +818,30 @@ public void close() {
* threads to join.
* A {@code timeout} of 0 means to wait forever.
*
* @param timeout how long to wait for the threads to shutdown
* @param timeout how long to wait for the threads to shutdown. Can't be negative. If {@code timeout=0} just checking the state and return immediately.
Comment thread
nizhikov marked this conversation as resolved.
* @param timeUnit unit of time used for timeout
* @return {@code true} if all threads were successfully stopped—{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@code onChange} callback of {@link StateListener}.
* @deprecated Use {@link #close(Duration)} instead
* @deprecated Use {@link #close(Duration)} instead; note, that {@link #close(Duration)} has different semantics and does not block on zero, e.g., `Duration.ofMillis(0)`.
*/
@Deprecated
public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));
long timeoutMs = timeUnit.toMillis(timeout);

log.debug("Stopping Streams client with timeoutMillis = {} ms. You are using deprecated method. " +
"Please, consider update your code.", timeoutMs);

if (timeoutMs < 0) {
timeoutMs = 0;
} else if (timeoutMs == 0) {
timeoutMs = Long.MAX_VALUE;
}

return close(timeoutMs);
}

private boolean close(final long timeoutMs) {
if (!setState(State.PENDING_SHUTDOWN)) {
// if transition failed, it means it was either in PENDING_SHUTDOWN
// or NOT_RUNNING already; just check that all threads have been stopped
Expand Down Expand Up @@ -890,7 +897,7 @@ public void run() {
shutdownThread.start();
}

if (waitOnState(State.NOT_RUNNING, timeUnit.toMillis(timeout))) {
if (waitOnState(State.NOT_RUNNING, timeoutMs)) {
log.info("Streams client stopped completely");
return true;
} else {
Expand All @@ -912,7 +919,15 @@ public void run() {
*/
public synchronized boolean close(final Duration timeout) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(timeout, "timeout");
return close(timeout.toMillis(), TimeUnit.MILLISECONDS);

final long timeoutMs = timeout.toMillis();
if (timeoutMs < 0) {
throw new IllegalArgumentException("Timeout can't be negative.");
}

log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);

return close(timeoutMs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,34 @@ public void shouldCleanupOldStateDirs() throws InterruptedException {
}
}

@Test
public void shouldThrowOnNegativeTimeoutForClose() {
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
try {
streams.close(Duration.ofMillis(-1L));
fail("should not accept negative close parameter");
} catch (final IllegalArgumentException e) {
// expected
} finally {
streams.close();
}
}

@Test
public void shouldNotBlockInCloseForZeroDuration() throws InterruptedException {
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final Thread th = new Thread(() -> streams.close(Duration.ofMillis(0L)));

th.start();

try {
th.join(30_000L);
assertFalse(th.isAlive());
} finally {
streams.close();
}
}

private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException {
final File taskDir = new File(appDir, "0_0");
TestUtils.waitForCondition(
Expand Down