From eca8613f6078f93a256030150fd11e9b51c766f0 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 20 Aug 2024 23:29:39 -0700 Subject: [PATCH 1/2] Track IngestionState more accurately in realtime tasks. Previously, SeekableStreamIndexTaskRunner set ingestion state to COMPLETED when it finished reading data from Kafka. This is incorrect. After the changes in this patch, the transitions go: 1) The task stays in BUILD_SEGMENTS after it finishes reading from Kafka, while it is building its final set of segments to publish. 2) The task transitions to SEGMENT_AVAILABILITY_WAIT after publishing, while waiting for handoff. 3) The task transitions to COMPLETED immediately before exiting, when truly done. --- docs/ingestion/tasks.md | 1 + .../indexing/seekablestream/SeekableStreamIndexTaskRunner.java | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index 3291abb01a0c..743a46cc8549 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -239,6 +239,7 @@ The `ingestionState` shows what step of ingestion the task reached. Possible sta - `NOT_STARTED`: The task has not begun reading any rows - `DETERMINE_PARTITIONS`: The task is processing rows to determine partitioning - `BUILD_SEGMENTS`: The task is processing rows to construct segments +- `SEGMENT_AVAILABILITY_WAIT`: The task has published its segments and is waiting for them to become available. - `COMPLETED`: The task has finished its work. Only batch tasks have the DETERMINE_PARTITIONS phase. Realtime tasks such as those created by the Kafka Indexing Service do not have a DETERMINE_PARTITIONS phase. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index d347fd815038..df8e220145b0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -768,7 +768,6 @@ public void onFailure(Throwable t) } } } - ingestionState = IngestionState.COMPLETED; } catch (Exception e) { // (1) catch all exceptions while reading from kafka @@ -835,6 +834,7 @@ public void onFailure(Throwable t) // failed to persist sequences. It might also return null if handoff failed, but was recoverable. // See publishAndRegisterHandoff() for details. List handedOffList = Collections.emptyList(); + ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT; if (tuningConfig.getHandoffConditionTimeout() == 0) { handedOffList = Futures.allAsList(handOffWaitList).get(); } else { @@ -928,6 +928,7 @@ public void onFailure(Throwable t) } } + ingestionState = IngestionState.COMPLETED; toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(null, handoffWaitMs)); return TaskStatus.success(task.getId()); } From 4222460d1951f4085505c468a0b05244cbf6951c Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 21 Aug 2024 08:20:39 -0700 Subject: [PATCH 2/2] Add tests. --- .../indexing/kafka/KafkaIndexTaskTest.java | 34 +++++++++++++++++-- .../kinesis/KinesisIndexTaskTest.java | 9 +++++ 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 15b77be307d8..f8c6b23aae90 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -52,6 +52,7 @@ import org.apache.druid.data.input.kafka.KafkaTopicPartition; import org.apache.druid.data.input.kafkainput.KafkaInputFormat; import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat; +import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.report.IngestionStatsAndErrors; @@ -1617,6 +1618,10 @@ public void testMultipleParseExceptionsSuccess() throws Exception IngestionStatsAndErrors reportData = getTaskReportData(); + // Verify ingestion state and error message + Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState()); + Assert.assertNull(reportData.getErrorMsg()); + Map expectedMetrics = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( @@ -1697,6 +1702,10 @@ public void testMultipleParseExceptionsFailure() throws Exception IngestionStatsAndErrors reportData = getTaskReportData(); + // Verify ingestion state and error message + Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState()); + Assert.assertNotNull(reportData.getErrorMsg()); + Map expectedMetrics = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( @@ -3057,9 +3066,13 @@ public void testParseExceptionsInIteratorConstructionSuccess() throws Exception newDataSchemaMetadata() ); - // Verify unparseable data IngestionStatsAndErrors reportData = getTaskReportData(); + // Verify ingestion state and error message + Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState()); + Assert.assertNull(reportData.getErrorMsg()); + + // Verify unparseable data ParseExceptionReport parseExceptionReport = ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS); @@ -3190,9 +3203,14 @@ public void testParseExceptionsBeyondThresholdTaskFails() throws Exception Assert.assertEquals(ImmutableList.of(), publishedDescriptors()); Assert.assertNull(newDataSchemaMetadata()); + // Verify ingestion state and error message + final IngestionStatsAndErrors reportData = getTaskReportData(); + Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState()); + Assert.assertNotNull(reportData.getErrorMsg()); + // Verify there is no unparseable data in the report since we've 0 saved parse exceptions ParseExceptionReport parseExceptionReport = - ParseExceptionReport.forPhase(getTaskReportData(), RowIngestionMeters.BUILD_SEGMENTS); + ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS); Assert.assertEquals(ImmutableList.of(), parseExceptionReport.getErrorMessages()); } @@ -3231,6 +3249,12 @@ public void testCompletionReportPartitionStats() throws Exception Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); IngestionStatsAndErrors reportData = getTaskReportData(); + + // Verify ingestion state and error message + Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState()); + Assert.assertNull(reportData.getErrorMsg()); + + // Verify report metrics Assert.assertEquals(reportData.getRecordsProcessed().size(), 1); Assert.assertEquals(reportData.getRecordsProcessed().values().iterator().next(), (Long) 6L); } @@ -3279,6 +3303,12 @@ public void testCompletionReportMultiplePartitionStats() throws Exception Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode()); IngestionStatsAndErrors reportData = getTaskReportData(); + + // Verify ingestion state and error message + Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState()); + Assert.assertNull(reportData.getErrorMsg()); + + // Verify report metrics Assert.assertEquals(reportData.getRecordsProcessed().size(), 2); Assert.assertTrue(reportData.getRecordsProcessed().values().containsAll(ImmutableSet.of(6L, 2L))); } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 80bded2031d4..510eaa797e07 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -43,6 +43,7 @@ import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.kinesis.KinesisRecordEntity; +import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.report.IngestionStatsAndErrors; @@ -1186,6 +1187,10 @@ public void testMultipleParseExceptionsSuccess() throws Exception IngestionStatsAndErrors reportData = getTaskReportData(); + // Verify ingestion state and error message + Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState()); + Assert.assertNull(reportData.getErrorMsg()); + Map expectedMetrics = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( @@ -1272,6 +1277,10 @@ public void testMultipleParseExceptionsFailure() throws Exception IngestionStatsAndErrors reportData = getTaskReportData(); + // Verify ingestion state and error message + Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState()); + Assert.assertNotNull(reportData.getErrorMsg()); + Map expectedMetrics = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of(