diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java index 8a41db527bab..38f715f848aa 100644 --- a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java +++ b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java @@ -25,7 +25,7 @@ import java.util.ArrayList; import java.util.List; -class StubServiceEmitter extends ServiceEmitter +public class StubServiceEmitter extends ServiceEmitter { private List events = new ArrayList<>(); diff --git a/docs/content/operations/metrics.md b/docs/content/operations/metrics.md index e3270230aaa7..c9e8a3a0638e 100644 --- a/docs/content/operations/metrics.md +++ b/docs/content/operations/metrics.md @@ -163,6 +163,11 @@ Note: If the JVM does not support CPU time measurement for the current thread, i |`segment/added/bytes`|Size in bytes of new segments created.|dataSource, taskId, taskType, interval.|Varies.| |`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.|dataSource, taskId, taskType, interval.|Varies.| |`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.|dataSource, taskId, taskType, interval.|Varies.| +|`task/success/count`|Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| +|`task/failed/count`|Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| +|`task/running/count`|Number of current running tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| +|`task/pending/count`|Number of current pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| +|`task/waiting/count`|Number of current waiting tasks. 
This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
 
 ## Coordination
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index 4428661513a4..25fe06fe608a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -41,14 +41,16 @@
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
+import org.apache.druid.server.metrics.TaskCountStatsProvider;
 
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Encapsulates the indexer leadership lifecycle.
  */
-public class TaskMaster
+public class TaskMaster implements TaskCountStatsProvider
 {
   private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);
 
@@ -265,4 +267,59 @@ public Optional getSupervisorManager()
       return Optional.absent();
     }
   }
+
+  /**
+   * Delegates to {@link TaskQueue#getSuccessfulTaskCount()}.
+   * Returns {@code null} when {@link #getTaskQueue()} is absent.
+   */
+  @Override
+  public Map<String, Long> getSuccessfulTaskCount()
+  {
+    final Optional<TaskQueue> taskQueue = getTaskQueue();
+    return taskQueue.isPresent() ? taskQueue.get().getSuccessfulTaskCount() : null;
+  }
+
+  /**
+   * Delegates to {@link TaskQueue#getFailedTaskCount()}.
+   * Returns {@code null} when {@link #getTaskQueue()} is absent.
+   */
+  @Override
+  public Map<String, Long> getFailedTaskCount()
+  {
+    final Optional<TaskQueue> taskQueue = getTaskQueue();
+    return taskQueue.isPresent() ? taskQueue.get().getFailedTaskCount() : null;
+  }
+
+  /**
+   * Delegates to {@link TaskQueue#getRunningTaskCount()}.
+   * Returns {@code null} when {@link #getTaskQueue()} is absent.
+   */
+  @Override
+  public Map<String, Long> getRunningTaskCount()
+  {
+    final Optional<TaskQueue> taskQueue = getTaskQueue();
+    return taskQueue.isPresent() ? taskQueue.get().getRunningTaskCount() : null;
+  }
+
+  /**
+   * Delegates to {@link TaskQueue#getPendingTaskCount()}.
+   * Returns {@code null} when {@link #getTaskQueue()} is absent.
+   */
+  @Override
+  public Map<String, Long> getPendingTaskCount()
+  {
+    final Optional<TaskQueue> taskQueue = getTaskQueue();
+    return taskQueue.isPresent() ? taskQueue.get().getPendingTaskCount() : null;
+  }
+
+  /**
+   * Delegates to {@link TaskQueue#getWaitingTaskCount()}.
+   * Returns {@code null} when {@link #getTaskQueue()} is absent.
+   */
+  @Override
+  public Map<String, Long> getWaitingTaskCount()
+  {
+    final Optional<TaskQueue> taskQueue = getTaskQueue();
+    return taskQueue.isPresent() ? taskQueue.get().getWaitingTaskCount() : null;
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 9a9c41dfbb83..ef30f15f2259 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -53,12 +53,15 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 /**
  * Interface between task producers and the task runner.
@@ -100,6 +103,13 @@ public class TaskQueue
 
   private static final EmittingLogger log = new EmittingLogger(TaskQueue.class);
 
+  // Cumulative finished-task counters keyed by datasource; incremented in handleStatus().
+  private final Map<String, AtomicLong> totalSuccessfulTaskCount = new ConcurrentHashMap<>();
+  private final Map<String, AtomicLong> totalFailedTaskCount = new ConcurrentHashMap<>();
+  // Totals seen by the previous getSuccessfulTaskCount()/getFailedTaskCount() call; baseline for the next delta.
+  private Map<String, Long> prevTotalSuccessfulTaskCount = new HashMap<>();
+  private Map<String, Long> prevTotalFailedTaskCount = new HashMap<>();
+
   @Inject
   public TaskQueue(
       TaskQueueConfig config,
@@ -510,6 +520,14 @@ private void handleStatus(final TaskStatus status)
               task,
               status.getDuration()
           );
+
+          if (status.isSuccess()) {
+            totalSuccessfulTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong())
+                                    .incrementAndGet();
+          } else {
+            totalFailedTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong())
+                                .incrementAndGet();
+          }
         }
       }
       catch (Exception e) {
@@ -586,4 +604,97 @@ private static Map toTaskIDMap(List taskList)
     return rv;
   }
 
+  /**
+   * Returns, for each key of {@code total}, its value minus the value stored under the same key in
+   * {@code prev}; a key missing from {@code prev} counts as 0.
+   */
+  private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
+  {
+    return total.entrySet()
+                .stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
+  }
+
+  /**
+   * Copies the current value of every counter in {@code counters} into a new map with the same keys.
+   */
+  private static Map<String, Long> getTotalValues(Map<String, AtomicLong> counters)
+  {
+    return counters.entrySet()
+                   .stream()
+                   .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get()));
+  }
+
+  /**
+   * Returns, per datasource, how many tasks were recorded as successful since the previous call of this
+   * method. The totals read by this call become the baseline for the next call.
+   */
+  public Map<String, Long> getSuccessfulTaskCount()
+  {
+    final Map<String, Long> total = getTotalValues(totalSuccessfulTaskCount);
+    final Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
+    prevTotalSuccessfulTaskCount = total;
+    return delta;
+  }
+
+  /**
+   * Returns, per datasource, how many tasks were recorded as not successful since the previous call of
+   * this method. The totals read by this call become the baseline for the next call.
+   */
+  public Map<String, Long> getFailedTaskCount()
+  {
+    final Map<String, Long> total = getTotalValues(totalFailedTaskCount);
+    final Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
+    prevTotalFailedTaskCount = total;
+    return delta;
+  }
+
+  /**
+   * Returns, per datasource, the number of tasks the task runner currently reports as running. A task
+   * whose id is not found in {@code tasks} is counted under the empty-string datasource.
+   */
+  public Map<String, Long> getRunningTaskCount()
+  {
+    final Map<String, String> taskDatasources = tasks.stream()
+                                                     .collect(Collectors.toMap(Task::getId, Task::getDataSource));
+    return taskRunner.getRunningTasks()
+                     .stream()
+                     .collect(Collectors.toMap(
+                         e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
+                         e -> 1L,
+                         Long::sum
+                     ));
+  }
+
+  /**
+   * Returns, per datasource, the number of tasks the task runner currently reports as pending. A task
+   * whose id is not found in {@code tasks} is counted under the empty-string datasource.
+   */
+  public Map<String, Long> getPendingTaskCount()
+  {
+    final Map<String, String> taskDatasources = tasks.stream()
+                                                     .collect(Collectors.toMap(Task::getId, Task::getDataSource));
+    return taskRunner.getPendingTasks()
+                     .stream()
+                     .collect(Collectors.toMap(
+                         e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
+                         e -> 1L,
+                         Long::sum
+                     ));
+  }
+
+  /**
+   * Returns, per datasource, the number of tasks held in {@code tasks} that the task runner does not
+   * list among its known tasks.
+   */
+  public Map<String, Long> getWaitingTaskCount()
+  {
+    final Set<String> runnerKnownTaskIds = taskRunner.getKnownTasks()
+                                                     .stream()
+                                                     .map(TaskRunnerWorkItem::getTaskId)
+                                                     .collect(Collectors.toSet());
+    return tasks.stream()
+                .filter(task -> !runnerKnownTaskIds.contains(task.getId()))
+                .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum));
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
new file mode 100644
index 000000000000..6c3394e97753
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.inject.Inject;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+
+import java.util.Map;
+
+/** Emits per-datasource task count metrics read from a {@link TaskCountStatsProvider}. */
+public class TaskCountStatsMonitor extends AbstractMonitor
+{
+  private final TaskCountStatsProvider statsProvider;
+
+  @Inject
+  public TaskCountStatsMonitor(TaskCountStatsProvider statsProvider)
+  {
+    this.statsProvider = statsProvider;
+  }
+
+  /** Emits the five {@code task/.../count} metrics in a fixed order; always returns {@code true}. */
+  @Override
+  public boolean doMonitor(ServiceEmitter emitter)
+  {
+    emit(emitter, "task/success/count", statsProvider.getSuccessfulTaskCount());
+    emit(emitter, "task/failed/count", statsProvider.getFailedTaskCount());
+    emit(emitter, "task/running/count", statsProvider.getRunningTaskCount());
+    emit(emitter, "task/pending/count", statsProvider.getPendingTaskCount());
+    emit(emitter, "task/waiting/count", statsProvider.getWaitingTaskCount());
+    return true;
+  }
+
+  /** Emits one {@code key} event per entry, with the entry key as the "dataSource" dimension; null emits nothing. */
+  private void emit(ServiceEmitter emitter, String key, Map<String, Long> counts)
+  {
+    final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
+    if (counts != null) {
+      counts.forEach((k, v) -> {
+        builder.setDimension("dataSource", k);
+        emitter.emit(builder.build(key, v));
+      });
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
new file mode 100644
index 000000000000..a96f8ce0623a
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import java.util.Map;
+
+public interface TaskCountStatsProvider
+{
+  /**
+   * Returns the number of tasks that succeeded during the emission period, keyed by datasource; may be null.
+   */
+  Map<String, Long> getSuccessfulTaskCount();
+
+  /**
+   * Returns the number of tasks that failed during the emission period, keyed by datasource; may be null.
+   */
+  Map<String, Long> getFailedTaskCount();
+
+  /**
+   * Returns the number of currently running tasks, keyed by datasource; may be null.
+   */
+  Map<String, Long> getRunningTaskCount();
+
+  /**
+   * Returns the number of currently pending tasks, keyed by datasource; may be null.
+   */
+  Map<String, Long> getPendingTaskCount();
+
+  /**
+   * Returns the number of currently waiting tasks, keyed by datasource; may be null.
+   */
+  Map<String, Long> getWaitingTaskCount();
+}
diff --git a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
new file mode 100644
index 000000000000..24d89ab957a2
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+/** Exercises {@link TaskCountStatsMonitor} against a stub provider and a stub emitter. */
+public class TaskCountStatsMonitorTest
+{
+  private TaskCountStatsProvider statsProvider;
+
+  /** Installs a provider whose five count methods each return the single entry {@code "d1" -> 1}. */
+  @Before
+  public void setUp()
+  {
+    statsProvider = new TaskCountStatsProvider()
+    {
+      @Override
+      public Map<String, Long> getSuccessfulTaskCount()
+      {
+        return ImmutableMap.of("d1", 1L);
+      }
+
+      @Override
+      public Map<String, Long> getFailedTaskCount()
+      {
+        return ImmutableMap.of("d1", 1L);
+      }
+
+      @Override
+      public Map<String, Long> getRunningTaskCount()
+      {
+        return ImmutableMap.of("d1", 1L);
+      }
+
+      @Override
+      public Map<String, Long> getPendingTaskCount()
+      {
+        return ImmutableMap.of("d1", 1L);
+      }
+
+      @Override
+      public Map<String, Long> getWaitingTaskCount()
+      {
+        return ImmutableMap.of("d1", 1L);
+      }
+    };
+  }
+
+  @Test
+  public void testMonitor()
+  {
+    final TaskCountStatsMonitor monitor = new TaskCountStatsMonitor(statsProvider);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+    monitor.doMonitor(emitter);
+    // Expected metric names, in the order doMonitor emits them.
+    final String[] expectedMetrics = {
+        "task/success/count", "task/failed/count", "task/running/count", "task/pending/count", "task/waiting/count"
+    };
+    Assert.assertEquals(expectedMetrics.length, emitter.getEvents().size());
+    for (int i = 0; i < expectedMetrics.length; i++) {
+      Assert.assertEquals(expectedMetrics[i], emitter.getEvents().get(i).toMap().get("metric"));
+      Assert.assertEquals(1L, emitter.getEvents().get(i).toMap().get("value"));
+    }
+  }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 038ad837c4d6..540722195e8a 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -100,6 +100,7 @@
 import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
 import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
+import org.apache.druid.server.metrics.TaskCountStatsProvider;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthenticationUtils;
 import org.apache.druid.server.security.Authenticator;
@@ -170,6 +171,7 @@ public void configure(Binder binder)
             JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class);
 
             binder.bind(TaskMaster.class).in(ManageLifecycle.class);
+            binder.bind(TaskCountStatsProvider.class).to(TaskMaster.class);
 
             binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class);
             binder.bind(