From 90b64e84254fb6eb0e4bd229e0e6b25e3de96de6 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Mon, 29 Nov 2021 14:54:48 -0600 Subject: [PATCH 1/4] add a unit test that tests that new metric is emitted --- docs/operations/metrics.md | 1 + .../common/task/AbstractBatchIndexTask.java | 9 ++ .../indexing/common/task/IndexTaskTest.java | 88 +++++++++++++++++++ website/.spelling | 1 + 4 files changed, 99 insertions(+) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index ef39085381d7..96c0bcea0e47 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -215,6 +215,7 @@ Note: If the JVM does not support CPU time measurement for the current thread, i |`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| |`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| |`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| +|`task/segmentAvailability/wait/time`|The amount of milliseconds a task waited for newly created segments to become available for querying.|dataSource, taskType, taskId, segmentAvailabilityConfirmed|Varies.| ## Shuffle metrics (Native parallel task) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 59ed88385c71..28f8064faf0c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -53,6 +53,7 @@ import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.granularity.IntervalsByGranularity; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.handoff.SegmentHandoffNotifier; import org.apache.druid.segment.incremental.ParseExceptionHandler; @@ -671,6 +672,14 @@ protected boolean waitForSegmentAvailability( } finally { segmentAvailabilityWaitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + toolbox.getEmitter().emit( + new ServiceMetricEvent.Builder() + .setDimension("dataSource", getDataSource()) + .setDimension("taskType", getType()) + .setDimension("taskId", getId()) + .setDimension("segmentAvailabilityConfirmed", segmentAvailabilityConfirmationCompleted) + .build("task/segmentAvailability/wait/time", segmentAvailabilityWaitTimeMs) + ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index f805b2c5ce24..102e8cdae214 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -61,6 +61,9 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; @@ -123,6 +126,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; @RunWith(Parameterized.class) public class IndexTaskTest extends IngestionTestBase @@ -1148,12 +1153,74 @@ public void testWaitForSegmentAvailabilityMultipleSegmentsSuccess() throws IOExc EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory()) .andReturn(new NoopSegmentHandoffNotifierFactory()) .once(); + + EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once(); + + EasyMock.replay(mockToolbox); + EasyMock.replay(mockDataSegment1, mockDataSegment2); + + Assert.assertTrue(indexTask.waitForSegmentAvailability(mockToolbox, segmentsToWaitFor, 30000)); + EasyMock.verify(mockToolbox); + EasyMock.verify(mockDataSegment1, mockDataSegment2); + } + + @Test + public void testWaitForSegmentAvailabilityEmitsExpectedMetric() throws IOException, InterruptedException + { + final File tmpDir = temporaryFolder.newFolder(); + + LatchableServiceEmitter latchEmitter = new LatchableServiceEmitter(); + latchEmitter.latch = new CountDownLatch(1); + + TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class); + + DataSegment mockDataSegment1 = EasyMock.createMock(DataSegment.class); + DataSegment mockDataSegment2 = EasyMock.createMock(DataSegment.class); + List segmentsToWaitFor = new ArrayList<>(); + segmentsToWaitFor.add(mockDataSegment1); + segmentsToWaitFor.add(mockDataSegment2); + + IndexTask indexTask = new IndexTask( + null, + null, + createDefaultIngestionSpec( + jsonMapper, + tmpDir, + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.MINUTE, + null + ), + null, + createTuningConfigWithMaxRowsPerSegment(2, true), + false, + false + ), + null + ); + + EasyMock.expect(mockDataSegment1.getInterval()).andReturn(Intervals.of("1970-01-01/1971-01-01")).once(); + EasyMock.expect(mockDataSegment1.getVersion()).andReturn("dummyString").once(); + EasyMock.expect(mockDataSegment1.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once(); + EasyMock.expect(mockDataSegment1.getId()).andReturn(SegmentId.dummy("MockDataSource")).once(); + EasyMock.expect(mockDataSegment2.getInterval()).andReturn(Intervals.of("1971-01-01/1972-01-01")).once(); + EasyMock.expect(mockDataSegment2.getVersion()).andReturn("dummyString").once(); + EasyMock.expect(mockDataSegment2.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once(); + EasyMock.expect(mockDataSegment2.getId()).andReturn(SegmentId.dummy("MockDataSource")).once(); + + EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory()) + .andReturn(new NoopSegmentHandoffNotifierFactory()) + .once(); + EasyMock.expect(mockToolbox.getEmitter()) + .andReturn(latchEmitter).anyTimes(); + EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once(); EasyMock.replay(mockToolbox); EasyMock.replay(mockDataSegment1, mockDataSegment2); Assert.assertTrue(indexTask.waitForSegmentAvailability(mockToolbox, segmentsToWaitFor, 30000)); + latchEmitter.latch.await(300000, TimeUnit.MILLISECONDS); EasyMock.verify(mockToolbox); EasyMock.verify(mockDataSegment1, mockDataSegment2); } @@ -2604,6 +2671,27 @@ private static IndexIngestionSpec createIngestionSpec( } } + /** + * Used to test that expected metric is emitted by AbstractBatchIndexTask#waitForSegmentAvailability + */ + private static class LatchableServiceEmitter extends ServiceEmitter + { + private CountDownLatch latch; + + private LatchableServiceEmitter() + { + super("", "", null); + } + + @Override + public void emit(Event event) + { + if (latch != null && "task/segmentAvailability/wait/time".equals(event.toMap().get("metric"))) { + latch.countDown(); + } + } + } + @Test public void testEqualsAndHashCode() { diff --git a/website/.spelling b/website/.spelling index aa2bc7874884..2d5de401065d 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1323,6 +1323,7 @@ numMetrics poolKind poolName remoteAddress +segmentAvailabilityConfirmed serviceName taskStatus taskType From 018aa60359eaefa6524d80a9684cd56dc45da781 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Mon, 29 Nov 2021 15:29:58 -0600 Subject: [PATCH 2/4] remove unused import --- .../org/apache/druid/indexing/common/task/IndexTaskTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 102e8cdae214..1149db582c99 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -63,7 +63,6 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; From 1007c0fa0a7749a58192db957df4efbe4cc0b5dc Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Mon, 29 Nov 2021 15:34:07 -0600 Subject: [PATCH 3/4] clarify in doc that this is for batch tasks --- docs/operations/metrics.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 96c0bcea0e47..b23779d436e4 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -215,7 +215,7 @@ Note: If the JVM does not support CPU time measurement for the current thread, i |`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| |`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| |`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.| -|`task/segmentAvailability/wait/time`|The amount of milliseconds a task waited for newly created segments to become available for querying.|dataSource, taskType, taskId, segmentAvailabilityConfirmed|Varies.| +|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.|dataSource, taskType, taskId, segmentAvailabilityConfirmed|Varies.| ## Shuffle metrics (Native parallel task) From 594cd66fb168e74d75aa4228d762bd41989c3a92 Mon Sep 17 00:00:00 2001 From: "Lucas.Capistrant" Date: Wed, 8 Dec 2021 09:45:58 -0600 Subject: [PATCH 4/4] fix IndexTaskTest --- .../org/apache/druid/indexing/common/task/IndexTaskTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 1149db582c99..3c1fee7b8932 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -92,6 +92,7 @@ import org.apache.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory; import org.apache.druid.segment.transform.ExpressionTransform; import org.apache.druid.segment.transform.TransformSpec; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; @@ -1088,6 +1089,7 @@ public void testWaitForSegmentAvailabilityMultipleSegmentsTimeout() throws IOExc EasyMock.expect(mockDataSegment2.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once(); EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory()).andReturn(mockFactory).once(); + EasyMock.expect(mockToolbox.getEmitter()).andReturn(new NoopServiceEmitter()).anyTimes(); EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once(); EasyMock.expect(mockFactory.createSegmentHandoffNotifier("MockDataSource")).andReturn(mockNotifier).once(); mockNotifier.start(); @@ -1152,6 +1154,7 @@ public void testWaitForSegmentAvailabilityMultipleSegmentsSuccess() throws IOExc EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory()) .andReturn(new NoopSegmentHandoffNotifierFactory()) .once(); + EasyMock.expect(mockToolbox.getEmitter()).andReturn(new NoopServiceEmitter()).anyTimes(); EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();