Conversation
| } catch (final RuntimeException e) { | ||
| throw new StreamsException("Thread Producer encounter unexpected error trying to close", e); | ||
| } | ||
| threadProducer.close(); |
There was a problem hiding this comment.
Exception handling is moved into StreamsProducer (similar below)
| this.producer = Objects.requireNonNull(producer, "producer cannot be null"); | ||
| this.applicationId = applicationId; | ||
| this.eosEnabled = eosEnabled; | ||
| this.eosEnabled = applicationId != null; |
There was a problem hiding this comment.
We need the application.id only if EOS is enabled -- hence we simplify the constructor to 3 parameter and enable EOS is application.id is provided (this avoid the redundant eosEnabled parameter)
|
|
||
| private String formatException(final String message) { | ||
| return message + " [" + logPrefix + ", " + (eosEnabled ? "eos" : "alo") + "]"; | ||
| return message + " [" + logPrefix + "]"; |
There was a problem hiding this comment.
It's redundant to log if EOS is enabled or not; it's a single global config. Not need to include.
There was a problem hiding this comment.
I would rather we keep it here so that we don't need to look around the log to identify the latest restart log to determine whether we are on EOS or not.
There was a problem hiding this comment.
If we argue this way, you could also justify to log the whole config each time :)
abbccdda
left a comment
There was a problem hiding this comment.
Thanks for the PR, the changes LGTM, but I still have doubts about how this could simplifies the work for KIP-447, as activeTaskCreator shall only handle the producer close and metric stuff IIUC?
|
|
||
| private String formatException(final String message) { | ||
| return message + " [" + logPrefix + ", " + (eosEnabled ? "eos" : "alo") + "]"; | ||
| return message + " [" + logPrefix + "]"; |
There was a problem hiding this comment.
I would rather we keep it here so that we don't need to look around the log to identify the latest restart log to determine whether we are on EOS or not.
|
After taking to @guozhangwang offline, I realize that this PR does actually not make sense. The idea of KIP-447 is to have a |
Related to KIP-447 (extracted from #8218 to keep the other PR smaller):
We want to use
StreamsProducerinstead ofKafkaProducerwithinActiveTaskCreator. Furthermore,StreamsProducershould expose metrics and close the internalKafkaProducer.The lion's share of the PR is additional code cleanup, mostly related to generics.
Call for review @guozhangwang @abbccdda @vvcephei