
KAFKA-10003: Mark KStream.through() as deprecated #8679

Merged
mjsax merged 12 commits into apache:trunk from mjsax:kafka-10003-deprecate-through
May 22, 2020

Conversation

@mjsax
Member

@mjsax mjsax commented May 17, 2020

  • part of KIP-221

@mjsax mjsax added the streams label May 17, 2020
@mjsax
Member Author

mjsax commented May 17, 2020

Call for review @lkokhreidze @vvcephei

Also updates the Scala API...
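For readers unfamiliar with KIP-221, the change can be illustrated with a short migration sketch (illustrative only, not from the PR; topic names are hypothetical, and the kafka-streams dependency is assumed):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

public class ThroughMigrationSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> input = builder.stream("input-topic");

        // Before (deprecated by this PR): the user must create and manage the topic.
        // final KStream<String, String> rekeyed = input.through("user-managed-topic");

        // After: Kafka Streams creates and manages the repartition topic itself.
        final KStream<String, String> rekeyed = input.repartition(
            Repartitioned.<String, String>with(Serdes.String(), Serdes.String())
                .withName("rekeyed")            // used as the repartition-topic name suffix
                .withNumberOfPartitions(4));    // optional; otherwise derived from upstream

        rekeyed.to("output-topic");
        builder.build();
    }
}
```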

</ol>
</td>
</tr>
<tr class="row-odd"><td><p class="first"><strong>Through</strong></p>
Member Author

The diff is weird because the part above repeats below. The actual deletes start here.

* If a {@link StreamPartitioner custom partitioner} has been
* {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link StreamsConfig} or
* {@link KStream#through(String, Produced)}, or if the original {@link KTable}'s input
* {@link KStream#repartition(Repartitioned)}, or if the original {@link KTable}'s input
Member Author

Not sure if this update is necessary. This method is deprecated itself.

Contributor

Might as well make this update, since we may remove the methods at different times.

* from the auto-generated topic using default serializers, deserializers, and producer's {@link DefaultPartitioner}.
* The number of partitions is determined based on the upstream topics partition numbers.
* <p>
* This operation is similar to {@link #through(String)}, however, Kafka Streams manages the used topic automatically.
Member Author

@mjsax mjsax May 17, 2020

Not 100% sure if we should remove this now, or when we remove through()?

Contributor

I'd agree with removing it. I guess if you want to preserve it in some fashion, you could add the opposite statement to the through() documentation.

* {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
* {@link #through(String)}) an internal repartitioning topic will be created in Kafka.
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
* {@link #transform(TransformerSupplier, String...)}) an internal repartitioning topic will be created in Kafka.
Member Author

Just simplifying this one.

import org.apache.kafka.streams.processor.internals.InternalTopicProperties;

class RepartitionedInternal<K, V> extends Repartitioned<K, V> {
public class RepartitionedInternal<K, V> extends Repartitioned<K, V> {
Member Author

Must be public to be visible in Scala

Contributor

It's worth noting that it only needs to be visible for the scala tests that verify the scala Repartitioned builder results in a correctly configured object. For the public API, we only convert a scala Repartitioned to a java Repartitioned.



@Test
public void shouldProcessViaRepartitionTopic() {
Member Author

Replicated the test for through() for repartition().

Contributor

Thanks!

stream = input.repartition();
} else {
input.to(INTERMEDIATE_USER_TOPIC);
stream = builder.stream(INTERMEDIATE_USER_TOPIC);
Member Author

We still need to test this, because topics using this pattern are still considered intermediate topics, and the --intermediate-topics flag in StreamsResetter is still useful and unchanged.

Contributor

I'm wondering if we should continue testing with through, to ensure it continues to work. WDYT?

Member Author

Well, through() is literally implemented as to() + stream()... But I can revert and add a suppress annotation, too.
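The equivalence described here can be sketched as follows (topic names are hypothetical, and the kafka-streams dependency is assumed):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class ThroughEquivalenceSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> input = builder.stream("source-topic");

        // Deprecated one-liner:
        // final KStream<String, String> out = input.through("intermediate-topic");

        // ...is implemented as exactly this pair of calls:
        input.to("intermediate-topic");
        final KStream<String, String> out = builder.stream("intermediate-topic");

        out.to("sink-topic");
        builder.build();
    }
}
```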

Contributor

On re-reading, I realize I misunderstood the situation. I revert my comment 😬 .

if (throughTopic != null) {
output = input.through(throughTopic);
input.to(throughTopic);
output = builder.stream(throughTopic);
Member Author

Using to() and stream() is "simpler" as we clean up topics in-between (and thus avoid internal topics).

We could of course also use repartition().

private static final String TEST_ID = "reset-with-ssl-integration-test";

private static Map<String, Object> sslConfig;
private static final Map<String, Object> SSL_CONFIG;
Member Author

side cleanup

}

@Test
public void shouldNotAllowNullRepartitionedOnRepartition() {
Member Author

replicating test

assertEquals(((AbstractStream) stream1.repartition()).keySerde(), consumedInternal.keySerde());
assertEquals(((AbstractStream) stream1.repartition()).valueSerde(), consumedInternal.valueSerde());
assertEquals(((AbstractStream) stream1.repartition(Repartitioned.with(mySerde, mySerde))).keySerde(), mySerde);
assertEquals(((AbstractStream) stream1.repartition(Repartitioned.with(mySerde, mySerde))).valueSerde(), mySerde);
Member Author

replicating test cases

}

@Test
public void shouldUseRecordMetadataTimestampExtractorWithRepartition() {
Member Author

replicating test

}

@Test
public void shouldSendDataThroughRepartitionTopicUsingRepartitioned() {
Member Author

replicating test

if (withRepartitioning) {
final KStream<String, Integer> repartitionedData = data.through("repartition");
data.to("repartition");
final KStream<String, Integer> repartitionedData = builder.stream("repartition");
Member Author

As above. Avoid internal topics.

*
* //..
* val clicksPerRegion: KTable[String, Long] = //..
* val clicksPerRegion: KStream[String, Long] = //..
Member Author

There is no KTable#through() method.

Contributor

Oops...

}

"Create a Produced with timestampExtractor and resetPolicy" should "create a Consumed with Serdes, timestampExtractor and resetPolicy" in {
"Create a Produced with streamPartitioner" should "create a Produced with Serdes and streamPartitioner" in {
Member Author

Side cleanup (was originally copied from ConsumedTest but not updated correctly)

* @return A new [[Repartitioned]] instance configured with keySerde and valueSerde
* @see KStream#repartition(Repartitioned)
*/
def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): RepartitionedJ[K, V] =
Member Author

I just named all methods `with` in alignment with the other Scala helper classes.

Also noticed that all helper classes only have static methods... Is that by design? Seems we are missing something here? If there is more than one optional parameter, it seems we should have non-static methods to allow method chaining? (Could be fixed in a follow-up PR.)

Contributor

We'd have to change from object to class or case class (which would have been my preference to begin with), since objects can only have static members.

Probably, this ship has sailed for now, and we should just keep doing what the other similar classes are doing. Since we've found so much wackiness in the Scala API since it was introduced, it might be a good idea to consider revamping the whole thing from scratch some day.

Contributor

@vvcephei vvcephei left a comment

Just found another set of zombie comments I meant to send some time in the past. I'll continue my review.

* @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
* @see #repartition()
* @see #repartition(Repartitioned)
* @deprecated use {@link #repartition()} instead
Contributor

It's nice for future reference when we also say when it became deprecated, such as "since 2.6".

Member Author

Not sure why? If I use 2.6, why do I care if it was deprecated in 2.4 or 2.2 or 2.6? It's deprecated in the version I use now. Why would I care about older versions?

Contributor

For one thing, it's nice for us, so we can easily tell when it's been deprecated "long enough" to remove. I can recall trudging through git history in the past to figure this out.

For users, maybe you don't care, but I personally find it nice when my libraries do this for me. It's just good bookkeeping, and it gives me some confidence that the maintainers are doing proper, tidy maintenance.

If it provides a "third party" supporting opinion, the Scala language designers thought this was important enough to build it in as a separate field of the "deprecated" annotation: https://docs.scala-lang.org/tour/annotations.html
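Java has a counterpart since JDK 9: the `@Deprecated` annotation gained `since` and `forRemoval` elements, alongside the long-standing `@deprecated` Javadoc tag. A small self-contained illustration (the method names are hypothetical, not from Kafka):

```java
public class DeprecationSketch {
    /**
     * @deprecated since 2.6; use {@link #repartitionStyle()} instead
     */
    @Deprecated(since = "2.6", forRemoval = true) // JDK 9+; mirrors the Javadoc tag
    public static String throughStyle() {
        // The deprecated method simply delegates to its replacement.
        return repartitionStyle();
    }

    public static String repartitionStyle() {
        return "ok";
    }

    public static void main(final String[] args) {
        System.out.println(throughStyle()); // prints "ok"
    }
}
```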



Contributor

@vvcephei vvcephei left a comment

Thanks, @mjsax ! Completed my full pass.

Comment thread docs/streams/developer-guide/dsl-api.html Outdated
Comment thread docs/streams/upgrade-guide.html Outdated
Comment thread docs/streams/upgrade-guide.html Outdated


}

@SuppressWarnings("deprecation") // specifically testing the deprecated variant
Contributor

This would be a case where I would advocate more strongly to deprecate this method, to avoid accidentally "hiding" the deprecation from callers.

Member Author

Well, but then we need to add more suppressions or deprecations upstream. Does not seem worth it for testing code.

Contributor

This is exactly the point!

Member Author

Updated


mjsax and others added 4 commits May 19, 2020 13:28
Co-authored-by: John Roesler <vvcephei@users.noreply.github.com>
Co-authored-by: John Roesler <vvcephei@users.noreply.github.com>
Co-authored-by: John Roesler <vvcephei@users.noreply.github.com>
…scala/kstream/KStream.scala

Co-authored-by: John Roesler <vvcephei@users.noreply.github.com>
Contributor

@vvcephei vvcephei left a comment

Thanks for the update, @mjsax ! I replied above to the threads, and had just two more new comments.

* @return a [[KStream]] that contains the exact same repartitioned records as this [[KStream]]
* @see `org.apache.kafka.streams.kstream.KStream#repartition`
*/
def repartition(implicit repartitioned: Repartitioned[K, V]): KStream[K, V] =
Contributor

I just noticed that we have no test for this operator (or for through). Should we add one?

Member Author

Not sure what we can/should test?

Contributor

We have previously had embarrassing bugs like, "It's not possible to write code that compiles using the Scala DSL". If we had had even the most trivial test written for those DSL methods, we would never have released those bugs. So, I'd just recommend creating any topology that contains this operator and maybe using the TopologyTestDriver to pipe a single record through it to ensure that it doesn't throw any runtime exceptions when you use it.
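The suggested smoke test could look roughly like this in the Java DSL (a sketch assuming the kafka-streams and kafka-streams-test-utils dependencies; the Scala-DSL version would be analogous):

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

public class RepartitionSmokeTestSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("in").repartition().to("out");

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "repartition-smoke-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

        // Pipe a single record through to catch runtime failures in the operator.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> in =
                driver.createInputTopic("in", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> out =
                driver.createOutputTopic("out", new StringDeserializer(), new StringDeserializer());
            in.pipeInput("key", "value");
            // The same record should appear on the output topic, e.g.:
            // assertEquals(KeyValue.pair("key", "value"), out.readKeyValue());
        }
    }
}
```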

mjsax and others added 2 commits May 19, 2020 17:16
…scala/kstream/KStream.scala

Co-authored-by: John Roesler <vvcephei@users.noreply.github.com>
Contributor

@lkokhreidze lkokhreidze left a comment

Thanks for picking this up @mjsax , lgtm.

Contributor

@vvcephei vvcephei left a comment

Thanks for this thorough (and thoroughly awesome) PR, @mjsax !

I responded to the question about the scala API test, above; otherwise I'm +1.

@mjsax
Member Author

mjsax commented May 21, 2020

Added the test. Will merge after Jenkins passes.

* @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
* @see #repartition()
* @see #repartition(Repartitioned)
* @deprecated since 2.6; use #repartition(Repartitioned) instead
Member

Could we use {@link #repartition(Repartitioned)} ?

Contributor

@vvcephei vvcephei left a comment

Thanks so much, @mjsax !

@mjsax
Member Author

mjsax commented May 21, 2020

Java 8 passed.
Java 11 failed:

org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]

Java 14 failed:

org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]

@mjsax
Member Author

mjsax commented May 22, 2020

Java 8 and Java 11 passed.
Java 14 failed:

org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]

@mjsax mjsax merged commit 27824ba into apache:trunk May 22, 2020
@mjsax mjsax deleted the kafka-10003-deprecate-through branch May 22, 2020 15:41
Kvicii pushed a commit to Kvicii/kafka that referenced this pull request May 24, 2020
* 'trunk' of github.com:apache/kafka:
  KAFKA-9888: Copy connector configs before passing to REST extensions (apache#8511)
  KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations (apache#8654)
  KAFKA-6145: Add unit tests for assignments of only stateless tasks (apache#8713)
  MINOR: Fix join group request timeout lower bound (apache#8702)
  MINOR: Improve security documentation for Kafka Streams apache#8710
  KAFKA-6145: KIP-441: Enforce Standby Task Stickiness (apache#8696)
  KAFKA-10003: Mark KStream.through() as deprecated and update Scala API (apache#8679)
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020

Labels

kip Requires or implements a KIP streams


4 participants