diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index dc93947060a4..b525424facdd 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -213,6 +213,7 @@ Note: If the JVM does not support CPU time measurement for the current thread, i |`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| |`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| |`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| +|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.|dataSource, taskType, taskId, segmentAvailabilityConfirmed|Varies.| ## Shuffle metrics (Native parallel task) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 59ed88385c71..28f8064faf0c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -53,6 +53,7 @@ import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.granularity.IntervalsByGranularity; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.handoff.SegmentHandoffNotifier; import org.apache.druid.segment.incremental.ParseExceptionHandler; @@ -671,6 +672,14 @@ protected boolean waitForSegmentAvailability( } finally { segmentAvailabilityWaitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + toolbox.getEmitter().emit( + new ServiceMetricEvent.Builder() + .setDimension("dataSource", getDataSource()) + .setDimension("taskType", getType()) + .setDimension("taskId", getId()) + .setDimension("segmentAvailabilityConfirmed", segmentAvailabilityConfirmationCompleted) + .build("task/segmentAvailability/wait/time", segmentAvailabilityWaitTimeMs) + ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 4cce707eb9c7..b714947e95ad 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -61,6 +61,8 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; @@ -90,6 +92,7 @@ import org.apache.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory; import org.apache.druid.segment.transform.ExpressionTransform; import org.apache.druid.segment.transform.TransformSpec; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; @@ -124,6 +127,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @RunWith(Parameterized.class) @@ -1086,6 +1091,7 @@ public void testWaitForSegmentAvailabilityMultipleSegmentsTimeout() throws IOExc EasyMock.expect(mockDataSegment2.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once(); EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory()).andReturn(mockFactory).once(); + EasyMock.expect(mockToolbox.getEmitter()).andReturn(new NoopServiceEmitter()).anyTimes(); EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once(); EasyMock.expect(mockFactory.createSegmentHandoffNotifier("MockDataSource")).andReturn(mockNotifier).once(); mockNotifier.start(); @@ -1150,12 +1156,75 @@ public void testWaitForSegmentAvailabilityMultipleSegmentsSuccess() throws IOExc EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory()) .andReturn(new NoopSegmentHandoffNotifierFactory()) .once(); + EasyMock.expect(mockToolbox.getEmitter()).andReturn(new NoopServiceEmitter()).anyTimes(); + + EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once(); + + EasyMock.replay(mockToolbox); + EasyMock.replay(mockDataSegment1, mockDataSegment2); + + Assert.assertTrue(indexTask.waitForSegmentAvailability(mockToolbox, segmentsToWaitFor, 30000)); + EasyMock.verify(mockToolbox); + EasyMock.verify(mockDataSegment1, mockDataSegment2); + } + + @Test + public void testWaitForSegmentAvailabilityEmitsExpectedMetric() throws IOException, InterruptedException + { + final File tmpDir = temporaryFolder.newFolder(); + + LatchableServiceEmitter latchEmitter = new LatchableServiceEmitter(); + latchEmitter.latch = new CountDownLatch(1); + + TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class); + + DataSegment mockDataSegment1 = EasyMock.createMock(DataSegment.class); + DataSegment mockDataSegment2 = EasyMock.createMock(DataSegment.class); + List segmentsToWaitFor = new ArrayList<>(); + segmentsToWaitFor.add(mockDataSegment1); + segmentsToWaitFor.add(mockDataSegment2); + + IndexTask indexTask = new IndexTask( + null, + null, + createDefaultIngestionSpec( + jsonMapper, + tmpDir, + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.MINUTE, + null + ), + null, + createTuningConfigWithMaxRowsPerSegment(2, true), + false, + false + ), + null + ); + + EasyMock.expect(mockDataSegment1.getInterval()).andReturn(Intervals.of("1970-01-01/1971-01-01")).once(); + EasyMock.expect(mockDataSegment1.getVersion()).andReturn("dummyString").once(); + EasyMock.expect(mockDataSegment1.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once(); + EasyMock.expect(mockDataSegment1.getId()).andReturn(SegmentId.dummy("MockDataSource")).once(); + EasyMock.expect(mockDataSegment2.getInterval()).andReturn(Intervals.of("1971-01-01/1972-01-01")).once(); + EasyMock.expect(mockDataSegment2.getVersion()).andReturn("dummyString").once(); + EasyMock.expect(mockDataSegment2.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once(); + EasyMock.expect(mockDataSegment2.getId()).andReturn(SegmentId.dummy("MockDataSource")).once(); + + EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory()) + .andReturn(new NoopSegmentHandoffNotifierFactory()) + .once(); + EasyMock.expect(mockToolbox.getEmitter()) + .andReturn(latchEmitter).anyTimes(); + EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once(); EasyMock.replay(mockToolbox); EasyMock.replay(mockDataSegment1, mockDataSegment2); Assert.assertTrue(indexTask.waitForSegmentAvailability(mockToolbox, segmentsToWaitFor, 30000)); + latchEmitter.latch.await(300000, TimeUnit.MILLISECONDS); EasyMock.verify(mockToolbox); EasyMock.verify(mockDataSegment1, mockDataSegment2); } @@ -2709,6 +2778,27 @@ private static IndexIngestionSpec createIngestionSpec( } } + /** + * Used to test that expected metric is emitted by AbstractBatchIndexTask#waitForSegmentAvailability + */ + private static class LatchableServiceEmitter extends ServiceEmitter + { + private CountDownLatch latch; + + private LatchableServiceEmitter() + { + super("", "", null); + } + + @Override + public void emit(Event event) + { + if (latch != null && "task/segmentAvailability/wait/time".equals(event.toMap().get("metric"))) { + latch.countDown(); + } + } + } + @Test public void testEqualsAndHashCode() { diff --git a/website/.spelling b/website/.spelling index 58e8f8191002..9b11ed6592ab 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1359,6 +1359,7 @@ numMetrics poolKind poolName remoteAddress +segmentAvailabilityConfirmed serviceName taskStatus taskType