Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/ingestion/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ An example output is shown below:
},
"segmentAvailabilityConfirmed": false,
"segmentAvailabilityWaitTimeMs": 0,
"recordsProcessed": {
"recordsProcessedPerPartition": {
"partition-a": 5789
},
"errorMsg": null
Expand All @@ -101,7 +101,7 @@ For some task types, the indexing task can wait for the newly ingested segments
|---|---|
|`segmentAvailabilityConfirmed`|Whether all segments generated by this ingestion task had been confirmed as available for queries in the cluster before the task completed.|
|`segmentAvailabilityWaitTimeMs`|Milliseconds waited by the ingestion task for the newly ingested segments to be available for query after completing ingestion was completed.|
|`recordsProcessed`| Partitions that were processed by an ingestion task and includes count of records processed from each partition.|
|`recordsProcessedPerPartition`| Partitions that were processed by an ingestion task and includes count of records processed from each partition.|

### Live report

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3234,8 +3234,8 @@ public void testCompletionReportPartitionStats() throws Exception

Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
Assert.assertEquals(reportData.getRecordsProcessed().size(), 1);
Assert.assertEquals(reportData.getRecordsProcessed().values().iterator().next(), (Long) 6L);
Assert.assertEquals(reportData.getRecordsProcessedPerPartition().size(), 1);
Assert.assertEquals(reportData.getRecordsProcessedPerPartition().values().iterator().next(), (Long) 6L);
}

@Test(timeout = 60_000L)
Expand Down Expand Up @@ -3282,8 +3282,8 @@ public void testCompletionReportMultiplePartitionStats() throws Exception

Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
Assert.assertEquals(reportData.getRecordsProcessed().size(), 2);
Assert.assertTrue(reportData.getRecordsProcessed().values().containsAll(ImmutableSet.of(6L, 2L)));
Assert.assertEquals(reportData.getRecordsProcessedPerPartition().size(), 2);
Assert.assertTrue(reportData.getRecordsProcessedPerPartition().values().containsAll(ImmutableSet.of(6L, 2L)));
}

public static class TestKafkaInputFormat implements InputFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class IngestionStatsAndErrorsTaskReportData
private long segmentAvailabilityWaitTimeMs;

@JsonProperty
private Map<String, Long> recordsProcessed;
private Map<String, Long> recordsProcessedPerPartition;

public IngestionStatsAndErrorsTaskReportData(
@JsonProperty("ingestionState") IngestionState ingestionState,
Expand All @@ -57,7 +57,7 @@ public IngestionStatsAndErrorsTaskReportData(
@JsonProperty("errorMsg") @Nullable String errorMsg,
@JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed,
@JsonProperty("segmentAvailabilityWaitTimeMs") long segmentAvailabilityWaitTimeMs,
@JsonProperty("recordsProcessed") Map<String, Long> recordsProcessed
@JsonProperty("recordsProcessedPerPartition") Map<String, Long> recordsProcessedPerPartition
)
{
this.ingestionState = ingestionState;
Expand All @@ -66,7 +66,7 @@ public IngestionStatsAndErrorsTaskReportData(
this.errorMsg = errorMsg;
this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed;
this.segmentAvailabilityWaitTimeMs = segmentAvailabilityWaitTimeMs;
this.recordsProcessed = recordsProcessed;
this.recordsProcessedPerPartition = recordsProcessedPerPartition;
}

@JsonProperty
Expand Down Expand Up @@ -108,9 +108,9 @@ public long getSegmentAvailabilityWaitTimeMs()

@JsonProperty
@Nullable
public Map<String, Long> getRecordsProcessed()
public Map<String, Long> getRecordsProcessedPerPartition()
{
return recordsProcessed;
return recordsProcessedPerPartition;
}

public static IngestionStatsAndErrorsTaskReportData getPayloadFromTaskReports(
Expand All @@ -137,7 +137,7 @@ public boolean equals(Object o)
Objects.equals(getErrorMsg(), that.getErrorMsg()) &&
Objects.equals(isSegmentAvailabilityConfirmed(), that.isSegmentAvailabilityConfirmed()) &&
Objects.equals(getSegmentAvailabilityWaitTimeMs(), that.getSegmentAvailabilityWaitTimeMs()) &&
Objects.equals(getRecordsProcessed(), that.getRecordsProcessed());
Objects.equals(getRecordsProcessedPerPartition(), that.getRecordsProcessedPerPartition());
}

@Override
Expand All @@ -150,7 +150,7 @@ public int hashCode()
getErrorMsg(),
isSegmentAvailabilityConfirmed(),
getSegmentAvailabilityWaitTimeMs(),
getRecordsProcessed()
getRecordsProcessedPerPartition()
);
}

Expand All @@ -164,7 +164,7 @@ public String toString()
", errorMsg='" + errorMsg + '\'' +
", segmentAvailabilityConfoirmed=" + segmentAvailabilityConfirmed +
", segmentAvailabilityWaitTimeMs=" + segmentAvailabilityWaitTimeMs +
", recordsProcessed=" + recordsProcessed +
", recordsProcessedPerPartition=" + recordsProcessedPerPartition +
'}';
}
}