Conversation
03ee17e to
d22143b
Compare
b73a533 to
7917638
Compare
41900e1 to
ddef3d1
Compare
7917638 to
a366fc3
Compare
|
Trying to understand the dependency here: should I only review this after #9836 is merged? |
552131c to
2e46bce
Compare
a366fc3 to
a48dd7f
Compare
| @SuppressWarnings("deprecation") | ||
| public class StreamsConfig extends AbstractConfig { | ||
|
|
||
| public static final long MAX_TASK_IDLE_MS_DISABLED = -1; |
There was a problem hiding this comment.
nit: move this down below to 147?
There was a problem hiding this comment.
Also nit, the line below should be
private static final Logger
| * (i.e., it increases or stays the same over time). | ||
| */ | ||
| public class PartitionGroup { | ||
| private static final Logger LOG = LoggerFactory.getLogger(PartitionGroup.class); |
There was a problem hiding this comment.
Is it more convienent to pass in the log object from AbstractTask to the PartitionGroup constructor? It is created with the logContext including the task-type / task-id.
There was a problem hiding this comment.
I can pass in the log context. I wouldn't pass the actual logger, though, because it would mess up common log4j usage patterns.
| bufferedPartitions, | ||
| emptyPartitions); | ||
| } | ||
| return true; |
There was a problem hiding this comment.
Should we log INFO if we are indeed enforcing processing? I.e. there are some empty partitions.
There was a problem hiding this comment.
Actually on a second thought.. if users configures -1 it means they probably do not care about enforced processing, while on the other side the INFO entry may flood the logs here. So NVM.
| } | ||
|
|
||
| @Override | ||
| public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) { |
There was a problem hiding this comment.
The only reason that we need to add this function at Task seems to be tasks.activeTasksForInputPartition(partition) at TaskManager. and there's a TODO to convert its return to StreamTask anyways. So let's just move this function to StreamTask only and in TaskManager force convert the task to StreamTask. And then we can remove it from StandbyTask.
There was a problem hiding this comment.
Wouldn't it be good to avoid taking on unrelated refactoring TODOs in this PR? It seems better to me to leave it to whoever decides to pick up that TODO and instead just keep this PR focused on the feature.
| final int dummyThreadIdx = 1; | ||
| this.maxPollTimeMs = new InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", "dummyClientId", dummyThreadIdx)) | ||
| .getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG); | ||
| this.maxPollTimeMs = maxPollTimeMs; |
There was a problem hiding this comment.
What's the rationale of this refactoring?
There was a problem hiding this comment.
Ah, this was leftover from when I was using the poll interval as a heuristic. I'll revert it.
| return lastRecordedValue; | ||
| } | ||
|
|
||
| @Before |
a48dd7f to
47d7856
Compare
|
LGTM! Once we have perf numbers quantifying its impact I think we can merge. |
1cc0bb7 to
ffbaeab
Compare
47d7856 to
0634433
Compare
| // TRACE for messages that don't wait for fetches, since these may be logged at extremely high frequency | ||
| // DEBUG when we waited for a fetch and decided to wait some more, as configured | ||
| // DEBUG when we are ready for processing and didn't have to enforce processing | ||
| // TRACE for messages that don't wait for fetches |
There was a problem hiding this comment.
I'm +1 on demoting these log entries! :)
|
I've just kicked another test. Please feel free to merge afterwards. |
bb23614 to
fc3ec40
Compare
fa1af20 to
12fb3c4
Compare
|
There was a merge conflict with trunk. Rebased and pushed. |
12fb3c4 to
bc1afd0
Compare
Use the new ConsumerRecords.metadata() API to implement improved task idling as described in KIP-695
|
Hmm, the Java 8 build appears to have hung after an hour and 58 minutes. It's been running for 3 hours and 30 minutes now. This is now the 16th build, and there have been multiple Java 8 successes to date, so I think it's environmental. I'll go ahead with the merge. |
This reverts commit 4d28391.
Use the new ConsumerRecords.metadata() API to implement
improved task idling as described in KIP-695
Committer Checklist (excluded from commit message)