MINOR: consolidate processor context for active/standby#8669
MINOR: consolidate processor context for active/standby#8669guozhangwang merged 15 commits intoapache:trunkfrom
Conversation
|
Call for review @cadonna @guozhangwang @vvcephei |
There was a problem hiding this comment.
I felt these were just cluttering up this class so I moved them to a new file
62d7079 to
751e8e8
Compare
751e8e8 to
2a506c4
Compare
| import org.apache.kafka.streams.state.WindowStoreIterator; | ||
| import org.apache.kafka.streams.state.internals.WrappedStateStore; | ||
|
|
||
| abstract class AbstractReadOnlyDecorator<T extends StateStore, K, V> extends WrappedStateStore<T, K, V> { |
There was a problem hiding this comment.
All of this (and the other new class below) was just copied over from ProcessorContextImpl. I changed the named to AbstractXXXDecorator since it's an abstract class but it's all otherwise unchanged
| } | ||
|
|
||
| /** | ||
| * @throws UnsupportedOperationException if the current task type is standby |
There was a problem hiding this comment.
No logical changes here, just added a check for any methods that were previously overridden in the standby context to throw UnsupportedOperation
| logChange(key, value, context.timestamp()); | ||
| } | ||
|
|
||
| void logChange(final K key, |
There was a problem hiding this comment.
This class was the root cause of the processor context issue blocking the active <--> standby task conversion. I was taking pieces out of it bit by bit and by the end it seemed pointless to have at all
| void log(final Bytes key, | ||
| final byte[] value) { | ||
| changeLogger.logChange(key, value); | ||
| context.logChange(name(), key, value, context.timestamp()); |
There was a problem hiding this comment.
We now just delegate to the context to figure out how/what to log
vvcephei
left a comment
There was a problem hiding this comment.
Thanks for this, @ableegoldman !
I had a few comments; once they're addressed, I'll be +1 to merge this.
Thanks,
-John
| @Override | ||
| public TaskType taskType() { | ||
| return stateManager.taskType(); | ||
| } |
There was a problem hiding this comment.
Looks like this doesn't need to be defaulted here. If the logic doesn't apply to all the implementing classes, it's better not to define it in the abstract class.
There was a problem hiding this comment.
Ah, actually it does. I should remove the override from the ProcessorContextImpl instead
| BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer(); | ||
| ByteArraySerializer BYTE_ARRAY_VALUE_SERIALIZER = new ByteArraySerializer(); | ||
|
|
There was a problem hiding this comment.
It doesn't seem like these need to be defined here, since they're only used outside of this interface.
They actually only have two, independent, usages, and it doesn't seem that important to de-duplicate the instances. Can we just copy them to separate constants in the classes that need them?
There was a problem hiding this comment.
Yeah, this was part of a larger refactoring that I walked back. That sounds like a reasonable request 👍
| EasyMock.reset(context); | ||
| EasyMock.expect(context.timestamp()).andStubReturn(0L); | ||
| context.logChange(store.name(), key1, value, 0L); | ||
| context.logChange(store.name(), key2, value, 0L); |
There was a problem hiding this comment.
We don't need expectLastCall() on these (and everywhere else)?
There was a problem hiding this comment.
The expectLastCall is redundant if you're not chaining it with something else (like expectLastCall().times(2))
I verified this just to be sure by removing the invocation of context.logChange in this class and it did indeed fail
| private final List<CapturedForward> capturedForwards = new LinkedList<>(); | ||
| private boolean committed = false; | ||
|
|
||
|
|
There was a problem hiding this comment.
Not sure about this change ;)
|
test this please |
|
Test this please |
1 similar comment
|
Test this please |
|
All builds failed due to some mysterious |
|
test this please |
|
All builds failed on |
|
test this please |
guozhangwang
left a comment
There was a problem hiding this comment.
LGTM overall. I'd like to also ping @cadonna since he's reviewing another large PR that merges MockInternal- and InternalMock- ProcessorContext which would have some conflicts with this one.
| final Bytes key, | ||
| final byte[] value, | ||
| final long timestamp) { | ||
| recordCollector().send( |
There was a problem hiding this comment.
When we merge the InternalMock with MockInternal would this be okay? cc @cadonna
There was a problem hiding this comment.
I assumed this would be fine since it's pretty much what happened before (ie users of the context would get the record collector and then call send) but I'd like to get this confirmed
There was a problem hiding this comment.
Both, MockInternalProcessorContext and InternalMockProcessorContext currently implement RecordCollector.Supplier (i.e., recordCollector()). Of course, once rebased the consolidated mock needs to implement taskType() and logChange().
See also my comment above regarding KEY_SERIALIZER and VALUE_SERIALIZER.
@guozhangwang did you have anything specific in mind that I did not cover here?
There was a problem hiding this comment.
No I do not, just wanting to make sure we do not have any major conflicts when rebasing the other.
| logChange(key, value, context.timestamp()); | ||
| } | ||
|
|
||
| void logChange(final K key, |
cadonna
left a comment
There was a problem hiding this comment.
@ableegoldman Thank you for this! I am looking forward to use this in the fix for KAFKA-9603.
Here my feedback.
| final Bytes key, | ||
| final byte[] value, | ||
| final long timestamp) { | ||
| throwUnsupportedOperationExceptionIfStandby("logChange"); |
There was a problem hiding this comment.
req: Please unit test a processor context for a standby with the unsupported methods. That is, all code paths that involve a call to throwUnsupportedOperationExceptionIfStandby().
| import static org.apache.kafka.streams.processor.internals.ProcessorContextImpl.KEY_SERIALIZER; | ||
| import static org.apache.kafka.streams.processor.internals.ProcessorContextImpl.VALUE_SERIALIZER; |
There was a problem hiding this comment.
prop: I think we should move those to InternalProcessorContext. IMO, it would be cleaner for a mock not to have a direct dependency to the class it mocks. The new consolidated mock for the internal processor context will not extend AbstractProcessorContext but only MockProcessorContext. Thus, the common ancestor will be InternalProcessorContext.
Sorry for bothering you because of those constants after that @vvcephei has already bothered you. :-)
| final Bytes key, | ||
| final byte[] value, | ||
| final long timestamp) { | ||
| recordCollector().send( |
There was a problem hiding this comment.
Both, MockInternalProcessorContext and InternalMockProcessorContext currently implement RecordCollector.Supplier (i.e., recordCollector()). Of course, once rebased the consolidated mock needs to implement taskType() and logChange().
See also my comment above regarding KEY_SERIALIZER and VALUE_SERIALIZER.
@guozhangwang did you have anything specific in mind that I did not cover here?
|
test this please |
|
LGTM. Will merge after green builds. |
|
test this please |
1 similar comment
|
test this please |
|
Failed due to flaky |
This is a prerequisite for KAFKA-9501 and will also be useful for KAFKA-9603
There should be no logical changes here: the main difference is the removal of
StandbyContextImplin preparation for contexts to transition between active and standby.Also includes some minor cleanup, eg pulling the ReadOnly/ReadWrite decorators out into a separate file.