diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java index aa84bfcf5f9fe..3df170d5ab620 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java @@ -128,12 +128,13 @@ default WindowStoreIterator backwardFetch(K key, Instant timeFrom, Instant ti * This iterator must be closed after use. * * @param keyFrom the first key in the range + * A null value indicates a starting position from the first element in the store. * @param keyTo the last key in the range + * A null value indicates that the range ends with the last element in the store. * @param timeFrom time range start (inclusive), where iteration starts. * @param timeTo time range end (inclusive), where iteration ends. * @return an iterator over windowed key-value pairs {@code , value>}, from beginning to end of time. * @throws InvalidStateStoreException if the store is not initialized - * @throws NullPointerException if {@code null} is used for any key. * @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds} */ KeyValueIterator, V> fetch(K keyFrom, K keyTo, Instant timeFrom, Instant timeTo) @@ -146,12 +147,13 @@ KeyValueIterator, V> fetch(K keyFrom, K keyTo, Instant timeFrom, Ins * This iterator must be closed after use. * * @param keyFrom the first key in the range + * A null value indicates a starting position from the first element in the store. * @param keyTo the last key in the range + * A null value indicates that the range ends with the last element in the store. * @param timeFrom time range start (inclusive), where iteration ends. * @param timeTo time range end (inclusive), where iteration starts. * @return an iterator over windowed key-value pairs {@code , value>}, from end to beginning of time. * @throws InvalidStateStoreException if the store is not initialized - * @throws NullPointerException if {@code null} is used for any key. * @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds} */ default KeyValueIterator, V> backwardFetch(K keyFrom, K keyTo, Instant timeFrom, Instant timeTo) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java index 31c6eb107cd68..86c82fa55bf2b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java @@ -117,12 +117,13 @@ default WindowStoreIterator backwardFetch(final K key, * This iterator must be closed after use. * * @param keyFrom the first key in the range + * A null value indicates a starting position from the first element in the store. * @param keyTo the last key in the range + * A null value indicates that the range ends with the last element in the store. * @param timeFrom time range start (inclusive) * @param timeTo time range end (inclusive) * @return an iterator over windowed key-value pairs {@code , value>} * @throws InvalidStateStoreException if the store is not initialized - * @throws NullPointerException if one of the given keys is {@code null} */ // WindowStore keeps a long-based implementation of ReadOnlyWindowStore#fetch Instant-based // if super#fetch is removed, keep this implementation as it serves PAPI Stores. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index f7aef116a91ca..bfee6b2754a8f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -114,7 +114,7 @@ KeyValueIterator fetch(final Bytes keyFrom, final long from, final long to, final boolean forward) { - if (keyFrom.compareTo(keyTo) > 0) { + if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) { LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + "This may be due to range arguments set in the wrong order, " + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + @@ -124,8 +124,8 @@ KeyValueIterator fetch(final Bytes keyFrom, final List searchSpace = keySchema.segmentsToSearch(segments, from, to, forward); - final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from); - final Bytes binaryTo = keySchema.upperRange(keyTo, to); + final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, from); + final Bytes binaryTo = keyTo == null ? null : keySchema.upperRange(keyTo, to); return new SegmentIterator<>( searchSpace.iterator(), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 59604a502bd00..fa04ac8141a29 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -244,7 +244,7 @@ public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long timeFrom, final long timeTo) { - if (keyFrom.compareTo(keyTo) > 0) { + if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) { LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + "This may be due to range arguments set in the wrong order, " + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + @@ -266,8 +266,8 @@ public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, new CacheIteratorWrapper(keyFrom, keyTo, timeFrom, timeTo, true) : context.cache().range( cacheName, - cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)), - cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo)) + keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)), + keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo)) ); final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFrom, keyTo, timeFrom, timeTo); @@ -289,7 +289,7 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFr final Bytes keyTo, final long timeFrom, final long timeTo) { - if (keyFrom.compareTo(keyTo) > 0) { + if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) { LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + "Note that the built-in numerical serdes do not follow this for negative numbers"); @@ -310,8 +310,8 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFr new CacheIteratorWrapper(keyFrom, keyTo, timeFrom, timeTo, false) : context.cache().reverseRange( cacheName, - cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)), - cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo)) + keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)), + keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo)) ); final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFrom, keyTo, timeFrom, timeTo); @@ -573,12 +573,14 @@ private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRang throw new IllegalStateException("Error iterating over segments: segment interval has changed"); } - if (keyFrom.equals(keyTo)) { + if (keyFrom != null && keyTo != null && keyFrom.equals(keyTo)) { cacheKeyFrom = cacheFunction.cacheKey(segmentLowerRangeFixedSize(keyFrom, lowerRangeEndTime)); cacheKeyTo = cacheFunction.cacheKey(segmentUpperRangeFixedSize(keyTo, upperRangeEndTime)); } else { - cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), currentSegmentId); - cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo), currentSegmentId); + cacheKeyFrom = keyFrom == null ? null : + cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), currentSegmentId); + cacheKeyTo = keyTo == null ? null : + cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo), currentSegmentId); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java index 42278559a9042..c6b0b604f14a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java @@ -115,8 +115,6 @@ public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo, final Instant timeFrom, final Instant timeTo) { - Objects.requireNonNull(keyFrom, "keyFrom can't be null"); - Objects.requireNonNull(keyTo, "keyTo can't be null"); final NextIteratorFunction, V, ReadOnlyWindowStore> nextIteratorFunction = store -> store.fetch(keyFrom, keyTo, timeFrom, timeTo); return new DelegatingPeekingKeyValueIterator<>( @@ -131,8 +129,6 @@ public KeyValueIterator, V> backwardFetch(final K keyFrom, final K keyTo, final Instant timeFrom, final Instant timeTo) throws IllegalArgumentException { - Objects.requireNonNull(keyFrom, "keyFrom can't be null"); - Objects.requireNonNull(keyTo, "keyTo can't be null"); final NextIteratorFunction, V, ReadOnlyWindowStore> nextIteratorFunction = store -> store.backwardFetch(keyFrom, keyTo, timeFrom, timeTo); return new DelegatingPeekingKeyValueIterator<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index ae37542ba4bf8..5327e75f1ece0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -213,12 +213,9 @@ KeyValueIterator, byte[]> fetch(final Bytes from, final long timeFrom, final long timeTo, final boolean forward) { - Objects.requireNonNull(from, "from key cannot be null"); - Objects.requireNonNull(to, "to key cannot be null"); - removeExpiredSegments(); - if (from.compareTo(to) > 0) { + if (from != null && to != null && from.compareTo(to) > 0) { LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + "This may be due to range arguments set in the wrong order, " + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + @@ -397,7 +394,8 @@ private WrappedWindowedKeyValueIterator registerNewWindowedKeyValueIterator(fina final Bytes to = (retainDuplicates && keyTo != null) ? wrapForDups(keyTo, Integer.MAX_VALUE) : keyTo; final WrappedWindowedKeyValueIterator iterator = - new WrappedWindowedKeyValueIterator(from, + new WrappedWindowedKeyValueIterator( + from, to, segmentIterator, openIterators::remove, @@ -462,7 +460,7 @@ public boolean hasNext() { } final Bytes key = getKey(next.key); - if (key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0) { + if (isKeyWithinRange(key)) { return true; } else { next = null; @@ -470,6 +468,23 @@ public boolean hasNext() { } } + private boolean isKeyWithinRange(final Bytes key) { + // split all cases for readability and avoid BooleanExpressionComplexity checkstyle warning + if (keyFrom == null && keyTo == null) { + // fetch all + return true; + } else if (keyFrom == null) { + // start from the beginning + return key.compareTo(getKey(keyTo)) <= 0; + } else if (keyTo == null) { + // end to the last + return key.compareTo(getKey(keyFrom)) >= 0; + } else { + // key is within the range + return key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0; + } + } + public void close() { next = null; recordIterator = null; @@ -499,9 +514,16 @@ Iterator> setRecordIterator() { final Map.Entry> currentSegment = segmentIterator.next(); currentTime = currentSegment.getKey(); - final ConcurrentNavigableMap subMap = allKeys ? - currentSegment.getValue() : - currentSegment.getValue().subMap(keyFrom, true, keyTo, true); + final ConcurrentNavigableMap subMap; + if (allKeys) { // keyFrom == null && keyTo == null + subMap = currentSegment.getValue(); + } else if (keyFrom == null) { + subMap = currentSegment.getValue().headMap(keyTo, true); + } else if (keyTo == null) { + subMap = currentSegment.getValue().tailMap(keyFrom, true); + } else { + subMap = currentSegment.getValue().subMap(keyFrom, true, keyTo, true); + } if (forward) { return subMap.entrySet().iterator(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 24c17f2c2b534..48b42fbabe6ec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -245,10 +245,12 @@ public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo, final long timeFrom, final long timeTo) { - Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); - Objects.requireNonNull(keyTo, "keyTo cannot be null"); return new MeteredWindowedKeyValueIterator<>( - wrapped().fetch(keyBytes(keyFrom), keyBytes(keyTo), timeFrom, timeTo), + wrapped().fetch( + keyFrom == null ? null : keyBytes(keyFrom), + keyTo == null ? null : keyBytes(keyTo), + timeFrom, + timeTo), fetchSensor, streamsMetrics, serdes, @@ -260,10 +262,12 @@ public KeyValueIterator, V> backwardFetch(final K keyFrom, final K keyTo, final long timeFrom, final long timeTo) { - Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); - Objects.requireNonNull(keyTo, "keyTo cannot be null"); return new MeteredWindowedKeyValueIterator<>( - wrapped().backwardFetch(keyBytes(keyFrom), keyBytes(keyTo), timeFrom, timeTo), + wrapped().backwardFetch( + keyFrom == null ? null : keyBytes(keyFrom), + keyTo == null ? null : keyBytes(keyTo), + timeFrom, + timeTo), fetchSensor, streamsMetrics, serdes, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java index 03b66a634f2db..6191c4988877e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java @@ -74,18 +74,10 @@ public boolean hasNext() { close(); currentSegment = segments.next(); try { - if (from == null || to == null) { - if (forward) { - currentIterator = currentSegment.all(); - } else { - currentIterator = currentSegment.reverseAll(); - } + if (forward) { + currentIterator = currentSegment.range(from, to); } else { - if (forward) { - currentIterator = currentSegment.range(from, to); - } else { - currentIterator = currentSegment.reverseRange(from, to); - } + currentIterator = currentSegment.reverseRange(from, to); } } catch (final InvalidStateStoreException e) { // segment may have been closed so we ignore it. diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java index 21aaf0fd8378f..e1dda05bfefb9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java @@ -145,16 +145,138 @@ public void close() { @Test public void shouldPutAndFetch() { - final String key = "a"; - bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(10)); - bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50)); - bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100)); + final String keyA = "a"; + final String keyB = "b"; + final String keyC = "c"; + bytesStore.put(serializeKey(new Windowed<>(keyA, windows[0])), serializeValue(10)); + bytesStore.put(serializeKey(new Windowed<>(keyA, windows[1])), serializeValue(50)); + bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), serializeValue(100)); + bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), serializeValue(200)); - try (final KeyValueIterator values = bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 500)) { + try (final KeyValueIterator values = bytesStore.fetch( + Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) { final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>(key, windows[0]), 10L), - KeyValue.pair(new Windowed<>(key, windows[1]), 50L) + KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), + KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator values = bytesStore.fetch( + Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) { + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), + KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), + KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator values = bytesStore.fetch( + null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) { + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), + KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), + KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator values = bytesStore.fetch( + Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) { + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), + KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator values = bytesStore.fetch( + null, null, 0, windows[3].start())) { + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), + KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), + KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), + KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L) + ); + + assertEquals(expected, toList(values)); + } + } + + @Test + public void shouldPutAndBackwardFetch() { + final String keyA = "a"; + final String keyB = "b"; + final String keyC = "c"; + bytesStore.put(serializeKey(new Windowed<>(keyA, windows[0])), serializeValue(10)); + bytesStore.put(serializeKey(new Windowed<>(keyA, windows[1])), serializeValue(50)); + bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), serializeValue(100)); + bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), serializeValue(200)); + + try (final KeyValueIterator values = bytesStore.backwardFetch( + Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) { + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), + KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator values = bytesStore.backwardFetch( + Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) { + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), + KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), + KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator values = bytesStore.backwardFetch( + null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) { + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), + KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), + KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator values = bytesStore.backwardFetch( + Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) { + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L), + KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator values = bytesStore.backwardFetch( + null, null, 0, windows[3].start())) { + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L), + KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), + KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), + KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L) ); assertEquals(expected, toList(values)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java index 153db976c9970..e93f758c5cffc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java @@ -436,6 +436,30 @@ public void testFetchRange() { ofEpochMilli(defaultStartTime + 3L), ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5))) ); + assertEquals( + asList(zero, one, two), + toList(windowStore.fetch( + null, + 2, + ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + WINDOW_SIZE + 2L))) + ); + assertEquals( + asList(two, three, four, five), + toList(windowStore.fetch( + 2, + null, + ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5L))) + ); + assertEquals( + asList(zero, one, two, three, four, five), + toList(windowStore.fetch( + null, + null, + ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5L))) + ); } @Test @@ -506,6 +530,30 @@ public void testBackwardFetchRange() { ofEpochMilli(defaultStartTime + 3L), ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5))) ); + assertEquals( + asList(two, one, zero), + toList(windowStore.backwardFetch( + null, + 2, + ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + WINDOW_SIZE + 2L))) + ); + assertEquals( + asList(five, four, three, two), + toList(windowStore.backwardFetch( + 2, + null, + ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5L))) + ); + assertEquals( + asList(five, four, three, two, one, zero), + toList(windowStore.backwardFetch( + null, + null, + ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5L))) + ); } @Test @@ -860,16 +908,6 @@ public void shouldThrowNullPointerExceptionOnGetNullKey() { assertThrows(NullPointerException.class, () -> windowStore.fetch(null, ofEpochMilli(1L), ofEpochMilli(2L))); } - @Test - public void shouldThrowNullPointerExceptionOnRangeNullFromKey() { - assertThrows(NullPointerException.class, () -> windowStore.fetch(null, 2, ofEpochMilli(1L), ofEpochMilli(2L))); - } - - @Test - public void shouldThrowNullPointerExceptionOnRangeNullToKey() { - assertThrows(NullPointerException.class, () -> windowStore.fetch(1, null, ofEpochMilli(1L), ofEpochMilli(2L))); - } - @Test public void shouldFetchAndIterateOverExactBinaryKeys() { final WindowStore windowStore = buildWindowStore(RETENTION_PERIOD, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java index 023d69a6a9f4d..2d64a44aa063a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java @@ -54,6 +54,7 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; +import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.UUID; @@ -64,6 +65,7 @@ import static java.util.Arrays.asList; import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize; import static org.apache.kafka.test.StreamsTestUtils.toList; +import static org.apache.kafka.test.StreamsTestUtils.verifyAllWindowedKeyValues; import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue; import static org.hamcrest.CoreMatchers.equalTo; @@ -283,20 +285,157 @@ public void shouldPutFetchRangeFromCache() { cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); try (final KeyValueIterator, byte[]> iterator = - cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(10), ofEpochMilli(10))) { - verifyWindowedKeyValue( - iterator.next(), + cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP))) { + final List> expectedKeys = Arrays.asList( new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), - "a"); - verifyWindowedKeyValue( - iterator.next(), - new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), - "b"); - assertFalse(iterator.hasNext()); + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)) + ); + + final List expectedValues = Arrays.asList("a", "b"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); assertEquals(2, cache.size()); } } + @Test + public void shouldPutFetchRangeFromCacheForNullKeyFrom() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final KeyValueIterator, byte[]> iterator = + cachingStore.fetch(null, bytesKey("d"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + final List> expectedKeys = Arrays.asList( + new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), + new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)) + ); + + final List expectedValues = Arrays.asList("a", "b", "c", "d"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); + } + } + + @Test + public void shouldPutFetchRangeFromCacheForNullKeyTo() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final KeyValueIterator, byte[]> iterator = + cachingStore.fetch(bytesKey("b"), null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + final List> expectedKeys = Arrays.asList( + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), + new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)) + ); + + final List expectedValues = Arrays.asList("b", "c", "d", "e"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); + } + } + + @Test + public void shouldPutFetchRangeFromCacheForNullKeyFromKeyTo() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final KeyValueIterator, byte[]> iterator = + cachingStore.fetch(null, null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + final List> expectedKeys = Arrays.asList( + new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), + new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)) + ); + + final List expectedValues = Arrays.asList("a", "b", "c", "d", "e"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); + } + } + + @Test + public void shouldPutBackwardFetchRangeFromCacheForNullKeyFrom() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final KeyValueIterator, byte[]> iterator = + cachingStore.backwardFetch(null, bytesKey("c"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + final List> expectedKeys = Arrays.asList( + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)) + ); + + final List expectedValues = Arrays.asList("c", "b", "a"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); + } + } + + @Test + public void shouldPutBackwardFetchRangeFromCacheForNullKeyTo() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final KeyValueIterator, byte[]> iterator = + cachingStore.backwardFetch(bytesKey("c"), null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + final List> expectedKeys = Arrays.asList( + new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)) + ); + + final List expectedValues = Arrays.asList("e", "d", "c"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); + } + } + + @Test + public void shouldPutBackwardFetchRangeFromCacheForNullKeyFromKeyTo() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final KeyValueIterator, byte[]> iterator = + cachingStore.backwardFetch(null, null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + final List> expectedKeys = Arrays.asList( + new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)) + ); + + final List expectedValues = Arrays.asList("e", "d", "c", "b", "a"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); + } + } + @Test public void shouldGetAllFromCache() { cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); @@ -813,16 +952,6 @@ public void shouldThrowNullPointerExceptionOnFetchNullKey() { assertThrows(NullPointerException.class, () -> cachingStore.fetch(null, ofEpochMilli(1L), ofEpochMilli(2L))); } - @Test - public void shouldThrowNullPointerExceptionOnRangeNullFromKey() { - assertThrows(NullPointerException.class, () -> cachingStore.fetch(null, bytesKey("anyTo"), ofEpochMilli(1L), ofEpochMilli(2L))); - } - - @Test - public void shouldThrowNullPointerExceptionOnRangeNullToKey() { - assertThrows(NullPointerException.class, () -> cachingStore.fetch(bytesKey("anyFrom"), null, ofEpochMilli(1L), ofEpochMilli(2L))); - } - @Test public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java index 3c486c34fd60f..51f58cd33de76 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java @@ -90,6 +90,7 @@ public void shouldFetchValuesFromWindowStore() { } } + @Test public void shouldBackwardFetchValuesFromWindowStore() { underlyingWindowStore.put("my-key", "my-value", 0L); @@ -361,6 +362,97 @@ public void shouldFetchKeyRangeAcrossStores() { KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b")))); } + @Test + public void shouldFetchKeyRangeAcrossStoresWithNullKeyTo() { + final ReadOnlyWindowStoreStub secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE); + stubProviderTwo.addStore(storeName, secondUnderlying); + underlyingWindowStore.put("a", "a", 0L); + secondUnderlying.put("b", "b", 10L); + secondUnderlying.put("c", "c", 10L); + final List, String>> results = + StreamsTestUtils.toList(windowStore.fetch("a", null, ofEpochMilli(0), ofEpochMilli(10))); + assertThat(results, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"), + KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"), + KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c")))); + } + + @Test + public void shouldFetchKeyRangeAcrossStoresWithNullKeyFrom() { + final ReadOnlyWindowStoreStub secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE); + stubProviderTwo.addStore(storeName, secondUnderlying); + underlyingWindowStore.put("a", "a", 0L); + secondUnderlying.put("b", "b", 10L); + secondUnderlying.put("c", "c", 10L); + final List, String>> results = + StreamsTestUtils.toList(windowStore.fetch(null, "c", ofEpochMilli(0), ofEpochMilli(10))); + assertThat(results, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"), + KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"), + KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c")))); + } + + @Test + public void shouldFetchKeyRangeAcrossStoresWithNullKeyFromKeyTo() { + final ReadOnlyWindowStoreStub secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE); + stubProviderTwo.addStore(storeName, secondUnderlying); + underlyingWindowStore.put("a", "a", 0L); + secondUnderlying.put("b", "b", 10L); + secondUnderlying.put("c", "c", 10L); + final List, String>> results = + StreamsTestUtils.toList(windowStore.fetch(null, null, ofEpochMilli(0), ofEpochMilli(10))); + assertThat(results, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"), + KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"), + KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c")))); + } + + @Test + public void shouldBackwardFetchKeyRangeAcrossStoresWithNullKeyTo() { + final ReadOnlyWindowStoreStub secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE); + stubProviderTwo.addStore(storeName, secondUnderlying); + underlyingWindowStore.put("a", "a", 0L); + secondUnderlying.put("b", "b", 10L); + secondUnderlying.put("c", "c", 10L); + final List, String>> results = + StreamsTestUtils.toList(windowStore.backwardFetch("a", null, ofEpochMilli(0), ofEpochMilli(10))); + assertThat(results, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"), + KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"), + KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b")))); + } + + @Test + public void shouldBackwardFetchKeyRangeAcrossStoresWithNullKeyFrom() { + final ReadOnlyWindowStoreStub secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE); + stubProviderTwo.addStore(storeName, secondUnderlying); + underlyingWindowStore.put("a", "a", 0L); + secondUnderlying.put("b", "b", 10L); + secondUnderlying.put("c", "c", 10L); + final List, String>> results = + StreamsTestUtils.toList(windowStore.backwardFetch(null, "c", ofEpochMilli(0), ofEpochMilli(10))); + assertThat(results, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"), + KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"), + KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b") + ))); + } + + @Test + public void shouldBackwardFetchKeyRangeAcrossStoresWithNullKeyFromKeyTo() { + final ReadOnlyWindowStoreStub secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE); + stubProviderTwo.addStore(storeName, secondUnderlying); + underlyingWindowStore.put("a", "a", 0L); + secondUnderlying.put("b", "b", 10L); + secondUnderlying.put("c", "c", 10L); + final List, String>> results = + StreamsTestUtils.toList(windowStore.backwardFetch(null, null, ofEpochMilli(0), ofEpochMilli(10))); + assertThat(results, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"), + KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"), + KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b")))); + } + @Test public void shouldBackwardFetchKeyRangeAcrossStores() { final ReadOnlyWindowStoreStub secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE); @@ -444,15 +536,4 @@ public void shouldBackwardFetchAllAcrossStores() { public void shouldThrowNPEIfKeyIsNull() { assertThrows(NullPointerException.class, () -> windowStore.fetch(null, ofEpochMilli(0), ofEpochMilli(0))); } - - @Test - public void shouldThrowNPEIfFromKeyIsNull() { - assertThrows(NullPointerException.class, () -> windowStore.fetch(null, "a", ofEpochMilli(0), ofEpochMilli(0))); - } - - @Test - public void shouldThrowNPEIfToKeyIsNull() { - assertThrows(NullPointerException.class, () -> windowStore.fetch("a", null, ofEpochMilli(0), ofEpochMilli(0))); - } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 649e159c81330..ca6a518eb4cb0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -274,10 +274,19 @@ public void shouldFetchFromInnerStoreAndRecordFetchMetrics() { public void shouldFetchRangeFromInnerStoreAndRecordFetchMetrics() { expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)) .andReturn(KeyValueIterators.emptyIterator()); + expect(innerStoreMock.fetch(null, Bytes.wrap("b".getBytes()), 1, 1)) + .andReturn(KeyValueIterators.emptyIterator()); + expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), null, 1, 1)) + .andReturn(KeyValueIterators.emptyIterator()); + expect(innerStoreMock.fetch(null, null, 1, 1)) + .andReturn(KeyValueIterators.emptyIterator()); replay(innerStoreMock); store.init((StateStoreContext) context, store); store.fetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; + store.fetch(null, "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; + store.fetch("a", null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; + store.fetch(null, null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; // it suffices to verify one fetch metric since all fetch metrics are recorded by the same sensor // and the sensor is tested elsewhere @@ -306,10 +315,19 @@ public void shouldBackwardFetchFromInnerStoreAndRecordFetchMetrics() { public void shouldBackwardFetchRangeFromInnerStoreAndRecordFetchMetrics() { expect(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)) .andReturn(KeyValueIterators.emptyIterator()); + expect(innerStoreMock.backwardFetch(null, Bytes.wrap("b".getBytes()), 1, 1)) + .andReturn(KeyValueIterators.emptyIterator()); + expect(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), null, 1, 1)) + .andReturn(KeyValueIterators.emptyIterator()); + expect(innerStoreMock.backwardFetch(null, null, 1, 1)) + .andReturn(KeyValueIterators.emptyIterator()); replay(innerStoreMock); store.init((StateStoreContext) context, store); store.backwardFetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; + store.backwardFetch(null, "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; + store.backwardFetch("a", null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; + store.backwardFetch(null, null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; // it suffices to verify one fetch metric since all fetch metrics are recorded by the same sensor // and the sensor is tested elsewhere @@ -454,27 +472,6 @@ public void shouldThrowNullPointerOnBackwardFetchIfKeyIsNull() { assertThrows(NullPointerException.class, () -> store.backwardFetch(null, 0L, 1L)); } - @Test - public void shouldThrowNullPointerOnFetchRangeIfFromIsNull() { - assertThrows(NullPointerException.class, () -> store.fetch(null, "to", 0L, 1L)); - } - - @Test - public void shouldThrowNullPointerOnFetchRangeIfToIsNull() { - assertThrows(NullPointerException.class, () -> store.fetch("from", null, 0L, 1L)); - } - - - @Test - public void shouldThrowNullPointerOnbackwardFetchRangeIfFromIsNull() { - assertThrows(NullPointerException.class, () -> store.backwardFetch(null, "to", 0L, 1L)); - } - - @Test - public void shouldThrowNullPointerOnbackwardFetchRangeIfToIsNull() { - assertThrows(NullPointerException.class, () -> store.backwardFetch("from", null, 0L, 1L)); - } - private KafkaMetric metric(final String name) { return metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", tags)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java index 79d22296fad88..752334d171e0d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java @@ -266,7 +266,19 @@ public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo, fi for (long now = timeFrom.toEpochMilli(); now <= timeTo.toEpochMilli(); now++) { final NavigableMap kvMap = data.get(now); if (kvMap != null) { - for (final Entry entry : kvMap.subMap(keyFrom, true, keyTo, true).entrySet()) { + final NavigableMap kvSubMap; + if (keyFrom == null && keyFrom == null) { + kvSubMap = kvMap; + } else if (keyFrom == null) { + kvSubMap = kvMap.headMap(keyTo, true); + } else if (keyTo == null) { + kvSubMap = kvMap.tailMap(keyFrom, true); + } else { + // keyFrom != null and KeyTo != null + kvSubMap = kvMap.subMap(keyFrom, true, keyTo, true); + } + + for (final Entry entry : kvSubMap.entrySet()) { results.add(new KeyValue<>(new Windowed<>(entry.getKey(), new TimeWindow(now, now + windowSize)), entry.getValue())); } } @@ -297,8 +309,8 @@ public KeyValue, V> next() { } @Override - public KeyValueIterator, V> backwardFetch(final K from, - final K to, + public KeyValueIterator, V> backwardFetch(final K keyFrom, + final K keyTo, final Instant timeFrom, final Instant timeTo) throws IllegalArgumentException { final long timeFromTs = ApiUtils.validateMillisecondInstant(timeFrom, prepareMillisCheckFailMsgPrefix(timeFrom, "timeFrom")); @@ -310,7 +322,19 @@ public KeyValueIterator, V> backwardFetch(final K from, for (long now = timeToTs; now >= timeFromTs; now--) { final NavigableMap kvMap = data.get(now); if (kvMap != null) { - for (final Entry entry : kvMap.subMap(from, true, to, true).descendingMap().entrySet()) { + final NavigableMap kvSubMap; + if (keyFrom == null && keyFrom == null) { + kvSubMap = kvMap; + } else if (keyFrom == null) { + kvSubMap = kvMap.headMap(keyTo, true); + } else if (keyTo == null) { + kvSubMap = kvMap.tailMap(keyFrom, true); + } else { + // keyFrom != null and KeyTo != null + kvSubMap = kvMap.subMap(keyFrom, true, keyTo, true); + } + + for (final Entry entry : kvSubMap.descendingMap().entrySet()) { results.add(new KeyValue<>(new Windowed<>(entry.getKey(), new TimeWindow(now, now + windowSize)), entry.getValue())); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java index c7e59247e1376..31ce3c696368a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java @@ -111,6 +111,34 @@ public void shouldIterateOverAllSegments() { assertFalse(iterator.hasNext()); } + @Test + public void shouldIterateOverAllSegmentsWhenNullKeyFromKeyTo() { + iterator = new SegmentIterator<>( + Arrays.asList(segmentOne, segmentTwo).iterator(), + hasNextCondition, + null, + null, + true); + + assertTrue(iterator.hasNext()); + assertEquals("a", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("b", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("c", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("d", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next())); + + assertFalse(iterator.hasNext()); + } + @Test public void shouldIterateBackwardOverAllSegments() { iterator = new SegmentIterator<>( @@ -139,6 +167,34 @@ public void shouldIterateBackwardOverAllSegments() { assertFalse(iterator.hasNext()); } + @Test + public void shouldIterateBackwardOverAllSegmentsWhenNullKeyFromKeyTo() { + iterator = new SegmentIterator<>( + Arrays.asList(segmentTwo, segmentOne).iterator(), //store should pass the segments in the right order + hasNextCondition, + null, + null, + false); + + assertTrue(iterator.hasNext()); + assertEquals("d", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("c", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("b", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("a", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next())); + + assertFalse(iterator.hasNext()); + } + @Test public void shouldNotThrowExceptionOnHasNextWhenStoreClosed() { iterator = new SegmentIterator<>( @@ -173,6 +229,47 @@ public void shouldOnlyIterateOverSegmentsInBackwardRange() { assertFalse(iterator.hasNext()); } + @Test + public void shouldOnlyIterateOverSegmentsInBackwardRangeWhenNullKeyFrom() { + iterator = new SegmentIterator<>( + Arrays.asList(segmentOne, segmentTwo).iterator(), + hasNextCondition, + null, + Bytes.wrap("b".getBytes()), + false); + + assertTrue(iterator.hasNext()); + assertEquals("b", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next())); + + + assertTrue(iterator.hasNext()); + assertEquals("a", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next())); + + assertFalse(iterator.hasNext()); + } + + @Test + public void shouldOnlyIterateOverSegmentsInBackwardRangeWhenNullKeyTo() { + iterator = new SegmentIterator<>( + Arrays.asList(segmentOne, segmentTwo).iterator(), + hasNextCondition, + Bytes.wrap("c".getBytes()), + null, + false); + + assertTrue(iterator.hasNext()); + assertEquals("d", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("c", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next())); + + assertFalse(iterator.hasNext()); + } + @Test public void shouldOnlyIterateOverSegmentsInRange() { iterator = new SegmentIterator<>( @@ -193,6 +290,54 @@ public void shouldOnlyIterateOverSegmentsInRange() { assertFalse(iterator.hasNext()); } + @Test + public void shouldOnlyIterateOverSegmentsInRangeWhenNullKeyFrom() { + iterator = new SegmentIterator<>( + Arrays.asList(segmentOne, segmentTwo).iterator(), + hasNextCondition, + null, + Bytes.wrap("c".getBytes()), + true); + + assertTrue(iterator.hasNext()); + assertEquals("a", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("b", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("c", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next())); + + assertFalse(iterator.hasNext()); + } + + @Test + public void shouldOnlyIterateOverSegmentsInRangeWhenNullKeyTo() { + iterator = new SegmentIterator<>( + Arrays.asList(segmentOne, segmentTwo).iterator(), + hasNextCondition, + Bytes.wrap("b".getBytes()), + null, + true); + + assertTrue(iterator.hasNext()); + assertEquals("b", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("c", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("d", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next())); + + assertFalse(iterator.hasNext()); + } + @Test public void shouldThrowNoSuchElementOnPeekNextKeyIfNoNext() { iterator = new SegmentIterator<>( diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java index 833ab6a074cc4..a429256ad9fd1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -39,6 +40,7 @@ import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.TestUtils; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -54,12 +56,15 @@ import java.util.LinkedList; import java.util.List; import java.util.Properties; +import java.util.function.Predicate; import java.util.function.Supplier; import static java.time.Duration.ofMillis; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; @RunWith(Parameterized.class) public class WindowStoreFetchTest { @@ -77,6 +82,14 @@ private enum StoreType { InMemory, RocksDB, Timed }; private LinkedList, Long>> expectedRecords; private LinkedList> records; private Properties streamsConfig; + private String low; + private String high; + private String middle; + private String innerLow; + private String innerHigh; + private String innerLowBetween; + private String innerHighBetween; + private String storeName; private TimeWindowedKStream windowedStream; @@ -98,7 +111,31 @@ public WindowStoreFetchTest(final StoreType storeType, final boolean enableLoggi // expected the count of each key is 2 final long windowStartTime = i < m ? 0 : WINDOW_SIZE; expectedRecords.add(new KeyValue<>(new Windowed<>(key, new TimeWindow(windowStartTime, windowStartTime + WINDOW_SIZE)), 2L)); + high = key; + if (low == null) { + low = key; + } + if (i == m) { + middle = key; + } + if (i == 1) { + innerLow = key; + final int index = i * 2 - 1; + innerLowBetween = "key-" + index; + } + if (i == DATA_SIZE - 2) { + innerHigh = key; + final int index = i * 2 + 1; + innerHighBetween = "key-" + index; + } } + Assert.assertNotNull(low); + Assert.assertNotNull(high); + Assert.assertNotNull(middle); + Assert.assertNotNull(innerLow); + Assert.assertNotNull(innerHigh); + Assert.assertNotNull(innerLowBetween); + Assert.assertNotNull(innerHighBetween); } @Rule @@ -161,6 +198,50 @@ public void testStoreConfig() { TestUtils.checkEquals(scanIterator, dataIterator); } + + try (final KeyValueIterator, Long> scanIterator = forward ? + stateStore.fetch(null, null, 0, Long.MAX_VALUE) : + stateStore.backwardFetch(null, null, 0, Long.MAX_VALUE)) { + + final Iterator, Long>> dataIterator = forward ? + expectedRecords.iterator() : + expectedRecords.descendingIterator(); + + TestUtils.checkEquals(scanIterator, dataIterator); + } + + testRange("range", stateStore, innerLow, innerHigh, forward); + testRange("until", stateStore, null, middle, forward); + testRange("from", stateStore, middle, null, forward); + + testRange("untilBetween", stateStore, null, innerHighBetween, forward); + testRange("fromBetween", stateStore, innerLowBetween, null, forward); + } + } + + private List, Long>> filterList(final KeyValueIterator, Long> iterator, final String from, final String to) { + final Predicate, Long>> pred = new Predicate, Long>>() { + @Override + public boolean test(final KeyValue, Long> elem) { + if (from != null && elem.key.key().compareTo(from) < 0) { + return false; + } + if (to != null && elem.key.key().compareTo(to) > 0) { + return false; + } + return elem != null; + } + }; + + return Utils.toList(iterator, pred); + } + + private void testRange(final String name, final WindowStore store, final String from, final String to, final boolean forward) { + try (final KeyValueIterator, Long> resultIterator = forward ? store.fetch(from, to, 0, Long.MAX_VALUE) : store.backwardFetch(from, to, 0, Long.MAX_VALUE); + final KeyValueIterator, Long> expectedIterator = forward ? store.fetchAll(0, Long.MAX_VALUE) : store.backwardFetchAll(0, Long.MAX_VALUE)) { + final List, Long>> result = Utils.toList(resultIterator); + final List, Long>> expected = filterList(expectedIterator, from, to); + assertThat(result, is(expected)); } } diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java index 8d0bb34bf0308..aa2942baddb6d 100644 --- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.KeyValueIterator; import java.util.ArrayList; import java.util.Arrays; @@ -44,6 +45,7 @@ import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; public final class StreamsTestUtils { private StreamsTestUtils() {} @@ -161,6 +163,24 @@ public static void verifyKeyValueList(final List> expect } } + public static void verifyAllWindowedKeyValues(final KeyValueIterator, byte[]> iterator, + final List> expectedKeys, + final List expectedValues) { + if (expectedKeys.size() != expectedValues.size()) { + throw new IllegalArgumentException("expectedKeys and expectedValues should have the same size. " + + "expectedKeys size: " + expectedKeys.size() + ", expectedValues size: " + expectedValues.size()); + } + + for (int i = 0; i < expectedKeys.size(); i++) { + verifyWindowedKeyValue( + iterator.next(), + expectedKeys.get(i), + expectedValues.get(i) + ); + } + assertFalse(iterator.hasNext()); + } + public static void verifyWindowedKeyValue(final KeyValue, byte[]> actual, final Windowed expectedKey, final String expectedValue) {