From 1167df5d515120ab8a1a8bad15cd0e34c6fc060a Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Thu, 28 Mar 2019 17:36:06 -0700 Subject: [PATCH 01/10] Guard against crashing on invalid key range queries in CachingXXXStores --- .../streams/state/internals/CachingKeyValueStore.java | 10 ++++++++++ .../streams/state/internals/CachingSessionStore.java | 10 ++++++++++ .../streams/state/internals/CachingWindowStore.java | 10 ++++++++++ 3 files changed, 30 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index bb347de9d1ad1..0b2b7042db043 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -30,11 +30,15 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class CachingKeyValueStore extends WrappedStateStore, byte[], byte[]> implements KeyValueStore, CachedStateStore { + private static final Logger LOG = LoggerFactory.getLogger(CachingKeyValueStore.class); + private CacheFlushListener flushListener; private boolean sendOldValues; private String cacheName; @@ -228,6 +232,12 @@ private byte[] getInternal(final Bytes key) { @Override public KeyValueIterator range(final Bytes from, final Bytes to) { + // Make sure this is a valid query + if (from.compareTo(to) > 0) { + LOG.debug("Returning empty iterator for range query with invalid range: keyFrom > keyTo."); + return KeyValueIterators.emptyIterator(); + } + validateStoreOpen(); final KeyValueIterator storeIterator = wrapped().range(from, to); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, from, to); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index edea7e0d4bb8e..3e339ccb175dc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -26,11 +26,15 @@ import org.apache.kafka.streams.state.SessionStore; import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class CachingSessionStore extends WrappedStateStore, byte[], byte[]> implements SessionStore, CachedStateStore { + private static final Logger LOG = LoggerFactory.getLogger(CachingSessionStore.class); + private final SessionKeySchema keySchema; private final SegmentedCacheFunction cacheFunction; private String cacheName; @@ -153,6 +157,12 @@ public KeyValueIterator, byte[]> findSessions(final Bytes keyFro final Bytes keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { + // Make sure this is a valid query + if (keyFrom.compareTo(keyTo) > 0) { + LOG.debug("Returning empty iterator for findSessions call with invalid range: keyFrom > keyTo."); + return KeyValueIterators.emptyIterator(); + } + validateStoreOpen(); final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, earliestSessionEndTime)); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 0edd8f265b4a4..62cca68b80c73 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -28,11 +28,15 @@ import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class CachingWindowStore extends WrappedStateStore, byte[], byte[]> implements WindowStore, CachedStateStore { + private static final Logger LOG = LoggerFactory.getLogger(CachingWindowStore.class); + private final long windowSize; private final SegmentedBytesStore.KeySchema keySchema = new WindowKeySchema(); @@ -196,6 +200,12 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to, final long timeFrom, final long timeTo) { + // Make sure this is a valid query + if (from.compareTo(to) > 0) { + LOG.debug("Returning empty iterator for fetch with invalid range: keyFrom > keyTo."); + return KeyValueIterators.emptyIterator(); + } + // since this function may not access the underlying inner store, we need to validate // if store is open outside as well. validateStoreOpen(); From 09ace6b1afb13a37a92fa79c980d50b153457056 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Thu, 28 Mar 2019 17:49:03 -0700 Subject: [PATCH 02/10] Added guards for range queries in in-memory stores --- .../streams/state/internals/InMemoryKeyValueStore.java | 10 ++++++++++ .../streams/state/internals/InMemoryWindowStore.java | 6 ++++++ 2 files changed, 16 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index b37c39e3b2076..af31544ae67f4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -28,12 +28,16 @@ import java.util.Iterator; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class InMemoryKeyValueStore implements KeyValueStore { private final String name; private final ConcurrentNavigableMap map; private volatile boolean open = false; + private static final Logger LOG = LoggerFactory.getLogger(InMemoryKeyValueStore.class); + public InMemoryKeyValueStore(final String name) { this.name = name; @@ -111,6 +115,12 @@ public byte[] delete(final Bytes key) { @Override public KeyValueIterator range(final Bytes from, final Bytes to) { + // Make sure this is a valid query + if (from.compareTo(to) > 0) { + LOG.debug("Returning empty iterator for range query with invalid range: keyFrom > keyTo."); + return KeyValueIterators.emptyIterator(); + } + return new DelegatingPeekingKeyValueIterator<>( name, new InMemoryKeyValueIterator(this.map.subMap(from, true, to, true).entrySet().iterator())); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 7d1b279fb38a8..acef6a9dd0840 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -185,6 +185,12 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, final long timeTo) { removeExpiredSegments(); + // Make sure this is a valid query + if (from.compareTo(to) > 0) { + LOG.debug("Returning empty iterator for fetch with invalid range: keyFrom > keyTo."); + return KeyValueIterators.emptyIterator(); + } + // add one b/c records expire exactly retentionPeriod ms after created final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1); From ac27e8578f69d60a56ba28232d7e96c76957f66c Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Thu, 4 Apr 2019 15:33:12 -0700 Subject: [PATCH 03/10] Changed DEBUG log level to WARN --- .../kafka/streams/state/internals/CachingKeyValueStore.java | 2 +- .../kafka/streams/state/internals/CachingSessionStore.java | 2 +- .../kafka/streams/state/internals/CachingWindowStore.java | 2 +- .../kafka/streams/state/internals/InMemoryKeyValueStore.java | 2 +- .../kafka/streams/state/internals/InMemoryWindowStore.java | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 0b2b7042db043..7de17e31c2ab4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -234,7 +234,7 @@ public KeyValueIterator range(final Bytes from, final Bytes to) { // Make sure this is a valid query if (from.compareTo(to) > 0) { - LOG.debug("Returning empty iterator for range query with invalid range: keyFrom > keyTo."); + LOG.warn("Returning empty iterator for range query with invalid range: keyFrom > keyTo."); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index 3e339ccb175dc..26bb3d26ef685 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -159,7 +159,7 @@ public KeyValueIterator, byte[]> findSessions(final Bytes keyFro final long latestSessionStartTime) { // Make sure this is a valid query if (keyFrom.compareTo(keyTo) > 0) { - LOG.debug("Returning empty iterator for findSessions call with invalid range: keyFrom > keyTo."); + LOG.warn("Returning empty iterator for findSessions call with invalid range: keyFrom > keyTo."); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 62cca68b80c73..602885bb9f16d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -202,7 +202,7 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, final long timeTo) { // Make sure this is a valid query if (from.compareTo(to) > 0) { - LOG.debug("Returning empty iterator for fetch with invalid range: keyFrom > keyTo."); + LOG.warn("Returning empty iterator for fetch with invalid range: keyFrom > keyTo."); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index af31544ae67f4..19f0551ed505e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -117,7 +117,7 @@ public byte[] delete(final Bytes key) { public KeyValueIterator range(final Bytes from, final Bytes to) { // Make sure this is a valid query if (from.compareTo(to) > 0) { - LOG.debug("Returning empty iterator for range query with invalid range: keyFrom > keyTo."); + LOG.warn("Returning empty iterator for range query with invalid range: keyFrom > keyTo."); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index acef6a9dd0840..57564f85fee9e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -131,7 +131,7 @@ public void put(final Bytes key, final byte[] value, final long windowStartTimes if (windowStartTimestamp <= this.observedStreamTime - this.retentionPeriod) { expiredRecordSensor.record(); - LOG.debug("Skipping record for expired segment."); + LOG.warn("Skipping record for expired segment."); } else { if (value != null) { this.segmentMap.computeIfAbsent(windowStartTimestamp, t -> new ConcurrentSkipListMap<>()); From b961d0a35b42c96488688c06dd8bce03bf07a9ad Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Fri, 5 Apr 2019 12:24:50 -0700 Subject: [PATCH 04/10] Addressing comments from Bruno's review --- .../kafka/streams/state/internals/CachingKeyValueStore.java | 3 +-- .../kafka/streams/state/internals/CachingSessionStore.java | 3 +-- .../kafka/streams/state/internals/CachingWindowStore.java | 3 +-- .../kafka/streams/state/internals/InMemoryKeyValueStore.java | 4 ++-- .../kafka/streams/state/internals/InMemoryWindowStore.java | 3 +-- 5 files changed, 6 insertions(+), 10 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 7de17e31c2ab4..61ca64a2c7fa5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -232,9 +232,8 @@ private byte[] getInternal(final Bytes key) { @Override public KeyValueIterator range(final Bytes from, final Bytes to) { - // Make sure this is a valid query if (from.compareTo(to) > 0) { - LOG.warn("Returning empty iterator for range query with invalid range: keyFrom > keyTo."); + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to."); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index 26bb3d26ef685..8441be967b9d2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -157,9 +157,8 @@ public KeyValueIterator, byte[]> findSessions(final Bytes keyFro final Bytes keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - // Make sure this is a valid query if (keyFrom.compareTo(keyTo) > 0) { - LOG.warn("Returning empty iterator for findSessions call with invalid range: keyFrom > keyTo."); + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to."); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 602885bb9f16d..b44d2984a5534 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -200,9 +200,8 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, final Bytes to, final long timeFrom, final long timeTo) { - // Make sure this is a valid query if (from.compareTo(to) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid range: keyFrom > keyTo."); + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to."); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index 19f0551ed505e..7aaa25ef228bc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -115,9 +115,9 @@ public byte[] delete(final Bytes key) { @Override public KeyValueIterator range(final Bytes from, final Bytes to) { - // Make sure this is a valid query + if (from.compareTo(to) > 0) { - LOG.warn("Returning empty iterator for range query with invalid range: keyFrom > keyTo."); + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to."); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 57564f85fee9e..148a439227740 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -185,9 +185,8 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, final long timeTo) { removeExpiredSegments(); - // Make sure this is a valid query if (from.compareTo(to) > 0) { - LOG.debug("Returning empty iterator for fetch with invalid range: keyFrom > keyTo."); + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to."); return KeyValueIterators.emptyIterator(); } From cef3de561df9385b7db74fcc26c6829a3c6a99aa Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Fri, 5 Apr 2019 13:40:28 -0700 Subject: [PATCH 05/10] Added unit tests, guard range in MemoryNavigableLRUCache, expanded error messages --- .../state/internals/CachingKeyValueStore.java | 4 +++- .../streams/state/internals/CachingSessionStore.java | 4 +++- .../streams/state/internals/CachingWindowStore.java | 4 +++- .../state/internals/InMemoryKeyValueStore.java | 4 +++- .../streams/state/internals/InMemoryWindowStore.java | 4 +++- .../state/internals/MemoryNavigableLRUCache.java | 12 ++++++++++++ .../state/internals/AbstractKeyValueStoreTest.java | 5 +++++ .../state/internals/CachingSessionStoreTest.java | 8 ++++++++ .../state/internals/CachingWindowStoreTest.java | 7 +++++++ .../state/internals/InMemoryWindowStoreTest.java | 6 ++++++ 10 files changed, 53 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 61ca64a2c7fa5..95e20b475b0b6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -233,7 +233,9 @@ private byte[] getInternal(final Bytes key) { public KeyValueIterator range(final Bytes from, final Bytes to) { if (from.compareTo(to) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid key range: from > to."); + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index 8441be967b9d2..9599105857df6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -158,7 +158,9 @@ public KeyValueIterator, byte[]> findSessions(final Bytes keyFro final long earliestSessionEndTime, final long latestSessionStartTime) { if (keyFrom.compareTo(keyTo) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid key range: from > to."); + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index b44d2984a5534..3875a7914f278 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -201,7 +201,9 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, final long timeFrom, final long timeTo) { if (from.compareTo(to) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid key range: from > to."); + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index 7aaa25ef228bc..07337806922cb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -117,7 +117,9 @@ public byte[] delete(final Bytes key) { public KeyValueIterator range(final Bytes from, final Bytes to) { if (from.compareTo(to) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid key range: from > to."); + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 148a439227740..0cee668e2a8d6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -186,7 +186,9 @@ public KeyValueIterator, byte[]> fetch(final Bytes from, removeExpiredSegments(); if (from.compareTo(to) > 0) { - LOG.warn("Returning empty iterator for fetch with invalid key range: from > to."); + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); return KeyValueIterators.emptyIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java index c3cc834ff6596..4bf42def92eb7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java @@ -23,15 +23,27 @@ import java.util.Iterator; import java.util.Map; import java.util.TreeMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MemoryNavigableLRUCache extends MemoryLRUCache { + private static final Logger LOG = LoggerFactory.getLogger(MemoryNavigableLRUCache.class); + public MemoryNavigableLRUCache(final String name, final int maxCacheSize) { super(name, maxCacheSize); } @Override public KeyValueIterator range(final Bytes from, final Bytes to) { + + if (from.compareTo(to) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + final TreeMap treeMap = toTreeMap(); return new DelegatingPeekingKeyValueIterator<>(name(), new MemoryNavigableLRUCache.CacheIterator(treeMap.navigableKeySet() diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index 09a373689a416..9459a2bc111f1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -389,4 +389,9 @@ public void shouldNotThrowConcurrentModificationException() { assertEquals(new KeyValue<>(0, "zero"), results.next()); } + + @Test + public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { + store.range(-1, 1); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index 66b27f0a2526c..d27300d325ad6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; @@ -412,6 +413,13 @@ public void shouldThrowNullPointerExceptionOnPutNullKey() { cachingStore.put(null, "1".getBytes()); } + @Test + public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { + final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1)); + final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1)); + cachingStore.findSessions(keyFrom, keyTo, 0L, 10L); + } + private List, byte[]>> addSessionsUntilOverflow(final String... sessionIds) { final Random random = new Random(); final List, byte[]>> results = new ArrayList<>(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index da49dded4c7f4..37f9a9b0a707c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -568,6 +568,13 @@ public void shouldThrowNullPointerExceptionOnRangeNullToKey() { cachingStore.fetch(bytesKey("anyFrom"), null, ofEpochMilli(1L), ofEpochMilli(2L)); } + @Test + public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { + final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1)); + final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1)); + cachingStore.fetch(keyFrom, keyTo, 0L, 10L); + } + private static KeyValue, byte[]> windowedPair(final String key, final String value, final long timestamp) { return KeyValue.pair( new Windowed<>(bytesKey(key), new TimeWindow(timestamp, timestamp + WINDOW_SIZE)), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java index e7f5ed0689ca0..74bbdb2e6fa29 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java @@ -579,4 +579,10 @@ public void shouldNotThrowExceptionWhenFetchRangeIsExpired() { assertFalse(iterator.hasNext()); } + + @Test + public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { + windowStore = createInMemoryWindowStore(context, false); + windowStore.fetch(-1, 1, 0L, 10L); + } } From f2a556bfd23ffbf44f817eab305614e5d466f540 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Fri, 5 Apr 2019 14:24:04 -0700 Subject: [PATCH 06/10] Use LogAppender to catch and check logged messages in unit tests --- .../state/internals/AbstractKeyValueStoreTest.java | 10 ++++++++++ .../state/internals/CachingSessionStoreTest.java | 10 ++++++++++ .../state/internals/CachingWindowStoreTest.java | 10 ++++++++++ .../state/internals/InMemoryWindowStoreTest.java | 9 +++++++++ 4 files changed, 39 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index 9459a2bc111f1..4faf2d23d61cb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStoreTestDriver; @@ -38,6 +39,7 @@ import java.util.List; import java.util.Map; +import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; @@ -392,6 +394,14 @@ public void shouldNotThrowConcurrentModificationException() { @Test public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { + LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class); + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + store.range(-1, 1); + + final List messages = appender.getMessages(); + assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers")); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index d27300d325ad6..6ad011e206b21 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.TestUtils; @@ -51,6 +52,7 @@ import static org.apache.kafka.test.StreamsTestUtils.toList; import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue; +import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertArrayEquals; @@ -415,9 +417,17 @@ public void shouldThrowNullPointerExceptionOnPutNullKey() { @Test public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { + LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class); + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1)); final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1)); cachingStore.findSessions(keyFrom, keyTo, 0L, 10L); + + final List messages = appender.getMessages(); + assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers")); } private List, byte[]>> addSessionsUntilOverflow(final String... sessionIds) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index 37f9a9b0a707c..248134ed8a020 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -61,6 +62,7 @@ import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -570,9 +572,17 @@ public void shouldThrowNullPointerExceptionOnRangeNullToKey() { @Test public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { + LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class); + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1)); final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1)); cachingStore.fetch(keyFrom, keyTo, 0L, 10L); + + final List messages = appender.getMessages(); + assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers")); } private static KeyValue, byte[]> windowedPair(final String key, final String value, final long timestamp) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java index 74bbdb2e6fa29..4cfa1a303e02f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java @@ -583,6 +583,15 @@ public void shouldNotThrowExceptionWhenFetchRangeIsExpired() { @Test public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { windowStore = createInMemoryWindowStore(context, false); + + LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class); + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + windowStore.fetch(-1, 1, 0L, 10L); + + final List messages = appender.getMessages(); + assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers")); } } From b2deb14f10bf3bf09c05bfb1d3070419dd91d965 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Mon, 8 Apr 2019 16:17:08 -0700 Subject: [PATCH 07/10] Extend check & log error message to RocksDB stores as well --- .../internals/AbstractRocksDBSegmentedBytesStore.java | 7 +++++++ .../kafka/streams/state/internals/RocksDBStore.java | 8 ++++++++ 2 files changed, 15 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index 34639e3362d5d..22f3a0249a4c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -86,6 +86,13 @@ public KeyValueIterator fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) { + if (keyFrom.compareTo(keyTo) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + final List searchSpace = keySchema.segmentsToSearch(segments, from, to); final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 3e3e4783bb345..3cf8e94de142b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -284,6 +284,14 @@ public synchronized KeyValueIterator range(final Bytes from, final Bytes to) { Objects.requireNonNull(from, "from cannot be null"); Objects.requireNonNull(to, "to cannot be null"); + + if (from.compareTo(to) > 0) { + log.warn("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + validateStoreOpen(); final KeyValueIterator rocksDBRangeIterator = dbAccessor.range(from, to); From 61e86c8d43c85402f07f56bba4ffac27ce9b177b Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Mon, 8 Apr 2019 16:23:00 -0700 Subject: [PATCH 08/10] Verify empty iterator returned when fetching from negative starting key range --- .../streams/state/internals/AbstractKeyValueStoreTest.java | 3 ++- .../streams/state/internals/CachingSessionStoreTest.java | 4 +++- .../kafka/streams/state/internals/CachingWindowStoreTest.java | 4 +++- .../streams/state/internals/InMemoryWindowStoreTest.java | 3 ++- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index 4faf2d23d61cb..ac60ea7942151 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -397,7 +397,8 @@ public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class); final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); - store.range(-1, 1); + final KeyValueIterator iterator = store.range(-1, 1); + assertFalse(iterator.hasNext()); final List messages = appender.getMessages(); assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. " diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index 6ad011e206b21..9fb71b4374646 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -422,7 +422,9 @@ public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1)); final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1)); - cachingStore.findSessions(keyFrom, keyTo, 0L, 10L); + + final KeyValueIterator iterator = cachingStore.findSessions(keyFrom, keyTo, 0L, 10L); + assertFalse(iterator.hasNext()); final List messages = appender.getMessages(); assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. " diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index 248134ed8a020..4cca4bd9f120b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -577,7 +577,9 @@ public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1)); final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1)); - cachingStore.fetch(keyFrom, keyTo, 0L, 10L); + + final KeyValueIterator iterator = cachingStore.fetch(keyFrom, keyTo, 0L, 10L); + assertFalse(iterator.hasNext()); final List messages = appender.getMessages(); assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. " diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java index 4cfa1a303e02f..f2ccfbd195cea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java @@ -587,7 +587,8 @@ public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class); final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); - windowStore.fetch(-1, 1, 0L, 10L); + final KeyValueIterator iterator = windowStore.fetch(-1, 1, 0L, 10L); + assertFalse(iterator.hasNext()); final List messages = appender.getMessages(); assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. " From bf38b11743e277c33ab1f997ee5735cca01225a5 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Mon, 8 Apr 2019 16:53:04 -0700 Subject: [PATCH 09/10] Unit tests for key range fetch to match single-key fetch in all stores --- .../internals/AbstractKeyValueStoreTest.java | 15 ++++++++ .../internals/CachingSessionStoreTest.java | 16 ++++++++ .../internals/CachingWindowStoreTest.java | 20 ++++++++++ .../internals/InMemoryWindowStoreTest.java | 18 +++++++++ .../internals/RocksDBSessionStoreTest.java | 36 +++++++++++++++++- .../internals/RocksDBWindowStoreTest.java | 37 +++++++++++++++++++ 6 files changed, 141 insertions(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index ac60ea7942151..7df6532a188cb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -381,6 +381,21 @@ public void shouldDeleteFromStore() { assertNull(store.get(2)); } + @Test + public void shouldReturnSameResultsForGetAndRangeWithEqualKeys() { + final List> entries = new ArrayList<>(); + entries.add(new KeyValue<>(1, "one")); + entries.add(new KeyValue<>(2, "two")); + entries.add(new KeyValue<>(3, "three")); + + store.putAll(entries); + + final Iterator> iterator = store.range(2, 2); + + assertEquals(iterator.next().value, store.get(2)); + assertFalse(iterator.hasNext()); + } + @Test public void shouldNotThrowConcurrentModificationException() { store.put(0, "zero"); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index 9fb71b4374646..48c96a28f6c86 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -342,6 +342,22 @@ public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() flushListener.forwarded.clear(); } + @Test + public void shouldReturnSameResultsForSingleKeyFindSessionsAndEqualKeyRangeFindSessions() { + cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 1)), "1".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(2, 3)), "2".getBytes()); + cachingStore.put(new Windowed<>(keyAA, new SessionWindow(4, 5)), "3".getBytes()); + cachingStore.put(new Windowed<>(keyB, new SessionWindow(6, 7)), "4".getBytes()); + + final KeyValueIterator, byte[]> singleKeyIterator = cachingStore.findSessions(keyAA, 0L, 10L); + final KeyValueIterator, byte[]> keyRangeIterator = cachingStore.findSessions(keyAA, keyAA, 0L, 10L); + + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertFalse(singleKeyIterator.hasNext()); + assertFalse(keyRangeIterator.hasNext()); + } + @Test public void shouldClearNamespaceCacheOnClose() { final Windowed a1 = new Windowed<>(keyA, new SessionWindow(0, 0)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index 4cca4bd9f120b..c327c5144b45f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -236,6 +236,10 @@ private static Bytes bytesKey(final String key) { return Bytes.wrap(key.getBytes()); } + private String stringFrom(byte[] from) { + return Serdes.String().deserializer().deserialize("", from); + } + @Test public void shouldPutFetchRangeFromCache() { cachingStore.put(bytesKey("a"), bytesValue("a")); @@ -545,6 +549,22 @@ public void shouldFetchAndIterateOverKeyRange() { ); } + @Test + public void shouldReturnSameResultsForSingleKeyFetchAndEqualKeyRangeFetch() { + cachingStore.put(bytesKey("a"), bytesValue("0001"), 0); + cachingStore.put(bytesKey("aa"), bytesValue("0002"), 1); + cachingStore.put(bytesKey("aa"), bytesValue("0003"), 2); + cachingStore.put(bytesKey("aaa"), bytesValue("0004"), 3); + + final WindowStoreIterator singleKeyIterator = cachingStore.fetch(bytesKey("aa"), 0L, 5L); + final KeyValueIterator, byte[]> keyRangeIterator = cachingStore.fetch(bytesKey("aa"), bytesKey("aa"), 0L, 5L); + + assertEquals(stringFrom(singleKeyIterator.next().value), stringFrom(keyRangeIterator.next().value)); + assertEquals(stringFrom(singleKeyIterator.next().value), stringFrom(keyRangeIterator.next().value)); + assertFalse(singleKeyIterator.hasNext()); + assertFalse(keyRangeIterator.hasNext()); + } + @Test(expected = NullPointerException.class) public void shouldThrowNullPointerExceptionOnPutNullKey() { cachingStore.put(null, bytesValue("anyValue")); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java index f2ccfbd195cea..df924ec43cb9b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java @@ -568,6 +568,24 @@ public void shouldNotExpireFromOpenIterator() { assertFalse(iterator2.hasNext()); } + @Test + public void shouldReturnSameResultsForSingleKeyFetchAndEqualKeyRangeFetch() { + windowStore = createInMemoryWindowStore(context, false); + + windowStore.put(1, "one", 0L); + windowStore.put(2, "two", 1L); + windowStore.put(2, "two", 2L); + windowStore.put(3, "three", 3L); + + final WindowStoreIterator singleKeyIterator = windowStore.fetch(2, 0L, 5L); + final KeyValueIterator, String> keyRangeIterator = windowStore.fetch(2, 2, 0L, 5L); + + assertEquals(singleKeyIterator.next().value, keyRangeIterator.next().value); + assertEquals(singleKeyIterator.next().value, keyRangeIterator.next().value); + assertFalse(singleKeyIterator.hasNext()); + assertFalse(keyRangeIterator.hasNext()); + } + @Test public void shouldNotThrowExceptionWhenFetchRangeIsExpired() { windowStore = createInMemoryWindowStore(context, false); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index 0786c3723c856..dac96a811f837 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -18,11 +18,13 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.Stores; @@ -41,6 +43,7 @@ import static org.apache.kafka.test.StreamsTestUtils.toList; import static org.apache.kafka.test.StreamsTestUtils.valuesToList; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -224,6 +227,22 @@ public void shouldFetchExactKeys() { } } + @Test + public void shouldReturnSameResultsForSingleKeyFindSessionsAndEqualKeyRangeFindSessions() { + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 1)), 0L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(2, 3)), 1L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(4, 5)), 2L); + sessionStore.put(new Windowed<>("aaa", new SessionWindow(6, 7)), 3L); + + final KeyValueIterator, Long> singleKeyIterator = sessionStore.findSessions("aa", 0L, 10L); + final KeyValueIterator, Long> keyRangeIterator = sessionStore.findSessions("aa", "aa", 0L, 10L); + + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertEquals(singleKeyIterator.next(), keyRangeIterator.next()); + assertFalse(singleKeyIterator.hasNext()); + assertFalse(keyRangeIterator.hasNext()); + } + @Test(expected = NullPointerException.class) public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() { sessionStore.findSessions(null, 1L, 2L); @@ -263,6 +282,21 @@ public void shouldThrowNullPointerExceptionOnRemoveNullKey() { public void shouldThrowNullPointerExceptionOnPutNullKey() { sessionStore.put(null, 1L); } - + @Test + public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { + LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class); + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + + final String keyFrom = Serdes.String().deserializer().deserialize("", Serdes.Integer().serializer().serialize("", -1)); + final String keyTo = Serdes.String().deserializer().deserialize("", Serdes.Integer().serializer().serialize("", 1)); + + final KeyValueIterator iterator = sessionStore.findSessions(keyFrom, keyTo, 0L, 10L); + assertFalse(iterator.hasNext()); + + final List messages = appender.getMessages(); + assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers")); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 42b1b8c37f9b7..7405e060c580a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -33,6 +33,8 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; +import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; @@ -58,6 +60,7 @@ import static java.util.Arrays.asList; import static java.util.Objects.requireNonNull; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -1398,6 +1401,40 @@ public void shouldFetchAndIterateOverExactBinaryKeys() { assertThat(toList(windowStore.fetch(key3, ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE))), equalTo(expectedKey3)); } + @Test + public void shouldReturnSameResultsForSingleKeyFetchAndEqualKeyRangeFetch() { + windowStore = createWindowStore(context, false); + + windowStore.put(1, "one", 0L); + windowStore.put(2, "two", 1L); + windowStore.put(2, "two", 2L); + windowStore.put(3, "three", 3L); + + final WindowStoreIterator singleKeyIterator = windowStore.fetch(2, 0L, 5L); + final KeyValueIterator, String> keyRangeIterator = windowStore.fetch(2, 2, 0L, 5L); + + assertEquals(singleKeyIterator.next().value, keyRangeIterator.next().value); + assertEquals(singleKeyIterator.next().value, keyRangeIterator.next().value); + assertFalse(singleKeyIterator.hasNext()); + assertFalse(keyRangeIterator.hasNext()); + } + + @Test + public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { + windowStore = createWindowStore(context, false); + + LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class); + final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); + + final KeyValueIterator iterator = windowStore.fetch(-1, 1, 0L, 10L); + assertFalse(iterator.hasNext()); + + final List messages = appender.getMessages(); + assertThat(messages, hasItem("Returning empty iterator for fetch with invalid key range: from > to. " + + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers")); + } + private void putFirstBatch(final WindowStore store, @SuppressWarnings("SameParameterValue") final long startTime, final InternalMockProcessorContext context) { From 892b1e9a0dae3447bc55330e943717847504377c Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Mon, 8 Apr 2019 16:54:29 -0700 Subject: [PATCH 10/10] Fix checkstyle --- .../kafka/streams/state/internals/CachingWindowStoreTest.java | 2 +- .../kafka/streams/state/internals/RocksDBSessionStoreTest.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index c327c5144b45f..b0ccc15ca6639 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -236,7 +236,7 @@ private static Bytes bytesKey(final String key) { return Bytes.wrap(key.getBytes()); } - private String stringFrom(byte[] from) { + private String stringFrom(final byte[] from) { return Serdes.String().deserializer().deserialize("", from); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index dac96a811f837..1821913d4771f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -18,7 +18,6 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed;