diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index bfea3d436806c..e77e4ca795d92 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1273,7 +1273,7 @@ public static class InternalConfig { public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__"; public static boolean getStateUpdaterEnabled(final Map configs) { - return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, false); + return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, true); } // Private API to enable processing threads (i.e. polling is decoupled from processing) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 763394611b96d..457508cd20e8e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -468,10 +468,10 @@ private void shouldPollWithRightTimeout(final Properties properties) { assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout()); } else { if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED) - || !((boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED))) { - assertEquals(Duration.ZERO, consumer.lastPollTimeout()); - } else { + || (boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED)) { assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout()); + } else { + assertEquals(Duration.ZERO, consumer.lastPollTimeout()); } } }