diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java index ad74ae1e74d78..50ce386f13f54 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java @@ -17,9 +17,12 @@ package org.apache.kafka.streams.state; import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStore; +import java.time.Instant; + /** * A windowed store interface extending {@link StateStore}. * @@ -87,6 +90,13 @@ public interface WindowStore extends StateStore, ReadOnlyWindowStore */ WindowStoreIterator fetch(K key, long timeFrom, long timeTo); + @Override + default WindowStoreIterator fetch(final K key, final Instant from, final Instant to) { + ApiUtils.validateMillisecondInstant(from, "from"); + ApiUtils.validateMillisecondInstant(to, "to"); + return fetch(key, from.toEpochMilli(), to.toEpochMilli()); + } + /** * Get all the key-value pairs in the given key range and time range from all the existing windows. *

@@ -102,6 +112,13 @@ public interface WindowStore extends StateStore, ReadOnlyWindowStore */ KeyValueIterator, V> fetch(K from, K to, long timeFrom, long timeTo); + @Override + default KeyValueIterator, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) { + ApiUtils.validateMillisecondInstant(fromTime, "fromTime"); + ApiUtils.validateMillisecondInstant(toTime, "toTime"); + return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli()); + } + /** * Gets all the key-value pairs that belong to the windows within in the given time range. * @@ -112,4 +129,11 @@ public interface WindowStore extends StateStore, ReadOnlyWindowStore * @throws NullPointerException if {@code null} is used for any key */ KeyValueIterator, V> fetchAll(long timeFrom, long timeTo); + + @Override + default KeyValueIterator, V> fetchAll(final Instant from, final Instant to) { + ApiUtils.validateMillisecondInstant(from, "from"); + ApiUtils.validateMillisecondInstant(to, "to"); + return fetchAll(from.toEpochMilli(), to.toEpochMilli()); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index f6b62b2b9351b..b55e544813997 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -16,11 +16,9 @@ */ package org.apache.kafka.streams.state.internals; -import java.time.Instant; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.CacheFlushListener; import org.apache.kafka.streams.processor.ProcessorContext; @@ -205,13 +203,6 @@ public synchronized WindowStoreIterator fetch(final Bytes key, final lon return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator); } - @Override - public WindowStoreIterator fetch(final Bytes key, final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); - return fetch(key, from.toEpochMilli(), to.toEpochMilli()); - } - @Override public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to, final long timeFrom, final long timeTo) { // since this function may not access the underlying inner store, we need to validate @@ -241,16 +232,6 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, final B ); } - @Override - public KeyValueIterator, byte[]> fetch(final Bytes from, - final Bytes to, - final Instant fromTime, - final Instant toTime) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(fromTime, "fromTime"); - ApiUtils.validateMillisecondInstant(toTime, "toTime"); - return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli()); - } - private V fetchPrevious(final Bytes key, final long timestamp) { final byte[] value = underlying.fetch(key, timestamp); if (value != null) { @@ -294,11 +275,4 @@ public KeyValueIterator, byte[]> fetchAll(final long timeFrom, f cacheFunction ); } - - @Override - public KeyValueIterator, byte[]> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); - return fetchAll(from.toEpochMilli(), to.toEpochMilli()); - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java index d4e47c6d18f96..9808ca967cf19 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java @@ -16,10 +16,8 @@ */ package org.apache.kafka.streams.state.internals; -import java.time.Instant; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -58,28 +56,11 @@ public WindowStoreIterator fetch(final Bytes key, final long from, final return bytesStore.fetch(key, from, to); } - @Override - public WindowStoreIterator fetch(final Bytes key, final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); - return fetch(key, from.toEpochMilli(), to.toEpochMilli()); - } - @Override public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) { return bytesStore.fetch(keyFrom, keyTo, from, to); } - @Override - public KeyValueIterator, byte[]> fetch(final Bytes from, - final Bytes to, - final Instant fromTime, - final Instant toTime) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(fromTime, "fromTime"); - ApiUtils.validateMillisecondInstant(toTime, "toTime"); - return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli()); - } - @Override public KeyValueIterator, byte[]> all() { return bytesStore.all(); @@ -90,13 +71,6 @@ public KeyValueIterator, byte[]> fetchAll(final long timeFrom, f return bytesStore.fetchAll(timeFrom, timeTo); } - @Override - public KeyValueIterator, byte[]> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); - return fetchAll(from.toEpochMilli(), to.toEpochMilli()); - } - @Override public void put(final Bytes key, final byte[] value) { put(key, value, context.timestamp()); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index e1b6cd1d52e4f..5162eac8848d2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -16,12 +16,10 @@ */ package org.apache.kafka.streams.state.internals; -import java.time.Instant; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; @@ -149,13 +147,6 @@ public WindowStoreIterator fetch(final K key, final long timeFrom, final long time); } - @Override - public WindowStoreIterator fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); - return fetch(key, from.toEpochMilli(), to.toEpochMilli()); - } - @Override public KeyValueIterator, V> all() { return new MeteredWindowedKeyValueIterator<>(inner.all(), fetchTime, metrics, serdes, time); @@ -170,13 +161,6 @@ public KeyValueIterator, V> fetchAll(final long timeFrom, final long time); } - @Override - public KeyValueIterator, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); - return fetchAll(from.toEpochMilli(), to.toEpochMilli()); - } - @Override public KeyValueIterator, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) { return new MeteredWindowedKeyValueIterator<>(inner.fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo), @@ -186,13 +170,6 @@ public KeyValueIterator, V> fetch(final K from, final K to, final lo time); } - @Override - public KeyValueIterator, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(fromTime, "fromTime"); - ApiUtils.validateMillisecondInstant(toTime, "toTime"); - return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli()); - } - @Override public void flush() { final long startNs = time.nanoseconds(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index e8037bc816358..d7bb523b049ff 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -16,10 +16,8 @@ */ package org.apache.kafka.streams.state.internals; -import java.time.Instant; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -93,26 +91,12 @@ public WindowStoreIterator fetch(final K key, final long timeFrom, final long return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).valuesIterator(); } - @Override - public WindowStoreIterator fetch(final K key, final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); - return fetch(key, from.toEpochMilli(), to.toEpochMilli()); - } - @Override public KeyValueIterator, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) { final KeyValueIterator bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo); return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator(); } - @Override - public KeyValueIterator, V> fetch(final K from, final K to, final Instant fromTime, final Instant toTime) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(fromTime, "fromTime"); - ApiUtils.validateMillisecondInstant(toTime, "toTime"); - return fetch(from, to, fromTime.toEpochMilli(), toTime.toEpochMilli()); - } - @Override public KeyValueIterator, V> all() { final KeyValueIterator bytesIterator = bytesStore.all(); @@ -125,13 +109,6 @@ public KeyValueIterator, V> fetchAll(final long timeFrom, final long return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator(); } - @Override - public KeyValueIterator, V> fetchAll(final Instant from, final Instant to) throws IllegalArgumentException { - ApiUtils.validateMillisecondInstant(from, "from"); - ApiUtils.validateMillisecondInstant(to, "to"); - return fetchAll(from.toEpochMilli(), to.toEpochMilli()); - } - private void maybeUpdateSeqnumForDups() { if (retainDuplicates) { seqnum = (seqnum + 1) & 0x7FFFFFFF;