MINOR: unify calls to get committed offsets and metadata #7463
mjsax merged 2 commits into apache:trunk
Conversation
| return stateMgr.changelogPartitions(); | ||
| } | ||
|
|
||
| Map<TopicPartition, Long> committedOffsetForPartitions(final Set<TopicPartition> partitions) { |
This is moved into StandbyTask as we don't share it any longer for both types of tasks.
| } | ||
|
|
||
| private boolean closeUnclean(final T task) { | ||
| private void closeUnclean(final T task) { |
Side cleanup: the return value is never used.
| void transitionToRunning(final T task) { | ||
| log.debug("Transitioning {} {} to running", taskTypeName, task.id()); | ||
| running.put(task.id(), task); | ||
| task.initializeTaskTime(); |
We remove the method entirely and init task time in the constructor of StreamTask (note that we don't need to init task time for StandbyTasks anyway).
This is now done implicitly in resume().
| } | ||
| } | ||
|
|
||
| private void initializeCommittedOffsets(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata) { |
This is a mix of the "old" initializeStateStores (the part that filters for source topic partitions that are used as changelogs) and the "old" AbstractTask#committedOffsetForPartitions.
| stateMgr.putOffsetLimits(committedOffsetsForChangelogs); | ||
| } | ||
|
|
||
| private void initializeTaskTime(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata) { |
This is now private. Same implementation as before; however, offsetsAndMetadata is passed in, so we don't use the consumer to fetch it from the brokers.
|
|
||
| @Override | ||
| public boolean initializeStateStores() { |
Just simplified -- moved the offset initialization into the constructor.
| final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>(); | ||
| result.putAll(adminClientMetrics); | ||
| return result; | ||
| return new LinkedHashMap<>(adminClientMetrics); |
Why do we need a LinkedHashMap here? Particularly because the interface doesn't suggest it needs anything more than a Map implementation? Actually I'm also wondering why we need to copy at all since we get back a read only view of the map from Admin and it appears the only consumer of adminClientMetrics immediately copies the results anyway.
Not sure about the LinkedHashMap -- maybe @guozhangwang knows more?
As for why we copy: it's not a public contract that we get a read-only map, hence we need to guard the map from writes via a copy. Not sure if we could rely on an AdminClient implementation detail... /cc @cmccabe?
Yes, the only use case I found above this code is to immediately copy it into yet another map (which is why the LinkedHashMap also doesn't make sense), so it should not matter, unless we're expecting to do something else with it down the road. Anyway, per our discussion I'm OK with this going in without fixing it, but it is low hanging fruit to reduce unnecessary work.
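For illustration, the trade-off discussed in this thread (defensive copy versus relying on a read-only view) might look like the following minimal sketch. `MetricsCopySketch`, its field, and its method names are hypothetical and are not taken from the Kafka code base; the map uses `String` keys only to keep the example free of dependencies.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder of a metrics map; not the actual Kafka Streams class.
class MetricsCopySketch {
    private final Map<String, Double> clientMetrics = new HashMap<>();

    MetricsCopySketch() {
        clientMetrics.put("request-rate", 1.0);
    }

    // Defensive copy: the caller may mutate the result without touching internal state.
    Map<String, Double> metricsCopy() {
        return new LinkedHashMap<>(clientMetrics);
    }

    // Read-only view: no copy is made, and writes throw UnsupportedOperationException.
    Map<String, Double> metricsView() {
        return Collections.unmodifiableMap(clientMetrics);
    }

    int internalSize() {
        return clientMetrics.size();
    }

    public static void main(final String[] args) {
        final MetricsCopySketch sketch = new MetricsCopySketch();
        sketch.metricsCopy().put("extra", 2.0); // only the copy is modified
        System.out.println("internal entries: " + sketch.internalSize());
    }
}
```

The copy costs one extra allocation per call but does not depend on what the underlying client happens to return; the view is cheaper but only protects callers of this class, not the source map itself.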
Call for review @guozhangwang @cadonna @cpettitt-confluent
| public void initializeTaskTime() { | ||
| //no-op | ||
| } | ||
| public void initializeTopology() {} |
Could we also move this only to the StreamTask? Doesn't have to be in this PR.
Not really, because we call initializeTopology() within AssignedTasks#transitionToRunning(T task), which does not know the concrete type (it is generic: T extends Task) -- hence, we would need an instanceof check within that method, which is undesirable.
Oh, I see. I did not think about that dependency. Nevertheless, I think it would somehow be possible to move initializeTopology() to StreamTask without an instanceof, but that needs a bit more thought and refactoring and is out of scope for this PR.
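The dependency described in this exchange can be sketched as follows. All types here are simplified, hypothetical stand-ins (`Task`, `StreamTaskSketch`, `StandbyTaskSketch`, `AssignedTasksSketch`), not the real Kafka Streams classes; the point is only that a no-op in the shared type lets generic code call the hook without an `instanceof` check.

```java
import java.util.ArrayList;
import java.util.List;

class NoOpHookSketch {

    // Shared contract: every task type exposes the hook.
    interface Task {
        void initializeTopology();
    }

    static class StreamTaskSketch implements Task {
        boolean topologyInitialized = false;

        @Override
        public void initializeTopology() {
            topologyInitialized = true;
        }
    }

    static class StandbyTaskSketch implements Task {
        @Override
        public void initializeTopology() {
            // no-op: in this sketch, standby tasks have nothing to initialize
        }
    }

    // Generic container that only knows T extends Task.
    static class AssignedTasksSketch<T extends Task> {
        final List<T> running = new ArrayList<>();

        void transitionToRunning(final T task) {
            task.initializeTopology(); // works for any T, no instanceof needed
            running.add(task);
        }
    }

    public static void main(final String[] args) {
        final AssignedTasksSketch<StreamTaskSketch> active = new AssignedTasksSketch<>();
        final StreamTaskSketch task = new StreamTaskSketch();
        active.transitionToRunning(task);
        System.out.println("initialized: " + task.topologyInitialized);
    }
}
```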
| initializeCommittedOffsets(offsetsAndMetadata); | ||
| initializeTaskTime(offsetsAndMetadata); |
Apparently it is not completely safe to call instance methods from the constructor, since the object and its member variables are not completely initialised yet. Static methods would be fine, though.
I do not think that initializeCommittedOffsets() is currently problematic, since it only accesses member variables from AbstractTask, which should already be initialised. However, initializeTaskTime() accesses partitionGroup, which is created in the constructor. Since we cannot know what reorderings a compiler does, it would be better to make initializeTaskTime() static and pass in partitionGroup. To make initializeCommittedOffsets() future-proof, it would also be good to turn it into a static method.
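As an illustration of the ordering concern raised here, the sketch below uses hypothetical classes (`UnsafeTask`, `SaferTask`), not the actual StreamTask. It shows an instance method observing a field before the constructor has assigned it, and a static helper that can only see what is passed in explicitly.

```java
import java.util.ArrayList;
import java.util.List;

class ConstructorInitSketch {

    // Hazard: the instance method runs before the field it reads is assigned.
    static class UnsafeTask {
        private final List<String> partitionGroup;
        final boolean sawNull;

        UnsafeTask() {
            sawNull = groupIsNull();            // partitionGroup still holds its default value here
            partitionGroup = new ArrayList<>();
        }

        private boolean groupIsNull() {
            return partitionGroup == null;
        }
    }

    // Alternative: a static helper receives its input as an argument, so the
    // constructor has to assign the final field before it can pass it along.
    static class SaferTask {
        private final List<String> partitionGroup;
        final int initialSize;

        SaferTask() {
            partitionGroup = new ArrayList<>();
            initialSize = sizeOf(partitionGroup);
        }

        private static int sizeOf(final List<String> group) {
            return group.size();
        }
    }

    public static void main(final String[] args) {
        System.out.println("unsafe saw null: " + new UnsafeTask().sawNull);
        System.out.println("safer initial size: " + new SaferTask().initialSize);
    }
}
```

With the static variant, reading the blank final field by name before assigning it is rejected at compile time, so the mistake surfaces early instead of at runtime.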
guozhangwang left a comment
Made a pass on the PR; LGTM modulo @cadonna's comments.
| */ | ||
| AbstractTask(final TaskId id, | ||
| final Collection<TopicPartition> partitions, | ||
| final Set<TopicPartition> partitions, |
Why reduce the type generality here?
In the StreamTask constructor we call consumer.committed(...), which takes a Set<TopicPartition> -- hence, it's easier to pass in a Set from the beginning. Second, thinking about it, a Collection is actually too generic anyway -- we would not want to assign a List of partitions that may contain duplicates IMHO.
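A small sketch of the duplicate argument made above: accepting a `Set` makes uniqueness part of the signature, whereas a `List` can silently carry the same partition twice. Partitions are modelled as plain strings, and `PartitionSetSketch` and `assignedCount` are hypothetical names chosen for this example.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class PartitionSetSketch {

    // Taking a Set means callers cannot hand over duplicate partitions.
    static int assignedCount(final Set<String> partitions) {
        return partitions.size();
    }

    public static void main(final String[] args) {
        final List<String> withDuplicates = Arrays.asList("topic-0", "topic-1", "topic-0");
        // A caller holding a List has to deduplicate explicitly before the call.
        final Set<String> unique = new LinkedHashSet<>(withDuplicates);
        System.out.println(withDuplicates.size() + " entries, " + assignedCount(unique) + " unique");
    }
}
```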
| try { | ||
| if (!entry.getValue().initializeStateStores()) { | ||
| final T task = entry.getValue(); | ||
| task.initializeMetadata(); |
This is new -- we need to initialize the task after we created it.
| } | ||
|
|
||
| @Override | ||
| public void initializeMetadata() {} |
| throw new ProcessorStateException(format("%sError while deleting the checkpoint file", logPrefix), e); | ||
| } | ||
| } | ||
| initializeMetadata(); |
This is also new -- we implicitly re-initialize the task on resume(); this was done "outside" before.
Do we really need to re-initialize committed offsets and task time during resume()?
Yes, because in StreamTask#suspend() we call closeTopology(), which calls partitionGroup.clear(), which resets all times to UNKNOWN.
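The reasoning in this answer can be sketched with a simplified, hypothetical task (`TaskSketch`): suspending clears the tracked time, so resuming has to restore it from the committed value. A single `long` stands in for the per-partition times, and the `UNKNOWN` sentinel value is an assumption of this sketch.

```java
class ResumeReinitSketch {
    static final long UNKNOWN = -1L;

    static class TaskSketch {
        private final long committedTimestamp; // stand-in for metadata fetched from the broker
        private long partitionTime = UNKNOWN;

        TaskSketch(final long committedTimestamp) {
            this.committedTimestamp = committedTimestamp;
        }

        void initializeMetadata() {
            partitionTime = committedTimestamp;
        }

        // Mirrors the effect described above: clearing resets the time to UNKNOWN.
        void suspend() {
            partitionTime = UNKNOWN;
        }

        // Without the re-initialization, the task would resume with UNKNOWN time.
        void resume() {
            initializeMetadata();
        }

        long partitionTime() {
            return partitionTime;
        }
    }

    public static void main(final String[] args) {
        final TaskSketch task = new TaskSketch(42L);
        task.initializeMetadata();
        task.suspend();
        task.resume();
        System.out.println("time after resume: " + task.partitionTime());
    }
}
```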
| static ProcessorTopology withRepartitionTopics(final List<ProcessorNode> processorNodes, | ||
| final Map<String, SourceNode> sourcesByTopic, | ||
| final Set<String> repartitionTopics) { | ||
| private static ProcessorTopology withRepartitionTopics(final List<ProcessorNode> processorNodes, |
| final ProcessorTopology topology = withSources( | ||
| asList(source1, source2, processorStreamTime, processorSystemTime), | ||
| mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2)) | ||
| mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2)) |
| assertThat(expectedError, is(singletonList("ERROR"))); | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") |
| throw new KafkaException("KABOOM!"); | ||
| } | ||
| task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> { | ||
| throw new KafkaException("KABOOM!"); |
| } | ||
|
|
||
| @Test | ||
| public void shouldInitTaskTimeOnResumeWithEosDisabled() { |
| public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException() { | ||
| final Consumer<byte[], byte[]> consumer = mockConsumerWithCommittedException(new AuthorizationException("message")); | ||
| final StreamTask task = createOptimizedStatefulTask(createConfig(false), consumer); | ||
| task.initializeMetadata(); |
We moved initializing the offsets from initializeStateStores() into initializeMetadata().
|
|
||
| final ProcessorTopology topology = ProcessorTopologyFactories.with( | ||
| asList(source1), | ||
| singletonList(source1), |
Updated this PR. Call for review. As an afterthought, I realized that we should not init the offsets and task time within the constructor. Hence, I refactored the code and added a new unified init method to get the offsets. This also addresses @cadonna's concerns about the initialization order within the constructor.
@cadonna @cpettitt-confluent could you guys take another look?
cpettitt-confluent left a comment
One comment above on one of the side cleanups where I think we can go a bit further. Otherwise LGTM.
Java 11 / 2.12 passed. Others failed -- test results are no longer available. Retest this please.
Java 11 / 2.13:
Reviewers: Chris Pettitt <cpettitt@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Guozhang Wang <guozhang@confluent.io>
|
Merged to |
This is a follow-up to #7304 and #6694 to unify the calls to consumer.committed() further and only call it once during startup instead of twice.
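A minimal sketch of the "fetch once, use twice" idea from this description, under stated assumptions: `Committed` is a hypothetical stand-in for the committed offset and its associated timestamp, the fetcher is a plain `Function` rather than a real consumer, and partitions are plain strings. How the real code stores and decodes the timestamp is not shown here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

class SingleFetchSketch {

    // Hypothetical stand-in for a committed offset plus a timestamp.
    static final class Committed {
        final long offset;
        final long timestamp;

        Committed(final long offset, final long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    static final class TaskSketch {
        static final long UNKNOWN = -1L;

        final Map<String, Long> offsetLimits = new HashMap<>();
        long taskTime = UNKNOWN;

        private final Function<Set<String>, Map<String, Committed>> committedFetcher;
        private final Set<String> partitions;
        private final Set<String> changelogPartitions;

        TaskSketch(final Function<Set<String>, Map<String, Committed>> committedFetcher,
                   final Set<String> partitions,
                   final Set<String> changelogPartitions) {
            this.committedFetcher = committedFetcher;
            this.partitions = partitions;
            this.changelogPartitions = changelogPartitions;
        }

        // One fetch; both initializers reuse the same result.
        void initializeMetadata() {
            final Map<String, Committed> committed = committedFetcher.apply(partitions);
            initializeCommittedOffsets(committed);
            initializeTaskTime(committed);
        }

        // Keep only offsets of source partitions that double as changelogs.
        private void initializeCommittedOffsets(final Map<String, Committed> committed) {
            for (final Map.Entry<String, Committed> entry : committed.entrySet()) {
                if (changelogPartitions.contains(entry.getKey())) {
                    offsetLimits.put(entry.getKey(), entry.getValue().offset);
                }
            }
        }

        // Simplification: track a single maximum instead of per-partition times.
        private void initializeTaskTime(final Map<String, Committed> committed) {
            for (final Committed value : committed.values()) {
                taskTime = Math.max(taskTime, value.timestamp);
            }
        }
    }

    public static void main(final String[] args) {
        final Map<String, Committed> data = Collections.singletonMap("log-0", new Committed(7L, 200L));
        final TaskSketch task = new TaskSketch(parts -> data, data.keySet(), data.keySet());
        task.initializeMetadata();
        System.out.println("offset limits: " + task.offsetLimits + ", task time: " + task.taskTime);
    }
}
```

Passing the fetched map into both private initializers keeps the remote call in one place, which is what allows it to happen once per initialization rather than once per consumer of the data.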