diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index 34639e3362d5d..22f3a0249a4c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -86,6 +86,13 @@ public KeyValueIterator fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) { + if (keyFrom.compareTo(keyTo) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + final List searchSpace = keySchema.segmentsToSearch(segments, from, to); final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index bb347de9d1ad1..95e20b475b0b6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -30,11 +30,15 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class CachingKeyValueStore extends WrappedStateStore, byte[], byte[]> implements KeyValueStore, CachedStateStore { + private static final Logger LOG = LoggerFactory.getLogger(CachingKeyValueStore.class); + private CacheFlushListener flushListener; private boolean sendOldValues; private String cacheName; @@ -228,6 +232,13 @@ private byte[] getInternal(final Bytes key) { @Override public KeyValueIterator range(final Bytes from, final Bytes to) { + if (from.compareTo(to) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + validateStoreOpen(); final KeyValueIterator storeIterator = wrapped().range(from, to); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, from, to); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index edea7e0d4bb8e..9599105857df6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -26,11 +26,15 @@ import org.apache.kafka.streams.state.SessionStore; import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class CachingSessionStore extends WrappedStateStore, byte[], byte[]> implements SessionStore, CachedStateStore { + private static final Logger LOG = LoggerFactory.getLogger(CachingSessionStore.class); + private final SessionKeySchema keySchema; private final SegmentedCacheFunction cacheFunction; private String cacheName; @@ -153,6 +157,13 @@ public KeyValueIterator, byte[]> findSessions(final Bytes keyFro final Bytes keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { + if (keyFrom.compareTo(keyTo) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + validateStoreOpen(); final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime)); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 0edd8f265b4a4..3875a7914f278 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -28,11 +28,15 @@ import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class CachingWindowStore extends WrappedStateStore, byte[], byte[]> implements WindowStore, CachedStateStore { + private static final Logger LOG = LoggerFactory.getLogger(CachingWindowStore.class); + private final long windowSize; private final SegmentedBytesStore.KeySchema keySchema = new WindowKeySchema(); @@ -196,6 +200,13 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to, final long timeFrom, final long timeTo) { + if (from.compareTo(to) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + // since this function may not access the underlying inner store, we need to validate // if store is open outside as well. validateStoreOpen(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index b37c39e3b2076..07337806922cb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -28,12 +28,16 @@ import java.util.Iterator; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class InMemoryKeyValueStore implements KeyValueStore { private final String name; private final ConcurrentNavigableMap map; private volatile boolean open = false; + private static final Logger LOG = LoggerFactory.getLogger(InMemoryKeyValueStore.class); + public InMemoryKeyValueStore(final String name) { this.name = name; @@ -111,6 +115,14 @@ public byte[] delete(final Bytes key) { @Override public KeyValueIterator range(final Bytes from, final Bytes to) { + + if (from.compareTo(to) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + return new DelegatingPeekingKeyValueIterator<>( name, new InMemoryKeyValueIterator(this.map.subMap(from, true, to, true).entrySet().iterator())); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 7d1b279fb38a8..0cee668e2a8d6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -131,7 +131,7 @@ public void put(final Bytes key, final byte[] value, final long windowStartTimes if (windowStartTimestamp <= this.observedStreamTime - this.retentionPeriod) { expiredRecordSensor.record(); - LOG.debug("Skipping record for expired segment."); + LOG.warn("Skipping record for expired segment."); } else { if (value != null) { this.segmentMap.computeIfAbsent(windowStartTimestamp, t -> new ConcurrentSkipListMap<>()); @@ -185,6 +185,13 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, final long timeTo) { removeExpiredSegments(); + if (from.compareTo(to) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + // add one b/c records expire exactly retentionPeriod ms after created final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java index c3cc834ff6596..4bf42def92eb7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java @@ -23,15 +23,27 @@ import java.util.Iterator; import java.util.Map; import java.util.TreeMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MemoryNavigableLRUCache extends MemoryLRUCache { + private static final Logger LOG = LoggerFactory.getLogger(MemoryNavigableLRUCache.class); + public MemoryNavigableLRUCache(final String name, final int maxCacheSize) { super(name, maxCacheSize); } @Override public KeyValueIterator range(final Bytes from, final Bytes to) { + + if (from.compareTo(to) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + final TreeMap treeMap = toTreeMap(); return new DelegatingPeekingKeyValueIterator<>(name(), new MemoryNavigableLRUCache.CacheIterator(treeMap.navigableKeySet() diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 3e3e4783bb345..3cf8e94de142b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -284,6 +284,14 @@ public synchronized KeyValueIterator range(final Bytes from, final Bytes to) { Objects.requireNonNull(from, "from cannot be null"); Objects.requireNonNull(to, "to cannot be null"); + + if (from.compareTo(to) > 0) { + log.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + validateStoreOpen(); final KeyValueIterator rocksDBRangeIterator = dbAccessor.range(from, to); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index 09a373689a416..7df6532a188cb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStoreTestDriver; @@ -38,6 +39,7 @@ import java.util.List; import java.util.Map; +import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; @@ -379,6 +381,21 @@ public void shouldDeleteFromStore() { assertNull(store.get(2)); } + @Test + public void shouldReturnSameResultsForGetAndRangeWithEqualKeys() { + final List> entries = new ArrayList<>(); + entries.add(new KeyValue<>(1, "one")); + entries.add(new KeyValue<>(2, "two")); + entries.add(new KeyValue<>(3, "three")); + + store.putAll(entries); + + final Iterator> iterator = store.range(2, 2); + + assertEquals(iterator.next().value, store.get(2)); + assertFalse(iterator.hasNext()); + } + @Test public void shouldNotThrowConcurrentModificationException() { store.put(0, "zero"); @@ -389,4 +406,18 @@ public void shouldNotThrowConcurrentModificationException() { assertEquals(new KeyValue<>(0, "zero"), results.next()); } + + @Test + public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { + LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class); + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + + final KeyValueIterator iterator = store.range(-1, 1); + assertFalse(iterator.hasNext()); + + final List messages = appender.getMessages(); + assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers")); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index 66b27f0a2526c..48c96a28f6c86 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; @@ -28,6 +29,7 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.TestUtils; @@ -50,6 +52,7 @@ import static org.apache.kafka.test.StreamsTestUtils.toList; import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue; +import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertArrayEquals; @@ -339,6 +342,22 @@ public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() flushListener.forwarded.clear(); } + @Test + public void shouldReturnSameResultsForSingleKeyFindSessionsAndEqualKeyRangeFindSessions() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 1)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(2, 3)), "2".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(4, 5)), "3".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(6, 7)), "4".getBytes()); + + final KeyValueIterator, byte[]> singleKeyIterator = cachingStore.findSessions(keyAA, 0L, 10L); + final KeyValueIterator, byte[]> keyRangeIterator = cachingStore.findSessions(keyAA, keyAA, 0L, 10L); + + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertFalse(singleKeyIterator.hasNext()); + assertFalse(keyRangeIterator.hasNext()); + } + @Test public void shouldClearNamespaceCacheOnClose() { final Windowed a1 = new Windowed<>(keyA, new SessionWindow(0, 0)); @@ -412,6 +431,23 @@ public void shouldThrowNullPointerExceptionOnPutNullKey() { cachingStore.put(null, "1".getBytes()); } + @Test + public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { + LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class); + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + + final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1)); + final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1)); + + final KeyValueIterator iterator = cachingStore.findSessions(keyFrom, keyTo, 0L, 10L); + assertFalse(iterator.hasNext()); + + final List messages = appender.getMessages(); + assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers")); + } + private List, byte[]>> addSessionsUntilOverflow(final String... sessionIds) { final Random random = new Random(); final List, byte[]>> results = new ArrayList<>(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index da49dded4c7f4..b0ccc15ca6639 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -61,6 +62,7 @@ import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -234,6 +236,10 @@ private static Bytes bytesKey(final String key) { return Bytes.wrap(key.getBytes()); } + private String stringFrom(final byte[] from) { + return Serdes.String().deserializer().deserialize("", from); + } + @Test public void shouldPutFetchRangeFromCache() { cachingStore.put(bytesKey("a"), bytesValue("a")); @@ -543,6 +549,22 @@ public void shouldFetchAndIterateOverKeyRange() { ); } + @Test + public void shouldReturnSameResultsForSingleKeyFetchAndEqualKeyRangeFetch() { + cachingStore.put(bytesKey("a"), bytesValue("0001"), 0); + cachingStore.put(bytesKey("aa"), bytesValue("0002"), 1); + cachingStore.put(bytesKey("aa"), bytesValue("0003"), 2); + cachingStore.put(bytesKey("aaa"), bytesValue("0004"), 3); + + final WindowStoreIterator singleKeyIterator = cachingStore.fetch(bytesKey("aa"), 0L, 5L); + final KeyValueIterator, byte[]> keyRangeIterator = cachingStore.fetch(bytesKey("aa"), bytesKey("aa"), 0L, 5L); + + assertEquals(stringFrom(singleKeyIterator.next().value), stringFrom(keyRangeIterator.next().value)); + assertEquals(stringFrom(singleKeyIterator.next().value), stringFrom(keyRangeIterator.next().value)); + assertFalse(singleKeyIterator.hasNext()); + assertFalse(keyRangeIterator.hasNext()); + } + @Test(expected = NullPointerException.class) public void shouldThrowNullPointerExceptionOnPutNullKey() { cachingStore.put(null, bytesValue("anyValue")); @@ -568,6 +590,23 @@ public void shouldThrowNullPointerExceptionOnRangeNullToKey() { cachingStore.fetch(bytesKey("anyFrom"), null, ofEpochMilli(1L), ofEpochMilli(2L)); } + @Test + public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { + LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class); + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + + final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1)); + final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1)); + + final KeyValueIterator iterator = cachingStore.fetch(keyFrom, keyTo, 0L, 10L); + assertFalse(iterator.hasNext()); + + final List messages = appender.getMessages(); + assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers")); + } + private static KeyValue, byte[]> windowedPair(final String key, final String value, final long timestamp) { return KeyValue.pair( new Windowed<>(bytesKey(key), new TimeWindow(timestamp, timestamp + WINDOW_SIZE)), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java index e7f5ed0689ca0..df924ec43cb9b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java @@ -568,6 +568,24 @@ public void shouldNotExpireFromOpenIterator() { assertFalse(iterator2.hasNext()); } + @Test + public void shouldReturnSameResultsForSingleKeyFetchAndEqualKeyRangeFetch() { + windowStore = createInMemoryWindowStore(context, false); + + windowStore.put(1, "one", 0L); + windowStore.put(2, "two", 1L); + windowStore.put(2, "two", 2L); + windowStore.put(3, "three", 3L); + + final WindowStoreIterator singleKeyIterator = windowStore.fetch(2, 0L, 5L); + final KeyValueIterator, String> keyRangeIterator = windowStore.fetch(2, 2, 0L, 5L); + + assertEquals(singleKeyIterator.next().value, keyRangeIterator.next().value); + assertEquals(singleKeyIterator.next().value, keyRangeIterator.next().value); + assertFalse(singleKeyIterator.hasNext()); + assertFalse(keyRangeIterator.hasNext()); + } + @Test public void shouldNotThrowExceptionWhenFetchRangeIsExpired() { windowStore = createInMemoryWindowStore(context, false); @@ -579,4 +597,20 @@ public void shouldNotThrowExceptionWhenFetchRangeIsExpired() { assertFalse(iterator.hasNext()); } + + @Test + public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { + windowStore = createInMemoryWindowStore(context, false); + + LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class); + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + + final KeyValueIterator iterator = windowStore.fetch(-1, 1, 0L, 10L); + assertFalse(iterator.hasNext()); + + final List messages = appender.getMessages(); + assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers")); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index 0786c3723c856..1821913d4771f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.Stores; @@ -41,6 +42,7 @@ import static org.apache.kafka.test.StreamsTestUtils.toList; import static org.apache.kafka.test.StreamsTestUtils.valuesToList; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -224,6 +226,22 @@ public void shouldFetchExactKeys() { } } + @Test + public void shouldReturnSameResultsForSingleKeyFindSessionsAndEqualKeyRangeFindSessions() { + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 1)), 0L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(2, 3)), 1L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(4, 5)), 2L); + sessionStore.put(new Windowed<>("aaa", new SessionWindow(6, 7)), 3L); + + final KeyValueIterator, Long> singleKeyIterator = sessionStore.findSessions("aa", 0L, 10L); + final KeyValueIterator, Long> keyRangeIterator = sessionStore.findSessions("aa", "aa", 0L, 10L); + + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertFalse(singleKeyIterator.hasNext()); + assertFalse(keyRangeIterator.hasNext()); + } + @Test(expected = NullPointerException.class) public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() { sessionStore.findSessions(null, 1L, 2L); @@ -263,6 +281,21 @@ public void shouldThrowNullPointerExceptionOnRemoveNullKey() { public void shouldThrowNullPointerExceptionOnPutNullKey() { sessionStore.put(null, 1L); } - + @Test + public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { + LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class); + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + + final String keyFrom = Serdes.String().deserializer().deserialize("", Serdes.Integer().serializer().serialize("", -1)); + final String keyTo = Serdes.String().deserializer().deserialize("", Serdes.Integer().serializer().serialize("", 1)); + + final KeyValueIterator iterator = sessionStore.findSessions(keyFrom, keyTo, 0L, 10L); + assertFalse(iterator.hasNext()); + + final List messages = appender.getMessages(); + assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers")); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 42b1b8c37f9b7..7405e060c580a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -33,6 +33,8 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; @@ -58,6 +60,7 @@ import static java.util.Arrays.asList; import static java.util.Objects.requireNonNull; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -1398,6 +1401,40 @@ public void shouldFetchAndIterateOverExactBinaryKeys() { assertThat(toList(windowStore.fetch(key3, ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE))), equalTo(expectedKey3)); } + @Test + public void shouldReturnSameResultsForSingleKeyFetchAndEqualKeyRangeFetch() { + windowStore = createWindowStore(context, false); + + windowStore.put(1, "one", 0L); + windowStore.put(2, "two", 1L); + windowStore.put(2, "two", 2L); + windowStore.put(3, "three", 3L); + + final WindowStoreIterator singleKeyIterator = windowStore.fetch(2, 0L, 5L); + final KeyValueIterator, String> keyRangeIterator = windowStore.fetch(2, 2, 0L, 5L); + + assertEquals(singleKeyIterator.next().value, keyRangeIterator.next().value); + assertEquals(singleKeyIterator.next().value, keyRangeIterator.next().value); + assertFalse(singleKeyIterator.hasNext()); + assertFalse(keyRangeIterator.hasNext()); + } + + @Test + public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { + windowStore = createWindowStore(context, false); + + LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class); + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + + final KeyValueIterator iterator = windowStore.fetch(-1, 1, 0L, 10L); + assertFalse(iterator.hasNext()); + + final List messages = appender.getMessages(); + assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers")); + } + private void putFirstBatch(final WindowStore store, @SuppressWarnings("SameParameterValue") final long startTime, final InternalMockProcessorContext context) {