diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java index a63cd992cbe63..7c622fc0146e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals; import java.util.List; + import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; @@ -90,11 +91,22 @@ public KeyValueIterator range(final K from, return wrapped().range(from, to); } + @Override + public KeyValueIterator reverseRange(final K from, + final K to) { + return wrapped().reverseRange(from, to); + } + @Override public KeyValueIterator all() { return wrapped().all(); } + @Override + public KeyValueIterator reverseAll() { + return wrapped().reverseAll(); + } + @Override public long approximateNumEntries() { return wrapped().approximateNumEntries(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java index 494d98eb5bf40..61e47f6cf6c6e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java @@ -84,11 +84,22 @@ public KeyValueIterator range(final K from, return wrapped().range(from, to); } + @Override + public KeyValueIterator reverseRange(final K from, + final K to) { + return wrapped().reverseRange(from, to); + } + @Override public KeyValueIterator all() { return wrapped().all(); } + @Override + public KeyValueIterator reverseAll() { + return wrapped().reverseAll(); + } + @Override public long approximateNumEntries() { return wrapped().approximateNumEntries(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java index b104ad488db3a..3af8d901d849e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java @@ -32,7 +32,7 @@ public interface KeyValueStore extends StateStore, ReadOnlyKeyValueStore extends StateStore, ReadOnlyKeyValueStore * Please note that this contract defines the thread-safe read functionality only; it does not * guarantee anything about whether the actual instance is writable by another thread, or * whether it uses some locking mechanism under the hood. For this reason, making dependencies @@ -38,7 +38,7 @@ public interface ReadOnlyKeyValueStore { * * @param key The key to fetch * @return The value or null if no value is found. - * @throws NullPointerException If null is used for key. + * @throws NullPointerException If null is used for key. * @throws InvalidStateStoreException if the store is not initialized */ V get(K key); @@ -46,27 +46,60 @@ public interface ReadOnlyKeyValueStore { /** * Get an iterator over a given range of keys. This iterator must be closed after use. * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s - * and must not return null values. No ordering guarantees are provided. - * @param from The first key that could be in the range - * @param to The last key that could be in the range - * @return The iterator for this range. - * @throws NullPointerException If null is used for from or to. + * and must not return null values. + * Order is not guaranteed as bytes lexicographical ordering might not represent key order. + * + * @param from The first key that could be in the range, where iteration starts from. + * @param to The last key that could be in the range, where iteration ends. + * @return The iterator for this range, from smallest to largest bytes. + * @throws NullPointerException If null is used for from or to. * @throws InvalidStateStoreException if the store is not initialized */ KeyValueIterator range(K from, K to); + /** + * Get a reverse iterator over a given range of keys. This iterator must be closed after use. + * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s + * and must not return null values. + * Order is not guaranteed as bytes lexicographical ordering might not represent key order. + * + * @param from The first key that could be in the range, where iteration ends. + * @param to The last key that could be in the range, where iteration starts from. + * @return The reverse iterator for this range, from largest to smallest key bytes. + * @throws NullPointerException If null is used for from or to. + * @throws InvalidStateStoreException if the store is not initialized + */ + default KeyValueIterator reverseRange(K from, K to) { + throw new UnsupportedOperationException(); + } + /** * Return an iterator over all keys in this store. This iterator must be closed after use. * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s - * and must not return null values. No ordering guarantees are provided. - * @return An iterator of all key/value pairs in the store. + * and must not return null values. + * Order is not guaranteed as bytes lexicographical ordering might not represent key order. + * + * @return An iterator of all key/value pairs in the store, from smallest to largest bytes. * @throws InvalidStateStoreException if the store is not initialized */ KeyValueIterator all(); /** - * Return an approximate count of key-value mappings in this store. + * Return a reverse iterator over all keys in this store. This iterator must be closed after use. + * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s + * and must not return null values. + * Order is not guaranteed as bytes lexicographical ordering might not represent key order. * + * @return An reverse iterator of all key/value pairs in the store, from largest to smallest key bytes. + * @throws InvalidStateStoreException if the store is not initialized + */ + default KeyValueIterator reverseAll() { + throw new UnsupportedOperationException(); + } + + /** + * Return an approximate count of key-value mappings in this store. + *

* The count is not guaranteed to be exact in order to accommodate stores * where an exact count is expensive to calculate. * diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java index 16bdbeb8449f4..a5e56aa0d9ac0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java @@ -31,11 +31,14 @@ abstract class AbstractMergedSortedCacheStoreIterator implements KeyValueIterator { private final PeekingKeyValueIterator cacheIterator; private final KeyValueIterator storeIterator; + private final boolean forward; AbstractMergedSortedCacheStoreIterator(final PeekingKeyValueIterator cacheIterator, - final KeyValueIterator storeIterator) { + final KeyValueIterator storeIterator, + final boolean forward) { this.cacheIterator = cacheIterator; this.storeIterator = storeIterator; + this.forward = forward; } abstract int compare(final Bytes cacheKey, final KS storeKey); @@ -87,14 +90,32 @@ public KeyValue next() { } final int comparison = compare(nextCacheKey, nextStoreKey); - if (comparison > 0) { - return nextStoreValue(nextStoreKey); - } else if (comparison < 0) { - return nextCacheValue(nextCacheKey); + return chooseNextValue(nextCacheKey, nextStoreKey, comparison); + } + + private KeyValue chooseNextValue(final Bytes nextCacheKey, + final KS nextStoreKey, + final int comparison) { + if (forward) { + if (comparison > 0) { + return nextStoreValue(nextStoreKey); + } else if (comparison < 0) { + return nextCacheValue(nextCacheKey); + } else { + // skip the same keyed element + storeIterator.next(); + return nextCacheValue(nextCacheKey); + } } else { - // skip the same keyed element - storeIterator.next(); - return nextCacheValue(nextCacheKey); + if (comparison < 0) { + return nextStoreValue(nextStoreKey); + } else if (comparison > 0) { + return nextCacheValue(nextCacheKey); + } else { + // skip the same keyed element + storeIterator.next(); + return nextCacheValue(nextCacheKey); + } } } @@ -136,14 +157,32 @@ public K peekNextKey() { } final int comparison = compare(nextCacheKey, nextStoreKey); - if (comparison > 0) { - return deserializeStoreKey(nextStoreKey); - } else if (comparison < 0) { - return deserializeCacheKey(nextCacheKey); + return chooseNextKey(nextCacheKey, nextStoreKey, comparison); + } + + private K chooseNextKey(final Bytes nextCacheKey, + final KS nextStoreKey, + final int comparison) { + if (forward) { + if (comparison > 0) { + return deserializeStoreKey(nextStoreKey); + } else if (comparison < 0) { + return deserializeCacheKey(nextCacheKey); + } else { + // skip the same keyed element + storeIterator.next(); + return deserializeCacheKey(nextCacheKey); + } } else { - // skip the same keyed element - storeIterator.next(); - return deserializeCacheKey(nextCacheKey); + if (comparison < 0) { + return deserializeStoreKey(nextStoreKey); + } else if (comparison > 0) { + return deserializeCacheKey(nextCacheKey); + } else { + // skip the same keyed element + storeIterator.next(); + return deserializeCacheKey(nextCacheKey); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index dc1ae86ab08c8..39335967aaef2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -81,8 +81,9 @@ public KeyValueIterator fetch(final Bytes keyFrom, final long from, final long to) { if (keyFrom.compareTo(keyTo) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " - + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 2e54014d0e89a..cae38e0d68078 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -64,7 +64,6 @@ public void init(final ProcessorContext context, streamThread = Thread.currentThread(); } - @SuppressWarnings("unchecked") private void initInternal(final ProcessorContext context) { this.context = (InternalProcessorContext) context; @@ -240,8 +239,9 @@ private byte[] getInternal(final Bytes key) { public KeyValueIterator range(final Bytes from, final Bytes to) { if (from.compareTo(to) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " - + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } @@ -249,7 +249,24 @@ public KeyValueIterator range(final Bytes from, validateStoreOpen(); final KeyValueIterator storeIterator = wrapped().range(from, to); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().range(cacheName, from, to); - return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator); + return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true); + } + + @Override + public KeyValueIterator reverseRange(final Bytes from, + final Bytes to) { + if (from.compareTo(to) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + + validateStoreOpen(); + final KeyValueIterator storeIterator = wrapped().reverseRange(from, to); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().reverseRange(cacheName, from, to); + return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, false); } @Override @@ -258,7 +275,16 @@ public KeyValueIterator all() { final KeyValueIterator storeIterator = new DelegatingPeekingKeyValueIterator<>(this.name(), wrapped().all()); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().all(cacheName); - return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator); + return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true); + } + + @Override + public KeyValueIterator reverseAll() { + validateStoreOpen(); + final KeyValueIterator storeIterator = + new DelegatingPeekingKeyValueIterator<>(this.name(), wrapped().reverseAll()); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().reverseAll(cacheName); + return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, false); } @Override @@ -309,7 +335,7 @@ public void close() { ); if (!suppressed.isEmpty()) { throwSuppressed("Caught an exception while closing caching key value store for store " + name(), - suppressed); + suppressed); } } finally { lock.writeLock().unlock(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index e7ad9688f8d80..4ac43a216c3fe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -174,8 +174,9 @@ public KeyValueIterator, byte[]> findSessions(final Bytes keyFro final long earliestSessionEndTime, final long latestSessionStartTime) { if (keyFrom.compareTo(keyTo) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " - + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 7df0a4b7d0975..8867602c45771 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -222,8 +222,9 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, final long timeFrom, final long timeTo) { if (from.compareTo(to) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " - + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java index 35f6d365891ab..236f21877ef2d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java @@ -100,11 +100,22 @@ public KeyValueIterator range(final Bytes from, return wrapped().range(from, to); } + @Override + public KeyValueIterator reverseRange(final Bytes from, + final Bytes to) { + return wrapped().reverseRange(from, to); + } + @Override public KeyValueIterator all() { return wrapped().all(); } + @Override + public KeyValueIterator reverseAll() { + return wrapped().reverseAll(); + } + void log(final Bytes key, final byte[] value) { context.logChange(name(), key, value, context.timestamp()); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeKeyValueIterator.java index 4ac6fee2e6cec..1614f9f52a15a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeKeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeKeyValueIterator.java @@ -50,8 +50,7 @@ public K peekNextKey() { @Override public boolean hasNext() { - while ((current == null || !current.hasNext()) - && storeIterator.hasNext()) { + while ((current == null || !current.hasNext()) && storeIterator.hasNext()) { close(); current = nextIteratorFunction.apply(storeIterator.next()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java index c790b89b13f65..54e5f1e4f243a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStore.java @@ -79,7 +79,29 @@ public KeyValueIterator apply(final ReadOnlyKeyValueStore store) { } }; final List> stores = storeProvider.stores(storeName, storeType); - return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction)); + return new DelegatingPeekingKeyValueIterator<>( + storeName, + new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction)); + } + + @Override + public KeyValueIterator reverseRange(final K from, final K to) { + Objects.requireNonNull(from); + Objects.requireNonNull(to); + final NextIteratorFunction> nextIteratorFunction = new NextIteratorFunction>() { + @Override + public KeyValueIterator apply(final ReadOnlyKeyValueStore store) { + try { + return store.reverseRange(from, to); + } catch (final InvalidStateStoreException e) { + throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata."); + } + } + }; + final List> stores = storeProvider.stores(storeName, storeType); + return new DelegatingPeekingKeyValueIterator<>( + storeName, + new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction)); } @Override @@ -95,7 +117,27 @@ public KeyValueIterator apply(final ReadOnlyKeyValueStore store) { } }; final List> stores = storeProvider.stores(storeName, storeType); - return new DelegatingPeekingKeyValueIterator<>(storeName, new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction)); + return new DelegatingPeekingKeyValueIterator<>( + storeName, + new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction)); + } + + @Override + public KeyValueIterator reverseAll() { + final NextIteratorFunction> nextIteratorFunction = new NextIteratorFunction>() { + @Override + public KeyValueIterator apply(final ReadOnlyKeyValueStore store) { + try { + return store.reverseAll(); + } catch (final InvalidStateStoreException e) { + throw new InvalidStateStoreException("State store is not available anymore and may have been migrated to another instance; please re-discover its location from the state metadata."); + } + } + }; + final List> stores = storeProvider.stores(storeName, storeType); + return new DelegatingPeekingKeyValueIterator<>( + storeName, + new CompositeKeyValueIterator<>(stores.iterator(), nextIteratorFunction)); } @Override @@ -111,6 +153,5 @@ public long approximateNumEntries() { return total; } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index 27ec409dc4d31..b02459dc57a67 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -16,30 +16,31 @@ */ package org.apache.kafka.streams.state.internals; -import java.util.List; -import java.util.NavigableMap; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; - -import java.util.Iterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + public class InMemoryKeyValueStore implements KeyValueStore { + + private static final Logger LOG = LoggerFactory.getLogger(InMemoryKeyValueStore.class); + private final String name; private final NavigableMap map = new TreeMap<>(); private volatile boolean open = false; private long size = 0L; // SkipListMap#size is O(N) so we just do our best to track it - private static final Logger LOG = LoggerFactory.getLogger(InMemoryKeyValueStore.class); - public InMemoryKeyValueStore(final String name) { this.name = name; } @@ -110,24 +111,40 @@ public synchronized byte[] delete(final Bytes key) { @Override public synchronized KeyValueIterator range(final Bytes from, final Bytes to) { + return range(from, to, true); + } + + @Override + public KeyValueIterator reverseRange(final Bytes from, final Bytes to) { + return range(from, to, false); + } + private KeyValueIterator range(final Bytes from, final Bytes to, final boolean forward) { if (from.compareTo(to) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " - + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } return new DelegatingPeekingKeyValueIterator<>( name, - new InMemoryKeyValueIterator(map.subMap(from, true, to, true).keySet())); + new InMemoryKeyValueIterator(map.subMap(from, true, to, true).keySet(), forward)); } @Override public synchronized KeyValueIterator all() { return new DelegatingPeekingKeyValueIterator<>( name, - new InMemoryKeyValueIterator(map.keySet())); + new InMemoryKeyValueIterator(map.keySet(), true)); + } + + @Override + public KeyValueIterator reverseAll() { + return new DelegatingPeekingKeyValueIterator<>( + name, + new InMemoryKeyValueIterator(map.keySet(), false)); } @Override @@ -150,8 +167,12 @@ public void close() { private class InMemoryKeyValueIterator implements KeyValueIterator { private final Iterator iter; - private InMemoryKeyValueIterator(final Set keySet) { - this.iter = new TreeSet<>(keySet).iterator(); + private InMemoryKeyValueIterator(final Set keySet, final boolean forward) { + if (forward) { + this.iter = new TreeSet<>(keySet).iterator(); + } else { + this.iter = new TreeSet<>(keySet).descendingIterator(); + } } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index b10f4c4b52406..e4fda06682c86 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -181,8 +181,9 @@ public KeyValueIterator, byte[]> findSessions(final Bytes keyFro removeExpiredSegments(); if (keyFrom.compareTo(keyTo) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " - + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index e7220e80178d4..14d33cb4ea47b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -191,8 +191,9 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, removeExpiredSegments(); if (from.compareTo(to) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " - + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java index 62cfac3b09ca2..fa29974c98330 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java @@ -118,11 +118,22 @@ public KeyValueIterator range(final Bytes from, return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.range(from, to)); } + @Override + public KeyValueIterator reverseRange(final Bytes from, + final Bytes to) { + return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.reverseRange(from, to)); + } + @Override public KeyValueIterator all() { return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.all()); } + @Override + public KeyValueIterator reverseAll() { + return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.reverseAll()); + } + @Override public long approximateNumEntries() { return store.approximateNumEntries(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index d69df13f4f096..32a91cd671299 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -142,6 +142,14 @@ public KeyValueIterator range(final Bytes from, final Bytes to) { throw new UnsupportedOperationException("MemoryLRUCache does not support range() function."); } + /** + * @throws UnsupportedOperationException at every invocation + */ + @Override + public KeyValueIterator reverseRange(final Bytes from, final Bytes to) { + throw new UnsupportedOperationException("MemoryLRUCache does not support reverseRange() function."); + } + /** * @throws UnsupportedOperationException at every invocation */ @@ -150,6 +158,14 @@ public KeyValueIterator all() { throw new UnsupportedOperationException("MemoryLRUCache does not support all() function."); } + /** + * @throws UnsupportedOperationException at every invocation + */ + @Override + public KeyValueIterator reverseAll() { + throw new UnsupportedOperationException("MemoryLRUCache does not support reverseAll() function."); + } + @Override public long approximateNumEntries() { return this.map.size(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java index 6e0deaa667eb9..93adc25520cf8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java @@ -36,10 +36,10 @@ public MemoryNavigableLRUCache(final String name, final int maxCacheSize) { @Override public KeyValueIterator range(final Bytes from, final Bytes to) { - if (from.compareTo(to) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " - + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } @@ -50,12 +50,34 @@ public KeyValueIterator range(final Bytes from, final Bytes to) { .subSet(from, true, to, true).iterator(), treeMap)); } + @Override + public KeyValueIterator reverseRange(final Bytes from, final Bytes to) { + if (from.compareTo(to) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + + final TreeMap treeMap = toTreeMap(); + return new DelegatingPeekingKeyValueIterator<>(name(), + new MemoryNavigableLRUCache.CacheIterator(treeMap + .subMap(from, true, to, true).descendingKeySet().iterator(), treeMap)); + } + @Override public KeyValueIterator all() { final TreeMap treeMap = toTreeMap(); return new MemoryNavigableLRUCache.CacheIterator(treeMap.navigableKeySet().iterator(), treeMap); } + @Override + public KeyValueIterator reverseAll() { + final TreeMap treeMap = toTreeMap(); + return new MemoryNavigableLRUCache.CacheIterator(treeMap.descendingKeySet().iterator(), treeMap); + } + private synchronized TreeMap toTreeMap() { return new TreeMap<>(this.map); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java index 7a545e4da3f9b..701bdd1d4d548 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java @@ -24,12 +24,14 @@ * Merges two iterators. Assumes each of them is sorted by key * */ -class MergedSortedCacheKeyValueBytesStoreIterator extends AbstractMergedSortedCacheStoreIterator { +class MergedSortedCacheKeyValueBytesStoreIterator + extends AbstractMergedSortedCacheStoreIterator { MergedSortedCacheKeyValueBytesStoreIterator(final PeekingKeyValueIterator cacheIterator, - final KeyValueIterator storeIterator) { - super(cacheIterator, storeIterator); + final KeyValueIterator storeIterator, + final boolean forward) { + super(cacheIterator, storeIterator, forward); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java index 6994222531067..ff45a418889a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java @@ -33,7 +33,7 @@ class MergedSortedCacheSessionStoreIterator extends AbstractMergedSortedCacheSto MergedSortedCacheSessionStoreIterator(final PeekingKeyValueIterator cacheIterator, final KeyValueIterator, byte[]> storeIterator, final SegmentedCacheFunction cacheFunction) { - super(cacheIterator, storeIterator); + super(cacheIterator, storeIterator, true); this.cacheFunction = cacheFunction; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java index 98b3f064db178..7d40dda2bc5ce 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java @@ -32,7 +32,7 @@ class MergedSortedCacheWindowStoreIterator extends AbstractMergedSortedCacheStor MergedSortedCacheWindowStoreIterator(final PeekingKeyValueIterator cacheIterator, final KeyValueIterator storeIterator) { - super(cacheIterator, storeIterator); + super(cacheIterator, storeIterator, true); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java index 1cba018c285d6..36e922fee4bb5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java @@ -37,7 +37,7 @@ class MergedSortedCacheWindowStoreKeyValueIterator final long windowSize, final SegmentedCacheFunction cacheFunction ) { - super(filteredCacheIterator, underlyingIterator); + super(filteredCacheIterator, underlyingIterator, true); this.serdes = serdes; this.windowSize = windowSize; this.cacheFunction = cacheFunction; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index d77834b66e65c..9fe5be998477e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -42,6 +42,7 @@ * inner KeyValueStore implementation do not need to provide its own metrics collecting functionality. * The inner {@link KeyValueStore} of this class is of type <Bytes,byte[]>, hence we use {@link Serde}s * to convert from <K,V> to <Bytes,byte[]> + * * @param * @param */ @@ -187,11 +188,25 @@ public KeyValueIterator range(final K from, ); } + @Override + public KeyValueIterator reverseRange(final K from, + final K to) { + return new MeteredKeyValueIterator( + wrapped().reverseRange(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to))), + rangeSensor + ); + } + @Override public KeyValueIterator all() { return new MeteredKeyValueIterator(wrapped().all(), allSensor); } + @Override + public KeyValueIterator reverseAll() { + return new MeteredKeyValueIterator(wrapped().reverseAll(), allSensor); + } + @Override public void flush() { maybeMeasureLatency(super::flush, time, flushSensor); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java index 4693fbc7b6540..032a303f617e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java @@ -281,15 +281,27 @@ public boolean isEmpty() { } synchronized Iterator keyRange(final Bytes from, final Bytes to) { - return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true)); + return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true), true); } - private Iterator keySetIterator(final Set keySet) { - return new TreeSet<>(keySet).iterator(); + synchronized Iterator reverseKeyRange(final Bytes from, final Bytes to) { + return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true), false); + } + + private Iterator keySetIterator(final Set keySet, final boolean forward) { + if (forward) { + return new TreeSet<>(keySet).iterator(); + } else { + return new TreeSet<>(keySet).descendingIterator(); + } } synchronized Iterator allKeys() { - return keySetIterator(cache.navigableKeySet()); + return keySetIterator(cache.navigableKeySet(), true); + } + + synchronized Iterator reverseAllKeys() { + return keySetIterator(cache.navigableKeySet(), false); } synchronized LRUCacheEntry first() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java index 862e6fcfbe89e..2ffa3f3529863 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java @@ -40,11 +40,22 @@ public KeyValueIterator range(final K from, return new KeyValueIteratorFacade<>(inner.range(from, to)); } + @Override + public KeyValueIterator reverseRange(final K from, + final K to) { + return new KeyValueIteratorFacade<>(inner.reverseRange(from, to)); + } + @Override public KeyValueIterator all() { return new KeyValueIteratorFacade<>(inner.all()); } + @Override + public KeyValueIterator reverseAll() { + return new KeyValueIteratorFacade<>(inner.reverseAll()); + } + @Override public long approximateNumEntries() { return inner.approximateNumEntries(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java deleted file mode 100644 index b84175e1ee0c6..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.state.KeyValueIterator; -import org.rocksdb.RocksIterator; - -import java.util.Set; - -class RocksDBPrefixIterator extends RocksDbIterator { - private byte[] rawPrefix; - - RocksDBPrefixIterator(final String name, - final RocksIterator newIterator, - final Set> openIterators, - final Bytes prefix) { - super(name, newIterator, openIterators); - rawPrefix = prefix.get(); - newIterator.seek(rawPrefix); - } - - @Override - public synchronized boolean hasNext() { - if (!super.hasNext()) { - return false; - } - - final byte[] rawNextKey = super.peekNextKey().get(); - for (int i = 0; i < rawPrefix.length; i++) { - if (i == rawNextKey.length) { - throw new IllegalStateException("Unexpected RocksDB Key Value. Should have been skipped with seek."); - } - if (rawNextKey[i] != rawPrefix[i]) { - return false; - } - } - return true; - } -} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java index b1cf24dcdd1b5..29154860af3d2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java @@ -29,32 +29,50 @@ class RocksDBRangeIterator extends RocksDbIterator { // comparator to be pluggable, and the default is lexicographic, so it's // safe to just force lexicographic comparator here for now. private final Comparator comparator = Bytes.BYTES_LEXICO_COMPARATOR; - private final byte[] rawToKey; + private final byte[] rawLastKey; + private final boolean forward; RocksDBRangeIterator(final String storeName, final RocksIterator iter, final Set> openIterators, final Bytes from, - final Bytes to) { - super(storeName, iter, openIterators); - iter.seek(from.get()); - rawToKey = to.get(); - if (rawToKey == null) { - throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to); + final Bytes to, + final boolean forward) { + super(storeName, iter, openIterators, forward); + this.forward = forward; + if (forward) { + iter.seek(from.get()); + rawLastKey = to.get(); + if (rawLastKey == null) { + throw new NullPointerException("RocksDBRangeIterator: RawLastKey is null for key " + to); + } + } else { + iter.seekForPrev(to.get()); + rawLastKey = from.get(); + if (rawLastKey == null) { + throw new NullPointerException("RocksDBRangeIterator: RawLastKey is null for key " + from); + } } } @Override public KeyValue makeNext() { final KeyValue next = super.makeNext(); - if (next == null) { return allDone(); } else { - if (comparator.compare(next.key.get(), rawToKey) <= 0) { - return next; + if (forward) { + if (comparator.compare(next.key.get(), rawLastKey) <= 0) { + return next; + } else { + return allDone(); + } } else { - return allDone(); + if (comparator.compare(next.key.get(), rawLastKey) >= 0) { + return next; + } else { + return allDone(); + } } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 0bc256f83c4ec..f6c812ebf2378 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -65,7 +65,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.regex.Pattern; import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG; @@ -75,8 +74,6 @@ public class RocksDBStore implements KeyValueStore, BatchWritingStore { private static final Logger log = LoggerFactory.getLogger(RocksDBStore.class); - private static final Pattern SST_FILE_EXTENSION = Pattern.compile(".*\\.sst"); - private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION; private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL; private static final long WRITE_BUFFER_SIZE = 16 * 1024 * 1024L; @@ -327,19 +324,32 @@ public synchronized byte[] delete(final Bytes key) { @Override public synchronized KeyValueIterator range(final Bytes from, final Bytes to) { + return range(from, to, true); + } + + @Override + public synchronized KeyValueIterator reverseRange(final Bytes from, + final Bytes to) { + return range(from, to, false); + } + + KeyValueIterator range(final Bytes from, + final Bytes to, + final boolean forward) { Objects.requireNonNull(from, "from cannot be null"); Objects.requireNonNull(to, "to cannot be null"); if (from.compareTo(to) > 0) { log.warn("Returning empty iterator for fetch with invalid key range: from > to. " - + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } validateStoreOpen(); - final KeyValueIterator rocksDBRangeIterator = dbAccessor.range(from, to); + final KeyValueIterator rocksDBRangeIterator = dbAccessor.range(from, to, forward); openIterators.add(rocksDBRangeIterator); return rocksDBRangeIterator; @@ -347,8 +357,17 @@ public synchronized KeyValueIterator range(final Bytes from, @Override public synchronized KeyValueIterator all() { + return all(true); + } + + @Override + public KeyValueIterator reverseAll() { + return all(false); + } + + KeyValueIterator all(final boolean forward) { validateStoreOpen(); - final KeyValueIterator rocksDbIterator = dbAccessor.all(); + final KeyValueIterator rocksDbIterator = dbAccessor.all(forward); openIterators.add(rocksDbIterator); return rocksDbIterator; } @@ -474,9 +493,10 @@ void prepareBatch(final List> entries, byte[] getOnly(final byte[] key) throws RocksDBException; KeyValueIterator range(final Bytes from, - final Bytes to); + final Bytes to, + final boolean forward); - KeyValueIterator all(); + KeyValueIterator all(final boolean forward); long approximateNumEntries() throws RocksDBException; @@ -540,20 +560,26 @@ public byte[] getOnly(final byte[] key) throws RocksDBException { @Override public KeyValueIterator range(final Bytes from, - final Bytes to) { + final Bytes to, + final boolean forward) { return new RocksDBRangeIterator( name, db.newIterator(columnFamily), openIterators, from, - to); + to, + forward); } @Override - public KeyValueIterator all() { + public KeyValueIterator all(final boolean forward) { final RocksIterator innerIterWithTimestamp = db.newIterator(columnFamily); - innerIterWithTimestamp.seekToFirst(); - return new RocksDbIterator(name, innerIterWithTimestamp, openIterators); + if (forward) { + innerIterWithTimestamp.seekToFirst(); + } else { + innerIterWithTimestamp.seekToLast(); + } + return new RocksDbIterator(name, innerIterWithTimestamp, openIterators, forward); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java index 6c31e9b43d208..e16577513395e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java @@ -193,22 +193,29 @@ public byte[] getOnly(final byte[] key) throws RocksDBException { @Override public KeyValueIterator range(final Bytes from, - final Bytes to) { + final Bytes to, + final boolean forward) { return new RocksDBDualCFRangeIterator( name, db.newIterator(newColumnFamily), db.newIterator(oldColumnFamily), from, - to); + to, + forward); } @Override - public KeyValueIterator all() { + public KeyValueIterator all(final boolean forward) { final RocksIterator innerIterWithTimestamp = db.newIterator(newColumnFamily); - innerIterWithTimestamp.seekToFirst(); final RocksIterator innerIterNoTimestamp = db.newIterator(oldColumnFamily); - innerIterNoTimestamp.seekToFirst(); - return new RocksDBDualCFIterator(name, innerIterWithTimestamp, innerIterNoTimestamp); + if (forward) { + innerIterWithTimestamp.seekToFirst(); + innerIterNoTimestamp.seekToFirst(); + } else { + innerIterWithTimestamp.seekToLast(); + innerIterNoTimestamp.seekToLast(); + } + return new RocksDBDualCFIterator(name, innerIterWithTimestamp, innerIterNoTimestamp, forward); } @Override @@ -262,6 +269,7 @@ private class RocksDBDualCFIterator extends AbstractIterator makeNext() { } else { next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value()); nextWithTimestamp = null; - iterWithTimestamp.next(); + if (forward) { + iterWithTimestamp.next(); + } else { + iterWithTimestamp.prev(); + } } } else { if (nextWithTimestamp == null) { next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value())); nextNoTimestamp = null; - iterNoTimestamp.next(); - } else { - if (comparator.compare(nextNoTimestamp, nextWithTimestamp) <= 0) { - next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value())); - nextNoTimestamp = null; + if (forward) { iterNoTimestamp.next(); } else { - next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value()); - nextWithTimestamp = null; - iterWithTimestamp.next(); + iterNoTimestamp.prev(); + } + } else { + if (forward) { + if (comparator.compare(nextNoTimestamp, nextWithTimestamp) <= 0) { + next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value())); + nextNoTimestamp = null; + iterNoTimestamp.next(); + } else { + next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value()); + nextWithTimestamp = null; + iterWithTimestamp.next(); + } + } else { + if (comparator.compare(nextNoTimestamp, nextWithTimestamp) >= 0) { + next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value())); + nextNoTimestamp = null; + iterNoTimestamp.prev(); + } else { + next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value()); + nextWithTimestamp = null; + iterWithTimestamp.prev(); + } } } } - return next; } @@ -351,19 +380,31 @@ private class RocksDBDualCFRangeIterator extends RocksDBDualCFIterator { // comparator to be pluggable, and the default is lexicographic, so it's // safe to just force lexicographic comparator here for now. private final Comparator comparator = Bytes.BYTES_LEXICO_COMPARATOR; - private final byte[] upperBoundKey; + private final byte[] rawLastKey; + private final boolean forward; RocksDBDualCFRangeIterator(final String storeName, final RocksIterator iterWithTimestamp, final RocksIterator iterNoTimestamp, final Bytes from, - final Bytes to) { - super(storeName, iterWithTimestamp, iterNoTimestamp); - iterWithTimestamp.seek(from.get()); - iterNoTimestamp.seek(from.get()); - upperBoundKey = to.get(); - if (upperBoundKey == null) { - throw new NullPointerException("RocksDBDualCFRangeIterator: upperBoundKey is null for key " + to); + final Bytes to, + final boolean forward) { + super(storeName, iterWithTimestamp, iterNoTimestamp, forward); + this.forward = forward; + if (forward) { + iterWithTimestamp.seek(from.get()); + iterNoTimestamp.seek(from.get()); + rawLastKey = to.get(); + if (rawLastKey == null) { + throw new NullPointerException("RocksDBDualCFRangeIterator: rawLastKey is null for key " + to); + } + } else { + iterWithTimestamp.seekForPrev(to.get()); + iterNoTimestamp.seekForPrev(to.get()); + rawLastKey = from.get(); + if (rawLastKey == null) { + throw new NullPointerException("RocksDBDualCFRangeIterator: rawLastKey is null for key " + from); + } } } @@ -374,10 +415,18 @@ public KeyValue makeNext() { if (next == null) { return allDone(); } else { - if (comparator.compare(next.key.get(), upperBoundKey) <= 0) { - return next; + if (forward) { + if (comparator.compare(next.key.get(), rawLastKey) <= 0) { + return next; + } else { + return allDone(); + } } else { - return allDone(); + if (comparator.compare(next.key.get(), rawLastKey) >= 0) { + return next; + } else { + return allDone(); + } } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java index 9fa747a45b9c0..388195a81b89e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java @@ -25,12 +25,14 @@ import java.util.NoSuchElementException; import java.util.Set; +import java.util.function.Consumer; class RocksDbIterator extends AbstractIterator> implements KeyValueIterator { private final String storeName; private final RocksIterator iter; private final Set> openIterators; + private final Consumer advanceIterator; private volatile boolean open = true; @@ -38,10 +40,12 @@ class RocksDbIterator extends AbstractIterator> implemen RocksDbIterator(final String storeName, final RocksIterator iter, - final Set> openIterators) { + final Set> openIterators, + final boolean forward) { this.storeName = storeName; this.iter = iter; this.openIterators = openIterators; + this.advanceIterator = forward ? RocksIterator::next : RocksIterator::prev; } @Override @@ -58,7 +62,7 @@ public KeyValue makeNext() { return allDone(); } else { next = getKeyValue(); - iter.next(); + advanceIterator.accept(iter); return next; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java index 0179536b63a28..594dc4616b6c0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java @@ -76,7 +76,6 @@ public long flushes() { * underlying store name. This method creates those names. * @param taskIDString Task ID * @param underlyingStoreName Underlying store name - * @return */ public static String nameSpaceFromTaskIdAndStore(final String taskIDString, final String underlyingStoreName) { return taskIDString + "-" + underlyingStoreName; @@ -84,8 +83,6 @@ public static String nameSpaceFromTaskIdAndStore(final String taskIDString, fina /** * Given a cache name of the form taskid-storename, return the task ID. - * @param cacheName - * @return */ public static String taskIDfromCacheName(final String cacheName) { final String[] tokens = cacheName.split("-", 2); @@ -94,8 +91,6 @@ public static String taskIDfromCacheName(final String cacheName) { /** * Given a cache name of the form taskid-storename, return the store name. - * @param cacheName - * @return */ public static String underlyingStoreNamefromCacheName(final String cacheName) { final String[] tokens = cacheName.split("-", 2); @@ -105,9 +100,6 @@ public static String underlyingStoreNamefromCacheName(final String cacheName) { /** * Add a listener that is called each time an entry is evicted from the cache or an explicit flush is called - * - * @param namespace - * @param listener */ public void addDirtyEntryFlushListener(final String namespace, final DirtyEntryFlushListener listener) { final NamedCache cache = getOrCreateCache(namespace); @@ -185,6 +177,14 @@ public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes fro return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to), cache); } + public MemoryLRUCacheBytesIterator reverseRange(final String namespace, final Bytes from, final Bytes to) { + final NamedCache cache = getCache(namespace); + if (cache == null) { + return new MemoryLRUCacheBytesIterator(Collections.emptyIterator(), new NamedCache(namespace, this.metrics)); + } + return new MemoryLRUCacheBytesIterator(cache.reverseKeyRange(from, to), cache); + } + public MemoryLRUCacheBytesIterator all(final String namespace) { final NamedCache cache = getCache(namespace); if (cache == null) { @@ -193,6 +193,14 @@ public MemoryLRUCacheBytesIterator all(final String namespace) { return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache); } + public MemoryLRUCacheBytesIterator reverseAll(final String namespace) { + final NamedCache cache = getCache(namespace); + if (cache == null) { + return new MemoryLRUCacheBytesIterator(Collections.emptyIterator(), new NamedCache(namespace, this.metrics)); + } + return new MemoryLRUCacheBytesIterator(cache.reverseAllKeys(), cache); + } + public long size() { long size = 0; for (final NamedCache cache : caches.values()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java index 863b44ba4f68e..be8f259366841 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java @@ -133,11 +133,22 @@ public KeyValueIterator range(final Bytes from, return wrapped.range(from, to); } + @Override + public KeyValueIterator reverseRange(final Bytes from, + final Bytes to) { + return wrapped.reverseRange(from, to); + } + @Override public KeyValueIterator all() { return wrapped.all(); } + @Override + public KeyValueIterator reverseAll() { + return wrapped.reverseAll(); + } + @Override public long approximateNumEntries() { return wrapped.approximateNumEntries(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index ca5fd2fc7a840..61f317d495416 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -40,12 +40,12 @@ import java.util.Map; import static org.hamcrest.CoreMatchers.hasItem; -import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public abstract class AbstractKeyValueStoreTest { @@ -188,7 +188,55 @@ public void testPutGetRange() { } @Test - public void testPutGetRangeWithDefaultSerdes() { + public void testPutGetReverseRange() { + // Verify that the store reads and writes correctly ... + store.put(0, "zero"); + store.put(1, "one"); + store.put(2, "two"); + store.put(4, "four"); + store.put(5, "five"); + assertEquals(5, driver.sizeOf(store)); + assertEquals("zero", store.get(0)); + assertEquals("one", store.get(1)); + assertEquals("two", store.get(2)); + assertNull(store.get(3)); + assertEquals("four", store.get(4)); + assertEquals("five", store.get(5)); + // Flush now so that for caching store, we will not skip the deletion following an put + store.flush(); + store.delete(5); + assertEquals(4, driver.sizeOf(store)); + + // Flush the store and verify all current entries were properly flushed ... + store.flush(); + assertEquals("zero", driver.flushedEntryStored(0)); + assertEquals("one", driver.flushedEntryStored(1)); + assertEquals("two", driver.flushedEntryStored(2)); + assertEquals("four", driver.flushedEntryStored(4)); + assertNull(driver.flushedEntryStored(5)); + + assertFalse(driver.flushedEntryRemoved(0)); + assertFalse(driver.flushedEntryRemoved(1)); + assertFalse(driver.flushedEntryRemoved(2)); + assertFalse(driver.flushedEntryRemoved(4)); + assertTrue(driver.flushedEntryRemoved(5)); + + final HashMap expectedContents = new HashMap<>(); + expectedContents.put(2, "two"); + expectedContents.put(4, "four"); + + // Check range iteration ... + assertEquals(expectedContents, getContents(store.reverseRange(2, 4))); + assertEquals(expectedContents, getContents(store.reverseRange(2, 6))); + + // Check all iteration ... + expectedContents.put(0, "zero"); + expectedContents.put(1, "one"); + assertEquals(expectedContents, getContents(store.reverseAll())); + } + + @Test + public void testPutGetWithDefaultSerdes() { // Verify that the store reads and writes correctly ... store.put(0, "zero"); store.put(1, "one"); @@ -371,7 +419,25 @@ public void shouldPutAll() { allReturned.add(iterator.next()); } assertThat(allReturned, equalTo(expectedReturned)); + } + + @Test + public void shouldPutReverseAll() { + final List> entries = new ArrayList<>(); + entries.add(new KeyValue<>(1, "one")); + entries.add(new KeyValue<>(2, "two")); + + store.putAll(entries); + + final List> allReturned = new ArrayList<>(); + final List> expectedReturned = + Arrays.asList(KeyValue.pair(2, "two"), KeyValue.pair(1, "one")); + final Iterator> iterator = store.reverseAll(); + while (iterator.hasNext()) { + allReturned.add(iterator.next()); + } + assertThat(allReturned, equalTo(expectedReturned)); } @Test @@ -397,6 +463,21 @@ public void shouldReturnSameResultsForGetAndRangeWithEqualKeys() { assertFalse(iterator.hasNext()); } + @Test + public void shouldReturnSameResultsForGetAndReverseRangeWithEqualKeys() { + final List> entries = new ArrayList<>(); + entries.add(new KeyValue<>(1, "one")); + entries.add(new KeyValue<>(2, "two")); + entries.add(new KeyValue<>(3, "three")); + + store.putAll(entries); + + final Iterator> iterator = store.reverseRange(2, 2); + + assertEquals(iterator.next().value, store.get(2)); + assertFalse(iterator.hasNext()); + } + @Test public void shouldNotThrowConcurrentModificationException() { store.put(0, "zero"); @@ -418,10 +499,61 @@ public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { assertThat( messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to." + - " This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + + " This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + " Note that the built-in numerical serdes do not follow this for negative numbers") ); } + } + + @Test + public void shouldNotThrowInvalidReverseRangeExceptionWithNegativeFromKey() { + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { + final KeyValueIterator iterator = store.reverseRange(-1, 1); + assertFalse(iterator.hasNext()); + final List messages = appender.getMessages(); + assertThat( + messages, + hasItem("Returning empty iterator for fetch with invalid key range: from > to." + + " This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + + " Note that the built-in numerical serdes do not follow this for negative numbers") + ); + } + } + + @Test + public void shouldNotThrowInvalidRangeExceptionWithFromLargerThanTo() { + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { + final KeyValueIterator iterator = store.range(2, 1); + assertFalse(iterator.hasNext()); + + final List messages = appender.getMessages(); + assertThat( + messages, + hasItem("Returning empty iterator for fetch with invalid key range: from > to." + + " This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + + " Note that the built-in numerical serdes do not follow this for negative numbers") + ); + } + } + + @Test + public void shouldNotThrowInvalidReverseRangeExceptionWithFromLargerThanTo() { + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { + final KeyValueIterator iterator = store.reverseRange(2, 1); + assertFalse(iterator.hasNext()); + + final List messages = appender.getMessages(); + assertThat( + messages, + hasItem("Returning empty iterator for fetch with invalid key range: from > to." + + " This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + + " Note that the built-in numerical serdes do not follow this for negative numbers") + ); + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java index 3147c80f3f012..ce3aa86fd2965 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java @@ -273,25 +273,25 @@ public void shouldFetchExactKeys() { new SessionWindow(0x7a00000000000000L - 2, 0x7a00000000000000L - 1)), 5L); try (final KeyValueIterator, Long> iterator = - sessionStore.findSessions("a", 0, Long.MAX_VALUE) + sessionStore.findSessions("a", 0, Long.MAX_VALUE) ) { assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 3L, 5L)))); } try (final KeyValueIterator, Long> iterator = - sessionStore.findSessions("aa", 0, Long.MAX_VALUE) + sessionStore.findSessions("aa", 0, Long.MAX_VALUE) ) { assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(2L, 4L)))); } try (final KeyValueIterator, Long> iterator = - sessionStore.findSessions("a", "aa", 0, Long.MAX_VALUE) + sessionStore.findSessions("a", "aa", 0, Long.MAX_VALUE) ) { assertThat(valuesToSet(iterator), equalTo(new HashSet<>(asList(1L, 2L, 3L, 4L, 5L)))); } try (final KeyValueIterator, Long> iterator = - sessionStore.findSessions("a", "aa", 10, 0) + sessionStore.findSessions("a", "aa", 10, 0) ) { assertThat(valuesToSet(iterator), equalTo(new HashSet<>(Collections.singletonList(2L)))); } @@ -304,9 +304,9 @@ public void shouldFetchAndIterateOverExactBinaryKeys() { sessionStore.init(context, sessionStore); - final Bytes key1 = Bytes.wrap(new byte[]{0}); - final Bytes key2 = Bytes.wrap(new byte[]{0, 0}); - final Bytes key3 = Bytes.wrap(new byte[]{0, 0, 0}); + final Bytes key1 = Bytes.wrap(new byte[] {0}); + final Bytes key2 = Bytes.wrap(new byte[] {0, 0}); + final Bytes key3 = Bytes.wrap(new byte[] {0, 0, 0}); sessionStore.put(new Windowed<>(key1, new SessionWindow(1, 100)), "1"); sessionStore.put(new Windowed<>(key2, new SessionWindow(2, 100)), "2"); @@ -554,7 +554,8 @@ public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { assertThat( messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to." + - " This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + + " This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + " Note that the built-in numerical serdes do not follow this for negative numbers") ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java index 2211bd3dc200d..a40621697c33a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java @@ -668,10 +668,10 @@ public void shouldFetchAndIterateOverExactKeys() { final long windowSize = 0x7a00000000000000L; final long retentionPeriod = 0x7a00000000000000L; final WindowStore windowStore = buildWindowStore(retentionPeriod, - windowSize, - false, - Serdes.String(), - Serdes.String()); + windowSize, + false, + Serdes.String(), + Serdes.String()); windowStore.init(context, windowStore); @@ -754,15 +754,15 @@ public void shouldThrowNullPointerExceptionOnRangeNullToKey() { @Test public void shouldFetchAndIterateOverExactBinaryKeys() { final WindowStore windowStore = buildWindowStore(RETENTION_PERIOD, - WINDOW_SIZE, - true, - Serdes.Bytes(), - Serdes.String()); + WINDOW_SIZE, + true, + Serdes.Bytes(), + Serdes.String()); windowStore.init(context, windowStore); - final Bytes key1 = Bytes.wrap(new byte[]{0}); - final Bytes key2 = Bytes.wrap(new byte[]{0, 0}); - final Bytes key3 = Bytes.wrap(new byte[]{0, 0, 0}); + final Bytes key1 = Bytes.wrap(new byte[] {0}); + final Bytes key2 = Bytes.wrap(new byte[] {0, 0}); + final Bytes key3 = Bytes.wrap(new byte[] {0, 0, 0}); windowStore.put(key1, "1", 0); windowStore.put(key2, "2", 0); windowStore.put(key3, "3", 0); @@ -816,7 +816,8 @@ public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { assertThat( messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to." + - " This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + + " This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + " Note that the built-in numerical serdes do not follow this for negative numbers") ); } @@ -1015,8 +1016,8 @@ public void testFetchDuplicates() { @SuppressWarnings("deprecation") private void putFirstBatch(final WindowStore store, - @SuppressWarnings("SameParameterValue") final long startTime, - final InternalMockProcessorContext context) { + @SuppressWarnings("SameParameterValue") final long startTime, + final InternalMockProcessorContext context) { context.setRecordContext(createRecordContext(startTime)); store.put(0, "zero"); context.setRecordContext(createRecordContext(startTime + 1L)); @@ -1031,8 +1032,8 @@ private void putFirstBatch(final WindowStore store, @SuppressWarnings("deprecation") private void putSecondBatch(final WindowStore store, - @SuppressWarnings("SameParameterValue") final long startTime, - final InternalMockProcessorContext context) { + @SuppressWarnings("SameParameterValue") final long startTime, + final InternalMockProcessorContext context) { context.setRecordContext(createRecordContext(startTime + 3L)); store.put(2, "two+1"); context.setRecordContext(createRecordContext(startTime + 4L)); @@ -1048,7 +1049,7 @@ private void putSecondBatch(final WindowStore store, } private Map> entriesByKey(final List> changeLog, - @SuppressWarnings("SameParameterValue") final long startTime) { + @SuppressWarnings("SameParameterValue") final long startTime) { final HashMap> entriesByKey = new HashMap<>(); for (final KeyValue entry : changeLog) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java index ffc8134a43a6c..d0e10d5367837 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java @@ -40,6 +40,7 @@ import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -54,7 +55,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { @@ -280,6 +280,27 @@ public void shouldIterateAllStoredItems() { results.add(all.next().key); } assertEquals(items, results.size()); + assertEquals(Arrays.asList( + Bytes.wrap("0".getBytes()), + Bytes.wrap("1".getBytes()), + Bytes.wrap("2".getBytes()) + ), results); + } + + @Test + public void shouldReverseIterateAllStoredItems() { + final int items = addItemsToCache(); + final KeyValueIterator all = store.reverseAll(); + final List results = new ArrayList<>(); + while (all.hasNext()) { + results.add(all.next().key); + } + assertEquals(items, results.size()); + assertEquals(Arrays.asList( + Bytes.wrap("2".getBytes()), + Bytes.wrap("1".getBytes()), + Bytes.wrap("0".getBytes()) + ), results); } @Test @@ -292,6 +313,28 @@ public void shouldIterateOverRange() { results.add(range.next().key); } assertEquals(items, results.size()); + assertEquals(Arrays.asList( + Bytes.wrap("0".getBytes()), + Bytes.wrap("1".getBytes()), + Bytes.wrap("2".getBytes()) + ), results); + } + + @Test + public void shouldReverseIterateOverRange() { + final int items = addItemsToCache(); + final KeyValueIterator range = + store.reverseRange(bytesKey(String.valueOf(0)), bytesKey(String.valueOf(items))); + final List results = new ArrayList<>(); + while (range.hasNext()) { + results.add(range.next().key); + } + assertEquals(items, results.size()); + assertEquals(Arrays.asList( + Bytes.wrap("2".getBytes()), + Bytes.wrap("1".getBytes()), + Bytes.wrap("0".getBytes()) + ), results); } @Test @@ -300,7 +343,9 @@ public void shouldDeleteItemsFromCache() { store.delete(bytesKey("a")); assertNull(store.get(bytesKey("a"))); assertFalse(store.range(bytesKey("a"), bytesKey("b")).hasNext()); + assertFalse(store.reverseRange(bytesKey("a"), bytesKey("b")).hasNext()); assertFalse(store.all().hasNext()); + assertFalse(store.reverseAll().hasNext()); } @Test @@ -310,7 +355,9 @@ public void shouldNotShowItemsDeletedFromCacheButFlushedToStoreBeforeDelete() { store.delete(bytesKey("a")); assertNull(store.get(bytesKey("a"))); assertFalse(store.range(bytesKey("a"), bytesKey("b")).hasNext()); + assertFalse(store.reverseRange(bytesKey("a"), bytesKey("b")).hasNext()); assertFalse(store.all().hasNext()); + assertFalse(store.reverseAll().hasNext()); } @Test @@ -321,67 +368,94 @@ public void shouldClearNamespaceCacheOnClose() { assertEquals(0, cache.size()); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToGetFromClosedCachingStore() { - store.close(); - store.get(bytesKey("a")); + assertThrows(InvalidStateStoreException.class, () -> { + store.close(); + store.get(bytesKey("a")); + }); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToWriteToClosedCachingStore() { - store.close(); - store.put(bytesKey("a"), bytesValue("a")); + assertThrows(InvalidStateStoreException.class, () -> { + store.close(); + store.put(bytesKey("a"), bytesValue("a")); + }); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToDoRangeQueryOnClosedCachingStore() { - store.close(); - store.range(bytesKey("a"), bytesKey("b")); + assertThrows(InvalidStateStoreException.class, () -> { + store.close(); + store.range(bytesKey("a"), bytesKey("b")); + }); + } + + @Test + public void shouldThrowIfTryingToDoReverseRangeQueryOnClosedCachingStore() { + assertThrows(InvalidStateStoreException.class, () -> { + store.close(); + store.reverseRange(bytesKey("a"), bytesKey("b")); + }); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToDoAllQueryOnClosedCachingStore() { - store.close(); - store.all(); + assertThrows(InvalidStateStoreException.class, () -> { + store.close(); + store.all(); + }); + } + + @Test + public void shouldThrowIfTryingToDoReverseAllQueryOnClosedCachingStore() { + assertThrows(InvalidStateStoreException.class, () -> { + store.close(); + store.reverseAll(); + }); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToDoGetApproxSizeOnClosedCachingStore() { - store.close(); - store.approximateNumEntries(); + assertThrows(InvalidStateStoreException.class, () -> { + store.close(); + store.close(); + store.approximateNumEntries(); + }); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToDoPutAllClosedCachingStore() { - store.close(); - store.putAll(Collections.singletonList(KeyValue.pair(bytesKey("a"), bytesValue("a")))); + assertThrows(InvalidStateStoreException.class, () -> { + store.close(); + store.putAll(Collections.singletonList(KeyValue.pair(bytesKey("a"), bytesValue("a")))); + }); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToDoPutIfAbsentClosedCachingStore() { - store.close(); - store.putIfAbsent(bytesKey("b"), bytesValue("c")); + assertThrows(InvalidStateStoreException.class, () -> { + store.close(); + store.putIfAbsent(bytesKey("b"), bytesValue("c")); + }); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnPutWithNullKey() { - store.put(null, bytesValue("c")); + assertThrows(NullPointerException.class, () -> store.put(null, bytesValue("c"))); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnPutIfAbsentWithNullKey() { - store.putIfAbsent(null, bytesValue("c")); + assertThrows(NullPointerException.class, () -> store.putIfAbsent(null, bytesValue("c"))); } @Test public void shouldThrowNullPointerExceptionOnPutAllWithNullKey() { final List> entries = new ArrayList<>(); entries.add(new KeyValue<>(null, bytesValue("a"))); - try { - store.putAll(entries); - fail("Should have thrown NullPointerException while putAll null key"); - } catch (final NullPointerException expected) { - } + assertThrows(NullPointerException.class, () -> store.putAll(entries)); } @Test @@ -408,10 +482,12 @@ public void shouldReturnUnderlying() { assertEquals(underlyingStore, store.wrapped()); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowIfTryingToDeleteFromClosedCachingStore() { - store.close(); - store.delete(bytesKey("key")); + assertThrows(InvalidStateStoreException.class, () -> { + store.close(); + store.delete(bytesKey("key")); + }); } private int addItemsToCache() { @@ -427,13 +503,13 @@ private int addItemsToCache() { public static class CacheFlushListenerStub implements CacheFlushListener { final Deserializer keyDeserializer; - final Deserializer valueDesializer; + final Deserializer valueDeserializer; final Map> forwarded = new HashMap<>(); CacheFlushListenerStub(final Deserializer keyDeserializer, - final Deserializer valueDesializer) { + final Deserializer valueDeserializer) { this.keyDeserializer = keyDeserializer; - this.valueDesializer = valueDesializer; + this.valueDeserializer = valueDeserializer; } @Override @@ -444,8 +520,8 @@ public void apply(final byte[] key, forwarded.put( keyDeserializer.deserialize(null, key), new Change<>( - valueDesializer.deserialize(null, newValue), - valueDesializer.deserialize(null, oldValue))); + valueDeserializer.deserialize(null, newValue), + valueDeserializer.deserialize(null, oldValue))); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index 5283c38e7eebd..d8e97b8b72f29 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -532,7 +532,8 @@ public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { assertThat( messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to." + - " This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + + " This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + " Note that the built-in numerical serdes do not follow this for negative numbers") ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index 097ebe9a58953..49a3a95a910b9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -169,7 +169,8 @@ public KeyValue transform(final String key, final String value) } @Override - public void close() {} + public void close() { + } }, "store-name"); final String bootstrapServers = "localhost:9092"; @@ -623,7 +624,8 @@ public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { assertThat( messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to." + - " This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + + " This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes." + " Note that the built-in numerical serdes do not follow this for negative numbers") ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java index 67b876ab48ed0..27dcff402faef 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java @@ -21,16 +21,14 @@ import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.QueryableStoreType; -import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreTypes; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.InternalMockProcessorContext; -import org.apache.kafka.test.NoOpReadOnlyStore; import org.apache.kafka.test.MockRecordCollector; +import org.apache.kafka.test.NoOpReadOnlyStore; import org.apache.kafka.test.StateStoreProviderStub; import org.junit.Before; import org.junit.Test; @@ -41,15 +39,15 @@ import static java.util.Arrays.asList; import static java.util.Collections.singletonList; import static org.apache.kafka.test.StreamsTestUtils.toList; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; public class CompositeReadOnlyKeyValueStoreTest { private final String storeName = "my-store"; - private final String storeNameA = "my-storeA"; private StateStoreProviderStub stubProviderTwo; private KeyValueStore stubOneUnderlying; private KeyValueStore otherUnderlyingStore; @@ -64,7 +62,6 @@ public void before() { stubProviderOne.addStore(storeName, stubOneUnderlying); otherUnderlyingStore = newStoreInstance(); stubProviderOne.addStore("other-store", otherUnderlyingStore); - final QueryableStoreType> queryableStoreType = QueryableStoreTypes.keyValueStore(); theStore = new CompositeReadOnlyKeyValueStore<>( new WrappingStoreProvider(asList(stubProviderOne, stubProviderTwo), StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())), QueryableStoreTypes.keyValueStore(), @@ -74,9 +71,9 @@ public void before() { private KeyValueStore newStoreInstance() { final KeyValueStore store = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), - Serdes.String(), - Serdes.String()) - .build(); + Serdes.String(), + Serdes.String()) + .build(); final InternalMockProcessorContext context = new InternalMockProcessorContext(new StateSerdes<>(ProcessorStateManager.storeChangelogTopic("appId", storeName), Serdes.String(), Serdes.String()), new MockRecordCollector()); @@ -88,23 +85,33 @@ private KeyValueStore newStoreInstance() { } @Test - public void shouldReturnNullIfKeyDoesntExist() { + public void shouldReturnNullIfKeyDoesNotExist() { assertNull(theStore.get("whatever")); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnGetNullKey() { - theStore.get(null); + assertThrows(NullPointerException.class, () -> theStore.get(null)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnRangeNullFromKey() { - theStore.range(null, "to"); + assertThrows(NullPointerException.class, () -> theStore.range(null, "to")); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerExceptionOnRangeNullToKey() { - theStore.range("from", null); + assertThrows(NullPointerException.class, () -> theStore.range("from", null)); + } + + @Test + public void shouldThrowNullPointerExceptionOnReverseRangeNullFromKey() { + assertThrows(NullPointerException.class, () -> theStore.reverseRange(null, "to")); + } + + @Test + public void shouldThrowNullPointerExceptionOnReverseRangeNullToKey() { + assertThrows(NullPointerException.class, () -> theStore.reverseRange("from", null)); } @Test @@ -124,10 +131,7 @@ public void shouldThrowNoSuchElementExceptionWhileNext() { stubOneUnderlying.put("a", "1"); final KeyValueIterator keyValueIterator = theStore.range("a", "b"); keyValueIterator.next(); - try { - keyValueIterator.next(); - fail("Should have thrown NoSuchElementException with next()"); - } catch (final NoSuchElementException e) { } + assertThrows(NoSuchElementException.class, keyValueIterator::next); } @Test @@ -135,19 +139,21 @@ public void shouldThrowNoSuchElementExceptionWhilePeekNext() { stubOneUnderlying.put("a", "1"); final KeyValueIterator keyValueIterator = theStore.range("a", "b"); keyValueIterator.next(); - try { - keyValueIterator.peekNextKey(); - fail("Should have thrown NoSuchElementException with peekNextKey()"); - } catch (final NoSuchElementException e) { } + assertThrows(NoSuchElementException.class, keyValueIterator::peekNextKey); } @Test public void shouldThrowUnsupportedOperationExceptionWhileRemove() { final KeyValueIterator keyValueIterator = theStore.all(); - try { - keyValueIterator.remove(); - fail("Should have thrown UnsupportedOperationException"); - } catch (final UnsupportedOperationException e) { } + assertThrows(UnsupportedOperationException.class, keyValueIterator::remove); + } + + @Test + public void shouldThrowUnsupportedOperationExceptionWhileReverseRange() { + stubOneUnderlying.put("a", "1"); + stubOneUnderlying.put("b", "1"); + final KeyValueIterator keyValueIterator = theStore.reverseRange("a", "b"); + assertThrows(UnsupportedOperationException.class, keyValueIterator::remove); } @Test @@ -155,10 +161,7 @@ public void shouldThrowUnsupportedOperationExceptionWhileRange() { stubOneUnderlying.put("a", "1"); stubOneUnderlying.put("b", "1"); final KeyValueIterator keyValueIterator = theStore.range("a", "b"); - try { - keyValueIterator.remove(); - fail("Should have thrown UnsupportedOperationException"); - } catch (final UnsupportedOperationException e) { } + assertThrows(UnsupportedOperationException.class, keyValueIterator::remove); } @Test @@ -185,6 +188,21 @@ public void shouldSupportRange() { assertEquals(2, results.size()); } + @Test + public void shouldSupportReverseRange() { + stubOneUnderlying.put("a", "a"); + stubOneUnderlying.put("b", "b"); + stubOneUnderlying.put("c", "c"); + + final List> results = toList(theStore.reverseRange("a", "b")); + assertArrayEquals( + asList( + new KeyValue<>("b", "b"), + new KeyValue<>("a", "a") + ).toArray(), + results.toArray()); + } + @Test public void shouldSupportRangeAcrossMultipleKVStores() { final KeyValueStore cache = newStoreInstance(); @@ -199,6 +217,30 @@ public void shouldSupportRangeAcrossMultipleKVStores() { cache.put("x", "x"); final List> results = toList(theStore.range("a", "e")); + assertArrayEquals( + asList( + new KeyValue<>("a", "a"), + new KeyValue<>("b", "b"), + new KeyValue<>("c", "c"), + new KeyValue<>("d", "d") + ).toArray(), + results.toArray()); + } + + @Test + public void shouldSupportReverseRangeAcrossMultipleKVStores() { + final KeyValueStore cache = newStoreInstance(); + stubProviderTwo.addStore(storeName, cache); + + stubOneUnderlying.put("a", "a"); + stubOneUnderlying.put("b", "b"); + stubOneUnderlying.put("z", "z"); + + cache.put("c", "c"); + cache.put("d", "d"); + cache.put("x", "x"); + + final List> results = toList(theStore.reverseRange("a", "e")); assertTrue(results.contains(new KeyValue<>("a", "a"))); assertTrue(results.contains(new KeyValue<>("b", "b"))); assertTrue(results.contains(new KeyValue<>("c", "c"))); @@ -229,24 +271,57 @@ public void shouldSupportAllAcrossMultipleStores() { assertEquals(6, results.size()); } - @Test(expected = InvalidStateStoreException.class) + @Test + public void shouldSupportReverseAllAcrossMultipleStores() { + final KeyValueStore cache = newStoreInstance(); + stubProviderTwo.addStore(storeName, cache); + + stubOneUnderlying.put("a", "a"); + stubOneUnderlying.put("b", "b"); + stubOneUnderlying.put("z", "z"); + + cache.put("c", "c"); + cache.put("d", "d"); + cache.put("x", "x"); + + final List> results = toList(theStore.reverseAll()); + assertTrue(results.contains(new KeyValue<>("a", "a"))); + assertTrue(results.contains(new KeyValue<>("b", "b"))); + assertTrue(results.contains(new KeyValue<>("c", "c"))); + assertTrue(results.contains(new KeyValue<>("d", "d"))); + assertTrue(results.contains(new KeyValue<>("x", "x"))); + assertTrue(results.contains(new KeyValue<>("z", "z"))); + assertEquals(6, results.size()); + } + + @Test public void shouldThrowInvalidStoreExceptionDuringRebalance() { - rebalancing().get("anything"); + assertThrows(InvalidStateStoreException.class, () -> rebalancing().get("anything")); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowInvalidStoreExceptionOnApproximateNumEntriesDuringRebalance() { - rebalancing().approximateNumEntries(); + assertThrows(InvalidStateStoreException.class, () -> rebalancing().approximateNumEntries()); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowInvalidStoreExceptionOnRangeDuringRebalance() { - rebalancing().range("anything", "something"); + assertThrows(InvalidStateStoreException.class, () -> rebalancing().range("anything", "something")); + } + + @Test + public void shouldThrowInvalidStoreExceptionOnReverseRangeDuringRebalance() { + assertThrows(InvalidStateStoreException.class, () -> rebalancing().reverseRange("anything", "something")); } - @Test(expected = InvalidStateStoreException.class) + @Test public void shouldThrowInvalidStoreExceptionOnAllDuringRebalance() { - rebalancing().all(); + assertThrows(InvalidStateStoreException.class, () -> rebalancing().all()); + } + + @Test + public void shouldThrowInvalidStoreExceptionOnReverseAllDuringRebalance() { + assertThrows(InvalidStateStoreException.class, () -> rebalancing().reverseAll()); } @Test @@ -286,7 +361,7 @@ public long approximateNumEntries() { return Long.MAX_VALUE; } }); - stubProviderTwo.addStore(storeNameA, new NoOpReadOnlyStore() { + stubProviderTwo.addStore("my-storeA", new NoOpReadOnlyStore() { @Override public long approximateNumEntries() { return Long.MAX_VALUE; @@ -297,9 +372,10 @@ public long approximateNumEntries() { } private CompositeReadOnlyKeyValueStore rebalancing() { - final QueryableStoreType> queryableStoreType = QueryableStoreTypes.keyValueStore(); return new CompositeReadOnlyKeyValueStore<>( - new WrappingStoreProvider(singletonList(new StateStoreProviderStub(true)), StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())), + new WrappingStoreProvider( + singletonList(new StateStoreProviderStub(true)), + StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())), QueryableStoreTypes.keyValueStore(), storeName ); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java index 62f89495c7f9d..6dc90eac8d417 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java @@ -33,9 +33,9 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest { @Override protected KeyValueStore createKeyValueStore(final ProcessorContext context) { final StoreBuilder> storeBuilder = Stores.keyValueStoreBuilder( - Stores.inMemoryKeyValueStore("my-store"), - (Serde) context.keySerde(), - (Serde) context.valueSerde()); + Stores.inMemoryKeyValueStore("my-store"), + (Serde) context.keySerde(), + (Serde) context.valueSerde()); final KeyValueStore store = storeBuilder.build(); store.init(context, store); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java index 4028b0cd901f3..b8075d49c74d3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIteratorTest.java @@ -17,13 +17,11 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StateSerdes; import org.junit.Before; import org.junit.Test; @@ -33,30 +31,30 @@ public class MergedSortedCacheKeyValueBytesStoreIteratorTest { private final String namespace = "0.0-one"; - private final StateSerdes serdes = new StateSerdes<>("dummy", Serdes.ByteArray(), Serdes.ByteArray()); private KeyValueStore store; private ThreadCache cache; @Before - public void setUp() throws Exception { + public void setUp() { store = new InMemoryKeyValueStore(namespace); cache = new ThreadCache(new LogContext("testCache "), 10000L, new MockStreamsMetrics(new Metrics())); } - @Test - public void shouldIterateOverRange() throws Exception { + public void shouldIterateOverRange() { final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}}; for (int i = 0; i < bytes.length; i += 2) { store.put(Bytes.wrap(bytes[i]), bytes[i]); cache.put(namespace, Bytes.wrap(bytes[i + 1]), new LRUCacheEntry(bytes[i + 1])); } - final Bytes from = Bytes.wrap(new byte[]{2}); - final Bytes to = Bytes.wrap(new byte[]{9}); - final KeyValueIterator storeIterator = new DelegatingPeekingKeyValueIterator<>("store", store.range(from, to)); + final Bytes from = Bytes.wrap(new byte[] {2}); + final Bytes to = Bytes.wrap(new byte[] {9}); + final KeyValueIterator storeIterator = + new DelegatingPeekingKeyValueIterator<>("store", store.range(from, to)); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from, to); - final MergedSortedCacheKeyValueBytesStoreIterator iterator = new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator); + final MergedSortedCacheKeyValueBytesStoreIterator iterator = + new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true); final byte[][] values = new byte[8][]; int index = 0; int bytesIndex = 2; @@ -70,7 +68,34 @@ public void shouldIterateOverRange() throws Exception { @Test - public void shouldSkipLargerDeletedCacheValue() throws Exception { + public void shouldReverseIterateOverRange() { + final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}}; + for (int i = 0; i < bytes.length; i += 2) { + store.put(Bytes.wrap(bytes[i]), bytes[i]); + cache.put(namespace, Bytes.wrap(bytes[i + 1]), new LRUCacheEntry(bytes[i + 1])); + } + + final Bytes from = Bytes.wrap(new byte[] {2}); + final Bytes to = Bytes.wrap(new byte[] {9}); + final KeyValueIterator storeIterator = + new DelegatingPeekingKeyValueIterator<>("store", store.reverseRange(from, to)); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.reverseRange(namespace, from, to); + + final MergedSortedCacheKeyValueBytesStoreIterator iterator = + new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, false); + final byte[][] values = new byte[8][]; + int index = 0; + int bytesIndex = 9; + while (iterator.hasNext()) { + final byte[] value = iterator.next().value; + values[index++] = value; + assertArrayEquals(bytes[bytesIndex--], value); + } + iterator.close(); + } + + @Test + public void shouldSkipLargerDeletedCacheValue() { final byte[][] bytes = {{0}, {1}}; store.put(Bytes.wrap(bytes[0]), bytes[0]); cache.put(namespace, Bytes.wrap(bytes[1]), new LRUCacheEntry(null)); @@ -80,7 +105,7 @@ public void shouldSkipLargerDeletedCacheValue() throws Exception { } @Test - public void shouldSkipSmallerDeletedCachedValue() throws Exception { + public void shouldSkipSmallerDeletedCachedValue() { final byte[][] bytes = {{0}, {1}}; cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null)); store.put(Bytes.wrap(bytes[1]), bytes[1]); @@ -90,7 +115,7 @@ public void shouldSkipSmallerDeletedCachedValue() throws Exception { } @Test - public void shouldIgnoreIfDeletedInCacheButExistsInStore() throws Exception { + public void shouldIgnoreIfDeletedInCacheButExistsInStore() { final byte[][] bytes = {{0}}; cache.put(namespace, Bytes.wrap(bytes[0]), new LRUCacheEntry(null)); store.put(Bytes.wrap(bytes[0]), bytes[0]); @@ -99,7 +124,7 @@ public void shouldIgnoreIfDeletedInCacheButExistsInStore() throws Exception { } @Test - public void shouldNotHaveNextIfAllCachedItemsDeleted() throws Exception { + public void shouldNotHaveNextIfAllCachedItemsDeleted() { final byte[][] bytes = {{0}, {1}, {2}}; for (final byte[] aByte : bytes) { final Bytes aBytes = Bytes.wrap(aByte); @@ -110,7 +135,7 @@ public void shouldNotHaveNextIfAllCachedItemsDeleted() throws Exception { } @Test - public void shouldNotHaveNextIfOnlyCacheItemsAndAllDeleted() throws Exception { + public void shouldNotHaveNextIfOnlyCacheItemsAndAllDeleted() { final byte[][] bytes = {{0}, {1}, {2}}; for (final byte[] aByte : bytes) { cache.put(namespace, Bytes.wrap(aByte), new LRUCacheEntry(null)); @@ -119,7 +144,7 @@ public void shouldNotHaveNextIfOnlyCacheItemsAndAllDeleted() throws Exception { } @Test - public void shouldSkipAllDeletedFromCache() throws Exception { + public void shouldSkipAllDeletedFromCache() { final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}}; for (final byte[] aByte : bytes) { final Bytes aBytes = Bytes.wrap(aByte); @@ -145,7 +170,7 @@ public void shouldSkipAllDeletedFromCache() throws Exception { } @Test - public void shouldPeekNextKey() throws Exception { + public void shouldPeekNextKey() { final KeyValueStore kv = new InMemoryKeyValueStore("one"); final ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1000000L, new MockStreamsMetrics(new Metrics())); final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}}; @@ -154,15 +179,13 @@ public void shouldPeekNextKey() throws Exception { cache.put(namespace, Bytes.wrap(bytes[i + 1]), new LRUCacheEntry(bytes[i + 1])); } - final Bytes from = Bytes.wrap(new byte[]{2}); - final Bytes to = Bytes.wrap(new byte[]{9}); + final Bytes from = Bytes.wrap(new byte[] {2}); + final Bytes to = Bytes.wrap(new byte[] {9}); final KeyValueIterator storeIterator = kv.range(from, to); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from, to); final MergedSortedCacheKeyValueBytesStoreIterator iterator = - new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, - storeIterator - ); + new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true); final byte[][] values = new byte[8][]; int index = 0; int bytesIndex = 2; @@ -175,9 +198,38 @@ public void shouldPeekNextKey() throws Exception { iterator.close(); } + @Test + public void shouldPeekNextKeyReverse() { + final KeyValueStore kv = new InMemoryKeyValueStore("one"); + final ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1000000L, new MockStreamsMetrics(new Metrics())); + final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}}; + for (int i = 0; i < bytes.length - 1; i += 2) { + kv.put(Bytes.wrap(bytes[i]), bytes[i]); + cache.put(namespace, Bytes.wrap(bytes[i + 1]), new LRUCacheEntry(bytes[i + 1])); + } + + final Bytes from = Bytes.wrap(new byte[] {2}); + final Bytes to = Bytes.wrap(new byte[] {9}); + final KeyValueIterator storeIterator = kv.reverseRange(from, to); + final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.reverseRange(namespace, from, to); + + final MergedSortedCacheKeyValueBytesStoreIterator iterator = + new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, false); + final byte[][] values = new byte[8][]; + int index = 0; + int bytesIndex = 9; + while (iterator.hasNext()) { + final byte[] keys = iterator.peekNextKey().get(); + values[index++] = keys; + assertArrayEquals(bytes[bytesIndex--], keys); + iterator.next(); + } + iterator.close(); + } + private MergedSortedCacheKeyValueBytesStoreIterator createIterator() { final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(namespace); final KeyValueIterator storeIterator = new DelegatingPeekingKeyValueIterator<>("store", store.all()); - return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator); + return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java index 504aa9b760f19..5937af03121d0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java @@ -40,9 +40,9 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { @Override protected KeyValueStore createKeyValueStore(final ProcessorContext context) { final StoreBuilder> storeBuilder = Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore("my-store"), - (Serde) context.keySerde(), - (Serde) context.valueSerde()); + Stores.persistentKeyValueStore("my-store"), + (Serde) context.keySerde(), + (Serde) context.valueSerde()); final KeyValueStore store = storeBuilder.build(); store.init(context, store); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index a8e278f03071a..c61996cbf88d4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -400,7 +400,7 @@ public void shouldHandleDeletesOnRestoreAll() { } @Test - public void shouldHandleDeletesAndPutbackOnRestoreAll() { + public void shouldHandleDeletesAndPutBackOnRestoreAll() { final List> entries = new ArrayList<>(); entries.add(new KeyValue<>("1".getBytes(UTF_8), "a".getBytes(UTF_8))); entries.add(new KeyValue<>("2".getBytes(UTF_8), "b".getBytes(UTF_8))); @@ -521,14 +521,14 @@ public void shouldThrowNullPointerExceptionOnRange() { () -> rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "2")))); } - @Test(expected = ProcessorStateException.class) + @Test public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws IOException { rocksDBStore.init(context, rocksDBStore); Utils.delete(dir); rocksDBStore.put( new Bytes(stringSerializer.serialize(null, "anyKey")), stringSerializer.serialize(null, "anyValue")); - rocksDBStore.flush(); + assertThrows(ProcessorStateException.class, () -> rocksDBStore.flush()); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java index 0108bbbf4816a..042039c87735a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java @@ -271,6 +271,71 @@ private void iteratorsShouldNotMigrateData() { } assertFalse(it.hasNext()); } + + try (final KeyValueIterator itAll = rocksDBStore.reverseAll()) { + { + final KeyValue keyValue = itAll.next(); + assertArrayEquals("key8".getBytes(), keyValue.key.get()); + assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '8', '8', '8', '8', '8', '8', '8', '8'}, keyValue.value); + } + { + final KeyValue keyValue = itAll.next(); + assertArrayEquals("key7".getBytes(), keyValue.key.get()); + // unknown timestamp == -1 plus value == 7777777 + assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '7', '7', '7', '7', '7', '7', '7'}, keyValue.value); + } + { + final KeyValue keyValue = itAll.next(); + assertArrayEquals("key5".getBytes(), keyValue.key.get()); + // unknown timestamp == -1 plus value == 55555 + assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '5', '5', '5', '5', '5'}, keyValue.value); + } + { + final KeyValue keyValue = itAll.next(); + assertArrayEquals("key4".getBytes(), keyValue.key.get()); + // unknown timestamp == -1 plus value == 4444 + assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '4', '4', '4', '4'}, keyValue.value); + } + { + final KeyValue keyValue = itAll.next(); + assertArrayEquals("key2".getBytes(), keyValue.key.get()); + assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '2', '2'}, keyValue.value); + } + { + final KeyValue keyValue = itAll.next(); + assertArrayEquals("key11".getBytes(), keyValue.key.get()); + assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1'}, keyValue.value); + } + { + final KeyValue keyValue = itAll.next(); + assertArrayEquals("key1".getBytes(), keyValue.key.get()); + // unknown timestamp == -1 plus value == 1 + assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '1'}, keyValue.value); + } + assertFalse(itAll.hasNext()); + } + + try (final KeyValueIterator it = + rocksDBStore.reverseRange(new Bytes("key2".getBytes()), new Bytes("key5".getBytes()))) { + { + final KeyValue keyValue = it.next(); + assertArrayEquals("key5".getBytes(), keyValue.key.get()); + // unknown timestamp == -1 plus value == 55555 + assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '5', '5', '5', '5', '5'}, keyValue.value); + } + { + final KeyValue keyValue = it.next(); + assertArrayEquals("key4".getBytes(), keyValue.key.get()); + // unknown timestamp == -1 plus value == 4444 + assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '4', '4', '4', '4'}, keyValue.value); + } + { + final KeyValue keyValue = it.next(); + assertArrayEquals("key2".getBytes(), keyValue.key.get()); + assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '2', '2'}, keyValue.value); + } + assertFalse(it.hasNext()); + } } private void verifyOldAndNewColumnFamily() throws Exception {