Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
package org.apache.kafka.streams.state;

import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;

import java.time.Instant;

/**
* A windowed store interface extending {@link StateStore}.
*
Expand Down Expand Up @@ -87,6 +90,13 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
*/
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);

@Override
default WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) {
ApiUtils.validateMillisecondInstant(from, "from");
ApiUtils.validateMillisecondInstant(to, "to");
return fetch(key, from.toEpochMilli(), to.toEpochMilli());
}

/**
* Get all the key-value pairs in the given key range and time range from all the existing windows.
* <p>
Expand All @@ -102,6 +112,13 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
*/
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);

@Override
default KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) {
ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
ApiUtils.validateMillisecondInstant(toTime, "toTime");
return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
}

/**
* Gets all the key-value pairs that belong to the windows within in the given time range.
*
Expand All @@ -112,4 +129,11 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
* @throws NullPointerException if {@code null} is used for any key
*/
KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);

@Override
default KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) {
ApiUtils.validateMillisecondInstant(from, "from");
ApiUtils.validateMillisecondInstant(to, "to");
return fetchAll(from.toEpochMilli(), to.toEpochMilli());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@
*/
package org.apache.kafka.streams.state.internals;

import java.time.Instant;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.processor.ProcessorContext;
Expand Down Expand Up @@ -205,13 +203,6 @@ public synchronized WindowStoreIterator<byte[]> fetch(final Bytes key, final lon
return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator);
}

@Override
public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant from, final Instant to) throws IllegalArgumentException {
ApiUtils.validateMillisecondInstant(from, "from");
ApiUtils.validateMillisecondInstant(to, "to");
return fetch(key, from.toEpochMilli(), to.toEpochMilli());
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to, final long timeFrom, final long timeTo) {
// since this function may not access the underlying inner store, we need to validate
Expand Down Expand Up @@ -241,16 +232,6 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final B
);
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
final Bytes to,
final Instant fromTime,
final Instant toTime) throws IllegalArgumentException {
ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
ApiUtils.validateMillisecondInstant(toTime, "toTime");
return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
}

private V fetchPrevious(final Bytes key, final long timestamp) {
final byte[] value = underlying.fetch(key, timestamp);
if (value != null) {
Expand Down Expand Up @@ -294,11 +275,4 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, f
cacheFunction
);
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
ApiUtils.validateMillisecondInstant(from, "from");
ApiUtils.validateMillisecondInstant(to, "to");
return fetchAll(from.toEpochMilli(), to.toEpochMilli());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;

import java.time.Instant;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
Expand Down Expand Up @@ -58,28 +56,11 @@ public WindowStoreIterator<byte[]> fetch(final Bytes key, final long from, final
return bytesStore.fetch(key, from, to);
}

@Override
public WindowStoreIterator<byte[]> fetch(final Bytes key, final Instant from, final Instant to) throws IllegalArgumentException {
ApiUtils.validateMillisecondInstant(from, "from");
ApiUtils.validateMillisecondInstant(to, "to");
return fetch(key, from.toEpochMilli(), to.toEpochMilli());
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) {
return bytesStore.fetch(keyFrom, keyTo, from, to);
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
final Bytes to,
final Instant fromTime,
final Instant toTime) throws IllegalArgumentException {
ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
ApiUtils.validateMillisecondInstant(toTime, "toTime");
return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
return bytesStore.all();
Expand All @@ -90,13 +71,6 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, f
return bytesStore.fetchAll(timeFrom, timeTo);
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
ApiUtils.validateMillisecondInstant(from, "from");
ApiUtils.validateMillisecondInstant(to, "to");
return fetchAll(from.toEpochMilli(), to.toEpochMilli());
}

@Override
public void put(final Bytes key, final byte[] value) {
put(key, value, context.timestamp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@
*/
package org.apache.kafka.streams.state.internals;

import java.time.Instant;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
Expand Down Expand Up @@ -149,13 +147,6 @@ public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long
time);
}

@Override
public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException {
ApiUtils.validateMillisecondInstant(from, "from");
ApiUtils.validateMillisecondInstant(to, "to");
return fetch(key, from.toEpochMilli(), to.toEpochMilli());
}

@Override
public KeyValueIterator<Windowed<K>, V> all() {
return new MeteredWindowedKeyValueIterator<>(inner.all(), fetchTime, metrics, serdes, time);
Expand All @@ -170,13 +161,6 @@ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long
time);
}

@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
ApiUtils.validateMillisecondInstant(from, "from");
ApiUtils.validateMillisecondInstant(to, "to");
return fetchAll(from.toEpochMilli(), to.toEpochMilli());
}

@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
return new MeteredWindowedKeyValueIterator<>(inner.fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo),
Expand All @@ -186,13 +170,6 @@ public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final lo
time);
}

@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
ApiUtils.validateMillisecondInstant(toTime, "toTime");
return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
}

@Override
public void flush() {
final long startNs = time.nanoseconds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;

import java.time.Instant;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
Expand Down Expand Up @@ -93,26 +91,12 @@ public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long
return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).valuesIterator();
}

@Override
public WindowStoreIterator<V> fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException {
ApiUtils.validateMillisecondInstant(from, "from");
ApiUtils.validateMillisecondInstant(to, "to");
return fetch(key, from.toEpochMilli(), to.toEpochMilli());
}

@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo);
return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
}

@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException {
ApiUtils.validateMillisecondInstant(fromTime, "fromTime");
ApiUtils.validateMillisecondInstant(toTime, "toTime");
return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli());
}

@Override
public KeyValueIterator<Windowed<K>, V> all() {
final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.all();
Expand All @@ -125,13 +109,6 @@ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long
return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
}

@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException {
ApiUtils.validateMillisecondInstant(from, "from");
ApiUtils.validateMillisecondInstant(to, "to");
return fetchAll(from.toEpochMilli(), to.toEpochMilli());
}

private void maybeUpdateSeqnumForDups() {
if (retainDuplicates) {
seqnum = (seqnum + 1) & 0x7FFFFFFF;
Expand Down