diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 91e767b3e8f91..b884f6d6ebc85 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -187,7 +187,7 @@ public String metricsScope() { * is not stored with the records, so this value is used to compute the keys that * the store returns. No effort is made to validate this parameter, so you must be * careful to set it the same as the windowed keys you're actually storing. - * @param retainDuplicates whether or not to retain duplicates. + * @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching * @return an instance of {@link WindowBytesStoreSupplier} * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead */ @@ -226,7 +226,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name, * windowed data's entire life cycle, from window-start through window-end, * and for the entire grace period) * @param windowSize size of the windows (cannot be negative) - * @param retainDuplicates whether or not to retain duplicates. + * @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching * @return an instance of {@link WindowBytesStoreSupplier} * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} */ @@ -251,7 +251,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name, * windowed data's entire life cycle, from window-start through window-end, * and for the entire grace period) * @param windowSize size of the windows (cannot be negative) - * @param retainDuplicates whether or not to retain duplicates. + * @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching * @return an instance of {@link WindowBytesStoreSupplier} * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} */ @@ -321,6 +321,7 @@ private static WindowBytesStoreSupplier persistentWindowStore(final String name, * windowed data's entire life cycle, from window-start through window-end, * and for the entire grace period. * @param windowSize size of the windows (cannot be negative) + * @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching * @return an instance of {@link WindowBytesStoreSupplier} * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} */ diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java index d5459752ac326..43189395bcc62 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java @@ -31,9 +31,12 @@ import org.apache.kafka.streams.state.WindowStoreIterator; import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TimestampedWindowStoreBuilder extends AbstractStoreBuilder, TimestampedWindowStore> { + private final Logger log = LoggerFactory.getLogger(TimestampedWindowStoreBuilder.class); private final WindowBytesStoreSupplier storeSupplier; @@ -56,6 +59,11 @@ public TimestampedWindowStore build() { store = new InMemoryTimestampedWindowStoreMarker(store); } } + if (storeSupplier.retainDuplicates() && enableCaching) { + log.warn("Disabling caching for {} since store was configured to retain duplicates", storeSupplier.name()); + enableCaching = false; + } + return new MeteredTimestampedWindowStore<>( maybeWrapCaching(maybeWrapLogging(store)), storeSupplier.windowSize(), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java index ea30d69bd03d4..722564572093a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java @@ -21,8 +21,11 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.state.WindowStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class WindowStoreBuilder extends AbstractStoreBuilder> { + private final Logger log = LoggerFactory.getLogger(WindowStoreBuilder.class); private final WindowBytesStoreSupplier storeSupplier; @@ -36,6 +39,11 @@ public WindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier, @Override public WindowStore build() { + if (storeSupplier.retainDuplicates() && enableCaching) { + log.warn("Disabling caching for {} since store was configured to retain duplicates", storeSupplier.name()); + enableCaching = false; + } + return new MeteredWindowStore<>( maybeWrapCaching(maybeWrapLogging(storeSupplier.get())), storeSupplier.windowSize(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java index bf29d4ae52d4c..ed43c4a9dfbdb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java @@ -17,10 +17,13 @@ package org.apache.kafka.streams.state.internals; +import java.time.Duration; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.state.WindowStore; import org.easymock.EasyMockRunner; @@ -37,6 +40,7 @@ import static org.easymock.EasyMock.replay; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertFalse; @RunWith(EasyMockRunner.class) public class WindowStoreBuilderTest { @@ -113,6 +117,22 @@ public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() { assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner)); } + @SuppressWarnings("unchecked") + @Test + public void shouldDisableCachingWithRetainDuplicates() { + supplier = Stores.persistentWindowStore("name", Duration.ofMillis(10L), Duration.ofMillis(10L), true); + final StoreBuilder> builder = new WindowStoreBuilder<>( + supplier, + Serdes.String(), + Serdes.String(), + new MockTime() + ).withCachingEnabled(); + + builder.build(); + + assertFalse(((AbstractStoreBuilder>) builder).enableCaching); + } + @SuppressWarnings("all") @Test(expected = NullPointerException.class) public void shouldThrowNullPointerIfInnerIsNull() {