diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java index 4124b32c301d8..bd2fe3f7668a3 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java @@ -24,11 +24,11 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.HoppingWindows; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.StreamsConfig; @@ -160,7 +160,7 @@ public KeyValue apply(String user, PageViewByRegion vi return new KeyValue<>(viewRegion.region, viewRegion); } }) - .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String()) + .countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).shiftedBy(1000), Serdes.String()) // TODO: we can merge ths toStream().map(...) with a single toStream(...) .toStream() .map(new KeyValueMapper, Long, KeyValue>() { diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java index e61842ffe3273..b4f4fae418224 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java @@ -30,11 +30,11 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.HoppingWindows; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.Windowed; @@ -99,7 +99,7 @@ public KeyValue apply(String user, JsonNode viewRegion) { return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion); } }) - .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String()) + .countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).shiftedBy(1000), Serdes.String()) // TODO: we can merge ths toStream().map(...) with a single toStream(...) .toStream() .map(new KeyValueMapper, Long, KeyValue>() { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java deleted file mode 100644 index aa866e4274fc4..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -import org.apache.kafka.streams.kstream.internals.HoppingWindow; - -import java.util.HashMap; -import java.util.Map; - -/** - * The hopping window specifications used for aggregations. - */ -public class HoppingWindows extends Windows { - - private static final long DEFAULT_SIZE_MS = 1000L; - - public final long size; - - public final long period; - - private HoppingWindows(String name, long size, long period) { - super(name); - - this.size = size; - this.period = period; - } - - /** - * Returns a half-interval hopping window definition with the window size in milliseconds - * of the form [ N * default_size, N * default_size + default_size ) - */ - public static HoppingWindows of(String name) { - return new HoppingWindows(name, DEFAULT_SIZE_MS, DEFAULT_SIZE_MS); - } - - /** - * Returns a new hopping window definition with the original size but reassign the window - * period in milliseconds of the form [ N * period, N * period + size ) - */ - public HoppingWindows with(long size) { - return new HoppingWindows(this.name, size, this.period); - } - - /** - * Returns a new hopping window definition with the original size but reassign the window - * period in milliseconds of the form [ N * period, N * period + size ) - */ - public HoppingWindows every(long period) { - return new HoppingWindows(this.name, this.size, period); - } - - @Override - public Map windowsFor(long timestamp) { - long enclosed = (size - 1) / period; - - long windowStart = Math.max(0, timestamp - timestamp % period - enclosed * period); - - Map windows = new HashMap<>(); - while (windowStart <= timestamp) { - // add the window - HoppingWindow window = new HoppingWindow(windowStart, windowStart + this.size); - windows.put(windowStart, window); - - // advance the step period - windowStart += this.period; - } - - return windows; - } - - @Override - public boolean equalTo(Windows other) { - if (!other.getClass().equals(HoppingWindows.class)) - return false; - - HoppingWindows otherWindows = (HoppingWindows) other; - - return this.size == otherWindows.size && this.period == otherWindows.period; - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index 24dbdd33b2547..a74984a29b130 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -17,15 +17,14 @@ package org.apache.kafka.streams.kstream; - -import org.apache.kafka.streams.kstream.internals.TumblingWindow; +import org.apache.kafka.streams.kstream.internals.TimeWindow; import java.util.Map; /** * The window specifications used for joins. */ -public class JoinWindows extends Windows { +public class JoinWindows extends Windows { public final long before; public final long after; @@ -74,19 +73,29 @@ public JoinWindows after(long timeDifference) { } @Override - public Map windowsFor(long timestamp) { + public Map windowsFor(long timestamp) { // this function should never be called throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows"); } @Override - public boolean equalTo(Windows other) { - if (!other.getClass().equals(JoinWindows.class)) + public final boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof JoinWindows)) { return false; + } - JoinWindows otherWindows = (JoinWindows) other; + JoinWindows other = (JoinWindows) o; + return this.before == other.before && this.after == other.after; + } - return this.before == otherWindows.before && this.after == otherWindows.after; + @Override + public int hashCode() { + int result = (int) (before ^ (before >>> 32)); + result = 31 * result + (int) (after ^ (after >>> 32)); + return result; } -} +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java new file mode 100644 index 0000000000000..0aefc8a44126b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.kstream.internals.TimeWindow; + +import java.util.HashMap; +import java.util.Map; + +/** + * The time-based window specifications used for aggregations. + */ +public class TimeWindows extends Windows { + + /** + * The size of the window, i.e. how long a window lasts. + * The window size's effective time unit is determined by the semantics of the topology's + * configured {@link org.apache.kafka.streams.processor.TimestampExtractor}. + */ + public final long size; + + /** + * The size of the window's hop, i.e. by how much a window moves forward relative to the previous one. + * The hop size's effective time unit is determined by the semantics of the topology's + * configured {@link org.apache.kafka.streams.processor.TimestampExtractor}. + */ + public final long hop; + + private TimeWindows(String name, long size, long hop) { + super(name); + if (size <= 0) { + throw new IllegalArgumentException("size must be > 0 (you provided " + size + ")"); + } + this.size = size; + if (!(0 < hop && hop <= size)) { + throw new IllegalArgumentException(String.format("hop (%d) must lie within interval (0, %d]", hop, size)); + } + this.hop = hop; + } + + /** + * Returns a window definition with the given window size, and with the hop size being equal to + * the window size. Think: [N * size, N * size + size), with N denoting the N-th window. + * + * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, + * non-overlapping windows. Tumbling windows are a specialization of hopping windows. + * + * @param name The name of the window. Must not be null or empty. + * @param size The size of the window, with the requirement that size > 0. + * The window size's effective time unit is determined by the semantics of the + * topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}. + * @return a new window definition + */ + public static TimeWindows of(String name, long size) { + return new TimeWindows(name, size, size); + } + + /** + * Returns a window definition with the original size, but shift ("hop") the window by the given + * hop size, which specifies by how much a window moves forward relative to the previous one. + * Think: [N * hop, N * hop + size), with N denoting the N-th window. + * + * This provides the semantics of hopping windows, which are fixed-sized, overlapping windows. + * + * @param hop The hop size of the window, with the requirement that 0 < hop ≤ size. + * The hop size's effective time unit is determined by the semantics of the + * topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}. + * @return a new window definition + */ + public TimeWindows shiftedBy(long hop) { + return new TimeWindows(this.name, this.size, hop); + } + + /** + * Returns the windows that contain the provided timestamp. + * + * @param timestamp the timestamp + * @return a map of (windowStartTimestamp, window) entries + */ + @Override + public Map windowsFor(long timestamp) { + long enclosed = (size - 1) / hop; + long windowStart = Math.max(0, timestamp - timestamp % hop - enclosed * hop); + + Map windows = new HashMap<>(); + while (windowStart <= timestamp) { + TimeWindow window = new TimeWindow(windowStart, windowStart + this.size); + windows.put(windowStart, window); + windowStart += this.hop; + } + return windows; + } + + @Override + public final boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof TimeWindows)) { + return false; + } + TimeWindows other = (TimeWindows) o; + return this.size == other.size && this.hop == other.hop; + } + + @Override + public int hashCode() { + int result = (int) (size ^ (size >>> 32)); + result = 31 * result + (int) (hop ^ (hop >>> 32)); + return result; + } + +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java deleted file mode 100644 index cadedba550ba9..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -import org.apache.kafka.streams.kstream.internals.TumblingWindow; - -import java.util.HashMap; -import java.util.Map; - -/** - * The tumbling window specifications used for aggregations. - */ -public class TumblingWindows extends Windows { - - private static final long DEFAULT_SIZE_MS = 1000L; - - public final long size; - - private TumblingWindows(String name, long size) { - super(name); - - this.size = size; - } - - /** - * Returns a half-interval sliding window definition with the default window size - */ - public static TumblingWindows of(String name) { - return new TumblingWindows(name, DEFAULT_SIZE_MS); - } - - /** - * Returns a half-interval sliding window definition with the window size in milliseconds - */ - public TumblingWindows with(long size) { - return new TumblingWindows(this.name, size); - } - - @Override - public Map windowsFor(long timestamp) { - long windowStart = timestamp - timestamp % size; - - // we cannot use Collections.singleMap since it does not support remove() call - Map windows = new HashMap<>(); - windows.put(windowStart, new TumblingWindow(windowStart, windowStart + size)); - - return windows; - } - - @Override - public boolean equalTo(Windows other) { - if (!other.getClass().equals(TumblingWindows.class)) - return false; - - TumblingWindows otherWindows = (TumblingWindows) other; - - return this.size == otherWindows.size; - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java index 7cadfb4ff1d25..eea035f597238 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java @@ -50,23 +50,30 @@ public UnlimitedWindows startOn(long start) { @Override public Map windowsFor(long timestamp) { - // always return the single unlimited window - - // we cannot use Collections.singleMap since it does not support remove() call + // Always return the single unlimited window. + // We cannot use Collections.singleMap since it does not support remove(). Map windows = new HashMap<>(); windows.put(start, new UnlimitedWindow(start)); - - return windows; } @Override - public boolean equalTo(Windows other) { - if (!other.getClass().equals(UnlimitedWindows.class)) + public final boolean equals(Object o) { + if (o == this) { + return true; + } + + if (!(o instanceof UnlimitedWindows)) { return false; + } - UnlimitedWindows otherWindows = (UnlimitedWindows) other; + UnlimitedWindows other = (UnlimitedWindows) o; + return this.start == other.start; + } - return this.start == otherWindows.start; + @Override + public int hashCode() { + return (int) (start ^ (start >>> 32)); } -} + +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java index f2965dc07d372..784d5c309c659 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java @@ -48,21 +48,18 @@ public boolean overlap(Window other) { return this.start() < other.end() || other.start() < this.end(); } - public boolean equalsTo(Window other) { - return this.start() == other.start() && this.end() == other.end(); - } - @Override public boolean equals(Object obj) { - if (obj == this) + if (obj == this) { return true; + } - if (!(obj instanceof Window)) + if (getClass() != obj.getClass()) { return false; + } Window other = (Window) obj; - - return this.equalsTo(other) && this.start == other.start && this.end == other.end; + return this.start == other.start && this.end == other.end; } @Override @@ -70,4 +67,5 @@ public int hashCode() { long n = (this.start << 32) | this.end; return (int) (n % 0xFFFFFFFFL); } + } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java index e7dc23ec9a739..1406de62e217d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java @@ -45,6 +45,9 @@ public abstract class Windows { public int segments; protected Windows(String name) { + if (name == null || name.isEmpty()) { + throw new IllegalArgumentException("name must not be null or empty"); + } this.name = name; this.segments = DEFAULT_NUM_SEGMENTS; this.emitDurationMs = DEFAULT_EMIT_DURATION; @@ -95,7 +98,6 @@ protected String newName(String prefix) { return prefix + String.format("%010d", NAME_INDEX.getAndIncrement()); } - public abstract boolean equalTo(Windows other); - public abstract Map windowsFor(long timestamp); + } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java similarity index 75% rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java index 8b0b2fbe26c4f..5dfb9eb601d85 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java @@ -19,19 +19,15 @@ import org.apache.kafka.streams.kstream.Window; -public class HoppingWindow extends Window { +public class TimeWindow extends Window { - public HoppingWindow(long start, long end) { + public TimeWindow(long start, long end) { super(start, end); } @Override public boolean overlap(Window other) { - return super.overlap(other) && other.getClass().equals(HoppingWindow.class); + return getClass() == other.getClass() && super.overlap(other); } - @Override - public boolean equalsTo(Window other) { - return super.equalsTo(other) && other.getClass().equals(HoppingWindow.class); - } -} +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java deleted file mode 100644 index a02d4b90937fd..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - - -import org.apache.kafka.streams.kstream.Window; - -public class TumblingWindow extends Window { - - public TumblingWindow(long start, long end) { - super(start, end); - } - - @Override - public boolean overlap(Window other) { - return super.overlap(other) && other.getClass().equals(TumblingWindow.class); - } - - @Override - public boolean equalsTo(Window other) { - return super.equalsTo(other) && other.getClass().equals(TumblingWindow.class); - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java index 8ac8f70e4c13b..4b93f9b731d19 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java @@ -27,11 +27,7 @@ public UnlimitedWindow(long start) { @Override public boolean overlap(Window other) { - return super.overlap(other) && other.getClass().equals(UnlimitedWindow.class); + return getClass() == other.getClass() && super.overlap(other); } - @Override - public boolean equalsTo(Window other) { - return super.equalsTo(other) && other.getClass().equals(UnlimitedWindow.class); - } -} +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java new file mode 100644 index 0000000000000..ffde37c183045 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TimeWindowsTest { + + private static String anyName = "window"; + private static long anySize = 123L; + + @Test + public void shouldHaveSaneEqualsAndHashCode() { + TimeWindows w1 = TimeWindows.of("w1", anySize); + TimeWindows w2 = TimeWindows.of("w2", w1.size); + + // Reflexive + assertTrue(w1.equals(w1)); + assertTrue(w1.hashCode() == w1.hashCode()); + + // Symmetric + assertTrue(w1.equals(w2)); + assertTrue(w1.hashCode() == w2.hashCode()); + assertTrue(w2.hashCode() == w1.hashCode()); + + // Transitive + TimeWindows w3 = TimeWindows.of("w3", w2.size); + assertTrue(w2.equals(w3)); + assertTrue(w2.hashCode() == w3.hashCode()); + assertTrue(w1.equals(w3)); + assertTrue(w1.hashCode() == w3.hashCode()); + + // Inequality scenarios + assertFalse("must be false for null", w1.equals(null)); + assertFalse("must be false for different window types", w1.equals(UnlimitedWindows.of("irrelevant"))); + assertFalse("must be false for different types", w1.equals(new Object())); + + TimeWindows differentWindowSize = TimeWindows.of("differentWindowSize", w1.size + 1); + assertFalse("must be false when window sizes are different", w1.equals(differentWindowSize)); + + TimeWindows differentHopSize = w1.shiftedBy(w1.hop - 1); + assertFalse("must be false when hop sizes are different", w1.equals(differentHopSize)); + } + + @Test(expected = IllegalArgumentException.class) + public void nameMustNotBeEmpty() { + TimeWindows.of("", anySize); + } + + @Test(expected = IllegalArgumentException.class) + public void nameMustNotBeNull() { + TimeWindows.of(null, anySize); + } + + @Test(expected = IllegalArgumentException.class) + public void windowSizeMustNotBeNegative() { + TimeWindows.of(anyName, -1); + } + + @Test(expected = IllegalArgumentException.class) + public void windowSizeMustNotBeZero() { + TimeWindows.of(anyName, 0); + } + + @Test(expected = IllegalArgumentException.class) + public void hopSizeMustNotBeNegative() { + TimeWindows.of(anyName, anySize).shiftedBy(-1); + } + + @Test(expected = IllegalArgumentException.class) + public void hopSizeMustNotBeZero() { + TimeWindows.of(anyName, anySize).shiftedBy(0); + } + + @Test(expected = IllegalArgumentException.class) + public void hopSizeMustNotBeLargerThanWindowSize() { + long size = anySize; + TimeWindows.of(anyName, size).shiftedBy(size + 1); + } + + @Test + public void windowsForHoppingWindows() { + TimeWindows windows = TimeWindows.of(anyName, 12L).shiftedBy(5L); + Map matched = windows.windowsFor(21L); + assertEquals(12L / 5L + 1, matched.size()); + assertEquals(new TimeWindow(10L, 22L), matched.get(10L)); + assertEquals(new TimeWindow(15L, 27L), matched.get(15L)); + assertEquals(new TimeWindow(20L, 32L), matched.get(20L)); + } + + @Test + public void windowsForTumblingWindows() { + TimeWindows windows = TimeWindows.of(anyName, 12L); + Map matched = windows.windowsFor(21L); + assertEquals(1, matched.size()); + assertEquals(new TimeWindow(12L, 24L), matched.get(12L)); + } + +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java new file mode 100644 index 0000000000000..a1c5b534e5660 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.kstream.internals.UnlimitedWindow; +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class UnlimitedWindowsTest { + + private static String anyName = "window"; + + @Test + public void unlimitedWindows() { + long startTime = 10L; + UnlimitedWindows w = UnlimitedWindows.of(anyName).startOn(startTime); + + Map matchedWindows1 = w.windowsFor(startTime + 11L); + assertEquals(1, matchedWindows1.size()); + assertEquals(new UnlimitedWindow(startTime), matchedWindows1.get(startTime)); + } + +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index 3c7a1bdc60485..8511eeb4a0292 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -21,11 +21,11 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.Aggregator; -import org.apache.kafka.streams.kstream.HoppingWindows; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.test.KStreamTestDriver; @@ -67,7 +67,7 @@ public void testAggBasic() throws Exception { KStream stream1 = builder.stream(strSerde, strSerde, topic1); KTable, String> table2 = stream1.aggregateByKey(new StringInit(), new StringAdd(), - HoppingWindows.of("topic1-Canonized").with(10L).every(5L), + TimeWindows.of("topic1-Canonized", 10).shiftedBy(5), strSerde, strSerde); @@ -144,7 +144,7 @@ public void testJoin() throws Exception { KStream stream1 = builder.stream(strSerde, strSerde, topic1); KTable, String> table1 = stream1.aggregateByKey(new StringInit(), new StringAdd(), - HoppingWindows.of("topic1-Canonized").with(10L).every(5L), + TimeWindows.of("topic1-Canonized", 10).shiftedBy(5), strSerde, strSerde); @@ -153,7 +153,7 @@ public void testJoin() throws Exception { KStream stream2 = builder.stream(strSerde, strSerde, topic2); KTable, String> table2 = stream2.aggregateByKey(new StringInit(), new StringAdd(), - HoppingWindows.of("topic2-Canonized").with(10L).every(5L), + TimeWindows.of("topic2-Canonized", 10).shiftedBy(5), strSerde, strSerde); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java index 7c6d5ec0fd2a2..b31b20d622223 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java @@ -49,7 +49,7 @@ public class WindowedStreamPartitionerTest { new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0]) ); - private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.emptySet()); + private Cluster cluster = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.emptySet()); @Test public void testCopartitioning() { @@ -71,7 +71,7 @@ public void testCopartitioning() { Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster); for (int w = 0; w < 10; w++) { - HoppingWindow window = new HoppingWindow(10 * w, 20 * w); + TimeWindow window = new TimeWindow(10 * w, 20 * w); Windowed windowedKey = new Windowed<>(key, window); Integer actual = streamPartitioner.partition(windowedKey, value, infos.size()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java deleted file mode 100644 index f9b6ba5656548..0000000000000 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.HoppingWindows; -import org.apache.kafka.streams.kstream.TumblingWindows; -import org.apache.kafka.streams.kstream.UnlimitedWindows; -import org.junit.Test; - -import java.util.Map; - -import static org.junit.Assert.assertEquals; - -public class WindowsTest { - - @Test - public void hoppingWindows() { - - HoppingWindows windows = HoppingWindows.of("test").with(12L).every(5L); - - Map matched = windows.windowsFor(21L); - - assertEquals(3, matched.size()); - - assertEquals(new HoppingWindow(10L, 22L), matched.get(10L)); - assertEquals(new HoppingWindow(15L, 27L), matched.get(15L)); - assertEquals(new HoppingWindow(20L, 32L), matched.get(20L)); - } - - @Test - public void tumblineWindows() { - - TumblingWindows windows = TumblingWindows.of("test").with(12L); - - Map matched = windows.windowsFor(21L); - - assertEquals(1, matched.size()); - - assertEquals(new TumblingWindow(12L, 24L), matched.get(12L)); - } - - @Test - public void unlimitedWindows() { - - UnlimitedWindows windows = UnlimitedWindows.of("test").startOn(10L); - - Map matched = windows.windowsFor(21L); - - assertEquals(1, matched.size()); - - assertEquals(new UnlimitedWindow(10L), matched.get(10L)); - } -} diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java index 95e0fbfe3173a..733c1ea33e6e5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java @@ -28,7 +28,7 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.kstream.TumblingWindows; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.UnlimitedWindows; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.Windowed; @@ -207,7 +207,7 @@ public Double apply(Long value1, Long value2) { // windowed count data.countByKey( - TumblingWindows.of("tumbling-win-cnt").with(WINDOW_SIZE), + TimeWindows.of("tumbling-win-cnt", WINDOW_SIZE), stringSerde ).toStream().map( new KeyValueMapper, Long, KeyValue>() {