diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index 6dd1a85f5340a..f20e39f6a4d29 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,20 +17,23 @@ package org.apache.kafka.streams.kstream; +import org.apache.kafka.common.annotation.InterfaceStability; + import java.util.Map; /** * The window specifications used for joins. *
- * A {@link JoinWindows} instance defines a join over two stream on the same key and a maximum time difference. + * A {@link JoinWindows} instance defines a maximum time difference for a {@link KStream#join(KStream, ValueJoiner, + * JoinWindows) join over two streams} on the same key. * In SQL-style you would express this join as - *
+ * {@code
* SELECT * FROM stream1, stream2
* WHERE
* stream1.key = stream2.key
* AND
* stream1.ts - before <= stream2.ts AND stream2.ts <= stream1.ts + after
- *
+ * }
* There are three different window configuration supported:
* - * Both values (before and after) must not result in an "inverse" window, - * i.e., lower-interval-bound must not be larger than upper-interval.bound. + * Both values (before and after) must not result in an "inverse" window, i.e., upper-interval bound cannot be smaller + * than lower-interval bound. + *
+ * {@link JoinWindows} are sliding windows, thus, they are aligned to the actual record timestamps. + * This implies, that each input record defines its own window with start and end time being relative to the record's + * timestamp. + *
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ *
+ * @see TimeWindows
+ * @see UnlimitedWindows
+ * @see SessionWindows
+ * @see KStream#join(KStream, ValueJoiner, JoinWindows)
+ * @see KStream#join(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
+ * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows)
+ * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
+ * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
+ * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
+ * @see org.apache.kafka.streams.processor.TimestampExtractor
*/
+@InterfaceStability.Unstable
public class JoinWindows extends Windows
+ * For {@link TimeWindows} the maintain duration is at least as small as the window size.
+ *
+ * @return the window maintain duration
+ */
@Override
public long maintainMs() {
return Math.max(super.maintainMs(), size());
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index bed6c3dd97125..8918f3e68bb8b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -16,18 +16,16 @@
*/
package org.apache.kafka.streams.kstream;
-
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* A session based window specification used for aggregating events into sessions.
*
* Sessions represent a period of activity separated by a defined gap of inactivity.
- * Any events processed that fall within the inactivity gap of any existing sessions
- * are merged into the existing sessions. If the event falls outside of the session gap
- * then a new session will be created.
+ * Any events processed that fall within the inactivity gap of any existing sessions are merged into the existing sessions.
+ * If the event falls outside of the session gap then a new session will be created.
*
- * For example, If we have a session gap of 5 and the following data arrives:
+ * For example, if we have a session gap of 5 and the following data arrives:
*
- * We'd have 2 sessions for key A. 1 starting from time 10 and ending at time 12 and another
- * starting and ending at time 20. The length of the session is driven by the timestamps of
- * the data within the session
+ * We'd have 2 sessions for key A.
+ * One starting from time 10 and ending at time 12 and another starting and ending at time 20.
+ * The length of the session is driven by the timestamps of the data within the session.
+ * Thus, session windows are no fixed-size windows (c.f. {@link TimeWindows} and {@link JoinWindows}).
*
* If we then received another record:
- *
*
* The previous 2 sessions would be merged into a single session with start time 10 and end time 20.
* The aggregate value for this session would be the result of aggregating all 4 values.
+ *
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ *
+ * @see TimeWindows
+ * @see UnlimitedWindows
+ * @see JoinWindows
+ * @see KGroupedStream#count(SessionWindows, String)
+ * @see KGroupedStream#count(SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#reduce(Reducer, SessionWindows, String)
+ * @see KGroupedStream#reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see org.apache.kafka.streams.processor.TimestampExtractor
*/
@InterfaceStability.Unstable
public class SessionWindows {
@@ -69,27 +78,30 @@ private SessionWindows(final long gapMs) {
}
/**
- * Create a new SessionWindows with the specified inactivity gap
- * @param inactivityGapMs the gap of inactivity between sessions
- * @return a new SessionWindows with the provided inactivity gap
- * and default maintain duration
+ * Create a new window specification with the specified inactivity gap in milliseconds.
+ *
+ * @param inactivityGapMs the gap of inactivity between sessions in milliseconds
+ * @return a new window specification with default maintain duration of 1 day
+ *
+ * @throws IllegalArgumentException if {@code inactivityGapMs} is zero or negative
*/
public static SessionWindows with(final long inactivityGapMs) {
- if (inactivityGapMs < 1) {
+ if (inactivityGapMs <= 0) {
throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative.");
}
return new SessionWindows(inactivityGapMs);
}
/**
- * Set the window maintain duration in milliseconds of streams time.
+ * Set the window maintain duration (retention time) in milliseconds.
* This retention time is a guaranteed lower bound for how long a window will be maintained.
*
- * @return itself
+ * @return itself
+ * @throws IllegalArgumentException if {@code durationMs} is smaller than window gap
*/
public SessionWindows until(final long durationMs) throws IllegalArgumentException {
if (durationMs < gapMs) {
- throw new IllegalArgumentException("Window retentin time (durationMs) cannot be smaller than window gap.");
+ throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than window gap.");
}
maintainDurationMs = durationMs;
@@ -97,14 +109,20 @@ public SessionWindows until(final long durationMs) throws IllegalArgumentExcepti
}
/**
- * @return the inactivityGap
+ * Return the specified gap for the session windows in milliseconds.
+ *
+ * @return the inactivity gap of the specified windows
*/
public long inactivityGap() {
return gapMs;
}
/**
- * @return the minimum amount of time a window will be maintained for.
+ * Return the window maintain duration (retention time) in milliseconds.
+ *
+ * For {@link SessionWindows} the maintain duration is at least as small as the window gap.
+ *
+ * @return the window maintain duration
*/
public long maintainMs() {
return Math.max(maintainDurationMs, gapMs);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index 11df22823c6aa..87e7f2efc30d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,79 +17,94 @@
package org.apache.kafka.streams.kstream;
+import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import java.util.HashMap;
import java.util.Map;
/**
- * The time-based window specifications used for aggregations.
+ * The fixed-size time-based window specifications used for aggregations.
*
- * The semantics of a time-based window are: Every T1 (advance) time-units, compute the aggregate total for T2 (size) time-units.
+ * The semantics of time-based aggregation windows are: Every T1 (advance) milliseconds, compute the aggregate total for
+ * T2 (size) milliseconds.
*
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ *
+ * @see SessionWindows
+ * @see UnlimitedWindows
+ * @see JoinWindows
+ * @see KGroupedStream#count(Windows, String)
+ * @see KGroupedStream#count(Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#reduce(Reducer, Windows, String)
+ * @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see org.apache.kafka.streams.processor.TimestampExtractor
*/
+@InterfaceStability.Unstable
public class TimeWindows extends Windows
+ * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, non-overlapping windows.
+ * Tumbling windows are a special case of hopping windows with {@code advance == size}.
*
- * This provides the semantics of tumbling windows, which are fixed-sized, gap-less,
- * non-overlapping windows. Tumbling windows are a specialization of hopping windows.
- *
- * @param size The size of the window, with the requirement that size > 0.
- * The window size's effective time unit is determined by the semantics of the
- * topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
- * @return a new window definition
+ * @param sizeMs The size of the window in milliseconds
+ * @return a new window definition with default maintain duration of 1 day
+ * @throws IllegalArgumentException if the specified window size is zero or negative
*/
public static TimeWindows of(final long sizeMs) throws IllegalArgumentException {
if (sizeMs <= 0) {
- throw new IllegalArgumentException("Window sizeMs must be larger than zero.");
+ throw new IllegalArgumentException("Window size (sizeMs) must be larger than zero.");
}
return new TimeWindows(sizeMs, sizeMs);
}
/**
- * Returns a window definition with the original size, but advance ("hop") the window by the given
- * interval, which specifies by how much a window moves forward relative to the previous one.
- * Think: [N * advanceInterval, N * advanceInterval + size), with N denoting the N-th window.
- *
+ * Return a window definition with the original size, but advance ("hop") the window by the given interval, which
+ * specifies by how much a window moves forward relative to the previous one.
+ * The time interval represented by the the N-th window is: {@code [N * advance, N * advance + size)}.
+ *
* This provides the semantics of hopping windows, which are fixed-sized, overlapping windows.
*
- * @param interval The advance interval ("hop") of the window, with the requirement that
- * 0 < interval ≤ size. The interval's effective time unit is
- * determined by the semantics of the topology's configured
- * {@link org.apache.kafka.streams.processor.TimestampExtractor}.
- * @return a new window definition
+ * @param advanceMs The advance interval ("hop") in milliseconds of the window, with the requirement that
+ * {@code 0 < advanceMs ≤ sizeMs}.
+ * @return a new window definition with default maintain duration of 1 day
+ * @throws IllegalArgumentException if the advance interval is negative, zero, or larger-or-equal the window size
*/
public TimeWindows advanceBy(final long advanceMs) {
if (advanceMs <= 0 || advanceMs > sizeMs) {
- throw new IllegalArgumentException(String.format("AdvanceMs must lie within interval (0, %d]", sizeMs));
+ throw new IllegalArgumentException(String.format("AdvanceMs must lie within interval (0, %d].", sizeMs));
}
return new TimeWindows(sizeMs, advanceMs);
}
@@ -111,6 +126,13 @@ public long size() {
return sizeMs;
}
+ /**
+ * {@inheritDoc}
+ *
+ * @param durationMs the window retention time
+ * @return itself
+ * @throws IllegalArgumentException if {@code duration} is smaller than the window size
+ */
public TimeWindows until(final long durationMs) throws IllegalArgumentException {
if (durationMs < sizeMs) {
throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than the window size.");
@@ -119,6 +141,13 @@ public TimeWindows until(final long durationMs) throws IllegalArgumentException
return this;
}
+ /**
+ * {@inheritDoc}
+ *
+ * For {@link TimeWindows} the maintain duration is at least as small as the window size.
+ *
+ * @return the window maintain duration
+ */
@Override
public long maintainMs() {
return Math.max(super.maintainMs(), sizeMs);
@@ -143,4 +172,4 @@ public int hashCode() {
return result;
}
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index 8605f9d6bf26b..9d5669ceae85d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,14 +17,33 @@
package org.apache.kafka.streams.kstream;
+import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
import java.util.HashMap;
import java.util.Map;
/**
- * The unlimited window specifications.
+ * The unlimited window specifications used for aggregations.
+ *
+ * An unlimited time window is also called landmark window.
+ * It has a fixed starting point while its window end is defined as infinite.
+ * With this regard, it is a fixed-size window with infinite window size.
+ *
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ *
+ * @see TimeWindows
+ * @see SessionWindows
+ * @see JoinWindows
+ * @see KGroupedStream#count(Windows, String)
+ * @see KGroupedStream#count(Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#reduce(Reducer, Windows, String)
+ * @see KGroupedStream#reduce(Reducer, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)
+ * @see KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.streams.processor.StateStoreSupplier)
+ * @see org.apache.kafka.streams.processor.TimestampExtractor
*/
+@InterfaceStability.Unstable
public class UnlimitedWindows extends Windows
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams.kstream;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
/**
* A single window instance, defined by its start and end timestamp.
+ * {@link Window} is agnostic if start/end boundaries are inclusive or exclusive; this is defined by concrete
+ * window implementations.
+ *
+ * To specify how {@link Window} boundaries are defined use {@link Windows}.
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ *
+ * @see Windows
+ * @see org.apache.kafka.streams.kstream.internals.TimeWindow
+ * @see org.apache.kafka.streams.kstream.internals.SessionWindow
+ * @see org.apache.kafka.streams.kstream.internals.UnlimitedWindow
+ * @see org.apache.kafka.streams.processor.TimestampExtractor
*/
+@InterfaceStability.Unstable
public abstract class Window {
protected final long startMs;
protected final long endMs;
/**
- * Create a new window for the given start time (inclusive) and end time (exclusive).
+ * Create a new window for the given start and end time.
*
- * @param start the start timestamp of the window (inclusive)
- * @param end the end timestamp of the window (exclusive)
- * @throws IllegalArgumentException if {@code start} or {@code end} is negative or if {@code end} is smaller than
- * {@code start}
+ * @param startMs the start timestamp of the window
+ * @param endMs the end timestamp of the window
+ * @throws IllegalArgumentException if {@code startMs} is negative or if {@code endMs} is smaller than {@code startMs}
*/
public Window(long startMs, long endMs) throws IllegalArgumentException {
if (startMs < 0) {
@@ -45,14 +57,14 @@ public Window(long startMs, long endMs) throws IllegalArgumentException {
}
/**
- * Return the start timestamp of this window, inclusive
+ * Return the start timestamp of this window.
*/
public long start() {
return startMs;
}
/**
- * Return the end timestamp of this window, exclusive
+ * Return the end timestamp of this window.
*/
public long end() {
return endMs;
@@ -60,9 +72,11 @@ public long end() {
/**
* Check if the given window overlaps with this window.
+ * Should throw an {@link IllegalArgumentException} if the {@code other} window has a different type than {@code
+ * this} window.
*
- * @param other another window
- * @return {@code true} if {@code other} overlaps with this window—{@code false} otherwise
+ * @param other another window of the same type
+ * @return {@code true} if {@code other} overlaps with this window—{@code false} otherwise
*/
public abstract boolean overlap(final Window other);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
index 81357c1daf50d..2941136dc9856 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windowed.java
@@ -17,13 +17,31 @@
package org.apache.kafka.streams.kstream;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
/**
- * Used to represent windowed stream aggregations (e.g. as returned by
- * {@link KGroupedStream#aggregate(Initializer, Aggregator, Windows, org.apache.kafka.common.serialization.Serde, String)}),
- * which have the type {@code
+ * If a {@link KStream} gets grouped and aggregated using a window-aggregation the resulting {@link KTable} is a
+ * so-called "windowed {@link KTable}" with a combined key type that encodes the corresponding aggregation window and
+ * the original record key.
+ * Thus, a windowed {@link KTable} has type {@link Windowed <Windowed<K>,V>}
*
- * @param
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,13 +16,25 @@
*/
package org.apache.kafka.streams.kstream;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
import java.util.Map;
/**
- * The window specification interface that can be extended for windowing operation in joins and aggregations.
+ * The window specification interface for fixed size windows that is used to define window boundaries and window
+ * maintain duration.
+ *
+ * If not explicitly specified, the default maintain duration is 1 day.
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
*
- * @param
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
*
* @see TimeWindow
* @see UnlimitedWindow
* @see org.apache.kafka.streams.kstream.SessionWindows
+ * @see org.apache.kafka.streams.processor.TimestampExtractor
*/
+@InterfaceStability.Unstable
public final class SessionWindow extends Window {
/**
* Create a new window for the given start time and end time (both inclusive).
*
- * @param start the start timestamp of the window
- * @param end the end timestamp of the window
+ * @param startMs the start timestamp of the window
+ * @param endMs the end timestamp of the window
+ * @throws IllegalArgumentException if {@code startMs} is negative or if {@code endMs} is smaller than {@code startMs}
*/
- public SessionWindow(final long startMs, final long endMs) {
+ public SessionWindow(final long startMs, final long endMs) throws IllegalArgumentException {
super(startMs, endMs);
}
/**
* Check if the given window overlaps with this window.
*
- * @param other another window
- * @return {@code true} if {@code other} overlaps with this window—{@code false} otherwise
+ * @param other another window
+ * @return {@code true} if {@code other} overlaps with this window—{@code false} otherwise
* @throws IllegalArgumentException if the {@code other} window has a different type than {@link this} window
*/
public boolean overlap(final Window other) throws IllegalArgumentException {
if (getClass() != other.getClass()) {
throw new IllegalArgumentException("Cannot compare windows of different type. Other window has type "
- + other.getClass());
+ + other.getClass() + ".");
}
final SessionWindow otherWindow = (SessionWindow) other;
return !(otherWindow.endMs < startMs || endMs < otherWindow.startMs);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
index bf98f941dbac9..ab805acaa2200 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
@@ -14,28 +14,57 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.kstream.Window;
+/**
+ * A {@link TimeWindow} covers a half-open time interval with its start timestamp as an inclusive boundary and its end
+ * timestamp as exclusive boundary.
+ * It is a fixed size window, i.e., all instances (of a single {@link org.apache.kafka.streams.kstream.TimeWindows
+ * window specification}) will have the same size.
+ *
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ *
+ * @see SessionWindow
+ * @see UnlimitedWindow
+ * @see org.apache.kafka.streams.kstream.TimeWindows
+ * @see org.apache.kafka.streams.processor.TimestampExtractor
+ */
+@InterfaceStability.Unstable
public class TimeWindow extends Window {
- public TimeWindow(long startMs, long endMs) {
+ /**
+ * Create a new window for the given start time (inclusive) and end time (exclusive).
+ *
+ * @param startMs the start timestamp of the window (inclusive)
+ * @param endMs the end timestamp of the window (exclusive)
+ * @throws IllegalArgumentException if {@code startMs} is negative or if {@code endMs} is smaller than or equal to
+ * {@code startMs}
+ */
+ public TimeWindow(final long startMs, final long endMs) throws IllegalArgumentException {
super(startMs, endMs);
if (startMs == endMs) {
throw new IllegalArgumentException("Window endMs must be greater than window startMs.");
}
}
+ /**
+ * Check if the given window overlaps with this window.
+ *
+ * @param other another window
+ * @return {@code true} if {@code other} overlaps with this window—{@code false} otherwise
+ * @throws IllegalArgumentException if the {@code other} window has a different type than {@link this} window
+ */
@Override
public boolean overlap(final Window other) throws IllegalArgumentException {
if (getClass() != other.getClass()) {
throw new IllegalArgumentException("Cannot compare windows of different type. Other window has type "
- + other.getClass());
+ + other.getClass() + ".");
}
final TimeWindow otherWindow = (TimeWindow) other;
return startMs < otherWindow.endMs && otherWindow.startMs < endMs;
}
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
index 7fb7c53000a44..311169e58e0be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
@@ -14,24 +14,52 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.streams.kstream.Window;
+/**
+ * {@link UnlimitedWindow} is an "infinite" large window with a fixed (inclusive) start time.
+ * All windows of the same {@link org.apache.kafka.streams.kstream.UnlimitedWindows window specification} will have the
+ * same start time.
+ * To make the window size "infinite" end time is set to {@link Long#MAX_VALUE}.
+ *
+ * For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ *
+ * @see TimeWindow
+ * @see SessionWindow
+ * @see org.apache.kafka.streams.kstream.UnlimitedWindows
+ * @see org.apache.kafka.streams.processor.TimestampExtractor
+ */
+@InterfaceStability.Unstable
public class UnlimitedWindow extends Window {
+ /**
+ * Create a new window for the given start time (inclusive).
+ *
+ * @param startMs the start timestamp of the window (inclusive)
+ * @throws IllegalArgumentException if {@code start} is negative
+ */
public UnlimitedWindow(final long startMs) {
super(startMs, Long.MAX_VALUE);
}
+ /**
+ * Returns {@code true} if the given window is of the same type, because all unlimited windows overlap with each
+ * other due to their infinite size.
+ *
+ * @param other another window
+ * @return {@code true}
+ * @throws IllegalArgumentException if the {@code other} window has a different type than {@link this} window
+ */
@Override
public boolean overlap(final Window other) {
if (getClass() != other.getClass()) {
throw new IllegalArgumentException("Cannot compare windows of different type. Other window has type "
- + other.getClass());
+ + other.getClass() + ".");
}
return true;
}
-}
\ No newline at end of file
+}
* +--------------------------------------+
* | key | value | time |
@@ -39,13 +37,12 @@
* | A | 3 | 20 |
* +-----------+-------------+------------+
*
- *
* +--------------------------------------+
* | key | value | time |
@@ -53,9 +50,21 @@
* | A | 4 | 16 |
* +-----------+-------------+------------+
*
- *
- *
+ * Thus, the specified {@link TimeWindow}s are aligned to the epoch.
+ * Aligned to the epoch means, that the first window starts at timestamp zero.
+ * For example, hopping windows with size of 5000ms and advance of 3000ms, have window boundaries
+ * [0;5000),[3000;8000),... and not [1000;6000),[4000;9000),... or even something "random" like [1452;6452),[4452;9452),...
+ *
- * it discretize a stream into overlapping windows, which implies that a record maybe contained in one and or more "adjacent" windows.
+ * it discretize a stream into overlapping windows, which implies that a record maybe contained in one and or
+ * more "adjacent" windows.
- * it discretize a stream into non-overlapping windows, which implies that a record is only ever contained in one and only one tumbling window.