KAFKA-12829: Remove the deprecated method init(ProcessorContext, StateStore) from the StateStore interface#16906
Conversation
| default void init(final StateStoreContext context, final StateStore root) { | ||
| init(StoreToProcessorContextAdapter.adapt(context), root); | ||
| } | ||
| default void init(final StateStoreContext context, final StateStore root) {} |
There was a problem hiding this comment.
This is used to replace the deprecated init, so I guess it should NOT have default implementation after removing the deprecated version?
@guozhangwang WDYT?
| @Override | ||
| public void init(final ProcessorContext context, | ||
| final StateStore root) { | ||
| private void initInternal(final ProcessorContext context, |
There was a problem hiding this comment.
Could you please merge initInternal into init? we don't need to have a private method used to generate a small public method :)
| @Override | ||
| public void init(final ProcessorContext context, | ||
| final StateStore root) { | ||
| private void initInternal(final ProcessorContext context, |
|
@chia7712 Thanks for the code review, I have fixed it, PTAL again |
init(ProcessorContext, StateStore) from the StateStore interfaceinit(ProcessorContext, StateStore) from the StateStore interface
| this.context = context; | ||
| public void init(final StateStoreContext stateStoreContext, final StateStore root) { | ||
| this.stateStoreContext = stateStoreContext; | ||
| this.context = StoreToProcessorContextAdapter.adapt(stateStoreContext); |
There was a problem hiding this comment.
This feels rather hacky -- using adapt was a workaround as long as we have both init() methods. I think, we need to do more cleanup/refactoring and get rid or this.context completely.
In other stores, we use InternalProcessorContext -- I think doing the same here would be the right way forward (and get rid of both stateStoreContext and context in favor of internalProcessorContext)?
Applies elsewhere, too.
(Ideally, we should be able to delete class StoreToProcessorContextAdapter entirely)
| public void init(final ProcessorContext context, final StateStore root) { | ||
| throw new UnsupportedOperationException("cannot initialize a logical segment"); | ||
| } | ||
| public void init(final StateStoreContext context, final StateStore root) {} |
There was a problem hiding this comment.
This method should throw (as the old one)
| @Override | ||
| public void init(final ProcessorContext context, | ||
| final StateStore root) { | ||
| initInternal(asInternalProcessorContext(context)); |
There was a problem hiding this comment.
If we call initInternal only in a single place now, we might consider inlining it?
There was a problem hiding this comment.
agree. @xijiu please apply "inline" to elsewhere, too
| public void init(final ProcessorContext context, | ||
| public void init(final StateStoreContext stateStoreContext, | ||
| final StateStore root) { | ||
| final ProcessorContext context = StoreToProcessorContextAdapter.adapt(stateStoreContext); |
There was a problem hiding this comment.
Do we need context at all? Seem we can use stateStoreContext below got call appConfigs() and register() ?
| this.context = context; | ||
| final String topic = ProcessorStateManager.storeChangelogTopic(prefix, name(), context.taskId().topologyName()); | ||
| this.context = asInternalProcessorContext(context); | ||
| final String topic = ProcessorContextUtils.changelogFor(context, name(), Boolean.TRUE); |
There was a problem hiding this comment.
Can we exclude this change from this PR, as this PR should only remove init() to address KAFKA-12829, and pull this change into a follow up PR for KAFKA-13588.
| } | ||
|
|
||
| @Test | ||
| public void shouldStoreAndReturnStateStores() { |
There was a problem hiding this comment.
Why do we remove this test? Should we rather rewrite it calling the new init() passing in a mocked StateStoreContext object?
mjsax
left a comment
There was a problem hiding this comment.
Thanks for the PR. Made a pass. This is a complex change. Not sure if I caught everything... Might find more in a second pass after the PR was updated.
644f98c to
512acb8
Compare
mjsax
left a comment
There was a problem hiding this comment.
Thanks for updating the PR. A few more nits. Also, there are failing tests which seems to be related to the changes of this PR.
|
|
||
| @Override | ||
| public void init(final StateStoreContext context, final StateStore root) { | ||
| final String changelogTopic = ProcessorContextUtils.changelogFor(context, name(), Boolean.TRUE); |
There was a problem hiding this comment.
Can we inline initInternal here, too?
| public void init(final ProcessorContext context, final StateStore root) { | ||
| this.context = ProcessorContextUtils.asInternalProcessorContext(context); | ||
| changelogTopic = ProcessorContextUtils.changelogFor(context, name(), Boolean.TRUE); | ||
| init(root); |
There was a problem hiding this comment.
Seem this init is the same and internalInit in other classes -- can we inline it?
512acb8 to
4f613c7
Compare
|
hi @mjsax , I have improved the code based on the suggestions, include some junit test. And I also ran the PTAL, Thanks |
| } | ||
|
|
||
|
|
||
| public InternalProcessorContext getInternalProcessorContext() { |
There was a problem hiding this comment.
MockProcessorContext is public API and we cannot just add this. -- I assume this was necessary to fix some tests? For this case, we should do it differently.
|
|
||
| <suppress checks="MethodLength" | ||
| files="KTableImpl.java"/> | ||
| files="(KTableImpl|MockProcessorContext).java"/> |
| this.context = null; | ||
| expiredRecordSensor = null; | ||
| } | ||
| this.context = asInternalProcessorContext(stateStoreContext); |
There was a problem hiding this comment.
It seems this change breaks the test -- let's not apply this change but keep the code as-is.
4f613c7 to
0dcddcf
Compare
|
hello @mjsax , I have fixed the code, PTAL |
|
Thanks a ton for this PR @xijiu! Huge piece of work! Great job! Merged to |
…teStore)` from the `StateStore` interface (apache#16906) Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Matthias J. Sax <matthias@confluent.io>


See the discussion: (#11611 (comment))
I did the following three things:
init (ProcessorContext, StateStore)method. Actually, it is only called in one test class(MockProcessorContextTest), then I deleted the relevant test method.init(ProcessorContext, StateStore)from theStateStoreand Its implementation classes. If the implementation class contains both methodsinit(ProcessorContext, StateStore)andinit(StateStoreContext, StateStore)simultaneously, then deleteinit(ProcessorContext, StateStore)directly. If the implementation class only contains methodsinit(ProcessorContext, StateStore), then I create a new methodinit(StateStoreContext, StateStore)and copy the relevant code to it. There are still many details in the code.TimeOrderedCachingWindowStorewas not repalaced byProcessorContextUtils#changelogFor, I moved it over while I was at it.Committer Checklist (excluded from commit message)