Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ingestion/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ The `ingestionState` shows what step of ingestion the task reached. Possible sta
- `NOT_STARTED`: The task has not begun reading any rows
- `DETERMINE_PARTITIONS`: The task is processing rows to determine partitioning
- `BUILD_SEGMENTS`: The task is processing rows to construct segments
- `SEGMENT_AVAILABILITY_WAIT`: The task has published its segments and is waiting for them to become available.
- `COMPLETED`: The task has finished its work.

Only batch tasks have the DETERMINE_PARTITIONS phase. Realtime tasks such as those created by the Kafka Indexing Service do not have a DETERMINE_PARTITIONS phase.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
import org.apache.druid.data.input.kafkainput.KafkaStringHeaderFormat;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
Expand Down Expand Up @@ -1617,6 +1618,10 @@ public void testMultipleParseExceptionsSuccess() throws Exception

IngestionStatsAndErrors reportData = getTaskReportData();

// Verify ingestion state and error message
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
Assert.assertNull(reportData.getErrorMsg());

Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
Expand Down Expand Up @@ -1697,6 +1702,10 @@ public void testMultipleParseExceptionsFailure() throws Exception

IngestionStatsAndErrors reportData = getTaskReportData();

// Verify ingestion state and error message
Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState());
Assert.assertNotNull(reportData.getErrorMsg());

Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
Expand Down Expand Up @@ -3057,9 +3066,13 @@ public void testParseExceptionsInIteratorConstructionSuccess() throws Exception
newDataSchemaMetadata()
);

// Verify unparseable data
IngestionStatsAndErrors reportData = getTaskReportData();

// Verify ingestion state and error message
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
Assert.assertNull(reportData.getErrorMsg());

// Verify unparseable data
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);

Expand Down Expand Up @@ -3190,9 +3203,14 @@ public void testParseExceptionsBeyondThresholdTaskFails() throws Exception
Assert.assertEquals(ImmutableList.of(), publishedDescriptors());
Assert.assertNull(newDataSchemaMetadata());

// Verify ingestion state and error message
final IngestionStatsAndErrors reportData = getTaskReportData();
Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState());
Assert.assertNotNull(reportData.getErrorMsg());

// Verify there is no unparseable data in the report since we've 0 saved parse exceptions
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(getTaskReportData(), RowIngestionMeters.BUILD_SEGMENTS);
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);

Assert.assertEquals(ImmutableList.of(), parseExceptionReport.getErrorMessages());
}
Expand Down Expand Up @@ -3231,6 +3249,12 @@ public void testCompletionReportPartitionStats() throws Exception

Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
IngestionStatsAndErrors reportData = getTaskReportData();

// Verify ingestion state and error message
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
Assert.assertNull(reportData.getErrorMsg());

// Verify report metrics
Assert.assertEquals(reportData.getRecordsProcessed().size(), 1);
Assert.assertEquals(reportData.getRecordsProcessed().values().iterator().next(), (Long) 6L);
}
Expand Down Expand Up @@ -3279,6 +3303,12 @@ public void testCompletionReportMultiplePartitionStats() throws Exception

Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
IngestionStatsAndErrors reportData = getTaskReportData();

// Verify ingestion state and error message
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
Assert.assertNull(reportData.getErrorMsg());

// Verify report metrics
Assert.assertEquals(reportData.getRecordsProcessed().size(), 2);
Assert.assertTrue(reportData.getRecordsProcessed().values().containsAll(ImmutableSet.of(6L, 2L)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.kinesis.KinesisRecordEntity;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.report.IngestionStatsAndErrors;
Expand Down Expand Up @@ -1186,6 +1187,10 @@ public void testMultipleParseExceptionsSuccess() throws Exception

IngestionStatsAndErrors reportData = getTaskReportData();

// Verify ingestion state and error message
Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState());
Assert.assertNull(reportData.getErrorMsg());

Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
Expand Down Expand Up @@ -1272,6 +1277,10 @@ public void testMultipleParseExceptionsFailure() throws Exception

IngestionStatsAndErrors reportData = getTaskReportData();

// Verify ingestion state and error message
Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState());
Assert.assertNotNull(reportData.getErrorMsg());

Map<String, Object> expectedMetrics = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,6 @@ public void onFailure(Throwable t)
}
}
}
ingestionState = IngestionState.COMPLETED;
}
catch (Exception e) {
// (1) catch all exceptions while reading from kafka
Expand Down Expand Up @@ -835,6 +834,7 @@ public void onFailure(Throwable t)
// failed to persist sequences. It might also return null if handoff failed, but was recoverable.
// See publishAndRegisterHandoff() for details.
List<SegmentsAndCommitMetadata> handedOffList = Collections.emptyList();
ingestionState = IngestionState.SEGMENT_AVAILABILITY_WAIT;
if (tuningConfig.getHandoffConditionTimeout() == 0) {
handedOffList = Futures.allAsList(handOffWaitList).get();
} else {
Expand Down Expand Up @@ -928,6 +928,7 @@ public void onFailure(Throwable t)
}
}

ingestionState = IngestionState.COMPLETED;
toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(null, handoffWaitMs));
return TaskStatus.success(task.getId());
}
Expand Down