Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -17,20 +17,23 @@

package org.apache.kafka.streams.kstream;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Map;

/**
* The window specifications used for joins.
* <p>
* A {@link JoinWindows} instance defines a join over two stream on the same key and a maximum time difference.
* A {@link JoinWindows} instance defines a maximum time difference for a {@link KStream#join(KStream, ValueJoiner,
* JoinWindows) join over two streams} on the same key.
* In SQL-style you would express this join as
* <pre>
* <pre>{@code
* SELECT * FROM stream1, stream2
* WHERE
* stream1.key = stream2.key
* AND
* stream1.ts - before <= stream2.ts AND stream2.ts <= stream1.ts + after
* </pre>
* }</pre>
* There are three different window configurations supported:
* <ul>
* <li>before = after = time-difference</li>
Expand All @@ -40,9 +43,27 @@
* A join is symmetric in the sense that a join specification on the first stream returns the same result record as
* a join specification on the second stream with flipped before and after values.
* <p>
* Both values (before and after) must not result in an "inverse" window,
* i.e., lower-interval-bound must not be larger than upper-interval.bound.
* Both values (before and after) must not result in an "inverse" window, i.e., upper-interval bound cannot be smaller
* than lower-interval bound.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you want this to be "lower-interval-bound" to be consistent with "upper-interval-bound", or vice-versa?

* <p>
* {@link JoinWindows} are sliding windows; thus, they are aligned to the actual record timestamps.
* This implies that each input record defines its own window with start and end time being relative to the record's
* timestamp.
* <p>
* For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
*
* @see TimeWindows
* @see UnlimitedWindows
* @see SessionWindows
* @see KStream#join(KStream, ValueJoiner, JoinWindows)
* @see KStream#join(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
* @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows)
* @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)
* @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
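* @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde)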
* @see org.apache.kafka.streams.processor.TimestampExtractor
*/
@InterfaceStability.Unstable
public class JoinWindows extends Windows<Window> {

/** Maximum time difference for tuples that are before the join tuple. */
Expand All @@ -52,57 +73,75 @@ public class JoinWindows extends Windows<Window> {

private JoinWindows(final long beforeMs, final long afterMs) {
if (beforeMs + afterMs < 0) {
throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative");
throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative.");
}
this.afterMs = afterMs;
this.beforeMs = beforeMs;
}

/**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}.
* ({@code timeDifference} must not be negative)
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifferenceMs},
* i.e., the timestamp of a record from the secondary stream is at most {@code timeDifferenceMs} earlier or later than
* the timestamp of the record from the primary stream.
*
* @param timeDifference join window interval
* @param timeDifferenceMs join window interval in milliseconds
* @throws IllegalArgumentException if {@code timeDifferenceMs} is negative
*/
public static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException {
    // Symmetric window: the same bound is passed as both the "before" and the "after" value.
    final long lookBackMs = timeDifferenceMs;
    final long lookAheadMs = timeDifferenceMs;
    return new JoinWindows(lookBackMs, lookAheadMs);
}

/**
* Specifies that records of the same key are joinable if their timestamps are within
* the join window interval, and if the timestamp of a record from the secondary stream is
* earlier than or equal to the timestamp of a record from the first stream.
* Changes the start window boundary to {@code timeDifferenceMs} but keeps the end window boundary as is.
* Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
* {@code timeDifferenceMs} earlier than the timestamp of the record from the primary stream.
* {@code timeDifferenceMs} can be negative but its absolute value must not be larger than the current window "after"
* value (which would result in a negative window size).
*
* @param timeDifference join window interval
* @param timeDifferenceMs relative window start time in milliseconds
* @throws IllegalArgumentException if the resulting window size is negative
*/
public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException {
    // Only the "before" bound is replaced; the current "after" bound is carried over as is.
    final long unchangedAfterMs = this.afterMs;
    return new JoinWindows(timeDifferenceMs, unchangedAfterMs);
}

/**
* Specifies that records of the same key are joinable if their timestamps are within
* the join window interval, and if the timestamp of a record from the secondary stream
* is later than or equal to the timestamp of a record from the first stream.
* Changes the end window boundary to {@code timeDifferenceMs} but keeps the start window boundary as is.
* Thus, records of the same key are joinable if the timestamp of a record from the secondary stream is at most
* {@code timeDifferenceMs} later than the timestamp of the record from the primary stream.
* {@code timeDifferenceMs} can be negative but its absolute value must not be larger than the current window "before"
* value (which would result in a negative window size).
*
* @param timeDifference join window interval
* @param timeDifferenceMs relative window end time in milliseconds
* @throws IllegalArgumentException if the resulting window size is negative
*/
public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException {
    // Only the "after" bound is replaced; the current "before" bound is carried over as is.
    final long unchangedBeforeMs = this.beforeMs;
    return new JoinWindows(unchangedBeforeMs, timeDifferenceMs);
}

/**
* Not supported by {@link JoinWindows}. Throws {@link UnsupportedOperationException}.
* Not supported by {@link JoinWindows}.
* Throws {@link UnsupportedOperationException}.
*
* @throws UnsupportedOperationException at every invocation
*/
@Override
public Map<Long, Window> windowsFor(final long timestamp) {
throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows");
throw new UnsupportedOperationException("windowsFor() is not supported by JoinWindows.");
}

@Override
public long size() {
    // The window size is the sum of the "before" and "after" bounds.
    final long windowSizeMs = afterMs + beforeMs;
    return windowSizeMs;
}

/**
* {@inheritDoc}
*
* @param durationMs the window retention time in milliseconds
* @return itself
* @throws IllegalArgumentException if {@code durationMs} is smaller than the window size
*/
@Override
public JoinWindows until(final long durationMs) throws IllegalArgumentException {
if (durationMs < size()) {
Expand All @@ -112,6 +151,13 @@ public JoinWindows until(final long durationMs) throws IllegalArgumentException
return this;
}

/**
* {@inheritDoc}
* <p>
* For {@link JoinWindows} the maintain duration is at least as large as the window size.
*
* @return the window maintain duration
*/
@Override
public long maintainMs() {
return Math.max(super.maintainMs(), size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,16 @@
*/
package org.apache.kafka.streams.kstream;


import org.apache.kafka.common.annotation.InterfaceStability;

/**
* A session based window specification used for aggregating events into sessions.
* <p>
* Sessions represent a period of activity separated by a defined gap of inactivity.
* Any events processed that fall within the inactivity gap of any existing sessions
* are merged into the existing sessions. If the event falls outside of the session gap
* then a new session will be created.
* Any events processed that fall within the inactivity gap of any existing sessions are merged into the existing sessions.
* If the event falls outside of the session gap then a new session will be created.
* <p>
* For example, If we have a session gap of 5 and the following data arrives:
* For example, if we have a session gap of 5 and the following data arrives:
* <pre>
* +--------------------------------------+
* | key | value | time |
Expand All @@ -39,23 +37,34 @@
* | A | 3 | 20 |
* +-----------+-------------+------------+
* </pre>
* <p>
* We'd have 2 sessions for key A. 1 starting from time 10 and ending at time 12 and another
* starting and ending at time 20. The length of the session is driven by the timestamps of
* the data within the session
* We'd have 2 sessions for key A.
* One starting from time 10 and ending at time 12 and another starting and ending at time 20.
* The length of the session is driven by the timestamps of the data within the session.
* Thus, session windows are not fixed-size windows (cf. {@link TimeWindows} and {@link JoinWindows}).
* <p>
* If we then received another record:
* <p>
* <pre>
* +--------------------------------------+
* | key | value | time |
* +-----------+-------------+------------+
* | A | 4 | 16 |
* +-----------+-------------+------------+
* </pre>
* <p>
* The previous 2 sessions would be merged into a single session with start time 10 and end time 20.
* The aggregate value for this session would be the result of aggregating all 4 values.
* <p>
* For time semantics, see {@link org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
*
* @see TimeWindows
* @see UnlimitedWindows
* @see JoinWindows
* @see KGroupedStream#count(SessionWindows, String)
* @see KGroupedStream#count(SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
* @see KGroupedStream#reduce(Reducer, SessionWindows, String)
* @see KGroupedStream#reduce(Reducer, SessionWindows, org.apache.kafka.streams.processor.StateStoreSupplier)
* @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, String)
* @see KGroupedStream#aggregate(Initializer, Aggregator, Merger, SessionWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.streams.processor.StateStoreSupplier)
* @see org.apache.kafka.streams.processor.TimestampExtractor
*/
@InterfaceStability.Unstable
public class SessionWindows {
Expand All @@ -69,42 +78,51 @@ private SessionWindows(final long gapMs) {
}

/**
* Create a new SessionWindows with the specified inactivity gap
* @param inactivityGapMs the gap of inactivity between sessions
* @return a new SessionWindows with the provided inactivity gap
* and default maintain duration
* Create a new window specification with the specified inactivity gap in milliseconds.
*
* @param inactivityGapMs the gap of inactivity between sessions in milliseconds
* @return a new window specification with default maintain duration of 1 day
*
* @throws IllegalArgumentException if {@code inactivityGapMs} is zero or negative
*/
public static SessionWindows with(final long inactivityGapMs) {
if (inactivityGapMs < 1) {
if (inactivityGapMs <= 0) {
throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative.");
}
return new SessionWindows(inactivityGapMs);
}

/**
* Set the window maintain duration in milliseconds of streams time.
* Set the window maintain duration (retention time) in milliseconds.
* This retention time is a guaranteed <i>lower bound</i> for how long a window will be maintained.
*
* @return itself
* @return itself
* @throws IllegalArgumentException if {@code durationMs} is smaller than window gap
*/
public SessionWindows until(final long durationMs) throws IllegalArgumentException {
if (durationMs < gapMs) {
throw new IllegalArgumentException("Window retentin time (durationMs) cannot be smaller than window gap.");
throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than window gap.");
}
maintainDurationMs = durationMs;

return this;
}

/**
* @return the inactivityGap
* Return the specified gap for the session windows in milliseconds.
*
* @return the inactivity gap of the specified windows
*/
public long inactivityGap() {
    // The gap is reported exactly as stored in gapMs; no conversion is applied.
    final long configuredGapMs = gapMs;
    return configuredGapMs;
}

/**
* @return the minimum amount of time a window will be maintained for.
* Return the window maintain duration (retention time) in milliseconds.
* <p>
* For {@link SessionWindows} the maintain duration is at least as large as the window gap.
*
* @return the window maintain duration
*/
public long maintainMs() {
return Math.max(maintainDurationMs, gapMs);
Expand Down
Loading