Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,27 @@
*/
public abstract class Window {

private long start;
private long end;
protected final long start;
protected final long end;

/**
* Create a new window for the given start time (inclusive) and end time (exclusive).
*
* @param start the start timestamp of the window (inclusive)
* @param end the end timestamp of the window (exclusive)
* @throws IllegalArgumentException if {@code start} or {@code end} is negative or if {@code end} is smaller than
* {@code start}
*/
public Window(long start, long end) {
public Window(long start, long end) throws IllegalArgumentException {
if (start < 0) {
throw new IllegalArgumentException("Window start time cannot be negative.");
}
if (end < 0) {
throw new IllegalArgumentException("Window end time cannot be negative.");
}
if (end < start) {
throw new IllegalArgumentException("Window end time cannot be smaller than window start time.");
}
this.start = start;
this.end = end;
}
Expand All @@ -56,9 +67,7 @@ public long end() {
* @param other another window
* @return {@code true} if {@code other} overlaps with this window&mdash;{@code false} otherwise
*/
public boolean overlap(Window other) {
return this.start() < other.end() || other.start() < this.end();
}
public abstract boolean overlap(Window other);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be abstract? Can't we just have the impl of it here? i.e., the SessionWindow and TimeWindow implementations are identical.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should be abstract because Window does not know if boundaries will be inclusive or exclusive. And implementation for SessionWindow and TimeWindow are different IMHO -- or do I miss anything -- at least, both should be different because SessionWindow does have inclusive end time while TimeWindow has exclusive end time.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. np


@Override
public boolean equals(Object obj) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ public void process(final K key, final V value) {

final long timestamp = context().timestamp();
final List<KeyValue<Windowed<K>, T>> merged = new ArrayList<>();
final TimeWindow newTimeWindow = new TimeWindow(timestamp, timestamp);
TimeWindow mergedWindow = newTimeWindow;
final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp);
SessionWindow mergedWindow = newSessionWindow;
T agg = initializer.apply();

try (final KeyValueIterator<Windowed<K>, T> iterator = store.findSessions(key, timestamp - windows.inactivityGap(),
Expand All @@ -98,13 +98,13 @@ public void process(final K key, final V value) {
final KeyValue<Windowed<K>, T> next = iterator.next();
merged.add(next);
agg = sessionMerger.apply(key, agg, next.value);
mergedWindow = mergeTimeWindow(mergedWindow, (TimeWindow) next.key.window());
mergedWindow = mergeSessionWindow(mergedWindow, (SessionWindow) next.key.window());
}
}

agg = aggregator.apply(key, value, agg);
final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
if (!mergedWindow.equals(newTimeWindow)) {
if (!mergedWindow.equals(newSessionWindow)) {
for (final KeyValue<Windowed<K>, T> session : merged) {
store.remove(session.key);
tupleForwarder.maybeForward(session.key, null, session.value);
Expand All @@ -117,10 +117,10 @@ public void process(final K key, final V value) {
}


private TimeWindow mergeTimeWindow(final TimeWindow one, final TimeWindow two) {
private SessionWindow mergeSessionWindow(final SessionWindow one, final SessionWindow two) {
final long start = one.start() < two.start() ? one.start() : two.start();
final long end = one.end() > two.end() ? one.end() : two.end();
return new TimeWindow(start, end);
return new SessionWindow(start, end);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public static <K> Windowed<K> from(final byte[] binaryKey, final Deserializer<K>
final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
return new Windowed<>(key, new TimeWindow(start, end));
return new Windowed<>(key, new SessionWindow(start, end));
}

private static <K> K extractKey(final byte[] binaryKey, Deserializer<K> deserializer) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.kstream.Window;

/**
* A session window covers a closed time interval with its start and end timestamp both being an inclusive boundary.
*
* @see TimeWindow
* @see UnlimitedWindow
* @see org.apache.kafka.streams.kstream.SessionWindows
*/
public final class SessionWindow extends Window {

/**
* Create a new window for the given start time and end time (both inclusive).
*
* @param start the start timestamp of the window
* @param end the end timestamp of the window
*/
public SessionWindow(final long start, final long end) {
super(start, end);
}

/**
* Check if the given window overlaps with this window.
*
* @param other another window
* @return {@code true} if {@code other} overlaps with this window&mdash;{@code false} otherwise
* @throws IllegalArgumentException if the {@code other} window has a different type than {@link this} window
*/
public boolean overlap(final Window other) throws IllegalArgumentException {
if (getClass() != other.getClass()) {
throw new IllegalArgumentException("Cannot compare windows of different type. Other window has type "
+ other.getClass());
}
final SessionWindow otherWindow = (SessionWindow) other;
return !(otherWindow.end < start || end < otherWindow.start);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ public TimeWindow(long start, long end) {

@Override
public boolean overlap(Window other) {
return getClass() == other.getClass() && super.overlap(other);
if (getClass() != other.getClass()) {
throw new IllegalArgumentException("Cannot compare windows of different type. Other window has type "
+ other.getClass());
}
final TimeWindow otherWindow = (TimeWindow) other;
return start < otherWindow.end && otherWindow.start < end;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ public UnlimitedWindow(long start) {

@Override
public boolean overlap(Window other) {
return getClass() == other.getClass() && super.overlap(other);
if (getClass() != other.getClass()) {
throw new IllegalArgumentException("Cannot compare windows of different type. Other window has type "
+ other.getClass());
}
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.state.KeyValueIterator;

import java.util.List;
Expand All @@ -30,13 +30,13 @@ class SessionKeySchema implements SegmentedBytesStore.KeySchema {

@Override
public Bytes upperRange(final Bytes key, final long to) {
final Windowed<Bytes> sessionKey = new Windowed<>(key, new TimeWindow(to, Long.MAX_VALUE));
final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(to, Long.MAX_VALUE));
return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer());
}

@Override
public Bytes lowerRange(final Bytes key, final long from) {
final Windowed<Bytes> sessionKey = new Windowed<>(key, new TimeWindow(0, Math.max(0, from)));
final Windowed<Bytes> sessionKey = new Windowed<>(key, new SessionWindow(0, Math.max(0, from)));
return SessionKeySerde.toBinary(sessionKey, Serdes.Bytes().serializer());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
Expand Down Expand Up @@ -506,13 +506,13 @@ public void apply(final Windowed<String> key, final Long value) {

startStreams();
latch.await(30, TimeUnit.SECONDS);
assertThat(results.get(new Windowed<>("bob", new TimeWindow(t1, t1))), equalTo(1L));
assertThat(results.get(new Windowed<>("penny", new TimeWindow(t1, t1))), equalTo(1L));
assertThat(results.get(new Windowed<>("jo", new TimeWindow(t1, t1))), equalTo(1L));
assertThat(results.get(new Windowed<>("jo", new TimeWindow(t4, t4))), equalTo(1L));
assertThat(results.get(new Windowed<>("emily", new TimeWindow(t1, t2))), equalTo(2L));
assertThat(results.get(new Windowed<>("bob", new TimeWindow(t3, t4))), equalTo(2L));
assertThat(results.get(new Windowed<>("penny", new TimeWindow(t3, t3))), equalTo(1L));
assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(1L));
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo(1L));
assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(1L));
assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo(1L));
assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo(2L));
assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(2L));
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(1L));
}

@Test
Expand Down Expand Up @@ -601,18 +601,18 @@ public void apply(final Windowed<String> key, final String value) {
= kafkaStreams.store(userSessionsStore, QueryableStoreTypes.<String, String>sessionStore());

// verify correct data received
assertThat(results.get(new Windowed<>("bob", new TimeWindow(t1, t1))), equalTo("start"));
assertThat(results.get(new Windowed<>("penny", new TimeWindow(t1, t1))), equalTo("start"));
assertThat(results.get(new Windowed<>("jo", new TimeWindow(t1, t1))), equalTo("pause"));
assertThat(results.get(new Windowed<>("jo", new TimeWindow(t4, t4))), equalTo("resume"));
assertThat(results.get(new Windowed<>("emily", new TimeWindow(t1, t2))), equalTo("pause:resume"));
assertThat(results.get(new Windowed<>("bob", new TimeWindow(t3, t4))), equalTo("pause:resume"));
assertThat(results.get(new Windowed<>("penny", new TimeWindow(t3, t3))), equalTo("stop"));
assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo("start"));
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo("start"));
assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo("pause"));
assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo("resume"));
assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo("pause:resume"));
assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo("pause:resume"));
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo("stop"));

// verify can query data via IQ
final KeyValueIterator<Windowed<String>, String> bob = sessionStore.fetch("bob");
assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new TimeWindow(t1, t1)), "start")));
assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new TimeWindow(t3, t4)), "pause:resume")));
assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start")));
assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t3, t4)), "pause:resume")));
assertFalse(bob.hasNext());

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ public void apply(final Windowed<String> key, final Integer value) {
driver.setTime(100);
driver.process(TOPIC, "1", "1");
driver.flushState();
assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new TimeWindow(10, 30))));
assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new TimeWindow(15, 15))));
assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new TimeWindow(70, 100))));
assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
}

@Test
Expand All @@ -202,9 +202,9 @@ public void apply(final Windowed<String> key, final Long value) {
driver.setTime(100);
driver.process(TOPIC, "1", "1");
driver.flushState();
assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new TimeWindow(10, 30))));
assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new TimeWindow(15, 15))));
assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new TimeWindow(70, 100))));
assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30))));
assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15))));
assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100))));
}

@Test
Expand Down Expand Up @@ -238,9 +238,9 @@ public void apply(final Windowed<String> key, final String value) {
driver.setTime(100);
driver.process(TOPIC, "1", "C");
driver.flushState();
assertEquals("A:B", results.get(new Windowed<>("1", new TimeWindow(10, 30))));
assertEquals("Z", results.get(new Windowed<>("2", new TimeWindow(15, 15))));
assertEquals("A:B:C", results.get(new Windowed<>("1", new TimeWindow(70, 100))));
assertEquals("A:B", results.get(new Windowed<>("1", new SessionWindow(10, 30))));
assertEquals("Z", results.get(new Windowed<>("2", new SessionWindow(15, 15))));
assertEquals("A:B:C", results.get(new Windowed<>("1", new SessionWindow(70, 100))));
}

@Test(expected = NullPointerException.class)
Expand Down
Loading