-
Notifications
You must be signed in to change notification settings - Fork 15.1k
KAFKA-7277: Migrate Streams API to Duration instead of longMs times #5682
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0293466
6724276
9730461
c7b3c9a
c4f2335
a6d47d4
b44aed9
52c5a84
4e42ed4
e5b9e5f
527f6fe
6b25162
d63b729
8d40a40
5dbe795
518a914
6eb48e3
f241d3a
4e8b65f
a583259
5210f9f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,7 @@ | |
| */ | ||
| package org.apache.kafka.streams; | ||
|
|
||
| import java.time.Duration; | ||
| import org.apache.kafka.clients.admin.AdminClient; | ||
| import org.apache.kafka.clients.consumer.KafkaConsumer; | ||
| import org.apache.kafka.clients.producer.KafkaProducer; | ||
|
|
@@ -36,6 +37,7 @@ | |
| import org.apache.kafka.streams.errors.InvalidStateStoreException; | ||
| import org.apache.kafka.streams.errors.ProcessorStateException; | ||
| import org.apache.kafka.streams.errors.StreamsException; | ||
| import org.apache.kafka.streams.internals.ApiUtils; | ||
| import org.apache.kafka.streams.kstream.KStream; | ||
| import org.apache.kafka.streams.kstream.KTable; | ||
| import org.apache.kafka.streams.kstream.Produced; | ||
|
|
@@ -827,7 +829,9 @@ public void close() { | |
| * @return {@code true} if all threads were successfully stopped—{@code false} if the timeout was reached | ||
| * before all threads stopped | ||
| * Note that this method must not be called in the {@code onChange} callback of {@link StateListener}. | ||
| * @deprecated Use {@link #close(Duration)} instead | ||
| */ | ||
| @Deprecated | ||
| public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { | ||
| log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeUnit.toMillis(timeout)); | ||
|
|
||
|
|
@@ -895,6 +899,22 @@ public void run() { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the | ||
| * threads to join. | ||
| * A {@code timeout} of 0 means to wait forever. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @guozhangwang @bbejeck @vvcephei Should we change this to "zero or negative" ? Comparing the implementation, passing in a negative timestamp will "expire" the timeout even without checking the state transition at all and return immediately (with
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm fine with those semantics, maybe we can make a Jira. Thinking about it more, though, wouldn't it make more sense to:
Regardless, I agree the current behavior is a little weird, and I'd be in favor of a Jira/KIP to revise it.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Overall I'm ok with the semantics. But my first instinct of a timeout of In short, I'm +1 as well on revising the behavior.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think, we should fix this in 2.1 -- I am suggestion this, because we could use the new semantics for
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @nizhikov Are you willing for address 7477 in this PR? If not, it's also fine and we do a follow up PR. However, it should be part of the KIP description. Could you update the KIP accordingly?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mjsax I'll take care of KAFKA-7477 in follow up PR. KIP-358 updated. Please, see, "Proposed Changes" section.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you start working on KAFKA-7477 already @nizhikov ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As far as I can understand thus fix is trivial.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't call it trivial, but sure, this sound good! Thanks a lot. Just want to make sure we get it on time to not miss code freeze deadline. Thanks a lot! |
||
| * | ||
| * @param timeout how long to wait for the threads to shutdown | ||
| * @return {@code true} if all threads were successfully stopped—{@code false} if the timeout was reached | ||
| * before all threads stopped | ||
| * Note that this method must not be called in the {@link StateListener#onChange(State, State)} callback of {@link StateListener}. | ||
| * @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds} | ||
| */ | ||
| public synchronized boolean close(final Duration timeout) throws IllegalArgumentException { | ||
| ApiUtils.validateMillisecondDuration(timeout, "timeout"); | ||
| return close(timeout.toMillis(), TimeUnit.MILLISECONDS); | ||
| } | ||
|
|
||
| /** | ||
| * Do a clean up of the local {@link StateStore} directory ({@link StreamsConfig#STATE_DIR_CONFIG}) by deleting all | ||
| * data with regard to the {@link StreamsConfig#APPLICATION_ID_CONFIG application ID}. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.kafka.streams.internals; | ||
|
|
||
| import java.time.Duration; | ||
| import java.time.Instant; | ||
| import java.util.Objects; | ||
|
|
||
| public final class ApiUtils { | ||
| private ApiUtils() { | ||
| } | ||
|
|
||
| /** | ||
| * Validates that milliseconds from {@code duration} can be retrieved. | ||
| * @param duration Duration to check. | ||
| * @param name Name of params for an error message. | ||
| * @return Milliseconds from {@code duration}. | ||
| */ | ||
| public static long validateMillisecondDuration(final Duration duration, final String name) { | ||
| try { | ||
| if (duration == null) | ||
| throw new IllegalArgumentException("[" + Objects.toString(name) + "] shouldn't be null."); | ||
|
|
||
| return duration.toMillis(); | ||
| } catch (final ArithmeticException e) { | ||
| throw new IllegalArgumentException("[" + name + "] can't be converted to milliseconds. ", e); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Validates that milliseconds from {@code instant} can be retrieved. | ||
| * @param instant Instant to check. | ||
| * @param name Name of params for an error message. | ||
| * @return Milliseconds from {@code instant}. | ||
| */ | ||
| public static long validateMillisecondInstant(final Instant instant, final String name) { | ||
| try { | ||
| if (instant == null) | ||
| throw new IllegalArgumentException("[" + name + "] shouldn't be null."); | ||
|
|
||
| return instant.toEpochMilli(); | ||
| } catch (final ArithmeticException e) { | ||
| throw new IllegalArgumentException("[" + name + "] can't be converted to milliseconds. ", e); | ||
| } | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.