From c6f84bfff78cfd4697b3a69851fd4d64e3c85c2c Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 5 Aug 2024 17:49:44 -0500 Subject: [PATCH 1/9] Check if supervisor is idle on startup --- .../supervisor/SeekableStreamSupervisor.java | 25 +++++++++++++++++++ .../indexer/AbstractStreamIndexingTest.java | 20 +++++++++++++++ .../supervisor/SupervisorStateManager.java | 5 ++-- 3 files changed, 47 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index ec4de45cac71..7edb650a21fd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1260,6 +1260,26 @@ public void tryInit() } } + public boolean markSuperviosrIdleIfInactiveOnStartup() + { + if (!idleConfig.isEnabled()) { + return false; + } + + updatePartitionLagFromStream(); + Map comittedOffsets = getOffsetsFromMetadataStorage(); + Map latestSequencesFromStream = getLatestSequencesFromStream(); + + if (!comittedOffsets.isEmpty() && comittedOffsets.equals(latestSequencesFromStream)) { + stateManager.maybeSetState(SupervisorStateManager.BasicState.IDLE); + previousSequencesFromStream.putAll(comittedOffsets); + lastActiveTimeMillis = DateTime.now().getMillis(); + return true; + } + + return false; + } + public Runnable buildDynamicAllocationTask(Callable scaleAction, ServiceEmitter emitter) { return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, emitter)); @@ -1640,6 +1660,11 @@ public void runInternal() return; // if we can't connect to the stream and this is the first run, stop and wait to retry the connection } + if (!stateManager.isAtLeastOneSuccessfulRun() && markSuperviosrIdleIfInactiveOnStartup()) { + // if it is the first run and there is no lag observed when compared to the offsets from metadata storage stay idle + return; + } + stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS); discoverTasks(); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java index e20c2ea20617..8cc8388ba47a 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java @@ -561,6 +561,26 @@ protected void doTestIndexDataWithIdleConfigEnabled(@Nullable Boolean transactio "wait for no more creation of indexing tasks" ); + indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId()); + indexer.submitSupervisor(taskSpec); + + ITRetryUtil.retryUntil( + () -> SupervisorStateManager.BasicState.IDLE.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), + true, + 10000, + 30, + "Waiting for supervisor to be idle" + ); + ITRetryUtil.retryUntil( + () -> indexer.getRunningTasks() + .stream() + .noneMatch(taskResponseObject -> taskResponseObject.getId().contains(dataSource)), + true, + 1000, + 10, + "wait for no more creation of indexing tasks" + ); + // Start generating remainning half of the data numWritten += streamGenerator.run( generatedTestConfig.getStreamName(), diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java index 1ea36229c383..88049f18b087 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java @@ -155,11 +155,10 @@ public synchronized void maybeSetState(State proposedState) return; } - // if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) or IDLE state but haven't had a successful run + // if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) but haven't had a successful run // yet, refuse to switch and prefer the more specific states used for first run (CONNECTING_TO_STREAM, // DISCOVERING_INITIAL_TASKS, CREATING_TASKS, etc.) - if ((healthySteadyState.equals(proposedState) || BasicState.IDLE.equals(proposedState)) - && !atLeastOneSuccessfulRun) { + if (healthySteadyState.equals(proposedState) && !atLeastOneSuccessfulRun) { return; } From a4c696230d80bb8d3aa263476641f440081edde4 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 5 Aug 2024 18:03:49 -0500 Subject: [PATCH 2/9] forbidden api check --- .../seekablestream/supervisor/SeekableStreamSupervisor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 7edb650a21fd..b81dd36a01ba 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1273,7 +1273,7 @@ public boolean markSuperviosrIdleIfInactiveOnStartup() if (!comittedOffsets.isEmpty() && comittedOffsets.equals(latestSequencesFromStream)) { stateManager.maybeSetState(SupervisorStateManager.BasicState.IDLE); previousSequencesFromStream.putAll(comittedOffsets); - lastActiveTimeMillis = DateTime.now().getMillis(); + lastActiveTimeMillis = DateTimes.nowUtc().getMillis(); return true; } From c34ad093cc1daff80a60658d19cff502b1c927f5 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 5 Aug 2024 22:38:16 -0500 Subject: [PATCH 3/9] also skip suspended --- .../supervisor/SeekableStreamSupervisor.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index b81dd36a01ba..f3f6643d30ba 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1262,15 +1262,19 @@ public void tryInit() public boolean markSuperviosrIdleIfInactiveOnStartup() { - if (!idleConfig.isEnabled()) { + if (!idleConfig.isEnabled() || spec.isSuspended()) { return false; } - updatePartitionLagFromStream(); Map comittedOffsets = getOffsetsFromMetadataStorage(); - Map latestSequencesFromStream = getLatestSequencesFromStream(); + if (comittedOffsets.isEmpty()) { + // since there is no previous offsets info available we can skip checking for idleness + return false; + } - if (!comittedOffsets.isEmpty() && comittedOffsets.equals(latestSequencesFromStream)) { + updatePartitionLagFromStream(); + Map latestSequencesFromStream = getLatestSequencesFromStream(); + if (comittedOffsets.equals(latestSequencesFromStream)) { stateManager.maybeSetState(SupervisorStateManager.BasicState.IDLE); previousSequencesFromStream.putAll(comittedOffsets); lastActiveTimeMillis = DateTimes.nowUtc().getMillis(); From a08cfbb1ce9eeae762efda7dce4ac08e9ab46e9f Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 6 Aug 2024 00:42:54 -0500 Subject: [PATCH 4/9] coverage --- .../SeekableStreamSupervisorStateTest.java | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 00689cee040f..f2f8bc637132 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -576,6 +576,93 @@ public Duration getEmissionDuration() verifyAll(); } + @Test + public void testIdleOnStartUpAndTurnsToRunningAfterLagUpdates() + { + Map initialOffsets = ImmutableMap.of("0", "10"); + Map laterOffsets = ImmutableMap.of("0", "20"); + + EasyMock.reset(indexerMetadataStorageCoordinator); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( + new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>( + STREAM, + initialOffsets + ) + ) + ).anyTimes(); + EasyMock.reset(spec); + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig( + "stream", + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), + 1, + 1, + new Period("PT1H"), + new Period("PT1S"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + null, + new IdleConfig(true, 200L), + null + ) + { + }).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() + { + @Override + public Duration getEmissionDuration() + { + return new Period("PT1S").toStandardDuration(); + } + }).anyTimes(); + EasyMock.expect(spec.getType()).andReturn("test").anyTimes(); + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); + EasyMock.expect(recordSupplier.isOffsetAvailable(EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn(true) + .anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); + replayAll(); + + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + supervisor.start(); + + Assert.assertTrue(supervisor.stateManager.isHealthy()); + Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState()); + Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState()); + Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty()); + Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun()); + + supervisor.setStreamOffsets(initialOffsets); + supervisor.runInternal(); + + Assert.assertTrue(supervisor.stateManager.isHealthy()); + Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState()); + Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState().getBasicState()); + Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty()); + Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun()); + + supervisor.setStreamOffsets(laterOffsets); + supervisor.runInternal(); + + Assert.assertTrue(supervisor.stateManager.isHealthy()); + Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState()); + Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState()); + Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty()); + Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun()); + } + @Test public void testCreatingTasksFailRecoveryFail() { @@ -2691,6 +2778,8 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor { + Map streamOffsets; + @Override protected void scheduleReporting(ScheduledExecutorService reportingExec) { @@ -2702,6 +2791,17 @@ public LagStats computeLagStats() { return new LagStats(0, 0, 0); } + + @Override + protected Map getLatestSequencesFromStream() + { + return streamOffsets; + } + + public void setStreamOffsets(Map streamOffsets) + { + this.streamOffsets = streamOffsets; + } } private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor From 3a9491703481ea76d447583dbdd5e83871b9bcfe Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 6 Aug 2024 09:44:51 -0500 Subject: [PATCH 5/9] bug --- .../supervisor/SeekableStreamSupervisorStateTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index f2f8bc637132..82dc01175226 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -2778,7 +2778,7 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor { - Map streamOffsets; + Map streamOffsets = new HashMap<>(); @Override protected void scheduleReporting(ScheduledExecutorService reportingExec) From 928fecb962dad6cf1b0e8a98b95aef059efe0238 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Wed, 7 Aug 2024 11:54:50 -0500 Subject: [PATCH 6/9] comments --- .../supervisor/SeekableStreamSupervisor.java | 49 +++++++------------ 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index f3f6643d30ba..6692490583f3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1260,30 +1260,6 @@ public void tryInit() } } - public boolean markSuperviosrIdleIfInactiveOnStartup() - { - if (!idleConfig.isEnabled() || spec.isSuspended()) { - return false; - } - - Map comittedOffsets = getOffsetsFromMetadataStorage(); - if (comittedOffsets.isEmpty()) { - // since there is no previous offsets info available we can skip checking for idleness - return false; - } - - updatePartitionLagFromStream(); - Map latestSequencesFromStream = getLatestSequencesFromStream(); - if (comittedOffsets.equals(latestSequencesFromStream)) { - stateManager.maybeSetState(SupervisorStateManager.BasicState.IDLE); - previousSequencesFromStream.putAll(comittedOffsets); - lastActiveTimeMillis = DateTimes.nowUtc().getMillis(); - return true; - } - - return false; - } - public Runnable buildDynamicAllocationTask(Callable scaleAction, ServiceEmitter emitter) { return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, emitter)); @@ -1664,11 +1640,6 @@ public void runInternal() return; // if we can't connect to the stream and this is the first run, stop and wait to retry the connection } - if (!stateManager.isAtLeastOneSuccessfulRun() && markSuperviosrIdleIfInactiveOnStartup()) { - // if it is the first run and there is no lag observed when compared to the offsets from metadata storage stay idle - return; - } - stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS); discoverTasks(); @@ -3645,8 +3616,26 @@ private void checkIfStreamInactiveAndTurnSupervisorIdle() return; } - Map latestSequencesFromStream = getLatestSequencesFromStream(); final long nowTime = DateTimes.nowUtc().getMillis(); + // if it is the first run and there is no lag observed when compared to the offsets from metadata storage stay idle + if (!stateManager.isAtLeastOneSuccessfulRun()) { + lastActiveTimeMillis = nowTime; + Map comittedOffsets = getOffsetsFromMetadataStorage(); + if (comittedOffsets.isEmpty()) { + // since there is no previous offsets info available we can skip checking for idleness + return; + } + + updatePartitionLagFromStream(); + Map latestSequencesFromStream = getLatestSequencesFromStream(); + if (comittedOffsets.equals(latestSequencesFromStream)) { + stateManager.maybeSetState(SupervisorStateManager.BasicState.IDLE); + previousSequencesFromStream.putAll(comittedOffsets); + return; + } + } + + Map latestSequencesFromStream = getLatestSequencesFromStream(); final boolean idle; final long idleTime; if (lastActiveTimeMillis > 0 From 2034e99783243ec3e8c35f5dee13d7f1615a50e1 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Wed, 7 Aug 2024 12:12:34 -0500 Subject: [PATCH 7/9] fix --- .../seekablestream/supervisor/SeekableStreamSupervisor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 6692490583f3..75d82f1221ea 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -3631,8 +3631,8 @@ private void checkIfStreamInactiveAndTurnSupervisorIdle() if (comittedOffsets.equals(latestSequencesFromStream)) { stateManager.maybeSetState(SupervisorStateManager.BasicState.IDLE); previousSequencesFromStream.putAll(comittedOffsets); - return; } + return; } Map latestSequencesFromStream = getLatestSequencesFromStream(); From 1914555bba725abff29fa5cbbed0c116d1f04b0b Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Thu, 8 Aug 2024 10:29:53 -0500 Subject: [PATCH 8/9] comments --- .../supervisor/SeekableStreamSupervisor.java | 19 +++++-------------- .../SeekableStreamSupervisorStateTest.java | 4 ++-- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 75d82f1221ea..b458aa39570f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -3619,27 +3619,18 @@ private void checkIfStreamInactiveAndTurnSupervisorIdle() final long nowTime = DateTimes.nowUtc().getMillis(); // if it is the first run and there is no lag observed when compared to the offsets from metadata storage stay idle if (!stateManager.isAtLeastOneSuccessfulRun()) { - lastActiveTimeMillis = nowTime; - Map comittedOffsets = getOffsetsFromMetadataStorage(); - if (comittedOffsets.isEmpty()) { - // since there is no previous offsets info available we can skip checking for idleness - return; - } + // Set previous sequences to the current offsets in metadata store + previousSequencesFromStream.clear(); + previousSequencesFromStream.putAll(getOffsetsFromMetadataStorage()); + // Force update partition lag since the reporting thread might not have run yet updatePartitionLagFromStream(); - Map latestSequencesFromStream = getLatestSequencesFromStream(); - if (comittedOffsets.equals(latestSequencesFromStream)) { - stateManager.maybeSetState(SupervisorStateManager.BasicState.IDLE); - previousSequencesFromStream.putAll(comittedOffsets); - } - return; } Map latestSequencesFromStream = getLatestSequencesFromStream(); final boolean idle; final long idleTime; - if (lastActiveTimeMillis > 0 - && previousSequencesFromStream.equals(latestSequencesFromStream) + if (previousSequencesFromStream.equals(latestSequencesFromStream) && computeTotalLag() == 0) { idleTime = nowTime - lastActiveTimeMillis; idle = true; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 82dc01175226..1d71a0ae81ca 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -528,8 +528,8 @@ public Duration getEmissionDuration() replayAll(); - SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); - + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + supervisor.setStreamOffsets(ImmutableMap.of("0", "10")); supervisor.start(); Assert.assertTrue(supervisor.stateManager.isHealthy()); From 23c439af7481c5fd503124c0093587678a952a01 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Thu, 8 Aug 2024 14:33:56 -0500 Subject: [PATCH 9/9] remove invalid assert --- .../druid/indexing/kafka/supervisor/KafkaSupervisorTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index b18c17491259..86275d10e318 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -408,13 +408,10 @@ public SeekableStreamIndexTaskClient build( autoscaler.start(); supervisor.runInternal(); Thread.sleep(1000); - supervisor.runInternal(); verifyAll(); int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScale); - Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisor.getState()); - KafkaIndexTask task = captured.getValue(); Assert.assertEquals(KafkaSupervisorTest.dataSchema, task.getDataSchema());