From dfab7d1520cf8f24654a76ac6cee6c62e8bfe1ac Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Tue, 28 May 2024 16:34:56 +0200 Subject: [PATCH] KAFKA-10199: Enable state updater by default We have already enabled the state updater by default once. However, we ran into issues that forced us to disable it again. We think that we fixed those issues. So we want to enable the state updater again by default. --- .../main/java/org/apache/kafka/streams/StreamsConfig.java | 2 +- .../processor/internals/StoreChangelogReaderTest.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index bfea3d436806c..e77e4ca795d92 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1273,7 +1273,7 @@ public static class InternalConfig { public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__"; public static boolean getStateUpdaterEnabled(final Map configs) { - return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, false); + return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, true); } // Private API to enable processing threads (i.e. polling is decoupled from processing) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 763394611b96d..457508cd20e8e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -468,10 +468,10 @@ private void shouldPollWithRightTimeout(final Properties properties) { assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout()); } else { if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED) - || !((boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED))) { - assertEquals(Duration.ZERO, consumer.lastPollTimeout()); - } else { + || (boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED)) { assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout()); + } else { + assertEquals(Duration.ZERO, consumer.lastPollTimeout()); } } }