diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index aecbe2fa67caf..7bf89ce37ef75 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -72,7 +72,7 @@ public class StandbyTask extends AbstractTask implements Task { processorContext = new StandbyContextImpl(id, config, stateMgr, metrics); closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), metrics); - this.eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)); + eosEnabled = StreamThread.eosEnabled(config); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 961f01579a48c..1493cdd9bf457 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -57,6 +56,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA; @@ -547,7 +548,7 @@ public void run() { * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition */ - private void runLoop() { + void runLoop() { subscribeConsumer(); // if the thread is still in the middle of a rebalance, we should keep polling @@ -569,6 +570,13 @@ private void runLoop() { log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " + "Will close the task as dirty and re-create and bootstrap from scratch.", e); + taskManager.commit( + taskManager.tasks() + .values() + .stream() + .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id())) + .collect(Collectors.toSet()) + ); taskManager.handleCorruption(e.corruptedTaskWithChangelogs()); } catch (final TaskMigratedException e) { log.warn("Detected that the thread is being fenced. " + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 97205e4435faa..2f86538a5283f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -738,11 +738,10 @@ private Stream standbyTaskStream() { * @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit */ int commitAll() { - return commitInternal(tasks.values()); + return commit(tasks.values()); } - private int commitInternal(final Collection tasks) { - + int commit(final Collection tasks) { if (rebalanceInProgress) { return -1; } else { @@ -758,7 +757,9 @@ private int commitInternal(final Collection tasks) { } } - commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); + if (!consumedOffsetsAndMetadataPerTask.isEmpty()) { + commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); + } for (final Task task : tasks) { if (task.commitNeeded()) { @@ -781,7 +782,7 @@ int maybeCommitActiveTasksPerUserRequested() { } else { for (final Task task : activeTaskIterable()) { if (task.commitRequested() && task.commitNeeded()) { - return commitInternal(activeTaskIterable()); + return commit(activeTaskIterable()); } } return 0; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index b0840188bbd8a..b82d75ac68092 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -35,10 +35,14 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.File; import java.io.IOException; import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -52,8 +56,21 @@ * An integration test to verify the conversion of a dirty-closed EOS * task towards a standby task is safe across restarts of the application. */ +@RunWith(Parameterized.class) public class StandbyTaskEOSIntegrationTest { + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + return Arrays.asList(new String[][] { + {StreamsConfig.EXACTLY_ONCE}, + {StreamsConfig.EXACTLY_ONCE_BETA} + }); + } + + @Parameterized.Parameter + public String eosConfig; + + private final String appId = "eos-test-app"; private final String inputTopic = "input"; @ClassRule @@ -61,6 +78,7 @@ public class StandbyTaskEOSIntegrationTest { @Before public void createTopics() throws Exception { + CLUSTER.deleteTopicsAndWait(inputTopic, appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog"); CLUSTER.createTopic(inputTopic, 1, 3); } @@ -77,14 +95,13 @@ public void surviveWithOneTaskAsStandby() throws ExecutionException, Interrupted new Properties()), 10L); - final String appId = "eos-test-app"; final String stateDirPath = TestUtils.tempDirectory(appId).getPath(); final CountDownLatch instanceLatch = new CountDownLatch(1); try ( - final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(appId, stateDirPath + "/" + appId + "-1/", instanceLatch); - final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(appId, stateDirPath + "/" + appId + "-2/", instanceLatch); + final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-1/", instanceLatch); + final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(stateDirPath + "/" + appId + "-2/", instanceLatch); ) { @@ -102,17 +119,19 @@ public void surviveWithOneTaskAsStandby() throws ExecutionException, Interrupted streamInstanceOne.close(Duration.ZERO); streamInstanceTwo.close(Duration.ZERO); + + streamInstanceOne.cleanUp(); + streamInstanceTwo.cleanUp(); } } - private KafkaStreams buildStreamWithDirtyStateDir(final String appId, - final String stateDirPath, + private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath, final CountDownLatch recordProcessLatch) throws IOException { final StreamsBuilder builder = new StreamsBuilder(); final TaskId taskId = new TaskId(0, 0); - final Properties props = props(appId, stateDirPath); + final Properties props = props(stateDirPath); final StateDirectory stateDirectory = new StateDirectory( new StreamsConfig(props), new MockTime(), true); @@ -133,14 +152,14 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String appId, return new KafkaStreams(builder.build(), props); } - private Properties props(final String appId, final String stateDirPath) { + private Properties props(final String stateDirPath) { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDirPath); streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); - streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); + streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 34a1f5fcb8799..c993d1da7a266 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -414,7 +414,7 @@ public void shouldCloseStateManagerOnTaskCreated() { } @Test - public void shouldDeleteStateDirOnTaskCreatedAndEOSUncleanClose() { + public void shouldDeleteStateDirOnTaskCreatedAndEosAlphaUncleanClose() { stateManager.close(); EasyMock.expectLastCall(); @@ -442,6 +442,35 @@ public void shouldDeleteStateDirOnTaskCreatedAndEOSUncleanClose() { assertEquals(Task.State.CLOSED, task.state()); } + @Test + public void shouldDeleteStateDirOnTaskCreatedAndEosBetaUncleanClose() { + stateManager.close(); + EasyMock.expectLastCall(); + + EasyMock.expect(stateManager.baseDir()).andReturn(baseDir); + + EasyMock.replay(stateManager); + + final MetricName metricName = setupCloseTaskMetric(); + + config = new StreamsConfig(mkProperties(mkMap( + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId), + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"), + mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA) + ))); + + task = createStandbyTask(); + + task.closeDirty(); + + final double expectedCloseTaskMetric = 1.0; + verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); + + EasyMock.verify(stateManager); + + assertEquals(Task.State.CLOSED, task.state()); + } + private StandbyTask createStandbyTask() { return new StandbyTask(taskId, Collections.singleton(partition), topology, config, streamsMetrics, stateManager, stateDirectory); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index d0488f62d5b3d..a539ec8441b51 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -47,6 +46,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; +import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.internals.ConsumedInternal; @@ -93,16 +93,21 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; import static java.util.Collections.singletonMap; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId; import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.verify; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.startsWith; @@ -465,7 +470,7 @@ public void shouldNotCommitBeforeTheCommitInterval() { thread.setNow(mockTime.milliseconds()); thread.maybeCommit(); - EasyMock.verify(taskManager); + verify(taskManager); } @Test @@ -509,7 +514,7 @@ public void shouldEnforceRebalanceAfterNextScheduledProbingRebalanceTime() throw "Thread never started."); TestUtils.retryOnExceptionWithTimeout( - () -> EasyMock.verify(mockConsumer) + () -> verify(mockConsumer) ); thread.shutdown(); @@ -691,7 +696,7 @@ public void shouldNotCauseExceptionIfNothingCommitted() { thread.setNow(mockTime.milliseconds()); thread.maybeCommit(); - EasyMock.verify(taskManager); + verify(taskManager); } @Test @@ -729,7 +734,7 @@ public void shouldCommitAfterTheCommitInterval() { thread.setNow(mockTime.milliseconds()); thread.maybeCommit(); - EasyMock.verify(taskManager); + verify(taskManager); } @Test @@ -920,7 +925,7 @@ public void shouldShutdownTaskManagerOnClose() { } }); thread.run(); - EasyMock.verify(taskManager); + verify(taskManager); } @Test @@ -929,7 +934,7 @@ public void shouldNotReturnDataAfterTaskMigrated() { internalTopologyBuilder = EasyMock.createNiceMock(InternalTopologyBuilder.class); - EasyMock.expect(internalTopologyBuilder.sourceTopicCollection()).andReturn(Collections.singletonList(topic1)).times(2); + expect(internalTopologyBuilder.sourceTopicCollection()).andReturn(Collections.singletonList(topic1)).times(2); final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.LATEST); @@ -977,7 +982,7 @@ public void restore() { final IllegalStateException thrown = assertThrows( IllegalStateException.class, thread::run); - EasyMock.verify(taskManager); + verify(taskManager); // The Mock consumer shall throw as the assignment has been wiped out, but records are assigned. assertEquals("No current assignment for partition topic1-1", thrown.getMessage()); @@ -1011,7 +1016,7 @@ public void shouldShutdownTaskManagerOnCloseWithoutStart() { new AtomicLong(Long.MAX_VALUE) ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); thread.shutdown(); - EasyMock.verify(taskManager); + verify(taskManager); } @Test @@ -1043,7 +1048,7 @@ public void shouldOnlyShutdownOnce() { thread.shutdown(); // Execute the run method. Verification of the mock will check that shutdown was only done once thread.run(); - EasyMock.verify(taskManager); + verify(taskManager); } @Test @@ -1905,6 +1910,60 @@ public void shouldThrowTaskMigratedExceptionHandlingRevocation() { assertThrows(TaskMigratedException.class, thread::runOnce); } + @Test + public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() { + final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); + final Consumer consumer = mock(Consumer.class); + final Task task1 = mock(Task.class); + final Task task2 = mock(Task.class); + final TaskId taskId1 = new TaskId(0, 0); + final TaskId taskId2 = new TaskId(0, 2); + + final Map> corruptedTasksWithChangelogs = mkMap( + mkEntry(taskId1, emptySet()) + ); + + expect(task1.id()).andReturn(taskId1).anyTimes(); + expect(task2.id()).andReturn(taskId2).anyTimes(); + + expect(taskManager.tasks()).andReturn(mkMap( + mkEntry(taskId1, task1), + mkEntry(taskId2, task2) + )).anyTimes(); + expect(taskManager.commit(singleton(task2))).andReturn(0); + + EasyMock.replay(task1, task2, taskManager); + + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST); + final StreamThread thread = new StreamThread( + mockTime, + config, + null, + consumer, + consumer, + null, + null, + taskManager, + streamsMetrics, + internalTopologyBuilder, + CLIENT_ID, + new LogContext(""), + new AtomicInteger(), + new AtomicLong(Long.MAX_VALUE) + ) { + @Override + void runOnce() { + setState(State.PENDING_SHUTDOWN); + throw new TaskCorruptedException(corruptedTasksWithChangelogs); + } + }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); + + thread.setState(StreamThread.State.STARTING); + thread.runLoop(); + + verify(taskManager); + } + @Test public void shouldLogAndRecordSkippedRecordsForInvalidTimestampsWithBuiltInMetricsVersion0100To24() { shouldLogAndRecordSkippedRecordsForInvalidTimestamps(StreamsConfig.METRICS_0100_TO_24); @@ -2045,7 +2104,7 @@ public void shouldTransmitTaskManagerMetrics() { new MockTime()); final Map dummyProducerMetrics = singletonMap(testMetricName, testMetric); - EasyMock.expect(taskManager.producerMetrics()).andReturn(dummyProducerMetrics); + expect(taskManager.producerMetrics()).andReturn(dummyProducerMetrics); EasyMock.replay(taskManager, consumer); final StreamsMetricsImpl streamsMetrics = @@ -2118,7 +2177,7 @@ private TaskManager mockTaskManagerCommit(final Consumer consume final int numberOfCommits, final int commits) { final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - EasyMock.expect(taskManager.commitAll()).andReturn(commits).times(numberOfCommits); + expect(taskManager.commitAll()).andReturn(commits).times(numberOfCommits); EasyMock.replay(taskManager, consumer); return taskManager; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index ab185700abd7a..998ff7a89c2a1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -127,6 +127,14 @@ public class TaskManagerTest { private final TopicPartition t1p3 = new TopicPartition(topic1, 3); private final Set taskId03Partitions = mkSet(t1p3); + private final TaskId taskId04 = new TaskId(0, 4); + private final TopicPartition t1p4 = new TopicPartition(topic1, 4); + private final Set taskId04Partitions = mkSet(t1p4); + + private final TaskId taskId05 = new TaskId(0, 5); + private final TopicPartition t1p5 = new TopicPartition(topic1, 5); + private final Set taskId05Partitions = mkSet(t1p5); + private final TaskId taskId10 = new TaskId(1, 0); private final TopicPartition t2p0 = new TopicPartition(topic2, 0); private final Set taskId10Partitions = mkSet(t2p0); @@ -1528,6 +1536,77 @@ public void shouldCommitActiveAndStandbyTasks() { assertThat(task01.commitNeeded, is(false)); } + @Test + public void shouldCommitProvidedTasksIfNeeded() { + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true); + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true); + final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true); + final StateMachineTask task03 = new StateMachineTask(taskId03, taskId03Partitions, false); + final StateMachineTask task04 = new StateMachineTask(taskId04, taskId04Partitions, false); + final StateMachineTask task05 = new StateMachineTask(taskId05, taskId05Partitions, false); + + final Map> assignmentActive = mkMap( + mkEntry(taskId00, taskId00Partitions), + mkEntry(taskId01, taskId01Partitions), + mkEntry(taskId02, taskId02Partitions) + ); + final Map> assignmentStandby = mkMap( + mkEntry(taskId03, taskId03Partitions), + mkEntry(taskId04, taskId04Partitions), + mkEntry(taskId05, taskId05Partitions) + ); + + expectRestoreToBeCompleted(consumer, changeLogReader); + expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))) + .andReturn(Arrays.asList(task00, task01, task02)).anyTimes(); + expect(standbyTaskCreator.createTasks(eq(assignmentStandby))) + .andReturn(Arrays.asList(task03, task04, task05)).anyTimes(); + expectLastCall(); + + replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + + taskManager.handleAssignment(assignmentActive, assignmentStandby); + assertThat(taskManager.tryToCompleteRestoration(), is(true)); + + assertThat(task00.state(), is(Task.State.RUNNING)); + assertThat(task01.state(), is(Task.State.RUNNING)); + + task00.setCommitNeeded(); + task01.setCommitNeeded(); + task03.setCommitNeeded(); + task04.setCommitNeeded(); + + assertThat(taskManager.commit(Arrays.asList(task00, task02, task03, task05)), equalTo(2)); + assertThat(task00.commitNeeded, is(false)); + assertThat(task01.commitNeeded, is(true)); + assertThat(task02.commitNeeded, is(false)); + assertThat(task03.commitNeeded, is(false)); + assertThat(task04.commitNeeded, is(true)); + assertThat(task05.commitNeeded, is(false)); + } + + @Test + public void shouldNotCommitOffsetsIfOnlyStandbyTasksAssigned() { + final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, false); + + expectRestoreToBeCompleted(consumer, changeLogReader); + expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))) + .andReturn(singletonList(task00)).anyTimes(); + expectLastCall(); + + replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader); + + taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment); + assertThat(taskManager.tryToCompleteRestoration(), is(true)); + + assertThat(task00.state(), is(Task.State.RUNNING)); + + task00.setCommitNeeded(); + + assertThat(taskManager.commitAll(), equalTo(1)); + assertThat(task00.commitNeeded, is(false)); + } + @Test public void shouldNotCommitActiveAndStandbyTasksWhileRebalanceInProgress() throws IOException { final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true); @@ -2271,10 +2350,12 @@ private static void expectConsumerAssignmentPaused(final Consumer offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); + task01.setCommittableOffsetsAndMetadata(offsets); task01.setCommitNeeded(); taskManager.tasks().put(taskId01, task01); - consumer.commitSync(Collections.emptyMap()); + consumer.commitSync(offsets); expectLastCall().andThrow(new CommitFailedException()); replay(consumer); @@ -2291,10 +2372,12 @@ public void shouldThrowTaskMigratedExceptionOnCommitFailed() { @Test public void shouldThrowStreamsExceptionOnCommitTimeout() { final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true); + final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); + task01.setCommittableOffsetsAndMetadata(offsets); task01.setCommitNeeded(); taskManager.tasks().put(taskId01, task01); - consumer.commitSync(Collections.emptyMap()); + consumer.commitSync(offsets); expectLastCall().andThrow(new TimeoutException()); replay(consumer); @@ -2311,10 +2394,12 @@ public void shouldThrowStreamsExceptionOnCommitTimeout() { @Test public void shouldStreamsExceptionOnCommitError() { final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true); + final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); + task01.setCommittableOffsetsAndMetadata(offsets); task01.setCommitNeeded(); taskManager.tasks().put(taskId01, task01); - consumer.commitSync(Collections.emptyMap()); + consumer.commitSync(offsets); expectLastCall().andThrow(new KafkaException()); replay(consumer); @@ -2331,10 +2416,12 @@ public void shouldStreamsExceptionOnCommitError() { @Test public void shouldFailOnCommitFatal() { final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true); + final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); + task01.setCommittableOffsetsAndMetadata(offsets); task01.setCommitNeeded(); taskManager.tasks().put(taskId01, task01); - consumer.commitSync(Collections.emptyMap()); + consumer.commitSync(offsets); expectLastCall().andThrow(new RuntimeException("KABOOM")); replay(consumer); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java index 6e7afa8a6c2d9..092ad20f2a745 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java @@ -16,21 +16,17 @@ */ package org.apache.kafka.streams.tests; -import java.time.Duration; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.Aggregator; -import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.state.KeyValueStore; +import java.time.Duration; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -75,26 +71,20 @@ public void start() { uncaughtException = false; streams = createKafkaStreams(properties); - streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(final Thread t, final Throwable e) { - System.out.println(System.currentTimeMillis()); - System.out.println("EOS-TEST-CLIENT-EXCEPTION"); - e.printStackTrace(); - System.out.flush(); - uncaughtException = true; - } + streams.setUncaughtExceptionHandler((t, e) -> { + System.out.println(System.currentTimeMillis()); + System.out.println("EOS-TEST-CLIENT-EXCEPTION"); + e.printStackTrace(); + System.out.flush(); + uncaughtException = true; }); - streams.setStateListener(new KafkaStreams.StateListener() { - @Override - public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) { - // don't remove this -- it's required test output - System.out.println(System.currentTimeMillis()); - System.out.println("StateChange: " + oldState + " -> " + newState); - System.out.flush(); - if (newState == KafkaStreams.State.NOT_RUNNING) { - notRunningCallbackReceived.set(true); - } + streams.setStateListener((newState, oldState) -> { + // don't remove this -- it's required test output + System.out.println(System.currentTimeMillis()); + System.out.println("StateChange: " + oldState + " -> " + newState); + System.out.flush(); + if (newState == KafkaStreams.State.NOT_RUNNING) { + notRunningCallbackReceived.set(true); } }); streams.start(); @@ -112,8 +102,8 @@ private KafkaStreams createKafkaStreams(final Properties props) { props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2); props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); - props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000); // increase commit interval to make sure a client is killed having an open transaction props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); @@ -127,41 +117,17 @@ private KafkaStreams createKafkaStreams(final Properties props) { // min groupedData .aggregate( - new Initializer() { - @Override - public Integer apply() { - return Integer.MAX_VALUE; - } - }, - new Aggregator() { - @Override - public Integer apply(final String aggKey, - final Integer value, - final Integer aggregate) { - return (value < aggregate) ? value : aggregate; - } - }, - Materialized.>with(null, intSerde)) + () -> Integer.MAX_VALUE, + (aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate, + Materialized.with(null, intSerde)) .toStream() .to("min", Produced.with(stringSerde, intSerde)); // sum groupedData.aggregate( - new Initializer() { - @Override - public Long apply() { - return 0L; - } - }, - new Aggregator() { - @Override - public Long apply(final String aggKey, - final Integer value, - final Long aggregate) { - return (long) value + aggregate; - } - }, - Materialized.>with(null, longSerde)) + () -> 0L, + (aggKey, value, aggregate) -> (long) value + aggregate, + Materialized.with(null, longSerde)) .toStream() .to("sum", Produced.with(stringSerde, longSerde)); @@ -174,21 +140,9 @@ public Long apply(final String aggKey, // max groupedDataAfterRepartitioning .aggregate( - new Initializer() { - @Override - public Integer apply() { - return Integer.MIN_VALUE; - } - }, - new Aggregator() { - @Override - public Integer apply(final String aggKey, - final Integer value, - final Integer aggregate) { - return (value > aggregate) ? value : aggregate; - } - }, - Materialized.>with(null, intSerde)) + () -> Integer.MIN_VALUE, + (aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate, + Materialized.with(null, intSerde)) .toStream() .to("max", Produced.with(stringSerde, intSerde)); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java index 7dc006f092096..e95c3541e63ab 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java @@ -30,7 +30,6 @@ import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; @@ -46,11 +45,13 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Properties; import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -59,7 +60,8 @@ public class EosTestDriver extends SmokeTestUtil { private static final int MAX_NUMBER_OF_KEYS = 20000; private static final long MAX_IDLE_TIME_MS = 600000L; - private static boolean isRunning = true; + private volatile static boolean isRunning = true; + private static CountDownLatch terminated = new CountDownLatch(1); private static int numRecordsProduced = 0; @@ -68,11 +70,22 @@ private static synchronized void updateNumRecordsProduces(final int delta) { } static void generate(final String kafka) { - Exit.addShutdownHook("streams-eos-test-driver-shutdown-hook", () -> { System.out.println("Terminating"); - System.out.flush(); isRunning = false; + + try { + if (terminated.await(5L, TimeUnit.MINUTES)) { + System.out.println("Terminated"); + } else { + System.out.println("Terminated with timeout"); + } + } catch (final InterruptedException swallow) { + swallow.printStackTrace(System.err); + System.out.println("Terminated with error"); + } + System.err.flush(); + System.out.flush(); }); final Properties producerProps = new Properties(); @@ -82,58 +95,81 @@ static void generate(final String kafka) { producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); - final KafkaProducer producer = new KafkaProducer<>(producerProps); - - final Random rand = new Random(System.currentTimeMillis()); - - while (isRunning) { - final String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS); - final int value = rand.nextInt(10000); - - final ProducerRecord record = new ProducerRecord<>("data", key, value); + final Map> offsets = new HashMap<>(); - producer.send(record, (metadata, exception) -> { - if (exception != null) { - exception.printStackTrace(System.err); - System.err.flush(); - if (exception instanceof TimeoutException) { - try { - // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time - final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]); - updateNumRecordsProduces(-expired); - } catch (final Exception ignore) { } + try { + try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { + final Random rand = new Random(System.currentTimeMillis()); + + while (isRunning) { + final String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS); + final int value = rand.nextInt(10000); + + final ProducerRecord record = new ProducerRecord<>("data", key, value); + + producer.send(record, (metadata, exception) -> { + if (exception != null) { + exception.printStackTrace(System.err); + System.err.flush(); + if (exception instanceof TimeoutException) { + try { + // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time + final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]); + updateNumRecordsProduces(-expired); + } catch (final Exception ignore) { + } + } + } else { + offsets.getOrDefault(metadata.partition(), new LinkedList<>()).add(metadata.offset()); + } + }); + + updateNumRecordsProduces(1); + if (numRecordsProduced % 1000 == 0) { + System.out.println(numRecordsProduced + " records produced"); + System.out.flush(); } + Utils.sleep(rand.nextInt(10)); } - }); + } + System.out.println("Producer closed: " + numRecordsProduced + " records produced"); + System.out.flush(); - updateNumRecordsProduces(1); - if (numRecordsProduced % 1000 == 0) { - System.out.println(numRecordsProduced + " records produced"); - System.out.flush(); + // verify offsets + for (final Map.Entry> offsetsOfPartition : offsets.entrySet()) { + offsetsOfPartition.getValue().sort(Long::compareTo); + for (int i = 0; i < offsetsOfPartition.getValue().size() - 1; ++i) { + if (offsetsOfPartition.getValue().get(i) != i) { + System.err.println("Offset for partition " + offsetsOfPartition.getKey() + " is not " + i + " as expected but " + offsetsOfPartition.getValue().get(i)); + System.err.flush(); + } + } + System.out.println("Max offset of partition " + offsetsOfPartition.getKey() + " is " + offsetsOfPartition.getValue().get(offsetsOfPartition.getValue().size() - 1)); } - Utils.sleep(rand.nextInt(10)); - } - producer.close(); - System.out.println("Producer closed: " + numRecordsProduced + " records produced"); - final Properties props = new Properties(); - props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier"); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); + final Properties props = new Properties(); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier"); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); - try (final KafkaConsumer consumer = new KafkaConsumer<>(props)) { - final List partitions = getAllPartitions(consumer, "data"); - System.out.println("Partitions: " + partitions); - consumer.assign(partitions); - consumer.seekToEnd(partitions); + try (final KafkaConsumer consumer = new KafkaConsumer<>(props)) { + final List partitions = getAllPartitions(consumer, "data"); + System.out.println("Partitions: " + partitions); + System.out.flush(); + consumer.assign(partitions); + consumer.seekToEnd(partitions); - for (final TopicPartition tp : partitions) { - System.out.println("End-offset for " + tp + " is " + consumer.position(tp)); + for (final TopicPartition tp : partitions) { + System.out.println("End-offset for " + tp + " is " + consumer.position(tp)); + System.out.flush(); + } } + System.out.flush(); + } finally { + terminated.countDown(); } - System.out.flush(); } public static void verify(final String kafka, final boolean withRepartitioning) { @@ -144,6 +180,14 @@ public static void verify(final String kafka, final boolean withRepartitioning) props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); + try (final KafkaConsumer consumer = new KafkaConsumer<>(props)) { + verifyAllTransactionFinished(consumer, kafka, withRepartitioning); + } catch (final Exception e) { + e.printStackTrace(System.err); + System.out.println("FAILED"); + return; + } + final Map committedOffsets; try (final Admin adminClient = Admin.create(props)) { ensureStreamsApplicationDown(adminClient); @@ -200,18 +244,6 @@ public static void verify(final String kafka, final boolean withRepartitioning) verifyCnt(inputRecordsPerTopicPerPartition.get("repartition"), outputRecordsPerTopicPerPartition.get("cnt")); } - try (final KafkaConsumer consumer = new KafkaConsumer<>(props)) { - final List partitions = getAllPartitions(consumer, allOutputTopics); - consumer.assign(partitions); - consumer.seekToBeginning(partitions); - - verifyAllTransactionFinished(consumer, kafka, withRepartitioning); - } catch (final Exception e) { - e.printStackTrace(System.err); - System.out.println("FAILED"); - return; - } - // do not modify: required test output System.out.println("ALL-RECORDS-DELIVERED"); System.out.flush(); @@ -226,11 +258,11 @@ private static void ensureStreamsApplicationDown(final Admin adminClient) { if (System.currentTimeMillis() > maxWaitTime && !description.members().isEmpty()) { throw new RuntimeException( - "Streams application not down after " + (MAX_IDLE_TIME_MS / 1000) + " seconds. " + + "Streams application not down after " + (MAX_IDLE_TIME_MS / 1000L) + " seconds. " + "Group: " + description ); } - sleep(1000); + sleep(1000L); } while (!description.members().isEmpty()); } @@ -242,7 +274,7 @@ private static Map getCommittedOffsets(final Admin adminCl try { final ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(EosTestClient.APP_ID); topicPartitionOffsetAndMetadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); - } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) { + } catch (final Exception e) { e.printStackTrace(); throw new RuntimeException(e); } @@ -263,7 +295,7 @@ private static Map readEndOffsets, final boolean withRepartitioning, final boolean isInputTopic) { - System.err.println("read end offset: " + readEndOffsets); + System.out.println("read end offset: " + readEndOffsets); final Map>>> recordPerTopicPerPartition = new HashMap<>(); final Map maxReceivedOffsetPerPartition = new HashMap<>(); final Map maxConsumerPositionPerPartition = new HashMap<>(); @@ -271,7 +303,7 @@ private static Map receivedRecords = consumer.poll(Duration.ofMillis(100)); + final ConsumerRecords receivedRecords = consumer.poll(Duration.ofSeconds(1L)); for (final ConsumerRecord record : receivedRecords) { maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; @@ -300,7 +332,7 @@ private static Map producer = new KafkaProducer<>(producerProps)) { - for (final TopicPartition tp : partitions) { - final ProducerRecord record = new ProducerRecord<>(tp.topic(), tp.partition(), "key", "value"); + final Map topicEndOffsets; - producer.send(record, (metadata, exception) -> { - if (exception != null) { - exception.printStackTrace(System.err); - System.err.flush(); - Exit.exit(1); - } - }); - } + try (final KafkaConsumer consumerUncommitted = new KafkaConsumer<>(consumerProps)) { + topicEndOffsets = consumerUncommitted.endOffsets(partitions); } - final StringDeserializer stringDeserializer = new StringDeserializer(); + final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; + while (!topicEndOffsets.isEmpty() && System.currentTimeMillis() < maxWaitTime) { + consumer.seekToEnd(partitions); - long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; - while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) { - final ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - if (records.isEmpty()) { - System.out.println("No data received."); - for (final TopicPartition tp : partitions) { - System.out.println(tp + " at position " + consumer.position(tp)); - } - } - for (final ConsumerRecord record : records) { - maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; - final String topic = record.topic(); - final TopicPartition tp = new TopicPartition(topic, record.partition()); - - try { - final String key = stringDeserializer.deserialize(topic, record.key()); - final String value = stringDeserializer.deserialize(topic, record.value()); - - if (!("key".equals(key) && "value".equals(value) && partitions.remove(tp))) { - throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: " + - "Expected record <'key','value'> from one of " + partitions + " but got" - + " <" + key + "," + value + "> [" + record.topic() + ", " + record.partition() + "]"); - } else { - System.out.println("Verifying " + tp + " successful."); - } - } catch (final SerializationException e) { - throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: " + - "Expected record <'key','value'> from one of " + partitions + " but got " + record, e); + final Iterator iterator = partitions.iterator(); + while (iterator.hasNext()) { + final TopicPartition topicPartition = iterator.next(); + final long position = consumer.position(topicPartition); + + if (position == topicEndOffsets.get(topicPartition)) { + iterator.remove(); + topicEndOffsets.remove(topicPartition); + System.out.println("Removing " + topicPartition + " at position " + position); + } else if (consumer.position(topicPartition) > topicEndOffsets.get(topicPartition)) { + throw new IllegalStateException("Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition)); + } else { + System.out.println("Retry " + topicPartition + " at position " + position); } - } + sleep(1000L); } - if (!partitions.isEmpty()) { - throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000) + " sec."); + + if (!topicEndOffsets.isEmpty()) { + throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L) + " sec."); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java index ab9a14ea274a0..441448c98c69c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java @@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; -import java.io.File; import java.time.Instant; public class SmokeTestUtil { @@ -45,12 +44,17 @@ static ProcessorSupplier printProcessorSupplier(final String top public Processor get() { return new AbstractProcessor() { private int numRecordsProcessed = 0; + private long smallestOffset = Long.MAX_VALUE; + private long largestOffset = Long.MIN_VALUE; @Override public void init(final ProcessorContext context) { super.init(context); System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId()); + System.out.flush(); numRecordsProcessed = 0; + smallestOffset = Long.MAX_VALUE; + largestOffset = Long.MIN_VALUE; } @Override @@ -60,6 +64,27 @@ public void process(final Object key, final Object value) { System.out.printf("%s: %s%n", name, Instant.now()); System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic); } + + if (smallestOffset > context().offset()) { + smallestOffset = context().offset(); + } + if (largestOffset < context().offset()) { + largestOffset = context().offset(); + } + } + + @Override + public void close() { + System.out.printf("Close processor for task %s", context().taskId()); + System.out.println("processed " + numRecordsProcessed + " records"); + final long processed; + if (largestOffset >= smallestOffset) { + processed = 1L + largestOffset - smallestOffset; + } else { + processed = 0L; + } + System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed); + System.out.flush(); } }; } @@ -76,39 +101,19 @@ public K apply(final Windowed winKey, final V value) { public static class Agg { KeyValueMapper> selector() { - return new KeyValueMapper>() { - @Override - public KeyValue apply(final String key, final Long value) { - return new KeyValue<>(value == null ? null : Long.toString(value), 1L); - } - }; + return (key, value) -> new KeyValue<>(value == null ? null : Long.toString(value), 1L); } public Initializer init() { - return new Initializer() { - @Override - public Long apply() { - return 0L; - } - }; + return () -> 0L; } Aggregator adder() { - return new Aggregator() { - @Override - public Long apply(final String aggKey, final Long value, final Long aggregate) { - return aggregate + value; - } - }; + return (aggKey, value, aggregate) -> aggregate + value; } Aggregator remover() { - return new Aggregator() { - @Override - public Long apply(final String aggKey, final Long value, final Long aggregate) { - return aggregate - value; - } - }; + return (aggKey, value, aggregate) -> aggregate - value; } } @@ -120,14 +125,6 @@ public Long apply(final String aggKey, final Long value, final Long aggregate) { static Serde doubleSerde = Serdes.Double(); - static File createDir(final File parent, final String child) { - final File dir = new File(parent, child); - - dir.mkdir(); - - return dir; - } - public static void sleep(final long duration) { try { Thread.sleep(duration); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java index 47a78bdbb9520..52af996edaa2b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java @@ -39,12 +39,22 @@ public static void main(final String[] args) throws IOException { final Properties streamsProperties = Utils.loadProps(propFileName); final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); + final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG); if (kafka == null) { System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); System.exit(1); } + if ("process".equals(command) || "process-complex".equals(command)) { + if (!StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee) && + !StreamsConfig.EXACTLY_ONCE_BETA.equals(processingGuarantee)) { + + System.err.println("processingGuarantee must be either " + StreamsConfig.EXACTLY_ONCE + " or " + StreamsConfig.EXACTLY_ONCE_BETA); + System.exit(1); + } + } + System.out.println("StreamsTest instance started"); System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 59f24a04ff6e9..e8788829e68bd 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -324,11 +324,20 @@ class StreamsEosTestBaseService(StreamsTestBaseService): clean_node_enabled = True - def __init__(self, test_context, kafka, command): + def __init__(self, test_context, kafka, processing_guarantee, command): super(StreamsEosTestBaseService, self).__init__(test_context, kafka, "org.apache.kafka.streams.tests.StreamsEosTest", command) + self.PROCESSING_GUARANTEE = processing_guarantee + + def prop_file(self): + properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT, + streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(), + streams_property.PROCESSING_GUARANTEE: self.PROCESSING_GUARANTEE} + + cfg = KafkaConfig(**properties) + return cfg.render() def clean_node(self, node): if self.clean_node_enabled: @@ -366,25 +375,25 @@ def __init__(self, test_context, kafka, processing_guarantee, num_threads = 3): class StreamsEosTestDriverService(StreamsEosTestBaseService): def __init__(self, test_context, kafka): - super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run") + super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "not-required", "run") class StreamsEosTestJobRunnerService(StreamsEosTestBaseService): - def __init__(self, test_context, kafka): - super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process") + def __init__(self, test_context, kafka, processing_guarantee): + super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, processing_guarantee, "process") class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService): - def __init__(self, test_context, kafka): - super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex") + def __init__(self, test_context, kafka, processing_guarantee): + super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, processing_guarantee, "process-complex") class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService): def __init__(self, test_context, kafka): - super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify") + super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "not-required", "verify") class StreamsComplexEosTestVerifyRunnerService(StreamsEosTestBaseService): def __init__(self, test_context, kafka): - super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify-complex") + super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "not-required", "verify-complex") class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService): diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py index c3d7f51af806a..bf07cb4106246 100644 --- a/tests/kafkatest/tests/streams/streams_eos_test.py +++ b/tests/kafkatest/tests/streams/streams_eos_test.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from ducktape.mark import parametrize from ducktape.mark.resource import cluster from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \ @@ -37,17 +38,21 @@ def __init__(self, test_context): self.test_context = test_context @cluster(num_nodes=9) - def test_rebalance_simple(self): - self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka), - StreamsEosTestJobRunnerService(self.test_context, self.kafka), - StreamsEosTestJobRunnerService(self.test_context, self.kafka), + @parametrize(processing_guarantee="exactly_once") + @parametrize(processing_guarantee="exactly_once_beta") + def test_rebalance_simple(self, processing_guarantee): + self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), StreamsEosTestVerifyRunnerService(self.test_context, self.kafka)) @cluster(num_nodes=9) - def test_rebalance_complex(self): - self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), - StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), - StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), + @parametrize(processing_guarantee="exactly_once") + @parametrize(processing_guarantee="exactly_once_beta") + def test_rebalance_complex(self, processing_guarantee): + self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka)) def run_rebalance(self, processor1, processor2, processor3, verifier): @@ -77,17 +82,21 @@ def run_rebalance(self, processor1, processor2, processor3, verifier): verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False) @cluster(num_nodes=9) - def test_failure_and_recovery(self): - self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka), - StreamsEosTestJobRunnerService(self.test_context, self.kafka), - StreamsEosTestJobRunnerService(self.test_context, self.kafka), + @parametrize(processing_guarantee="exactly_once") + @parametrize(processing_guarantee="exactly_once_beta") + def test_failure_and_recovery(self, processing_guarantee): + self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), StreamsEosTestVerifyRunnerService(self.test_context, self.kafka)) @cluster(num_nodes=9) - def test_failure_and_recovery_complex(self): - self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), - StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), - StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), + @parametrize(processing_guarantee="exactly_once") + @parametrize(processing_guarantee="exactly_once_beta") + def test_failure_and_recovery_complex(self, processing_guarantee): + self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka)) def run_failure_and_recovery(self, processor1, processor2, processor3, verifier):