Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.kstream.internals.TimeWindow;

import java.util.Map;

/**
Expand All @@ -45,21 +43,19 @@
* Both values (before and after) must not result in an "inverse" window,
* i.e., lower-interval-bound must not be larger than upper-interval.bound.
*/
public class JoinWindows extends Windows<TimeWindow> {
public class JoinWindows extends Windows<Window> {

/** Maximum time difference for tuples that are before the join tuple. */
public final long before;
public final long beforeMs;
/** Maximum time difference for tuples that are after the join tuple. */
public final long after;

private JoinWindows(long before, long after) {
super();
public final long afterMs;

if (before + after < 0) {
throw new IllegalArgumentException("Window interval (ie, before+after) must not be negative");
private JoinWindows(final long beforeMs, final long afterMs) {
if (beforeMs + afterMs < 0) {
throw new IllegalArgumentException("Window interval (ie, beforeMs+afterMs) must not be negative");
}
this.after = after;
this.before = before;
this.afterMs = afterMs;
this.beforeMs = beforeMs;
}

/**
Expand All @@ -68,8 +64,8 @@ private JoinWindows(long before, long after) {
*
* @param timeDifference join window interval
*/
public static JoinWindows of(long timeDifference) {
return new JoinWindows(timeDifference, timeDifference);
public static JoinWindows of(final long timeDifferenceMs) throws IllegalArgumentException {
return new JoinWindows(timeDifferenceMs, timeDifferenceMs);
}

/**
Expand All @@ -79,8 +75,8 @@ public static JoinWindows of(long timeDifference) {
*
* @param timeDifference join window interval
*/
public JoinWindows before(long timeDifference) {
return new JoinWindows(timeDifference, this.after);
public JoinWindows before(final long timeDifferenceMs) throws IllegalArgumentException {
return new JoinWindows(timeDifferenceMs, afterMs);
}

/**
Expand All @@ -90,40 +86,54 @@ public JoinWindows before(long timeDifference) {
*
* @param timeDifference join window interval
*/
public JoinWindows after(long timeDifference) {
return new JoinWindows(this.before, timeDifference);
public JoinWindows after(final long timeDifferenceMs) throws IllegalArgumentException {
return new JoinWindows(beforeMs, timeDifferenceMs);
}

/**
* Not supported by {@link JoinWindows}. Throws {@link UnsupportedOperationException}.
*/
@Override
public Map<Long, TimeWindow> windowsFor(long timestamp) {
public Map<Long, Window> windowsFor(final long timestamp) {
throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows");
}

@Override
public long size() {
return after + before;
return beforeMs + afterMs;
}

@Override
public JoinWindows until(final long durationMs) throws IllegalArgumentException {
if (durationMs < size()) {
throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than the window size.");
}
super.until(durationMs);
return this;
}

@Override
public long maintainMs() {
return Math.max(super.maintainMs(), size());
}

@Override
public final boolean equals(Object o) {
public final boolean equals(final Object o) {
if (o == this) {
return true;
}
if (!(o instanceof JoinWindows)) {
return false;
}

JoinWindows other = (JoinWindows) o;
return this.before == other.before && this.after == other.after;
final JoinWindows other = (JoinWindows) o;
return beforeMs == other.beforeMs && afterMs == other.afterMs;
}

@Override
public int hashCode() {
int result = (int) (before ^ (before >>> 32));
result = 31 * result + (int) (after ^ (after >>> 32));
int result = (int) (beforeMs ^ (beforeMs >>> 32));
result = 31 * result + (int) (afterMs ^ (afterMs >>> 32));
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ public class SessionWindows {
private final long gapMs;
private long maintainDurationMs;

private SessionWindows(final long gapMs, final long maintainDurationMs) {
private SessionWindows(final long gapMs) {
this.gapMs = gapMs;
this.maintainDurationMs = maintainDurationMs;
maintainDurationMs = Windows.DEFAULT_MAINTAIN_DURATION_MS;
}

/**
Expand All @@ -75,7 +75,10 @@ private SessionWindows(final long gapMs, final long maintainDurationMs) {
* and default maintain duration
*/
public static SessionWindows with(final long inactivityGapMs) {
return new SessionWindows(inactivityGapMs, Windows.DEFAULT_MAINTAIN_DURATION);
if (inactivityGapMs < 1) {
throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative.");
}
return new SessionWindows(inactivityGapMs);
}

/**
Expand All @@ -84,8 +87,12 @@ public static SessionWindows with(final long inactivityGapMs) {
*
* @return itself
*/
public SessionWindows until(final long durationMs) {
this.maintainDurationMs = durationMs;
public SessionWindows until(final long durationMs) throws IllegalArgumentException {
if (durationMs < gapMs) {
throw new IllegalArgumentException("Window retentin time (durationMs) cannot be smaller than window gap.");
}
maintainDurationMs = durationMs;

return this;
}

Expand All @@ -100,6 +107,6 @@ public long inactivityGap() {
* @return the minimum amount of time a window will be maintained for.
*/
public long maintainMs() {
return maintainDurationMs;
return Math.max(maintainDurationMs, gapMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,18 @@ public class TimeWindows extends Windows<TimeWindow> {
* The window size's effective time unit is determined by the semantics of the topology's
* configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
*/
public final long size;
public final long sizeMs;

/**
* The size of the window's advance interval, i.e. by how much a window moves forward relative
* to the previous one. The interval's effective time unit is determined by the semantics of
* the topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
*/
public final long advance;
public final long advanceMs;


private TimeWindows(long size, long advance) {
super();
if (size <= 0) {
throw new IllegalArgumentException("window size must be > 0 (you provided " + size + ")");
}
this.size = size;
if (!(0 < advance && advance <= size)) {
throw new IllegalArgumentException(
String.format("advance interval (%d) must lie within interval (0, %d]", advance, size));
}
this.advance = advance;
private TimeWindows(final long sizeMs, final long advanceMs) throws IllegalArgumentException {
this.sizeMs = sizeMs;
this.advanceMs = advanceMs;
}

/**
Expand All @@ -76,8 +67,11 @@ private TimeWindows(long size, long advance) {
* topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
* @return a new window definition
*/
public static TimeWindows of(long size) {
return new TimeWindows(size, size);
public static TimeWindows of(final long sizeMs) throws IllegalArgumentException {
if (sizeMs <= 0) {
throw new IllegalArgumentException("Window sizeMs must be larger than zero.");
}
return new TimeWindows(sizeMs, sizeMs);
}

/**
Expand All @@ -93,43 +87,59 @@ public static TimeWindows of(long size) {
* {@link org.apache.kafka.streams.processor.TimestampExtractor}.
* @return a new window definition
*/
public TimeWindows advanceBy(long interval) {
return new TimeWindows(this.size, interval);
public TimeWindows advanceBy(final long advanceMs) {
if (advanceMs <= 0 || advanceMs > sizeMs) {
throw new IllegalArgumentException(String.format("AdvanceMs must lie within interval (0, %d]", sizeMs));
}
return new TimeWindows(sizeMs, advanceMs);
}

@Override
public Map<Long, TimeWindow> windowsFor(long timestamp) {
long windowStart = (Math.max(0, timestamp - this.size + this.advance) / this.advance) * this.advance;
Map<Long, TimeWindow> windows = new HashMap<>();
public Map<Long, TimeWindow> windowsFor(final long timestamp) {
long windowStart = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
final Map<Long, TimeWindow> windows = new HashMap<>();
while (windowStart <= timestamp) {
TimeWindow window = new TimeWindow(windowStart, windowStart + this.size);
final TimeWindow window = new TimeWindow(windowStart, windowStart + sizeMs);
windows.put(windowStart, window);
windowStart += this.advance;
windowStart += advanceMs;
}
return windows;
}

@Override
public long size() {
return size;
return sizeMs;
}

public TimeWindows until(final long durationMs) throws IllegalArgumentException {
if (durationMs < sizeMs) {
throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than the window size.");
}
super.until(durationMs);
return this;
}

@Override
public long maintainMs() {
return Math.max(super.maintainMs(), sizeMs);
}

@Override
public final boolean equals(Object o) {
public final boolean equals(final Object o) {
if (o == this) {
return true;
}
if (!(o instanceof TimeWindows)) {
return false;
}
TimeWindows other = (TimeWindows) o;
return this.size == other.size && this.advance == other.advance;
final TimeWindows other = (TimeWindows) o;
return sizeMs == other.sizeMs && advanceMs == other.advanceMs;
}

@Override
public int hashCode() {
int result = (int) (size ^ (size >>> 32));
result = 31 * result + (int) (advance ^ (advance >>> 32));
int result = (int) (sizeMs ^ (sizeMs >>> 32));
result = 31 * result + (int) (advanceMs ^ (advanceMs >>> 32));
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,23 @@
*/
public class UnlimitedWindows extends Windows<UnlimitedWindow> {

private static final long DEFAULT_START_TIMESTAMP = 0L;
private static final long DEFAULT_START_TIMESTAMP_MS = 0L;

/** The start timestamp of the window. */
public final long start;
public final long startMs;

private UnlimitedWindows(long start) {
super();
if (start < 0) {
throw new IllegalArgumentException("start must be > 0 (you provided " + start + ")");
private UnlimitedWindows(final long startMs) throws IllegalArgumentException {
if (startMs < 0) {
throw new IllegalArgumentException("startMs must be > 0 (you provided " + startMs + ")");
}
this.start = start;
this.startMs = startMs;
}

/**
* Return an unlimited window starting at timestamp zero.
*/
public static UnlimitedWindows of() {
return new UnlimitedWindows(DEFAULT_START_TIMESTAMP);
return new UnlimitedWindows(DEFAULT_START_TIMESTAMP_MS);
}

/**
Expand All @@ -53,18 +52,18 @@ public static UnlimitedWindows of() {
* @param start the window start time
* @return a new unlimited window that starts at {@code start}
*/
public UnlimitedWindows startOn(long start) {
public UnlimitedWindows startOn(final long start) throws IllegalArgumentException {
return new UnlimitedWindows(start);
}

@Override
public Map<Long, UnlimitedWindow> windowsFor(long timestamp) {
public Map<Long, UnlimitedWindow> windowsFor(final long timestamp) {
// always return the single unlimited window

// we cannot use Collections.singleMap since it does not support remove()
Map<Long, UnlimitedWindow> windows = new HashMap<>();
if (timestamp >= start) {
windows.put(start, new UnlimitedWindow(start));
final Map<Long, UnlimitedWindow> windows = new HashMap<>();
if (timestamp >= startMs) {
windows.put(startMs, new UnlimitedWindow(startMs));
}
return windows;
}
Expand All @@ -75,7 +74,7 @@ public long size() {
}

@Override
public final boolean equals(Object o) {
public final boolean equals(final Object o) {
if (o == this) {
return true;
}
Expand All @@ -84,13 +83,18 @@ public final boolean equals(Object o) {
return false;
}

UnlimitedWindows other = (UnlimitedWindows) o;
return this.start == other.start;
final UnlimitedWindows other = (UnlimitedWindows) o;
return startMs == other.startMs;
}

@Override
public int hashCode() {
return (int) (start ^ (start >>> 32));
return (int) (startMs ^ (startMs >>> 32));
}

@Override
public UnlimitedWindows until(final long durationMs) {
throw new IllegalArgumentException("Window retention time (durationMs) cannot be set for UnlimitedWindows.");
}

@Override
Expand Down
Loading