diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index aa1c25a693ad..e2c957b9cee4 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -270,7 +270,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em |`ingest/events/processed`|Number of events processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.| |`ingest/events/processedWithError`|Number of events processed with some partial errors per emission period. Events processed with partial errors are counted towards both this metric and `ingest/events/processed`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0| |`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0| -|`ingest/events/thrownAway`|Number of events rejected because they are null, or filtered by `transformSpec`, or outside one of `lateMessageRejectionPeriod`, `earlyMessageRejectionPeriod`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0| +|`ingest/events/thrownAway`|Number of events rejected because they are null, or filtered by `transformSpec`, or outside one of `lateMessageRejectionPeriod`, `earlyMessageRejectionPeriod`. The `reason` dimension indicates why the event was thrown away.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`, `reason`|0| |`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0| |`ingest/input/bytes`|Number of bytes read from input sources, after decompression but prior to parsing. This covers all data read, including data that does not end up being fully processed and ingested. 
For example, this includes data that ends up being rejected for being unparseable or filtered out.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the amount of data read.| |`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, `taskType`, `groupId`|Your number of events with rollup.| diff --git a/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json index 6b8d9a9887d7..bc0c451870f7 100644 --- a/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/dropwizard-emitter/src/main/resources/defaultMetricDimensions.json @@ -126,7 +126,8 @@ }, "ingest/events/thrownAway": { "dimensions": [ - "dataSource" + "dataSource", + "reason" ], "type": "counter" }, diff --git a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json index 311a06e6e4ca..69100697a62a 100644 --- a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json +++ b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json @@ -54,7 +54,8 @@ "query/cache/total/timeouts": [], "query/cache/total/errors": [], "ingest/events/thrownAway": [ - "dataSource" + "dataSource", + "reason" ], "ingest/events/unparseable": [ "dataSource" diff --git a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json index 82ced1e9abb8..3702039c283e 100644 --- a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json +++ b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json @@ -104,7 +104,7 @@ "ingest/events/processed" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events successfully processed per emission period." }, "ingest/events/processedWithError" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events processed with some partial errors per emission period." }, "ingest/events/unparseable" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because the events are unparseable." }, - "ingest/events/thrownAway" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because they are outside the windowPeriod."}, + "ingest/events/thrownAway" : { "dimensions" : ["dataSource", "reason"], "type" : "count", "help": "Number of events rejected because they are null, filtered by transformSpec, or outside the message rejection periods. The `reason` dimension indicates why the event was thrown away."}, "ingest/events/duplicate" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of events rejected because the events are duplicated."}, "ingest/input/bytes" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of bytes read from input sources, after decompression but prior to parsing." 
}, "ingest/rows/output" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of Druid rows persisted."}, diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json index 0e3862f3da56..e6b7f9f594a9 100644 --- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json @@ -38,7 +38,7 @@ "query/cache/total/timeouts" : { "dimensions" : [], "type" : "gauge" }, "query/cache/total/errors" : { "dimensions" : [], "type" : "gauge" }, - "ingest/events/thrownAway" : { "dimensions" : ["dataSource"], "type" : "count" }, + "ingest/events/thrownAway" : { "dimensions" : ["dataSource", "reason"], "type" : "count" }, "ingest/events/unparseable" : { "dimensions" : ["dataSource"], "type" : "count" }, "ingest/events/duplicate" : { "dimensions" : ["dataSource"], "type" : "count" }, "ingest/events/processed" : { "dimensions" : ["dataSource"], "type" : "count" }, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 96dd04d42cfe..cb359896ee2f 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -113,6 +113,7 @@ import org.apache.druid.query.timeseries.TimeseriesQueryEngine; import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory; +import org.apache.druid.segment.incremental.InputRowFilterResult; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.incremental.RowIngestionMetersTotals; import org.apache.druid.segment.incremental.RowMeters; @@ -717,7 +718,7 @@ public void testIncrementalHandOff() throws Exception ); long totalBytes = getTotalSizeOfRecords(0, 10) + getTotalSizeOfRecords(13, 15); - verifyTaskMetrics(task, RowMeters.with().bytes(totalBytes).unparseable(3).thrownAway(1).totalProcessed(8)); + verifyTaskMetrics(task, RowMeters.with().bytes(totalBytes).unparseable(3).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(8)); // Check published metadata and segments in deep storage assertEqualsExceptVersion( @@ -851,7 +852,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception ); long totalBytes = getTotalSizeOfRecords(0, 10) + getTotalSizeOfRecords(13, 15); - verifyTaskMetrics(task, RowMeters.with().bytes(totalBytes).unparseable(3).thrownAway(1).totalProcessed(8)); + verifyTaskMetrics(task, RowMeters.with().bytes(totalBytes).unparseable(3).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(8)); // Check published metadata and segments in deep storage assertEqualsExceptVersion( @@ -1165,7 +1166,7 @@ public void testRunWithMinimumMessageTime() throws Exception // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); - verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAway(2).totalProcessed(3)); + verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAwayByReason(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME, 
2).totalProcessed(3)); // Check published metadata and segments in deep storage assertEqualsExceptVersion( @@ -1214,7 +1215,7 @@ public void testRunWithMaximumMessageTime() throws Exception // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); - verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAway(2).totalProcessed(3)); + verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAwayByReason(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME, 2).totalProcessed(3)); // Check published metadata and segments in deep storage assertEqualsExceptVersion( @@ -1272,7 +1273,7 @@ public void testRunWithTransformSpec() throws Exception // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); - verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAway(4).totalProcessed(1)); + verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 4).totalProcessed(1)); // Check published metadata final List publishedDescriptors = publishedDescriptors(); @@ -1642,7 +1643,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception verifyTaskMetrics(task, RowMeters.with() .bytes(totalRecordBytes) .unparseable(3).errors(3) - .thrownAway(1).totalProcessed(4)); + .thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(4)); // Check published metadata assertEqualsExceptVersion( @@ -1660,6 +1661,8 @@ public void testMultipleParseExceptionsSuccess() throws Exception Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState()); Assert.assertNull(reportData.getErrorMsg()); + // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs + Map expectedThrownAwayByReason = Map.of(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason(), 1); Map expectedMetrics = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( @@ -1667,7 +1670,8 @@ public void testMultipleParseExceptionsSuccess() throws Exception RowIngestionMeters.PROCESSED_BYTES, (int) totalRecordBytes, RowIngestionMeters.PROCESSED_WITH_ERROR, 3, RowIngestionMeters.UNPARSEABLE, 3, - RowIngestionMeters.THROWN_AWAY, 1 + RowIngestionMeters.THROWN_AWAY, 1, + RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedThrownAwayByReason ) ); Assert.assertEquals(expectedMetrics, reportData.getRowStats()); @@ -1745,6 +1749,8 @@ public void testMultipleParseExceptionsFailure() throws Exception Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState()); Assert.assertNotNull(reportData.getErrorMsg()); + // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs + Map expectedThrownAwayByReason = Map.of(); Map expectedMetrics = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( @@ -1752,7 +1758,8 @@ public void testMultipleParseExceptionsFailure() throws Exception RowIngestionMeters.PROCESSED_BYTES, (int) totalBytes, RowIngestionMeters.PROCESSED_WITH_ERROR, 0, RowIngestionMeters.UNPARSEABLE, 3, - RowIngestionMeters.THROWN_AWAY, 0 + RowIngestionMeters.THROWN_AWAY, 0, + RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedThrownAwayByReason ) ); Assert.assertEquals(expectedMetrics, reportData.getRowStats()); @@ -1887,7 +1894,7 @@ public void testRunConflicting() throws Exception verifyTaskMetrics(task1, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3)); verifyTaskMetrics(task2, 
RowMeters.with().bytes(getTotalSizeOfRecords(3, 10)) - .unparseable(3).thrownAway(1).totalProcessed(3)); + .unparseable(3).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(3)); // Check published segments & metadata, should all be from the first task final List publishedDescriptors = publishedDescriptors(); @@ -1961,7 +1968,7 @@ public void testRunConflictingWithoutTransactions() throws Exception verifyTaskMetrics(task1, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3)); verifyTaskMetrics(task2, RowMeters.with().bytes(getTotalSizeOfRecords(3, 10)) - .unparseable(3).thrownAway(1).totalProcessed(3)); + .unparseable(3).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(3)); // Check published segments & metadata SegmentDescriptorAndExpectedDim1Values desc3 = sdd("2011/P1D", 1, ImmutableList.of("d", "e")); @@ -2576,7 +2583,7 @@ public void testRunTransactionModeRollback() throws Exception long totalBytes = getTotalSizeOfRecords(0, 2) + getTotalSizeOfRecords(5, 11); verifyTaskMetrics(task, RowMeters.with().bytes(totalBytes) - .unparseable(3).errors(1).thrownAway(1).totalProcessed(3)); + .unparseable(3).errors(1).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 1).totalProcessed(3)); // Check published metadata and segments in deep storage assertEqualsExceptVersion( @@ -3437,7 +3444,7 @@ public void testTaskWithTransformSpecDoesNotCauseCliPeonCyclicDependency() // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); - verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAway(4).totalProcessed(1)); + verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 5)).thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 4).totalProcessed(1)); // Check published metadata final List publishedDescriptors = publishedDescriptors(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 98dfea4333cc..bd2a8aa27121 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -82,6 +82,7 @@ import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.incremental.InputRowFilterResult; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.incremental.RowMeters; import org.apache.druid.segment.indexing.DataSchema; @@ -801,7 +802,7 @@ public void testRunWithMinimumMessageTime() throws Exception verifyAll(); verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSize(RECORDS, 0, 5)) - .thrownAway(2).totalProcessed(3)); + .thrownAwayByReason(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME, 2).totalProcessed(3)); // Check published metadata assertEqualsExceptVersion( @@ -864,7 +865,7 @@ public void testRunWithMaximumMessageTime() throws Exception verifyAll(); verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSize(RECORDS, 0, 5)) - .thrownAway(2).totalProcessed(3)); + .thrownAwayByReason(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME, 2).totalProcessed(3)); // 
Check published metadata and segments in deep storage assertEqualsExceptVersion( @@ -923,7 +924,7 @@ public void testRunWithTransformSpec() throws Exception verifyAll(); verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSize(RECORDS, 0, 5)) - .thrownAway(4).totalProcessed(1)); + .thrownAwayByReason(InputRowFilterResult.NULL_OR_EMPTY_RECORD, 4).totalProcessed(1)); // Check published metadata assertEqualsExceptVersion(ImmutableList.of(sdd("2009/P1D", 0)), publishedDescriptors()); @@ -1194,6 +1195,8 @@ public void testMultipleParseExceptionsSuccess() throws Exception Assert.assertEquals(IngestionState.COMPLETED, reportData.getIngestionState()); Assert.assertNull(reportData.getErrorMsg()); + // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs + Map expectedThrownAwayByReason = Map.of(); Map expectedMetrics = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( @@ -1201,7 +1204,8 @@ public void testMultipleParseExceptionsSuccess() throws Exception RowIngestionMeters.PROCESSED_BYTES, 763, RowIngestionMeters.PROCESSED_WITH_ERROR, 3, RowIngestionMeters.UNPARSEABLE, 4, - RowIngestionMeters.THROWN_AWAY, 0 + RowIngestionMeters.THROWN_AWAY, 0, + RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedThrownAwayByReason ) ); Assert.assertEquals(expectedMetrics, reportData.getRowStats()); @@ -1284,6 +1288,8 @@ public void testMultipleParseExceptionsFailure() throws Exception Assert.assertEquals(IngestionState.BUILD_SEGMENTS, reportData.getIngestionState()); Assert.assertNotNull(reportData.getErrorMsg()); + // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs + Map expectedThrownAwayByReason = Map.of(); Map expectedMetrics = ImmutableMap.of( RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( @@ -1291,7 +1297,8 @@ public void testMultipleParseExceptionsFailure() throws Exception RowIngestionMeters.PROCESSED_BYTES, (int) totalBytes, RowIngestionMeters.PROCESSED_WITH_ERROR, 0, RowIngestionMeters.UNPARSEABLE, 3, - RowIngestionMeters.THROWN_AWAY, 0 + RowIngestionMeters.THROWN_AWAY, 0, + RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedThrownAwayByReason ) ); Assert.assertEquals(expectedMetrics, reportData.getRowStats()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java index e9fe0683f225..49a19b237995 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java @@ -21,9 +21,11 @@ import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; +import org.apache.druid.segment.incremental.InputRowFilterResult; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.incremental.RowIngestionMetersTotals; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -37,7 +39,7 @@ public class DropwizardRowIngestionMeters implements RowIngestionMeters private final Meter processedBytes; private final Meter processedWithError; private final Meter unparseable; - private final Meter thrownAway; + private final Meter[] thrownAwayByReason = new Meter[InputRowFilterResult.numValues()]; public DropwizardRowIngestionMeters() { @@ -46,7 +48,9 @@ public DropwizardRowIngestionMeters() this.processedBytes = metricRegistry.meter(PROCESSED_BYTES); this.processedWithError = 
metricRegistry.meter(PROCESSED_WITH_ERROR); this.unparseable = metricRegistry.meter(UNPARSEABLE); - this.thrownAway = metricRegistry.meter(THROWN_AWAY); + for (InputRowFilterResult reason : InputRowFilterResult.values()) { + this.thrownAwayByReason[reason.ordinal()] = metricRegistry.meter(THROWN_AWAY + "_" + reason.name()); + } } @Override @@ -100,13 +104,30 @@ public void incrementUnparseable() @Override public long getThrownAway() { - return thrownAway.getCount(); + long totalThrownAway = 0; + for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) { + totalThrownAway += thrownAwayByReason[reason.ordinal()].getCount(); + } + return totalThrownAway; } @Override - public void incrementThrownAway() + public void incrementThrownAway(InputRowFilterResult reason) { - thrownAway.mark(); + thrownAwayByReason[reason.ordinal()].mark(); + } + + @Override + public Map getThrownAwayByReason() + { + Map result = new HashMap<>(); + for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) { + long count = thrownAwayByReason[reason.ordinal()].getCount(); + if (count > 0) { + result.put(reason.getReason(), count); + } + } + return result; } @Override @@ -116,7 +137,7 @@ public RowIngestionMetersTotals getTotals() processed.getCount(), processedBytes.getCount(), processedWithError.getCount(), - thrownAway.getCount(), + getThrownAwayByReason(), unparseable.getCount() ); } @@ -131,21 +152,21 @@ public Map getMovingAverages() oneMinute.put(PROCESSED_BYTES, processedBytes.getOneMinuteRate()); oneMinute.put(PROCESSED_WITH_ERROR, processedWithError.getOneMinuteRate()); oneMinute.put(UNPARSEABLE, unparseable.getOneMinuteRate()); - oneMinute.put(THROWN_AWAY, thrownAway.getOneMinuteRate()); + oneMinute.put(THROWN_AWAY, Arrays.stream(thrownAwayByReason).mapToDouble(Meter::getOneMinuteRate).sum()); Map fiveMinute = new HashMap<>(); fiveMinute.put(PROCESSED, processed.getFiveMinuteRate()); fiveMinute.put(PROCESSED_BYTES, processedBytes.getFiveMinuteRate()); fiveMinute.put(PROCESSED_WITH_ERROR, processedWithError.getFiveMinuteRate()); fiveMinute.put(UNPARSEABLE, unparseable.getFiveMinuteRate()); - fiveMinute.put(THROWN_AWAY, thrownAway.getFiveMinuteRate()); + fiveMinute.put(THROWN_AWAY, Arrays.stream(thrownAwayByReason).mapToDouble(Meter::getFiveMinuteRate).sum()); Map fifteenMinute = new HashMap<>(); fifteenMinute.put(PROCESSED, processed.getFifteenMinuteRate()); fifteenMinute.put(PROCESSED_BYTES, processedBytes.getFifteenMinuteRate()); fifteenMinute.put(PROCESSED_WITH_ERROR, processedWithError.getFifteenMinuteRate()); fifteenMinute.put(UNPARSEABLE, unparseable.getFifteenMinuteRate()); - fifteenMinute.put(THROWN_AWAY, thrownAway.getFifteenMinuteRate()); + fifteenMinute.put(THROWN_AWAY, Arrays.stream(thrownAwayByReason).mapToDouble(Meter::getFifteenMinuteRate).sum()); movingAverages.put(ONE_MINUTE_NAME, oneMinute); movingAverages.put(FIVE_MINUTE_NAME, fiveMinute); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java index 4895776a796c..462fc2348123 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java @@ -23,10 +23,15 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; 
import org.apache.druid.java.util.metrics.AbstractMonitor; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.incremental.InputRowFilterResult; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.incremental.RowIngestionMetersTotals; import org.apache.druid.segment.realtime.SegmentGenerationMetrics; +import java.util.HashMap; +import java.util.Map; + /** * Emits metrics from {@link SegmentGenerationMetrics} and {@link RowIngestionMeters}. */ @@ -51,7 +56,7 @@ public TaskRealtimeMetricsMonitor( this.rowIngestionMeters = rowIngestionMeters; this.builder = metricEventBuilder; previousSegmentGenerationMetrics = new SegmentGenerationMetrics(); - previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 0, 0); + previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, Map.of(), 0); } @Override @@ -60,14 +65,30 @@ public boolean doMonitor(ServiceEmitter emitter) SegmentGenerationMetrics metrics = segmentGenerationMetrics.snapshot(); RowIngestionMetersTotals rowIngestionMetersTotals = rowIngestionMeters.getTotals(); - final long thrownAway = rowIngestionMetersTotals.getThrownAway() - previousRowIngestionMetersTotals.getThrownAway(); - if (thrownAway > 0) { + // Emit per-reason metrics with the reason dimension + final Map currentThrownAwayByReason = rowIngestionMetersTotals.getThrownAwayByReason(); + final Map previousThrownAwayByReason = previousRowIngestionMetersTotals.getThrownAwayByReason(); + final Map deltaThrownAwayByReason = new HashMap<>(); + for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) { + final long currentCount = currentThrownAwayByReason.getOrDefault(reason.getReason(), 0L); + final long previousCount = previousThrownAwayByReason.getOrDefault(reason.getReason(), 0L); + final long delta = currentCount - previousCount; + if (delta > 0) { + deltaThrownAwayByReason.put(reason.getReason(), delta); + emitter.emit( + builder.setDimension(DruidMetrics.REASON, reason.getReason()) + .setMetric("ingest/events/thrownAway", delta) + ); + } + } + final long totalThrownAway = deltaThrownAwayByReason.values().stream().reduce(0L, Long::sum); + if (totalThrownAway > 0) { log.warn( - "[%,d] events thrown away. Possible causes: null events, events filtered out by transformSpec, or events outside earlyMessageRejectionPeriod / lateMessageRejectionPeriod.", - thrownAway + "[%,d] events thrown away. 
Breakdown: [%s]", + totalThrownAway, + deltaThrownAwayByReason ); } - emitter.emit(builder.setMetric("ingest/events/thrownAway", thrownAway)); final long unparseable = rowIngestionMetersTotals.getUnparseable() - previousRowIngestionMetersTotals.getUnparseable(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 97dc70f33578..ccdc99dcd059 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -230,7 +230,7 @@ public static FilteringCloseableInputRowIterator inputSourceReader( ); return new FilteringCloseableInputRowIterator( inputSourceReader.read(ingestionMeters), - rowFilter, + InputRowFilter.fromPredicate(rowFilter), ingestionMeters, parseExceptionHandler ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java index 30af8febaaee..074ae6d9f098 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java @@ -22,22 +22,22 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.segment.incremental.InputRowFilterResult; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.RowIngestionMeters; import java.io.IOException; import java.util.NoSuchElementException; -import java.util.function.Predicate; /** * An {@link InputRow} iterator used by ingestion {@link Task}s. It can filter out rows which do not satisfy the given - * {@link #filter} or throw {@link ParseException} while parsing them. The relevant metric should be counted whenever + * {@link InputRowFilter} or throw {@link ParseException} while parsing them. The relevant metric should be counted whenever * it filters out rows based on the filter. ParseException handling is delegatged to {@link ParseExceptionHandler}. 
*/ public class FilteringCloseableInputRowIterator implements CloseableIterator { private final CloseableIterator delegate; - private final Predicate filter; + private final InputRowFilter rowFilter; private final RowIngestionMeters rowIngestionMeters; private final ParseExceptionHandler parseExceptionHandler; @@ -45,13 +45,13 @@ public class FilteringCloseableInputRowIterator implements CloseableIterator delegate, - Predicate filter, + InputRowFilter rowFilter, RowIngestionMeters rowIngestionMeters, ParseExceptionHandler parseExceptionHandler ) { this.delegate = delegate; - this.filter = filter; + this.rowFilter = rowFilter; this.rowIngestionMeters = rowIngestionMeters; this.parseExceptionHandler = parseExceptionHandler; } @@ -66,11 +66,12 @@ public boolean hasNext() while (next == null && delegate.hasNext()) { // delegate.next() can throw ParseException final InputRow row = delegate.next(); - // filter.test() can throw ParseException - if (filter.test(row)) { + // rowFilter.test() can throw ParseException + final InputRowFilterResult filterResult = rowFilter.test(row); + if (!filterResult.isRejected()) { next = row; } else { - rowIngestionMeters.incrementThrownAway(); + rowIngestionMeters.incrementThrownAway(filterResult); } } break; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputRowFilter.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputRowFilter.java new file mode 100644 index 000000000000..8f65b9bcbe96 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputRowFilter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.segment.incremental.InputRowFilterResult; + +import java.util.function.Predicate; + +/** + * A filter for input rows during ingestion that returns a {@link InputRowFilterResult} per row. + * This is similar to {@link Predicate} but returns an {@link InputRowFilterResult} instead of a boolean. + */ +@FunctionalInterface +public interface InputRowFilter +{ + /** + * Tests whether the given row should be accepted. + * + * @return {@link InputRowFilterResult#ACCEPTED} only if the row should be accepted, otherwise another {@link InputRowFilterResult} value. + */ + InputRowFilterResult test(InputRow row); + + /** + * Creates a {@link InputRowFilter} from a {@link Predicate}. + * Callers wishing to return custom rejection reason logic should implement their own {@link InputRowFilter} directly. + */ + static InputRowFilter fromPredicate(Predicate predicate) + { + return row -> predicate.test(row) ? 
InputRowFilterResult.ACCEPTED : InputRowFilterResult.CUSTOM_FILTER; + } + + /** + * Fully-permissive {@link InputRowFilter} used mainly for tests. + */ + static InputRowFilter allowAll() + { + return row -> InputRowFilterResult.ACCEPTED; + } + + /** + * Combines this filter with another filter. A row is rejected if either filter rejects it. + * The rejection reason from the first rejecting filter (this filter first) is returned. + */ + default InputRowFilter and(InputRowFilter other) + { + return row -> { + InputRowFilterResult result = this.test(row); + if (result.isRejected()) { + return result; + } + return other.test(row); + }; + } +} + diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 9faa4c2013e1..74678086d590 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -1602,11 +1602,14 @@ private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object build return (RowIngestionMetersTotals) buildSegmentsRowStats; } else if (buildSegmentsRowStats instanceof Map) { Map buildSegmentsRowStatsMap = (Map) buildSegmentsRowStats; + Map thrownAwayByReason = (Map) buildSegmentsRowStatsMap.get("thrownAwayByReason"); return new RowIngestionMetersTotals( ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(), ((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(), ((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(), ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(), + // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs + thrownAwayByReason != null ? 
CollectionUtils.mapValues(thrownAwayByReason, Integer::longValue) : null, ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue() ); } else { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 16505f761bbb..a448e01f9189 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -78,6 +78,7 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.PendingSegmentRecord; +import org.apache.druid.segment.incremental.InputRowFilterResult; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.incremental.RowIngestionMeters; @@ -419,7 +420,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception inputRowSchema, task.getDataSchema().getTransformSpec(), toolbox.getIndexingTmpDir(), - row -> row != null && withinMinMaxRecordTime(row), + this::ensureRowIsNonNullAndWithinMessageTimeBounds, rowIngestionMeters, parseExceptionHandler ); @@ -2144,26 +2145,33 @@ private void refreshMinMaxMessageTime() ); } - public boolean withinMinMaxRecordTime(final InputRow row) + /** + * Returns {@link InputRowFilterResult#ACCEPTED} if the row should be accepted, + * or a rejection reason otherwise. + */ + InputRowFilterResult ensureRowIsNonNullAndWithinMessageTimeBounds(@Nullable InputRow row) { - final boolean beforeMinimumMessageTime = minMessageTime.isAfter(row.getTimestamp()); - final boolean afterMaximumMessageTime = maxMessageTime.isBefore(row.getTimestamp()); - - if (log.isDebugEnabled()) { - if (beforeMinimumMessageTime) { + if (row == null) { + return InputRowFilterResult.NULL_OR_EMPTY_RECORD; + } else if (minMessageTime.isAfter(row.getTimestamp())) { + if (log.isDebugEnabled()) { log.debug( - "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]", + "CurrentTimeStamp[%s] is before minimumMessageTime[%s]", row.getTimestamp(), minMessageTime ); - } else if (afterMaximumMessageTime) { + } + return InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME; + } else if (maxMessageTime.isBefore(row.getTimestamp())) { + if (log.isDebugEnabled()) { log.debug( - "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]", + "CurrentTimeStamp[%s] is after maximumMessageTime[%s]", row.getTimestamp(), maxMessageTime ); } + return InputRowFilterResult.AFTER_MAX_MESSAGE_TIME; } - return !beforeMinimumMessageTime && !afterMaximumMessageTime; + return InputRowFilterResult.ACCEPTED; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java index 3ac952c16c2a..ad9886e424af 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java @@ -28,9 +28,11 @@ import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.indexing.common.task.FilteringCloseableInputRowIterator; +import 
org.apache.druid.indexing.common.task.InputRowFilter; import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.segment.incremental.InputRowFilterResult; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.transform.TransformSpec; @@ -42,7 +44,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.function.Predicate; /** * Abstraction for parsing stream data which internally uses {@link org.apache.druid.data.input.InputEntityReader} @@ -54,7 +55,7 @@ class StreamChunkParser private final InputRowParser parser; @Nullable private final SettableByteEntityReader byteEntityReader; - private final Predicate rowFilter; + private final InputRowFilter rowFilter; private final RowIngestionMeters rowIngestionMeters; private final ParseExceptionHandler parseExceptionHandler; @@ -67,7 +68,7 @@ class StreamChunkParser InputRowSchema inputRowSchema, TransformSpec transformSpec, File indexingTmpDir, - Predicate rowFilter, + InputRowFilter rowFilter, RowIngestionMeters rowIngestionMeters, ParseExceptionHandler parseExceptionHandler ) @@ -96,7 +97,7 @@ class StreamChunkParser StreamChunkParser( @Nullable InputRowParser parser, @Nullable SettableByteEntityReader byteEntityReader, - Predicate rowFilter, + InputRowFilter rowFilter, RowIngestionMeters rowIngestionMeters, ParseExceptionHandler parseExceptionHandler ) @@ -117,7 +118,7 @@ List parse(@Nullable List streamChunk, boolean isEndOfShar if (!isEndOfShard) { // We do not count end of shard record as thrown away event since this is a record created by Druid // Note that this only applies to Kinesis - rowIngestionMeters.incrementThrownAway(); + rowIngestionMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD); } return Collections.emptyList(); } else { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java index 252d265d67d8..bcfebef2a6ce 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java @@ -21,28 +21,22 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor; -import org.apache.druid.java.util.emitter.core.Event; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.MonitorUtils; +import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.incremental.InputRowFilterResult; import org.apache.druid.segment.incremental.RowIngestionMeters; +import org.apache.druid.segment.incremental.SimpleRowIngestionMeters; import org.apache.druid.segment.realtime.SegmentGenerationMetrics; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Answers; -import org.mockito.ArgumentMatchers; -import org.mockito.Mock; -import 
org.mockito.Mockito; -import org.mockito.junit.MockitoJUnitRunner; import java.util.HashMap; +import java.util.List; import java.util.Map; -@RunWith(MockitoJUnitRunner.class) public class TaskRealtimeMetricsMonitorTest { private static final Map DIMENSIONS = ImmutableMap.of( @@ -53,28 +47,18 @@ public class TaskRealtimeMetricsMonitorTest ); private static final Map TAGS = ImmutableMap.of("author", "Author Name", "version", 10); - private SegmentGenerationMetrics segmentGenerationMetrics; - @Mock(answer = Answers.RETURNS_MOCKS) + private SegmentGenerationMetrics segmentGenerationMetrics; private RowIngestionMeters rowIngestionMeters; - @Mock - private ServiceEmitter emitter; - private Map emittedEvents; + private StubServiceEmitter emitter; private TaskRealtimeMetricsMonitor target; @Before public void setUp() { - emittedEvents = new HashMap<>(); segmentGenerationMetrics = new SegmentGenerationMetrics(); - Mockito.doCallRealMethod().when(emitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); - Mockito - .doAnswer(invocation -> { - ServiceMetricEvent e = invocation.getArgument(0); - emittedEvents.put(e.getMetric(), e); - return null; - }) - .when(emitter).emit(ArgumentMatchers.any(Event.class)); + rowIngestionMeters = new SimpleRowIngestionMeters(); + emitter = new StubServiceEmitter(); target = new TaskRealtimeMetricsMonitor( segmentGenerationMetrics, rowIngestionMeters, @@ -86,7 +70,10 @@ public void setUp() public void testdoMonitorShouldEmitUserProvidedTags() { target.doMonitor(emitter); - for (ServiceMetricEvent sme : emittedEvents.values()) { + + List events = emitter.getMetricEvents("ingest/events/unparseable"); + Assert.assertFalse(events.isEmpty()); + for (ServiceMetricEvent sme : events) { Assert.assertEquals(TAGS, sme.getUserDims().get(DruidMetrics.TAGS)); } } @@ -94,12 +81,19 @@ public void testdoMonitorShouldEmitUserProvidedTags() @Test public void testdoMonitorWithoutTagsShouldNotEmitTags() { + ServiceMetricEvent.Builder builderWithoutTags = new ServiceMetricEvent.Builder(); + MonitorUtils.addDimensionsToBuilder(builderWithoutTags, DIMENSIONS); + target = new TaskRealtimeMetricsMonitor( segmentGenerationMetrics, rowIngestionMeters, - createMetricEventBuilder() + builderWithoutTags ); - for (ServiceMetricEvent sme : emittedEvents.values()) { + target.doMonitor(emitter); + + List events = emitter.getMetricEvents("ingest/events/unparseable"); + Assert.assertFalse(events.isEmpty()); + for (ServiceMetricEvent sme : events) { Assert.assertFalse(sme.getUserDims().containsKey(DruidMetrics.TAGS)); } } @@ -107,24 +101,123 @@ public void testdoMonitorWithoutTagsShouldNotEmitTags() @Test public void testMessageGapAggStats() { - target = new TaskRealtimeMetricsMonitor( + target.doMonitor(emitter); + Assert.assertTrue(emitter.getMetricEvents("ingest/events/minMessageGap").isEmpty()); + Assert.assertTrue(emitter.getMetricEvents("ingest/events/maxMessageGap").isEmpty()); + Assert.assertTrue(emitter.getMetricEvents("ingest/events/avgMessageGap").isEmpty()); + + emitter.flush(); + segmentGenerationMetrics.reportMessageGap(1); + target.doMonitor(emitter); + + Assert.assertFalse(emitter.getMetricEvents("ingest/events/minMessageGap").isEmpty()); + Assert.assertFalse(emitter.getMetricEvents("ingest/events/maxMessageGap").isEmpty()); + Assert.assertFalse(emitter.getMetricEvents("ingest/events/avgMessageGap").isEmpty()); + } + + @Test + public void testThrownAwayEmitsReasonDimension() + { + SimpleRowIngestionMeters realMeters = new SimpleRowIngestionMeters(); + 
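+ // Record a known mix of rejection reasons: 2 null/empty, 3 before-min-time, 1 after-max-time, 4 custom-filtered.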
realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD); + realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD); + realMeters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME); + realMeters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME); + realMeters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME); + realMeters.incrementThrownAway(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME); + realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER); + realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER); + realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER); + realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER); + + TaskRealtimeMetricsMonitor monitor = new TaskRealtimeMetricsMonitor( segmentGenerationMetrics, - rowIngestionMeters, + realMeters, createMetricEventBuilder() ); - target.doMonitor(emitter); - Assert.assertFalse(emittedEvents.containsKey("ingest/events/minMessageGap")); - Assert.assertFalse(emittedEvents.containsKey("ingest/events/maxMessageGap")); - Assert.assertFalse(emittedEvents.containsKey("ingest/events/avgMessageGap")); + monitor.doMonitor(emitter); - emittedEvents.clear(); - segmentGenerationMetrics.reportMessageGap(1); - target.doMonitor(emitter); + Map thrownAwayByReason = new HashMap<>(); + for (ServiceMetricEvent event : emitter.getMetricEvents("ingest/events/thrownAway")) { + Object reason = event.getUserDims().get("reason"); + thrownAwayByReason.put(reason.toString(), event.getValue().longValue()); + } + + Assert.assertEquals(Long.valueOf(2), thrownAwayByReason.get("null")); + Assert.assertEquals(Long.valueOf(3), thrownAwayByReason.get("beforeMinimumMessageTime")); + Assert.assertEquals(Long.valueOf(1), thrownAwayByReason.get("afterMaximumMessageTime")); + Assert.assertEquals(Long.valueOf(4), thrownAwayByReason.get("filtered")); + } + + @Test + public void testThrownAwayReasonDimensionOnlyEmittedWhenNonZero() + { + SimpleRowIngestionMeters realMeters = new SimpleRowIngestionMeters(); + realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD); + realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER); + + TaskRealtimeMetricsMonitor monitor = new TaskRealtimeMetricsMonitor( + segmentGenerationMetrics, + realMeters, + createMetricEventBuilder() + ); + + monitor.doMonitor(emitter); + + Map thrownAwayByReason = new HashMap<>(); + for (ServiceMetricEvent event : emitter.getMetricEvents("ingest/events/thrownAway")) { + Object reason = event.getUserDims().get("reason"); + thrownAwayByReason.put(reason.toString(), event.getValue().longValue()); + } + + // Only reasons with non-zero counts should be emitted + Assert.assertEquals(2, thrownAwayByReason.size()); + Assert.assertTrue(thrownAwayByReason.containsKey("null")); + Assert.assertTrue(thrownAwayByReason.containsKey("filtered")); + Assert.assertFalse(thrownAwayByReason.containsKey("beforeMinimumMessageTime")); + Assert.assertFalse(thrownAwayByReason.containsKey("afterMaximumMessageTime")); + } + + @Test + public void testThrownAwayReasonDeltaAcrossMonitorCalls() + { + SimpleRowIngestionMeters realMeters = new SimpleRowIngestionMeters(); + + TaskRealtimeMetricsMonitor monitor = new TaskRealtimeMetricsMonitor( + segmentGenerationMetrics, + realMeters, + createMetricEventBuilder() + ); + + realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD); + realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD); + monitor.doMonitor(emitter); 
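+ // The first doMonitor() call emits the full counts accumulated so far; subsequent calls emit only deltas.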
+ + long firstCallNullCount = 0; + for (ServiceMetricEvent event : emitter.getMetricEvents("ingest/events/thrownAway")) { + if ("null".equals(event.getUserDims().get("reason"))) { + firstCallNullCount = event.getValue().longValue(); + } + } + Assert.assertEquals(2, firstCallNullCount); + + emitter.flush(); + realMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD); + realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER); + realMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER); + monitor.doMonitor(emitter); + + // Find counts from second call - should be deltas only + Map secondCallCounts = new HashMap<>(); + for (ServiceMetricEvent event : emitter.getMetricEvents("ingest/events/thrownAway")) { + Object reason = event.getUserDims().get("reason"); + secondCallCounts.put(reason.toString(), event.getValue().longValue()); + } - Assert.assertTrue(emittedEvents.containsKey("ingest/events/minMessageGap")); - Assert.assertTrue(emittedEvents.containsKey("ingest/events/maxMessageGap")); - Assert.assertTrue(emittedEvents.containsKey("ingest/events/avgMessageGap")); + // Should emit only the delta (1 more NULL, 2 new FILTERED) + Assert.assertEquals(Long.valueOf(1), secondCallCounts.get("null")); + Assert.assertEquals(Long.valueOf(2), secondCallCounts.get("filtered")); } private ServiceMetricEvent.Builder createMetricEventBuilder() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersTest.java new file mode 100644 index 000000000000..d92276700000 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMetersTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.common.stats; + +import org.apache.druid.segment.incremental.InputRowFilterResult; +import org.apache.druid.segment.incremental.RowIngestionMetersTotals; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class DropwizardRowIngestionMetersTest +{ + @Test + public void testBasicIncrements() + { + DropwizardRowIngestionMeters meters = new DropwizardRowIngestionMeters(); + meters.incrementProcessed(); + meters.incrementProcessedBytes(100); + meters.incrementProcessedWithError(); + meters.incrementUnparseable(); + meters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD); + + Assert.assertEquals(1, meters.getProcessed()); + Assert.assertEquals(100, meters.getProcessedBytes()); + Assert.assertEquals(1, meters.getProcessedWithError()); + Assert.assertEquals(1, meters.getUnparseable()); + Assert.assertEquals(1, meters.getThrownAway()); + + RowIngestionMetersTotals totals = meters.getTotals(); + Assert.assertEquals(1, totals.getProcessed()); + Assert.assertEquals(100, totals.getProcessedBytes()); + Assert.assertEquals(1, totals.getProcessedWithError()); + Assert.assertEquals(1, totals.getUnparseable()); + Assert.assertEquals(1, totals.getThrownAway()); + } + + @Test + public void testIncrementThrownAwayWithReason() + { + DropwizardRowIngestionMeters meters = new DropwizardRowIngestionMeters(); + + meters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD); + meters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD); + meters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME); + meters.incrementThrownAway(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME); + meters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER); + meters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER); + meters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER); + + // Total thrownAway should be sum of all reasons + Assert.assertEquals(7, meters.getThrownAway()); + + // Check per-reason counts + Map byReason = meters.getThrownAwayByReason(); + Assert.assertEquals(Long.valueOf(2), byReason.get(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason())); + Assert.assertEquals(Long.valueOf(1), byReason.get(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.getReason())); + Assert.assertEquals(Long.valueOf(1), byReason.get(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.getReason())); + Assert.assertEquals(Long.valueOf(3), byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason())); + } + + @Test + public void testGetThrownAwayByReasonOmitsZeroCounts() + { + DropwizardRowIngestionMeters meters = new DropwizardRowIngestionMeters(); + + // With no increments, the map should be empty since zero-count reasons are omitted + Map byReason = meters.getThrownAwayByReason(); + Assert.assertTrue(byReason.isEmpty()); + } + + @Test + public void testMovingAverages() + { + DropwizardRowIngestionMeters meters = new DropwizardRowIngestionMeters(); + + meters.incrementProcessed(); + meters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER); + + Map movingAverages = meters.getMovingAverages(); + Assert.assertNotNull(movingAverages); + Assert.assertTrue(movingAverages.containsKey(DropwizardRowIngestionMeters.ONE_MINUTE_NAME)); + Assert.assertTrue(movingAverages.containsKey(DropwizardRowIngestionMeters.FIVE_MINUTE_NAME)); + Assert.assertTrue(movingAverages.containsKey(DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME)); + } +} + diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java index 2b4ffe26f31b..e037d9adf720 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIteratorTest.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.segment.incremental.InputRowFilterResult; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.incremental.SimpleRowIngestionMeters; @@ -42,6 +43,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -78,7 +80,7 @@ public void testFilterOutRows() final Predicate filter = row -> (Integer) row.getRaw("dim1") == 10; final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator( CloseableIterators.withEmptyBaggage(ROWS.iterator()), - filter, + InputRowFilter.fromPredicate(filter), rowIngestionMeters, parseExceptionHandler ); @@ -125,7 +127,7 @@ public void close() final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator( parseExceptionThrowingIterator, - row -> true, + InputRowFilter.allowAll(), rowIngestionMeters, parseExceptionHandler ); @@ -163,7 +165,7 @@ public boolean test(InputRow inputRow) final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator( parseExceptionThrowingIterator, - filter, + InputRowFilter.fromPredicate(filter), rowIngestionMeters, parseExceptionHandler ); @@ -214,7 +216,7 @@ public void close() final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator( parseExceptionThrowingIterator, - row -> true, + InputRowFilter.allowAll(), rowIngestionMeters, parseExceptionHandler ); @@ -260,7 +262,7 @@ public void close() final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator( parseExceptionThrowingIterator, - row -> true, + InputRowFilter.allowAll(), rowIngestionMeters, parseExceptionHandler ); @@ -281,7 +283,7 @@ public void testCloseDelegateIsClosed() throws IOException ); final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator( delegate, - row -> true, + InputRowFilter.allowAll(), rowIngestionMeters, parseExceptionHandler ); @@ -330,7 +332,7 @@ public void close() final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator( parseExceptionThrowingIterator, - row -> true, + InputRowFilter.allowAll(), rowIngestionMeters, parseExceptionHandler ); @@ -350,6 +352,103 @@ public void close() } + @Test + public void testRowFilterWithReasons() + { + // RowFilter that returns different reasons based on dim1 value + final InputRowFilter rowFilter = row -> { + int dim1 = (Integer) row.getRaw("dim1"); + if (dim1 == 10) { + return InputRowFilterResult.ACCEPTED; + } else if (dim1 == 20) { + return InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME; + } else { + return 
InputRowFilterResult.CUSTOM_FILTER; + } + }; + + final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator( + CloseableIterators.withEmptyBaggage(ROWS.iterator()), + rowFilter, + rowIngestionMeters, + parseExceptionHandler + ); + + final List filteredRows = new ArrayList<>(); + rowIterator.forEachRemaining(filteredRows::add); + + // Only rows with dim1=10 should pass + Assert.assertEquals(4, filteredRows.size()); + for (InputRow row : filteredRows) { + Assert.assertEquals(10, row.getRaw("dim1")); + } + + // Check total thrown away + Assert.assertEquals(2, rowIngestionMeters.getThrownAway()); + + // Check per-reason counts + Map byReason = rowIngestionMeters.getThrownAwayByReason(); + Assert.assertEquals(2, byReason.size()); + Assert.assertEquals(Long.valueOf(1), byReason.get(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.getReason())); // dim1=20 + Assert.assertEquals(Long.valueOf(1), byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason())); // dim1=30 + } + + @Test + public void testRowFilterFromPredicate() + { + // Use the static helper to convert a Predicate to an InputRowFilter + final Predicate predicate = row -> (Integer) row.getRaw("dim1") == 10; + final InputRowFilter rowFilter = InputRowFilter.fromPredicate(predicate); + + final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator( + CloseableIterators.withEmptyBaggage(ROWS.iterator()), + rowFilter, + rowIngestionMeters, + parseExceptionHandler + ); + + final List filteredRows = new ArrayList<>(); + rowIterator.forEachRemaining(filteredRows::add); + + Assert.assertEquals(4, filteredRows.size()); + Assert.assertEquals(2, rowIngestionMeters.getThrownAway()); + + // All thrown-away rows should have the CUSTOM_FILTER reason when using fromPredicate + Map byReason = rowIngestionMeters.getThrownAwayByReason(); + Assert.assertEquals(1, byReason.size()); + Assert.assertEquals(Long.valueOf(2), byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason())); + } + + @Test + public void testRowFilterAnd() + { + // First filter: reject null rows before they reach the value filter + final InputRowFilter nullFilter = row -> row == null ? InputRowFilterResult.NULL_OR_EMPTY_RECORD : InputRowFilterResult.ACCEPTED; + + // Second filter: reject if dim1 != 10 + final InputRowFilter valueFilter = row -> (Integer) row.getRaw("dim1") == 10 ? 
InputRowFilterResult.ACCEPTED : InputRowFilterResult.CUSTOM_FILTER; + + // Combine filters + final InputRowFilter combinedFilter = nullFilter.and(valueFilter); + + final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator( + CloseableIterators.withEmptyBaggage(ROWS.iterator()), + combinedFilter, + rowIngestionMeters, + parseExceptionHandler + ); + + final List filteredRows = new ArrayList<>(); + rowIterator.forEachRemaining(filteredRows::add); + + Assert.assertEquals(4, filteredRows.size()); + Assert.assertEquals(2, rowIngestionMeters.getThrownAway()); + + // All rejected rows should have FILTERED reason (from second filter) + Map byReason = rowIngestionMeters.getThrownAwayByReason(); + Assert.assertEquals(Long.valueOf(2), byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason())); + } + private static InputRow newRow(DateTime timestamp, Object dim1Val, Object dim2Val) { return new MapBasedInputRow(timestamp, DIMENSIONS, ImmutableMap.of("dim1", dim1Val, "dim2", dim2Val)); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 2d9991102882..9a2eeb8c1a66 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -85,6 +85,7 @@ import org.apache.druid.segment.handoff.NoopSegmentHandoffNotifierFactory; import org.apache.druid.segment.handoff.SegmentHandoffNotifier; import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory; +import org.apache.druid.segment.incremental.InputRowFilterResult; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.loading.LeastBytesUsedStorageLocationSelectorStrategy; @@ -1508,6 +1509,8 @@ public void testMultipleParseExceptionsSuccess() throws Exception IngestionStatsAndErrors reportData = getTaskReportData(); + // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs + Map expectedThrownAwayByReason = Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1); Map expectedMetrics = ImmutableMap.of( RowIngestionMeters.DETERMINE_PARTITIONS, ImmutableMap.of( @@ -1515,7 +1518,8 @@ public void testMultipleParseExceptionsSuccess() throws Exception RowIngestionMeters.PROCESSED, 4, RowIngestionMeters.PROCESSED_BYTES, 657, RowIngestionMeters.UNPARSEABLE, 4, - RowIngestionMeters.THROWN_AWAY, 1 + RowIngestionMeters.THROWN_AWAY, 1, + RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedThrownAwayByReason ), RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( @@ -1523,7 +1527,8 @@ public void testMultipleParseExceptionsSuccess() throws Exception RowIngestionMeters.PROCESSED, 1, RowIngestionMeters.PROCESSED_BYTES, 657, RowIngestionMeters.UNPARSEABLE, 4, - RowIngestionMeters.THROWN_AWAY, 1 + RowIngestionMeters.THROWN_AWAY, 1, + RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedThrownAwayByReason ) ); Assert.assertEquals(expectedMetrics, reportData.getRowStats()); @@ -1680,6 +1685,9 @@ public void testMultipleParseExceptionsFailure() throws Exception IngestionStatsAndErrors reportData = getTaskReportData(); + // Jackson will serde numerics ≤ 32bits as Integers, rather than Longs + Map expectedDeterminePartitionsThrownAwayByReason = Map.of(); + Map expectedBuildSegmentsThrownAwayByReason = Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1); Map 
expectedMetrics = ImmutableMap.of( RowIngestionMeters.DETERMINE_PARTITIONS, ImmutableMap.of( @@ -1687,7 +1695,8 @@ public void testMultipleParseExceptionsFailure() throws Exception RowIngestionMeters.PROCESSED, 0, RowIngestionMeters.PROCESSED_BYTES, 0, RowIngestionMeters.UNPARSEABLE, 0, - RowIngestionMeters.THROWN_AWAY, 0 + RowIngestionMeters.THROWN_AWAY, 0, + RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedDeterminePartitionsThrownAwayByReason ), RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( @@ -1695,7 +1704,8 @@ public void testMultipleParseExceptionsFailure() throws Exception RowIngestionMeters.PROCESSED, 1, RowIngestionMeters.PROCESSED_BYTES, 182, RowIngestionMeters.UNPARSEABLE, 3, - RowIngestionMeters.THROWN_AWAY, 1 + RowIngestionMeters.THROWN_AWAY, 1, + RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedBuildSegmentsThrownAwayByReason ) ); @@ -1790,6 +1800,8 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc IngestionStatsAndErrors reportData = getTaskReportData(); + Map expectedDeterminePartitionsThrownAwayByReason = Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1); + Map expectedBuildSegmentsThrownAwayByReason = Map.of(); Map expectedMetrics = ImmutableMap.of( RowIngestionMeters.DETERMINE_PARTITIONS, ImmutableMap.of( @@ -1797,7 +1809,8 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc RowIngestionMeters.PROCESSED, 1, RowIngestionMeters.PROCESSED_BYTES, 182, RowIngestionMeters.UNPARSEABLE, 3, - RowIngestionMeters.THROWN_AWAY, 1 + RowIngestionMeters.THROWN_AWAY, 1, + RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedDeterminePartitionsThrownAwayByReason ), RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( @@ -1805,7 +1818,8 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc RowIngestionMeters.PROCESSED, 0, RowIngestionMeters.PROCESSED_BYTES, 0, RowIngestionMeters.UNPARSEABLE, 0, - RowIngestionMeters.THROWN_AWAY, 0 + RowIngestionMeters.THROWN_AWAY, 0, + RowIngestionMeters.THROWN_AWAY_BY_REASON, expectedBuildSegmentsThrownAwayByReason ) ); @@ -2738,7 +2752,10 @@ private void verifySchemaAndAggFactory( Map aggregatorFactoryMap ) { - Assert.assertEquals(segmentWithSchemas.getSegments().size(), segmentWithSchemas.getSegmentSchemaMapping().getSegmentIdToMetadataMap().size()); + Assert.assertEquals( + segmentWithSchemas.getSegments().size(), + segmentWithSchemas.getSegmentSchemaMapping().getSegmentIdToMetadataMap().size() + ); Assert.assertEquals(1, segmentWithSchemas.getSegmentSchemaMapping().getSchemaFingerprintToPayloadMap().size()); Assert.assertEquals( actualRowSignature, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/InputRowFilterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/InputRowFilterTest.java new file mode 100644 index 000000000000..53616c974e7d --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/InputRowFilterTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.segment.incremental.InputRowFilterResult; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class InputRowFilterTest +{ + private static final List<String> DIMENSIONS = ImmutableList.of("dim1"); + + @Test + public void testFromPredicateAccept() + { + InputRowFilter filter = InputRowFilter.fromPredicate(row -> true); + InputRow row = newRow(100); + + Assert.assertEquals(InputRowFilterResult.ACCEPTED, filter.test(row)); + } + + @Test + public void testFromPredicateReject() + { + InputRowFilter filter = InputRowFilter.fromPredicate(row -> false); + InputRow row = newRow(100); + + Assert.assertEquals(InputRowFilterResult.CUSTOM_FILTER, filter.test(row)); + } + + @Test + public void testAndBothAccept() + { + InputRowFilter filter1 = InputRowFilter.allowAll(); + InputRowFilter filter2 = InputRowFilter.allowAll(); + InputRowFilter combined = filter1.and(filter2); + + InputRow row = newRow(100); + Assert.assertEquals(InputRowFilterResult.ACCEPTED, combined.test(row)); + } + + @Test + public void testAndFirstRejects() + { + InputRowFilter filter1 = row -> InputRowFilterResult.NULL_OR_EMPTY_RECORD; + InputRowFilter filter2 = InputRowFilter.allowAll(); + InputRowFilter combined = filter1.and(filter2); + + InputRow row = newRow(100); + Assert.assertEquals(InputRowFilterResult.NULL_OR_EMPTY_RECORD, combined.test(row)); + } + + @Test + public void testAndSecondRejects() + { + InputRowFilter filter1 = InputRowFilter.allowAll(); + InputRowFilter filter2 = row -> InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME; + InputRowFilter combined = filter1.and(filter2); + + InputRow row = newRow(100); + Assert.assertEquals(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME, combined.test(row)); + } + + @Test + public void testAndBothRejectReturnsFirst() + { + InputRowFilter filter1 = row -> InputRowFilterResult.NULL_OR_EMPTY_RECORD; + InputRowFilter filter2 = row -> InputRowFilterResult.CUSTOM_FILTER; + InputRowFilter combined = filter1.and(filter2); + + InputRow row = newRow(100); + // Should return reason from first filter + Assert.assertEquals(InputRowFilterResult.NULL_OR_EMPTY_RECORD, combined.test(row)); + } + + @Test + public void testChainedAnd() + { + InputRowFilter filter1 = InputRowFilter.allowAll(); + InputRowFilter filter2 = InputRowFilter.allowAll(); + InputRowFilter filter3 = row -> InputRowFilterResult.AFTER_MAX_MESSAGE_TIME; + + InputRowFilter combined = filter1.and(filter2).and(filter3); + + InputRow row = newRow(100); + Assert.assertEquals(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME, combined.test(row)); + } + + private static InputRow newRow(Object dim1Val) + { + return new MapBasedInputRow( + DateTimes.of("2020-01-01"), + DIMENSIONS, + ImmutableMap.of("dim1", dim1Val) + ); + } +} + diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index cc3cdffa7479..a0e3f3245f2c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -48,6 +48,7 @@ import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.segment.DataSegmentsWithSchemas; import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.segment.incremental.InputRowFilterResult; import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.incremental.RowIngestionMetersTotals; import org.apache.druid.segment.indexing.DataSchema; @@ -492,6 +493,7 @@ public void testRunInParallelTaskReports() Collections.emptyList() ); TaskReport.ReportMap actualReports = task.doGetLiveReports(true); + Map<String, Long> expectedThrownAwayByReason = Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1L); TaskReport.ReportMap expectedReports = buildExpectedTaskReportParallel( task.getId(), ImmutableList.of( @@ -508,7 +510,7 @@ public void testRunInParallelTaskReports() 1L ) ), - new RowIngestionMetersTotals(10, 335, 1, 1, 1) + new RowIngestionMetersTotals(10, 335, 1, expectedThrownAwayByReason, 1) ); compareTaskReports(expectedReports, actualReports); } @@ -543,7 +545,8 @@ public void testRunInSequential() final ParallelIndexSupervisorTask executedTask = (ParallelIndexSupervisorTask) taskContainer.getTask(); TaskReport.ReportMap actualReports = executedTask.doGetLiveReports(true); - final RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10, 335, 1, 1, 1); + Map<String, Long> expectedThrownAwayByReason = Map.of(InputRowFilterResult.CUSTOM_FILTER.getReason(), 1L); + final RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10, 335, 1, expectedThrownAwayByReason, 1); List<ParseExceptionReport> expectedUnparseableEvents = ImmutableList.of( new ParseExceptionReport( "{ts=2017unparseable}", diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java index 7518bb7f172a..eb08944d46bd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java @@ -36,6 +36,7 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.segment.incremental.InputRowFilterResult; import org.apache.druid.segment.indexing.DataSchema; import org.joda.time.DateTime; import org.junit.Assert; @@ -105,13 +106,13 @@ public void testWithinMinMaxTime() LockGranularity.TIME_CHUNK); Mockito.when(row.getTimestamp()).thenReturn(now); - Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + Assert.assertEquals(InputRowFilterResult.ACCEPTED, runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row)); Mockito.when(row.getTimestamp()).thenReturn(now.minusHours(2).minusMinutes(1)); -
Assert.assertFalse(runner.withinMinMaxRecordTime(row)); + Assert.assertEquals(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME, runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row)); Mockito.when(row.getTimestamp()).thenReturn(now.plusHours(2).plusMinutes(1)); - Assert.assertFalse(runner.withinMinMaxRecordTime(row)); + Assert.assertEquals(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME, runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row)); } @Test @@ -157,13 +158,58 @@ public void testWithinMinMaxTimeNotPopulated() TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, LockGranularity.TIME_CHUNK); - Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + Mockito.when(row.getTimestamp()).thenReturn(now); + Assert.assertEquals(InputRowFilterResult.ACCEPTED, runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row)); Mockito.when(row.getTimestamp()).thenReturn(now.minusHours(2).minusMinutes(1)); - Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + Assert.assertEquals(InputRowFilterResult.ACCEPTED, runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row)); Mockito.when(row.getTimestamp()).thenReturn(now.plusHours(2).plusMinutes(1)); - Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + Assert.assertEquals(InputRowFilterResult.ACCEPTED, runner.ensureRowIsNonNullAndWithinMessageTimeBounds(row)); + } + + @Test + public void testEnsureRowRejectionReasonForNullRow() + { + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("d1"), + new StringDimensionSchema("d2") + ) + ); + DataSchema schema = + DataSchema.builder() + .withDataSource("datasource") + .withTimestamp(new TimestampSpec(null, null, null)) + .withDimensions(dimensionsSpec) + .withGranularity( + new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null) + ) + .build(); + + SeekableStreamIndexTaskTuningConfig tuningConfig = Mockito.mock(SeekableStreamIndexTaskTuningConfig.class); + SeekableStreamIndexTaskIOConfig ioConfig = Mockito.mock(SeekableStreamIndexTaskIOConfig.class); + SeekableStreamStartSequenceNumbers sequenceNumbers = Mockito.mock(SeekableStreamStartSequenceNumbers.class); + SeekableStreamEndSequenceNumbers endSequenceNumbers = Mockito.mock(SeekableStreamEndSequenceNumbers.class); + + Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(null); + Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(null); + Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(null); + Mockito.when(ioConfig.getInputFormat()).thenReturn(new JsonInputFormat(null, null, null, null, null)); + Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers); + Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers); + + Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of()); + Mockito.when(sequenceNumbers.getStream()).thenReturn("test"); + + Mockito.when(task.getDataSchema()).thenReturn(schema); + Mockito.when(task.getIOConfig()).thenReturn(ioConfig); + Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig); + + TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, + LockGranularity.TIME_CHUNK); + + Assert.assertEquals(InputRowFilterResult.NULL_OR_EMPTY_RECORD, runner.ensureRowIsNonNullAndWithinMessageTimeBounds(null)); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java index 649c0d57e464..92ed5785abb3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java @@ -33,6 +33,7 @@ import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.task.InputRowFilter; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.RE; @@ -104,7 +105,7 @@ public void testWithParserAndNullInputformatParseProperly() throws IOException null, null, null, - row -> true, + InputRowFilter.allowAll(), rowIngestionMeters, parseExceptionHandler ); @@ -121,7 +122,7 @@ public void testWithNullParserAndInputformatParseProperly() throws IOException new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, ColumnsFilter.all()), TransformSpec.NONE, temporaryFolder.newFolder(), - row -> true, + InputRowFilter.allowAll(), rowIngestionMeters, parseExceptionHandler ); @@ -139,7 +140,7 @@ public void testWithNullParserAndNullInputformatFailToCreateParser() null, null, null, - row -> true, + InputRowFilter.allowAll(), rowIngestionMeters, parseExceptionHandler ); @@ -169,7 +170,7 @@ public void testBothParserAndInputFormatParseProperlyUsingInputFormat() throws I new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, ColumnsFilter.all()), TransformSpec.NONE, temporaryFolder.newFolder(), - row -> true, + InputRowFilter.allowAll(), rowIngestionMeters, parseExceptionHandler ); @@ -190,7 +191,7 @@ public void parseEmptyNotEndOfShard() throws IOException new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, ColumnsFilter.all()), TransformSpec.NONE, temporaryFolder.newFolder(), - row -> true, + InputRowFilter.allowAll(), rowIngestionMeters, parseExceptionHandler ); @@ -213,7 +214,7 @@ public void parseEmptyEndOfShard() throws IOException new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, ColumnsFilter.all()), TransformSpec.NONE, temporaryFolder.newFolder(), - row -> true, + InputRowFilter.allowAll(), rowIngestionMeters, parseExceptionHandler ); @@ -241,7 +242,7 @@ public void testParseMalformedDataWithAllowedParseExceptions_thenNoException() t final StreamChunkParser chunkParser = new StreamChunkParser<>( parser, mockedByteEntityReader, - row -> true, + InputRowFilter.allowAll(), rowIngestionMeters, new ParseExceptionHandler( rowIngestionMeters, @@ -279,7 +280,7 @@ public void testParseMalformedDataException() throws IOException final StreamChunkParser chunkParser = new StreamChunkParser<>( parser, mockedByteEntityReader, - row -> true, + InputRowFilter.allowAll(), rowIngestionMeters, parseExceptionHandler ); @@ -318,7 +319,7 @@ public void testParseMalformedDataWithUnlimitedAllowedParseExceptions_thenNoExce final StreamChunkParser chunkParser = new StreamChunkParser<>( parser, mockedByteEntityReader, - row -> true, + InputRowFilter.allowAll(), rowIngestionMeters, new ParseExceptionHandler( rowIngestionMeters, @@ -354,7 +355,7 @@ public void testWithNullParserAndNullByteEntityReaderFailToInstantiate() () -> new StreamChunkParser<>( null, null, - row -> true, + InputRowFilter.allowAll(), rowIngestionMeters, parseExceptionHandler ) diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java 
b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java index 578d792df493..532553aef6ef 100644 --- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java @@ -57,6 +57,7 @@ public class DruidMetrics public static final String STREAM = "stream"; public static final String PARTITION = "partition"; public static final String SUPERVISOR_ID = "supervisorId"; + public static final String REASON = "reason"; public static final String TAGS = "tags";
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/InputRowFilterResult.java b/processing/src/main/java/org/apache/druid/segment/incremental/InputRowFilterResult.java new file mode 100644 index 000000000000..fcb4d8aec116 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/incremental/InputRowFilterResult.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.incremental; + +import java.util.Arrays; + +/** + * Result of filtering an input row during ingestion. + */ +public enum InputRowFilterResult +{ + /** + * The row passed the filter and should be processed. + */ + ACCEPTED("accepted"), + /** + * The row was null or the input record was empty. + */ + NULL_OR_EMPTY_RECORD("null"), + + /** + * The row's timestamp is before the minimum message time (late message rejection). + */ + BEFORE_MIN_MESSAGE_TIME("beforeMinimumMessageTime"), + + /** + * The row's timestamp is after the maximum message time (early message rejection). + */ + AFTER_MAX_MESSAGE_TIME("afterMaximumMessageTime"), + + /** + * The row was filtered out by a transformSpec filter or other row filter. + */ + CUSTOM_FILTER("filtered"), + + /** + * A backwards-compatible value used when a task running an older Druid version reports thrown-away counts without per-reason tracking. + */ + UNKNOWN("unknown"); + + private static final InputRowFilterResult[] REJECTED_VALUES = Arrays.stream(InputRowFilterResult.values()) .filter(InputRowFilterResult::isRejected) .toArray(InputRowFilterResult[]::new); + + private final String reason; + + InputRowFilterResult(String reason) + { + this.reason = reason; + } + + /** + * Returns the string representation of this {@link InputRowFilterResult}, used for metric emission. + */ + public String getReason() + { + return reason; + } + + /** + * Returns true if this result indicates the row was rejected (thrown away). + * Returns false for {@link #ACCEPTED}. + */ + public boolean isRejected() + { + return this != ACCEPTED; + } + + /** + * Returns the {@link InputRowFilterResult} values that are rejection states. + */ + public static InputRowFilterResult[] rejectedValues() + { + return REJECTED_VALUES; + } + + /** + * Returns the total number of {@link InputRowFilterResult} values. + */ + public static int numValues() + { + return values().length; + } +}
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java b/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java index bff4f2e6de32..67eb80a2dc8b 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/NoopRowIngestionMeters.java @@ -23,13 +23,13 @@ import java.util.Map; /** - * This class is used only in {@code RealtimeIndexTask} which is deprecated now. + * This class is used only in {@code DartFrameContext}. * * Consider using {@link RowIngestionMetersFactory} instead. */ public class NoopRowIngestionMeters implements RowIngestionMeters { - private static final RowIngestionMetersTotals EMPTY_TOTALS = new RowIngestionMetersTotals(0, 0, 0, 0, 0); + private static final RowIngestionMetersTotals EMPTY_TOTALS = new RowIngestionMetersTotals(0, 0, 0, Map.of(), 0); @Override public long getProcessed() @@ -74,11 +74,17 @@ public long getThrownAway() } @Override - public void incrementThrownAway() + public void incrementThrownAway(InputRowFilterResult reason) { } + @Override + public Map<String, Long> getThrownAwayByReason() + { + return Map.of(); + } + @Override public RowIngestionMetersTotals getTotals() {
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java index 25fc3bae481a..502c2057ebc0 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMeters.java @@ -40,6 +40,7 @@ public interface RowIngestionMeters extends InputStats String PROCESSED_WITH_ERROR = "processedWithError"; String UNPARSEABLE = "unparseable"; String THROWN_AWAY = "thrownAway"; + String THROWN_AWAY_BY_REASON = "thrownAwayByReason"; /** * Number of bytes read by an ingestion task. @@ -73,7 +74,17 @@ default long getProcessedBytes() void incrementUnparseable(); long getThrownAway(); - void incrementThrownAway(); + + /** + * Increments the thrown away counter for the specified {@link InputRowFilterResult} reason. + */ + void incrementThrownAway(InputRowFilterResult reason); + + /** + * Returns the count of thrown away events for each reason. + * Keyed by {@link InputRowFilterResult#getReason()}. + */ + Map<String, Long> getThrownAwayByReason(); RowIngestionMetersTotals getTotals();
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java index 2002bb24ac05..43e1b9857056 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/RowIngestionMetersTotals.java @@ -21,7 +21,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.common.config.Configs; +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; public class RowIngestionMetersTotals @@ -30,6 +34,7 @@ public class RowIngestionMetersTotals private final long processedBytes; private final long processedWithError; private final long thrownAway; + private final Map<String, Long> thrownAwayByReason; private final long unparseable; @JsonCreator @@ -38,13 +43,49 @@ public RowIngestionMetersTotals( @JsonProperty("processedBytes") long processedBytes, @JsonProperty("processedWithError") long processedWithError, @JsonProperty("thrownAway") long thrownAway, + @JsonProperty("thrownAwayByReason") @Nullable Map<String, Long> thrownAwayByReason, @JsonProperty("unparseable") long unparseable ) + { + this( + processed, + processedBytes, + processedWithError, + Configs.valueOrDefault(thrownAwayByReason, getBackwardsCompatibleThrownAwayByReason(thrownAway)), + unparseable + ); + } + + public RowIngestionMetersTotals( + long processed, + long processedBytes, + long processedWithError, + long thrownAway, + long unparseable + ) + { + this( + processed, + processedBytes, + processedWithError, + getBackwardsCompatibleThrownAwayByReason(thrownAway), + unparseable + ); + } + + public RowIngestionMetersTotals( + long processed, + long processedBytes, + long processedWithError, + Map<String, Long> thrownAwayByReason, + long unparseable + ) { this.processed = processed; this.processedBytes = processedBytes; this.processedWithError = processedWithError; - this.thrownAway = thrownAway; + this.thrownAway = thrownAwayByReason.values().stream().reduce(0L, Long::sum); + this.thrownAwayByReason = thrownAwayByReason; this.unparseable = unparseable; } @@ -72,6 +113,12 @@ public long getThrownAway() return thrownAway; } + @JsonProperty + public Map<String, Long> getThrownAwayByReason() + { + return thrownAwayByReason; + } + @JsonProperty public long getUnparseable() { @@ -92,13 +139,14 @@ public boolean equals(Object o) && processedBytes == that.processedBytes && processedWithError == that.processedWithError && thrownAway == that.thrownAway + && thrownAwayByReason.equals(that.thrownAwayByReason) && unparseable == that.unparseable; } @Override public int hashCode() { - return Objects.hash(processed, processedBytes, processedWithError, thrownAway, unparseable); + return Objects.hash(processed, processedBytes, processedWithError, thrownAway, thrownAwayByReason, unparseable); } @Override @@ -109,7 +157,21 @@ public String toString() ", processedBytes=" + processedBytes + ", processedWithError=" + processedWithError + ", thrownAway=" + thrownAway + + ", thrownAwayByReason=" + thrownAwayByReason + ", unparseable=" + unparseable + '}'; } + + /** + * For backwards compatibility, attributes the entire thrownAway count to {@link InputRowFilterResult#UNKNOWN} when no thrownAwayByReason input is provided. + * This can occur during rolling Druid upgrades, when tasks running on older Druid versions return ingest statistic payloads to an overlord running on a newer Druid version. + */ + private static Map<String, Long> getBackwardsCompatibleThrownAwayByReason(long thrownAway) + { + Map<String, Long> results = new HashMap<>(); + if (thrownAway > 0) { + results.put(InputRowFilterResult.UNKNOWN.getReason(), thrownAway); + } + return results; + } }
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java b/processing/src/main/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java index 10293e4e24ae..2140b7812414 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/SimpleRowIngestionMeters.java @@ -19,6 +19,7 @@ package org.apache.druid.segment.incremental; +import java.util.HashMap; import java.util.Map; public class SimpleRowIngestionMeters implements RowIngestionMeters { @@ -26,8 +27,8 @@ public class SimpleRowIngestionMeters private long processed; private long processedWithError; private long unparseable; - private long thrownAway; private long processedBytes; + private final long[] thrownAwayByReason = new long[InputRowFilterResult.numValues()]; @Override public long getProcessed() @@ -80,13 +81,30 @@ public void incrementUnparseable() @Override public long getThrownAway() { - return thrownAway; + long total = 0; + for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) { + total += thrownAwayByReason[reason.ordinal()]; + } + return total; } @Override - public void incrementThrownAway() + public void incrementThrownAway(InputRowFilterResult reason) { - thrownAway++; + ++thrownAwayByReason[reason.ordinal()]; + } + + @Override + public Map<String, Long> getThrownAwayByReason() + { + final Map<String, Long> result = new HashMap<>(); + for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) { + long count = thrownAwayByReason[reason.ordinal()]; + if (count > 0) { + result.put(reason.getReason(), count); + } + } + return result; } @Override @@ -96,7 +114,7 @@ public RowIngestionMetersTotals getTotals() processed, processedBytes, processedWithError, - thrownAway, + getThrownAwayByReason(), unparseable ); } @@ -112,7 +130,11 @@ public void addRowIngestionMetersTotals(RowIngestionMetersTotals rowIngestionMet this.processed += rowIngestionMetersTotals.getProcessed(); this.processedWithError += rowIngestionMetersTotals.getProcessedWithError(); this.unparseable += rowIngestionMetersTotals.getUnparseable(); - this.thrownAway += rowIngestionMetersTotals.getThrownAway(); this.processedBytes += rowIngestionMetersTotals.getProcessedBytes(); + + final Map<String, Long> thrownAwayByReason = rowIngestionMetersTotals.getThrownAwayByReason(); + for (InputRowFilterResult reason : InputRowFilterResult.rejectedValues()) { + this.thrownAwayByReason[reason.ordinal()] += thrownAwayByReason.getOrDefault(reason.getReason(), 0L); + } } }
diff --git a/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java b/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java index 4ff36d731dfa..b137ffe1f357 100644 --- a/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/indexer/report/TaskReportSerdeTest.java @@ -30,6 +30,7 @@ import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.incremental.RowMeters; +import org.apache.druid.utils.CollectionUtils; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -353,6 +354,7 @@ private void verifyTotalRowStats( "processedBytes", (int) determinePartitionTotalStats.getProcessedBytes(), "processedWithError", (int) determinePartitionTotalStats.getProcessedWithError(), "thrownAway", (int) determinePartitionTotalStats.getThrownAway(), + "thrownAwayByReason", CollectionUtils.mapValues(determinePartitionTotalStats.getThrownAwayByReason(), Long::intValue), "unparseable", (int) determinePartitionTotalStats.getUnparseable() ), observedTotals.get("determinePartitions") @@ -363,6 +365,7 @@ private void verifyTotalRowStats( "processedBytes", (int) buildSegmentTotalStats.getProcessedBytes(), "processedWithError", (int) buildSegmentTotalStats.getProcessedWithError(), "thrownAway", (int) buildSegmentTotalStats.getThrownAway(), + "thrownAwayByReason", CollectionUtils.mapValues(buildSegmentTotalStats.getThrownAwayByReason(), Long::intValue), "unparseable", (int) buildSegmentTotalStats.getUnparseable() ), observedTotals.get("buildSegments") diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/InputRowFilterResultTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/InputRowFilterResultTest.java new file mode 100644 index 000000000000..a8d932e4dcd9 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/incremental/InputRowFilterResultTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.incremental; + +import org.junit.Assert; +import org.junit.Test; + +public class InputRowFilterResultTest +{ + @Test + public void testOrdinalValues() + { + Assert.assertEquals(0, InputRowFilterResult.ACCEPTED.ordinal()); + Assert.assertEquals(1, InputRowFilterResult.NULL_OR_EMPTY_RECORD.ordinal()); + Assert.assertEquals(2, InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.ordinal()); + Assert.assertEquals(3, InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.ordinal()); + Assert.assertEquals(4, InputRowFilterResult.CUSTOM_FILTER.ordinal()); + } + + @Test + public void testMetricValues() + { + Assert.assertEquals("accepted", InputRowFilterResult.ACCEPTED.getReason()); + Assert.assertEquals("null", InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason()); + Assert.assertEquals("beforeMinimumMessageTime", InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.getReason()); + Assert.assertEquals("afterMaximumMessageTime", InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.getReason()); + Assert.assertEquals("filtered", InputRowFilterResult.CUSTOM_FILTER.getReason()); + } + + @Test + public void testEnumCardinality() + { + Assert.assertEquals(6, InputRowFilterResult.values().length); + } + + @Test + public void testIsRejected() + { + Assert.assertFalse(InputRowFilterResult.ACCEPTED.isRejected()); + Assert.assertTrue(InputRowFilterResult.NULL_OR_EMPTY_RECORD.isRejected()); + Assert.assertTrue(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.isRejected()); + Assert.assertTrue(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.isRejected()); + Assert.assertTrue(InputRowFilterResult.CUSTOM_FILTER.isRejected()); + Assert.assertTrue(InputRowFilterResult.UNKNOWN.isRejected()); + } +} + diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/RowMeters.java b/processing/src/test/java/org/apache/druid/segment/incremental/RowMeters.java index 02b1d290003e..f155e70424dc 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/RowMeters.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/RowMeters.java @@ -19,6 +19,9 @@ package org.apache.druid.segment.incremental; +import java.util.HashMap; +import java.util.Map; + /** * Utility class to build {@link RowIngestionMetersTotals}, used in tests. 
*/ @@ -27,7 +30,7 @@ public class RowMeters private long processedBytes; private long processedWithError; private long unparseable; - private long thrownAway; + private final Map<String, Long> thrownAwayByReason = new HashMap<>(); /** * Creates a new {@link RowMeters}, that can be used to build an instance of @@ -56,14 +59,20 @@ public RowMeters unparseable(long unparseable) return this; } + public RowMeters thrownAwayByReason(InputRowFilterResult reason, long count) + { + this.thrownAwayByReason.put(reason.getReason(), count); + return this; + } + public RowMeters thrownAway(long thrownAway) { - this.thrownAway = thrownAway; + this.thrownAwayByReason.put(InputRowFilterResult.UNKNOWN.getReason(), thrownAway); return this; } public RowIngestionMetersTotals totalProcessed(long processed) { - return new RowIngestionMetersTotals(processed, processedBytes, processedWithError, thrownAway, unparseable); + return new RowIngestionMetersTotals(processed, processedBytes, processedWithError, thrownAwayByReason, unparseable); } }
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMetersTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMetersTest.java index 5f46129c1707..9cccff0c880a 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMetersTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/SimpleRowIngestionMetersTest.java @@ -22,6 +22,8 @@ import org.junit.Assert; import org.junit.Test; +import java.util.Map; + public class SimpleRowIngestionMetersTest { @Test @@ -32,16 +34,49 @@ public void testIncrement() rowIngestionMeters.incrementProcessedBytes(5); rowIngestionMeters.incrementProcessedWithError(); rowIngestionMeters.incrementUnparseable(); - rowIngestionMeters.incrementThrownAway(); - Assert.assertEquals(rowIngestionMeters.getTotals(), new RowIngestionMetersTotals(1, 5, 1, 1, 1)); + rowIngestionMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD); + final Map<String, Long> expected = Map.of(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason(), 1L); + Assert.assertEquals(new RowIngestionMetersTotals(1, 5, 1, expected, 1), rowIngestionMeters.getTotals()); } @Test public void testAddRowIngestionMetersTotals() { SimpleRowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters(); - RowIngestionMetersTotals rowIngestionMetersTotals = new RowIngestionMetersTotals(10, 0, 1, 0, 1); + RowIngestionMetersTotals rowIngestionMetersTotals = new RowIngestionMetersTotals(10, 0, 1, 1, 1); rowIngestionMeters.addRowIngestionMetersTotals(rowIngestionMetersTotals); - Assert.assertEquals(rowIngestionMeters.getTotals(), rowIngestionMetersTotals); + Assert.assertEquals(rowIngestionMetersTotals, rowIngestionMeters.getTotals()); + } + + @Test + public void testIncrementThrownAwayWithReason() + { + SimpleRowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters(); + + rowIngestionMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD); + rowIngestionMeters.incrementThrownAway(InputRowFilterResult.NULL_OR_EMPTY_RECORD); + rowIngestionMeters.incrementThrownAway(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME); + rowIngestionMeters.incrementThrownAway(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME); + rowIngestionMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER); + rowIngestionMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER); + rowIngestionMeters.incrementThrownAway(InputRowFilterResult.CUSTOM_FILTER); + + Assert.assertEquals(7, rowIngestionMeters.getThrownAway()); + + Map<String, Long> byReason = rowIngestionMeters.getThrownAwayByReason(); + Assert.assertEquals(Long.valueOf(2), byReason.get(InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason())); + Assert.assertEquals(Long.valueOf(1), byReason.get(InputRowFilterResult.BEFORE_MIN_MESSAGE_TIME.getReason())); + Assert.assertEquals(Long.valueOf(1), byReason.get(InputRowFilterResult.AFTER_MAX_MESSAGE_TIME.getReason())); + Assert.assertEquals(Long.valueOf(3), byReason.get(InputRowFilterResult.CUSTOM_FILTER.getReason())); + } + + @Test + public void testGetThrownAwayByReasonReturnsNoRejectedReasons() + { + SimpleRowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters(); + + // With no increments, no rejected reasons should be present + Map<String, Long> byReason = rowIngestionMeters.getThrownAwayByReason(); + Assert.assertTrue(byReason.isEmpty()); } }
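
Note on the new InputRowFilter interface: the interface itself is not part of this excerpt, so the sketch below reconstructs a plausible shape from the tests above (the single abstract test method, the fromPredicate and allowAll factories, and the short-circuiting and combinator). Treat it as an illustration of the contract the tests exercise, not the committed source.

package org.apache.druid.indexing.common.task;

import org.apache.druid.data.input.InputRow;
import org.apache.druid.segment.incremental.InputRowFilterResult;

import java.util.function.Predicate;

@FunctionalInterface
public interface InputRowFilter
{
  /** Classifies a row as ACCEPTED or as one of the rejection reasons. */
  InputRowFilterResult test(InputRow row);

  /** Adapts a plain Predicate; rejections are attributed to CUSTOM_FILTER, matching testRowFilterFromPredicate. */
  static InputRowFilter fromPredicate(Predicate<InputRow> predicate)
  {
    return row -> predicate.test(row) ? InputRowFilterResult.ACCEPTED : InputRowFilterResult.CUSTOM_FILTER;
  }

  /** A filter that accepts every row. */
  static InputRowFilter allowAll()
  {
    return row -> InputRowFilterResult.ACCEPTED;
  }

  /** Short-circuiting conjunction that keeps the first rejection reason, matching testAndBothRejectReturnsFirst. */
  default InputRowFilter and(InputRowFilter other)
  {
    return row -> {
      final InputRowFilterResult result = test(row);
      return result.isRejected() ? result : other.test(row);
    };
  }
}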
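
Note on metric emission: once a task exposes getThrownAwayByReason(), an emitter can publish one ingest/events/thrownAway value per rejection reason under the new DruidMetrics.REASON dimension, rather than a single undifferentiated count. The sketch below shows that fan-out against a hypothetical emit callback; the actual emitter wiring is outside this diff.

import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.RowIngestionMeters;

import java.util.Map;
import java.util.function.ObjLongConsumer;

public class ThrownAwayEmissionSketch
{
  /**
   * Fans the per-reason counts out to the emitter: one data point per reason,
   * each tagged with the reason dimension. The emit callback is a stand-in
   * for whichever emitter implementation is configured.
   */
  public static void emitThrownAway(RowIngestionMeters meters, ObjLongConsumer<Map<String, String>> emit)
  {
    for (Map.Entry<String, Long> entry : meters.getThrownAwayByReason().entrySet()) {
      emit.accept(Map.of(DruidMetrics.REASON, entry.getKey()), entry.getValue());
    }
  }
}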
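
Note on backwards compatibility: the RowIngestionMetersTotals constructors above make the rolling-upgrade behavior easy to verify. A legacy payload that carries only a thrownAway count is bucketed under the unknown reason, while a payload that carries the per-reason map derives thrownAway from it. A small worked example, using only constructors from this diff:

import org.apache.druid.segment.incremental.InputRowFilterResult;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;

import java.util.Map;

public class BackwardsCompatSketch
{
  public static void main(String[] args)
  {
    // Payload from an older task: thrownAway=5, no per-reason map.
    RowIngestionMetersTotals legacy = new RowIngestionMetersTotals(10, 100, 0, 5, 1);
    System.out.println(legacy.getThrownAwayByReason()); // {unknown=5}

    // Payload from a newer task: thrownAway is derived as the sum of the map values.
    RowIngestionMetersTotals current = new RowIngestionMetersTotals(
        10,
        100,
        0,
        Map.of(
            InputRowFilterResult.CUSTOM_FILTER.getReason(), 3L,
            InputRowFilterResult.NULL_OR_EMPTY_RECORD.getReason(), 2L
        ),
        1
    );
    System.out.println(current.getThrownAway()); // 5
  }
}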