Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.HoppingWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.StreamsConfig;
Expand Down Expand Up @@ -160,7 +160,7 @@ public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion vi
return new KeyValue<>(viewRegion.region, viewRegion);
}
})
.countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String())
.countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000), Serdes.String())
// TODO: we can merge ths toStream().map(...) with a single toStream(...)
.toStream()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is out of the scope of this PR but may just be OK do to together: since now we have added the toStream(map) function to KTable, we can actually get rid of this TODO.

.map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.HoppingWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
Expand Down Expand Up @@ -99,7 +99,7 @@ public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) {
return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
}
})
.countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String())
.countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000), Serdes.String())
// TODO: we can merge ths toStream().map(...) with a single toStream(...)
.toStream()
.map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Iterable<String> apply(String value) {
}).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
@Override
public KeyValue<String, String> apply(String key, String value) {
return new KeyValue<String, String>(value, value);
return new KeyValue<>(value, value);
}
})
.countByKey("Counts");
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@

package org.apache.kafka.streams.kstream;


import org.apache.kafka.streams.kstream.internals.TumblingWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;

import java.util.Map;

/**
* The window specifications used for joins.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we extend the JavaDoc? Or should this be done in #1250 ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was waiting for #1250 and intentionally didn't touch javadocs in this PR where I could avoid it.

*/
public class JoinWindows extends Windows<TumblingWindow> {
public class JoinWindows extends Windows<TimeWindow> {

public final long before;
public final long after;
Expand Down Expand Up @@ -74,19 +73,29 @@ public JoinWindows after(long timeDifference) {
}

@Override
public Map<Long, TumblingWindow> windowsFor(long timestamp) {
public Map<Long, TimeWindow> windowsFor(long timestamp) {
// this function should never be called
throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows");
}

@Override
public boolean equalTo(Windows other) {
if (!other.getClass().equals(JoinWindows.class))
public final boolean equals(Object o) {
if (o == this) {
return true;
}
if (!(o instanceof JoinWindows)) {
return false;
}

JoinWindows otherWindows = (JoinWindows) other;
JoinWindows other = (JoinWindows) o;
return this.before == other.before && this.after == other.after;
}

return this.before == otherWindows.before && this.after == otherWindows.after;
@Override
public int hashCode() {
int result = (int) (before ^ (before >>> 32));
result = 31 * result + (int) (after ^ (after >>> 32));
return result;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.kstream.internals.TimeWindow;

import java.util.HashMap;
import java.util.Map;

/**
* The time-based window specifications used for aggregations.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be more precise. See discussion about HoppingWindow and TumblindWindow in #1250

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I said above, I intentionally didn't touch that because your PR at #1250 is coming up.

*/
public class TimeWindows extends Windows<TimeWindow> {

/**
* The size of the window, i.e. how long a window lasts.
* The window size's effective time unit is determined by the semantics of the topology's
* configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
*/
public final long size;

/**
* The size of the window's advance interval, i.e. by how much a window moves forward relative
* to the previous one. The interval's effective time unit is determined by the semantics of
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove double blank.

* the topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
*/
public final long advance;

private TimeWindows(String name, long size, long advance) {
super(name);
if (size <= 0) {
throw new IllegalArgumentException("window size must be > 0 (you provided " + size + ")");
}
this.size = size;
if (!(0 < advance && advance <= size)) {
throw new IllegalArgumentException(
String.format("advance interval (%d) must lie within interval (0, %d]", advance, size));
}
this.advance = advance;
}

/**
* Returns a window definition with the given window size, and with the advance interval being
* equal to the window size. Think: [N * size, N * size + size), with N denoting the N-th
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove double blank.

* window.
*
* This provides the semantics of tumbling windows, which are fixed-sized, gap-less,
* non-overlapping windows. Tumbling windows are a specialization of hopping windows.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove double blank.

*
* @param name The name of the window. Must not be null or empty.
* @param size The size of the window, with the requirement that size &gt; 0.
* The window size's effective time unit is determined by the semantics of the
* topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
* @return a new window definition
*/
public static TimeWindows of(String name, long size) {
return new TimeWindows(name, size, size);
}

/**
* Returns a window definition with the original size, but advance ("hop") the window by the given
* interval, which specifies by how much a window moves forward relative to the previous one.
* Think: [N * advanceInterval, N * advanceInterval + size), with N denoting the N-th window.
*
* This provides the semantics of hopping windows, which are fixed-sized, overlapping windows.
*
* @param interval The advance interval ("hop") of the window, with the requirement that
* 0 &lt; interval &le; size. The interval's effective time unit is
* determined by the semantics of the topology's configured
* {@link org.apache.kafka.streams.processor.TimestampExtractor}.
* @return a new window definition
*/
public TimeWindows advanceBy(long interval) {
return new TimeWindows(this.name, this.size, interval);
}

@Override
public Map<Long, TimeWindow> windowsFor(long timestamp) {
long enclosed = (size - 1) / advance;
long windowStart = Math.max(0, timestamp - timestamp % advance - enclosed * advance);

Map<Long, TimeWindow> windows = new HashMap<>();
while (windowStart <= timestamp) {
TimeWindow window = new TimeWindow(windowStart, windowStart + this.size);
windows.put(windowStart, window);
windowStart += this.advance;
}
return windows;
}

@Override
public final boolean equals(Object o) {
if (o == this) {
return true;
}
if (!(o instanceof TimeWindows)) {
return false;
}
TimeWindows other = (TimeWindows) o;
return this.size == other.size && this.advance == other.advance;
}

@Override
public int hashCode() {
int result = (int) (size ^ (size >>> 32));
result = 31 * result + (int) (advance ^ (advance >>> 32));
return result;
}

}
Loading