Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/streams/upgrade-guide.html
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,18 @@ <h3><a id="streams_api_changes_210" href="#streams_api_changes_210">Streams API
see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-324%3A+Add+method+to+get+metrics%28%29+in+AdminClient">KIP-324</a>
</p>

<p>
We deprecated the notion of segments in window stores as those are intended to be an implementation details.
Thus, method <code>Windows#segments()</code> and variable <code>Windows#segments</code> were deprecated.
If you implement custom windows, you should update your code accordingly.
Similarly, <code>WindowBytesStoreSupplier#segments()</code> was deprecated and replaced with <code>WindowBytesStoreSupplier#segmentInterval()</code>.
If you implement custom window store, you need to update your code accordingly.
Finally, <code>Stores#persistentWindowStore(...)</code> were deprecated and replaced with a new overload that does not allow to specify the number of segments any longer.
For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-319%3A+Replace+segments+with+segmentInterval+in+WindowBytesStoreSupplier">KIP-319</a>
(note: <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables">KIP-328</a> and
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times">KIP-358</a> 'overlap' with KIP-319).
</p>

<h3><a id="streams_api_changes_200" href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
<p>
In 2.0.0 we have added a few new APIs on the <code>ReadOnlyWindowStore</code> interface (for details please read <a href="#streams_api_changes_200">Streams API changes</a> below).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,6 @@ public long maintainMs() {
return maintainDurationMs;
}

/**
* Return the segment interval in milliseconds.
*
* @return the segment interval
* @deprecated since 2.1. Instead, directly configure the segment interval in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}.
*/
@Deprecated
public long segmentInterval() {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added in KIP-319, deprecated in KIP-328

// Pinned arbitrarily to a minimum of 60 seconds. Profiling may indicate a different value is more efficient.
final long minimumSegmentInterval = 60_000L;
// Scaled to the (possibly overridden) retention period
return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval);
}


/**
* Set the number of segments to be used for rolling the window store.
* This function is not exposed to users but can be called by developers that extend this class.
Expand Down
57 changes: 13 additions & 44 deletions streams/src/main/java/org/apache/kafka/streams/state/Stores.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.state;

import java.time.Duration;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
Expand All @@ -32,6 +31,7 @@
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;

import java.time.Duration;
import java.util.Objects;

/**
Expand Down Expand Up @@ -155,7 +155,7 @@ public String metricsScope() {
* careful to set it the same as the windowed keys you're actually storing.
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
* @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, long, long, boolean, long)} instead
* @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead
*/
@Deprecated
public static WindowBytesStoreSupplier persistentWindowStore(final String name,
Expand All @@ -178,28 +178,6 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
);
}

/**
* Create a persistent {@link WindowBytesStoreSupplier}.
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* Note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period.
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
* @deprecated Use {@link #persistentWindowStore(String, Duration, Duration, boolean)} instead
*/
@Deprecated
public static WindowBytesStoreSupplier persistentWindowStore(final String name,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added in KIP-319, deprecated in KIP-358

final long retentionPeriod,
final long windowSize,
final boolean retainDuplicates) {
// we're arbitrarily defaulting to segments no smaller than one minute.
final long defaultSegmentInterval = Math.max(retentionPeriod / 2, 60_000L);
return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, defaultSegmentInterval);
}

/**
* Create a persistent {@link WindowBytesStoreSupplier}.
* @param name name of the store (cannot be {@code null})
Expand All @@ -220,28 +198,16 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod");
ApiUtils.validateMillisecondDuration(windowSize, "windowSize");

return persistentWindowStore(name, retentionPeriod.toMillis(), windowSize.toMillis(), retainDuplicates);
final long defaultSegmentInterval = Math.max(retentionPeriod.toMillis() / 2, 60_000L);

return persistentWindowStore(name, retentionPeriod.toMillis(), windowSize.toMillis(), retainDuplicates, defaultSegmentInterval);
}

/**
* Create a persistent {@link WindowBytesStoreSupplier}.
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* Note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period.
* @param segmentInterval size of segments in ms (cannot be negative)
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
* @deprecated Use {@link #persistentWindowStore(String, Duration, Duration, boolean)} instead
*/
@Deprecated
public static WindowBytesStoreSupplier persistentWindowStore(final String name,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added in KIP-319 -- should have bee deprecated with KIP-328.

final long retentionPeriod,
final long windowSize,
final boolean retainDuplicates,
final long segmentInterval) {
private static WindowBytesStoreSupplier persistentWindowStore(final String name,
final long retentionPeriod,
final long windowSize,
final boolean retainDuplicates,
final long segmentInterval) {
Objects.requireNonNull(name, "name cannot be null");
if (retentionPeriod < 0L) {
throw new IllegalArgumentException("retentionPeriod cannot be negative");
Expand Down Expand Up @@ -269,7 +235,9 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period.
* @return an instance of a {@link SessionBytesStoreSupplier}
* @deprecated since 2.1 Use {@link Stores#persistentSessionStore(String, Duration)} instead
*/
@Deprecated
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is part of KIP-358 but slipped in the PR

public static SessionBytesStoreSupplier persistentSessionStore(final String name,
final long retentionPeriod) {
Objects.requireNonNull(name, "name cannot be null");
Expand All @@ -288,6 +256,7 @@ public static SessionBytesStoreSupplier persistentSessionStore(final String name
* and for the entire grace period.
* @return an instance of a {@link SessionBytesStoreSupplier}
*/
@SuppressWarnings("deprecation")
public static SessionBytesStoreSupplier persistentSessionStore(final String name,
final Duration retentionPeriod) {
ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod");
Expand Down