KAFKA-10546: Deprecate old PAPI#10869
Conversation
vvcephei
left a comment
There was a problem hiding this comment.
Do you mind taking a look at this?
The original plan was to migrate all the internal processors and then deprecate the old PAPI, but it seems like we may not make the 3.0 release feature freeze with that approach. Instead, I'm proposing this PR to go ahead and deprecate the old API first and just suppress all deprecation warnings on our internals.
There was one side-effect. If you look at KIP-478, you'll see we proposed not to change the KStream#process method, in the hopes that we would already have a good replacement by the time of the deprecation. It also doesn't seem like that's going to happen, so I'm going to update the proposal to actually deprecate those methods and add new-PAPI versions.
Despite there being a lot of changed lines in this PR, I think it should be a pretty quick read. Most of the changes are:
- suppressing warnings
- switching to fully-qualified class names for the old API, since it's not possible to suppress deprecation warnings on imports.
| * Adds a state store to the underlying {@link Topology}. | ||
| * <p> | ||
| * It is required to connect state stores to {@link org.apache.kafka.streams.processor.Processor Processors}, {@link Transformer Transformers}, | ||
| * It is required to connect state stores to {@link org.apache.kafka.streams.processor.api.Processor Processors}, |
There was a problem hiding this comment.
Switching to the new interface where possible.
| * | ||
| * @param name the unique name of the processor node | ||
| * @param supplier the supplier used to obtain this node's {@link Processor} instance | ||
| * @param supplier the supplier used to obtain this node's {@link org.apache.kafka.streams.processor.Processor} instance |
There was a problem hiding this comment.
Specifying old PAPI by fully qualified name where necessary.
| * <p> | ||
| * Note that it is possible to emit multiple records for each input record by using | ||
| * {@link ProcessorContext#forward(Object, Object) context#forward()} in | ||
| * {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) context#forward()} in |
There was a problem hiding this comment.
Transformation will be migrated separately via https://issues.apache.org/jira/browse/KAFKA-10603, so we just need to make sure the javadoc references the correct context. Note that for this reason, we did not deprecate the old ProcessorContext.
| * @deprecated Since 3.0. Use {@link KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, java.lang.String...)} instead. | ||
| */ | ||
| @Deprecated | ||
| void process(final org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier, | ||
| final String... stateStoreNames); |
There was a problem hiding this comment.
Note, git got confused. This is the pre-existing API, which is deprecated now.
| <KOut, VOut> void process(final ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier, | ||
| final String... stateStoreNames); |
There was a problem hiding this comment.
Here is the new API. Note: the forwarding type should technically be Void, since you can't forward from these processors, but I left it unbounded to potentially be compatible with whatever we decide to do in https://issues.apache.org/jira/browse/KAFKA-10603
There was a problem hiding this comment.
Just wondering, if we ended up not doing KAFKA-10603, then how to bound KOut/Vout to Void only?
There was a problem hiding this comment.
Thanks, @guozhangwang !
Yeah, it's kind of an either/or thing. If we make it KOut, VOut now, we can't tighten it to Void, Void later.
Although, coming back to it, I'm thinking that we actually should be able to make it Void, Void now and then relax it to KOut, VOut later. Maybe you can sanity-check me:
Now:
Def:
void process(final ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier)
Usage:
mystream.process(new ProcessorSupplier<String, String, Void, Void>(){
...
Processor<String, String, Void, Void> get() {
// calling forward is optional, but if they do it, they could only call:
context.forward(new Record(null, null, ...));
}
...
})
Later:
Def:
KStream<KOut, VOut> process(final ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier)
Same Usage is still ok:
* Return is unused, which is fine
* Return KOut and VOut both bind to Void, which is also fine.
mystream.process(new ProcessorSupplier<String, String, Void, Void>(){
...
Processor<String, String, Void, Void> get() {
// calling forward is optional, but if they do it, they could only call:
context.forward(new Record(null, null, ...));
}
...
})
So it seems we actually could/should set them to Void right now. Does that seem right to you?
There was a problem hiding this comment.
If any template type K could bind to Void then that seems fine -- I was not clear about this myself :P
There was a problem hiding this comment.
@vvcephei +1 I like the last proposal: using Void and keep the option to migrate to K/V later.
| final NullPointerException exception = assertThrows( | ||
| NullPointerException.class, | ||
| () -> testStream.process(null)); | ||
| () -> testStream.process((ProcessorSupplier<? super String, ? super String, Object, Object>) null)); |
There was a problem hiding this comment.
We have to cast null now to resolve the overload we want.
| @deprecated(since = "3.0", message = "Use process(ProcessorSupplier, String*) instead.") | ||
| def process(processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V], | ||
| stateStoreNames: String*): Unit = { | ||
| val processorSupplierJ: org.apache.kafka.streams.processor.ProcessorSupplier[K, V] = () => processorSupplier() | ||
| inner.process(processorSupplierJ, stateStoreNames: _*) | ||
| } |
There was a problem hiding this comment.
This is the mirror of the Java API: we have to do the same thing in Scala for the same reason.
| * Note that this overload takes a ProcessorSupplier instead of a Function to avoid post-erasure ambiguity with | ||
| * the older (deprecated) overload. |
There was a problem hiding this comment.
Note this unfortunate fact. This means that Scala users won't be able to use SAM conversion anymore for this particular method. The only alternative we have is to give the new methods a different name.
| Reducer, | ||
| Transformer, | ||
| ValueJoiner, | ||
| ValueMapper, |
There was a problem hiding this comment.
For whatever reason, Spotless Scala insisted on changing the order.
| private class SimpleProcessorSupplier private[TopologyTest] (val valueList: util.List[String]) | ||
| extends ProcessorSupplier[String, String, Nothing, Nothing] { | ||
|
|
||
| override def get(): Processor[String, String, Nothing, Nothing] = | ||
| (record: api.Record[String, String]) => valueList.add(record.value()) |
There was a problem hiding this comment.
I went ahead and changed the test to demonstrate that SAM conversion doesn't work with these methods, so we have to declare a supplier instead of a () => processor.
|
We'll have to take a look at the scala 2.12 builds, since I don't think the suppression I used is available in that version. |
…tream.process methods, which means I also have to add a bunch more suppressions where that method is used in tests.
guozhangwang
left a comment
There was a problem hiding this comment.
I made a quick pass on the public classes, just one question regarding forward-looking for https://issues.apache.org/jira/browse/KAFKA-10603.
| <KOut, VOut> void process(final ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier, | ||
| final String... stateStoreNames); |
There was a problem hiding this comment.
Just wondering, if we ended up not doing KAFKA-10603, then how to bound KOut/Vout to Void only?
|
Thanks for the review, @jeqo ! I think we're pretty much good to go on this, once @guozhangwang has a chance to review the recent changes to the |
|
@vvcephei I took a quick look at the more recent commits (for |
|
Thanks, @guozhangwang ! Yes, that's the area of the code I was looking for a double-check on. I switched the KOut/VOut parameters to Void/Void as you suggested. |
* Deprecate the old Processor API * Suppress warnings on all internal usages of the old API (which will be migrated in other child tickets of KAFKA-8410) * Add new KStream#process methods, since KAFKA-10603 has not seen any action.
(which will be migrated in other child tickets of KAFKA-8410)
Committer Checklist (excluded from commit message)