Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public String metricsScope() {
* is not stored with the records, so this value is used to compute the keys that
* the store returns. No effort is made to validate this parameter, so you must be
* careful to set it the same as the windowed keys you're actually storing.
* @param retainDuplicates whether or not to retain duplicates.
* @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching
* @return an instance of {@link WindowBytesStoreSupplier}
* @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead
*/
Expand Down Expand Up @@ -226,7 +226,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period)
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates.
* @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
*/
Expand All @@ -251,7 +251,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period)
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates.
* @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
*/
Expand Down Expand Up @@ -321,6 +321,7 @@ private static WindowBytesStoreSupplier persistentWindowStore(final String name,
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period.
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates. turning this on will automatically disable caching
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@
import org.apache.kafka.streams.state.WindowStoreIterator;

import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimestampedWindowStoreBuilder<K, V>
extends AbstractStoreBuilder<K, ValueAndTimestamp<V>, TimestampedWindowStore<K, V>> {
private final Logger log = LoggerFactory.getLogger(TimestampedWindowStoreBuilder.class);

private final WindowBytesStoreSupplier storeSupplier;

Expand All @@ -56,6 +59,11 @@ public TimestampedWindowStore<K, V> build() {
store = new InMemoryTimestampedWindowStoreMarker(store);
}
}
if (storeSupplier.retainDuplicates() && enableCaching) {
log.warn("Disabling caching for {} since store was configured to retain duplicates", storeSupplier.name());
enableCaching = false;
}

return new MeteredTimestampedWindowStore<>(
maybeWrapCaching(maybeWrapLogging(store)),
storeSupplier.windowSize(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {
private final Logger log = LoggerFactory.getLogger(WindowStoreBuilder.class);

private final WindowBytesStoreSupplier storeSupplier;

Expand All @@ -36,6 +39,11 @@ public WindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier,

@Override
public WindowStore<K, V> build() {
if (storeSupplier.retainDuplicates() && enableCaching) {
log.warn("Disabling caching for {} since store was configured to retain duplicates", storeSupplier.name());
enableCaching = false;
}

return new MeteredWindowStore<>(
maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
storeSupplier.windowSize(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

package org.apache.kafka.streams.state.internals;

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.easymock.EasyMockRunner;
Expand All @@ -37,6 +40,7 @@
import static org.easymock.EasyMock.replay;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertFalse;

@RunWith(EasyMockRunner.class)
public class WindowStoreBuilderTest {
Expand Down Expand Up @@ -113,6 +117,22 @@ public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner));
}

@SuppressWarnings("unchecked")
@Test
public void shouldDisableCachingWithRetainDuplicates() {
supplier = Stores.persistentWindowStore("name", Duration.ofMillis(10L), Duration.ofMillis(10L), true);
final StoreBuilder<WindowStore<String, String>> builder = new WindowStoreBuilder<>(
supplier,
Serdes.String(),
Serdes.String(),
new MockTime()
).withCachingEnabled();

builder.build();

assertFalse(((AbstractStoreBuilder<String, String, WindowStore<String, String>>) builder).enableCaching);
}

@SuppressWarnings("all")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfInnerIsNull() {
Expand Down