From 626d3ebe1499b9c28c2cd4d701d55089e0d9f6b0 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Mon, 27 Apr 2020 13:45:25 -0700 Subject: [PATCH 1/3] disable caching, log warning, add test --- .../apache/kafka/streams/state/Stores.java | 7 ++++--- .../TimestampedWindowStoreBuilder.java | 8 ++++++++ .../state/internals/WindowStoreBuilder.java | 8 ++++++++ .../internals/WindowStoreBuilderTest.java | 19 +++++++++++++++++++ 4 files changed, 39 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 91e767b3e8f91..b884f6d6ebc85 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -187,7 +187,7 @@ public String metricsScope() { * is not stored with the records, so this value is used to compute the keys that * the store returns. No effort is made to validate this parameter, so you must be * careful to set it the same as the windowed keys you're actually storing. - * @param retainDuplicates whether or not to retain duplicates. + * @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching * @return an instance of {@link WindowBytesStoreSupplier} * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead */ @@ -226,7 +226,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name, * windowed data's entire life cycle, from window-start through window-end, * and for the entire grace period) * @param windowSize size of the windows (cannot be negative) - * @param retainDuplicates whether or not to retain duplicates. + * @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching * @return an instance of {@link WindowBytesStoreSupplier} * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} */ @@ -251,7 +251,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name, * windowed data's entire life cycle, from window-start through window-end, * and for the entire grace period) * @param windowSize size of the windows (cannot be negative) - * @param retainDuplicates whether or not to retain duplicates. + * @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching * @return an instance of {@link WindowBytesStoreSupplier} * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} */ @@ -321,6 +321,7 @@ private static WindowBytesStoreSupplier persistentWindowStore(final String name, * windowed data's entire life cycle, from window-start through window-end, * and for the entire grace period. * @param windowSize size of the windows (cannot be negative) + * @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching * @return an instance of {@link WindowBytesStoreSupplier} * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} */ diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java index d5459752ac326..dafc7e6efc1db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java @@ -31,9 +31,12 @@ import org.apache.kafka.streams.state.WindowStoreIterator; import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TimestampedWindowStoreBuilder extends AbstractStoreBuilder, TimestampedWindowStore> { + private final Logger log = LoggerFactory.getLogger(TimestampedWindowStoreBuilder.class); private final WindowBytesStoreSupplier storeSupplier; @@ -56,6 +59,11 @@ public TimestampedWindowStore build() { store = new InMemoryTimestampedWindowStoreMarker(store); } } + if (storeSupplier.retainDuplicates()) { + log.warn("Disabling caching for {} since store was configured to retain duplicates", storeSupplier.name()); + enableCaching = false; + } + return new MeteredTimestampedWindowStore<>( maybeWrapCaching(maybeWrapLogging(store)), storeSupplier.windowSize(), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java index ea30d69bd03d4..a73ce7ab49c8f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java @@ -21,8 +21,11 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.state.WindowStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class WindowStoreBuilder extends AbstractStoreBuilder> { + private final Logger log = LoggerFactory.getLogger(WindowStoreBuilder.class); private final WindowBytesStoreSupplier storeSupplier; @@ -36,6 +39,11 @@ public WindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier, @Override public WindowStore build() { + if (storeSupplier.retainDuplicates()) { + log.warn("Disabling caching for {} since store was configured to retain duplicates", storeSupplier.name()); + enableCaching = false; + } + return new MeteredWindowStore<>( maybeWrapCaching(maybeWrapLogging(storeSupplier.get())), storeSupplier.windowSize(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java index bf29d4ae52d4c..2de34ebe7d13a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java @@ -17,10 +17,13 @@ package org.apache.kafka.streams.state.internals; +import java.time.Duration; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.state.WindowStore; import org.easymock.EasyMockRunner; @@ -37,6 +40,7 @@ import static org.easymock.EasyMock.replay; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertFalse; @RunWith(EasyMockRunner.class) public class WindowStoreBuilderTest { @@ -113,6 +117,21 @@ public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() { assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner)); } + @Test + public void shouldDisableCachingWithRetainDuplicates() { + supplier = Stores.persistentWindowStore("name", Duration.ofMillis(10L), Duration.ofMillis(10L), true); + StoreBuilder> builder = new WindowStoreBuilder<>( + supplier, + Serdes.String(), + Serdes.String(), + new MockTime()) + .withCachingEnabled(); + + builder.build(); + + assertFalse(((AbstractStoreBuilder>)builder).enableCaching); + } + @SuppressWarnings("all") @Test(expected = NullPointerException.class) public void shouldThrowNullPointerIfInnerIsNull() { From c461d16b428682499fff8c92c7495bc71c437a2a Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Mon, 27 Apr 2020 14:46:24 -0700 Subject: [PATCH 2/3] checkstyle --- .../streams/state/internals/WindowStoreBuilderTest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java index 2de34ebe7d13a..ed43c4a9dfbdb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java @@ -117,19 +117,20 @@ public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() { assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner)); } + @SuppressWarnings("unchecked") @Test public void shouldDisableCachingWithRetainDuplicates() { supplier = Stores.persistentWindowStore("name", Duration.ofMillis(10L), Duration.ofMillis(10L), true); - StoreBuilder> builder = new WindowStoreBuilder<>( + final StoreBuilder> builder = new WindowStoreBuilder<>( supplier, Serdes.String(), Serdes.String(), - new MockTime()) - .withCachingEnabled(); + new MockTime() + ).withCachingEnabled(); builder.build(); - assertFalse(((AbstractStoreBuilder>)builder).enableCaching); + assertFalse(((AbstractStoreBuilder>) builder).enableCaching); } @SuppressWarnings("all") From f354f53dbcc18669cca85f65eafc7ffd41b7b33a Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Mon, 27 Apr 2020 17:09:49 -0700 Subject: [PATCH 3/3] github prop --- .../streams/state/internals/TimestampedWindowStoreBuilder.java | 2 +- .../kafka/streams/state/internals/WindowStoreBuilder.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java index dafc7e6efc1db..43189395bcc62 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java @@ -59,7 +59,7 @@ public TimestampedWindowStore build() { store = new InMemoryTimestampedWindowStoreMarker(store); } } - if (storeSupplier.retainDuplicates()) { + if (storeSupplier.retainDuplicates() && enableCaching) { log.warn("Disabling caching for {} since store was configured to retain duplicates", storeSupplier.name()); enableCaching = false; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java index a73ce7ab49c8f..722564572093a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java @@ -39,7 +39,7 @@ public WindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier, @Override public WindowStore build() { - if (storeSupplier.retainDuplicates()) { + if (storeSupplier.retainDuplicates() && enableCaching) { log.warn("Disabling caching for {} since store was configured to retain duplicates", storeSupplier.name()); enableCaching = false; }