From f270e187e6d2cbea2f032bfa5f1664dab4d09a64 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 18 Aug 2021 17:05:04 +0800 Subject: [PATCH 1/7] KAFKA-13211: add support for infinite range query for WindowStore --- checkstyle/suppressions.xml | 3 + .../streams/state/ReadOnlyWindowStore.java | 6 +- .../kafka/streams/state/WindowStore.java | 3 +- .../AbstractRocksDBSegmentedBytesStore.java | 6 +- .../state/internals/CachingWindowStore.java | 12 +- .../CompositeReadOnlyWindowStore.java | 4 - .../state/internals/InMemoryWindowStore.java | 23 ++- .../state/internals/MeteredWindowStore.java | 4 - .../state/internals/SegmentIterator.java | 2 +- .../AbstractWindowBytesStoreTest.java | 60 +++++- .../CachingPersistentWindowStoreTest.java | 192 +++++++++++++++++- .../CompositeReadOnlyWindowStoreTest.java | 103 +++++++++- .../internals/ReadOnlyWindowStoreStub.java | 32 ++- .../state/internals/SegmentIteratorTest.java | 145 +++++++++++++ 14 files changed, 531 insertions(+), 64 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 5dd9187bfe2cd..0c6d7a949904d 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -160,6 +160,9 @@ + + diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java index aa84bfcf5f9fe..3af53c1c939db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java @@ -128,12 +128,13 @@ default WindowStoreIterator backwardFetch(K key, Instant timeFrom, Instant ti * This iterator must be closed after use. * * @param keyFrom the first key in the range + * A null value indicates a starting position from the first element in the store. * @param keyTo the last key in the range + * A null value indicates that the range ends with the last element in the store. 
* @param timeFrom time range start (inclusive), where iteration starts. * @param timeTo time range end (inclusive), where iteration ends. * @return an iterator over windowed key-value pairs {@code , value>}, from beginning to end of time. * @throws InvalidStateStoreException if the store is not initialized - * @throws NullPointerException if {@code null} is used for any key. * @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds} */ KeyValueIterator, V> fetch(K keyFrom, K keyTo, Instant timeFrom, Instant timeTo) @@ -146,12 +147,13 @@ KeyValueIterator, V> fetch(K keyFrom, K keyTo, Instant timeFrom, Ins * This iterator must be closed after use. * * @param keyFrom the first key in the range + * A null value indicates a starting position from the first element in the store. * @param keyTo the last key in the range + * A null value indicates that the range ends with the last element in the store. * @param timeFrom time range start (inclusive), where iteration ends. * @param timeTo time range end (inclusive), where iteration starts. * @return an iterator over windowed key-value pairs {@code , value>}, from end to beginning of time. * @throws InvalidStateStoreException if the store is not initialized - * @throws NullPointerException if {@code null} is used for any key. 
* @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds} */ default KeyValueIterator, V> backwardFetch(K keyFrom, K keyTo, Instant timeFrom, Instant timeTo) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java index 31c6eb107cd68..9bd959e1466db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java @@ -117,12 +117,13 @@ default WindowStoreIterator backwardFetch(final K key, * This iterator must be closed after use. * * @param keyFrom the first key in the range + * A null value indicates a starting position from the first element in the store. * @param keyTo the last key in the range + * A null value indicates that the range ends with the last element in the store. * @param timeFrom time range start (inclusive) * @param timeTo time range end (inclusive) * @return an iterator over windowed key-value pairs {@code , value>} * @throws InvalidStateStoreException if the store is not initialized - * @throws NullPointerException if one of the given keys is {@code null} */ // WindowStore keeps a long-based implementation of ReadOnlyWindowStore#fetch Instant-based // if super#fetch is removed, keep this implementation as it serves PAPI Stores. 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index f7aef116a91ca..bfee6b2754a8f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -114,7 +114,7 @@ KeyValueIterator fetch(final Bytes keyFrom, final long from, final long to, final boolean forward) { - if (keyFrom.compareTo(keyTo) > 0) { + if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) { LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + "This may be due to range arguments set in the wrong order, " + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + @@ -124,8 +124,8 @@ KeyValueIterator fetch(final Bytes keyFrom, final List searchSpace = keySchema.segmentsToSearch(segments, from, to, forward); - final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from); - final Bytes binaryTo = keySchema.upperRange(keyTo, to); + final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, from); + final Bytes binaryTo = keyTo == null ? 
null : keySchema.upperRange(keyTo, to); return new SegmentIterator<>( searchSpace.iterator(), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 59604a502bd00..41152979cd4f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -244,7 +244,7 @@ public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long timeFrom, final long timeTo) { - if (keyFrom.compareTo(keyTo) > 0) { + if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) { LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + "This may be due to range arguments set in the wrong order, " + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + @@ -289,7 +289,7 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFr final Bytes keyTo, final long timeFrom, final long timeTo) { - if (keyFrom.compareTo(keyTo) > 0) { + if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) { LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. 
" + "Note that the built-in numerical serdes do not follow this for negative numbers"); @@ -573,12 +573,14 @@ private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRang throw new IllegalStateException("Error iterating over segments: segment interval has changed"); } - if (keyFrom.equals(keyTo)) { + if (keyFrom != null && keyTo != null && keyFrom.equals(keyTo)) { cacheKeyFrom = cacheFunction.cacheKey(segmentLowerRangeFixedSize(keyFrom, lowerRangeEndTime)); cacheKeyTo = cacheFunction.cacheKey(segmentUpperRangeFixedSize(keyTo, upperRangeEndTime)); } else { - cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), currentSegmentId); - cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo), currentSegmentId); + cacheKeyFrom = keyFrom == null ? null : + cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, lowerRangeEndTime), currentSegmentId); + cacheKeyTo = keyTo == null ? null : + cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo), currentSegmentId); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java index 42278559a9042..c6b0b604f14a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java @@ -115,8 +115,6 @@ public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo, final Instant timeFrom, final Instant timeTo) { - Objects.requireNonNull(keyFrom, "keyFrom can't be null"); - Objects.requireNonNull(keyTo, "keyTo can't be null"); final NextIteratorFunction, V, ReadOnlyWindowStore> nextIteratorFunction = store -> store.fetch(keyFrom, keyTo, timeFrom, timeTo); return new DelegatingPeekingKeyValueIterator<>( @@ -131,8 +129,6 @@ public KeyValueIterator, V> backwardFetch(final K 
keyFrom, final K keyTo, final Instant timeFrom, final Instant timeTo) throws IllegalArgumentException { - Objects.requireNonNull(keyFrom, "keyFrom can't be null"); - Objects.requireNonNull(keyTo, "keyTo can't be null"); final NextIteratorFunction, V, ReadOnlyWindowStore> nextIteratorFunction = store -> store.backwardFetch(keyFrom, keyTo, timeFrom, timeTo); return new DelegatingPeekingKeyValueIterator<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 7c0d21c7c7d18..d568f82c38bb3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -213,12 +213,9 @@ KeyValueIterator, byte[]> fetch(final Bytes from, final long timeFrom, final long timeTo, final boolean forward) { - Objects.requireNonNull(from, "from key cannot be null"); - Objects.requireNonNull(to, "to key cannot be null"); - removeExpiredSegments(); - if (from.compareTo(to) > 0) { + if (from != null && to != null && from.compareTo(to) > 0) { LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " + "This may be due to range arguments set in the wrong order, " + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + @@ -397,7 +394,8 @@ private WrappedWindowedKeyValueIterator registerNewWindowedKeyValueIterator(fina final Bytes to = (retainDuplicates && keyTo != null) ? 
wrapForDups(keyTo, Integer.MAX_VALUE) : keyTo; final WrappedWindowedKeyValueIterator iterator = - new WrappedWindowedKeyValueIterator(from, + new WrappedWindowedKeyValueIterator( + from, to, segmentIterator, openIterators::remove, @@ -462,7 +460,7 @@ public boolean hasNext() { } final Bytes key = getKey(next.key); - if (key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0) { + if (isKeyWithinRange(key)) { return true; } else { next = null; @@ -470,6 +468,13 @@ public boolean hasNext() { } } + private boolean isKeyWithinRange(final Bytes key) { + return (keyFrom == null && keyTo == null) || // fetch all + (keyFrom == null && key.compareTo(getKey(keyTo)) <= 0) || // start from the beginning + (key.compareTo(getKey(keyFrom)) >= 0 && keyTo == null) || // end to the last + (key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0); + } + public void close() { next = null; recordIterator = null; @@ -499,8 +504,12 @@ Iterator> setRecordIterator() { final Map.Entry> currentSegment = segmentIterator.next(); currentTime = currentSegment.getKey(); - if (allKeys) { + if (allKeys) { // keyFrom == null && keyTo == null return currentSegment.getValue().entrySet().iterator(); + } else if (keyFrom == null) { + return currentSegment.getValue().headMap(keyTo, true).entrySet().iterator(); + } else if (keyTo == null) { + return currentSegment.getValue().tailMap(keyFrom, true).entrySet().iterator(); } else { return currentSegment.getValue().subMap(keyFrom, true, keyTo, true).entrySet().iterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 24c17f2c2b534..2322da530d8ed 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -245,8 +245,6 @@ public 
KeyValueIterator, V> fetch(final K keyFrom, final K keyTo, final long timeFrom, final long timeTo) { - Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); - Objects.requireNonNull(keyTo, "keyTo cannot be null"); return new MeteredWindowedKeyValueIterator<>( wrapped().fetch(keyBytes(keyFrom), keyBytes(keyTo), timeFrom, timeTo), fetchSensor, @@ -260,8 +258,6 @@ public KeyValueIterator, V> backwardFetch(final K keyFrom, final K keyTo, final long timeFrom, final long timeTo) { - Objects.requireNonNull(keyFrom, "keyFrom cannot be null"); - Objects.requireNonNull(keyTo, "keyTo cannot be null"); return new MeteredWindowedKeyValueIterator<>( wrapped().backwardFetch(keyBytes(keyFrom), keyBytes(keyTo), timeFrom, timeTo), fetchSensor, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java index 03b66a634f2db..d84a1b958b14c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java @@ -74,7 +74,7 @@ public boolean hasNext() { close(); currentSegment = segments.next(); try { - if (from == null || to == null) { + if (from == null && to == null) { if (forward) { currentIterator = currentSegment.all(); } else { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java index 02db7e74a8146..df0595889a999 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java @@ -490,7 +490,31 @@ public void testFetchRange() { 0, 3, ofEpochMilli(startTime + 3L), - ofEpochMilli(startTime + WINDOW_SIZE + 5))) + ofEpochMilli(startTime + 
WINDOW_SIZE + 5L))) + ); + assertEquals( + asList(zero, one, two), + toList(windowStore.fetch( + null, + 2, + ofEpochMilli(startTime + 0L - WINDOW_SIZE), + ofEpochMilli(startTime + WINDOW_SIZE + 2L))) + ); + assertEquals( + asList(two, four, five), + toList(windowStore.fetch( + 2, + null, + ofEpochMilli(startTime + 0L - WINDOW_SIZE), + ofEpochMilli(startTime + WINDOW_SIZE + 5L))) + ); + assertEquals( + asList(zero, one, two, four, five), + toList(windowStore.fetch( + null, + null, + ofEpochMilli(startTime + 0L - WINDOW_SIZE), + ofEpochMilli(startTime + WINDOW_SIZE + 5L))) ); } @@ -570,6 +594,30 @@ public void testBackwardFetchRange() { ofEpochMilli(startTime + 3L), ofEpochMilli(startTime + WINDOW_SIZE + 5))) ); + assertEquals( + asList(two, one, zero), + toList(windowStore.backwardFetch( + null, + 2, + ofEpochMilli(startTime + 0L - WINDOW_SIZE), + ofEpochMilli(startTime + WINDOW_SIZE + 2L))) + ); + assertEquals( + asList(five, four, two), + toList(windowStore.backwardFetch( + 2, + null, + ofEpochMilli(startTime + 0L - WINDOW_SIZE), + ofEpochMilli(startTime + WINDOW_SIZE + 5L))) + ); + assertEquals( + asList(five, four, two, one, zero), + toList(windowStore.backwardFetch( + null, + null, + ofEpochMilli(startTime + 0L - WINDOW_SIZE), + ofEpochMilli(startTime + WINDOW_SIZE + 5L))) + ); } @Test @@ -931,16 +979,6 @@ public void shouldThrowNullPointerExceptionOnGetNullKey() { assertThrows(NullPointerException.class, () -> windowStore.fetch(null, ofEpochMilli(1L), ofEpochMilli(2L))); } - @Test - public void shouldThrowNullPointerExceptionOnRangeNullFromKey() { - assertThrows(NullPointerException.class, () -> windowStore.fetch(null, 2, ofEpochMilli(1L), ofEpochMilli(2L))); - } - - @Test - public void shouldThrowNullPointerExceptionOnRangeNullToKey() { - assertThrows(NullPointerException.class, () -> windowStore.fetch(1, null, ofEpochMilli(1L), ofEpochMilli(2L))); - } - @Test public void shouldFetchAndIterateOverExactBinaryKeys() { final WindowStore windowStore = 
buildWindowStore(RETENTION_PERIOD, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java index 023d69a6a9f4d..c9461a05031de 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java @@ -283,7 +283,7 @@ public void shouldPutFetchRangeFromCache() { cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); try (final KeyValueIterator, byte[]> iterator = - cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(10), ofEpochMilli(10))) { + cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP))) { verifyWindowedKeyValue( iterator.next(), new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), @@ -297,6 +297,186 @@ public void shouldPutFetchRangeFromCache() { } } + @Test + public void shouldPutFetchRangeFromCacheForNullKeyFrom() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final KeyValueIterator, byte[]> iterator = + cachingStore.fetch(null, bytesKey("d"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + "a"); + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, 
DEFAULT_TIMESTAMP + WINDOW_SIZE)), + "b"); + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), + "c"); + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + "d"); + assertFalse(iterator.hasNext()); + } + } + + @Test + public void shouldPutFetchRangeFromCacheForNullKeyTo() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final KeyValueIterator, byte[]> iterator = + cachingStore.fetch(bytesKey("b"), null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + "b"); + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), + "c"); + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + "d"); + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + "e"); + assertFalse(iterator.hasNext()); + } + } + + @Test + public void shouldPutFetchRangeFromCacheForNullKeyFromKeyTo() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), 
DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final KeyValueIterator, byte[]> iterator = + cachingStore.fetch(null, null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + "a"); + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + "b"); + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), + "c"); + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + "d"); + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + "e"); + assertFalse(iterator.hasNext()); + } + } + + @Test + public void shouldPutBackwardFetchRangeFromCacheForNullKeyFrom() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final KeyValueIterator, byte[]> iterator = + cachingStore.backwardFetch(null, bytesKey("c"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), + "c"); + 
verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + "b"); + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + "a"); + assertFalse(iterator.hasNext()); + } + } + + @Test + public void shouldPutBackwardFetchRangeFromCacheForNullKeyTo() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final KeyValueIterator, byte[]> iterator = + cachingStore.backwardFetch(bytesKey("c"), null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + "e"); + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + "d"); + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), + "c"); + assertFalse(iterator.hasNext()); + } + } + + @Test + public void shouldPutBackwardFetchRangeFromCacheForNullKeyFromKeyTo() { + cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP); + cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L); + cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L); + cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L); + + try (final 
KeyValueIterator, byte[]> iterator = + cachingStore.backwardFetch(null, null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + "e"); + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), + "d"); + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), + "c"); + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + "b"); + verifyWindowedKeyValue( + iterator.next(), + new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), + "a"); + assertFalse(iterator.hasNext()); + } + } + @Test public void shouldGetAllFromCache() { cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP); @@ -813,16 +993,6 @@ public void shouldThrowNullPointerExceptionOnFetchNullKey() { assertThrows(NullPointerException.class, () -> cachingStore.fetch(null, ofEpochMilli(1L), ofEpochMilli(2L))); } - @Test - public void shouldThrowNullPointerExceptionOnRangeNullFromKey() { - assertThrows(NullPointerException.class, () -> cachingStore.fetch(null, bytesKey("anyTo"), ofEpochMilli(1L), ofEpochMilli(2L))); - } - - @Test - public void shouldThrowNullPointerExceptionOnRangeNullToKey() { - assertThrows(NullPointerException.class, () -> cachingStore.fetch(bytesKey("anyFrom"), null, ofEpochMilli(1L), ofEpochMilli(2L))); - } - @Test public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() { final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1)); diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java index 3c486c34fd60f..51f58cd33de76 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java @@ -90,6 +90,7 @@ public void shouldFetchValuesFromWindowStore() { } } + @Test public void shouldBackwardFetchValuesFromWindowStore() { underlyingWindowStore.put("my-key", "my-value", 0L); @@ -361,6 +362,97 @@ public void shouldFetchKeyRangeAcrossStores() { KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b")))); } + @Test + public void shouldFetchKeyRangeAcrossStoresWithNullKeyTo() { + final ReadOnlyWindowStoreStub secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE); + stubProviderTwo.addStore(storeName, secondUnderlying); + underlyingWindowStore.put("a", "a", 0L); + secondUnderlying.put("b", "b", 10L); + secondUnderlying.put("c", "c", 10L); + final List, String>> results = + StreamsTestUtils.toList(windowStore.fetch("a", null, ofEpochMilli(0), ofEpochMilli(10))); + assertThat(results, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"), + KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"), + KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c")))); + } + + @Test + public void shouldFetchKeyRangeAcrossStoresWithNullKeyFrom() { + final ReadOnlyWindowStoreStub secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE); + stubProviderTwo.addStore(storeName, secondUnderlying); + underlyingWindowStore.put("a", "a", 0L); + secondUnderlying.put("b", "b", 10L); + secondUnderlying.put("c", "c", 10L); + final List, String>> results = + StreamsTestUtils.toList(windowStore.fetch(null, "c", 
ofEpochMilli(0), ofEpochMilli(10))); + assertThat(results, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"), + KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"), + KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c")))); + } + + @Test + public void shouldFetchKeyRangeAcrossStoresWithNullKeyFromKeyTo() { + final ReadOnlyWindowStoreStub secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE); + stubProviderTwo.addStore(storeName, secondUnderlying); + underlyingWindowStore.put("a", "a", 0L); + secondUnderlying.put("b", "b", 10L); + secondUnderlying.put("c", "c", 10L); + final List, String>> results = + StreamsTestUtils.toList(windowStore.fetch(null, null, ofEpochMilli(0), ofEpochMilli(10))); + assertThat(results, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"), + KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b"), + KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c")))); + } + + @Test + public void shouldBackwardFetchKeyRangeAcrossStoresWithNullKeyTo() { + final ReadOnlyWindowStoreStub secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE); + stubProviderTwo.addStore(storeName, secondUnderlying); + underlyingWindowStore.put("a", "a", 0L); + secondUnderlying.put("b", "b", 10L); + secondUnderlying.put("c", "c", 10L); + final List, String>> results = + StreamsTestUtils.toList(windowStore.backwardFetch("a", null, ofEpochMilli(0), ofEpochMilli(10))); + assertThat(results, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"), + KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"), + KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b")))); + } + + @Test + public void shouldBackwardFetchKeyRangeAcrossStoresWithNullKeyFrom() { + final ReadOnlyWindowStoreStub 
secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE); + stubProviderTwo.addStore(storeName, secondUnderlying); + underlyingWindowStore.put("a", "a", 0L); + secondUnderlying.put("b", "b", 10L); + secondUnderlying.put("c", "c", 10L); + final List, String>> results = + StreamsTestUtils.toList(windowStore.backwardFetch(null, "c", ofEpochMilli(0), ofEpochMilli(10))); + assertThat(results, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"), + KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"), + KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b") + ))); + } + + @Test + public void shouldBackwardFetchKeyRangeAcrossStoresWithNullKeyFromKeyTo() { + final ReadOnlyWindowStoreStub secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE); + stubProviderTwo.addStore(storeName, secondUnderlying); + underlyingWindowStore.put("a", "a", 0L); + secondUnderlying.put("b", "b", 10L); + secondUnderlying.put("c", "c", 10L); + final List, String>> results = + StreamsTestUtils.toList(windowStore.backwardFetch(null, null, ofEpochMilli(0), ofEpochMilli(10))); + assertThat(results, equalTo(Arrays.asList( + KeyValue.pair(new Windowed<>("a", new TimeWindow(0, WINDOW_SIZE)), "a"), + KeyValue.pair(new Windowed<>("c", new TimeWindow(10, 10 + WINDOW_SIZE)), "c"), + KeyValue.pair(new Windowed<>("b", new TimeWindow(10, 10 + WINDOW_SIZE)), "b")))); + } + @Test public void shouldBackwardFetchKeyRangeAcrossStores() { final ReadOnlyWindowStoreStub secondUnderlying = new ReadOnlyWindowStoreStub<>(WINDOW_SIZE); @@ -444,15 +536,4 @@ public void shouldBackwardFetchAllAcrossStores() { public void shouldThrowNPEIfKeyIsNull() { assertThrows(NullPointerException.class, () -> windowStore.fetch(null, ofEpochMilli(0), ofEpochMilli(0))); } - - @Test - public void shouldThrowNPEIfFromKeyIsNull() { - assertThrows(NullPointerException.class, () -> windowStore.fetch(null, "a", ofEpochMilli(0), 
ofEpochMilli(0))); - } - - @Test - public void shouldThrowNPEIfToKeyIsNull() { - assertThrows(NullPointerException.class, () -> windowStore.fetch("a", null, ofEpochMilli(0), ofEpochMilli(0))); - } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java index 79d22296fad88..752334d171e0d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java @@ -266,7 +266,19 @@ public KeyValueIterator, V> fetch(final K keyFrom, final K keyTo, fi for (long now = timeFrom.toEpochMilli(); now <= timeTo.toEpochMilli(); now++) { final NavigableMap kvMap = data.get(now); if (kvMap != null) { - for (final Entry entry : kvMap.subMap(keyFrom, true, keyTo, true).entrySet()) { + final NavigableMap kvSubMap; + if (keyFrom == null && keyTo == null) { + kvSubMap = kvMap; + } else if (keyFrom == null) { + kvSubMap = kvMap.headMap(keyTo, true); + } else if (keyTo == null) { + kvSubMap = kvMap.tailMap(keyFrom, true); + } else { + // keyFrom != null and keyTo != null + kvSubMap = kvMap.subMap(keyFrom, true, keyTo, true); + } + + for (final Entry entry : kvSubMap.entrySet()) { results.add(new KeyValue<>(new Windowed<>(entry.getKey(), new TimeWindow(now, now + windowSize)), entry.getValue())); } } @@ -297,8 +309,8 @@ public KeyValue, V> next() { } @Override - public KeyValueIterator, V> backwardFetch(final K from, - final K to, + public KeyValueIterator, V> backwardFetch(final K keyFrom, + final K keyTo, final Instant timeFrom, final Instant timeTo) throws IllegalArgumentException { final long timeFromTs = ApiUtils.validateMillisecondInstant(timeFrom, prepareMillisCheckFailMsgPrefix(timeFrom, "timeFrom")); @@ -310,7 +322,19 @@ public KeyValueIterator, V> backwardFetch(final K from, for (long now = timeToTs; 
now >= timeFromTs; now--) { final NavigableMap kvMap = data.get(now); if (kvMap != null) { - for (final Entry entry : kvMap.subMap(from, true, to, true).descendingMap().entrySet()) { + final NavigableMap kvSubMap; + if (keyFrom == null && keyTo == null) { + kvSubMap = kvMap; + } else if (keyFrom == null) { + kvSubMap = kvMap.headMap(keyTo, true); + } else if (keyTo == null) { + kvSubMap = kvMap.tailMap(keyFrom, true); + } else { + // keyFrom != null and keyTo != null + kvSubMap = kvMap.subMap(keyFrom, true, keyTo, true); + } + + for (final Entry entry : kvSubMap.descendingMap().entrySet()) { results.add(new KeyValue<>(new Windowed<>(entry.getKey(), new TimeWindow(now, now + windowSize)), entry.getValue())); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java index c7e59247e1376..31ce3c696368a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java @@ -111,6 +111,34 @@ public void shouldIterateOverAllSegments() { assertFalse(iterator.hasNext()); } + @Test + public void shouldIterateOverAllSegmentsWhenNullKeyFromKeyTo() { + iterator = new SegmentIterator<>( + Arrays.asList(segmentOne, segmentTwo).iterator(), + hasNextCondition, + null, + null, + true); + + assertTrue(iterator.hasNext()); + assertEquals("a", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("b", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("c", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next())); + + 
assertTrue(iterator.hasNext()); + assertEquals("d", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next())); + + assertFalse(iterator.hasNext()); + } + @Test public void shouldIterateBackwardOverAllSegments() { iterator = new SegmentIterator<>( @@ -139,6 +167,34 @@ public void shouldIterateBackwardOverAllSegments() { assertFalse(iterator.hasNext()); } + @Test + public void shouldIterateBackwardOverAllSegmentsWhenNullKeyFromKeyTo() { + iterator = new SegmentIterator<>( + Arrays.asList(segmentTwo, segmentOne).iterator(), //store should pass the segments in the right order + hasNextCondition, + null, + null, + false); + + assertTrue(iterator.hasNext()); + assertEquals("d", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("c", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("b", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("a", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next())); + + assertFalse(iterator.hasNext()); + } + @Test public void shouldNotThrowExceptionOnHasNextWhenStoreClosed() { iterator = new SegmentIterator<>( @@ -173,6 +229,47 @@ public void shouldOnlyIterateOverSegmentsInBackwardRange() { assertFalse(iterator.hasNext()); } + @Test + public void shouldOnlyIterateOverSegmentsInBackwardRangeWhenNullKeyFrom() { + iterator = new SegmentIterator<>( + Arrays.asList(segmentOne, segmentTwo).iterator(), + hasNextCondition, + null, + Bytes.wrap("b".getBytes()), + false); + + assertTrue(iterator.hasNext()); + assertEquals("b", new String(iterator.peekNextKey().get())); + 
assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next())); + + + assertTrue(iterator.hasNext()); + assertEquals("a", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next())); + + assertFalse(iterator.hasNext()); + } + + @Test + public void shouldOnlyIterateOverSegmentsInBackwardRangeWhenNullKeyTo() { + iterator = new SegmentIterator<>( + Arrays.asList(segmentOne, segmentTwo).iterator(), + hasNextCondition, + Bytes.wrap("c".getBytes()), + null, + false); + + assertTrue(iterator.hasNext()); + assertEquals("d", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("c", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next())); + + assertFalse(iterator.hasNext()); + } + @Test public void shouldOnlyIterateOverSegmentsInRange() { iterator = new SegmentIterator<>( @@ -193,6 +290,54 @@ public void shouldOnlyIterateOverSegmentsInRange() { assertFalse(iterator.hasNext()); } + @Test + public void shouldOnlyIterateOverSegmentsInRangeWhenNullKeyFrom() { + iterator = new SegmentIterator<>( + Arrays.asList(segmentOne, segmentTwo).iterator(), + hasNextCondition, + null, + Bytes.wrap("c".getBytes()), + true); + + assertTrue(iterator.hasNext()); + assertEquals("a", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("a", "1"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("b", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("c", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next())); + + assertFalse(iterator.hasNext()); + } + + @Test + public void 
shouldOnlyIterateOverSegmentsInRangeWhenNullKeyTo() { + iterator = new SegmentIterator<>( + Arrays.asList(segmentOne, segmentTwo).iterator(), + hasNextCondition, + Bytes.wrap("b".getBytes()), + null, + true); + + assertTrue(iterator.hasNext()); + assertEquals("b", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("b", "2"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("c", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("c", "3"), toStringKeyValue(iterator.next())); + + assertTrue(iterator.hasNext()); + assertEquals("d", new String(iterator.peekNextKey().get())); + assertEquals(KeyValue.pair("d", "4"), toStringKeyValue(iterator.next())); + + assertFalse(iterator.hasNext()); + } + @Test public void shouldThrowNoSuchElementOnPeekNextKeyIfNoNext() { iterator = new SegmentIterator<>( From 55551f3d84e4d7e6feac415122b090ccba8e956e Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 25 Aug 2021 21:33:28 +0800 Subject: [PATCH 2/7] KAFKA-13211: refactor codes --- checkstyle/suppressions.xml | 3 - .../streams/state/ReadOnlyWindowStore.java | 8 +- .../kafka/streams/state/WindowStore.java | 4 +- .../state/internals/InMemoryWindowStore.java | 21 ++- .../state/internals/SegmentIterator.java | 14 +- .../CachingPersistentWindowStoreTest.java | 143 +++++++----------- .../apache/kafka/test/StreamsTestUtils.java | 20 +++ 7 files changed, 97 insertions(+), 116 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 0c6d7a949904d..5dd9187bfe2cd 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -160,9 +160,6 @@ - - diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java index 3af53c1c939db..3df170d5ab620 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java @@ -128,9 +128,9 @@ default WindowStoreIterator backwardFetch(K key, Instant timeFrom, Instant ti * This iterator must be closed after use. * * @param keyFrom the first key in the range - * A null value indicates a starting position from the first element in the store. + * A null value indicates a starting position from the first element in the store. * @param keyTo the last key in the range - * A null value indicates that the range ends with the last element in the store. + * A null value indicates that the range ends with the last element in the store. * @param timeFrom time range start (inclusive), where iteration starts. * @param timeTo time range end (inclusive), where iteration ends. * @return an iterator over windowed key-value pairs {@code , value>}, from beginning to end of time. @@ -147,9 +147,9 @@ KeyValueIterator, V> fetch(K keyFrom, K keyTo, Instant timeFrom, Ins * This iterator must be closed after use. * * @param keyFrom the first key in the range - * A null value indicates a starting position from the first element in the store. + * A null value indicates a starting position from the first element in the store. * @param keyTo the last key in the range - * A null value indicates that the range ends with the last element in the store. + * A null value indicates that the range ends with the last element in the store. * @param timeFrom time range start (inclusive), where iteration ends. * @param timeTo time range end (inclusive), where iteration starts. * @return an iterator over windowed key-value pairs {@code , value>}, from end to beginning of time. 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java index 9bd959e1466db..86c82fa55bf2b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java @@ -117,9 +117,9 @@ default WindowStoreIterator backwardFetch(final K key, * This iterator must be closed after use. * * @param keyFrom the first key in the range - * A null value indicates a starting position from the first element in the store. + * A null value indicates a starting position from the first element in the store. * @param keyTo the last key in the range - * A null value indicates that the range ends with the last element in the store. + * A null value indicates that the range ends with the last element in the store. * @param timeFrom time range start (inclusive) * @param timeTo time range end (inclusive) * @return an iterator over windowed key-value pairs {@code , value>} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index d568f82c38bb3..a3acb57e8efbe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -469,10 +469,23 @@ public boolean hasNext() { } private boolean isKeyWithinRange(final Bytes key) { - return (keyFrom == null && keyTo == null) || // fetch all - (keyFrom == null && key.compareTo(getKey(keyTo)) <= 0) || // start from the beginning - (key.compareTo(getKey(keyFrom)) >= 0 && keyTo == null) || // end to the last - (key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0); + boolean isKeyInRange = false; + // split all cases for readability and avoid BooleanExpressionComplexity checkstyle 
warning + if (keyFrom == null && keyTo == null) { + // fetch all + isKeyInRange = true; + } else if (keyFrom == null && key.compareTo(getKey(keyTo)) <= 0) { + // start from the beginning + isKeyInRange = true; + } else if (key.compareTo(getKey(keyFrom)) >= 0 && keyTo == null) { + // end to the last + isKeyInRange = true; + } else if (key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0) { + // key is within the range + isKeyInRange = true; + } + + return isKeyInRange; } public void close() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java index d84a1b958b14c..6191c4988877e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java @@ -74,18 +74,10 @@ public boolean hasNext() { close(); currentSegment = segments.next(); try { - if (from == null && to == null) { - if (forward) { - currentIterator = currentSegment.all(); - } else { - currentIterator = currentSegment.reverseAll(); - } + if (forward) { + currentIterator = currentSegment.range(from, to); } else { - if (forward) { - currentIterator = currentSegment.range(from, to); - } else { - currentIterator = currentSegment.reverseRange(from, to); - } + currentIterator = currentSegment.reverseRange(from, to); } } catch (final InvalidStateStoreException e) { // segment may have been closed so we ignore it. 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java index c9461a05031de..2d64a44aa063a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java @@ -54,6 +54,7 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; +import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.UUID; @@ -64,6 +65,7 @@ import static java.util.Arrays.asList; import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize; import static org.apache.kafka.test.StreamsTestUtils.toList; +import static org.apache.kafka.test.StreamsTestUtils.verifyAllWindowedKeyValues; import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue; import static org.hamcrest.CoreMatchers.equalTo; @@ -284,15 +286,14 @@ public void shouldPutFetchRangeFromCache() { try (final KeyValueIterator, byte[]> iterator = cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP))) { - verifyWindowedKeyValue( - iterator.next(), + final List> expectedKeys = Arrays.asList( new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), - "a"); - verifyWindowedKeyValue( - iterator.next(), - new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), - "b"); - assertFalse(iterator.hasNext()); + new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)) + ); + + final List expectedValues = Arrays.asList("a", "b"); + + verifyAllWindowedKeyValues(iterator, 
expectedKeys, expectedValues); assertEquals(2, cache.size()); } } @@ -307,23 +308,16 @@ public void shouldPutFetchRangeFromCacheForNullKeyFrom() { try (final KeyValueIterator, byte[]> iterator = cachingStore.fetch(null, bytesKey("d"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { - verifyWindowedKeyValue( - iterator.next(), + final List> expectedKeys = Arrays.asList( new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), - "a"); - verifyWindowedKeyValue( - iterator.next(), new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), - "b"); - verifyWindowedKeyValue( - iterator.next(), new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), - "c"); - verifyWindowedKeyValue( - iterator.next(), - new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), - "d"); - assertFalse(iterator.hasNext()); + new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)) + ); + + final List expectedValues = Arrays.asList("a", "b", "c", "d"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); } } @@ -337,23 +331,16 @@ public void shouldPutFetchRangeFromCacheForNullKeyTo() { try (final KeyValueIterator, byte[]> iterator = cachingStore.fetch(bytesKey("b"), null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { - verifyWindowedKeyValue( - iterator.next(), + final List> expectedKeys = Arrays.asList( new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), - "b"); - verifyWindowedKeyValue( - iterator.next(), new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), - "c"); - verifyWindowedKeyValue( - iterator.next(), new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, 
DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), - "d"); - verifyWindowedKeyValue( - iterator.next(), - new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), - "e"); - assertFalse(iterator.hasNext()); + new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)) + ); + + final List expectedValues = Arrays.asList("b", "c", "d", "e"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); } } @@ -367,27 +354,17 @@ public void shouldPutFetchRangeFromCacheForNullKeyFromKeyTo() { try (final KeyValueIterator, byte[]> iterator = cachingStore.fetch(null, null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { - verifyWindowedKeyValue( - iterator.next(), + final List> expectedKeys = Arrays.asList( new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), - "a"); - verifyWindowedKeyValue( - iterator.next(), new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), - "b"); - verifyWindowedKeyValue( - iterator.next(), new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), - "c"); - verifyWindowedKeyValue( - iterator.next(), new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), - "d"); - verifyWindowedKeyValue( - iterator.next(), - new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), - "e"); - assertFalse(iterator.hasNext()); + new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)) + ); + + final List expectedValues = Arrays.asList("a", "b", "c", "d", "e"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); } } @@ -401,19 +378,15 @@ public void shouldPutBackwardFetchRangeFromCacheForNullKeyFrom() { try (final 
KeyValueIterator, byte[]> iterator = cachingStore.backwardFetch(null, bytesKey("c"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { - verifyWindowedKeyValue( - iterator.next(), + final List> expectedKeys = Arrays.asList( new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), - "c"); - verifyWindowedKeyValue( - iterator.next(), new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), - "b"); - verifyWindowedKeyValue( - iterator.next(), - new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), - "a"); - assertFalse(iterator.hasNext()); + new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)) + ); + + final List expectedValues = Arrays.asList("c", "b", "a"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); } } @@ -427,19 +400,15 @@ public void shouldPutBackwardFetchRangeFromCacheForNullKeyTo() { try (final KeyValueIterator, byte[]> iterator = cachingStore.backwardFetch(bytesKey("c"), null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { - verifyWindowedKeyValue( - iterator.next(), + final List> expectedKeys = Arrays.asList( new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), - "e"); - verifyWindowedKeyValue( - iterator.next(), new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), - "d"); - verifyWindowedKeyValue( - iterator.next(), - new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), - "c"); - assertFalse(iterator.hasNext()); + new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)) + ); + + final List expectedValues = Arrays.asList("e", "d", "c"); + + 
verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); } } @@ -453,27 +422,17 @@ public void shouldPutBackwardFetchRangeFromCacheForNullKeyFromKeyTo() { try (final KeyValueIterator, byte[]> iterator = cachingStore.backwardFetch(null, null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) { - verifyWindowedKeyValue( - iterator.next(), + final List> expectedKeys = Arrays.asList( new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), - "e"); - verifyWindowedKeyValue( - iterator.next(), new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)), - "d"); - verifyWindowedKeyValue( - iterator.next(), new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)), - "c"); - verifyWindowedKeyValue( - iterator.next(), new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), - "b"); - verifyWindowedKeyValue( - iterator.next(), - new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), - "a"); - assertFalse(iterator.hasNext()); + new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)) + ); + + final List expectedValues = Arrays.asList("e", "d", "c", "b", "a"); + + verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues); } } diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java index 8d0bb34bf0308..aa2942baddb6d 100644 --- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Windowed; +import 
org.apache.kafka.streams.state.KeyValueIterator; import java.util.ArrayList; import java.util.Arrays; @@ -44,6 +45,7 @@ import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; public final class StreamsTestUtils { private StreamsTestUtils() {} @@ -161,6 +163,24 @@ public static void verifyKeyValueList(final List> expect } } + public static void verifyAllWindowedKeyValues(final KeyValueIterator, byte[]> iterator, + final List> expectedKeys, + final List expectedValues) { + if (expectedKeys.size() != expectedValues.size()) { + throw new IllegalArgumentException("expectedKeys and expectedValues should have the same size. " + + "expectedKeys size: " + expectedKeys.size() + ", expectedValues size: " + expectedValues.size()); + } + + for (int i = 0; i < expectedKeys.size(); i++) { + verifyWindowedKeyValue( + iterator.next(), + expectedKeys.get(i), + expectedValues.get(i) + ); + } + assertFalse(iterator.hasNext()); + } + public static void verifyWindowedKeyValue(final KeyValue, byte[]> actual, final Windowed expectedKey, final String expectedValue) { From 88cc4100a96a267c54d83ebcad9728a9af6c5fd3 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Thu, 26 Aug 2021 15:30:42 +0800 Subject: [PATCH 3/7] KAFKA-13211: fix bug and improve test coverage --- .../state/internals/MeteredWindowStore.java | 12 +- ...bstractRocksDBSegmentedBytesStoreTest.java | 136 +++++++++++++++++- .../internals/MeteredWindowStoreTest.java | 39 +++-- 3 files changed, 157 insertions(+), 30 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 2322da530d8ed..48b42fbabe6ec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -246,7 +246,11 @@ public KeyValueIterator, V> fetch(final K keyFrom, final long timeFrom, final long timeTo) { return new MeteredWindowedKeyValueIterator<>( - wrapped().fetch(keyBytes(keyFrom), keyBytes(keyTo), timeFrom, timeTo), + wrapped().fetch( + keyFrom == null ? null : keyBytes(keyFrom), + keyTo == null ? null : keyBytes(keyTo), + timeFrom, + timeTo), fetchSensor, streamsMetrics, serdes, @@ -259,7 +263,11 @@ public KeyValueIterator, V> backwardFetch(final K keyFrom, final long timeFrom, final long timeTo) { return new MeteredWindowedKeyValueIterator<>( - wrapped().backwardFetch(keyBytes(keyFrom), keyBytes(keyTo), timeFrom, timeTo), + wrapped().backwardFetch( + keyFrom == null ? null : keyBytes(keyFrom), + keyTo == null ? null : keyBytes(keyTo), + timeFrom, + timeTo), fetchSensor, streamsMetrics, serdes, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java index 21aaf0fd8378f..e1dda05bfefb9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java @@ -145,16 +145,138 @@ public void close() { @Test public void shouldPutAndFetch() { - final String key = "a"; - bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(10)); - bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(50)); - bytesStore.put(serializeKey(new Windowed<>(key, windows[2])), serializeValue(100)); + final String keyA = "a"; + final String keyB = "b"; + final String keyC = "c"; + bytesStore.put(serializeKey(new Windowed<>(keyA, windows[0])), serializeValue(10)); + bytesStore.put(serializeKey(new Windowed<>(keyA, 
windows[1])), serializeValue(50)); + bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), serializeValue(100)); + bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), serializeValue(200)); - try (final KeyValueIterator values = bytesStore.fetch(Bytes.wrap(key.getBytes()), 0, 500)) { + try (final KeyValueIterator values = bytesStore.fetch( + Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) { final List, Long>> expected = Arrays.asList( - KeyValue.pair(new Windowed<>(key, windows[0]), 10L), - KeyValue.pair(new Windowed<>(key, windows[1]), 50L) + KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), + KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator values = bytesStore.fetch( + Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) { + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), + KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), + KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator values = bytesStore.fetch( + null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) { + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), + KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), + KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator values = bytesStore.fetch( + Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) { + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), + KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator values = bytesStore.fetch( + null, null, 0, windows[3].start())) { + + final List, 
Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L), + KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), + KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), + KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L) + ); + + assertEquals(expected, toList(values)); + } + } + + @Test + public void shouldPutAndBackwardFetch() { + final String keyA = "a"; + final String keyB = "b"; + final String keyC = "c"; + bytesStore.put(serializeKey(new Windowed<>(keyA, windows[0])), serializeValue(10)); + bytesStore.put(serializeKey(new Windowed<>(keyA, windows[1])), serializeValue(50)); + bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), serializeValue(100)); + bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), serializeValue(200)); + + try (final KeyValueIterator values = bytesStore.backwardFetch( + Bytes.wrap(keyA.getBytes()), 0, windows[2].start())) { + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), + KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator values = bytesStore.backwardFetch( + Bytes.wrap(keyA.getBytes()), Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) { + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), + KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), + KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator values = bytesStore.backwardFetch( + null, Bytes.wrap(keyB.getBytes()), 0, windows[2].start())) { + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), + KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), + KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator values = 
bytesStore.backwardFetch( + Bytes.wrap(keyB.getBytes()), null, 0, windows[3].start())) { + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L), + KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L) + ); + + assertEquals(expected, toList(values)); + } + + try (final KeyValueIterator values = bytesStore.backwardFetch( + null, null, 0, windows[3].start())) { + + final List, Long>> expected = Arrays.asList( + KeyValue.pair(new Windowed<>(keyC, windows[3]), 200L), + KeyValue.pair(new Windowed<>(keyB, windows[2]), 100L), + KeyValue.pair(new Windowed<>(keyA, windows[1]), 50L), + KeyValue.pair(new Windowed<>(keyA, windows[0]), 10L) ); assertEquals(expected, toList(values)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 649e159c81330..ca6a518eb4cb0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -274,10 +274,19 @@ public void shouldFetchFromInnerStoreAndRecordFetchMetrics() { public void shouldFetchRangeFromInnerStoreAndRecordFetchMetrics() { expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)) .andReturn(KeyValueIterators.emptyIterator()); + expect(innerStoreMock.fetch(null, Bytes.wrap("b".getBytes()), 1, 1)) + .andReturn(KeyValueIterators.emptyIterator()); + expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), null, 1, 1)) + .andReturn(KeyValueIterators.emptyIterator()); + expect(innerStoreMock.fetch(null, null, 1, 1)) + .andReturn(KeyValueIterators.emptyIterator()); replay(innerStoreMock); store.init((StateStoreContext) context, store); store.fetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; + store.fetch(null, "b", ofEpochMilli(1), 
ofEpochMilli(1)).close(); // recorded on close; + store.fetch("a", null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; + store.fetch(null, null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; // it suffices to verify one fetch metric since all fetch metrics are recorded by the same sensor // and the sensor is tested elsewhere @@ -306,10 +315,19 @@ public void shouldBackwardFetchFromInnerStoreAndRecordFetchMetrics() { public void shouldBackwardFetchRangeFromInnerStoreAndRecordFetchMetrics() { expect(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)) .andReturn(KeyValueIterators.emptyIterator()); + expect(innerStoreMock.backwardFetch(null, Bytes.wrap("b".getBytes()), 1, 1)) + .andReturn(KeyValueIterators.emptyIterator()); + expect(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), null, 1, 1)) + .andReturn(KeyValueIterators.emptyIterator()); + expect(innerStoreMock.backwardFetch(null, null, 1, 1)) + .andReturn(KeyValueIterators.emptyIterator()); replay(innerStoreMock); store.init((StateStoreContext) context, store); store.backwardFetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; + store.backwardFetch(null, "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; + store.backwardFetch("a", null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; + store.backwardFetch(null, null, ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; // it suffices to verify one fetch metric since all fetch metrics are recorded by the same sensor // and the sensor is tested elsewhere @@ -454,27 +472,6 @@ public void shouldThrowNullPointerOnBackwardFetchIfKeyIsNull() { assertThrows(NullPointerException.class, () -> store.backwardFetch(null, 0L, 1L)); } - @Test - public void shouldThrowNullPointerOnFetchRangeIfFromIsNull() { - assertThrows(NullPointerException.class, () -> store.fetch(null, "to", 0L, 1L)); - } - - @Test - public void 
shouldThrowNullPointerOnFetchRangeIfToIsNull() { - assertThrows(NullPointerException.class, () -> store.fetch("from", null, 0L, 1L)); - } - - - @Test - public void shouldThrowNullPointerOnbackwardFetchRangeIfFromIsNull() { - assertThrows(NullPointerException.class, () -> store.backwardFetch(null, "to", 0L, 1L)); - } - - @Test - public void shouldThrowNullPointerOnbackwardFetchRangeIfToIsNull() { - assertThrows(NullPointerException.class, () -> store.backwardFetch("from", null, 0L, 1L)); - } - private KafkaMetric metric(final String name) { return metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", tags)); } From 688c4222eee8e34e67a8976c49256005b4b5af58 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Sat, 28 Aug 2021 16:38:28 +0800 Subject: [PATCH 4/7] KAFKA-13211: add integration tests --- .../state/internals/CachingWindowStore.java | 8 +- .../KStreamWindowStoreRangeQueryTest.java | 299 ++++++++++++++++ ...ngeQueryForWindowStoreIntegrationTest.java | 338 ++++++++++++++++++ 3 files changed, 641 insertions(+), 4 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/KStreamWindowStoreRangeQueryTest.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryForWindowStoreIntegrationTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 41152979cd4f6..fa04ac8141a29 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -266,8 +266,8 @@ public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, new CacheIteratorWrapper(keyFrom, keyTo, timeFrom, timeTo, true) : context.cache().range( cacheName, - cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)), - 
cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo)) + keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)), + keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo)) ); final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFrom, keyTo, timeFrom, timeTo); @@ -310,8 +310,8 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFr new CacheIteratorWrapper(keyFrom, keyTo, timeFrom, timeTo, false) : context.cache().reverseRange( cacheName, - cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)), - cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo)) + keyFrom == null ? null : cacheFunction.cacheKey(keySchema.lowerRange(keyFrom, timeFrom)), + keyTo == null ? null : cacheFunction.cacheKey(keySchema.upperRange(keyTo, timeTo)) ); final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFrom, keyTo, timeFrom, timeTo); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamWindowStoreRangeQueryTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamWindowStoreRangeQueryTest.java new file mode 100644 index 0000000000000..37dad76548a0e --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamWindowStoreRangeQueryTest.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.test.TestUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import 
java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.function.Predicate; +import java.util.function.Supplier; + +import static java.time.Duration.ofMillis; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +@RunWith(Parameterized.class) +public class KStreamWindowStoreRangeQueryTest { + private enum StoreType { InMemory, RocksDB, Timed }; + private static final String STORE_NAME = "store"; + private static final int DATA_SIZE = 5; + private static final long WINDOW_SIZE = 500L; + private static final long RETENTION_MS = 10000L; + + private StoreType storeType; + private boolean enableLogging; + private boolean enableCaching; + private boolean forward; + + private LinkedList, Long>> expectedRecords; + private LinkedList> records; + private String low; + private String high; + private String middle; + private String innerLow; + private String innerHigh; + private String innerLowBetween; + private String innerHighBetween; + + private Properties streamsConfig; + + private TimeWindowedKStream windowedStream; + + public KStreamWindowStoreRangeQueryTest(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward) { + this.storeType = storeType; + this.enableLogging = enableLogging; + this.enableCaching = enableCaching; + this.forward = forward; + + this.records = new LinkedList<>(); + this.expectedRecords = new LinkedList<>(); + final int m = DATA_SIZE / 2; + for (int i = 0; i < DATA_SIZE; i++) { + final String key = "key-" + i * 2; + final String value = "val-" + i * 2; + final KeyValue r = new KeyValue<>(key, value); + records.add(r); + records.add(r); + // expected the count of each key is 2 + final long windowStartTime = i < 
m ? 0 : WINDOW_SIZE; + expectedRecords.add(new KeyValue<>(new Windowed<>(key, new TimeWindow(windowStartTime, windowStartTime + WINDOW_SIZE)), 2L)); + high = key; + if (low == null) { + low = key; + } + if (i == m) { + middle = key; + } + if (i == 1) { + innerLow = key; + final int index = i * 2 - 1; + innerLowBetween = "key-" + index; + } + if (i == DATA_SIZE - 2) { + innerHigh = key; + final int index = i * 2 + 1; + innerHighBetween = "key-" + index; + } + } + Assert.assertNotNull(low); + Assert.assertNotNull(high); + Assert.assertNotNull(middle); + Assert.assertNotNull(innerLow); + Assert.assertNotNull(innerHigh); + Assert.assertNotNull(innerLowBetween); + Assert.assertNotNull(innerHighBetween); + } + + @Rule + public TestName testName = new TestName(); + + @Parameterized.Parameters(name = "storeType={0}, enableLogging={1}, enableCaching={2}, forward={3}") + public static Collection data() { + final List types = Arrays.asList(StoreType.InMemory, StoreType.RocksDB, StoreType.Timed); + final List logging = Arrays.asList(true, false); + final List caching = Arrays.asList(true, false); + final List forward = Arrays.asList(true, false); + return buildParameters(types, logging, caching, forward); + } + + @Before + public void setup() { + streamsConfig = mkProperties(mkMap( + mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) + )); + } + + @Test + public void testStoreConfig() { + final Materialized> stateStoreConfig = getStoreConfig(storeType, STORE_NAME, enableLogging, enableCaching); + //Create topology: table from input topic + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream stream = builder.stream("input", Consumed.with(Serdes.String(), Serdes.String())); + stream. 
+ groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(WINDOW_SIZE))) + .count(stateStoreConfig) + .toStream() + .to("output"); + + final Topology topology = builder.build(); + + try (final TopologyTestDriver driver = new TopologyTestDriver(topology)) { + //get input topic and stateStore + final TestInputTopic input = driver + .createInputTopic("input", new StringSerializer(), new StringSerializer()); + final WindowStore stateStore = driver.getWindowStore(STORE_NAME); + + //write some data + final int medium = DATA_SIZE / 2 * 2; + for (int i = 0; i < records.size(); i++) { + final KeyValue kv = records.get(i); + final long windowStartTime = i < medium ? 0 : WINDOW_SIZE; + input.pipeInput(kv.key, kv.value, windowStartTime); + } + + // query the state store + try (final KeyValueIterator, Long> scanIterator = forward ? + stateStore.fetch(null, null, 0, Long.MAX_VALUE) : + stateStore.backwardFetch(null, null, 0, Long.MAX_VALUE)) { + + final Iterator, Long>> dataIterator = forward ? 
+ expectedRecords.iterator() : + expectedRecords.descendingIterator(); + + TestUtils.checkEquals(scanIterator, dataIterator); + } + + testRange("range", stateStore, innerLow, innerHigh, forward); + testRange("until", stateStore, null, middle, forward); + testRange("from", stateStore, middle, null, forward); + + testRange("untilBetween", stateStore, null, innerHighBetween, forward); + testRange("fromBetween", stateStore, innerLowBetween, null, forward); + } + } + + private List, Long>> filterList(final KeyValueIterator, Long> iterator, final String from, final String to) { + final Predicate, Long>> pred = new Predicate, Long>>() { + @Override + public boolean test(final KeyValue, Long> elem) { + if (from != null && elem.key.key().compareTo(from) < 0) { + return false; + } + if (to != null && elem.key.key().compareTo(to) > 0) { + return false; + } + return elem != null; + } + }; + + return Utils.toList(iterator, pred); + } + + private void testRange(final String name, final WindowStore store, final String from, final String to, final boolean forward) { + try (final KeyValueIterator, Long> resultIterator = forward ? store.fetch(from, to, 0, Long.MAX_VALUE) : store.backwardFetch(from, to, 0, Long.MAX_VALUE); + final KeyValueIterator, Long> expectedIterator = forward ? store.fetchAll(0, Long.MAX_VALUE) : store.backwardFetchAll(0, Long.MAX_VALUE)) { + final List, Long>> result = Utils.toList(resultIterator); + final List, Long>> expected = filterList(expectedIterator, from, to); + assertThat(result, is(expected)); + } + } + + private static Collection buildParameters(final List... 
argOptions) { + List result = new LinkedList<>(); + result.add(new Object[0]); + + for (final List argOption : argOptions) { + result = times(result, argOption); + } + + return result; + } + + private static List times(final List left, final List right) { + final List result = new LinkedList<>(); + for (final Object[] args : left) { + for (final Object rightElem : right) { + final Object[] resArgs = new Object[args.length + 1]; + System.arraycopy(args, 0, resArgs, 0, args.length); + resArgs[args.length] = rightElem; + result.add(resArgs); + } + } + return result; + } + + private Materialized> getStoreConfig(final StoreType type, final String name, final boolean cachingEnabled, final boolean loggingEnabled) { + final Supplier createStore = () -> { + if (type == StoreType.InMemory) { + return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), + Duration.ofMillis(WINDOW_SIZE), + false); + } else if (type == StoreType.RocksDB) { + return Stores.persistentWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), + Duration.ofMillis(WINDOW_SIZE), + false); + } else if (type == StoreType.Timed) { + return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), + Duration.ofMillis(WINDOW_SIZE), + false); + } else { + return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), + Duration.ofMillis(WINDOW_SIZE), + false); + } + }; + + final WindowBytesStoreSupplier stateStoreSupplier = createStore.get(); + final Materialized> stateStoreConfig = Materialized + .as(stateStoreSupplier) + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Long()); + if (cachingEnabled) { + stateStoreConfig.withCachingEnabled(); + } else { + stateStoreConfig.withCachingDisabled(); + } + if (loggingEnabled) { + stateStoreConfig.withLoggingEnabled(new HashMap()); + } else { + stateStoreConfig.withLoggingDisabled(); + } + return stateStoreConfig; + } +} diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryForWindowStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryForWindowStoreIntegrationTest.java new file mode 100644 index 0000000000000..d057285488283 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryForWindowStoreIntegrationTest.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyWindowStore; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import 
java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.function.Predicate; +import java.util.function.Supplier; + +import static java.time.Duration.ofMillis; +import static java.time.Instant.ofEpochMilli; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + + +@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. +@RunWith(Parameterized.class) +@Category({IntegrationTest.class}) +public class RangeQueryForWindowStoreIntegrationTest { + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + private static final Properties STREAMS_CONFIG = new Properties(); + private static final String APP_ID = "range-query-integration-test"; + private static final Long COMMIT_INTERVAL = 100L; + private static final String INPUT_STREAM = "input"; + private static final String STORE_NAME = "store"; + private static final int DATA_SIZE = 5; + private static final long WINDOW_SIZE = 500L; + private static final long RETENTION_MS = 300000L; + private static int storeNameCount; + + private enum StoreType { InMemory, RocksDB, Timed }; + private StoreType storeType; + private boolean enableLogging; + private boolean enableCaching; + private boolean forward; + private KafkaStreams kafkaStreams; + + private LinkedList, Long>> expectedRecords; + private LinkedList> records; + private String low; + private String high; + private String middle; + private String innerLow; + private String innerHigh; + private String innerLowBetween; + private String innerHighBetween; + private String storeName; + + public RangeQueryForWindowStoreIntegrationTest(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward) { + this.storeType = storeType; + this.enableLogging = enableLogging; + this.enableCaching = enableCaching; + this.forward = forward; + + this.records = new LinkedList<>(); + this.expectedRecords = new LinkedList<>(); + 
final int m = DATA_SIZE / 2; + for (int i = 0; i < DATA_SIZE; i++) { + final String key = "key-" + i * 2; + final String value = "val-" + i * 2; + final KeyValue r = new KeyValue<>(key, value); + records.add(r); + records.add(r); + // expected the count of each key is 2 + expectedRecords.add(new KeyValue<>(new Windowed<>(key, new TimeWindow(0L, WINDOW_SIZE)), 2L)); + high = key; + if (low == null) { + low = key; + } + if (i == m) { + middle = key; + } + if (i == 1) { + innerLow = key; + final int index = i * 2 - 1; + innerLowBetween = "key-" + index; + } + if (i == DATA_SIZE - 2) { + innerHigh = key; + final int index = i * 2 + 1; + innerHighBetween = "key-" + index; + } + } + Assert.assertNotNull(low); + Assert.assertNotNull(high); + Assert.assertNotNull(middle); + Assert.assertNotNull(innerLow); + Assert.assertNotNull(innerHigh); + Assert.assertNotNull(innerLowBetween); + Assert.assertNotNull(innerHighBetween); + } + + @Rule + public TestName testName = new TestName(); + + @Parameterized.Parameters(name = "storeType={0}, enableLogging={1}, enableCaching={2}, forward={3}") + public static Collection data() { + final List types = Arrays.asList(StoreType.InMemory, StoreType.RocksDB, StoreType.Timed); + final List logging = Arrays.asList(true, false); + final List caching = Arrays.asList(true, false); + final List forward = Arrays.asList(true, false); + return buildParameters(types, logging, caching, forward); + } + + @BeforeClass + public static void startCluster() throws IOException { + CLUSTER.start(); + STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); + 
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID); + STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + } + + @AfterClass + public static void closeCluster() { + CLUSTER.stop(); + } + + @Before + public void setupTopics() throws Exception { + CLUSTER.createTopic(INPUT_STREAM); + storeName = STORE_NAME + storeNameCount++; + } + + @After + public void cleanup() throws InterruptedException { + CLUSTER.deleteAllTopicsAndWait(120000); + } + + @Test + public void testStoreConfig() throws Exception { + final StreamsBuilder builder = new StreamsBuilder(); + final Materialized> stateStoreConfig = getStoreConfig(storeType, storeName, enableLogging, enableCaching); + final KStream stream = builder.stream(INPUT_STREAM, Consumed.with(Serdes.String(), Serdes.String())); + stream. + groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE))) + .count(stateStoreConfig) + .toStream() + .to("output"); + + + try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), STREAMS_CONFIG)) { + final List kafkaStreamsList = Arrays.asList(kafkaStreams); + + IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60)); + + writeInputData(); + + final ReadOnlyWindowStore stateStore = IntegrationTestUtils.getStore(1000_000L, storeName, kafkaStreams, QueryableStoreTypes.windowStore()); + + TestUtils.retryOnExceptionWithTimeout(60000, () -> { + //query the state store + try (final KeyValueIterator, Long> scanIterator = forward ? + stateStore.fetch(null, null, ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE)) : + stateStore.backwardFetch(null, null, ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE))) { + + final Iterator, Long>> dataIterator = forward ? 
+ expectedRecords.iterator() : + expectedRecords.descendingIterator(); + + TestUtils.checkEquals(scanIterator, dataIterator); + } + }); + + testRange("range", stateStore, innerLow, innerHigh, forward); + testRange("until", stateStore, null, middle, forward); + testRange("from", stateStore, middle, null, forward); + + testRange("untilBetween", stateStore, null, innerHighBetween, forward); + testRange("fromBetween", stateStore, innerLowBetween, null, forward); + } + } + + private void writeInputData() { + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + INPUT_STREAM, + records, + TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class), + 0L + ); + } + + private List, Long>> filterList(final KeyValueIterator, Long> iterator, final String from, final String to) { + final Predicate, Long>> pred = new Predicate, Long>>() { + @Override + public boolean test(final KeyValue, Long> elem) { + if (from != null && elem.key.key().compareTo(from) < 0) { + return false; + } + if (to != null && elem.key.key().compareTo(to) > 0) { + return false; + } + return elem != null; + } + }; + + return Utils.toList(iterator, pred); + } + + private void testRange(final String name, final ReadOnlyWindowStore store, final String from, final String to, final boolean forward) { + try (final KeyValueIterator, Long> resultIterator = forward ? + store.fetch(from, to, ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE)) : + store.backwardFetch(from, to, ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE)); + final KeyValueIterator, Long> expectedIterator = forward ? 
store.all() : store.backwardAll()) { + final List, Long>> result = Utils.toList(resultIterator); + final List, Long>> expected = filterList(expectedIterator, from, to); + assertThat(result, is(expected)); + } + } + +// private Materialized> getStoreConfig(final StoreType type, final String name, final boolean cachingEnabled, final boolean loggingEnabled) { + private Materialized> getStoreConfig(final StoreType type, final String name, final boolean cachingEnabled, final boolean loggingEnabled) { + final Supplier createStore = () -> { + if (type == StoreType.InMemory) { + return Stores.inMemoryWindowStore(name, Duration.ofMillis(RETENTION_MS), + Duration.ofMillis(WINDOW_SIZE), + false); + } else if (type == StoreType.RocksDB) { + return Stores.persistentWindowStore(name, Duration.ofMillis(RETENTION_MS), + Duration.ofMillis(WINDOW_SIZE), + false); + } else if (type == StoreType.Timed) { + return Stores.persistentTimestampedWindowStore(name, Duration.ofMillis(RETENTION_MS), + Duration.ofMillis(WINDOW_SIZE), + false); + } else { + return Stores.inMemoryWindowStore(name, Duration.ofMillis(RETENTION_MS), + Duration.ofMillis(WINDOW_SIZE), + false); + } + }; + + final WindowBytesStoreSupplier stateStoreSupplier = createStore.get(); + final Materialized> stateStoreConfig = Materialized + .as(stateStoreSupplier) + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Long()); + if (cachingEnabled) { + stateStoreConfig.withCachingEnabled(); + } else { + stateStoreConfig.withCachingDisabled(); + } + if (loggingEnabled) { + stateStoreConfig.withLoggingEnabled(new HashMap()); + } else { + stateStoreConfig.withLoggingDisabled(); + } + return stateStoreConfig; + } + + private static Collection buildParameters(final List... 
argOptions) { + List result = new LinkedList<>(); + result.add(new Object[0]); + + for (final List argOption : argOptions) { + result = times(result, argOption); + } + + return result; + } + + private static List times(final List left, final List right) { + final List result = new LinkedList<>(); + for (final Object[] args : left) { + for (final Object rightElem : right) { + final Object[] resArgs = new Object[args.length + 1]; + System.arraycopy(args, 0, resArgs, 0, args.length); + resArgs[args.length] = rightElem; + result.add(resArgs); + } + } + return result; + } +} From db0c2992336095df0969254d27b62230b4a6ed77 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Thu, 2 Sep 2021 16:09:24 +0800 Subject: [PATCH 5/7] KAFKA-13264: fix inMemoryStateStore backward fetch not in reversed order --- .../state/internals/InMemoryWindowStore.java | 15 +- .../WindowStoreFetchIntegrationTest.java | 229 ++++++++++++++++++ 2 files changed, 240 insertions(+), 4 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/WindowStoreFetchIntegrationTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index a3acb57e8efbe..6d02d0b732427 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -517,14 +517,21 @@ Iterator> setRecordIterator() { final Map.Entry> currentSegment = segmentIterator.next(); currentTime = currentSegment.getKey(); + ConcurrentNavigableMap subMap; if (allKeys) { // keyFrom == null && keyTo == null - return currentSegment.getValue().entrySet().iterator(); + subMap = currentSegment.getValue(); } else if (keyFrom == null) { - return currentSegment.getValue().headMap(keyTo, true).entrySet().iterator(); + subMap = currentSegment.getValue().headMap(keyTo, 
true); } else if (keyTo == null) { - return currentSegment.getValue().tailMap(keyFrom, true).entrySet().iterator(); + subMap = currentSegment.getValue().tailMap(keyFrom, true); } else { - return currentSegment.getValue().subMap(keyFrom, true, keyTo, true).entrySet().iterator(); + subMap = currentSegment.getValue().subMap(keyFrom, true, keyTo, true); + } + + if (forward) { + return subMap.entrySet().iterator(); + } else { + return subMap.descendingMap().entrySet().iterator(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/WindowStoreFetchIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/WindowStoreFetchIntegrationTest.java new file mode 100644 index 0000000000000..d39836640a21a --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/WindowStoreFetchIntegrationTest.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.function.Supplier; + +import static java.time.Duration.ofMillis; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; + +@RunWith(Parameterized.class) +public class WindowStoreFetchIntegrationTest { + private enum StoreType { InMemory, 
RocksDB, Timed }; + private static final String STORE_NAME = "store"; + private static final int DATA_SIZE = 5; + private static final long WINDOW_SIZE = 500L; + private static final long RETENTION_MS = 10000L; + + private StoreType storeType; + private boolean enableLogging; + private boolean enableCaching; + private boolean forward; + + private LinkedList, Long>> expectedRecords; + private LinkedList> records; + private Properties streamsConfig; + + private TimeWindowedKStream windowedStream; + + public WindowStoreFetchIntegrationTest(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward) { + this.storeType = storeType; + this.enableLogging = enableLogging; + this.enableCaching = enableCaching; + this.forward = forward; + + this.records = new LinkedList<>(); + this.expectedRecords = new LinkedList<>(); + final int m = DATA_SIZE / 2; + for (int i = 0; i < DATA_SIZE; i++) { + final String key = "key-" + i * 2; + final String value = "val-" + i * 2; + final KeyValue r = new KeyValue<>(key, value); + records.add(r); + records.add(r); + // expected the count of each key is 2 + final long windowStartTime = i < m ? 
0 : WINDOW_SIZE; + expectedRecords.add(new KeyValue<>(new Windowed<>(key, new TimeWindow(windowStartTime, windowStartTime + WINDOW_SIZE)), 2L)); + } + } + + @Rule + public TestName testName = new TestName(); + + @Parameterized.Parameters(name = "storeType={0}, enableLogging={1}, enableCaching={2}, forward={3}") + public static Collection data() { + final List types = Arrays.asList(StoreType.InMemory, StoreType.RocksDB, StoreType.Timed); + final List logging = Arrays.asList(true, false); + final List caching = Arrays.asList(true, false); + final List forward = Arrays.asList(true, false); + return buildParameters(types, logging, caching, forward); + } + + @Before + public void setup() { + streamsConfig = mkProperties(mkMap( + mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) + )); + } + + @Test + public void testStoreConfig() { + final Materialized> stateStoreConfig = getStoreConfig(storeType, STORE_NAME, enableLogging, enableCaching); + //Create topology: table from input topic + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream stream = builder.stream("input", Consumed.with(Serdes.String(), Serdes.String())); + stream. + groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(WINDOW_SIZE))) + .count(stateStoreConfig) + .toStream() + .to("output"); + + final Topology topology = builder.build(); + + try (final TopologyTestDriver driver = new TopologyTestDriver(topology)) { + //get input topic and stateStore + final TestInputTopic input = driver + .createInputTopic("input", new StringSerializer(), new StringSerializer()); + final WindowStore stateStore = driver.getWindowStore(STORE_NAME); + + //write some data + final int medium = DATA_SIZE / 2 * 2; + for (int i = 0; i < records.size(); i++) { + final KeyValue kv = records.get(i); + final long windowStartTime = i < medium ? 
0 : WINDOW_SIZE; + input.pipeInput(kv.key, kv.value, windowStartTime + i); + } + + // query the state store + try (final KeyValueIterator, Long> scanIterator = forward ? + stateStore.fetchAll(0, Long.MAX_VALUE) : + stateStore.backwardFetchAll(0, Long.MAX_VALUE)) { + + final Iterator, Long>> dataIterator = forward ? + expectedRecords.iterator() : + expectedRecords.descendingIterator(); + + TestUtils.checkEquals(scanIterator, dataIterator); + } + } + } + + private static Collection buildParameters(final List... argOptions) { + List result = new LinkedList<>(); + result.add(new Object[0]); + + for (final List argOption : argOptions) { + result = times(result, argOption); + } + + return result; + } + + private static List times(final List left, final List right) { + final List result = new LinkedList<>(); + for (final Object[] args : left) { + for (final Object rightElem : right) { + final Object[] resArgs = new Object[args.length + 1]; + System.arraycopy(args, 0, resArgs, 0, args.length); + resArgs[args.length] = rightElem; + result.add(resArgs); + } + } + return result; + } + + private Materialized> getStoreConfig(final StoreType type, final String name, final boolean cachingEnabled, final boolean loggingEnabled) { + final Supplier createStore = () -> { + if (type == StoreType.InMemory) { + return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), + Duration.ofMillis(WINDOW_SIZE), + false); + } else if (type == StoreType.RocksDB) { + return Stores.persistentWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), + Duration.ofMillis(WINDOW_SIZE), + false); + } else if (type == StoreType.Timed) { + return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), + Duration.ofMillis(WINDOW_SIZE), + false); + } else { + return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), + Duration.ofMillis(WINDOW_SIZE), + false); + } + }; + + final WindowBytesStoreSupplier stateStoreSupplier = createStore.get(); 
+ final Materialized> stateStoreConfig = Materialized + .as(stateStoreSupplier) + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Long()); + if (cachingEnabled) { + stateStoreConfig.withCachingEnabled(); + } else { + stateStoreConfig.withCachingDisabled(); + } + if (loggingEnabled) { + stateStoreConfig.withLoggingEnabled(new HashMap()); + } else { + stateStoreConfig.withLoggingDisabled(); + } + return stateStoreConfig; + } +} From eb19d1224b4c3a9bfd3da26e3cfe75bc49c56376 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Tue, 14 Sep 2021 20:53:32 +0800 Subject: [PATCH 6/7] KAFKA-13211: fix inMemory state store backward fetch issue --- .../state/internals/InMemoryWindowStore.java | 2 +- .../KStreamWindowStoreRangeQueryTest.java | 299 ------------------ .../WindowStoreFetchIntegrationTest.java | 229 -------------- .../AbstractWindowBytesStoreTest.java | 8 +- .../state/internals/WindowStoreFetchTest.java | 81 +++++ 5 files changed, 86 insertions(+), 533 deletions(-) delete mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/KStreamWindowStoreRangeQueryTest.java delete mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/WindowStoreFetchIntegrationTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 6d02d0b732427..3da5a04edd79d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -517,7 +517,7 @@ Iterator> setRecordIterator() { final Map.Entry> currentSegment = segmentIterator.next(); currentTime = currentSegment.getKey(); - ConcurrentNavigableMap subMap; + final ConcurrentNavigableMap subMap; if (allKeys) { // keyFrom == null && keyTo == null subMap = currentSegment.getValue(); } else if (keyFrom == null) { diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamWindowStoreRangeQueryTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamWindowStoreRangeQueryTest.java deleted file mode 100644 index 37dad76548a0e..0000000000000 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamWindowStoreRangeQueryTest.java +++ /dev/null @@ -1,299 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.integration; - -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.Grouped; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.TimeWindowedKStream; -import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.internals.TimeWindow; -import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.streams.state.WindowBytesStoreSupplier; -import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.test.TestUtils; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.time.Duration; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import java.util.function.Predicate; -import java.util.function.Supplier; - -import static java.time.Duration.ofMillis; -import static org.apache.kafka.common.utils.Utils.mkEntry; -import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.common.utils.Utils.mkProperties; -import static 
org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; - -@RunWith(Parameterized.class) -public class KStreamWindowStoreRangeQueryTest { - private enum StoreType { InMemory, RocksDB, Timed }; - private static final String STORE_NAME = "store"; - private static final int DATA_SIZE = 5; - private static final long WINDOW_SIZE = 500L; - private static final long RETENTION_MS = 10000L; - - private StoreType storeType; - private boolean enableLogging; - private boolean enableCaching; - private boolean forward; - - private LinkedList, Long>> expectedRecords; - private LinkedList> records; - private String low; - private String high; - private String middle; - private String innerLow; - private String innerHigh; - private String innerLowBetween; - private String innerHighBetween; - - private Properties streamsConfig; - - private TimeWindowedKStream windowedStream; - - public KStreamWindowStoreRangeQueryTest(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward) { - this.storeType = storeType; - this.enableLogging = enableLogging; - this.enableCaching = enableCaching; - this.forward = forward; - - this.records = new LinkedList<>(); - this.expectedRecords = new LinkedList<>(); - final int m = DATA_SIZE / 2; - for (int i = 0; i < DATA_SIZE; i++) { - final String key = "key-" + i * 2; - final String value = "val-" + i * 2; - final KeyValue r = new KeyValue<>(key, value); - records.add(r); - records.add(r); - // expected the count of each key is 2 - final long windowStartTime = i < m ? 
0 : WINDOW_SIZE; - expectedRecords.add(new KeyValue<>(new Windowed<>(key, new TimeWindow(windowStartTime, windowStartTime + WINDOW_SIZE)), 2L)); - high = key; - if (low == null) { - low = key; - } - if (i == m) { - middle = key; - } - if (i == 1) { - innerLow = key; - final int index = i * 2 - 1; - innerLowBetween = "key-" + index; - } - if (i == DATA_SIZE - 2) { - innerHigh = key; - final int index = i * 2 + 1; - innerHighBetween = "key-" + index; - } - } - Assert.assertNotNull(low); - Assert.assertNotNull(high); - Assert.assertNotNull(middle); - Assert.assertNotNull(innerLow); - Assert.assertNotNull(innerHigh); - Assert.assertNotNull(innerLowBetween); - Assert.assertNotNull(innerHighBetween); - } - - @Rule - public TestName testName = new TestName(); - - @Parameterized.Parameters(name = "storeType={0}, enableLogging={1}, enableCaching={2}, forward={3}") - public static Collection data() { - final List types = Arrays.asList(StoreType.InMemory, StoreType.RocksDB, StoreType.Timed); - final List logging = Arrays.asList(true, false); - final List caching = Arrays.asList(true, false); - final List forward = Arrays.asList(true, false); - return buildParameters(types, logging, caching, forward); - } - - @Before - public void setup() { - streamsConfig = mkProperties(mkMap( - mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) - )); - } - - @Test - public void testStoreConfig() { - final Materialized> stateStoreConfig = getStoreConfig(storeType, STORE_NAME, enableLogging, enableCaching); - //Create topology: table from input topic - final StreamsBuilder builder = new StreamsBuilder(); - - final KStream stream = builder.stream("input", Consumed.with(Serdes.String(), Serdes.String())); - stream. 
- groupByKey(Grouped.with(Serdes.String(), Serdes.String())) - .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(WINDOW_SIZE))) - .count(stateStoreConfig) - .toStream() - .to("output"); - - final Topology topology = builder.build(); - - try (final TopologyTestDriver driver = new TopologyTestDriver(topology)) { - //get input topic and stateStore - final TestInputTopic input = driver - .createInputTopic("input", new StringSerializer(), new StringSerializer()); - final WindowStore stateStore = driver.getWindowStore(STORE_NAME); - - //write some data - final int medium = DATA_SIZE / 2 * 2; - for (int i = 0; i < records.size(); i++) { - final KeyValue kv = records.get(i); - final long windowStartTime = i < medium ? 0 : WINDOW_SIZE; - input.pipeInput(kv.key, kv.value, windowStartTime); - } - - // query the state store - try (final KeyValueIterator, Long> scanIterator = forward ? - stateStore.fetch(null, null, 0, Long.MAX_VALUE) : - stateStore.backwardFetch(null, null, 0, Long.MAX_VALUE)) { - - final Iterator, Long>> dataIterator = forward ? 
- expectedRecords.iterator() : - expectedRecords.descendingIterator(); - - TestUtils.checkEquals(scanIterator, dataIterator); - } - - testRange("range", stateStore, innerLow, innerHigh, forward); - testRange("until", stateStore, null, middle, forward); - testRange("from", stateStore, middle, null, forward); - - testRange("untilBetween", stateStore, null, innerHighBetween, forward); - testRange("fromBetween", stateStore, innerLowBetween, null, forward); - } - } - - private List, Long>> filterList(final KeyValueIterator, Long> iterator, final String from, final String to) { - final Predicate, Long>> pred = new Predicate, Long>>() { - @Override - public boolean test(final KeyValue, Long> elem) { - if (from != null && elem.key.key().compareTo(from) < 0) { - return false; - } - if (to != null && elem.key.key().compareTo(to) > 0) { - return false; - } - return elem != null; - } - }; - - return Utils.toList(iterator, pred); - } - - private void testRange(final String name, final WindowStore store, final String from, final String to, final boolean forward) { - try (final KeyValueIterator, Long> resultIterator = forward ? store.fetch(from, to, 0, Long.MAX_VALUE) : store.backwardFetch(from, to, 0, Long.MAX_VALUE); - final KeyValueIterator, Long> expectedIterator = forward ? store.fetchAll(0, Long.MAX_VALUE) : store.backwardFetchAll(0, Long.MAX_VALUE)) { - final List, Long>> result = Utils.toList(resultIterator); - final List, Long>> expected = filterList(expectedIterator, from, to); - assertThat(result, is(expected)); - } - } - - private static Collection buildParameters(final List... 
argOptions) { - List result = new LinkedList<>(); - result.add(new Object[0]); - - for (final List argOption : argOptions) { - result = times(result, argOption); - } - - return result; - } - - private static List times(final List left, final List right) { - final List result = new LinkedList<>(); - for (final Object[] args : left) { - for (final Object rightElem : right) { - final Object[] resArgs = new Object[args.length + 1]; - System.arraycopy(args, 0, resArgs, 0, args.length); - resArgs[args.length] = rightElem; - result.add(resArgs); - } - } - return result; - } - - private Materialized> getStoreConfig(final StoreType type, final String name, final boolean cachingEnabled, final boolean loggingEnabled) { - final Supplier createStore = () -> { - if (type == StoreType.InMemory) { - return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), - Duration.ofMillis(WINDOW_SIZE), - false); - } else if (type == StoreType.RocksDB) { - return Stores.persistentWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), - Duration.ofMillis(WINDOW_SIZE), - false); - } else if (type == StoreType.Timed) { - return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), - Duration.ofMillis(WINDOW_SIZE), - false); - } else { - return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), - Duration.ofMillis(WINDOW_SIZE), - false); - } - }; - - final WindowBytesStoreSupplier stateStoreSupplier = createStore.get(); - final Materialized> stateStoreConfig = Materialized - .as(stateStoreSupplier) - .withKeySerde(Serdes.String()) - .withValueSerde(Serdes.Long()); - if (cachingEnabled) { - stateStoreConfig.withCachingEnabled(); - } else { - stateStoreConfig.withCachingDisabled(); - } - if (loggingEnabled) { - stateStoreConfig.withLoggingEnabled(new HashMap()); - } else { - stateStoreConfig.withLoggingDisabled(); - } - return stateStoreConfig; - } -} diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/WindowStoreFetchIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/WindowStoreFetchIntegrationTest.java deleted file mode 100644 index d39836640a21a..0000000000000 --- a/streams/src/test/java/org/apache/kafka/streams/integration/WindowStoreFetchIntegrationTest.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.integration; - -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.Grouped; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.TimeWindowedKStream; -import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.internals.TimeWindow; -import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.streams.state.WindowBytesStoreSupplier; -import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.test.TestUtils; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.time.Duration; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import java.util.function.Supplier; - -import static java.time.Duration.ofMillis; -import static org.apache.kafka.common.utils.Utils.mkEntry; -import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.common.utils.Utils.mkProperties; - -@RunWith(Parameterized.class) -public class WindowStoreFetchIntegrationTest { - private enum StoreType { InMemory, 
RocksDB, Timed }; - private static final String STORE_NAME = "store"; - private static final int DATA_SIZE = 5; - private static final long WINDOW_SIZE = 500L; - private static final long RETENTION_MS = 10000L; - - private StoreType storeType; - private boolean enableLogging; - private boolean enableCaching; - private boolean forward; - - private LinkedList, Long>> expectedRecords; - private LinkedList> records; - private Properties streamsConfig; - - private TimeWindowedKStream windowedStream; - - public WindowStoreFetchIntegrationTest(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward) { - this.storeType = storeType; - this.enableLogging = enableLogging; - this.enableCaching = enableCaching; - this.forward = forward; - - this.records = new LinkedList<>(); - this.expectedRecords = new LinkedList<>(); - final int m = DATA_SIZE / 2; - for (int i = 0; i < DATA_SIZE; i++) { - final String key = "key-" + i * 2; - final String value = "val-" + i * 2; - final KeyValue r = new KeyValue<>(key, value); - records.add(r); - records.add(r); - // expected the count of each key is 2 - final long windowStartTime = i < m ? 
0 : WINDOW_SIZE; - expectedRecords.add(new KeyValue<>(new Windowed<>(key, new TimeWindow(windowStartTime, windowStartTime + WINDOW_SIZE)), 2L)); - } - } - - @Rule - public TestName testName = new TestName(); - - @Parameterized.Parameters(name = "storeType={0}, enableLogging={1}, enableCaching={2}, forward={3}") - public static Collection data() { - final List types = Arrays.asList(StoreType.InMemory, StoreType.RocksDB, StoreType.Timed); - final List logging = Arrays.asList(true, false); - final List caching = Arrays.asList(true, false); - final List forward = Arrays.asList(true, false); - return buildParameters(types, logging, caching, forward); - } - - @Before - public void setup() { - streamsConfig = mkProperties(mkMap( - mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) - )); - } - - @Test - public void testStoreConfig() { - final Materialized> stateStoreConfig = getStoreConfig(storeType, STORE_NAME, enableLogging, enableCaching); - //Create topology: table from input topic - final StreamsBuilder builder = new StreamsBuilder(); - - final KStream stream = builder.stream("input", Consumed.with(Serdes.String(), Serdes.String())); - stream. - groupByKey(Grouped.with(Serdes.String(), Serdes.String())) - .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(WINDOW_SIZE))) - .count(stateStoreConfig) - .toStream() - .to("output"); - - final Topology topology = builder.build(); - - try (final TopologyTestDriver driver = new TopologyTestDriver(topology)) { - //get input topic and stateStore - final TestInputTopic input = driver - .createInputTopic("input", new StringSerializer(), new StringSerializer()); - final WindowStore stateStore = driver.getWindowStore(STORE_NAME); - - //write some data - final int medium = DATA_SIZE / 2 * 2; - for (int i = 0; i < records.size(); i++) { - final KeyValue kv = records.get(i); - final long windowStartTime = i < medium ? 
0 : WINDOW_SIZE; - input.pipeInput(kv.key, kv.value, windowStartTime + i); - } - - // query the state store - try (final KeyValueIterator, Long> scanIterator = forward ? - stateStore.fetchAll(0, Long.MAX_VALUE) : - stateStore.backwardFetchAll(0, Long.MAX_VALUE)) { - - final Iterator, Long>> dataIterator = forward ? - expectedRecords.iterator() : - expectedRecords.descendingIterator(); - - TestUtils.checkEquals(scanIterator, dataIterator); - } - } - } - - private static Collection buildParameters(final List... argOptions) { - List result = new LinkedList<>(); - result.add(new Object[0]); - - for (final List argOption : argOptions) { - result = times(result, argOption); - } - - return result; - } - - private static List times(final List left, final List right) { - final List result = new LinkedList<>(); - for (final Object[] args : left) { - for (final Object rightElem : right) { - final Object[] resArgs = new Object[args.length + 1]; - System.arraycopy(args, 0, resArgs, 0, args.length); - resArgs[args.length] = rightElem; - result.add(resArgs); - } - } - return result; - } - - private Materialized> getStoreConfig(final StoreType type, final String name, final boolean cachingEnabled, final boolean loggingEnabled) { - final Supplier createStore = () -> { - if (type == StoreType.InMemory) { - return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), - Duration.ofMillis(WINDOW_SIZE), - false); - } else if (type == StoreType.RocksDB) { - return Stores.persistentWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), - Duration.ofMillis(WINDOW_SIZE), - false); - } else if (type == StoreType.Timed) { - return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), - Duration.ofMillis(WINDOW_SIZE), - false); - } else { - return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), - Duration.ofMillis(WINDOW_SIZE), - false); - } - }; - - final WindowBytesStoreSupplier stateStoreSupplier = createStore.get(); 
- final Materialized> stateStoreConfig = Materialized - .as(stateStoreSupplier) - .withKeySerde(Serdes.String()) - .withValueSerde(Serdes.Long()); - if (cachingEnabled) { - stateStoreConfig.withCachingEnabled(); - } else { - stateStoreConfig.withCachingDisabled(); - } - if (loggingEnabled) { - stateStoreConfig.withLoggingEnabled(new HashMap()); - } else { - stateStoreConfig.withLoggingDisabled(); - } - return stateStoreConfig; - } -} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java index a436b1444109c..e93f758c5cffc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java @@ -445,7 +445,7 @@ public void testFetchRange() { ofEpochMilli(defaultStartTime + WINDOW_SIZE + 2L))) ); assertEquals( - asList(two, four, five), + asList(two, three, four, five), toList(windowStore.fetch( 2, null, @@ -453,7 +453,7 @@ public void testFetchRange() { ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5L))) ); assertEquals( - asList(zero, one, two, four, five), + asList(zero, one, two, three, four, five), toList(windowStore.fetch( null, null, @@ -539,7 +539,7 @@ public void testBackwardFetchRange() { ofEpochMilli(defaultStartTime + WINDOW_SIZE + 2L))) ); assertEquals( - asList(five, four, two), + asList(five, four, three, two), toList(windowStore.backwardFetch( 2, null, @@ -547,7 +547,7 @@ public void testBackwardFetchRange() { ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5L))) ); assertEquals( - asList(five, four, two, one, zero), + asList(five, four, three, two, one, zero), toList(windowStore.backwardFetch( null, null, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java index 833ab6a074cc4..a429256ad9fd1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -39,6 +40,7 @@ import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.TestUtils; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -54,12 +56,15 @@ import java.util.LinkedList; import java.util.List; import java.util.Properties; +import java.util.function.Predicate; import java.util.function.Supplier; import static java.time.Duration.ofMillis; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; @RunWith(Parameterized.class) public class WindowStoreFetchTest { @@ -77,6 +82,14 @@ private enum StoreType { InMemory, RocksDB, Timed }; private LinkedList, Long>> expectedRecords; private LinkedList> records; private Properties streamsConfig; + private String low; + private String high; + private String middle; + private String innerLow; + private String innerHigh; + private String innerLowBetween; + private String innerHighBetween; + private String storeName; private TimeWindowedKStream windowedStream; @@ -98,7 +111,31 @@ public 
WindowStoreFetchTest(final StoreType storeType, final boolean enableLoggi // expected the count of each key is 2 final long windowStartTime = i < m ? 0 : WINDOW_SIZE; expectedRecords.add(new KeyValue<>(new Windowed<>(key, new TimeWindow(windowStartTime, windowStartTime + WINDOW_SIZE)), 2L)); + high = key; + if (low == null) { + low = key; + } + if (i == m) { + middle = key; + } + if (i == 1) { + innerLow = key; + final int index = i * 2 - 1; + innerLowBetween = "key-" + index; + } + if (i == DATA_SIZE - 2) { + innerHigh = key; + final int index = i * 2 + 1; + innerHighBetween = "key-" + index; + } } + Assert.assertNotNull(low); + Assert.assertNotNull(high); + Assert.assertNotNull(middle); + Assert.assertNotNull(innerLow); + Assert.assertNotNull(innerHigh); + Assert.assertNotNull(innerLowBetween); + Assert.assertNotNull(innerHighBetween); } @Rule @@ -161,6 +198,50 @@ public void testStoreConfig() { TestUtils.checkEquals(scanIterator, dataIterator); } + + try (final KeyValueIterator, Long> scanIterator = forward ? + stateStore.fetch(null, null, 0, Long.MAX_VALUE) : + stateStore.backwardFetch(null, null, 0, Long.MAX_VALUE)) { + + final Iterator, Long>> dataIterator = forward ? 
+ expectedRecords.iterator() : + expectedRecords.descendingIterator(); + + TestUtils.checkEquals(scanIterator, dataIterator); + } + + testRange("range", stateStore, innerLow, innerHigh, forward); + testRange("until", stateStore, null, middle, forward); + testRange("from", stateStore, middle, null, forward); + + testRange("untilBetween", stateStore, null, innerHighBetween, forward); + testRange("fromBetween", stateStore, innerLowBetween, null, forward); + } + } + + private List, Long>> filterList(final KeyValueIterator, Long> iterator, final String from, final String to) { + final Predicate, Long>> pred = new Predicate, Long>>() { + @Override + public boolean test(final KeyValue, Long> elem) { + if (from != null && elem.key.key().compareTo(from) < 0) { + return false; + } + if (to != null && elem.key.key().compareTo(to) > 0) { + return false; + } + return elem != null; + } + }; + + return Utils.toList(iterator, pred); + } + + private void testRange(final String name, final WindowStore store, final String from, final String to, final boolean forward) { + try (final KeyValueIterator, Long> resultIterator = forward ? store.fetch(from, to, 0, Long.MAX_VALUE) : store.backwardFetch(from, to, 0, Long.MAX_VALUE); + final KeyValueIterator, Long> expectedIterator = forward ? 
store.fetchAll(0, Long.MAX_VALUE) : store.backwardFetchAll(0, Long.MAX_VALUE)) { + final List, Long>> result = Utils.toList(resultIterator); + final List, Long>> expected = filterList(expectedIterator, from, to); + assertThat(result, is(expected)); } } From d3cee059c05100e705f3fd6d691c5e6a6e3c114c Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 22 Sep 2021 15:35:55 +0800 Subject: [PATCH 7/7] KAFKA-13211: refactor --- .../state/internals/InMemoryWindowStore.java | 17 +- ...ngeQueryForWindowStoreIntegrationTest.java | 338 ------------------ 2 files changed, 7 insertions(+), 348 deletions(-) delete mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryForWindowStoreIntegrationTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 3da5a04edd79d..5327e75f1ece0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -469,23 +469,20 @@ public boolean hasNext() { } private boolean isKeyWithinRange(final Bytes key) { - boolean isKeyInRange = false; // split all cases for readability and avoid BooleanExpressionComplexity checkstyle warning if (keyFrom == null && keyTo == null) { // fetch all - isKeyInRange = true; - } else if (keyFrom == null && key.compareTo(getKey(keyTo)) <= 0) { + return true; + } else if (keyFrom == null) { // start from the beginning - isKeyInRange = true; - } else if (key.compareTo(getKey(keyFrom)) >= 0 && keyTo == null) { + return key.compareTo(getKey(keyTo)) <= 0; + } else if (keyTo == null) { // end to the last - isKeyInRange = true; - } else if (key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0) { + return key.compareTo(getKey(keyFrom)) >= 0; + } else { // key is within the range - isKeyInRange = true; + 
return key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0; } - - return isKeyInRange; } public void close() { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryForWindowStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryForWindowStoreIntegrationTest.java deleted file mode 100644 index d057285488283..0000000000000 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryForWindowStoreIntegrationTest.java +++ /dev/null @@ -1,338 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.integration; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.Grouped; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.internals.TimeWindow; -import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.QueryableStoreTypes; -import org.apache.kafka.streams.state.ReadOnlyWindowStore; -import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.streams.state.WindowBytesStoreSupplier; -import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.IOException; -import java.time.Duration; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import 
java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import java.util.function.Predicate; -import java.util.function.Supplier; - -import static java.time.Duration.ofMillis; -import static java.time.Instant.ofEpochMilli; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; - - -@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. -@RunWith(Parameterized.class) -@Category({IntegrationTest.class}) -public class RangeQueryForWindowStoreIntegrationTest { - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); - private static final Properties STREAMS_CONFIG = new Properties(); - private static final String APP_ID = "range-query-integration-test"; - private static final Long COMMIT_INTERVAL = 100L; - private static final String INPUT_STREAM = "input"; - private static final String STORE_NAME = "store"; - private static final int DATA_SIZE = 5; - private static final long WINDOW_SIZE = 500L; - private static final long RETENTION_MS = 300000L; - private static int storeNameCount; - - private enum StoreType { InMemory, RocksDB, Timed }; - private StoreType storeType; - private boolean enableLogging; - private boolean enableCaching; - private boolean forward; - private KafkaStreams kafkaStreams; - - private LinkedList, Long>> expectedRecords; - private LinkedList> records; - private String low; - private String high; - private String middle; - private String innerLow; - private String innerHigh; - private String innerLowBetween; - private String innerHighBetween; - private String storeName; - - public RangeQueryForWindowStoreIntegrationTest(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward) { - this.storeType = storeType; - this.enableLogging = enableLogging; - this.enableCaching = enableCaching; - this.forward = forward; - - this.records = new LinkedList<>(); - this.expectedRecords = new LinkedList<>(); - 
final int m = DATA_SIZE / 2; - for (int i = 0; i < DATA_SIZE; i++) { - final String key = "key-" + i * 2; - final String value = "val-" + i * 2; - final KeyValue r = new KeyValue<>(key, value); - records.add(r); - records.add(r); - // expected the count of each key is 2 - expectedRecords.add(new KeyValue<>(new Windowed<>(key, new TimeWindow(0L, WINDOW_SIZE)), 2L)); - high = key; - if (low == null) { - low = key; - } - if (i == m) { - middle = key; - } - if (i == 1) { - innerLow = key; - final int index = i * 2 - 1; - innerLowBetween = "key-" + index; - } - if (i == DATA_SIZE - 2) { - innerHigh = key; - final int index = i * 2 + 1; - innerHighBetween = "key-" + index; - } - } - Assert.assertNotNull(low); - Assert.assertNotNull(high); - Assert.assertNotNull(middle); - Assert.assertNotNull(innerLow); - Assert.assertNotNull(innerHigh); - Assert.assertNotNull(innerLowBetween); - Assert.assertNotNull(innerHighBetween); - } - - @Rule - public TestName testName = new TestName(); - - @Parameterized.Parameters(name = "storeType={0}, enableLogging={1}, enableCaching={2}, forward={3}") - public static Collection data() { - final List types = Arrays.asList(StoreType.InMemory, StoreType.RocksDB, StoreType.Timed); - final List logging = Arrays.asList(true, false); - final List caching = Arrays.asList(true, false); - final List forward = Arrays.asList(true, false); - return buildParameters(types, logging, caching, forward); - } - - @BeforeClass - public static void startCluster() throws IOException { - CLUSTER.start(); - STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); - 
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID); - STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - } - - @AfterClass - public static void closeCluster() { - CLUSTER.stop(); - } - - @Before - public void setupTopics() throws Exception { - CLUSTER.createTopic(INPUT_STREAM); - storeName = STORE_NAME + storeNameCount++; - } - - @After - public void cleanup() throws InterruptedException { - CLUSTER.deleteAllTopicsAndWait(120000); - } - - @Test - public void testStoreConfig() throws Exception { - final StreamsBuilder builder = new StreamsBuilder(); - final Materialized> stateStoreConfig = getStoreConfig(storeType, storeName, enableLogging, enableCaching); - final KStream stream = builder.stream(INPUT_STREAM, Consumed.with(Serdes.String(), Serdes.String())); - stream. - groupByKey(Grouped.with(Serdes.String(), Serdes.String())) - .windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE))) - .count(stateStoreConfig) - .toStream() - .to("output"); - - - try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), STREAMS_CONFIG)) { - final List kafkaStreamsList = Arrays.asList(kafkaStreams); - - IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60)); - - writeInputData(); - - final ReadOnlyWindowStore stateStore = IntegrationTestUtils.getStore(1000_000L, storeName, kafkaStreams, QueryableStoreTypes.windowStore()); - - TestUtils.retryOnExceptionWithTimeout(60000, () -> { - //query the state store - try (final KeyValueIterator, Long> scanIterator = forward ? - stateStore.fetch(null, null, ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE)) : - stateStore.backwardFetch(null, null, ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE))) { - - final Iterator, Long>> dataIterator = forward ? 
- expectedRecords.iterator() : - expectedRecords.descendingIterator(); - - TestUtils.checkEquals(scanIterator, dataIterator); - } - }); - - testRange("range", stateStore, innerLow, innerHigh, forward); - testRange("until", stateStore, null, middle, forward); - testRange("from", stateStore, middle, null, forward); - - testRange("untilBetween", stateStore, null, innerHighBetween, forward); - testRange("fromBetween", stateStore, innerLowBetween, null, forward); - } - } - - private void writeInputData() { - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( - INPUT_STREAM, - records, - TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class), - 0L - ); - } - - private List, Long>> filterList(final KeyValueIterator, Long> iterator, final String from, final String to) { - final Predicate, Long>> pred = new Predicate, Long>>() { - @Override - public boolean test(final KeyValue, Long> elem) { - if (from != null && elem.key.key().compareTo(from) < 0) { - return false; - } - if (to != null && elem.key.key().compareTo(to) > 0) { - return false; - } - return elem != null; - } - }; - - return Utils.toList(iterator, pred); - } - - private void testRange(final String name, final ReadOnlyWindowStore store, final String from, final String to, final boolean forward) { - try (final KeyValueIterator, Long> resultIterator = forward ? - store.fetch(from, to, ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE)) : - store.backwardFetch(from, to, ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE)); - final KeyValueIterator, Long> expectedIterator = forward ? 
store.all() : store.backwardAll()) { - final List, Long>> result = Utils.toList(resultIterator); - final List, Long>> expected = filterList(expectedIterator, from, to); - assertThat(result, is(expected)); - } - } - -// private Materialized> getStoreConfig(final StoreType type, final String name, final boolean cachingEnabled, final boolean loggingEnabled) { - private Materialized> getStoreConfig(final StoreType type, final String name, final boolean cachingEnabled, final boolean loggingEnabled) { - final Supplier createStore = () -> { - if (type == StoreType.InMemory) { - return Stores.inMemoryWindowStore(name, Duration.ofMillis(RETENTION_MS), - Duration.ofMillis(WINDOW_SIZE), - false); - } else if (type == StoreType.RocksDB) { - return Stores.persistentWindowStore(name, Duration.ofMillis(RETENTION_MS), - Duration.ofMillis(WINDOW_SIZE), - false); - } else if (type == StoreType.Timed) { - return Stores.persistentTimestampedWindowStore(name, Duration.ofMillis(RETENTION_MS), - Duration.ofMillis(WINDOW_SIZE), - false); - } else { - return Stores.inMemoryWindowStore(name, Duration.ofMillis(RETENTION_MS), - Duration.ofMillis(WINDOW_SIZE), - false); - } - }; - - final WindowBytesStoreSupplier stateStoreSupplier = createStore.get(); - final Materialized> stateStoreConfig = Materialized - .as(stateStoreSupplier) - .withKeySerde(Serdes.String()) - .withValueSerde(Serdes.Long()); - if (cachingEnabled) { - stateStoreConfig.withCachingEnabled(); - } else { - stateStoreConfig.withCachingDisabled(); - } - if (loggingEnabled) { - stateStoreConfig.withLoggingEnabled(new HashMap()); - } else { - stateStoreConfig.withLoggingDisabled(); - } - return stateStoreConfig; - } - - private static Collection buildParameters(final List... 
argOptions) { - List result = new LinkedList<>(); - result.add(new Object[0]); - - for (final List argOption : argOptions) { - result = times(result, argOption); - } - - return result; - } - - private static List times(final List left, final List right) { - final List result = new LinkedList<>(); - for (final Object[] args : left) { - for (final Object rightElem : right) { - final Object[] resArgs = new Object[args.length + 1]; - System.arraycopy(args, 0, resArgs, 0, args.length); - resArgs[args.length] = rightElem; - result.add(resArgs); - } - } - return result; - } -}