diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index b18c17491259..86275d10e318 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -408,13 +408,10 @@ public SeekableStreamIndexTaskClient build( autoscaler.start(); supervisor.runInternal(); Thread.sleep(1000); - supervisor.runInternal(); verifyAll(); int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScale); - Assert.assertEquals(SupervisorStateManager.BasicState.IDLE, supervisor.getState()); - KafkaIndexTask task = captured.getValue(); Assert.assertEquals(KafkaSupervisorTest.dataSchema, task.getDataSchema()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index ec4de45cac71..b458aa39570f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -3616,12 +3616,21 @@ private void checkIfStreamInactiveAndTurnSupervisorIdle() return; } - Map latestSequencesFromStream = getLatestSequencesFromStream(); final long nowTime = DateTimes.nowUtc().getMillis(); + // If this is the first run and no lag is observed relative to the offsets in metadata storage, stay idle + if (!stateManager.isAtLeastOneSuccessfulRun()) { + // Set previous sequences to the current offsets in 
metadata store + previousSequencesFromStream.clear(); + previousSequencesFromStream.putAll(getOffsetsFromMetadataStorage()); + + // Force update partition lag since the reporting thread might not have run yet + updatePartitionLagFromStream(); + } + + Map latestSequencesFromStream = getLatestSequencesFromStream(); final boolean idle; final long idleTime; - if (lastActiveTimeMillis > 0 - && previousSequencesFromStream.equals(latestSequencesFromStream) + if (previousSequencesFromStream.equals(latestSequencesFromStream) && computeTotalLag() == 0) { idleTime = nowTime - lastActiveTimeMillis; idle = true; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 00689cee040f..1d71a0ae81ca 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -528,8 +528,8 @@ public Duration getEmissionDuration() replayAll(); - SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); - + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + supervisor.setStreamOffsets(ImmutableMap.of("0", "10")); supervisor.start(); Assert.assertTrue(supervisor.stateManager.isHealthy()); @@ -576,6 +576,93 @@ public Duration getEmissionDuration() verifyAll(); } + @Test + public void testIdleOnStartUpAndTurnsToRunningAfterLagUpdates() + { + Map initialOffsets = ImmutableMap.of("0", "10"); + Map laterOffsets = ImmutableMap.of("0", "20"); + + EasyMock.reset(indexerMetadataStorageCoordinator); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( + new TestSeekableStreamDataSourceMetadata( + new 
SeekableStreamEndSequenceNumbers<>( + STREAM, + initialOffsets + ) + ) + ).anyTimes(); + EasyMock.reset(spec); + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig( + "stream", + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), + 1, + 1, + new Period("PT1H"), + new Period("PT1S"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + null, + new IdleConfig(true, 200L), + null + ) + { + }).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() + { + @Override + public Duration getEmissionDuration() + { + return new Period("PT1S").toStandardDuration(); + } + }).anyTimes(); + EasyMock.expect(spec.getType()).andReturn("test").anyTimes(); + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); + EasyMock.expect(recordSupplier.isOffsetAvailable(EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn(true) + .anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); + replayAll(); + + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + supervisor.start(); + + Assert.assertTrue(supervisor.stateManager.isHealthy()); + Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState()); + 
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState()); + Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty()); + Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun()); + + supervisor.setStreamOffsets(initialOffsets); + supervisor.runInternal(); + + Assert.assertTrue(supervisor.stateManager.isHealthy()); + Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState()); + Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState().getBasicState()); + Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty()); + Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun()); + + supervisor.setStreamOffsets(laterOffsets); + supervisor.runInternal(); + + Assert.assertTrue(supervisor.stateManager.isHealthy()); + Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState()); + Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState()); + Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty()); + Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun()); + } + @Test public void testCreatingTasksFailRecoveryFail() { @@ -2691,6 +2778,8 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor { + Map streamOffsets = new HashMap<>(); + @Override protected void scheduleReporting(ScheduledExecutorService reportingExec) { @@ -2702,6 +2791,17 @@ public LagStats computeLagStats() { return new LagStats(0, 0, 0); } + + @Override + protected Map getLatestSequencesFromStream() + { + return streamOffsets; + } + + public void setStreamOffsets(Map streamOffsets) + { + this.streamOffsets = streamOffsets; + } } private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java index e20c2ea20617..8cc8388ba47a 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java @@ -561,6 +561,26 @@ protected void doTestIndexDataWithIdleConfigEnabled(@Nullable Boolean transactio "wait for no more creation of indexing tasks" ); + indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId()); + indexer.submitSupervisor(taskSpec); + + ITRetryUtil.retryUntil( + () -> SupervisorStateManager.BasicState.IDLE.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), + true, + 10000, + 30, + "Waiting for supervisor to be idle" + ); + ITRetryUtil.retryUntil( + () -> indexer.getRunningTasks() + .stream() + .noneMatch(taskResponseObject -> taskResponseObject.getId().contains(dataSource)), + true, + 1000, + 10, + "wait for no more creation of indexing tasks" + ); + // Start generating remainning half of the data numWritten += streamGenerator.run( generatedTestConfig.getStreamName(), diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java index 1ea36229c383..88049f18b087 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java @@ -155,11 +155,10 @@ public synchronized void maybeSetState(State proposedState) return; } - // if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) or IDLE state but haven't had a successful run + // if we're trying to switch to a healthy steady state (i.e. 
RUNNING or SUSPENDED) but haven't had a successful run // yet, refuse to switch and prefer the more specific states used for first run (CONNECTING_TO_STREAM, // DISCOVERING_INITIAL_TASKS, CREATING_TASKS, etc.) - if ((healthySteadyState.equals(proposedState) || BasicState.IDLE.equals(proposedState)) - && !atLeastOneSuccessfulRun) { + if (healthySteadyState.equals(proposedState) && !atLeastOneSuccessfulRun) { return; }