KAFKA-6849: add transformValues methods to KTable.#4959
guozhangwang merged 18 commits into apache:trunk
Conversation
Add overloads with Materialized
# Conflicts:
#	streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
#	streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
#	streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
#	streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
#	streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@dguy, @guozhangwang, @mjsax can you guys take a look when you get a chance? The stricter the review the better. :D
guozhangwang
left a comment
Thanks for the PR @big-andy-coates. Made a pass over it and left some comments.
| <tr class="row-even"><td><p class="first"><strong>Transform (values only)</strong></p> | ||
| <ul class="last simple"> | ||
| <li>KStream -> KStream</li> | ||
| <li>KTable -> KTable</li> |
Could we piggyback the docs change for KIP-149 here as well? E.g. in line 2912 below, add ValueTransformerWithKey as well and emphasize that the passed-in key is read-only, not mutable.
@guozhangwang Not sure I follow exactly what doc changes you're after...
I can switch this paragraph to use ValueTransformerWithKey rather than ValueTransformer and call out that the key is readOnly. Is that what you're after?
| <KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper); | ||
| /** | ||
| * Transform the value of each input record into a new value (with possible new type) of the output record. |
nit for first sentence: Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value. Also we can add one more sentence comparing with mapValues: this API allows users to have more flexible custom transformation logic that can rely on additional state stores and record context, plus they can schedule periodic functions, etc.
Ditto below.
The docs already call out the potential use of punctuate. I'll add something for the extra flexibility / state stores and record context.
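The flexibility point being discussed could be illustrated roughly like this (plain-Java stand-ins, not the Kafka Streams API): a mapValues-style mapper is a pure function of the value, while a transformValues-style transformer is an instance that can carry state between records (and, in the real API, access state stores and the ProcessorContext):

```java
import java.util.function.Function;

// A mapValues-style mapper: stateless, a pure function of the value.
class Mappers {
    static final Function<String, String> UPPER = v -> v.toUpperCase();
}

// A transformValues-style transformer: an instance that may keep state
// between records, which a plain mapper cannot do.
class RunningCountTransformer {
    private long seen = 0; // per-instance state

    String transform(final String value) {
        seen++;
        return value + ":" + seen;
    }
}
```

The real transformer additionally gets an init/close lifecycle and a context for scheduling punctuations, which is the flexibility the javadoc comparison is meant to call out.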
| /** | ||
| * Transform the value of each input record into a new value (with possible new type) of the output record. | ||
| * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applies to each input |
nit: add one more sentence: The resulting {@code KTable} is materialized into another state store (in addition to the provided state store names) as specified by the user via {@link Materialized}, and is queryable through its given name.
| @Override | ||
| public <K, V> void forward(final K key, final V value) { | ||
| throw new StreamsException("ProcessorContext#forward() not supported."); |
nit: not supported is a tad confusing, maybe sth. like .. is not allowed, please specify the output record in transform() return values.
@guozhangwang That makes sense in the current use of this class, i.e. it's only used from KStream and KTable transformValues methods - but that might not always be the case.
Happy to change if you still think it better.
@guozhangwang, what's your preference here?
Since it is really a nit, I'm fine if you prefer it as is.
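The idea under discussion could be sketched like this (hedged, self-contained: `Context` and the message text are stand-ins, not the real Kafka Streams ProcessorContext API) -- a delegating context whose forward() always throws and points the caller at the alternative:

```java
// Stand-in for the relevant slice of ProcessorContext.
interface Context {
    <K, V> void forward(K key, V value);

    String topic();
}

final class ForwardingDisabledContext implements Context {
    private final Context delegate;

    ForwardingDisabledContext(final Context delegate) {
        this.delegate = delegate;
    }

    @Override
    public <K, V> void forward(final K key, final V value) {
        // Wording along the lines suggested in the review: say what to do instead.
        throw new UnsupportedOperationException(
            "ProcessorContext#forward() is not allowed here; "
                + "return the new value from transform() instead.");
    }

    @Override
    public String topic() {
        return delegate.topic(); // all other calls delegate unchanged
    }
}
```

Keeping the message generic ("not allowed here") rather than naming transformValues is one way to address the concern that the class may be reused elsewhere later.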
| import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; | ||
| import org.apache.kafka.streams.processor.ProcessorContext; | ||
| /** |
| private <VR> KTable<K, VR> doTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, | ||
| final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized, | ||
| final String... stateStoreNames) { | ||
| Objects.requireNonNull(stateStoreNames, "stateStoreNames"); |
Hmm... stateStoreNames can indeed be null right?
| name); | ||
| } | ||
| return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, queryableStoreName, isQueryable); |
If the resulting KTable is not materialized, we should 1) set the queryableStoreName to the current queryableStoreName; and 2) set isQueryable to materialized != null && materialized.isQueryable(). Please see doFilter and doMapValues for references.
Good spot! I did reference those methods, but they've recently been changed. I'll update accordingly.
| public void init(final ProcessorContext context) { | ||
| parentGetter.init(context); | ||
| valueTransformer.init(new ForwardingDisabledProcessorContext(context)); |
This makes me think: we should at least document that users should not rely on the number of generated valueTransformers in their logic, because here we create two instances, one for the getter and one for the processor, hence we'd actually get 2N instances of transformers given N tasks, etc.
@guozhangwang, is it 2N always, or either 1N or 2N depending on whether the downstream processor node is materialized or not?
It is not 2N always; I think the point here is that the valueTransformer.transform() call could be triggered more than once for each record, because 1) we may use a getter that initializes a new value transformer, and 2) we may send old values, which calls transform again on the old values. This is unfortunately not very intuitive to users, and we'd better warn them about it for now. In the future we can consider how to address this (I've filed a JIRA for it)
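A rough self-contained sketch of why the instance count surprises people (all names here are stand-ins, not the Kafka Streams API): the same supplier can back both the processor and the value getter, so wiring one node may pull two instances from it:

```java
import java.util.function.Supplier;

class CountingTransformer {
    static int instances = 0;

    CountingTransformer() {
        instances++; // track how many transformers the topology creates
    }
}

class NodeWiring {
    // Stand-in for a ValueTransformerWithKeySupplier: it must return a NEW
    // instance per get(), because each caller assumes unshared state.
    static final Supplier<CountingTransformer> SUPPLIER = CountingTransformer::new;

    static void wireNode() {
        CountingTransformer forProcessor = SUPPLIER.get(); // used by the processor
        CountingTransformer forGetter = SUPPLIER.get();    // used by the value getter
    }
}
```

This is why the documentation warning matters: user logic that counts instances, or keeps cross-instance state in the supplier, will misbehave.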
| private final String queryableName; | ||
| private boolean sendOldValues = false; | ||
| KTableTransformValues(final KTableImpl<K, ?, V> parent, |
For KTableTransformValuesProcessor that extends AbstractProcessor, we should override its close() call as:
@Override
public void close() {
valueTransformer.close();
}
Similarly we should close the valueTransformer in the Getter as well. Unfortunately we do not have a close() in KTableValueGetter yet. If there is no better way to call valueTransformer.close() when the topology is closing, we should add this function and make sure it is called when the dependent processor node is being closed.
Can't see a better way, so adding close() to KTableValueGetter and wiring that up.
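The wiring agreed above could look roughly like this (minimal stand-in types, not the real Kafka Streams interfaces): both the processor and the getter own their own transformer, and each closes its own when the node closes:

```java
// Stand-in for the part of ValueTransformerWithKey relevant here.
interface SimpleTransformer {
    void close();
}

class TransformProcessor {
    private final SimpleTransformer valueTransformer;

    TransformProcessor(final SimpleTransformer valueTransformer) {
        this.valueTransformer = valueTransformer;
    }

    public void close() {
        valueTransformer.close(); // release the processor's transformer
    }
}

// Stand-in for a KTableValueGetter gaining the new close() hook.
class TransformValueGetter {
    private final SimpleTransformer valueTransformer;

    TransformValueGetter(final SimpleTransformer valueTransformer) {
        this.valueTransformer = valueTransformer;
    }

    public void close() {
        valueTransformer.close(); // called when the dependent node is closed
    }
}
```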
| inner.toStream[KR](mapper.asKeyValueMapper) | ||
| /** | ||
| * Transform the value of each input record into a new value (with possible new type) of the output record. |
Ditto for the scala doc as above.
Also could you update the
mjsax
left a comment
Did not look into the tests in detail yet.
| } | ||
| @Override | ||
| public <VR> KTable<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, String... stateStoreNames) { |
nit: add final
nit: one argument per line (line too long)
| } | ||
| private <VR> KTable<K, VR> doTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier, | ||
| final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized, | ||
| final String... stateStoreNames) { | ||
| Objects.requireNonNull(stateStoreNames, "stateStoreNames"); |
Does this do the trick? The type is String[] and can only be null if somebody calls via
String[] stores = null;
table.transformValues(...., stores);
table.transformValues(...., null); // this would not trigger the check
Should we rather step through the array and check each entry for null? Or is this intended and we rely on connectProcessorAndStateStores to do this check?
Ah, now I remember: to guard against (..., null) we should follow the way it's done in InternalTopologyBuilder#addStateStore, i.e.:
if (stateStoreNames != null) {
for (final String stateStoreName : stateStoreNames) {
Objects.requireNonNull(stateStoreName, "store name must not be null");
}
}
Yes, this check is to ensure someone hasn't done something daft like:
String[] stores = null;
table.transformValues(...., stores);
If someone passes in a null store name it is caught later.
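The combined guard discussed in this thread might look like the following sketch (method name and messages are illustrative, not the PR's actual code):

```java
import java.util.Objects;

class StoreNameChecks {
    static void validate(final String... stateStoreNames) {
        // Catches the daft case: String[] stores = null; transformValues(..., stores);
        Objects.requireNonNull(stateStoreNames, "stateStoreNames must not be a null array");
        // Catches a null element: transformValues(..., "store-a", null);
        for (final String name : stateStoreNames) {
            Objects.requireNonNull(name, "stateStoreNames must not contain null");
        }
    }
}
```

Note that `transformValues(..., null)` with a bare `null` binds to the varargs array itself, which is exactly what the first check guards against.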
| } | ||
| @Override | ||
| public void process(K key, Change<V> change) { | ||
| final V1 newValue = computeValue(key, change.newValue, valueTransformer); | ||
| final V1 oldValue = sendOldValues ? computeValue(key, change.oldValue, valueTransformer) : null; |
Not sure about this... If the operator is stateful, the computation might depend on the state. Thus, this might not compute the correct oldValue...
On the other hand, I am wondering if we need the old value in the first place -- also, the old value would be stored in the result KTable state (if there is any) -- thus, maybe using the result KTable state might be the better option (maybe we even need to force a materialization if we need to send oldValue).
WDYT?
Hmm, good question... I think we still need the old values, but I also agree that we can just get them from the getter rather than re-computing them. Note that if the result KTable is not materialized, then the getter will rely on the parent's materialized store as well, which is still an issue.
This is a known issue that I have been thinking to fix, but the scope could be quite large. Maybe we can create a JIRA for fixing this separately.
I will create a JIRA for now.
There is plenty of scope for computing the wrong old value here. I've changed the code to make the computation of old value very explicit.
Materialized tables are easy. Where unmaterialized tables are mixed with stateful transformations it is easy for users to break downstream aggregates. (There's a test that shows this). I've documented this in the Java doc. I think it still makes sense to have the ability to pass in additional stateStoreNames for the non-materialized transformValues method call. Not allowing them would remove a big 'gotcha' but may be overly restrictive too.
No doubt the wording of the Java docs around this issue will need finessing, and you may prefer me to remove the 'prove this breaks tests', but just let me know.
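The pitfall can be shown with a tiny stand-alone example (stand-in types, not the Kafka Streams API): a stateful transformer re-invoked on the same old value does not reproduce the value it emitted earlier, which is exactly how downstream aggregates get broken:

```java
class CallCountingTransformer {
    private int calls = 0;

    // Stateful: the output depends on how many times transform() has run,
    // not just on the input value.
    Integer transform(final String value) {
        calls++;
        return value == null ? null : value.length() + calls;
    }
}
```

When "aa" arrives, the emitted new value is 2 + 1 = 3; if the same transformer is later asked to re-compute the old value for "aa", it returns 2 + 2 = 4, so the (oldValue, newValue) pair sent downstream no longer cancels out.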
| * @see `org.apache.kafka.streams.kstream.KStream#transformValues` | ||
| */ | ||
| def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR], | ||
| def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR], |
Intellij madness that I missed. Fixing...
Ah, no, not madness - I changed it to match the scala doc parameter name.
Maybe we should update the doc parameter name instead?
Sure, I just prefer short param names.
Created https://issues.apache.org/jira/browse/KAFKA-6903 as per @mjsax's comment

@big-andy-coates could you rebase? There is a recent commit changing

Hey @guozhangwang @mjsax, I've pushed some commits. The size of the PR has grown somewhat due to adding the
I think all issues have been addressed. Likely contentious points are:
guozhangwang
left a comment
Left some more comments.
| * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) | ||
| * @see `org.apache.kafka.streams.kstream.KStream#transformValues` | ||
| */ | ||
| def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR], |
There are some proposals on refactoring the Scala API to better leverage its type inference: #5019
I think to be consistent with that PR the API should be:

transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR])
                   (materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]],
                    stateStoreNames: String*): KTable[K, VR]

I will let @joan38 chime in here as well.
It's been a while since I've looked at Scala; can I suggest such changes are left to the linked PR? i.e. we try and merge this one first, as there seem to be open questions on the other PR.
It should be ok here since valueTransformerWithKeySupplier doesn't have parameters that depend on one another.
| public void init(final ProcessorContext context) { | ||
| super.init(context); | ||
| oldValueGetter.init(context); |
This seems like overkill to me: for old values, we only need change.oldValue, and actually all of its implementations either do not need the value, or only look at change.oldValue.
We can probably simplify it in line 132, just as:
oldValue = queryableName != null ? /* get old value from materialized store */ : computeValue(key, change.oldValue, valueTransformer);
Actually I think using the materialized store to get the old value can be a separate optimization PR by itself: we should consider doing this for filter, mapValues, transformValues, etc. -- all operators.
To keep the scope / LOC of this PR small, we can consider only using computeValue(key, change.oldValue, valueTransformer).
This does mean backing out changes, which will make this PR less correct. I'm happy to do it, but my opinion would be to leave it in.
With regards to the overkill... yep, you're absolutely right... doh - not sure how I missed that- sorting.
@big-andy-coates Updated https://issues.apache.org/jira/browse/KAFKA-6903 for the optimization on sending old values.

@guozhangwang @mjsax changes pushed.

# Conflicts:
#	streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinValueGetter.java

@mjsax - are you happy? If so, can you merge please?
mjsax
left a comment
You pushed during my review... submitting what I have and continuing (cannot add new comments to this review)
| /** | ||
| * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value, | ||
| * (with possibly new type). | ||
| * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applies to each input |
is applie[d] (please double check if this is a c&p error and must be fixed somewhere else, too).
done everywhere.
| * This is similar to {@link #mapValues(ValueMapperWithKey)}, but more flexible, allowing access to additional state-stores, | ||
| * and access to the {@link ProcessorContext}. | ||
| * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional | ||
| * periodic actions get be performed. |
c&P again - done everywhere.
| * new ValueTransformerWithKeySupplier() { | ||
| * ValueTransformerWithKey get() { | ||
| * return new ValueTransformerWithKey() { | ||
| * private StateStore state; |
Should we update this to be a KeyValueStore to align with the registered store from the example above?
Sure. (Shame a cast is involved here :( )
| * private StateStore state; | ||
| * | ||
| * void init(ProcessorContext context) { | ||
| * this.state = context.getStateStore("myValueTransformState"); |
might need to insert a cast (cf. comment above)
| /** | ||
| * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value, | ||
| * (with possibly new type). | ||
| * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applies to each input |
| } | ||
| @Override | ||
| public Integer transform(String readOnlyKey, String value) { |
| } | ||
| } | ||
| private static class StatefulTransformer implements ValueTransformerWithKey<String, String, Integer> { |
maybe rename s.th. it describes what it does?
What it does isn't as important as the fact it's stateful vs stateless, IMHO.
| } | ||
| } | ||
| private static class StatelessTransformer implements ValueTransformerWithKey<String, String, Integer> { |
maybe rename s.th. it describes what it does?
| driver = new TopologyTestDriver(builder.build(), props()); | ||
| driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 0L)); | ||
| driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "aa", 0L)); |
maybe use "a", "b", "c" as values, as the transformer counts the number of calls to process (for better distinction with next test)
yer, or maybe just 'ignored'
| @Test | ||
| public void shouldCalculateCorrectOldValuesIfNotStatefulEvenIfNotMaterialized() { | ||
| builder.addStateStore(storeBuilder(STORE_NAME)) | ||
| .table(INPUT_TOPIC, CONSUMED) | ||
| @Test | ||
| public void willUnfortunatelyCalculateIncorrectOldValuesIfStatefulAndNotMaterialized() { | ||
| builder.addStateStore(storeBuilder(STORE_NAME)) | ||
| .table(INPUT_TOPIC, CONSUMED) |
| driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "aaa", 0L)); | ||
| assertThat(output(), not(hasItems("A:1", "A:0", "A:2", "A:0", "A:3"))); | ||
| // Output more likely to be "A:1", "A:-2", "A:0", "A:-5", "A:-1" |
well, indeed. I did call it out. It does highlight the issue and, if it starts to fail, there is a doc update needed. Happy to remove it though. (I agree it's a bit weird.)
The idea with the doc change is good -- maybe put a comment about this -- otherwise, when it fails, we might not remember the purpose of this test.
| final List<String> expectedStoredNames) { | ||
| final List<String> missing = new ArrayList<>(); | ||
| for (String storedName : expectedStoredNames) { |
| return new ValueTransformerWithKey<K, V, V>() { | ||
| @Override | ||
| public void init(ProcessorContext context) { |
| } | ||
| @Override | ||
| public V transform(K readOnlyKey, V value) { |
| import org.apache.kafka.streams.processor.ProcessorContext; | ||
| public class NoOpInternalValueTransformer<K, V> implements ValueTransformerWithKeySupplier<K, V, V> { | ||
| public ProcessorContext context; |
I don't think that the context should be a member of the supplier, but rather of the transformer -- otherwise, multiple transformers share the same context?
I agree, but this is a simple test class and switching it to be a member of the transformer muddies the test cases. How strongly do you feel about this?
I see... Maybe rename it to SingletonNoOpInternalValueTransformer and only create one ValueTransformerWithKey instance? This makes it clearer why it's designed this way. From the context of the class itself it's unclear -- it only makes sense if you check its usage in the test. Would be good to be self-descriptive IMHO.
Done - also, the 'Internal' part was dropped in line with the change that dropped the InternalValueTransformerWithKey interface.
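The singleton rename agreed above could be sketched like this (hypothetical names, stand-in `Ctx` type): the supplier deliberately hands out one shared instance, so the shared captured context is explicit from the class name rather than a surprise:

```java
interface Ctx { } // stand-in for ProcessorContext

class NoOpValueTransformer {
    Ctx context; // captured per transformer instance

    void init(final Ctx context) {
        this.context = context;
    }
}

// "Singleton" in the name signals that sharing one transformer (and hence
// one captured context) across callers is intentional, for test inspection.
class SingletonNoOpValueTransformerSupplier {
    private final NoOpValueTransformer instance = new NoOpValueTransformer();

    NoOpValueTransformer get() {
        return instance;
    }
}
```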
@mjsax nits fixed ;)

retest this please
See the KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-292%3A+Add+transformValues%28%29+method+to+KTable This PR adds the transformValues method to the KTable interface. The semantics of the call are the same as the methods of the same name on the KStream interface. Fixes KAFKA-6849 Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>