Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ These metrics are emitted by the Druid Coordinator in every run of the correspon

|Metric|Description|Dimensions|Normal value|
|------|-----------|----------|------------|
| `service/heartbeat` | Metric indicating the service is up. This metric is emitted only when `ServiceStatusMonitor` is enabled. | `leader` on the Overlord and Coordinator.<br />`workerVersion`, `category`, `status` on the Middle Manager.<br />`taskId`, `groupId`, `taskType`, `dataSource`, `tags` on the Peon |1|
| `service/heartbeat` | Metric indicating the service is up. This metric is emitted only when `ServiceStatusMonitor` is enabled. | `leader` on the Overlord and Coordinator.<br />`workerVersion`, `category`, `status` on the Middle Manager.<br />`taskId`, `groupId`, `taskType`, `status`, `dataSource`, `tags` on the Peon |1|

### Historical

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@
"namespace/cache/numEntries" : { "dimensions" : [], "type" : "gauge" },
"namespace/cache/heapSizeInBytes" : { "dimensions" : [], "type" : "gauge" },

"service/heartbeat" : { "dimensions" : ["leader"], "type" : "count" },
"service/heartbeat" : { "dimensions" : ["leader", "workerVersion", "category", "status", "taskId", "groupId", "dataSource", "taskStatus" ], "type" : "count" },

"killTask/availableSlot/count" : { "dimensions" : [], "type" : "count" },
"killTask/maxSlot/count" : { "dimensions" : [], "type" : "count" },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2270,6 +2270,8 @@ public void testRunWithPauseAndResume() throws Exception
)
);

Assert.assertEquals(Status.NOT_STARTED.toString(), task.getCurrentRunnerStatus());

final ListenableFuture<TaskStatus> future = runTask(task);

// Insert some data, but not enough for the task to finish
Expand All @@ -2280,6 +2282,8 @@ public void testRunWithPauseAndResume() throws Exception
}
Assert.assertEquals(2, countEvents(task));
Assert.assertEquals(Status.READING, task.getRunner().getStatus());
Assert.assertEquals(Status.READING.toString(), task.getCurrentRunnerStatus());


Map<KafkaTopicPartition, Long> currentOffsets = OBJECT_MAPPER.readValue(
task.getRunner().pause().getEntity().toString(),
Expand All @@ -2288,6 +2292,7 @@ public void testRunWithPauseAndResume() throws Exception
}
);
Assert.assertEquals(Status.PAUSED, task.getRunner().getStatus());
Assert.assertEquals(Status.PAUSED.toString(), task.getCurrentRunnerStatus());
// Insert remaining data
insertData(Iterables.skip(records, 4));

Expand All @@ -2305,6 +2310,7 @@ public void testRunWithPauseAndResume() throws Exception

Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets());
Assert.assertEquals(Status.PUBLISHING.toString(), task.getCurrentRunnerStatus());
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));

// Check published metadata and segments in deep storage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,16 @@ public <T> QueryRunner<T> getQueryRunner(Query<T> query)
return (queryPlus, responseContext) -> queryPlus.run(getRunner().getAppenderator(), responseContext);
}

/**
 * Reports the current lifecycle status of this task's runner as a string,
 * suitable for use as a metric dimension (e.g. the {@code service/heartbeat} metric).
 *
 * @return the runner's {@code Status} rendered via {@code toString()}, or
 *         {@code null} when the runner has not been created yet or reports no status.
 */
@Nullable
public String getCurrentRunnerStatus()
{
  if (getRunner() == null) {
    return null;
  }
  final SeekableStreamIndexTaskRunner.Status status = getRunner().getStatus();
  return status == null ? null : status.toString();
}

public Appenderator newAppenderator(
TaskToolbox toolbox,
SegmentGenerationMetrics metrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
private SeekableStreamSupervisorSpec spec;
private SeekableStreamIndexTaskClient indexTaskClient;
private RecordSupplier<String, String, ByteEntity> recordSupplier;
private SeekableStreamIndexTaskRunner<String, String, ByteEntity> streamingTaskRunner;

private RowIngestionMetersFactory rowIngestionMetersFactory;
private SupervisorStateManagerConfig supervisorConfig;
Expand All @@ -161,6 +162,7 @@ public void setupTest()
spec = createMock(SeekableStreamSupervisorSpec.class);
indexTaskClient = createMock(SeekableStreamIndexTaskClient.class);
recordSupplier = createMock(RecordSupplier.class);
streamingTaskRunner = createMock(SeekableStreamIndexTaskRunner.class);

rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();

Expand Down Expand Up @@ -1459,6 +1461,8 @@ public Duration getEmissionDuration()
supervisor.start();
supervisor.runInternal();

Assert.assertNull(id1.getCurrentRunnerStatus());

supervisor.checkpoint(
0,
new TestSeekableStreamDataSourceMetadata(
Expand Down Expand Up @@ -1518,6 +1522,8 @@ public Duration getEmissionDuration()
EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
EasyMock.expect(streamingTaskRunner.getStatus()).andReturn(null);
EasyMock.expect(streamingTaskRunner.getStatus()).andReturn(SeekableStreamIndexTaskRunner.Status.NOT_STARTED);

SeekableStreamIndexTaskTuningConfig taskTuningConfig = getTuningConfig().convertToTaskTuningConfig();

Expand All @@ -1543,7 +1549,8 @@ public Duration getEmissionDuration()
ioConfig
),
context,
"0"
"0",
streamingTaskRunner
);

final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
Expand Down Expand Up @@ -1595,6 +1602,9 @@ public Duration getEmissionDuration()
supervisor.runInternal();
supervisor.handoffTaskGroupsEarly(ImmutableList.of(0));

Assert.assertNull(id1.getCurrentRunnerStatus());
Assert.assertEquals("NOT_STARTED", id1.getCurrentRunnerStatus());

while (supervisor.getNoticesQueueSize() > 0) {
Thread.sleep(100);
}
Expand Down Expand Up @@ -2704,6 +2714,8 @@ public String toString()

private class TestSeekableStreamIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity>
{
private final SeekableStreamIndexTaskRunner<String, String, ByteEntity> streamingTaskRunner;

public TestSeekableStreamIndexTask(
String id,
@Nullable TaskResource taskResource,
Expand All @@ -2713,6 +2725,29 @@ public TestSeekableStreamIndexTask(
@Nullable Map<String, Object> context,
@Nullable String groupId
)
{
this(
id,
taskResource,
dataSchema,
tuningConfig,
ioConfig,
context,
groupId,
null
);
}

public TestSeekableStreamIndexTask(
String id,
@Nullable TaskResource taskResource,
DataSchema dataSchema,
SeekableStreamIndexTaskTuningConfig tuningConfig,
SeekableStreamIndexTaskIOConfig<String, String> ioConfig,
@Nullable Map<String, Object> context,
@Nullable String groupId,
@Nullable SeekableStreamIndexTaskRunner<String, String, ByteEntity> streamingTaskRunner
)
{
super(
id,
Expand All @@ -2723,12 +2758,14 @@ public TestSeekableStreamIndexTask(
context,
groupId
);
this.streamingTaskRunner = streamingTaskRunner;
}

/**
 * Returns the runner injected via the test constructor. May be {@code null}
 * when the test does not supply one, mimicking a task whose runner has not
 * been created yet.
 */
@Nullable
@Override
protected SeekableStreamIndexTaskRunner<String, String, ByteEntity> createTaskRunner()
{
  // NOTE(review): the original span contained an unreachable `return null;`
  // immediately before this statement (diff residue of the removed line);
  // only the injected-runner return is kept.
  return streamingTaskRunner;
}

@Override
Expand Down
39 changes: 25 additions & 14 deletions services/src/main/java/org/apache/druid/cli/CliPeon.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.github.rvesse.airline.annotations.Option;
import com.github.rvesse.airline.annotations.restrictions.Required;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -91,6 +90,7 @@
import org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.worker.executor.ExecutorLifecycle;
import org.apache.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager;
Expand Down Expand Up @@ -303,19 +303,7 @@ public void configure(Binder binder)
@Named(ServiceStatusMonitor.HEARTBEAT_TAGS_BINDING)
public Supplier<Map<String, Object>> heartbeatDimensions(Task task)
{
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
builder.put(DruidMetrics.TASK_ID, task.getId());
builder.put(DruidMetrics.DATASOURCE, task.getDataSource());
builder.put(DruidMetrics.TASK_TYPE, task.getType());
builder.put(DruidMetrics.GROUP_ID, task.getGroupId());
Map<String, Object> tags = task.getContextValue(DruidMetrics.TAGS);
if (tags != null && !tags.isEmpty()) {
builder.put(DruidMetrics.TAGS, tags);
}

return Suppliers.ofInstance(
builder.build()
);
return () -> CliPeon.heartbeatDimensions(task);
}

@Provides
Expand Down Expand Up @@ -565,6 +553,29 @@ static void configureIntermediaryData(Binder binder)
shuffleClientBiddy.addBinding("deepstore").to(DeepStorageShuffleClient.class).in(LazySingleton.class);
}

/**
 * Builds the dimension map attached to the peon's {@code service/heartbeat} metric.
 * <p>
 * Always includes the task id, datasource, task type and group id. The task's
 * context tags are added when present and non-empty. For streaming ingestion
 * tasks ({@link SeekableStreamIndexTask}), the current runner status is added
 * under {@code DruidMetrics.STATUS} when the runner reports one.
 *
 * @param task the task whose identifying attributes become metric dimensions
 * @return an immutable map of dimension name to value
 */
static Map<String, Object> heartbeatDimensions(Task task)
{
  final ImmutableMap.Builder<String, Object> dims = ImmutableMap.builder();

  dims.put(DruidMetrics.TASK_ID, task.getId());
  dims.put(DruidMetrics.DATASOURCE, task.getDataSource());
  dims.put(DruidMetrics.TASK_TYPE, task.getType());
  dims.put(DruidMetrics.GROUP_ID, task.getGroupId());

  final Map<String, Object> tags = task.getContextValue(DruidMetrics.TAGS);
  if (tags != null && !tags.isEmpty()) {
    dims.put(DruidMetrics.TAGS, tags);
  }

  // Streaming tasks additionally expose their runner's lifecycle status,
  // but only once a runner exists and reports a non-null status.
  if (task instanceof SeekableStreamIndexTask) {
    final String runnerStatus = ((SeekableStreamIndexTask) task).getCurrentRunnerStatus();
    if (runnerStatus != null) {
      dims.put(DruidMetrics.STATUS, runnerStatus);
    }
  }

  return dims.build();
}

public class BroadcastSegmentLoadingModule implements Module
{
@Override
Expand Down
Loading