Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,18 @@
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.test.IntegrationTest;

import org.apache.kafka.test.TestUtils;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
Expand All @@ -51,7 +54,6 @@
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.junit.Assert.assertFalse;
Expand All @@ -60,9 +62,21 @@
/**
* Test the unclean shutdown behavior around state store cleanup.
*/
@RunWith(Parameterized.class)
@Category(IntegrationTest.class)
public class EOSUncleanShutdownIntegrationTest {

@Parameterized.Parameters(name = "{0}")
public static Collection<String[]> data() {
    // Run every test once with EOS-alpha and once with EOS-beta.
    return Arrays.asList(
        new String[] {StreamsConfig.EXACTLY_ONCE},
        new String[] {StreamsConfig.EXACTLY_ONCE_BETA}
    );
}

@Parameterized.Parameter
public String eosConfig;

@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);

Expand All @@ -82,15 +96,14 @@ public static void setupConfigsAndUtils() {
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);

STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TEST_FOLDER.getRoot().getPath());
}

@Test
public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedException {
final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + "-test";
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);

final String input = "input-topic";
cleanStateBeforeTest(CLUSTER, input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,18 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class GlobalKTableEOSIntegrationTest {
private static final int NUM_BROKERS = 1;
Expand All @@ -70,6 +74,17 @@ public class GlobalKTableEOSIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER =
new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);

@Parameterized.Parameters(name = "{0}")
public static Collection<String[]> data() {
    // Parameterize the suite over the two exactly-once processing modes.
    final String[][] guarantees = {
        {StreamsConfig.EXACTLY_ONCE},
        {StreamsConfig.EXACTLY_ONCE_BETA},
    };
    return Arrays.asList(guarantees);
}

@Parameterized.Parameter
public String eosConfig;

private static volatile AtomicInteger testNo = new AtomicInteger(0);
private final MockTime mockTime = CLUSTER.time;
private final KeyValueMapper<String, Long, Long> keyMapper = (key, value) -> value;
Expand Down Expand Up @@ -97,7 +112,7 @@ public void before() throws Exception {
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()),
Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore)
.withKeySerde(Serdes.Long())
Expand Down Expand Up @@ -319,15 +334,9 @@ private void produceAbortedMessages() throws Exception {
}

private void produceInitialGlobalTableValues() throws Exception {
produceInitialGlobalTableValues(true);
}

private void produceInitialGlobalTableValues(final boolean enableTransactions) throws Exception {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always set it to true -- removing unnecessary code.

final Properties properties = new Properties();
if (enableTransactions) {
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
properties.put(ProducerConfig.RETRIES_CONFIG, 1);
}
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
properties.put(ProducerConfig.RETRIES_CONFIG, 1);
IntegrationTestUtils.produceKeyValuesSynchronously(
globalTableTopic,
Arrays.asList(
Expand All @@ -342,7 +351,7 @@ private void produceInitialGlobalTableValues(final boolean enableTransactions) t
StringSerializer.class,
properties),
mockTime,
enableTransactions);
true);
}

private void produceGlobalTableValues() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,19 @@ public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDi
}

@Test
public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception {
public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled() throws Exception {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we extract the test portion as a common function?

STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled();
}

@Test
public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosBetaEnabled() throws Exception {
    // Exercises the restore-and-progress scenario under the EXACTLY_ONCE_BETA
    // guarantee; the config is set on the shared static STREAMS_CONFIG before
    // delegating to the common private test body.
    STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
    shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled();
}

private void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception {
try {
STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
streamsOne = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
streamsOne.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand All @@ -51,8 +51,6 @@
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
Expand All @@ -77,20 +75,19 @@ public class ResetPartitionTimeIntegrationTest {
private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
private static final Serde<String> STRING_SERDE = Serdes.String();
private static final int DEFAULT_TIMEOUT = 100;
private final boolean eosEnabled;
private static long lastRecordedTimestamp = -2L;

@Parameters(name = "{index}: eosEnabled={0}")
public static Collection<Object[]> parameters() {
return asList(
new Object[] {false},
new Object[] {true}
);
@Parameterized.Parameters(name = "{0}")
public static Collection<String[]> data() {
    // Cover at-least-once plus both exactly-once processing modes.
    return Arrays.asList(
        new String[] {StreamsConfig.AT_LEAST_ONCE},
        new String[] {StreamsConfig.EXACTLY_ONCE},
        new String[] {StreamsConfig.EXACTLY_ONCE_BETA}
    );
}

public ResetPartitionTimeIntegrationTest(final boolean eosEnabled) {
this.eosEnabled = eosEnabled;
}
@Parameterized.Parameter
public String processingGuarantee;

@Test
public void shouldPreservePartitionTimeOnKafkaStreamRestart() {
Expand All @@ -112,7 +109,7 @@ public void shouldPreservePartitionTimeOnKafkaStreamRestart() {
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfig.put(StreamsConfig.POLL_MS_CONFIG, Integer.toString(DEFAULT_TIMEOUT));
streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(DEFAULT_TIMEOUT));
streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosEnabled ? EXACTLY_ONCE : AT_LEAST_ONCE);
streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());

KafkaStreams kafkaStreams = getStartedStreams(streamsConfig, builder, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ public class RocksDBMetricsIntegrationTest {
@Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{StreamsConfig.AT_LEAST_ONCE},
{StreamsConfig.EXACTLY_ONCE},
{StreamsConfig.AT_LEAST_ONCE}
{StreamsConfig.EXACTLY_ONCE_BETA}
});
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General question: we record the same metric for all three processing modes. Why do you need to parametrize this test? \cc @cadonna

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
Expand All @@ -69,8 +69,6 @@
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
Expand All @@ -94,19 +92,18 @@ public class SuppressionDurabilityIntegrationTest {
private static final Serde<String> STRING_SERDE = Serdes.String();
private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
private static final int COMMIT_INTERVAL = 100;
private final boolean eosEnabled;

@Parameters(name = "{index}: eosEnabled={0}")
public static Collection<Object[]> parameters() {
return asList(
new Object[] {false},
new Object[] {true}
);
@Parameterized.Parameters(name = "{0}")
public static Collection<String[]> data() {
    // One run per processing guarantee: at-least-once and both EOS modes.
    final String[][] modes = new String[][] {
        {StreamsConfig.AT_LEAST_ONCE},
        {StreamsConfig.EXACTLY_ONCE},
        {StreamsConfig.EXACTLY_ONCE_BETA}
    };
    return Arrays.asList(modes);
}

public SuppressionDurabilityIntegrationTest(final boolean eosEnabled) {
this.eosEnabled = eosEnabled;
}
@Parameterized.Parameter
public String processingGuaranteee;

@Test
public void shouldRecoverBufferAfterShutdown() {
Expand Down Expand Up @@ -153,7 +150,7 @@ public void shouldRecoverBufferAfterShutdown() {
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosEnabled ? EXACTLY_ONCE : AT_LEAST_ONCE),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuaranteee),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import java.util.Set;
import java.util.UUID;

import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -361,7 +360,7 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig,
final ProcessorStateManager stateManager = new ProcessorStateManager(
taskId,
Task.TaskType.ACTIVE,
EXACTLY_ONCE.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
StreamThread.eosEnabled(streamsConfig),
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side cleanup

logContext,
stateDirectory,
new StoreChangelogReader(
Expand Down