From 12cbe22a2f00effb82d1556e5e4d219a6da9e002 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 10 Jan 2017 17:06:53 -0800 Subject: [PATCH 1/3] KAFKA-3452: follow-up -- introduce SesssionWindows - TimeWindows represent half-open time intervals while SessionWindows represent closed time intervals --- .../apache/kafka/streams/kstream/Window.java | 8 +- .../KStreamSessionWindowAggregate.java | 12 +-- .../kstream/internals/SessionKeySerde.java | 2 +- .../kstream/internals/SessionWindow.java | 62 +++++++++++++ .../streams/kstream/internals/TimeWindow.java | 7 +- .../kstream/internals/UnlimitedWindow.java | 6 +- .../state/internals/SessionKeySchema.java | 6 +- .../KStreamAggregationIntegrationTest.java | 34 ++++---- .../internals/KGroupedStreamImplTest.java | 18 ++-- ...amSessionWindowAggregateProcessorTest.java | 40 ++++----- .../internals/SessionKeySerdeTest.java | 10 +-- .../kstream/internals/SessionWindowTest.java | 87 +++++++++++++++++++ .../internals/CachingSessionStoreTest.java | 42 ++++----- .../RocksDBSegmentedBytesStoreTest.java | 44 +++++----- .../internals/RocksDBSessionStoreTest.java | 46 +++++----- 15 files changed, 290 insertions(+), 134 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java index 7d78d7420d114..9dc607f5f69b6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java @@ -22,8 +22,8 @@ */ public abstract class Window { - private long start; - private long end; + protected long start; + protected long end; /** * Create a new window for the given start time (inclusive) and end time (exclusive). @@ -56,9 +56,7 @@ public long end() { * @param other another window * @return {@code true} if {@code other} overlaps with this window—{@code false} otherwise */ - public boolean overlap(Window other) { - return this.start() < other.end() || other.start() < this.end(); - } + public abstract boolean overlap(Window other); @Override public boolean equals(Object obj) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index bb86f52b20923..70b2b90095465 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -88,8 +88,8 @@ public void process(final K key, final V value) { final long timestamp = context().timestamp(); final List, T>> merged = new ArrayList<>(); - final TimeWindow newTimeWindow = new TimeWindow(timestamp, timestamp); - TimeWindow mergedWindow = newTimeWindow; + final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp); + SessionWindow mergedWindow = newSessionWindow; T agg = initializer.apply(); try (final KeyValueIterator, T> iterator = store.findSessions(key, timestamp - windows.inactivityGap(), @@ -98,13 +98,13 @@ public void process(final K key, final V value) { final KeyValue, T> next = iterator.next(); merged.add(next); agg = sessionMerger.apply(key, agg, next.value); - mergedWindow = mergeTimeWindow(mergedWindow, (TimeWindow) next.key.window()); + mergedWindow = mergeSessionWindow(mergedWindow, (SessionWindow) next.key.window()); } } agg = aggregator.apply(key, value, agg); final Windowed sessionKey = new Windowed<>(key, mergedWindow); - if (!mergedWindow.equals(newTimeWindow)) { + if (!mergedWindow.equals(newSessionWindow)) { for (final KeyValue, T> session : merged) { store.remove(session.key); tupleForwarder.maybeForward(session.key, null, session.value); @@ -117,10 +117,10 @@ public void process(final K key, final V value) { } - private TimeWindow mergeTimeWindow(final TimeWindow one, final TimeWindow two) { + private SessionWindow mergeSessionWindow(final SessionWindow one, final SessionWindow two) { final long start = one.start() < two.start() ? one.start() : two.start(); final long end = one.end() > two.end() ? one.end() : two.end(); - return new TimeWindow(start, end); + return new SessionWindow(start, end); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java index 165d5c60c232a..48213d67da153 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java @@ -131,7 +131,7 @@ public static Windowed from(final byte[] binaryKey, final Deserializer final ByteBuffer buffer = ByteBuffer.wrap(binaryKey); final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE); final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE); - return new Windowed<>(key, new TimeWindow(start, end)); + return new Windowed<>(key, new SessionWindow(start, end)); } private static K extractKey(final byte[] binaryKey, Deserializer deserializer) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java new file mode 100644 index 0000000000000..d31e7cf2f743d --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Window; + +/** + * A session window covers a closed time interval with its start and end timestamp both being an inclusive boundary. + * + * @see TimeWindow + * @see UnlimitedWindow + * @see org.apache.kafka.streams.kstream.SessionWindows + */ +public final class SessionWindow extends Window { + + /** + * Create a new window for the given start time and end time (both inclusive). + * + * @param start the start timestamp of the window + * @param end the end timestamp of the window + * @throws IllegalArgumentException if {@code end} is smaller than {@code start} + */ + public SessionWindow(final long start, final long end) { + super(start, end); + if (end < start) { + throw new IllegalArgumentException("Window end time cannot be smaller than window start time."); + } + } + + /** + * Check if the given window overlaps with this window. + * + * @param other another window + * @return {@code true} if {@code other} overlaps with this window—{@code false} otherwise + * @throws IllegalArgumentException if the {@code other} window has a different type than {@link this} window + */ + public boolean overlap(final Window other) throws IllegalArgumentException { + if (getClass() != other.getClass()) { + throw new IllegalArgumentException("Cannot compare windows of different type. Other window has type " + + other.getClass()); + } + final SessionWindow otherWindow = (SessionWindow) other; + return !(otherWindow.end < start || end < otherWindow.start); + } + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java index 5dfb9eb601d85..630821fdb8064 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java @@ -27,7 +27,12 @@ public TimeWindow(long start, long end) { @Override public boolean overlap(Window other) { - return getClass() == other.getClass() && super.overlap(other); + if (getClass() != other.getClass()) { + throw new IllegalArgumentException("Cannot compare windows of different type. Other window has type " + + other.getClass()); + } + final TimeWindow otherWindow = (TimeWindow) other; + return start < otherWindow.end && otherWindow.start < end; } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java index 4b93f9b731d19..e9ec040d0d191 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java @@ -27,7 +27,11 @@ public UnlimitedWindow(long start) { @Override public boolean overlap(Window other) { - return getClass() == other.getClass() && super.overlap(other); + if (getClass() != other.getClass()) { + throw new IllegalArgumentException("Cannot compare windows of different type. Other window has type " + + other.getClass()); + } + return true; } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java index b15eec92264bc..604abb321382e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionKeySerde; -import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.state.KeyValueIterator; import java.util.List; @@ -30,13 +30,13 @@ class SessionKeySchema implements SegmentedBytesStore.KeySchema { @Override public Bytes upperRange(final Bytes key, final long to) { - final Windowed sessionKey = new Windowed<>(key, new TimeWindow(to, Long.MAX_VALUE)); + final Windowed sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE)); return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer()); } @Override public Bytes lowerRange(final Bytes key, final long from) { - final Windowed sessionKey = new Windowed<>(key, new TimeWindow(0, Math.max(0, from))); + final Windowed sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from))); return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 5cc2a59d9f118..0833f3c8d1fe4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -39,7 +39,7 @@ import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlySessionStore; @@ -506,13 +506,13 @@ public void apply(final Windowed key, final Long value) { startStreams(); latch.await(30, TimeUnit.SECONDS); - assertThat(results.get(new Windowed<>("bob", new TimeWindow(t1, t1))), equalTo(1L)); - assertThat(results.get(new Windowed<>("penny", new TimeWindow(t1, t1))), equalTo(1L)); - assertThat(results.get(new Windowed<>("jo", new TimeWindow(t1, t1))), equalTo(1L)); - assertThat(results.get(new Windowed<>("jo", new TimeWindow(t4, t4))), equalTo(1L)); - assertThat(results.get(new Windowed<>("emily", new TimeWindow(t1, t2))), equalTo(2L)); - assertThat(results.get(new Windowed<>("bob", new TimeWindow(t3, t4))), equalTo(2L)); - assertThat(results.get(new Windowed<>("penny", new TimeWindow(t3, t3))), equalTo(1L)); + assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(1L)); + assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo(1L)); + assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(1L)); + assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo(1L)); + assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo(2L)); + assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(2L)); + assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(1L)); } @Test @@ -601,18 +601,18 @@ public void apply(final Windowed key, final String value) { = kafkaStreams.store(userSessionsStore, QueryableStoreTypes.sessionStore()); // verify correct data received - assertThat(results.get(new Windowed<>("bob", new TimeWindow(t1, t1))), equalTo("start")); - assertThat(results.get(new Windowed<>("penny", new TimeWindow(t1, t1))), equalTo("start")); - assertThat(results.get(new Windowed<>("jo", new TimeWindow(t1, t1))), equalTo("pause")); - assertThat(results.get(new Windowed<>("jo", new TimeWindow(t4, t4))), equalTo("resume")); - assertThat(results.get(new Windowed<>("emily", new TimeWindow(t1, t2))), equalTo("pause:resume")); - assertThat(results.get(new Windowed<>("bob", new TimeWindow(t3, t4))), equalTo("pause:resume")); - assertThat(results.get(new Windowed<>("penny", new TimeWindow(t3, t3))), equalTo("stop")); + assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo("start")); + assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo("start")); + assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo("pause")); + assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo("resume")); + assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo("pause:resume")); + assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo("pause:resume")); + assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo("stop")); // verify can query data via IQ final KeyValueIterator, String> bob = sessionStore.fetch("bob"); - assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new TimeWindow(t1, t1)), "start"))); - assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new TimeWindow(t3, t4)), "pause:resume"))); + assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start"))); + assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t3, t4)), "pause:resume"))); assertFalse(bob.hasNext()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java index 62dd1d501aca0..729e1905b43d3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java @@ -173,9 +173,9 @@ public void apply(final Windowed key, final Integer value) { driver.setTime(100); driver.process(TOPIC, "1", "1"); driver.flushState(); - assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new TimeWindow(10, 30)))); - assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new TimeWindow(15, 15)))); - assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new TimeWindow(70, 100)))); + assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30)))); + assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15)))); + assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100)))); } @Test @@ -202,9 +202,9 @@ public void apply(final Windowed key, final Long value) { driver.setTime(100); driver.process(TOPIC, "1", "1"); driver.flushState(); - assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new TimeWindow(10, 30)))); - assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new TimeWindow(15, 15)))); - assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new TimeWindow(70, 100)))); + assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30)))); + assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15)))); + assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100)))); } @Test @@ -238,9 +238,9 @@ public void apply(final Windowed key, final String value) { driver.setTime(100); driver.process(TOPIC, "1", "C"); driver.flushState(); - assertEquals("A:B", results.get(new Windowed<>("1", new TimeWindow(10, 30)))); - assertEquals("Z", results.get(new Windowed<>("2", new TimeWindow(15, 15)))); - assertEquals("A:B:C", results.get(new Windowed<>("1", new TimeWindow(70, 100)))); + assertEquals("A:B", results.get(new Windowed<>("1", new SessionWindow(10, 30)))); + assertEquals("Z", results.get(new Windowed<>("2", new SessionWindow(15, 15)))); + assertEquals("A:B:C", results.get(new Windowed<>("1", new SessionWindow(70, 100)))); } @Test(expected = NullPointerException.class) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index 0ecaf3a45ad79..c3368a116a90f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -184,9 +184,9 @@ public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() sessionStore.flush(); assertEquals(Arrays.asList( - KeyValue.pair(new Windowed<>(sessionId, new TimeWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>(sessionId, new TimeWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(2L, null)), - KeyValue.pair(new Windowed<>(sessionId, new TimeWindow(time, time)), new Change<>(3L, null)) + KeyValue.pair(new Windowed<>(sessionId, new SessionWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>(sessionId, new SessionWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(2L, null)), + KeyValue.pair(new Windowed<>(sessionId, new SessionWindow(time, time)), new Change<>(3L, null)) ), results); @@ -200,14 +200,14 @@ public void shouldRemoveMergedSessionsFromStateStore() throws Exception { // first ensure it is in the store final KeyValueIterator, Long> a1 = sessionStore.findSessions("a", 0, 0); - assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), a1.next()); + assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), a1.next()); context.setTime(100); processor.process("a", "2"); // a1 from above should have been removed // should have merged session in store final KeyValueIterator, Long> a2 = sessionStore.findSessions("a", 0, 100); - assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 100)), 2L), a2.next()); + assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 100)), 2L), a2.next()); assertFalse(a2.hasNext()); } @@ -229,13 +229,13 @@ public void shouldHandleMultipleSessionsAndMerging() throws Exception { sessionStore.flush(); - assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("b", new TimeWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("c", new TimeWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("d", new TimeWindow(0, GAP_MS / 2)), new Change<>(2L, null)), - KeyValue.pair(new Windowed<>("b", new TimeWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("a", new TimeWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)), new Change<>(2L, null)), - KeyValue.pair(new Windowed<>("c", new TimeWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new Change<>(1L, null)) + assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("c", new SessionWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("d", new SessionWindow(0, GAP_MS / 2)), new Change<>(2L, null)), + KeyValue.pair(new Windowed<>("b", new SessionWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)), new Change<>(2L, null)), + KeyValue.pair(new Windowed<>("c", new SessionWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new Change<>(1L, null)) ), results); } @@ -250,8 +250,8 @@ public void shouldGetAggregatedValuesFromValueGetter() throws Exception { context.setTime(GAP_MS + 1); processor.process("a", "1"); processor.process("a", "2"); - final long t0 = getter.get(new Windowed<>("a", new TimeWindow(0, 0))); - final long t1 = getter.get(new Windowed<>("a", new TimeWindow(GAP_MS + 1, GAP_MS + 1))); + final long t0 = getter.get(new Windowed<>("a", new SessionWindow(0, 0))); + final long t1 = getter.get(new Windowed<>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 1))); assertEquals(1L, t0); assertEquals(2L, t1); } @@ -266,9 +266,9 @@ public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() throws Except processor.process("b", "1"); processor.process("c", "1"); - assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("b", new TimeWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("c", new TimeWindow(0, 0)), new Change<>(1L, null))), results); + assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("c", new SessionWindow(0, 0)), new Change<>(1L, null))), results); } @Test @@ -280,9 +280,9 @@ public void shouldImmediatelyForwardRemovedSessionsWhenMerging() throws Exceptio processor.process("a", "1"); context.setTime(5); processor.process("a", "1"); - assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), new Change<>(null, null)), - KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 5)), new Change<>(2L, null))), results); + assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(1L, null)), + KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(null, null)), + KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 5)), new Change<>(2L, null))), results); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java index 2f5972ca694b9..3a0f490bf6d0c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java @@ -30,7 +30,7 @@ public class SessionKeySerdeTest { @Test public void shouldSerializeDeserialize() throws Exception { - final Windowed key = new Windowed<>(1L, new TimeWindow(10, 100)); + final Windowed key = new Windowed<>(1L, new SessionWindow(10, 100)); final SessionKeySerde serde = new SessionKeySerde<>(Serdes.Long()); final byte[] bytes = serde.serializer().serialize("t", key); final Windowed result = serde.deserializer().deserialize("t", bytes); @@ -57,7 +57,7 @@ public void shouldDeSerializeNullToNull() throws Exception { @Test public void shouldConvertToBinaryAndBack() throws Exception { - final Windowed key = new Windowed<>("key", new TimeWindow(10, 20)); + final Windowed key = new Windowed<>("key", new SessionWindow(10, 20)); final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer()); final Windowed result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer()); assertEquals(key, result); @@ -65,21 +65,21 @@ public void shouldConvertToBinaryAndBack() throws Exception { @Test public void shouldExtractEndTimeFromBinary() throws Exception { - final Windowed key = new Windowed<>("key", new TimeWindow(10, 100)); + final Windowed key = new Windowed<>("key", new SessionWindow(10, 100)); final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer()); assertEquals(100, SessionKeySerde.extractEnd(serialized.get())); } @Test public void shouldExtractStartTimeFromBinary() throws Exception { - final Windowed key = new Windowed<>("key", new TimeWindow(50, 100)); + final Windowed key = new Windowed<>("key", new SessionWindow(50, 100)); final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer()); assertEquals(50, SessionKeySerde.extractStart(serialized.get())); } @Test public void shouldExtractKeyBytesFromBinary() throws Exception { - final Windowed key = new Windowed<>("blah", new TimeWindow(50, 100)); + final Windowed key = new Windowed<>("blah", new SessionWindow(50, 100)); final Bytes serialized = SessionKeySerde.toBinary(key, Serdes.String().serializer()); assertArrayEquals("blah".getBytes(), SessionKeySerde.extractKeyBytes(serialized.get())); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java new file mode 100644 index 0000000000000..424c04d7e4eac --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SessionWindowTest { + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIfEndSmallerThanStart() { + new SessionWindow(1, 0); + } + + @Test + public void testOverlap() { + final SessionWindow window = new SessionWindow(50, 100); + + assertFalse(window.overlap(new SessionWindow(0, 25))); + assertFalse(window.overlap(new SessionWindow(0, 49))); + assertTrue(window.overlap(new SessionWindow(0, 50))); + assertTrue(window.overlap(new SessionWindow(0, 75))); + assertTrue(window.overlap(new SessionWindow(0, 99))); + assertTrue(window.overlap(new SessionWindow(0, 100))); + assertTrue(window.overlap(new SessionWindow(0, 101))); + assertTrue(window.overlap(new SessionWindow(0, 150))); + + assertFalse(window.overlap(new SessionWindow(49, 49))); + assertTrue(window.overlap(new SessionWindow(49, 50))); + assertTrue(window.overlap(new SessionWindow(49, 75))); + assertTrue(window.overlap(new SessionWindow(49, 99))); + assertTrue(window.overlap(new SessionWindow(49, 100))); + assertTrue(window.overlap(new SessionWindow(49, 101))); + assertTrue(window.overlap(new SessionWindow(49, 150))); + + assertTrue(window.overlap(new SessionWindow(50, 50))); + assertTrue(window.overlap(new SessionWindow(50, 75))); + assertTrue(window.overlap(new SessionWindow(50, 99))); + assertTrue(window.overlap(new SessionWindow(50, 100))); + assertTrue(window.overlap(new SessionWindow(50, 101))); + assertTrue(window.overlap(new SessionWindow(50, 150))); + + assertTrue(window.overlap(new SessionWindow(75, 75))); + assertTrue(window.overlap(new SessionWindow(75, 99))); + assertTrue(window.overlap(new SessionWindow(75, 100))); + assertTrue(window.overlap(new SessionWindow(75, 101))); + assertTrue(window.overlap(new SessionWindow(75, 150))); + + assertTrue(window.overlap(new SessionWindow(100, 100))); + assertTrue(window.overlap(new SessionWindow(100, 101))); + assertTrue(window.overlap(new SessionWindow(100, 150))); + + assertFalse(window.overlap(new SessionWindow(101, 101))); + assertFalse(window.overlap(new SessionWindow(101, 150))); + + assertFalse(window.overlap(new SessionWindow(125, 150))); + } + + @Test + public void testEquals() { + assertTrue(new SessionWindow(5, 10).equals(new SessionWindow(5, 10))); + assertFalse(new SessionWindow(5, 10).equals(new SessionWindow(0, 10))); + assertFalse(new SessionWindow(7, 10).equals(new SessionWindow(0, 10))); + assertFalse(new SessionWindow(5, 10).equals(new SessionWindow(5, 8))); + assertFalse(new SessionWindow(5, 10).equals(new SessionWindow(5, 15))); + assertFalse(new SessionWindow(5, 10).equals(new SessionWindow(0, 15))); + assertFalse(new SessionWindow(5, 10).equals(new SessionWindow(7, 8))); + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index a4e8df3df1eea..c603aa0b93fcf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -24,7 +24,7 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionKeySerde; -import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordCollector; @@ -70,15 +70,15 @@ public void setUp() throws Exception { @Test public void shouldPutFetchFromCache() throws Exception { - cachingStore.put(new Windowed<>("a", new TimeWindow(0, 0)), 1L); - cachingStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 1L); - cachingStore.put(new Windowed<>("b", new TimeWindow(0, 0)), 1L); + cachingStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + cachingStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 1L); + cachingStore.put(new Windowed<>("b", new SessionWindow(0, 0)), 1L); final KeyValueIterator, Long> a = cachingStore.findSessions("a", 0, 0); final KeyValueIterator, Long> b = cachingStore.findSessions("b", 0, 0); - assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), a.next()); - assertEquals(KeyValue.pair(new Windowed<>("b", new TimeWindow(0, 0)), 1L), b.next()); + assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), a.next()); + assertEquals(KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), 1L), b.next()); assertFalse(a.hasNext()); assertFalse(b.hasNext()); assertEquals(3, cache.size()); @@ -87,16 +87,16 @@ public void shouldPutFetchFromCache() throws Exception { @Test public void shouldFetchAllSessionsWithSameRecordKey() throws Exception { - final List, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), - KeyValue.pair(new Windowed<>("a", new TimeWindow(10, 10)), 2L), - KeyValue.pair(new Windowed<>("a", new TimeWindow(100, 100)), 3L), - KeyValue.pair(new Windowed<>("a", new TimeWindow(1000, 1000)), 4L)); + final List, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L)); for (KeyValue, Long> kv : expected) { cachingStore.put(kv.key, kv.value); } // add one that shouldn't appear in the results - cachingStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 5L); + cachingStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L); final List, Long>> results = toList(cachingStore.fetch("a")); @@ -124,8 +124,8 @@ public void shouldQueryItemsInCacheAndStore() throws Exception { @Test public void shouldRemove() throws Exception { - final Windowed a = new Windowed<>("a", new TimeWindow(0, 0)); - final Windowed b = new Windowed<>("b", new TimeWindow(0, 0)); + final Windowed a = new Windowed<>("a", new SessionWindow(0, 0)); + final Windowed b = new Windowed<>("b", new SessionWindow(0, 0)); cachingStore.put(a, 2L); cachingStore.put(b, 2L); cachingStore.flush(); @@ -137,9 +137,9 @@ public void shouldRemove() throws Exception { @Test public void shouldFetchCorrectlyAcrossSegments() throws Exception { - final Windowed a1 = new Windowed<>("a", new TimeWindow(0, 0)); - final Windowed a2 = new Windowed<>("a", new TimeWindow(Segments.MIN_SEGMENT_INTERVAL, Segments.MIN_SEGMENT_INTERVAL)); - final Windowed a3 = new Windowed<>("a", new TimeWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2)); + final Windowed a1 = new Windowed<>("a", new SessionWindow(0, 0)); + final Windowed a2 = new Windowed<>("a", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL, Segments.MIN_SEGMENT_INTERVAL)); + final Windowed a3 = new Windowed<>("a", new SessionWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2)); cachingStore.put(a1, 1L); cachingStore.put(a2, 2L); cachingStore.put(a3, 3L); @@ -153,7 +153,7 @@ public void shouldFetchCorrectlyAcrossSegments() throws Exception { @Test public void shouldClearNamespaceCacheOnClose() throws Exception { - final Windowed a1 = new Windowed<>("a", new TimeWindow(0, 0)); + final Windowed a1 = new Windowed<>("a", new SessionWindow(0, 0)); cachingStore.put(a1, 1L); assertEquals(1, cache.size()); cachingStore.close(); @@ -175,13 +175,13 @@ public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() throws @Test(expected = InvalidStateStoreException.class) public void shouldThrowIfTryingToRemoveFromClosedCachingStore() throws Exception { cachingStore.close(); - cachingStore.remove(new Windowed<>("a", new TimeWindow(0, 0))); + cachingStore.remove(new Windowed<>("a", new SessionWindow(0, 0))); } @Test(expected = InvalidStateStoreException.class) public void shouldThrowIfTryingToPutIntoClosedCachingStore() throws Exception { cachingStore.close(); - cachingStore.put(new Windowed<>("a", new TimeWindow(0, 0)), 1L); + cachingStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); } private List, Long>> addSessionsUntilOverflow(final String...sessionIds) { @@ -196,11 +196,11 @@ private List, Long>> addSessionsUntilOverflow(final St private void addSingleSession(final String sessionId, final List, Long>> allSessions) { final int timestamp = allSessions.size() * 10; - final Windowed key = new Windowed<>(sessionId, new TimeWindow(timestamp, timestamp)); + final Windowed key = new Windowed<>(sessionId, new SessionWindow(timestamp, timestamp)); final Long value = 1L; cachingStore.put(key, value); allSessions.add(KeyValue.pair(key, value)); } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index 9ff2762248f6c..7fe490c2b82f2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -24,7 +24,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionKeySerde; -import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.test.MockProcessorContext; @@ -79,13 +79,13 @@ public void close() { @Test public void shouldPutAndFetch() throws Exception { final String key = "a"; - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(10, 10L))), serializeValue(10L)); - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(500L, 1000L))), serializeValue(50L)); - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(1500L, 2000L))), serializeValue(100L)); - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(2500L, 3000L))), serializeValue(200L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(10, 10L))), serializeValue(10L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(500L, 1000L))), serializeValue(50L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(1500L, 2000L))), serializeValue(100L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(2500L, 3000L))), serializeValue(200L)); - final List, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, new TimeWindow(10, 10)), 10L), - KeyValue.pair(new Windowed<>(key, new TimeWindow(500, 1000)), 50L)); + final List, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(10, 10)), 10L), + KeyValue.pair(new Windowed<>(key, new SessionWindow(500, 1000)), 50L)); final KeyValueIterator values = bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 1000L); assertEquals(expected, toList(values)); @@ -94,18 +94,18 @@ public void shouldPutAndFetch() throws Exception { @Test public void shouldFindValuesWithinRange() throws Exception { final String key = "a"; - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(0L, 0L))), serializeValue(50L)); - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(1000L, 1000L))), serializeValue(10L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(1000L, 1000L))), serializeValue(10L)); final KeyValueIterator results = bytesStore.fetch(Bytes.wrap(key.getBytes()), 1L, 1999L); - assertEquals(Collections.singletonList(KeyValue.pair(new Windowed<>(key, new TimeWindow(1000L, 1000L)), 10L)), toList(results)); + assertEquals(Collections.singletonList(KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 10L)), toList(results)); } @Test public void shouldRemove() throws Exception { - bytesStore.put(serializeKey(new Windowed<>("a", new TimeWindow(0, 1000))), serializeValue(30L)); - bytesStore.put(serializeKey(new Windowed<>("a", new TimeWindow(1500, 2500))), serializeValue(50L)); + bytesStore.put(serializeKey(new Windowed<>("a", new SessionWindow(0, 1000))), serializeValue(30L)); + bytesStore.put(serializeKey(new Windowed<>("a", new SessionWindow(1500, 2500))), serializeValue(50L)); - bytesStore.remove(serializeKey(new Windowed<>("a", new TimeWindow(0, 1000)))); + bytesStore.remove(serializeKey(new Windowed<>("a", new SessionWindow(0, 1000)))); final KeyValueIterator value = bytesStore.fetch(Bytes.wrap("a".getBytes()), 0, 1000L); assertFalse(value.hasNext()); } @@ -115,32 +115,32 @@ public void shouldRollSegments() throws Exception { // just to validate directories final Segments segments = new Segments(storeName, retention, numSegments); final String key = "a"; - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(0L, 0L))), serializeValue(50L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L)); assertEquals(Collections.singleton(segments.segmentName(0)), segmentDirs()); - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(30000L, 60000L))), serializeValue(100L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(30000L, 60000L))), serializeValue(100L)); assertEquals(Utils.mkSet(segments.segmentName(0), segments.segmentName(1)), segmentDirs()); - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(61000L, 120000L))), serializeValue(200L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(61000L, 120000L))), serializeValue(200L)); assertEquals(Utils.mkSet(segments.segmentName(0), segments.segmentName(1), segments.segmentName(2)), segmentDirs()); - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(121000L, 180000L))), serializeValue(300L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(121000L, 180000L))), serializeValue(300L)); assertEquals(Utils.mkSet(segments.segmentName(1), segments.segmentName(2), segments.segmentName(3)), segmentDirs()); - bytesStore.put(serializeKey(new Windowed<>(key, new TimeWindow(181000L, 240000L))), serializeValue(400L)); + bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(181000L, 240000L))), serializeValue(400L)); assertEquals(Utils.mkSet(segments.segmentName(2), segments.segmentName(3), segments.segmentName(4)), segmentDirs()); final List, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 240000)); - assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new TimeWindow(61000L, 120000L)), 200L), - KeyValue.pair(new Windowed<>(key, new TimeWindow(121000L, 180000L)), 300L), - KeyValue.pair(new Windowed<>(key, new TimeWindow(181000L, 240000L)), 400L) + assertEquals(Arrays.asList(KeyValue.pair(new Windowed<>(key, new SessionWindow(61000L, 120000L)), 200L), + KeyValue.pair(new Windowed<>(key, new SessionWindow(121000L, 180000L)), 300L), + KeyValue.pair(new Windowed<>(key, new SessionWindow(181000L, 240000L)), 400L) ), results); } @@ -170,4 +170,4 @@ private List, Long>> toList(final KeyValueIterator a1 = new Windowed<>(key, new TimeWindow(10, 10L)); - final Windowed a2 = new Windowed<>(key, new TimeWindow(500L, 1000L)); + final Windowed a1 = new Windowed<>(key, new SessionWindow(10, 10L)); + final Windowed a2 = new Windowed<>(key, new SessionWindow(500L, 1000L)); sessionStore.put(a1, 1L); sessionStore.put(a2, 2L); - sessionStore.put(new Windowed<>(key, new TimeWindow(1500L, 2000L)), 1L); - sessionStore.put(new Windowed<>(key, new TimeWindow(2500L, 3000L)), 2L); + sessionStore.put(new Windowed<>(key, new SessionWindow(1500L, 2000L)), 1L); + sessionStore.put(new Windowed<>(key, new SessionWindow(2500L, 3000L)), 2L); final List, Long>> expected = Arrays.asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L)); @@ -87,16 +87,16 @@ public void shouldPutAndFindSessionsInRange() throws Exception { @Test public void shouldFetchAllSessionsWithSameRecordKey() throws Exception { - final List, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), - KeyValue.pair(new Windowed<>("a", new TimeWindow(10, 10)), 2L), - KeyValue.pair(new Windowed<>("a", new TimeWindow(100, 100)), 3L), - KeyValue.pair(new Windowed<>("a", new TimeWindow(1000, 1000)), 4L)); + final List, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L), + KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L)); for (KeyValue, Long> kv : expected) { sessionStore.put(kv.key, kv.value); } // add one that shouldn't appear in the results - sessionStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 5L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L); final List, Long>> results = toList(sessionStore.fetch("a")); assertEquals(expected, results); @@ -107,22 +107,22 @@ public void shouldFetchAllSessionsWithSameRecordKey() throws Exception { @Test public void shouldFindValuesWithinMergingSessionWindowRange() throws Exception { final String key = "a"; - sessionStore.put(new Windowed<>(key, new TimeWindow(0L, 0L)), 1L); - sessionStore.put(new Windowed<>(key, new TimeWindow(1000L, 1000L)), 2L); + sessionStore.put(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L); + sessionStore.put(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L); final KeyValueIterator, Long> results = sessionStore.findSessions(key, -1, 1000L); final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>(key, new TimeWindow(0L, 0L)), 1L), - KeyValue.pair(new Windowed<>(key, new TimeWindow(1000L, 1000L)), 2L)); + KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L), + KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L)); assertEquals(expected, toList(results)); } @Test public void shouldRemove() throws Exception { - sessionStore.put(new Windowed<>("a", new TimeWindow(0, 1000)), 1L); - sessionStore.put(new Windowed<>("a", new TimeWindow(1500, 2500)), 2L); + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 1000)), 1L); + sessionStore.put(new Windowed<>("a", new SessionWindow(1500, 2500)), 2L); - sessionStore.remove(new Windowed<>("a", new TimeWindow(0, 1000))); + sessionStore.remove(new Windowed<>("a", new SessionWindow(0, 1000))); assertFalse(sessionStore.findSessions("a", 0, 1000L).hasNext()); assertTrue(sessionStore.findSessions("a", 1500, 2500).hasNext()); @@ -130,11 +130,11 @@ public void shouldRemove() throws Exception { @Test public void shouldFindSessionsToMerge() throws Exception { - final Windowed session1 = new Windowed<>("a", new TimeWindow(0, 100)); - final Windowed session2 = new Windowed<>("a", new TimeWindow(101, 200)); - final Windowed session3 = new Windowed<>("a", new TimeWindow(201, 300)); - final Windowed session4 = new Windowed<>("a", new TimeWindow(301, 400)); - final Windowed session5 = new Windowed<>("a", new TimeWindow(401, 500)); + final Windowed session1 = new Windowed<>("a", new SessionWindow(0, 100)); + final Windowed session2 = new Windowed<>("a", new SessionWindow(101, 200)); + final Windowed session3 = new Windowed<>("a", new SessionWindow(201, 300)); + final Windowed session4 = new Windowed<>("a", new SessionWindow(301, 400)); + final Windowed session5 = new Windowed<>("a", new SessionWindow(401, 500)); sessionStore.put(session1, 1L); sessionStore.put(session2, 2L); sessionStore.put(session3, 3L); @@ -155,4 +155,4 @@ static List, Long>> toList(final KeyValueIterator Date: Wed, 11 Jan 2017 09:55:31 -0800 Subject: [PATCH 2/3] Damian's comments --- .../apache/kafka/streams/kstream/Window.java | 17 ++- .../kstream/internals/SessionWindow.java | 4 - .../kstream/internals/SessionWindowTest.java | 127 +++++++++++------- 3 files changed, 96 insertions(+), 52 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java index 9dc607f5f69b6..aa513d2f489f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java @@ -22,16 +22,27 @@ */ public abstract class Window { - protected long start; - protected long end; + protected final long start; + protected final long end; /** * Create a new window for the given start time (inclusive) and end time (exclusive). * * @param start the start timestamp of the window (inclusive) * @param end the end timestamp of the window (exclusive) + * @throws IllegalArgumentException if {@code start} or {@code end} is negative or if {@code end} is smaller than + * {@code start} */ - public Window(long start, long end) { + public Window(long start, long end) throws IllegalArgumentException{ + if (start < 0) { + throw new IllegalArgumentException("Window start time cannot be negative."); + } + if (end < 0) { + throw new IllegalArgumentException("Window end time cannot be negative."); + } + if (end < start) { + throw new IllegalArgumentException("Window end time cannot be smaller than window start time."); + } this.start = start; this.end = end; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java index d31e7cf2f743d..db63029e04ae7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindow.java @@ -34,13 +34,9 @@ public final class SessionWindow extends Window { * * @param start the start timestamp of the window * @param end the end timestamp of the window - * @throws IllegalArgumentException if {@code end} is smaller than {@code start} */ public SessionWindow(final long start, final long end) { super(start, end); - if (end < start) { - throw new IllegalArgumentException("Window end time cannot be smaller than window start time."); - } } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java index 424c04d7e4eac..2df7741213f6a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowTest.java @@ -25,63 +25,100 @@ public class SessionWindowTest { - @Test(expected = IllegalArgumentException.class) - public void shouldThrowIfEndSmallerThanStart() { - new SessionWindow(1, 0); - } + private long start = 50; + private long end = 100; + private final SessionWindow window = new SessionWindow(start, end); + private final TimeWindow timeWindow = new TimeWindow(start, end); @Test - public void testOverlap() { - final SessionWindow window = new SessionWindow(50, 100); - + public void shouldNotOverlapIfOtherWindowIsBeforeThisWindow() { + /* + * This: [-------] + * Other: [---] + */ assertFalse(window.overlap(new SessionWindow(0, 25))); - assertFalse(window.overlap(new SessionWindow(0, 49))); - assertTrue(window.overlap(new SessionWindow(0, 50))); + assertFalse(window.overlap(new SessionWindow(0, start - 1))); + assertFalse(window.overlap(new SessionWindow(start - 1, start - 1))); + } + + @Test + public void shouldOverlapIfOtherWindowEndIsWithinThisWindow() { + /* + * This: [-------] + * Other: [---------] + */ + assertTrue(window.overlap(new SessionWindow(0, start))); + assertTrue(window.overlap(new SessionWindow(0, start + 1))); assertTrue(window.overlap(new SessionWindow(0, 75))); - assertTrue(window.overlap(new SessionWindow(0, 99))); - assertTrue(window.overlap(new SessionWindow(0, 100))); - assertTrue(window.overlap(new SessionWindow(0, 101))); - assertTrue(window.overlap(new SessionWindow(0, 150))); + assertTrue(window.overlap(new SessionWindow(0, end - 1))); + assertTrue(window.overlap(new SessionWindow(0, end))); - assertFalse(window.overlap(new SessionWindow(49, 49))); - assertTrue(window.overlap(new SessionWindow(49, 50))); - assertTrue(window.overlap(new SessionWindow(49, 75))); - assertTrue(window.overlap(new SessionWindow(49, 99))); - assertTrue(window.overlap(new SessionWindow(49, 100))); - assertTrue(window.overlap(new SessionWindow(49, 101))); - assertTrue(window.overlap(new SessionWindow(49, 150))); + assertTrue(window.overlap(new SessionWindow(start - 1, start))); + assertTrue(window.overlap(new SessionWindow(start - 1, start + 1))); + assertTrue(window.overlap(new SessionWindow(start - 1, 75))); + assertTrue(window.overlap(new SessionWindow(start - 1, end - 1))); + assertTrue(window.overlap(new SessionWindow(start - 1, end))); + } - assertTrue(window.overlap(new SessionWindow(50, 50))); - assertTrue(window.overlap(new SessionWindow(50, 75))); - assertTrue(window.overlap(new SessionWindow(50, 99))); - assertTrue(window.overlap(new SessionWindow(50, 100))); - assertTrue(window.overlap(new SessionWindow(50, 101))); - assertTrue(window.overlap(new SessionWindow(50, 150))); + @Test + public void shouldOverlapIfOtherWindowContainsThisWindow() { + /* + * This: [-------] + * Other: [------------------] + */ + assertTrue(window.overlap(new SessionWindow(0, end))); + assertTrue(window.overlap(new SessionWindow(0, end + 1))); + assertTrue(window.overlap(new SessionWindow(0, 150))); - assertTrue(window.overlap(new SessionWindow(75, 75))); - assertTrue(window.overlap(new SessionWindow(75, 99))); - assertTrue(window.overlap(new SessionWindow(75, 100))); - assertTrue(window.overlap(new SessionWindow(75, 101))); - assertTrue(window.overlap(new SessionWindow(75, 150))); + assertTrue(window.overlap(new SessionWindow(start - 1, end))); + assertTrue(window.overlap(new SessionWindow(start - 1, end + 1))); + assertTrue(window.overlap(new SessionWindow(start - 1, 150))); - assertTrue(window.overlap(new SessionWindow(100, 100))); - assertTrue(window.overlap(new SessionWindow(100, 101))); - assertTrue(window.overlap(new SessionWindow(100, 150))); + assertTrue(window.overlap(new SessionWindow(start, end))); + assertTrue(window.overlap(new SessionWindow(start, end + 1))); + assertTrue(window.overlap(new SessionWindow(start, 150))); + } - assertFalse(window.overlap(new SessionWindow(101, 101))); - assertFalse(window.overlap(new SessionWindow(101, 150))); + @Test + public void shouldOverlapIfOtherWindowIsWithinThisWindow() { + /* + * This: [-------] + * Other: [---] + */ + assertTrue(window.overlap(new SessionWindow(start, start))); + assertTrue(window.overlap(new SessionWindow(start, 75))); + assertTrue(window.overlap(new SessionWindow(start, end))); + assertTrue(window.overlap(new SessionWindow(75, end))); + assertTrue(window.overlap(new SessionWindow(end, end))); + } - assertFalse(window.overlap(new SessionWindow(125, 150))); + @Test + public void shouldOverlapIfOtherWindowStartIsWithinThisWindow() { + /* + * This: [-------] + * Other: [-------] + */ + assertTrue(window.overlap(new SessionWindow(start, end + 1))); + assertTrue(window.overlap(new SessionWindow(start, 150))); + assertTrue(window.overlap(new SessionWindow(75, end + 1))); + assertTrue(window.overlap(new SessionWindow(75, 150))); + assertTrue(window.overlap(new SessionWindow(end, end + 1))); + assertTrue(window.overlap(new SessionWindow(end, 150))); } @Test - public void testEquals() { - assertTrue(new SessionWindow(5, 10).equals(new SessionWindow(5, 10))); - assertFalse(new SessionWindow(5, 10).equals(new SessionWindow(0, 10))); - assertFalse(new SessionWindow(7, 10).equals(new SessionWindow(0, 10))); - assertFalse(new SessionWindow(5, 10).equals(new SessionWindow(5, 8))); - assertFalse(new SessionWindow(5, 10).equals(new SessionWindow(5, 15))); - assertFalse(new SessionWindow(5, 10).equals(new SessionWindow(0, 15))); - assertFalse(new SessionWindow(5, 10).equals(new SessionWindow(7, 8))); + public void shouldNotOverlapIsOtherWindowIsAfterThisWindow() { + /* + * This: [-------] + * Other: [---] + */ + assertFalse(window.overlap(new SessionWindow(end + 1, end + 1))); + assertFalse(window.overlap(new SessionWindow(end + 1, 150))); + assertFalse(window.overlap(new SessionWindow(125, 150))); + } + + @Test(expected = IllegalArgumentException.class) + public void cannotCompareSessionWindowWithDifferentWindowType() { + window.overlap(timeWindow); } } \ No newline at end of file From 6c3d5af3a8b6218b884fbf888d0413052d8a88f4 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 11 Jan 2017 11:15:40 -0800 Subject: [PATCH 3/3] fix checkstyle --- .../src/main/java/org/apache/kafka/streams/kstream/Window.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java index aa513d2f489f6..13a9529d6bedd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java @@ -33,7 +33,7 @@ public abstract class Window { * @throws IllegalArgumentException if {@code start} or {@code end} is negative or if {@code end} is smaller than * {@code start} */ - public Window(long start, long end) throws IllegalArgumentException{ + public Window(long start, long end) throws IllegalArgumentException { if (start < 0) { throw new IllegalArgumentException("Window start time cannot be negative."); }