Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;

import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
Expand Down Expand Up @@ -90,11 +91,22 @@ public KeyValueIterator<K, V> range(final K from,
return wrapped().range(from, to);
}

@Override
public KeyValueIterator<K, V> reverseRange(final K from,
final K to) {
return wrapped().reverseRange(from, to);
}

@Override
public KeyValueIterator<K, V> all() {
return wrapped().all();
}

@Override
public KeyValueIterator<K, V> reverseAll() {
return wrapped().reverseAll();
}

@Override
public long approximateNumEntries() {
return wrapped().approximateNumEntries();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,22 @@ public KeyValueIterator<K, V> range(final K from,
return wrapped().range(from, to);
}

@Override
public KeyValueIterator<K, V> reverseRange(final K from,
final K to) {
return wrapped().reverseRange(from, to);
}

@Override
public KeyValueIterator<K, V> all() {
return wrapped().all();
}

@Override
public KeyValueIterator<K, V> reverseAll() {
return wrapped().reverseAll();
}

@Override
public long approximateNumEntries() {
return wrapped().approximateNumEntries();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
/**
* Update the value associated with this key.
*
* @param key The key to associate the value to
* @param key The key to associate the value to
* @param value The value to update, it can be {@code null};
* if the serialized bytes are also {@code null} it is interpreted as deletes
* @throws NullPointerException If {@code null} is used for key.
Expand All @@ -42,7 +42,7 @@ public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K
/**
* Update the value associated with this key, unless a value is already associated with the key.
*
* @param key The key to associate the value to
* @param key The key to associate the value to
* @param value The value to update, it can be {@code null};
* if the serialized bytes are also {@code null} it is interpreted as deletes
* @return The old value or {@code null} if there is no such key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/**
* A key-value store that only supports read operations.
* Implementations should be thread-safe as concurrent reads and writes are expected.
*
* <p>
* Please note that this contract defines the thread-safe read functionality only; it does not
* guarantee anything about whether the actual instance is writable by another thread, or
* whether it uses some locking mechanism under the hood. For this reason, making dependencies
Expand All @@ -38,35 +38,68 @@ public interface ReadOnlyKeyValueStore<K, V> {
*
* @param key The key to fetch
* @return The value or null if no value is found.
* @throws NullPointerException If null is used for key.
* @throws NullPointerException If null is used for key.
* @throws InvalidStateStoreException if the store is not initialized
*/
V get(K key);

/**
* Get an iterator over a given range of keys. This iterator must be closed after use.
* The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
* and must not return null values. No ordering guarantees are provided.
* @param from The first key that could be in the range
* @param to The last key that could be in the range
* @return The iterator for this range.
* @throws NullPointerException If null is used for from or to.
* and must not return null values.
* Order is not guaranteed as bytes lexicographical ordering might not represent key order.
*
* @param from The first key that could be in the range, where iteration starts from.
* @param to The last key that could be in the range, where iteration ends.
* @return The iterator for this range, from smallest to largest bytes.
* @throws NullPointerException If null is used for from or to.
* @throws InvalidStateStoreException if the store is not initialized
*/
KeyValueIterator<K, V> range(K from, K to);

/**
* Get a reverse iterator over a given range of keys. This iterator must be closed after use.
* The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
* and must not return null values.
* Order is not guaranteed as bytes lexicographical ordering might not represent key order.
*
* @param from The first key that could be in the range, where iteration ends.
* @param to The last key that could be in the range, where iteration starts from.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems a bit tricky to say that to is the variable where iteration starts from 😉 But I can see it both ways, so being clear in the javadocs is good enough for me

* @return The reverse iterator for this range, from largest to smallest key bytes.
* @throws NullPointerException If null is used for from or to.
* @throws InvalidStateStoreException if the store is not initialized
*/
default KeyValueIterator<K, V> reverseRange(K from, K to) {
throw new UnsupportedOperationException();
}

/**
* Return an iterator over all keys in this store. This iterator must be closed after use.
* The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
* and must not return null values. No ordering guarantees are provided.
* @return An iterator of all key/value pairs in the store.
* and must not return null values.
* Order is not guaranteed as bytes lexicographical ordering might not represent key order.
*
* @return An iterator of all key/value pairs in the store, from smallest to largest bytes.
* @throws InvalidStateStoreException if the store is not initialized
*/
KeyValueIterator<K, V> all();

/**
* Return an approximate count of key-value mappings in this store.
* Return a reverse iterator over all keys in this store. This iterator must be closed after use.
* The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
* and must not return null values.
* Order is not guaranteed as bytes lexicographical ordering might not represent key order.
*
* @return An reverse iterator of all key/value pairs in the store, from largest to smallest key bytes.
* @throws InvalidStateStoreException if the store is not initialized
*/
default KeyValueIterator<K, V> reverseAll() {
throw new UnsupportedOperationException();
}

/**
* Return an approximate count of key-value mappings in this store.
* <p>
* The count is not guaranteed to be exact in order to accommodate stores
* where an exact count is expensive to calculate.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@
abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V, VS> implements KeyValueIterator<K, V> {
private final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator;
private final KeyValueIterator<KS, VS> storeIterator;
private final boolean forward;

AbstractMergedSortedCacheStoreIterator(final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator,
final KeyValueIterator<KS, VS> storeIterator) {
final KeyValueIterator<KS, VS> storeIterator,
final boolean forward) {
this.cacheIterator = cacheIterator;
this.storeIterator = storeIterator;
this.forward = forward;
}

abstract int compare(final Bytes cacheKey, final KS storeKey);
Expand Down Expand Up @@ -87,14 +90,32 @@ public KeyValue<K, V> next() {
}

final int comparison = compare(nextCacheKey, nextStoreKey);
if (comparison > 0) {
return nextStoreValue(nextStoreKey);
} else if (comparison < 0) {
return nextCacheValue(nextCacheKey);
return chooseNextValue(nextCacheKey, nextStoreKey, comparison);
}

private KeyValue<K, V> chooseNextValue(final Bytes nextCacheKey,
final KS nextStoreKey,
final int comparison) {
if (forward) {
if (comparison > 0) {
return nextStoreValue(nextStoreKey);
} else if (comparison < 0) {
return nextCacheValue(nextCacheKey);
} else {
// skip the same keyed element
storeIterator.next();
return nextCacheValue(nextCacheKey);
}
} else {
// skip the same keyed element
storeIterator.next();
return nextCacheValue(nextCacheKey);
if (comparison < 0) {
return nextStoreValue(nextStoreKey);
} else if (comparison > 0) {
return nextCacheValue(nextCacheKey);
} else {
// skip the same keyed element
storeIterator.next();
return nextCacheValue(nextCacheKey);
}
}
}

Expand Down Expand Up @@ -136,14 +157,32 @@ public K peekNextKey() {
}

final int comparison = compare(nextCacheKey, nextStoreKey);
if (comparison > 0) {
return deserializeStoreKey(nextStoreKey);
} else if (comparison < 0) {
return deserializeCacheKey(nextCacheKey);
return chooseNextKey(nextCacheKey, nextStoreKey, comparison);
}

private K chooseNextKey(final Bytes nextCacheKey,
final KS nextStoreKey,
final int comparison) {
if (forward) {
if (comparison > 0) {
return deserializeStoreKey(nextStoreKey);
} else if (comparison < 0) {
return deserializeCacheKey(nextCacheKey);
} else {
// skip the same keyed element
storeIterator.next();
return deserializeCacheKey(nextCacheKey);
}
} else {
// skip the same keyed element
storeIterator.next();
return deserializeCacheKey(nextCacheKey);
if (comparison < 0) {
return deserializeStoreKey(nextStoreKey);
} else if (comparison > 0) {
return deserializeCacheKey(nextCacheKey);
} else {
// skip the same keyed element
storeIterator.next();
return deserializeCacheKey(nextCacheKey);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ public KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom,
final long from,
final long to) {
if (keyFrom.compareTo(keyTo) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
"This may be due to range arguments set in the wrong order, " +
"or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
"Note that the built-in numerical serdes do not follow this for negative numbers");
return KeyValueIterators.emptyIterator();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public void init(final ProcessorContext context,
streamThread = Thread.currentThread();
}

@SuppressWarnings("unchecked")
private void initInternal(final ProcessorContext context) {
this.context = (InternalProcessorContext) context;

Expand Down Expand Up @@ -240,16 +239,34 @@ private byte[] getInternal(final Bytes key) {
public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to) {
if (from.compareTo(to) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
"This may be due to range arguments set in the wrong order, " +
"or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
"Note that the built-in numerical serdes do not follow this for negative numbers");
return KeyValueIterators.emptyIterator();
}

validateStoreOpen();
final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().range(from, to);
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().range(cacheName, from, to);
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator);
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true);
}

@Override
public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
final Bytes to) {
if (from.compareTo(to) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
"This may be due to range arguments set in the wrong order, " +
"or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
"Note that the built-in numerical serdes do not follow this for negative numbers");
return KeyValueIterators.emptyIterator();
}

validateStoreOpen();
final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().reverseRange(from, to);
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().reverseRange(cacheName, from, to);
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, false);
}

@Override
Expand All @@ -258,7 +275,16 @@ public KeyValueIterator<Bytes, byte[]> all() {
final KeyValueIterator<Bytes, byte[]> storeIterator =
new DelegatingPeekingKeyValueIterator<>(this.name(), wrapped().all());
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().all(cacheName);
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator);
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true);
}

@Override
public KeyValueIterator<Bytes, byte[]> reverseAll() {
validateStoreOpen();
final KeyValueIterator<Bytes, byte[]> storeIterator =
new DelegatingPeekingKeyValueIterator<>(this.name(), wrapped().reverseAll());
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().reverseAll(cacheName);
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, false);
}

@Override
Expand Down Expand Up @@ -309,7 +335,7 @@ public void close() {
);
if (!suppressed.isEmpty()) {
throwSuppressed("Caught an exception while closing caching key value store for store " + name(),
suppressed);
suppressed);
}
} finally {
lock.writeLock().unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,9 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes keyFro
final long earliestSessionEndTime,
final long latestSessionStartTime) {
if (keyFrom.compareTo(keyTo) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
"This may be due to range arguments set in the wrong order, " +
"or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
"Note that the built-in numerical serdes do not follow this for negative numbers");
return KeyValueIterators.emptyIterator();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,9 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
final long timeFrom,
final long timeTo) {
if (from.compareTo(to) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
+ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
"This may be due to range arguments set in the wrong order, " +
"or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
"Note that the built-in numerical serdes do not follow this for negative numbers");
return KeyValueIterators.emptyIterator();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,22 @@ public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
return wrapped().range(from, to);
}

@Override
public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
final Bytes to) {
return wrapped().reverseRange(from, to);
}

@Override
public KeyValueIterator<Bytes, byte[]> all() {
return wrapped().all();
}

@Override
public KeyValueIterator<Bytes, byte[]> reverseAll() {
return wrapped().reverseAll();
}

void log(final Bytes key,
final byte[] value) {
context.logChange(name(), key, value, context.timestamp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ public K peekNextKey() {

@Override
public boolean hasNext() {
while ((current == null || !current.hasNext())
&& storeIterator.hasNext()) {
while ((current == null || !current.hasNext()) && storeIterator.hasNext()) {
close();
current = nextIteratorFunction.apply(storeIterator.next());
}
Expand Down
Loading