diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java index d24eabcffb16d..76c12c0aceaa8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java @@ -32,15 +32,18 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.test.IntegrationTest; - import org.apache.kafka.test.TestUtils; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.File; +import java.util.Arrays; +import java.util.Collection; import java.util.Locale; import java.util.Optional; import java.util.Properties; @@ -51,7 +54,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; -import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; import static org.junit.Assert.assertFalse; @@ -60,9 +62,21 @@ /** * Test the unclean shutdown behavior around state store cleanup. */ +@RunWith(Parameterized.class) @Category(IntegrationTest.class) public class EOSUncleanShutdownIntegrationTest { + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + return Arrays.asList(new String[][] { + {StreamsConfig.EXACTLY_ONCE}, + {StreamsConfig.EXACTLY_ONCE_BETA} + }); + } + + @Parameterized.Parameter + public String eosConfig; + @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3); @@ -82,8 +96,6 @@ public static void setupConfigsAndUtils() { STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); - - STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TEST_FOLDER.getRoot().getPath()); } @@ -91,6 +103,7 @@ public static void setupConfigsAndUtils() { public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedException { final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + "-test"; STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); + STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); final String input = "input-topic"; cleanStateBeforeTest(CLUSTER, input); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index 6dec94b373e85..a87bbd3c132e8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -48,14 +48,18 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; +@RunWith(Parameterized.class) @Category({IntegrationTest.class}) public class GlobalKTableEOSIntegrationTest { private static final int NUM_BROKERS = 1; @@ -70,6 +74,17 @@ public class GlobalKTableEOSIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG); + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + return Arrays.asList(new String[][] { + {StreamsConfig.EXACTLY_ONCE}, + {StreamsConfig.EXACTLY_ONCE_BETA} + }); + } + + @Parameterized.Parameter + public String eosConfig; + private static volatile AtomicInteger testNo = new AtomicInteger(0); private final MockTime mockTime = CLUSTER.time; private final KeyValueMapper keyMapper = (key, value) -> value; @@ -97,7 +112,7 @@ public void before() throws Exception { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); - streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once"); + streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.>as(globalStore) .withKeySerde(Serdes.Long()) @@ -319,15 +334,9 @@ private void produceAbortedMessages() throws Exception { } private void produceInitialGlobalTableValues() throws Exception { - produceInitialGlobalTableValues(true); - } - - private void produceInitialGlobalTableValues(final boolean enableTransactions) throws Exception { final Properties properties = new Properties(); - if (enableTransactions) { - properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid"); - properties.put(ProducerConfig.RETRIES_CONFIG, 1); - } + properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid"); + properties.put(ProducerConfig.RETRIES_CONFIG, 1); IntegrationTestUtils.produceKeyValuesSynchronously( globalTableTopic, Arrays.asList( @@ -342,7 +351,7 @@ private void produceInitialGlobalTableValues(final boolean enableTransactions) t StringSerializer.class, properties), mockTime, - enableTransactions); + true); } private void produceGlobalTableValues() throws Exception { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java index 3ec239fab911a..693a7182ba260 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java @@ -130,9 +130,19 @@ public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDi } @Test - public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception { + public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); + shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled(); + } + + @Test + public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosBetaEnabled() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA); + shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled(); + } + + private void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception { try { - STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG); streamsOne.start(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java index 955106c789f77..8aa393540952e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java @@ -40,8 +40,8 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -51,8 +51,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; -import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE; -import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams; @@ -77,20 +75,19 @@ public class ResetPartitionTimeIntegrationTest { private static final StringSerializer STRING_SERIALIZER = new StringSerializer(); private static final Serde STRING_SERDE = Serdes.String(); private static final int DEFAULT_TIMEOUT = 100; - private final boolean eosEnabled; private static long lastRecordedTimestamp = -2L; - @Parameters(name = "{index}: eosEnabled={0}") - public static Collection parameters() { - return asList( - new Object[] {false}, - new Object[] {true} - ); + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + return Arrays.asList(new String[][] { + {StreamsConfig.AT_LEAST_ONCE}, + {StreamsConfig.EXACTLY_ONCE}, + {StreamsConfig.EXACTLY_ONCE_BETA} + }); } - public ResetPartitionTimeIntegrationTest(final boolean eosEnabled) { - this.eosEnabled = eosEnabled; - } + @Parameterized.Parameter + public String processingGuarantee; @Test public void shouldPreservePartitionTimeOnKafkaStreamRestart() { @@ -112,7 +109,7 @@ public void shouldPreservePartitionTimeOnKafkaStreamRestart() { streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfig.put(StreamsConfig.POLL_MS_CONFIG, Integer.toString(DEFAULT_TIMEOUT)); streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(DEFAULT_TIMEOUT)); - streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosEnabled ? EXACTLY_ONCE : AT_LEAST_ONCE); + streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee); streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); KafkaStreams kafkaStreams = getStartedStreams(streamsConfig, builder, true); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java index 34cc428bb958a..0849bdd68b9dd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java @@ -101,8 +101,9 @@ public class RocksDBMetricsIntegrationTest { @Parameters(name = "{0}") public static Collection data() { return Arrays.asList(new Object[][] { + {StreamsConfig.AT_LEAST_ONCE}, {StreamsConfig.EXACTLY_ONCE}, - {StreamsConfig.AT_LEAST_ONCE} + {StreamsConfig.EXACTLY_ONCE_BETA} }); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java index a95b4e9a3ad3e..beb9ec7fe6099 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java @@ -49,10 +49,10 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -69,8 +69,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; -import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE; -import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams; @@ -94,19 +92,18 @@ public class SuppressionDurabilityIntegrationTest { private static final Serde STRING_SERDE = Serdes.String(); private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); private static final int COMMIT_INTERVAL = 100; - private final boolean eosEnabled; - @Parameters(name = "{index}: eosEnabled={0}") - public static Collection parameters() { - return asList( - new Object[] {false}, - new Object[] {true} - ); + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + return Arrays.asList(new String[][] { + {StreamsConfig.AT_LEAST_ONCE}, + {StreamsConfig.EXACTLY_ONCE}, + {StreamsConfig.EXACTLY_ONCE_BETA} + }); } - public SuppressionDurabilityIntegrationTest(final boolean eosEnabled) { - this.eosEnabled = eosEnabled; - } + @Parameterized.Parameter + public String processingGuaranteee; @Test public void shouldRecoverBufferAfterShutdown() { @@ -153,7 +150,7 @@ public void shouldRecoverBufferAfterShutdown() { mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), - mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosEnabled ? EXACTLY_ONCE : AT_LEAST_ONCE), + mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuaranteee), mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) )); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index da8f434c21d52..7aad8d6d6d1ec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -70,7 +70,6 @@ import java.util.Set; import java.util.UUID; -import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; @@ -361,7 +360,7 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig, final ProcessorStateManager stateManager = new ProcessorStateManager( taskId, Task.TaskType.ACTIVE, - EXACTLY_ONCE.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)), + StreamThread.eosEnabled(streamsConfig), logContext, stateDirectory, new StoreChangelogReader(