From e4d498a0ee3a24a82aebc500e8e8f2b4913c8dff Mon Sep 17 00:00:00 2001 From: "Michael G. Noll" Date: Fri, 22 Apr 2016 16:17:33 -0700 Subject: [PATCH 1/5] KAFKA-3613: Consolidate TumblingWindows and HoppingWindows into TimeWindows --- .../examples/pageview/PageViewTypedDemo.java | 4 +- .../pageview/PageViewUntypedDemo.java | 4 +- .../kafka/streams/kstream/HoppingWindows.java | 95 -------------- .../kafka/streams/kstream/JoinWindows.java | 27 ++-- .../kafka/streams/kstream/TimeWindows.java | 118 ++++++++++++++++++ .../streams/kstream/TumblingWindows.java | 74 ----------- .../streams/kstream/UnlimitedWindows.java | 20 ++- .../apache/kafka/streams/kstream/Window.java | 14 +-- .../apache/kafka/streams/kstream/Windows.java | 6 +- .../{HoppingWindow.java => TimeWindow.java} | 12 +- .../kstream/internals/TumblingWindow.java | 38 ------ .../kstream/internals/UnlimitedWindow.java | 8 +- .../streams/kstream/TimeWindowsTest.java | 62 +++++++++ .../internals/KStreamWindowAggregateTest.java | 8 +- .../WindowedStreamPartitionerTest.java | 4 +- .../kstream/internals/WindowsTest.java | 33 ++--- .../streams/smoketest/SmokeTestClient.java | 4 +- 17 files changed, 251 insertions(+), 280 deletions(-) delete mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java delete mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java rename streams/src/main/java/org/apache/kafka/streams/kstream/internals/{HoppingWindow.java => TimeWindow.java} (75%) delete mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java index 4124b32c301d8..bd2fe3f7668a3 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java @@ -24,11 +24,11 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.HoppingWindows; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.StreamsConfig; @@ -160,7 +160,7 @@ public KeyValue apply(String user, PageViewByRegion vi return new KeyValue<>(viewRegion.region, viewRegion); } }) - .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String()) + .countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).shiftedBy(1000), Serdes.String()) // TODO: we can merge ths toStream().map(...) with a single toStream(...) .toStream() .map(new KeyValueMapper, Long, KeyValue>() { diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java index e61842ffe3273..b4f4fae418224 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java @@ -30,11 +30,11 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.HoppingWindows; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.Windowed; @@ -99,7 +99,7 @@ public KeyValue apply(String user, JsonNode viewRegion) { return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion); } }) - .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String()) + .countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).shiftedBy(1000), Serdes.String()) // TODO: we can merge ths toStream().map(...) with a single toStream(...) .toStream() .map(new KeyValueMapper, Long, KeyValue>() { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java deleted file mode 100644 index aa866e4274fc4..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -import org.apache.kafka.streams.kstream.internals.HoppingWindow; - -import java.util.HashMap; -import java.util.Map; - -/** - * The hopping window specifications used for aggregations. - */ -public class HoppingWindows extends Windows { - - private static final long DEFAULT_SIZE_MS = 1000L; - - public final long size; - - public final long period; - - private HoppingWindows(String name, long size, long period) { - super(name); - - this.size = size; - this.period = period; - } - - /** - * Returns a half-interval hopping window definition with the window size in milliseconds - * of the form [ N * default_size, N * default_size + default_size ) - */ - public static HoppingWindows of(String name) { - return new HoppingWindows(name, DEFAULT_SIZE_MS, DEFAULT_SIZE_MS); - } - - /** - * Returns a new hopping window definition with the original size but reassign the window - * period in milliseconds of the form [ N * period, N * period + size ) - */ - public HoppingWindows with(long size) { - return new HoppingWindows(this.name, size, this.period); - } - - /** - * Returns a new hopping window definition with the original size but reassign the window - * period in milliseconds of the form [ N * period, N * period + size ) - */ - public HoppingWindows every(long period) { - return new HoppingWindows(this.name, this.size, period); - } - - @Override - public Map windowsFor(long timestamp) { - long enclosed = (size - 1) / period; - - long windowStart = Math.max(0, timestamp - timestamp % period - enclosed * period); - - Map windows = new HashMap<>(); - while (windowStart <= timestamp) { - // add the window - HoppingWindow window = new HoppingWindow(windowStart, windowStart + this.size); - windows.put(windowStart, window); - - // advance the step period - windowStart += this.period; - } - - return windows; - } - - @Override - public boolean equalTo(Windows other) { - if (!other.getClass().equals(HoppingWindows.class)) - return false; - - HoppingWindows otherWindows = (HoppingWindows) other; - - return this.size == otherWindows.size && this.period == otherWindows.period; - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index 24dbdd33b2547..a74984a29b130 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -17,15 +17,14 @@ package org.apache.kafka.streams.kstream; - -import org.apache.kafka.streams.kstream.internals.TumblingWindow; +import org.apache.kafka.streams.kstream.internals.TimeWindow; import java.util.Map; /** * The window specifications used for joins. */ -public class JoinWindows extends Windows { +public class JoinWindows extends Windows { public final long before; public final long after; @@ -74,19 +73,29 @@ public JoinWindows after(long timeDifference) { } @Override - public Map windowsFor(long timestamp) { + public Map windowsFor(long timestamp) { // this function should never be called throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows"); } @Override - public boolean equalTo(Windows other) { - if (!other.getClass().equals(JoinWindows.class)) + public final boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof JoinWindows)) { return false; + } - JoinWindows otherWindows = (JoinWindows) other; + JoinWindows other = (JoinWindows) o; + return this.before == other.before && this.after == other.after; + } - return this.before == otherWindows.before && this.after == otherWindows.after; + @Override + public int hashCode() { + int result = (int) (before ^ (before >>> 32)); + result = 31 * result + (int) (after ^ (after >>> 32)); + return result; } -} +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java new file mode 100644 index 0000000000000..a6b11fdeaaf2b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.kstream.internals.TimeWindow; + +import java.util.HashMap; +import java.util.Map; + +/** + * The time-based window specifications used for aggregations. + */ +public class TimeWindows extends Windows { + + /** + * The size of the window, i.e. how long a window lasts. + * The window size's effective time unit is determined by the semantics of the topology's + * configured {@link org.apache.kafka.streams.processor.TimestampExtractor}. + */ + public final long size; + + /** + * The size of the window's hop, i.e. by how much a window moves forward relative to the previous one. + * The hop size's effective time unit is determined by the semantics of the topology's + * configured {@link org.apache.kafka.streams.processor.TimestampExtractor}. + */ + public final long hop; + + private TimeWindows(String name, long size, long hop) { + super(name); + if (size <= 0) { + throw new IllegalArgumentException("size must be > 0 (you provided " + size + ")"); + } + this.size = size; + if (!(0 < hop && hop <= size)) { + throw new IllegalArgumentException(String.format("hop (%d) must lie within interval (0, %d]", hop, size)); + } + this.hop = hop; + } + + /** + * Returns a window definition with the given window size, and with the hop size being equal to + * the window size. Think: [N * size, N * size + size), with N denoting the N-th window. + * + * This provides the semantics of tumbling windows, which are fixed-sized, gap-less, + * non-overlapping windows. Tumbling windows are a specialization of hopping windows. + * + * @param name The name of the window. Must not be null or empty. + * @param size The size of the window, with the requirement that size > 0. + * The window size's effective time unit is determined by the semantics of the + * topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}. + * @return a new window definition + */ + public static TimeWindows of(String name, long size) { + return new TimeWindows(name, size, size); + } + + /** + * Returns a window definition with the original size, but shift ("hop") the window by the given + * hop size, which specifies by how much a window moves forward relative to the previous one. + * Think: [N * hop, N * hop + size), with N denoting the N-th window. + * + * This provides the semantics of hopping windows, which are fixed-sized, overlapping windows. + * + * @param hop The hop size of the window, with the requirement that 0 < hop ≤ size. + * The hop size's effective time unit is determined by the semantics of the + * topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}. + * @return a new window definition + */ + public TimeWindows shiftedBy(long hop) { + return new TimeWindows(this.name, this.size, hop); + } + + // TODO: Document windowsFor() + @Override + public Map windowsFor(long timestamp) { + long windowStart = timestamp - timestamp % this.size; + // We cannot use Collections.singleMap since it does not support remove() call. + Map windows = new HashMap<>(); + windows.put(windowStart, new TimeWindow(windowStart, windowStart + this.size)); + return windows; + } + + @Override + public final boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof TimeWindows)) { + return false; + } + TimeWindows other = (TimeWindows) o; + return this.size == other.size && this.hop == other.hop; + } + + @Override + public int hashCode() { + int result = (int) (size ^ (size >>> 32)); + result = 31 * result + (int) (hop ^ (hop >>> 32)); + return result; + } + +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java deleted file mode 100644 index cadedba550ba9..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -import org.apache.kafka.streams.kstream.internals.TumblingWindow; - -import java.util.HashMap; -import java.util.Map; - -/** - * The tumbling window specifications used for aggregations. - */ -public class TumblingWindows extends Windows { - - private static final long DEFAULT_SIZE_MS = 1000L; - - public final long size; - - private TumblingWindows(String name, long size) { - super(name); - - this.size = size; - } - - /** - * Returns a half-interval sliding window definition with the default window size - */ - public static TumblingWindows of(String name) { - return new TumblingWindows(name, DEFAULT_SIZE_MS); - } - - /** - * Returns a half-interval sliding window definition with the window size in milliseconds - */ - public TumblingWindows with(long size) { - return new TumblingWindows(this.name, size); - } - - @Override - public Map windowsFor(long timestamp) { - long windowStart = timestamp - timestamp % size; - - // we cannot use Collections.singleMap since it does not support remove() call - Map windows = new HashMap<>(); - windows.put(windowStart, new TumblingWindow(windowStart, windowStart + size)); - - return windows; - } - - @Override - public boolean equalTo(Windows other) { - if (!other.getClass().equals(TumblingWindows.class)) - return false; - - TumblingWindows otherWindows = (TumblingWindows) other; - - return this.size == otherWindows.size; - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java index 7cadfb4ff1d25..7a9a60249db02 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java @@ -61,12 +61,22 @@ public Map windowsFor(long timestamp) { } @Override - public boolean equalTo(Windows other) { - if (!other.getClass().equals(UnlimitedWindows.class)) + public final boolean equals(Object o) { + if (o == this) { + return true; + } + + if (!(o instanceof UnlimitedWindows)) { return false; + } - UnlimitedWindows otherWindows = (UnlimitedWindows) other; + UnlimitedWindows other = (UnlimitedWindows) o; + return this.start == other.start; + } - return this.start == otherWindows.start; + @Override + public int hashCode() { + return (int) (start ^ (start >>> 32)); } -} + +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java index f2965dc07d372..784d5c309c659 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java @@ -48,21 +48,18 @@ public boolean overlap(Window other) { return this.start() < other.end() || other.start() < this.end(); } - public boolean equalsTo(Window other) { - return this.start() == other.start() && this.end() == other.end(); - } - @Override public boolean equals(Object obj) { - if (obj == this) + if (obj == this) { return true; + } - if (!(obj instanceof Window)) + if (getClass() != obj.getClass()) { return false; + } Window other = (Window) obj; - - return this.equalsTo(other) && this.start == other.start && this.end == other.end; + return this.start == other.start && this.end == other.end; } @Override @@ -70,4 +67,5 @@ public int hashCode() { long n = (this.start << 32) | this.end; return (int) (n % 0xFFFFFFFFL); } + } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java index e7dc23ec9a739..1406de62e217d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java @@ -45,6 +45,9 @@ public abstract class Windows { public int segments; protected Windows(String name) { + if (name == null || name.isEmpty()) { + throw new IllegalArgumentException("name must not be null or empty"); + } this.name = name; this.segments = DEFAULT_NUM_SEGMENTS; this.emitDurationMs = DEFAULT_EMIT_DURATION; @@ -95,7 +98,6 @@ protected String newName(String prefix) { return prefix + String.format("%010d", NAME_INDEX.getAndIncrement()); } - public abstract boolean equalTo(Windows other); - public abstract Map windowsFor(long timestamp); + } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java similarity index 75% rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java index 8b0b2fbe26c4f..5dfb9eb601d85 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java @@ -19,19 +19,15 @@ import org.apache.kafka.streams.kstream.Window; -public class HoppingWindow extends Window { +public class TimeWindow extends Window { - public HoppingWindow(long start, long end) { + public TimeWindow(long start, long end) { super(start, end); } @Override public boolean overlap(Window other) { - return super.overlap(other) && other.getClass().equals(HoppingWindow.class); + return getClass() == other.getClass() && super.overlap(other); } - @Override - public boolean equalsTo(Window other) { - return super.equalsTo(other) && other.getClass().equals(HoppingWindow.class); - } -} +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java deleted file mode 100644 index a02d4b90937fd..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - - -import org.apache.kafka.streams.kstream.Window; - -public class TumblingWindow extends Window { - - public TumblingWindow(long start, long end) { - super(start, end); - } - - @Override - public boolean overlap(Window other) { - return super.overlap(other) && other.getClass().equals(TumblingWindow.class); - } - - @Override - public boolean equalsTo(Window other) { - return super.equalsTo(other) && other.getClass().equals(TumblingWindow.class); - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java index 8ac8f70e4c13b..4b93f9b731d19 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java @@ -27,11 +27,7 @@ public UnlimitedWindow(long start) { @Override public boolean overlap(Window other) { - return super.overlap(other) && other.getClass().equals(UnlimitedWindow.class); + return getClass() == other.getClass() && super.overlap(other); } - @Override - public boolean equalsTo(Window other) { - return super.equalsTo(other) && other.getClass().equals(UnlimitedWindow.class); - } -} +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java new file mode 100644 index 0000000000000..2708d382e0d6a --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TimeWindowsTest { + + @Test + public void shouldHaveSaneEqualsAndHashCode() { + long arbitrarySize = 10L; + TimeWindows w1 = TimeWindows.of("w1", arbitrarySize); + TimeWindows w2 = TimeWindows.of("w2", w1.size); + + // Reflexive + assertEquals(true, w1.equals(w1)); + assertEquals(true, w1.hashCode() == w1.hashCode()); + + // Symmetric + assertEquals(true, w1.equals(w2)); + assertEquals(true, w1.hashCode() == w2.hashCode()); + assertEquals(true, w2.hashCode() == w1.hashCode()); + + // Transitive + TimeWindows w3 = TimeWindows.of("w3", w2.size); + assertEquals(true, w2.equals(w3)); + assertEquals(true, w2.hashCode() == w3.hashCode()); + assertEquals(true, w1.equals(w3)); + assertEquals(true, w1.hashCode() == w3.hashCode()); + + // Inequality scenarios + assertEquals("must be false for null", false, w1.equals(null)); + assertEquals("must be false for different window types", false, w1.equals(UnlimitedWindows.of("irrelevant"))); + assertEquals("must be false for different types", false, w1.equals(new Object())); + + TimeWindows differentWindowSize = TimeWindows.of("differentWindowSize", w1.size + 1); + assertEquals("must be false when window sizes are different", false, w1.equals(differentWindowSize)); + + TimeWindows differentHopSize = w1.shiftedBy(w1.hop - 1); + assertEquals("must be false when hop sizes are different", false, w1.equals(differentHopSize)); + } + +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index 3c7a1bdc60485..8511eeb4a0292 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -21,11 +21,11 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.Aggregator; -import org.apache.kafka.streams.kstream.HoppingWindows; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.test.KStreamTestDriver; @@ -67,7 +67,7 @@ public void testAggBasic() throws Exception { KStream stream1 = builder.stream(strSerde, strSerde, topic1); KTable, String> table2 = stream1.aggregateByKey(new StringInit(), new StringAdd(), - HoppingWindows.of("topic1-Canonized").with(10L).every(5L), + TimeWindows.of("topic1-Canonized", 10).shiftedBy(5), strSerde, strSerde); @@ -144,7 +144,7 @@ public void testJoin() throws Exception { KStream stream1 = builder.stream(strSerde, strSerde, topic1); KTable, String> table1 = stream1.aggregateByKey(new StringInit(), new StringAdd(), - HoppingWindows.of("topic1-Canonized").with(10L).every(5L), + TimeWindows.of("topic1-Canonized", 10).shiftedBy(5), strSerde, strSerde); @@ -153,7 +153,7 @@ public void testJoin() throws Exception { KStream stream2 = builder.stream(strSerde, strSerde, topic2); KTable, String> table2 = stream2.aggregateByKey(new StringInit(), new StringAdd(), - HoppingWindows.of("topic2-Canonized").with(10L).every(5L), + TimeWindows.of("topic2-Canonized", 10).shiftedBy(5), strSerde, strSerde); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java index 7c6d5ec0fd2a2..b31b20d622223 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java @@ -49,7 +49,7 @@ public class WindowedStreamPartitionerTest { new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0]) ); - private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.emptySet()); + private Cluster cluster = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.emptySet()); @Test public void testCopartitioning() { @@ -71,7 +71,7 @@ public void testCopartitioning() { Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster); for (int w = 0; w < 10; w++) { - HoppingWindow window = new HoppingWindow(10 * w, 20 * w); + TimeWindow window = new TimeWindow(10 * w, 20 * w); Windowed windowedKey = new Windowed<>(key, window); Integer actual = streamPartitioner.partition(windowedKey, value, infos.size()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java index f9b6ba5656548..4eb1a1b568515 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java @@ -19,8 +19,7 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.kstream.HoppingWindows; -import org.apache.kafka.streams.kstream.TumblingWindows; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.UnlimitedWindows; import org.junit.Test; @@ -32,39 +31,27 @@ public class WindowsTest { @Test public void hoppingWindows() { - - HoppingWindows windows = HoppingWindows.of("test").with(12L).every(5L); - - Map matched = windows.windowsFor(21L); - + TimeWindows windows = TimeWindows.of("test", 12L).shiftedBy(5L); + Map matched = windows.windowsFor(21L); assertEquals(3, matched.size()); - - assertEquals(new HoppingWindow(10L, 22L), matched.get(10L)); - assertEquals(new HoppingWindow(15L, 27L), matched.get(15L)); - assertEquals(new HoppingWindow(20L, 32L), matched.get(20L)); + assertEquals(new TimeWindow(10L, 22L), matched.get(10L)); + assertEquals(new TimeWindow(15L, 27L), matched.get(15L)); + assertEquals(new TimeWindow(20L, 32L), matched.get(20L)); } @Test - public void tumblineWindows() { - - TumblingWindows windows = TumblingWindows.of("test").with(12L); - - Map matched = windows.windowsFor(21L); - + public void tumblingWindows() { + TimeWindows windows = TimeWindows.of("test", 12L); + Map matched = windows.windowsFor(21L); assertEquals(1, matched.size()); - - assertEquals(new TumblingWindow(12L, 24L), matched.get(12L)); + assertEquals(new TimeWindow(12L, 24L), matched.get(12L)); } @Test public void unlimitedWindows() { - UnlimitedWindows windows = UnlimitedWindows.of("test").startOn(10L); - Map matched = windows.windowsFor(21L); - assertEquals(1, matched.size()); - assertEquals(new UnlimitedWindow(10L), matched.get(10L)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java index 95e0fbfe3173a..733c1ea33e6e5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java @@ -28,7 +28,7 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.kstream.TumblingWindows; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.UnlimitedWindows; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.Windowed; @@ -207,7 +207,7 @@ public Double apply(Long value1, Long value2) { // windowed count data.countByKey( - TumblingWindows.of("tumbling-win-cnt").with(WINDOW_SIZE), + TimeWindows.of("tumbling-win-cnt", WINDOW_SIZE), stringSerde ).toStream().map( new KeyValueMapper, Long, KeyValue>() { From 41905113fe167ed5f8c328b45395e3cd04370a9d Mon Sep 17 00:00:00 2001 From: "Michael G. Noll" Date: Sat, 23 Apr 2016 13:59:27 -0700 Subject: [PATCH 2/5] TimeWindows: fix windowsFor() by honoring hop size --- .../apache/kafka/streams/kstream/TimeWindows.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java index a6b11fdeaaf2b..20f71cc514de8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java @@ -89,10 +89,17 @@ public TimeWindows shiftedBy(long hop) { // TODO: Document windowsFor() @Override public Map windowsFor(long timestamp) { - long windowStart = timestamp - timestamp % this.size; - // We cannot use Collections.singleMap since it does not support remove() call. + long enclosed = (size - 1) / hop; + long windowStart = Math.max(0, timestamp - timestamp % hop - enclosed * hop); + Map windows = new HashMap<>(); - windows.put(windowStart, new TimeWindow(windowStart, windowStart + this.size)); + while (windowStart <= timestamp) { + // add the window + TimeWindow window = new TimeWindow(windowStart, windowStart + this.size); + windows.put(windowStart, window); + // hop forward + windowStart += this.hop; + } return windows; } From 94fe799f366b69d9e323ad1bba6cb368b7414897 Mon Sep 17 00:00:00 2001 From: "Michael G. Noll" Date: Sun, 24 Apr 2016 10:34:21 -0700 Subject: [PATCH 3/5] Simplify asserts --- .../streams/kstream/TimeWindowsTest.java | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java index 2708d382e0d6a..fab8313910c53 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java @@ -22,6 +22,9 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; public class TimeWindowsTest { @@ -32,31 +35,31 @@ public void shouldHaveSaneEqualsAndHashCode() { TimeWindows w2 = TimeWindows.of("w2", w1.size); // Reflexive - assertEquals(true, w1.equals(w1)); - assertEquals(true, w1.hashCode() == w1.hashCode()); + assertTrue(w1.equals(w1)); + assertTrue(w1.hashCode() == w1.hashCode()); // Symmetric - assertEquals(true, w1.equals(w2)); - assertEquals(true, w1.hashCode() == w2.hashCode()); - assertEquals(true, w2.hashCode() == w1.hashCode()); + assertTrue(w1.equals(w2)); + assertTrue(w1.hashCode() == w2.hashCode()); + assertTrue(w2.hashCode() == w1.hashCode()); // Transitive TimeWindows w3 = TimeWindows.of("w3", w2.size); - assertEquals(true, w2.equals(w3)); - assertEquals(true, w2.hashCode() == w3.hashCode()); - assertEquals(true, w1.equals(w3)); - assertEquals(true, w1.hashCode() == w3.hashCode()); + assertTrue(w2.equals(w3)); + assertTrue(w2.hashCode() == w3.hashCode()); + assertTrue(w1.equals(w3)); + assertTrue(w1.hashCode() == w3.hashCode()); // Inequality scenarios - assertEquals("must be false for null", false, w1.equals(null)); - assertEquals("must be false for different window types", false, w1.equals(UnlimitedWindows.of("irrelevant"))); - assertEquals("must be false for different types", false, w1.equals(new Object())); + assertFalse("must be false for null", w1.equals(null)); + assertFalse("must be false for different window types", w1.equals(UnlimitedWindows.of("irrelevant"))); + assertFalse("must be false for different types", w1.equals(new Object())); TimeWindows differentWindowSize = TimeWindows.of("differentWindowSize", w1.size + 1); - assertEquals("must be false when window sizes are different", false, w1.equals(differentWindowSize)); + assertFalse("must be false when window sizes are different", w1.equals(differentWindowSize)); TimeWindows differentHopSize = w1.shiftedBy(w1.hop - 1); - assertEquals("must be false when hop sizes are different", false, w1.equals(differentHopSize)); + assertFalse("must be false when hop sizes are different", w1.equals(differentHopSize)); } } \ No newline at end of file From 830b338b5a079e49e483d34bd55e47e423394a1f Mon Sep 17 00:00:00 2001 From: "Michael G. Noll" Date: Sun, 24 Apr 2016 11:16:17 -0700 Subject: [PATCH 4/5] Improve unit tests and docstrings --- .../kafka/streams/kstream/TimeWindows.java | 9 ++- .../streams/kstream/UnlimitedWindows.java | 7 +- .../streams/kstream/TimeWindowsTest.java | 64 ++++++++++++++++++- .../streams/kstream/UnlimitedWindowsTest.java | 43 +++++++++++++ .../kstream/internals/WindowsTest.java | 57 ----------------- 5 files changed, 112 insertions(+), 68 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java delete mode 100644 streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java index 20f71cc514de8..0aefc8a44126b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java @@ -86,7 +86,12 @@ public TimeWindows shiftedBy(long hop) { return new TimeWindows(this.name, this.size, hop); } - // TODO: Document windowsFor() + /** + * Returns the windows that contain the provided timestamp. + * + * @param timestamp the timestamp + * @return a map of (windowStartTimestamp, window) entries + */ @Override public Map windowsFor(long timestamp) { long enclosed = (size - 1) / hop; @@ -94,10 +99,8 @@ public Map windowsFor(long timestamp) { Map windows = new HashMap<>(); while (windowStart <= timestamp) { - // add the window TimeWindow window = new TimeWindow(windowStart, windowStart + this.size); windows.put(windowStart, window); - // hop forward windowStart += this.hop; } return windows; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java index 7a9a60249db02..eea035f597238 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java @@ -50,13 +50,10 @@ public UnlimitedWindows startOn(long start) { @Override public Map windowsFor(long timestamp) { - // always return the single unlimited window - - // we cannot use Collections.singleMap since it does not support remove() call + // Always return the single unlimited window. + // We cannot use Collections.singleMap since it does not support remove(). Map windows = new HashMap<>(); windows.put(start, new UnlimitedWindow(start)); - - return windows; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java index fab8313910c53..ef6152ab4fa24 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java @@ -19,19 +19,23 @@ package org.apache.kafka.streams.kstream; +import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.junit.Test; +import java.util.Map; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; public class TimeWindowsTest { + private static String ANY_NAME = "window"; + private static long ANY_SIZE = 123L; + @Test public void shouldHaveSaneEqualsAndHashCode() { - long arbitrarySize = 10L; - TimeWindows w1 = TimeWindows.of("w1", arbitrarySize); + TimeWindows w1 = TimeWindows.of("w1", ANY_SIZE); TimeWindows w2 = TimeWindows.of("w2", w1.size); // Reflexive @@ -62,4 +66,58 @@ public void shouldHaveSaneEqualsAndHashCode() { assertFalse("must be false when hop sizes are different", w1.equals(differentHopSize)); } + @Test(expected = IllegalArgumentException.class) + public void nameMustNotBeEmpty() { + TimeWindows.of("", ANY_SIZE); + } + + @Test(expected = IllegalArgumentException.class) + public void nameMustNotBeNull() { + TimeWindows.of(null, ANY_SIZE); + } + + @Test(expected = IllegalArgumentException.class) + public void windowSizeMustNotBeNegative() { + TimeWindows.of(ANY_NAME, -1); + } + + @Test(expected = IllegalArgumentException.class) + public void windowSizeMustNotBeZero() { + TimeWindows.of(ANY_NAME, 0); + } + + @Test(expected = IllegalArgumentException.class) + public void hopSizeMustNotBeNegative() { + TimeWindows.of(ANY_NAME, ANY_SIZE).shiftedBy(-1); + } + + @Test(expected = IllegalArgumentException.class) + public void hopSizeMustNotBeZero() { + TimeWindows.of(ANY_NAME, ANY_SIZE).shiftedBy(0); + } + + @Test(expected = IllegalArgumentException.class) + public void hopSizeMustNotBeLargerThanWindowSize() { + long size = ANY_SIZE; + TimeWindows.of(ANY_NAME, size).shiftedBy(size + 1); + } + + @Test + public void windowsForHoppingWindows() { + TimeWindows windows = TimeWindows.of(ANY_NAME, 12L).shiftedBy(5L); + Map matched = windows.windowsFor(21L); + assertEquals(12L / 5L + 1, matched.size()); + assertEquals(new TimeWindow(10L, 22L), matched.get(10L)); + assertEquals(new TimeWindow(15L, 27L), matched.get(15L)); + assertEquals(new TimeWindow(20L, 32L), matched.get(20L)); + } + + @Test + public void windowsForTumblingWindows() { + TimeWindows windows = TimeWindows.of(ANY_NAME, 12L); + Map matched = windows.windowsFor(21L); + assertEquals(1, matched.size()); + assertEquals(new TimeWindow(12L, 24L), matched.get(12L)); + } + } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java new file mode 100644 index 0000000000000..b7a20b208f496 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.kstream.internals.UnlimitedWindow; +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class UnlimitedWindowsTest { + + private static String ANY_NAME = "window"; + + @Test + public void unlimitedWindows() { + long startTime = 10L; + UnlimitedWindows w = UnlimitedWindows.of(ANY_NAME).startOn(startTime); + + Map matchedWindows1 = w.windowsFor(startTime + 11L); + assertEquals(1, matchedWindows1.size()); + assertEquals(new UnlimitedWindow(startTime), matchedWindows1.get(startTime)); + } + +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java deleted file mode 100644 index 4eb1a1b568515..0000000000000 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.UnlimitedWindows; -import org.junit.Test; - -import java.util.Map; - -import static org.junit.Assert.assertEquals; - -public class WindowsTest { - - @Test - public void hoppingWindows() { - TimeWindows windows = TimeWindows.of("test", 12L).shiftedBy(5L); - Map matched = windows.windowsFor(21L); - assertEquals(3, matched.size()); - assertEquals(new TimeWindow(10L, 22L), matched.get(10L)); - assertEquals(new TimeWindow(15L, 27L), matched.get(15L)); - assertEquals(new TimeWindow(20L, 32L), matched.get(20L)); - } - - @Test - public void tumblingWindows() { - TimeWindows windows = TimeWindows.of("test", 12L); - Map matched = windows.windowsFor(21L); - assertEquals(1, matched.size()); - assertEquals(new TimeWindow(12L, 24L), matched.get(12L)); - } - - @Test - public void unlimitedWindows() { - UnlimitedWindows windows = UnlimitedWindows.of("test").startOn(10L); - Map matched = windows.windowsFor(21L); - assertEquals(1, matched.size()); - assertEquals(new UnlimitedWindow(10L), matched.get(10L)); - } -} From 26ac68982081f65981b6101043363236e4533142 Mon Sep 17 00:00:00 2001 From: "Michael G. Noll" Date: Sun, 24 Apr 2016 17:30:12 -0700 Subject: [PATCH 5/5] Rename static vars to make checkstyle happy --- .../streams/kstream/TimeWindowsTest.java | 26 +++++++++---------- .../streams/kstream/UnlimitedWindowsTest.java | 4 +-- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java index ef6152ab4fa24..ffde37c183045 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java @@ -30,12 +30,12 @@ public class TimeWindowsTest { - private static String ANY_NAME = "window"; - private static long ANY_SIZE = 123L; + private static String anyName = "window"; + private static long anySize = 123L; @Test public void shouldHaveSaneEqualsAndHashCode() { - TimeWindows w1 = TimeWindows.of("w1", ANY_SIZE); + TimeWindows w1 = TimeWindows.of("w1", anySize); TimeWindows w2 = TimeWindows.of("w2", w1.size); // Reflexive @@ -68,43 +68,43 @@ public void shouldHaveSaneEqualsAndHashCode() { @Test(expected = IllegalArgumentException.class) public void nameMustNotBeEmpty() { - TimeWindows.of("", ANY_SIZE); + TimeWindows.of("", anySize); } @Test(expected = IllegalArgumentException.class) public void nameMustNotBeNull() { - TimeWindows.of(null, ANY_SIZE); + TimeWindows.of(null, anySize); } @Test(expected = IllegalArgumentException.class) public void windowSizeMustNotBeNegative() { - TimeWindows.of(ANY_NAME, -1); + TimeWindows.of(anyName, -1); } @Test(expected = IllegalArgumentException.class) public void windowSizeMustNotBeZero() { - TimeWindows.of(ANY_NAME, 0); + TimeWindows.of(anyName, 0); } @Test(expected = IllegalArgumentException.class) public void hopSizeMustNotBeNegative() { - TimeWindows.of(ANY_NAME, ANY_SIZE).shiftedBy(-1); + TimeWindows.of(anyName, anySize).shiftedBy(-1); } @Test(expected = IllegalArgumentException.class) public void hopSizeMustNotBeZero() { - TimeWindows.of(ANY_NAME, ANY_SIZE).shiftedBy(0); + TimeWindows.of(anyName, anySize).shiftedBy(0); } @Test(expected = IllegalArgumentException.class) public void hopSizeMustNotBeLargerThanWindowSize() { - long size = ANY_SIZE; - TimeWindows.of(ANY_NAME, size).shiftedBy(size + 1); + long size = anySize; + TimeWindows.of(anyName, size).shiftedBy(size + 1); } @Test public void windowsForHoppingWindows() { - TimeWindows windows = TimeWindows.of(ANY_NAME, 12L).shiftedBy(5L); + TimeWindows windows = TimeWindows.of(anyName, 12L).shiftedBy(5L); Map matched = windows.windowsFor(21L); assertEquals(12L / 5L + 1, matched.size()); assertEquals(new TimeWindow(10L, 22L), matched.get(10L)); @@ -114,7 +114,7 @@ public void windowsForHoppingWindows() { @Test public void windowsForTumblingWindows() { - TimeWindows windows = TimeWindows.of(ANY_NAME, 12L); + TimeWindows windows = TimeWindows.of(anyName, 12L); Map matched = windows.windowsFor(21L); assertEquals(1, matched.size()); assertEquals(new TimeWindow(12L, 24L), matched.get(12L)); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java index b7a20b208f496..a1c5b534e5660 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java @@ -28,12 +28,12 @@ public class UnlimitedWindowsTest { - private static String ANY_NAME = "window"; + private static String anyName = "window"; @Test public void unlimitedWindows() { long startTime = 10L; - UnlimitedWindows w = UnlimitedWindows.of(ANY_NAME).startOn(startTime); + UnlimitedWindows w = UnlimitedWindows.of(anyName).startOn(startTime); Map matchedWindows1 = w.windowsFor(startTime + 11L); assertEquals(1, matchedWindows1.size());