From 174aedc935c58bbc2b5637b669c45b38ea12579e Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 15 Oct 2018 17:10:47 -0700 Subject: [PATCH 1/3] KAFKA-7080 and KAFKA-7277: Cleanup overlapping KIP changes also: add upgrade docs for KIP-319 --- docs/streams/upgrade-guide.html | 12 ++++ .../apache/kafka/streams/kstream/Windows.java | 15 ----- .../apache/kafka/streams/state/Stores.java | 57 +++++-------------- 3 files changed, 25 insertions(+), 59 deletions(-) diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index b26f3c339cfe6..f851a2c46f6e0 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -134,6 +134,18 @@

Streams API see KIP-324

+

+ We deprecated the notion of segments in window stores as those are intended to be an implementation details. + Thus, method Windows#segments() and variable Windows#segments were deprecated. + If you implement custom windows, you should updated your code accordingly. + Similarly, WindowBytesStoreSupplier#segments() was deprecated and replaced with WindowBytesStoreSupplier#segmentInterval(). + If you implement custom window store, you need to updated your code accordingly. + Finally, Stores#persistentWindowStore(...) were deprecated and replaced with a new overload that does not allow to specify the number of segments any longer. + For more details, see KIP-319 + (note: KIP-358 'overlaps' with KIP-319 + as it changes Stores#persistentWindowStore(...), too). +

+

Streams API changes in 2.0.0

In 2.0.0 we have added a few new APIs on the ReadOnlyWindowStore interface (for details please read Streams API changes below). diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java index 4dfba2306cb9e..feaee1e13362f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java @@ -83,21 +83,6 @@ public long maintainMs() { return maintainDurationMs; } - /** - * Return the segment interval in milliseconds. - * - * @return the segment interval - * @deprecated since 2.1. Instead, directly configure the segment interval in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}. - */ - @Deprecated - public long segmentInterval() { - // Pinned arbitrarily to a minimum of 60 seconds. Profiling may indicate a different value is more efficient. - final long minimumSegmentInterval = 60_000L; - // Scaled to the (possibly overridden) retention period - return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval); - } - - /** * Set the number of segments to be used for rolling the window store. * This function is not exposed to users but can be called by developers that extend this class. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 30e51403714c5..f7a182472be10 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.state; -import java.time.Duration; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; @@ -32,6 +31,7 @@ import org.apache.kafka.streams.state.internals.SessionStoreBuilder; import org.apache.kafka.streams.state.internals.WindowStoreBuilder; +import java.time.Duration; import java.util.Objects; /** @@ -155,7 +155,7 @@ public String metricsScope() { * careful to set it the same as the windowed keys you're actually storing. * @param retainDuplicates whether or not to retain duplicates. * @return an instance of {@link WindowBytesStoreSupplier} - * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, long, long, boolean, long)} instead + * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead */ @Deprecated public static WindowBytesStoreSupplier persistentWindowStore(final String name, @@ -178,28 +178,6 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name, ); } - /** - * Create a persistent {@link WindowBytesStoreSupplier}. - * @param name name of the store (cannot be {@code null}) - * @param retentionPeriod length of time to retain data in the store (cannot be negative) - * Note that the retention period must be at least long enough to contain the - * windowed data's entire life cycle, from window-start through window-end, - * and for the entire grace period. - * @param windowSize size of the windows (cannot be negative) - * @param retainDuplicates whether or not to retain duplicates. - * @return an instance of {@link WindowBytesStoreSupplier} - * @deprecated Use {@link #persistentWindowStore(String, Duration, Duration, boolean)} instead - */ - @Deprecated - public static WindowBytesStoreSupplier persistentWindowStore(final String name, - final long retentionPeriod, - final long windowSize, - final boolean retainDuplicates) { - // we're arbitrarily defaulting to segments no smaller than one minute. - final long defaultSegmentInterval = Math.max(retentionPeriod / 2, 60_000L); - return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, defaultSegmentInterval); - } - /** * Create a persistent {@link WindowBytesStoreSupplier}. * @param name name of the store (cannot be {@code null}) @@ -220,28 +198,16 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name, ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod"); ApiUtils.validateMillisecondDuration(windowSize, "windowSize"); - return persistentWindowStore(name, retentionPeriod.toMillis(), windowSize.toMillis(), retainDuplicates); + final long defaultSegmentInterval = Math.max(retentionPeriod.toMillis() / 2, 60_000L); + + return persistentWindowStore(name, retentionPeriod.toMillis(), windowSize.toMillis(), retainDuplicates, defaultSegmentInterval); } - /** - * Create a persistent {@link WindowBytesStoreSupplier}. - * @param name name of the store (cannot be {@code null}) - * @param retentionPeriod length of time to retain data in the store (cannot be negative) - * Note that the retention period must be at least long enough to contain the - * windowed data's entire life cycle, from window-start through window-end, - * and for the entire grace period. - * @param segmentInterval size of segments in ms (cannot be negative) - * @param windowSize size of the windows (cannot be negative) - * @param retainDuplicates whether or not to retain duplicates. - * @return an instance of {@link WindowBytesStoreSupplier} - * @deprecated Use {@link #persistentWindowStore(String, Duration, Duration, boolean)} instead - */ - @Deprecated - public static WindowBytesStoreSupplier persistentWindowStore(final String name, - final long retentionPeriod, - final long windowSize, - final boolean retainDuplicates, - final long segmentInterval) { + private static WindowBytesStoreSupplier persistentWindowStore(final String name, + final long retentionPeriod, + final long windowSize, + final boolean retainDuplicates, + final long segmentInterval) { Objects.requireNonNull(name, "name cannot be null"); if (retentionPeriod < 0L) { throw new IllegalArgumentException("retentionPeriod cannot be negative"); @@ -269,7 +235,9 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name, * windowed data's entire life cycle, from window-start through window-end, * and for the entire grace period. * @return an instance of a {@link SessionBytesStoreSupplier} + * @deprecated since 2.1 Use {@link Stores#persistentSessionStore(String, Duration)} instead */ + @Deprecated public static SessionBytesStoreSupplier persistentSessionStore(final String name, final long retentionPeriod) { Objects.requireNonNull(name, "name cannot be null"); @@ -288,6 +256,7 @@ public static SessionBytesStoreSupplier persistentSessionStore(final String name * and for the entire grace period. * @return an instance of a {@link SessionBytesStoreSupplier} */ + @SuppressWarnings("deprecation") public static SessionBytesStoreSupplier persistentSessionStore(final String name, final Duration retentionPeriod) { ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod"); From 60c49bef357fece3a6303d80d70ab0be308ddc8b Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 15 Oct 2018 18:02:21 -0700 Subject: [PATCH 2/3] Minor update --- docs/streams/upgrade-guide.html | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index f851a2c46f6e0..ac4035a169a47 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -142,8 +142,8 @@

Streams API If you implement custom window store, you need to updated your code accordingly. Finally, Stores#persistentWindowStore(...) were deprecated and replaced with a new overload that does not allow to specify the number of segments any longer. For more details, see KIP-319 - (note: KIP-358 'overlaps' with KIP-319 - as it changes Stores#persistentWindowStore(...), too). + (note: KIP-328 and + KIP-358 'overlap' with KIP-319).

Streams API changes in 2.0.0

From edd45eecbc620ec8b62cf109a25b62a63c29394c Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 15 Oct 2018 18:21:46 -0700 Subject: [PATCH 3/3] fixed typos --- docs/streams/upgrade-guide.html | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index ac4035a169a47..c0f28efe98bdf 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -137,9 +137,9 @@

Streams API

We deprecated the notion of segments in window stores as those are intended to be an implementation details. Thus, method Windows#segments() and variable Windows#segments were deprecated. - If you implement custom windows, you should updated your code accordingly. + If you implement custom windows, you should update your code accordingly. Similarly, WindowBytesStoreSupplier#segments() was deprecated and replaced with WindowBytesStoreSupplier#segmentInterval(). - If you implement custom window store, you need to updated your code accordingly. + If you implement custom window store, you need to update your code accordingly. Finally, Stores#persistentWindowStore(...) were deprecated and replaced with a new overload that does not allow to specify the number of segments any longer. For more details, see KIP-319 (note: KIP-328 and