diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index b26f3c339cfe6..c0f28efe98bdf 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -134,6 +134,18 @@

Streams API see KIP-324

+

+ We deprecated the notion of segments in window stores as those are intended to be an implementation details. + Thus, method Windows#segments() and variable Windows#segments were deprecated. + If you implement custom windows, you should update your code accordingly. + Similarly, WindowBytesStoreSupplier#segments() was deprecated and replaced with WindowBytesStoreSupplier#segmentInterval(). + If you implement custom window store, you need to update your code accordingly. + Finally, Stores#persistentWindowStore(...) were deprecated and replaced with a new overload that does not allow to specify the number of segments any longer. + For more details, see KIP-319 + (note: KIP-328 and + KIP-358 'overlap' with KIP-319). +

+

Streams API changes in 2.0.0

In 2.0.0 we have added a few new APIs on the ReadOnlyWindowStore interface (for details please read Streams API changes below). diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java index 4dfba2306cb9e..feaee1e13362f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java @@ -83,21 +83,6 @@ public long maintainMs() { return maintainDurationMs; } - /** - * Return the segment interval in milliseconds. - * - * @return the segment interval - * @deprecated since 2.1. Instead, directly configure the segment interval in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}. - */ - @Deprecated - public long segmentInterval() { - // Pinned arbitrarily to a minimum of 60 seconds. Profiling may indicate a different value is more efficient. - final long minimumSegmentInterval = 60_000L; - // Scaled to the (possibly overridden) retention period - return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval); - } - - /** * Set the number of segments to be used for rolling the window store. * This function is not exposed to users but can be called by developers that extend this class. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 30e51403714c5..f7a182472be10 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.state; -import java.time.Duration; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; @@ -32,6 +31,7 @@ import org.apache.kafka.streams.state.internals.SessionStoreBuilder; import org.apache.kafka.streams.state.internals.WindowStoreBuilder; +import java.time.Duration; import java.util.Objects; /** @@ -155,7 +155,7 @@ public String metricsScope() { * careful to set it the same as the windowed keys you're actually storing. * @param retainDuplicates whether or not to retain duplicates. * @return an instance of {@link WindowBytesStoreSupplier} - * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, long, long, boolean, long)} instead + * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead */ @Deprecated public static WindowBytesStoreSupplier persistentWindowStore(final String name, @@ -178,28 +178,6 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name, ); } - /** - * Create a persistent {@link WindowBytesStoreSupplier}. - * @param name name of the store (cannot be {@code null}) - * @param retentionPeriod length of time to retain data in the store (cannot be negative) - * Note that the retention period must be at least long enough to contain the - * windowed data's entire life cycle, from window-start through window-end, - * and for the entire grace period. - * @param windowSize size of the windows (cannot be negative) - * @param retainDuplicates whether or not to retain duplicates. - * @return an instance of {@link WindowBytesStoreSupplier} - * @deprecated Use {@link #persistentWindowStore(String, Duration, Duration, boolean)} instead - */ - @Deprecated - public static WindowBytesStoreSupplier persistentWindowStore(final String name, - final long retentionPeriod, - final long windowSize, - final boolean retainDuplicates) { - // we're arbitrarily defaulting to segments no smaller than one minute. - final long defaultSegmentInterval = Math.max(retentionPeriod / 2, 60_000L); - return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, defaultSegmentInterval); - } - /** * Create a persistent {@link WindowBytesStoreSupplier}. * @param name name of the store (cannot be {@code null}) @@ -220,28 +198,16 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name, ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod"); ApiUtils.validateMillisecondDuration(windowSize, "windowSize"); - return persistentWindowStore(name, retentionPeriod.toMillis(), windowSize.toMillis(), retainDuplicates); + final long defaultSegmentInterval = Math.max(retentionPeriod.toMillis() / 2, 60_000L); + + return persistentWindowStore(name, retentionPeriod.toMillis(), windowSize.toMillis(), retainDuplicates, defaultSegmentInterval); } - /** - * Create a persistent {@link WindowBytesStoreSupplier}. - * @param name name of the store (cannot be {@code null}) - * @param retentionPeriod length of time to retain data in the store (cannot be negative) - * Note that the retention period must be at least long enough to contain the - * windowed data's entire life cycle, from window-start through window-end, - * and for the entire grace period. - * @param segmentInterval size of segments in ms (cannot be negative) - * @param windowSize size of the windows (cannot be negative) - * @param retainDuplicates whether or not to retain duplicates. - * @return an instance of {@link WindowBytesStoreSupplier} - * @deprecated Use {@link #persistentWindowStore(String, Duration, Duration, boolean)} instead - */ - @Deprecated - public static WindowBytesStoreSupplier persistentWindowStore(final String name, - final long retentionPeriod, - final long windowSize, - final boolean retainDuplicates, - final long segmentInterval) { + private static WindowBytesStoreSupplier persistentWindowStore(final String name, + final long retentionPeriod, + final long windowSize, + final boolean retainDuplicates, + final long segmentInterval) { Objects.requireNonNull(name, "name cannot be null"); if (retentionPeriod < 0L) { throw new IllegalArgumentException("retentionPeriod cannot be negative"); @@ -269,7 +235,9 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name, * windowed data's entire life cycle, from window-start through window-end, * and for the entire grace period. * @return an instance of a {@link SessionBytesStoreSupplier} + * @deprecated since 2.1 Use {@link Stores#persistentSessionStore(String, Duration)} instead */ + @Deprecated public static SessionBytesStoreSupplier persistentSessionStore(final String name, final long retentionPeriod) { Objects.requireNonNull(name, "name cannot be null"); @@ -288,6 +256,7 @@ public static SessionBytesStoreSupplier persistentSessionStore(final String name * and for the entire grace period. * @return an instance of a {@link SessionBytesStoreSupplier} */ + @SuppressWarnings("deprecation") public static SessionBytesStoreSupplier persistentSessionStore(final String name, final Duration retentionPeriod) { ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod");