
KAFKA-7875: Add KStream.flatTransformValues#6424

Merged
bbejeck merged 6 commits into apache:trunk from cadonna:kafka7875-flatTransformValues on Apr 16, 2019

Conversation

@cadonna
Member

@cadonna cadonna commented Mar 11, 2019

  • Adds flatTransformValues methods in KStream
  • Adds processor supplier and processor for flatTransformValues
  • Improves API documentation of transformValues

This contribution is my original work and I license the work to the project under the project's open source license.


@cadonna
Member Author

cadonna commented Mar 11, 2019

I was not able to use the following signature for flatTransformValues that would be consistent with transformValues:

<VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<? extends VR>> valueTransformerSupplier,
                                        final String... stateStoreNames);

When I tried to use this signature, I ran into compilation issues caused by generics. I was able to fix the compilation issues, but then I ran again into the issue with value transformer suppliers that would not compile as lambda functions but would compile as anonymous inner classes. See the discussion that we had about this issue during the implementation of flatTransform. It seems to me that Kafka's transformer suppliers are the root of the issue. So I tried a signature that uses the Supplier provided by Java 8:

public <VR> KStream<K, VR> flatTransformValues(final Supplier<ValueTransformerWithKey<? super K, ? super V, ? extends Iterable<? extends VR>>> valueTransformerSupplier, 
                                               final String... stateStoreNames)

With this signature I did not run into the lambda function compilation issue. More tests would be needed here to check whether everything works as expected.

Replacing Kafka's transformer suppliers with Java's Supplier would solve the issue and would additionally reduce the amount of code to maintain. The drawbacks are as follows:

  • The signature with Java's Supplier is not backward-compatible for stream applications that use the anonymous inner class specification.
  • The signatures with Java's Supplier and Kafka's suppliers cannot exist side by side, because for suppliers specified as lambda functions the two signatures cannot be distinguished.

Are there other drawbacks that I am not aware of that prevent using Java's Supplier in the KStream API?

Finally, because of these issues I was also not able to specify transformValues in terms of flatTransformValues as I did for transform.
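The two signature shapes under discussion can be sketched with hypothetical stand-in interfaces. Nothing below is the real Kafka Streams API: the single-argument ValueTransformer, the TransformerSupplier, and both method names are invented for illustration, and because the stand-ins are simplified, javac's inference behavior here may not match the real API exactly. The sketch shows the explicit cast that pins down a lambda for the Kafka-style supplier, and the cast-free lambda that Java's Supplier allows:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class SupplierInferenceDemo {

    // Hypothetical stand-in for Kafka's value transformer, reduced to one method.
    interface ValueTransformer<V, VR> {
        VR transform(V value);
    }

    // Kafka-style supplier: type arguments are bound directly on the supplier.
    interface TransformerSupplier<V, VR> {
        ValueTransformer<V, VR> get();
    }

    // Analogue of the ValueTransformerWithKeySupplier-based signature.
    static <V, VR> List<VR> kafkaStyle(
            final TransformerSupplier<? super V, ? extends Iterable<? extends VR>> supplier,
            final V value) {
        final List<VR> out = new java.util.ArrayList<>();
        for (final VR v : supplier.get().transform(value)) {
            out.add(v);
        }
        return out;
    }

    // Analogue of the java.util.function.Supplier-based signature.
    static <V, VR> List<VR> javaStyle(
            final Supplier<ValueTransformer<? super V, ? extends Iterable<? extends VR>>> supplier,
            final V value) {
        final List<VR> out = new java.util.ArrayList<>();
        for (final VR v : supplier.get().transform(value)) {
            out.add(v);
        }
        return out;
    }

    // A lambda for the Kafka-style supplier compiles once its type is pinned by a cast.
    static List<Integer> demoKafkaStyle() {
        return kafkaStyle(
            (TransformerSupplier<Object, List<Integer>>) () -> v -> Arrays.asList(1, 2, 3),
            "input");
    }

    // With Java's Supplier, the supplier itself is a plain lambda with no cast.
    static List<Integer> demoJavaStyle() {
        return javaStyle(
            () -> new ValueTransformer<Object, List<Integer>>() {
                @Override
                public List<Integer> transform(final Object value) {
                    return Arrays.asList(1, 2, 3);
                }
            },
            "input");
    }

    public static void main(final String[] args) {
        System.out.println(demoKafkaStyle());
        System.out.println(demoJavaStyle());
    }
}
```

Both demo methods flatten the transformer's Iterable into a List, so each prints [1, 2, 3]; the difference is only in where the type information has to be written down at the call site.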

@cadonna
Member Author

cadonna commented Mar 11, 2019

@vvcephei
Contributor

vvcephei commented Mar 14, 2019

Hey, @cadonna ,

Sorry for the slow reply. I've pulled down your branch and played around with it a bit, and I've reached the same conclusion as you: the generic type inference behaves differently between our ValueTransformerWithKeySupplier and Java's Supplier. I don't think this is because javac treats Supplier specially; I think it's just because the type arguments are bound directly on ValueTransformerWithKeySupplier as <K, V, VR> in our version, whereas they're nested inside <T> in Supplier.

I'm not 100% on my type algebra, but I think the two should be equivalent, so I think we're just coping with the limitations of Java's generic type inference (again). In that case, without deep knowledge of the Java compiler's implementation, we're reduced to figuring out our options experimentally.

Here's what I found (acknowledging that it's pretty much the same as you said above):

All the examples are using this stream:

final KStream<Object, Object> asdf = builder.stream("asdf");

For flatTransformValues (with key), we have two main options:

Our supplier

    <VR> KStream<K, VR> flatTransformValuesA(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends Iterable<? extends VR>> valueTransformerSupplier,
                                            final String... stateStoreNames);

Which we can use as:

        final KStream<Object, Number> streamA1 =
            asdf.flatTransformValuesA(new ValueTransformerWithKeySupplier<Object, Object, List<Integer>>() {
                @Override
                public ValueTransformerWithKey<Object, Object, List<Integer>> get() {
                    return new ValueTransformerWithKey<Object, Object, List<Integer>>() {
                        @Override public void init(final ProcessorContext context) { }

                        @Override
                        public List<Integer> transform(final Object readOnlyKey, final Object value) {
                            return asList(1, 2, 3);
                        }

                        @Override public void close() { }
                    };
                }
            });
        final KStream<Object, Number> streamA2 =
            asdf.flatTransformValuesA((ValueTransformerWithKeySupplier<Object, Object, List<Integer>>) () -> new ValueTransformerWithKey<Object, Object, List<Integer>>() {
                @Override public void init(final ProcessorContext context) { }

                @Override
                public List<Integer> transform(final Object readOnlyKey, final Object value) {
                    return asList(1, 2, 3);
                }

                @Override public void close() { }
            });

Java Supplier

    <VR> KStream<K, VR> flatTransformValuesB(final Supplier<ValueTransformerWithKey<? super K, ? super V, ? extends Iterable<? extends VR>>> valueTransformerSupplier,
                                             final String... stateStoreNames);

Which we can use as:

        final KStream<Object, Number> streamB1 =
            asdf.flatTransformValuesB(new Supplier<ValueTransformerWithKey<? super Object, ? super Object, ? extends Iterable<? extends Number>>>() {
                @Override
                public ValueTransformerWithKey<? super Object, ? super Object, ? extends Iterable<? extends Number>> get() {
                    return new ValueTransformerWithKey<Object, Object, List<Integer>>() {
                        @Override public void init(final ProcessorContext context) {}

                        @Override
                        public List<Integer> transform(final Object readOnlyKey, final Object value) {
                            return asList(1, 2, 3);
                        }

                        @Override public void close() {}
                    };
                }
            });
        final KStream<Object, Number> streamB2 =
            asdf.flatTransformValuesB(() -> new ValueTransformerWithKey<Object, Object, List<Integer>>() {
                @Override public void init(final ProcessorContext context) {}

                @Override
                public List<Integer> transform(final Object readOnlyKey, final Object value) {
                    return asList(1, 2, 3);
                }

                @Override public void close() {}
            });

Conclusion

They both have downsides.

  • When using a lambda with our supplier, you have to declare the lambda's type with a cast: (ValueTransformerWithKeySupplier<Object, Object, List<Integer>>)
  • When using an anonymous class (or a declared class) with Java's Supplier, you have to declare the generic parameters very strangely, as new Supplier<ValueTransformerWithKey<? super Object, ? super Object, ? extends Iterable<? extends Number>>>

I see the essential dilemma as which implementation style we want to facilitate: lambda vs. subclass.

Worth noting that in all cases, we're talking about how to declare the supplier, not the transformer, which must always be a declared class.

Also worth noting, I'm using List<Integer> to satisfy ? extends Iterable<? extends Number> intentionally to demonstrate that we can use types under the declared bound. The situation doesn't improve at all if we declare all the generics/return types above as Iterable<Number> instead of List<Integer>.

@cadonna
Member Author

cadonna commented Apr 5, 2019

Thank you @vvcephei for your experiments.

I am in favour of using the following signature:

<VR> KStream<K, VR> flatTransformValues(
    final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier, 
    final String... stateStoreNames); 

This is also the signature used in this PR. It is the more restrictive one, and it is not consistent with the signatures of transformValues and flatMapValues. However, it accepts lambda functions and classes without strange declarations on the user side, which in my opinion is most important. Consolidating the signatures -- especially the generics bounds -- seems to me a bigger issue that should be addressed on its own, if needed at all.
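For illustration, the trade-off of the chosen shape can be sketched with hypothetical stand-in interfaces (again, not the real Kafka Streams API; the transformer is simplified to a single value argument, and all names here are invented). With the result type fixed to Iterable<VR>, the supplier can be a plain lambda, but the transformer must then declare exactly Iterable<Integer> rather than a subtype such as List<Integer>:

```java
import java.util.Arrays;
import java.util.List;

public class ChosenSignatureDemo {

    // Hypothetical stand-ins, reduced to one method each for a self-contained sketch.
    interface ValueTransformer<V, VR> {
        VR transform(V value);
    }

    interface TransformerSupplier<V, VR> {
        ValueTransformer<V, VR> get();
    }

    // Analogue of the signature chosen in the PR: the result type is fixed to
    // Iterable<VR> rather than the wider ? extends Iterable<? extends VR>.
    static <V, VR> List<VR> flatTransformValues(
            final TransformerSupplier<? super V, Iterable<VR>> supplier,
            final V value) {
        final List<VR> out = new java.util.ArrayList<>();
        for (final VR v : supplier.get().transform(value)) {
            out.add(v);
        }
        return out;
    }

    // The supplier is a plain lambda: no cast, no strange generic declarations.
    // The price is that the transformer declares exactly Iterable<Integer>.
    static List<Integer> demo() {
        return flatTransformValues(
            () -> new ValueTransformer<Object, Iterable<Integer>>() {
                @Override
                public Iterable<Integer> transform(final Object value) {
                    return Arrays.asList(1, 2, 3);
                }
            },
            "input");
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

The invariant Iterable<VR> gives javac a concrete anchor for inference at the lambda call site, which is what makes the cast unnecessary; the cost is exactly the restrictiveness discussed above.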

@guozhangwang
Contributor

I think I buy the arguments for allowing lambdas naturally at the cost of some inconsistency in the signatures (the only inconsistency comes from Iterable<VR> vs. Iterable<? extends VR>, which I think is not significant).

Member

@bbejeck bbejeck left a comment


Thanks for the PR @cadonna, overall looks good and I only have minor comments.

Also, can you rebase this PR? Thanks!

final String... stateStoreNames);

/**
* Transform the value of each input record into zero or more new values (with possible new
Member

nit: with possible new type -> with possibly a new type

Member Author

Done

* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
* the processing progress can be observed and additional periodic actions can be performed.
* <p>
* In order to assign a state, the state must be created and registered beforehand:
Member

nit: state -> state store

Member Author

Done

*
* KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() { ... }, "myValueTransformState");
* }</pre>
* Within the {@link ValueTransformer}, the state is obtained via the {@link ProcessorContext}.
Member

nit state -> state store

Member Author

Done

* Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate()} the processing progress can
* be observed and additional periodic actions can be performed.
* <p>
* In order to assign a state, the state must be created and registered beforehand:
Member

nit: same as above

Member Author

Done

*
* KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
* }</pre>
* Within the {@link ValueTransformerWithKey}, the state is obtained via the {@link ProcessorContext}.
Member

nit: same as above

Member Author

Done


@Test
public void shouldTransform() throws Exception {
public void shouldTransformValuesWithValueTransformerWithKey() throws Exception {
Member

nit: Exception can be removed

Member Author

Done

}

@Test
public void shouldTransformValuesWithValueTransformerWithoutKey() throws Exception {
Member

ditto here

Member Author

Done

}

@Test
public void shouldFlatTransformValuesWithKey() throws Exception {
Member

ditto

Member Author

Done

}

@Test
public void shouldFlatTransformValuesWithValueTransformerWithoutKey() throws Exception {
Member

ditto

Member Author

Done

inputValue = 10;
valueTransformer = mock(ValueTransformerWithKey.class);
context = strictMock(ProcessorContext.class);
processor = new KStreamFlatTransformValuesProcessor<Integer, Integer, String>(valueTransformer);
Member

nit: can use new KStreamFlatTransformValuesProcessor<>

Member Author

Done

@bbejeck
Member

bbejeck commented Apr 9, 2019

FWIW I also agree with using the signature supporting lambdas directly, as IMHO the inconsistencies we have observed don't outweigh keeping things more straightforward for users.

cadonna added 2 commits April 9, 2019 18:27
- corrected javadocs in KStream
- minor refactorings in tests
Contributor

@vvcephei vvcephei left a comment


LGTM overall. I left a couple of small comments.
Thanks, @cadonna !

* A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
* record value and computes zero or more new values.
* Thus, an input record {@code <K,V>} can be transformed into output records {@code <K:V'>, <K:V''>, ...}.
* This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper) mapValues()}).
Contributor

Suggested change
* This is a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper) mapValues()}).
* This is a stateful record-by-record operation (see {@link #mapValues(ValueMapper) mapValues()}).

Member Author

Throughout the javadocs in KStreamImpl, cf. is used. Do you want me to change all of them in a new PR? Are there other opinions on this?

Contributor

I don't know if we have a documentation style guide, but it's generally more accessible if we don't use Latin in our docs. No need to go making sweeping changes, but there's also no need to perpetuate an anti-pattern.

Member

I don't know if we have a documentation style guide

I don't think we have one.

but it's generally more accessible if we don't use Latin in our docs.

This is surprising to me. I thought cf. is a standard abbreviation that everybody knows...

I would still use cf. in this PR for consistency reasons. If you prefer see, just create a ticket or open a new PR. We can also update this on a per-class basis, but I would prefer to keep it consistent within a class.

transformNode.setValueChangingOperation(true);
builder.addGraphNode(this.streamsGraphNode, transformNode);

// cannot inherit value serde
Contributor

should there be a way to set it, then?

Member Author

Setting the serde to null seems to be a common pattern in KStreamImpl if the operation on the stream cannot guarantee that the current serde still fits for the newly generated stream. Have a look at the other non-terminal operations.

Contributor

Sorry, my comment was too terse. What I meant to ask was, if this is a value-changing operation, should there be an overload that allows users to add a value serde, so that downstream operations can have the serde available?

Off the top of my head, I don't think there are any other value-changing operations that don't have an option to set a serde. (but I could be wrong...)

Contributor

Aaand, nevermind. I was thinking of KTable. Carry on.

Member

@bbejeck bbejeck left a comment


LGTM

@bbejeck
Member

bbejeck commented Apr 12, 2019

Java 8 passed, Java 11 failed with known flaky test kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

retest this please

@mjsax
Member

mjsax commented Apr 14, 2019

Same as above. Retest this please.

Member

@mjsax mjsax left a comment


Only nits. Overall LGTM.


* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
* NewValueType transform(V value) {
Member

NewValueType -> List<NewValueType> ? (Similar in return below.)

Member Author

Good catch!

* Within the {@link ValueTransformer}, the state store is obtained via the {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* The {@link ValueTransformer} must return a list of values in {@link ValueTransformer#transform(Object)
Member

"must return a list of values" -- could this be misleading, making people think they need to return a List? (Similar in the next sentence below.)

Member Author

I rephrased it.

* Within the {@link ValueTransformerWithKey}, the state store is obtained via the {@link ProcessorContext}.
* To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
* a schedule must be registered.
* The {@link ValueTransformer} must return a list of values in {@link ValueTransformer#transform(Object)
Member

As above.

Member Author

Rephrased it as well.

* a schedule must be registered.
* The {@link ValueTransformer} must return a list of values in {@link ValueTransformer#transform(Object)
* transform()} or {@code null}.
* If the return value is an empty list or {@code null} no records are emitted.
Member

As above.

Member Author

Done

* context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
* }
*
* NewValueType transform(K readOnlyKey, V value) {
Member

As above.

Member Author

Done

*
* NewValueType transform(K readOnlyKey, V value) {
* // can access this.state and use read-only key
* return new NewValueType(readOnlyKey); // or null
Member

As above.

Member Author

Done

builder.addGraphNode(this.streamsGraphNode, transformNode);

// cannot inherit value serde
return new KStreamImpl<>(name, keySerde, null, sourceNodes, this.repartitionRequired, transformNode, builder);
Member

nit: don't use this; it is not required.

Member Author

Done

}

@Test
public void shouldInitialiseFlatTransformValuesProcessor() {
Member

nit: Initiali[z]e ?

Member Author

Done

cadonna added 2 commits April 14, 2019 13:13
- corrected mistake in example in javadocs
- improved javadocs for flatTransformValues and transformValues
- corrected typo in test method name
@cadonna
Member Author

cadonna commented Apr 15, 2019


@bbejeck
Member

bbejeck commented Apr 16, 2019

Java 8 failed with a known flaky test kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup Java 11 passed

retest this please

@bbejeck bbejeck merged commit 05668e9 into apache:trunk Apr 16, 2019
@bbejeck
Member

bbejeck commented Apr 16, 2019

Merged #6424 into trunk

pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
Adds flatTransformValues methods in KStream
Adds processor supplier and processor for flatTransformValues
Improves API documentation of transformValues

Reviewers: Matthias J. Sax <mjsax@apache.org>,  John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
@cadonna cadonna deleted the kafka7875-flatTransformValues branch October 21, 2019 10:53
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020