Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.HoppingWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.StreamsConfig;
Expand Down Expand Up @@ -160,7 +160,7 @@ public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion vi
return new KeyValue<>(viewRegion.region, viewRegion);
}
})
.countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String())
.countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).shiftedBy(1000), Serdes.String())
// TODO: we can merge ths toStream().map(...) with a single toStream(...)
.toStream()
.map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.HoppingWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
Expand Down Expand Up @@ -99,7 +99,7 @@ public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
}
})
.countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String())
.countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).shiftedBy(1000), Serdes.String())
// TODO: we can merge ths toStream().map(...) with a single toStream(...)
.toStream()
.map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@

package org.apache.kafka.streams.kstream;


import org.apache.kafka.streams.kstream.internals.TumblingWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;

import java.util.Map;

/**
* The window specifications used for joins.
*/
public class JoinWindows extends Windows<TumblingWindow> {
public class JoinWindows extends Windows<TimeWindow> {

public final long before;
public final long after;
Expand Down Expand Up @@ -74,19 +73,29 @@ public JoinWindows after(long timeDifference) {
}

@Override
public Map<Long, TumblingWindow> windowsFor(long timestamp) {
public Map<Long, TimeWindow> windowsFor(long timestamp) {
// this function should never be called
throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows");
}

@Override
public boolean equalTo(Windows other) {
if (!other.getClass().equals(JoinWindows.class))
public final boolean equals(Object o) {
if (o == this) {
return true;
}
if (!(o instanceof JoinWindows)) {
return false;
}

JoinWindows otherWindows = (JoinWindows) other;
JoinWindows other = (JoinWindows) o;
return this.before == other.before && this.after == other.after;
}

return this.before == otherWindows.before && this.after == otherWindows.after;
@Override
public int hashCode() {
int result = (int) (before ^ (before >>> 32));
result = 31 * result + (int) (after ^ (after >>> 32));
return result;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.kstream.internals.TimeWindow;

import java.util.HashMap;
import java.util.Map;

/**
* The time-based window specifications used for aggregations.
*/
public class TimeWindows extends Windows<TimeWindow> {

/**
* The size of the window, i.e. how long a window lasts.
* The window size's effective time unit is determined by the semantics of the topology's
* configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
*/
public final long size;

/**
* The size of the window's hop, i.e. by how much a window moves forward relative to the previous one.
* The hop size's effective time unit is determined by the semantics of the topology's
* configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
*/
public final long hop;

private TimeWindows(String name, long size, long hop) {
super(name);
if (size <= 0) {
throw new IllegalArgumentException("size must be > 0 (you provided " + size + ")");
}
this.size = size;
if (!(0 < hop && hop <= size)) {
throw new IllegalArgumentException(String.format("hop (%d) must lie within interval (0, %d]", hop, size));
}
this.hop = hop;
}

/**
* Returns a window definition with the given window size, and with the hop size being equal to
* the window size. Think: [N * size, N * size + size), with N denoting the N-th window.
*
* This provides the semantics of tumbling windows, which are fixed-sized, gap-less,
* non-overlapping windows. Tumbling windows are a specialization of hopping windows.
*
* @param name The name of the window. Must not be null or empty.
* @param size The size of the window, with the requirement that size &gt; 0.
* The window size's effective time unit is determined by the semantics of the
* topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
* @return a new window definition
*/
public static TimeWindows of(String name, long size) {
return new TimeWindows(name, size, size);
}

/**
* Returns a window definition with the original size, but shift ("hop") the window by the given
* hop size, which specifies by how much a window moves forward relative to the previous one.
* Think: [N * hop, N * hop + size), with N denoting the N-th window.
*
* This provides the semantics of hopping windows, which are fixed-sized, overlapping windows.
*
* @param hop The hop size of the window, with the requirement that 0 &lt; hop &le; size.
* The hop size's effective time unit is determined by the semantics of the
* topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
* @return a new window definition
*/
public TimeWindows shiftedBy(long hop) {
return new TimeWindows(this.name, this.size, hop);
}

/**
* Returns the windows that contain the provided timestamp.
*
* @param timestamp the timestamp
* @return a map of (windowStartTimestamp, window) entries
*/
@Override
public Map<Long, TimeWindow> windowsFor(long timestamp) {
long enclosed = (size - 1) / hop;
long windowStart = Math.max(0, timestamp - timestamp % hop - enclosed * hop);

Map<Long, TimeWindow> windows = new HashMap<>();
while (windowStart <= timestamp) {
TimeWindow window = new TimeWindow(windowStart, windowStart + this.size);
windows.put(windowStart, window);
windowStart += this.hop;
}
return windows;
}

@Override
public final boolean equals(Object o) {
if (o == this) {
return true;
}
if (!(o instanceof TimeWindows)) {
return false;
}
TimeWindows other = (TimeWindows) o;
return this.size == other.size && this.hop == other.hop;
}

@Override
public int hashCode() {
int result = (int) (size ^ (size >>> 32));
result = 31 * result + (int) (hop ^ (hop >>> 32));
return result;
}

}

This file was deleted.

Loading