Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.|dataSource, taskType, taskId, segmentAvailabilityConfirmed|Varies.|

## Shuffle metrics (Native parallel task)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
Expand Down Expand Up @@ -671,6 +672,14 @@ protected boolean waitForSegmentAvailability(
}
finally {
segmentAvailabilityWaitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
toolbox.getEmitter().emit(
new ServiceMetricEvent.Builder()
.setDimension("dataSource", getDataSource())
.setDimension("taskType", getType())
.setDimension("taskId", getId())
.setDimension("segmentAvailabilityConfirmed", segmentAvailabilityConfirmationCompleted)
.build("task/segmentAvailability/wait/time", segmentAvailabilityWaitTimeMs)
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
Expand Down Expand Up @@ -90,6 +92,7 @@
import org.apache.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
Expand Down Expand Up @@ -124,6 +127,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@RunWith(Parameterized.class)
Expand Down Expand Up @@ -1086,6 +1091,7 @@ public void testWaitForSegmentAvailabilityMultipleSegmentsTimeout() throws IOExc
EasyMock.expect(mockDataSegment2.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once();

EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory()).andReturn(mockFactory).once();
EasyMock.expect(mockToolbox.getEmitter()).andReturn(new NoopServiceEmitter()).anyTimes();
EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();
EasyMock.expect(mockFactory.createSegmentHandoffNotifier("MockDataSource")).andReturn(mockNotifier).once();
mockNotifier.start();
Expand Down Expand Up @@ -1150,12 +1156,75 @@ public void testWaitForSegmentAvailabilityMultipleSegmentsSuccess() throws IOExc
EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory())
.andReturn(new NoopSegmentHandoffNotifierFactory())
.once();
EasyMock.expect(mockToolbox.getEmitter()).andReturn(new NoopServiceEmitter()).anyTimes();

EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();

EasyMock.replay(mockToolbox);
EasyMock.replay(mockDataSegment1, mockDataSegment2);

Assert.assertTrue(indexTask.waitForSegmentAvailability(mockToolbox, segmentsToWaitFor, 30000));
EasyMock.verify(mockToolbox);
EasyMock.verify(mockDataSegment1, mockDataSegment2);
}

@Test
public void testWaitForSegmentAvailabilityEmitsExpectedMetric() throws IOException, InterruptedException
{
final File tmpDir = temporaryFolder.newFolder();

LatchableServiceEmitter latchEmitter = new LatchableServiceEmitter();
latchEmitter.latch = new CountDownLatch(1);

TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class);

DataSegment mockDataSegment1 = EasyMock.createMock(DataSegment.class);
DataSegment mockDataSegment2 = EasyMock.createMock(DataSegment.class);
List<DataSegment> segmentsToWaitFor = new ArrayList<>();
segmentsToWaitFor.add(mockDataSegment1);
segmentsToWaitFor.add(mockDataSegment2);

IndexTask indexTask = new IndexTask(
null,
null,
createDefaultIngestionSpec(
jsonMapper,
tmpDir,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
null
),
null,
createTuningConfigWithMaxRowsPerSegment(2, true),
false,
false
),
null
);

EasyMock.expect(mockDataSegment1.getInterval()).andReturn(Intervals.of("1970-01-01/1971-01-01")).once();
EasyMock.expect(mockDataSegment1.getVersion()).andReturn("dummyString").once();
EasyMock.expect(mockDataSegment1.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once();
EasyMock.expect(mockDataSegment1.getId()).andReturn(SegmentId.dummy("MockDataSource")).once();
EasyMock.expect(mockDataSegment2.getInterval()).andReturn(Intervals.of("1971-01-01/1972-01-01")).once();
EasyMock.expect(mockDataSegment2.getVersion()).andReturn("dummyString").once();
EasyMock.expect(mockDataSegment2.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once();
EasyMock.expect(mockDataSegment2.getId()).andReturn(SegmentId.dummy("MockDataSource")).once();

EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory())
.andReturn(new NoopSegmentHandoffNotifierFactory())
.once();
EasyMock.expect(mockToolbox.getEmitter())
.andReturn(latchEmitter).anyTimes();

EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();

EasyMock.replay(mockToolbox);
EasyMock.replay(mockDataSegment1, mockDataSegment2);

Assert.assertTrue(indexTask.waitForSegmentAvailability(mockToolbox, segmentsToWaitFor, 30000));
latchEmitter.latch.await(300000, TimeUnit.MILLISECONDS);
EasyMock.verify(mockToolbox);
EasyMock.verify(mockDataSegment1, mockDataSegment2);
}
Expand Down Expand Up @@ -2709,6 +2778,27 @@ private static IndexIngestionSpec createIngestionSpec(
}
}

/**
* Used to test that expected metric is emitted by AbstractBatchIndexTask#waitForSegmentAvailability
*/
private static class LatchableServiceEmitter extends ServiceEmitter
{
private CountDownLatch latch;

private LatchableServiceEmitter()
{
super("", "", null);
}

@Override
public void emit(Event event)
{
if (latch != null && "task/segmentAvailability/wait/time".equals(event.toMap().get("metric"))) {
latch.countDown();
}
}
}

@Test
public void testEqualsAndHashCode()
{
Expand Down
1 change: 1 addition & 0 deletions website/.spelling
Original file line number Diff line number Diff line change
Expand Up @@ -1359,6 +1359,7 @@ numMetrics
poolKind
poolName
remoteAddress
segmentAvailabilityConfirmed
serviceName
taskStatus
taskType
Expand Down