diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index ccb20f2e6dd1..46ee96df5b8e 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -402,7 +402,7 @@ These metrics are emitted by the Druid Coordinator in every run of the correspon
|Metric|Description|Dimensions|Normal value|
|------|-----------|----------|------------|
-| `service/heartbeat` | Metric indicating the service is up. This metric is emitted only when `ServiceStatusMonitor` is enabled. | `leader` on the Overlord and Coordinator.<br/>`workerVersion`, `category`, `status` on the Middle Manager.<br/>`taskId`, `groupId`, `taskType`, `dataSource`, `tags` on the Peon |1|
+| `service/heartbeat` | Metric indicating the service is up. This metric is emitted only when `ServiceStatusMonitor` is enabled. | `leader` on the Overlord and Coordinator.<br/>`workerVersion`, `category`, `status` on the Middle Manager.<br/>`taskId`, `groupId`, `taskType`, `status`, `dataSource`, `tags` on the Peon |1|
### Historical
diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index a83aeb1bc7e1..48bc72505fe5 100644
--- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -190,7 +190,7 @@
"namespace/cache/numEntries" : { "dimensions" : [], "type" : "gauge" },
"namespace/cache/heapSizeInBytes" : { "dimensions" : [], "type" : "gauge" },
- "service/heartbeat" : { "dimensions" : ["leader"], "type" : "count" },
+ "service/heartbeat" : { "dimensions" : ["leader", "workerVersion", "category", "status", "taskId", "groupId", "taskType", "dataSource", "tags"], "type" : "count" },
"killTask/availableSlot/count" : { "dimensions" : [], "type" : "count" },
"killTask/maxSlot/count" : { "dimensions" : [], "type" : "count" },
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 3bae10a1969b..0621f6f9ba15 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2270,6 +2270,8 @@ public void testRunWithPauseAndResume() throws Exception
)
);
+ Assert.assertEquals(Status.NOT_STARTED.toString(), task.getCurrentRunnerStatus());
+
final ListenableFuture future = runTask(task);
// Insert some data, but not enough for the task to finish
@@ -2280,6 +2282,8 @@ public void testRunWithPauseAndResume() throws Exception
}
Assert.assertEquals(2, countEvents(task));
Assert.assertEquals(Status.READING, task.getRunner().getStatus());
+ Assert.assertEquals(Status.READING.toString(), task.getCurrentRunnerStatus());
+
Map currentOffsets = OBJECT_MAPPER.readValue(
task.getRunner().pause().getEntity().toString(),
@@ -2288,6 +2292,7 @@ public void testRunWithPauseAndResume() throws Exception
}
);
Assert.assertEquals(Status.PAUSED, task.getRunner().getStatus());
+ Assert.assertEquals(Status.PAUSED.toString(), task.getCurrentRunnerStatus());
// Insert remaining data
insertData(Iterables.skip(records, 4));
@@ -2305,6 +2310,7 @@ public void testRunWithPauseAndResume() throws Exception
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets());
+ Assert.assertEquals(Status.PUBLISHING.toString(), task.getCurrentRunnerStatus());
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
// Check published metadata and segments in deep storage
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index d43b83d78d07..7b10c0afdccf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -179,6 +179,16 @@ public QueryRunner getQueryRunner(Query query)
return (queryPlus, responseContext) -> queryPlus.run(getRunner().getAppenderator(), responseContext);
}
+ /**
+ * @return the current status of this task.
+ */
+ @Nullable
+ public String getCurrentRunnerStatus()
+ {
+ SeekableStreamIndexTaskRunner.Status status = (getRunner() != null) ? getRunner().getStatus() : null;
+ return (status != null) ? status.toString() : null;
+ }
+
public Appenderator newAppenderator(
TaskToolbox toolbox,
SegmentGenerationMetrics metrics,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 8a3d36ea67fe..f182ced69fbb 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -143,6 +143,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
private SeekableStreamSupervisorSpec spec;
private SeekableStreamIndexTaskClient indexTaskClient;
private RecordSupplier recordSupplier;
+ private SeekableStreamIndexTaskRunner streamingTaskRunner;
private RowIngestionMetersFactory rowIngestionMetersFactory;
private SupervisorStateManagerConfig supervisorConfig;
@@ -161,6 +162,7 @@ public void setupTest()
spec = createMock(SeekableStreamSupervisorSpec.class);
indexTaskClient = createMock(SeekableStreamIndexTaskClient.class);
recordSupplier = createMock(RecordSupplier.class);
+ streamingTaskRunner = createMock(SeekableStreamIndexTaskRunner.class);
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
@@ -1459,6 +1461,8 @@ public Duration getEmissionDuration()
supervisor.start();
supervisor.runInternal();
+ Assert.assertNull(id1.getCurrentRunnerStatus());
+
supervisor.checkpoint(
0,
new TestSeekableStreamDataSourceMetadata(
@@ -1518,6 +1522,8 @@ public Duration getEmissionDuration()
EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
+ EasyMock.expect(streamingTaskRunner.getStatus()).andReturn(null);
+ EasyMock.expect(streamingTaskRunner.getStatus()).andReturn(SeekableStreamIndexTaskRunner.Status.NOT_STARTED);
SeekableStreamIndexTaskTuningConfig taskTuningConfig = getTuningConfig().convertToTaskTuningConfig();
@@ -1543,7 +1549,8 @@ public Duration getEmissionDuration()
ioConfig
),
context,
- "0"
+ "0",
+ streamingTaskRunner
);
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
@@ -1595,6 +1602,9 @@ public Duration getEmissionDuration()
supervisor.runInternal();
supervisor.handoffTaskGroupsEarly(ImmutableList.of(0));
+ Assert.assertNull(id1.getCurrentRunnerStatus());
+ Assert.assertEquals("NOT_STARTED", id1.getCurrentRunnerStatus());
+
while (supervisor.getNoticesQueueSize() > 0) {
Thread.sleep(100);
}
@@ -2704,6 +2714,8 @@ public String toString()
private class TestSeekableStreamIndexTask extends SeekableStreamIndexTask
{
+ private final SeekableStreamIndexTaskRunner streamingTaskRunner;
+
public TestSeekableStreamIndexTask(
String id,
@Nullable TaskResource taskResource,
@@ -2713,6 +2725,29 @@ public TestSeekableStreamIndexTask(
@Nullable Map context,
@Nullable String groupId
)
+ {
+ this(
+ id,
+ taskResource,
+ dataSchema,
+ tuningConfig,
+ ioConfig,
+ context,
+ groupId,
+ null
+ );
+ }
+
+ public TestSeekableStreamIndexTask(
+ String id,
+ @Nullable TaskResource taskResource,
+ DataSchema dataSchema,
+ SeekableStreamIndexTaskTuningConfig tuningConfig,
+ SeekableStreamIndexTaskIOConfig ioConfig,
+ @Nullable Map context,
+ @Nullable String groupId,
+ @Nullable SeekableStreamIndexTaskRunner streamingTaskRunner
+ )
{
super(
id,
@@ -2723,12 +2758,14 @@ public TestSeekableStreamIndexTask(
context,
groupId
);
+ this.streamingTaskRunner = streamingTaskRunner;
}
+ @Nullable
@Override
protected SeekableStreamIndexTaskRunner createTaskRunner()
{
- return null;
+ return streamingTaskRunner;
}
@Override
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 15374625d301..9d4bffecb5a8 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -25,7 +25,6 @@
import com.github.rvesse.airline.annotations.Option;
import com.github.rvesse.airline.annotations.restrictions.Required;
import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -91,6 +90,7 @@
import org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.worker.executor.ExecutorLifecycle;
import org.apache.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager;
@@ -303,19 +303,7 @@ public void configure(Binder binder)
@Named(ServiceStatusMonitor.HEARTBEAT_TAGS_BINDING)
public Supplier