Skip to content

KAFKA-12375: don't reuse thread.id until a thread has fully shut down#10215

Merged
ableegoldman merged 5 commits intoapache:trunkfrom
ableegoldman:12375-dont-reuse-thread-id-in-REPLACE_THREAD-event
Mar 3, 2021
Merged

KAFKA-12375: don't reuse thread.id until a thread has fully shut down#10215
ableegoldman merged 5 commits intoapache:trunkfrom
ableegoldman:12375-dont-reuse-thread-id-in-REPLACE_THREAD-event

Conversation

@ableegoldman
Copy link
Copy Markdown
Member

@ableegoldman ableegoldman commented Feb 26, 2021

Basically any id that isn't actively being used by a non-DEAD thread in the threads list is fair game. This is relevant in two scenarios:

REPLACE_THREAD: when choosing to replace a thread after a recoverable error, we should just grab a new (and free) thread.id rather than reusing the id of the dying thread, to avoid a race condition between the old thread shutting down and the new thread starting up.

#removeStreamThread(): in this case, if we haven't explicitly waited for the thread to complete the shutdown, then we should not remove it from the threads list since this will allow reusing that thread.id. This can happen if a thread is removing itself, or if we timed out waiting for it to reach the DEAD state.

In all of these scenarios, the threads list is considered the source of truth about available thread.ids. When trying to start up a new thread and computing the next available id, we'll trim any DEAD threads from this list

Should be cherrypicked to the 2.8 branch cc @vvcephei

@ableegoldman
Copy link
Copy Markdown
Member Author

Call for review @wcarlson5 @swist @cadonna @rodesai @guozhangwang

Copy link
Copy Markdown
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix, @ableegoldman !

Here my feedback!

Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java Outdated
if (!names.contains(name)) {
return i;
final HashSet<String> allLiveThreadNames = new HashSet<>();
final AtomicInteger maxThreadId = new AtomicInteger(1);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need an atomic integer here? maxThreadId is only used in the synchronized block.

Copy link
Copy Markdown
Member Author

@ableegoldman ableegoldman Mar 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's because of the whole "variables used in a lambda must be final or effectively final" thing

threads.remove(streamThread);
// Don't remove from threads until shutdown is complete since this will let another thread
// reuse its thread.id. We will trim any DEAD threads from the list later
final long cacheSizePerThread = getCacheSizePerThread(threads.size());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to adapt the resizing of the cache per threaad to use only the number of non-DEAD stream threads instead of all stream threads in the list. There are other two locations where we use the size of the thread list to resize the cache per thread.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perviously we had relied on the fact there were no dead threads in the list

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I added a method getNumLiveStreamThreads to use instead of just threads.size() which will trim the list of any DEAD threads and return the actual number of living threads

Comment thread checkstyle/suppressions.xml Outdated
Copy link
Copy Markdown
Contributor

@wcarlson5 wcarlson5 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick fix. Overall it looks good but there are 3 things that either need to be adjusted or could be easily simplified with this change.

closeToError();
}
final StreamThread deadThread = (StreamThread) Thread.currentThread();
threads.remove(deadThread);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember that we had the replace use the same ID for a reason. (maybe it had to do with rebalancing?). I don't think there should be a problem to try to get the same ID by waiting a bit in the replace thread

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot wait here until the dead thread is shutdown because the shutdown happens after replaceStreamThread() throws the exception. So we would wait forever.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deadThread.shutdown(); I was referring to this below. But if we don't need to keep the same for any reason I am fine either way

Copy link
Copy Markdown
Member

@cadonna cadonna Mar 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for not being clear enough. I was also referring to deadThread.shutdown(). Method deadThread.shutdown() only requests a shutdown. The actual shutdown is performed in completeShutdown() which is called after replaceStreamThread() throws one of the exceptions below. Since completeShutdown() is called by the same thread that calls deadThread.shutdown() in this method, i.e., the dead thread, we would wait forever if we waited after deadThread.shutdown() until the dead thread is shut down. Or did I misunderstand your statement "by waiting a bit in the replace thread"?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah that makes sense. Another approach is using a temporary name. And then waiting the new thread until the old thread dies and takes the name. This is a bit complicated and I think it should only be done if it is necessary for the new thread to have the same name. And probably not in this PR but it could be an improvement done later

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think swapping the names would make the code unnecessarily complicated, and it would definitely make reading the logs more difficult.
Just to note: in the current rebalance protocol, the thread name should not impact the task assignment since within a client tasks are always just assigned to their previous owner (we maximize stickiness & balance)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the name doesn't matter I wonder why we spent so much effort making sure it had the same name?

log.warn("Thread " + streamThread.getName() + " did not shutdown in the allotted time");
timeout = true;
// Don't remove from threads until shutdown is complete. We will trim it from the
// list once it reaches DEAD, and if for some reason it's hanging indefinitely in the
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do we trim this list? I don't thing we do. In the begging of addStreamThread() can we purge the dead threads? That is the only place it should matter

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're trimming it in getNextThreadIndex. But if we're going to rely on threads.size() elsewhere, which it seems we do, then yeah we should trim it more aggressively

threads.remove(streamThread);
// Don't remove from threads until shutdown is complete since this will let another thread
// reuse its thread.id. We will trim any DEAD threads from the list later
final long cacheSizePerThread = getCacheSizePerThread(threads.size());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perviously we had relied on the fact there were no dead threads in the list

// list once it reaches DEAD, and if for some reason it's hanging indefinitely in the
// shutdown then we should just consider this thread.id to be burned
} else {
threads.remove(streamThread);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we purge the dead threads before we add new ones and if we remove the assumption that there are no dead threads in the thread list we can just not remove the threads in remove thread. This will make it there should be no concern about the cache size changing when a thread is removing itself. And make the risk we took about memory overflows unnecessary.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was mainly trying to keep things simple. There's definitely a tradeoff in when we resize the cache: either we resize it right away and risk an OOM or we resize it whenever we find newly DEAD threads but potentially have to wait to "reclaim" the memory of a thread.
Both scenarios run into trouble when a thread is hanging in shutdown, but if that occurs something has already gone wrong so I don't think we need to guarantee Streams will continue running perfectly. But the downside to resizing the cache only once a thread reaches DEAD is that a user could call removeStreamThread() with a timeout of 0 and then never call add/remove thread again, and they'll never get back the memory of the removed thread since we only trim the threads inside these methods (or the exception handler). ie, it seems ok to lazily remove DEAD threads if we only use the threads list to find a unique threadId, but not to lazily resize the cache. WDYT?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can leave it for now, if we should see problems this could be a fix, we don't run a single thread soak so we won't see this issue ourselves but there are many single thread applications that could start using this and we should see if they have problems

@ableegoldman ableegoldman force-pushed the 12375-dont-reuse-thread-id-in-REPLACE_THREAD-event branch from 4730196 to 6699084 Compare March 2, 2021 03:36
@ableegoldman
Copy link
Copy Markdown
Member Author

This should be ready for a final review @cadonna @wcarlson5

Copy link
Copy Markdown
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, @ableegoldman !

LGTM!

@cadonna
Copy link
Copy Markdown
Member

cadonna commented Mar 2, 2021

Test failure is probably related:

org.apache.kafka.streams.KafkaStreamsTest.shouldAddThreadWhenRunning

java.lang.NumberFormatException: For input string: "newThread"

Copy link
Copy Markdown
Contributor

@wcarlson5 wcarlson5 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! I think there are a few things we can do better to head off potential problems but they are defiantly not necessary here. Thanks for the PR @ableegoldman !

// list once it reaches DEAD, and if for some reason it's hanging indefinitely in the
// shutdown then we should just consider this thread.id to be burned
} else {
threads.remove(streamThread);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can leave it for now, if we should see problems this could be a fix, we don't run a single thread soak so we won't see this issue ourselves but there are many single thread applications that could start using this and we should see if they have problems

closeToError();
}
final StreamThread deadThread = (StreamThread) Thread.currentThread();
threads.remove(deadThread);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the name doesn't matter I wonder why we spent so much effort making sure it had the same name?

@ableegoldman
Copy link
Copy Markdown
Member Author

One unrelated failure TransactionsBounceTest.testWithGroupId() (known to be flaky, see KAFKA-10251)

@ableegoldman ableegoldman merged commit 23b61ba into apache:trunk Mar 3, 2021
ableegoldman added a commit that referenced this pull request Mar 3, 2021
…#10215)

Always grab a new thread.id and verify that a thread has fully shut down to DEAD before removing from the `threads` list and making that id available again

Reviewers: Walker Carlson <wcarlson@confluent.io>, Bruno Cadonna <cadonna@confluent.io>
@ableegoldman
Copy link
Copy Markdown
Member Author

Merged to trunk and cherrypicked to 2.8 cc @vvcephei

ableegoldman added a commit to confluentinc/kafka that referenced this pull request Mar 3, 2021
…apache#10215)

Always grab a new thread.id and verify that a thread has fully shut down to DEAD before removing from the `threads` list and making that id available again

Reviewers: Walker Carlson <wcarlson@confluent.io>, Bruno Cadonna <cadonna@confluent.io>
ijuma added a commit to ijuma/kafka that referenced this pull request Mar 3, 2021
* apache-github/trunk:
  KAFKA-12400: Upgrade jetty to fix CVE-2020-27223
  MINOR: Fix null exception in coordinator log (apache#10250)
  y
  KAFKA-12375: don't reuse thread.id until a thread has fully shut down (apache#10215)
  KAFKA-12177: apply log start offset retention before time and size based retention (apache#10216)
  KAFKA-12369; Implement `ListTransactions` API (apache#10206)
lct45 pushed a commit to confluentinc/kafka that referenced this pull request Mar 11, 2021
…apache#10215)

Always grab a new thread.id and verify that a thread has fully shut down to DEAD before removing from the `threads` list and making that id available again

Reviewers: Walker Carlson <wcarlson@confluent.io>, Bruno Cadonna <cadonna@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants