Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class StandbyTask extends AbstractTask implements Task {

processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), metrics);
this.eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
eosEnabled = StreamThread.eosEnabled(config);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an actual bug fix. StandbyTasks did not set the eos flag to true for eos-beta and thus did not wipe out their stores in case of failure.

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand Down Expand Up @@ -57,6 +56,8 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA;
Expand Down Expand Up @@ -547,7 +548,7 @@ public void run() {
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
private void runLoop() {
void runLoop() {
subscribeConsumer();

// if the thread is still in the middle of a rebalance, we should keep polling
Expand All @@ -569,6 +570,13 @@ private void runLoop() {
log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
"Will close the task as dirty and re-create and bootstrap from scratch.", e);

taskManager.commit(
taskManager.tasks()
.values()
.stream()
.filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
.collect(Collectors.toSet())
);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we hit a TaskCorruptedException, we know that only a task in restore mode could be affected, and those tasks don't have anything to commit (their commitNeeded flag should be set to false). Hence, we just commit all non-corrupted tasks. Afterwards, we can safely call handleCorruption(); if we didn't commit first, handleCorruption() might incorrectly abort a pending transaction for eos-beta.

\cc @abbccdda @guozhangwang

taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
} catch (final TaskMigratedException e) {
log.warn("Detected that the thread is being fenced. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,11 +738,10 @@ private Stream<Task> standbyTaskStream() {
* @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit
*/
int commitAll() {
return commitInternal(tasks.values());
return commit(tasks.values());
}

private int commitInternal(final Collection<Task> tasks) {

int commit(final Collection<Task> tasks) {
if (rebalanceInProgress) {
return -1;
} else {
Expand All @@ -758,7 +757,9 @@ private int commitInternal(final Collection<Task> tasks) {
}
}

commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
}
Comment on lines 760 to 762
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an actual bug fix: consumedOffsetsAndMetadataPerTask could be empty if only standby tasks (but no active tasks) are assigned to a thread.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate more on why committing an empty map will fail?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we only have StandbyTasks assigned, the RecordCollector would not be initialized and thus the KafkaProducer would not initialize transactions and hence the offset commit would fail as we cannot begin a new transaction.


for (final Task task : tasks) {
if (task.commitNeeded()) {
Expand All @@ -781,7 +782,7 @@ int maybeCommitActiveTasksPerUserRequested() {
} else {
for (final Task task : activeTaskIterable()) {
if (task.commitRequested() && task.commitNeeded()) {
return commitInternal(activeTaskIterable());
return commit(activeTaskIterable());
}
}
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
Expand All @@ -52,15 +56,29 @@
* An integration test to verify the conversion of a dirty-closed EOS
* task towards a standby task is safe across restarts of the application.
*/
@RunWith(Parameterized.class)
public class StandbyTaskEOSIntegrationTest {

/**
 * Supplies the parameter sets for this parameterized test: one run per
 * processing-guarantee value listed below. Each array holds the single
 * argument that is injected into {@code eosConfig}.
 */
@Parameterized.Parameters(name = "{0}")
public static Collection<String[]> data() {
    final String[] eosAlphaRun = {StreamsConfig.EXACTLY_ONCE};
    final String[] eosBetaRun = {StreamsConfig.EXACTLY_ONCE_BETA};
    return Arrays.asList(eosAlphaRun, eosBetaRun);
}

@Parameterized.Parameter
public String eosConfig;

private final String appId = "eos-test-app";
private final String inputTopic = "input";

@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);

/**
 * Resets the topics used by the test before every test method.
 *
 * Because the test is parameterized and every run shares the same {@code appId},
 * the input topic and the application's changelog topic are deleted first so a
 * run does not see topics left behind by an earlier run.
 *
 * @throws Exception if deleting or creating the topics fails
 */
@Before
public void createTopics() throws Exception {
// Remove the input topic and the changelog topic derived from appId; the call's name indicates it waits for completion.
CLUSTER.deleteTopicsAndWait(inputTopic, appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog");
// NOTE(review): presumably (topic, partitions = 1, replication = 3) on the 3-broker CLUSTER -- confirm against EmbeddedKafkaCluster.
CLUSTER.createTopic(inputTopic, 1, 3);
}

Expand All @@ -77,14 +95,13 @@ public void surviveWithOneTaskAsStandby() throws ExecutionException, Interrupted
new Properties()),
10L);

final String appId = "eos-test-app";
final String stateDirPath = TestUtils.tempDirectory(appId).getPath();

final CountDownLatch instanceLatch = new CountDownLatch(1);

try (
final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(appId, stateDirPath + "/" + appId + "-1/", instanceLatch);
final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(appId, stateDirPath + "/" + appId + "-2/", instanceLatch);
final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-1/", instanceLatch);
final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-2/", instanceLatch);
) {


Expand All @@ -102,17 +119,19 @@ public void surviveWithOneTaskAsStandby() throws ExecutionException, Interrupted

streamInstanceOne.close(Duration.ZERO);
streamInstanceTwo.close(Duration.ZERO);

streamInstanceOne.cleanUp();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to add cleanUp now? Was it missing originally?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, the test was executed only once. Now it's executed twice. Because we use the same appId for both runs, it seems better to add the cleanup (even though we might get two different temp directories anyway, so it might not be required).

streamInstanceTwo.cleanUp();
}
}

private KafkaStreams buildStreamWithDirtyStateDir(final String appId,
final String stateDirPath,
private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath,
final CountDownLatch recordProcessLatch) throws IOException {

final StreamsBuilder builder = new StreamsBuilder();
final TaskId taskId = new TaskId(0, 0);

final Properties props = props(appId, stateDirPath);
final Properties props = props(stateDirPath);

final StateDirectory stateDirectory = new StateDirectory(
new StreamsConfig(props), new MockTime(), true);
Expand All @@ -133,14 +152,14 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String appId,
return new KafkaStreams(builder.build(), props);
}

private Properties props(final String appId, final String stateDirPath) {
private Properties props(final String stateDirPath) {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath);
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ public void shouldCloseStateManagerOnTaskCreated() {
}

@Test
public void shouldDeleteStateDirOnTaskCreatedAndEOSUncleanClose() {
public void shouldDeleteStateDirOnTaskCreatedAndEosAlphaUncleanClose() {
stateManager.close();
EasyMock.expectLastCall();

Expand Down Expand Up @@ -442,6 +442,35 @@ public void shouldDeleteStateDirOnTaskCreatedAndEOSUncleanClose() {
assertEquals(Task.State.CLOSED, task.state());
}

/**
 * Verifies that a standby task configured with {@code EXACTLY_ONCE_BETA} that is
 * closed dirty closes its state manager, records the close-task metric and ends
 * up in {@code Task.State.CLOSED}.
 */
@Test
public void shouldDeleteStateDirOnTaskCreatedAndEosBetaUncleanClose() {
// Record the expected interactions on the state manager mock: one close() call ...
stateManager.close();
EasyMock.expectLastCall();

// ... and one baseDir() lookup.
// NOTE(review): presumably baseDir() is queried to locate the state directory that gets wiped -- confirm in StandbyTask.
EasyMock.expect(stateManager.baseDir()).andReturn(baseDir);

EasyMock.replay(stateManager);

final MetricName metricName = setupCloseTaskMetric();

// Replace the config with one that uses the eos-beta processing guarantee;
// it must be assigned before createStandbyTask() so the new task picks it up.
config = new StreamsConfig(mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA)
)));

task = createStandbyTask();

// Unclean close of a task that was only created.
task.closeDirty();

// The close-task metric is expected to read 1.0 after the single close.
final double expectedCloseTaskMetric = 1.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);

// Fails if the recorded close()/baseDir() expectations were not met.
EasyMock.verify(stateManager);

assertEquals(Task.State.CLOSED, task.state());
}

/**
 * Builds a {@link StandbyTask} from the fixture values currently in scope.
 * {@code config} is read at call time, so a test that needs a non-default
 * configuration must assign it before calling this helper.
 *
 * @return a newly constructed standby task for a single partition
 */
private StandbyTask createStandbyTask() {
    final StandbyTask standbyTask = new StandbyTask(
        taskId,
        Collections.singleton(partition),
        topology,
        config,
        streamsMetrics,
        stateManager,
        stateDirectory
    );
    return standbyTask;
}
Expand Down
Loading