From ece49674a71c63b19a8404c27b0eac998d5afa5e Mon Sep 17 00:00:00 2001 From: "qiumingming.2018" Date: Fri, 23 Nov 2018 20:53:19 +0800 Subject: [PATCH 1/4] Add TaskCountStatsMonitor to monitor task count stats --- docs/content/operations/metrics.md | 4 ++ .../druid/indexing/overlord/TaskMaster.java | 48 +++++++++++++- .../druid/indexing/overlord/TaskQueue.java | 48 ++++++++++++++ .../server/metrics/TaskCountStatsMonitor.java | 62 +++++++++++++++++++ .../metrics/TaskCountStatsProvider.java | 46 ++++++++++++++ .../org/apache/druid/cli/CliOverlord.java | 2 + 6 files changed, 209 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java create mode 100644 server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java diff --git a/docs/content/operations/metrics.md b/docs/content/operations/metrics.md index e3270230aaa7..35642c50c856 100644 --- a/docs/content/operations/metrics.md +++ b/docs/content/operations/metrics.md @@ -163,6 +163,10 @@ Note: If the JVM does not support CPU time measurement for the current thread, i |`segment/added/bytes`|Size in bytes of new segments created.|dataSource, taskId, taskType, interval.|Varies.| |`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.|dataSource, taskId, taskType, interval.|Varies.| |`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.|dataSource, taskId, taskType, interval.|Varies.| +|`task/success/count`|Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| +|`task/failed/count`|Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| +|`task/running/count`|Number of current running tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| +|`task/pending/count`|Number of current pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| ## Coordination diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index 4428661513a4..3d4b0f3fcf92 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -41,14 +41,16 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig; +import org.apache.druid.server.metrics.TaskCountStatsProvider; +import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; /** * Encapsulates the indexer leadership lifecycle. */ -public class TaskMaster +public class TaskMaster implements TaskCountStatsProvider { private static final EmittingLogger log = new EmittingLogger(TaskMaster.class); @@ -265,4 +267,48 @@ public Optional getSupervisorManager() return Optional.absent(); } } + + @Override + public Map getSuccessfulTaskCount() + { + Optional taskQueue = getTaskQueue(); + if (taskQueue.isPresent()) { + return taskQueue.get().getSuccessfulTaskCount(); + } else { + return null; + } + } + + @Override + public Map getFailedTaskCount() + { + Optional taskQueue = getTaskQueue(); + if (taskQueue.isPresent()) { + return taskQueue.get().getFailedTaskCount(); + } else { + return null; + } + } + + @Override + public Map getRunningTaskCount() + { + Optional taskQueue = getTaskQueue(); + if (taskQueue.isPresent()) { + return taskQueue.get().getRunningTaskCount(); + } else { + return null; + } + } + + @Override + public Map getPendingTaskCount() + { + Optional taskQueue = getTaskQueue(); + if (taskQueue.isPresent()) { + return taskQueue.get().getPendingTaskCount(); + } else { + return null; + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 9a9c41dfbb83..0c33f2580123 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -53,12 +53,15 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; /** * Interface between task producers and the task runner. @@ -100,6 +103,11 @@ public class TaskQueue private static final EmittingLogger log = new EmittingLogger(TaskQueue.class); + private final Map totalSuccessfulTaskCount = new ConcurrentHashMap<>(); + private final Map totalFailedTaskCount = new ConcurrentHashMap<>(); + private Map prevSuccessfulTaskCount = new HashMap<>(); + private Map prevFailedTaskCount = new HashMap<>(); + @Inject public TaskQueue( TaskQueueConfig config, @@ -510,6 +518,14 @@ private void handleStatus(final TaskStatus status) task, status.getDuration() ); + + if (status.isSuccess()) { + totalSuccessfulTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong()); + totalSuccessfulTaskCount.get(task.getDataSource()).incrementAndGet(); + } else { + totalFailedTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong()); + totalFailedTaskCount.get(task.getDataSource()).incrementAndGet(); + } } } catch (Exception e) { @@ -586,4 +602,36 @@ private static Map toTaskIDMap(List taskList) return rv; } + private Map getDeltaValues(Map total, Map prev) + { + return total.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L))); + } + + public Map getSuccessfulTaskCount() + { + Map total = totalSuccessfulTaskCount.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get())); + Map delta = getDeltaValues(total, prevSuccessfulTaskCount); + prevSuccessfulTaskCount = total; + return delta; + } + + public Map getFailedTaskCount() + { + Map total = totalFailedTaskCount.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get())); + Map delta = getDeltaValues(total, prevFailedTaskCount); + prevFailedTaskCount = total; + return delta; + } + + public Map getRunningTaskCount() + { + Map taskDatasources = tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource)); + return taskRunner.getRunningTasks().stream().collect(Collectors.toMap(e -> taskDatasources.getOrDefault(e.getTaskId(), ""), e -> 1L, Long::sum)); + } + + public Map getPendingTaskCount() + { + Map taskDatasources = tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource)); + return taskRunner.getPendingTasks().stream().collect(Collectors.toMap(e -> taskDatasources.getOrDefault(e.getTaskId(), ""), e -> 1L, Long::sum)); + } } diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java new file mode 100644 index 000000000000..09d4ecaa6709 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.metrics; + +import com.google.inject.Inject; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.metrics.AbstractMonitor; + +import java.util.Map; + +public class TaskCountStatsMonitor extends AbstractMonitor +{ + private final TaskCountStatsProvider statsProvider; + + @Inject + public TaskCountStatsMonitor( + TaskCountStatsProvider statsProvider + ) + { + this.statsProvider = statsProvider; + } + + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + emit(emitter, "task/success/count", statsProvider.getSuccessfulTaskCount()); + emit(emitter, "task/failed/count", statsProvider.getFailedTaskCount()); + emit(emitter, "task/running/count", statsProvider.getRunningTaskCount()); + emit(emitter, "task/pending/count", statsProvider.getPendingTaskCount()); + return true; + } + + private void emit(ServiceEmitter emitter, String key, Map counts) + { + final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); + if (counts != null) { + counts.forEach((k, v) -> { + builder.setDimension("dataSource", k); + emitter.emit(builder.build(key, v)); + }); + } + } + +} diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java new file mode 100644 index 000000000000..add3b7fea121 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.metrics; + +import java.util.Map; + +public interface TaskCountStatsProvider +{ + /** + * Return the number of successful tasks for each datasource during emission period. + */ + Map getSuccessfulTaskCount(); + + /** + * Return the number of failed tasks for each datasource during emission period. + */ + Map getFailedTaskCount(); + + /** + * Return the number of current running tasks for each datasource. + */ + Map getRunningTaskCount(); + + /** + * Return the number of current pending tasks for each datasource. + */ + Map getPendingTaskCount(); + +} diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 038ad837c4d6..540722195e8a 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -100,6 +100,7 @@ import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.initialization.jetty.JettyServerInitUtils; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; +import org.apache.druid.server.metrics.TaskCountStatsProvider; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthenticationUtils; import org.apache.druid.server.security.Authenticator; @@ -170,6 +171,7 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class); binder.bind(TaskMaster.class).in(ManageLifecycle.class); + binder.bind(TaskCountStatsProvider.class).to(TaskMaster.class); binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class); binder.bind( From cdc906cdf68a45fc63521a5b6af272eb4ebdb6b5 Mon Sep 17 00:00:00 2001 From: "qiumingming.2018" Date: Wed, 28 Nov 2018 19:53:53 +0800 Subject: [PATCH 2/4] address comments --- .../java/util/metrics/StubServiceEmitter.java | 2 +- docs/content/operations/metrics.md | 1 + .../druid/indexing/overlord/TaskMaster.java | 11 ++++ .../druid/indexing/overlord/TaskQueue.java | 64 ++++++++++++++----- .../server/metrics/TaskCountStatsMonitor.java | 1 + .../metrics/TaskCountStatsProvider.java | 4 ++ .../metrics/TaskCountStatsMonitorTest.java | 62 ++++++++++++++++++ 7 files changed, 129 insertions(+), 16 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java index 8a41db527bab..38f715f848aa 100644 --- a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java +++ b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java @@ -25,7 +25,7 @@ import java.util.ArrayList; import java.util.List; -class StubServiceEmitter extends ServiceEmitter +public class StubServiceEmitter extends ServiceEmitter { private List events = new ArrayList<>(); diff --git a/docs/content/operations/metrics.md b/docs/content/operations/metrics.md index 35642c50c856..c9e8a3a0638e 100644 --- a/docs/content/operations/metrics.md +++ b/docs/content/operations/metrics.md @@ -167,6 +167,7 @@ Note: If the JVM does not support CPU time measurement for the current thread, i |`task/failed/count`|Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| |`task/running/count`|Number of current running tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| |`task/pending/count`|Number of current pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| +|`task/waiting/count`|Number of current waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.| ## Coordination diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index 3d4b0f3fcf92..25fe06fe608a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -311,4 +311,15 @@ public Map getPendingTaskCount() return null; } } + + @Override + public Map getWaitingTaskCount() + { + Optional taskQueue = getTaskQueue(); + if (taskQueue.isPresent()) { + return taskQueue.get().getWaitingTaskCount(); + } else { + return null; + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 0c33f2580123..ef30f15f2259 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -105,8 +105,8 @@ public class TaskQueue private final Map totalSuccessfulTaskCount = new ConcurrentHashMap<>(); private final Map totalFailedTaskCount = new ConcurrentHashMap<>(); - private Map prevSuccessfulTaskCount = new HashMap<>(); - private Map prevFailedTaskCount = new HashMap<>(); + private Map prevTotalSuccessfulTaskCount = new HashMap<>(); + private Map prevTotalFailedTaskCount = new HashMap<>(); @Inject public TaskQueue( @@ -520,11 +520,11 @@ private void handleStatus(final TaskStatus status) ); if (status.isSuccess()) { - totalSuccessfulTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong()); - totalSuccessfulTaskCount.get(task.getDataSource()).incrementAndGet(); + totalSuccessfulTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong()) + .incrementAndGet(); } else { - totalFailedTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong()); - totalFailedTaskCount.get(task.getDataSource()).incrementAndGet(); + totalFailedTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong()) + .incrementAndGet(); } } } @@ -604,34 +604,68 @@ private static Map toTaskIDMap(List taskList) private Map getDeltaValues(Map total, Map prev) { - return total.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L))); + return total.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L))); } public Map getSuccessfulTaskCount() { - Map total = totalSuccessfulTaskCount.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get())); - Map delta = getDeltaValues(total, prevSuccessfulTaskCount); - prevSuccessfulTaskCount = total; + Map total = totalSuccessfulTaskCount.entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue().get() + )); + Map delta = getDeltaValues(total, prevTotalSuccessfulTaskCount); + prevTotalSuccessfulTaskCount = total; return delta; } public Map getFailedTaskCount() { - Map total = totalFailedTaskCount.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get())); - Map delta = getDeltaValues(total, prevFailedTaskCount); - prevFailedTaskCount = total; + Map total = totalFailedTaskCount.entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue().get() + )); + Map delta = getDeltaValues(total, prevTotalFailedTaskCount); + prevTotalFailedTaskCount = total; return delta; } public Map getRunningTaskCount() { Map taskDatasources = tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource)); - return taskRunner.getRunningTasks().stream().collect(Collectors.toMap(e -> taskDatasources.getOrDefault(e.getTaskId(), ""), e -> 1L, Long::sum)); + return taskRunner.getRunningTasks() + .stream() + .collect(Collectors.toMap( + e -> taskDatasources.getOrDefault(e.getTaskId(), ""), + e -> 1L, + Long::sum + )); } public Map getPendingTaskCount() { Map taskDatasources = tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource)); - return taskRunner.getPendingTasks().stream().collect(Collectors.toMap(e -> taskDatasources.getOrDefault(e.getTaskId(), ""), e -> 1L, Long::sum)); + return taskRunner.getPendingTasks() + .stream() + .collect(Collectors.toMap( + e -> taskDatasources.getOrDefault(e.getTaskId(), ""), + e -> 1L, + Long::sum + )); + } + + public Map getWaitingTaskCount() + { + Set runnerKnownTaskIds = taskRunner.getKnownTasks() + .stream() + .map(TaskRunnerWorkItem::getTaskId) + .collect(Collectors.toSet()); + return tasks.stream().filter(task -> !runnerKnownTaskIds.contains(task.getId())) + .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum)); } } diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java index 09d4ecaa6709..6c3394e97753 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java @@ -45,6 +45,7 @@ public boolean doMonitor(ServiceEmitter emitter) emit(emitter, "task/failed/count", statsProvider.getFailedTaskCount()); emit(emitter, "task/running/count", statsProvider.getRunningTaskCount()); emit(emitter, "task/pending/count", statsProvider.getPendingTaskCount()); + emit(emitter, "task/waiting/count", statsProvider.getWaitingTaskCount()); return true; } diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java index add3b7fea121..a96f8ce0623a 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java +++ b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java @@ -43,4 +43,8 @@ public interface TaskCountStatsProvider */ Map getPendingTaskCount(); + /** + * Return the number of current waiting tasks for each datasource. + */ + Map getWaitingTaskCount(); } diff --git a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java new file mode 100644 index 000000000000..8ef5121ed4f9 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java @@ -0,0 +1,62 @@ +package org.apache.druid.server.metrics; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Map; + +public class TaskCountStatsMonitorTest +{ + private TaskCountStatsProvider statsProvider; + + @Before + public void setUp() + { + statsProvider = new TaskCountStatsProvider() + { + @Override + public Map getSuccessfulTaskCount() + { + return null; + } + + @Override + public Map getFailedTaskCount() + { + return null; + } + + @Override + public Map getRunningTaskCount() + { + return ImmutableMap.of("d1", 1L); + } + + @Override + public Map getPendingTaskCount() + { + return null; + } + + @Override + public Map getWaitingTaskCount() + { + return null; + } + }; + } + + @Test + public void testMonitor() + { + final TaskCountStatsMonitor monitor = new TaskCountStatsMonitor(statsProvider); + final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); + monitor.doMonitor(emitter); + Assert.assertEquals(1, emitter.getEvents().size()); + Assert.assertEquals("task/running/count", emitter.getEvents().get(0).toMap().get("metric")); + Assert.assertEquals(1L, emitter.getEvents().get(0).toMap().get("value")); + } +} From 78c0b86daf59316d44602ee1b8f83e60d8e2f66e Mon Sep 17 00:00:00 2001 From: "qiumingming.2018" Date: Wed, 28 Nov 2018 20:13:46 +0800 Subject: [PATCH 3/4] add file header --- .../metrics/TaskCountStatsMonitorTest.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java index 8ef5121ed4f9..6cc620a40dee 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.server.metrics; import com.google.common.collect.ImmutableMap; From bf3e48ed98adeffa2ce40be92f1de0c11e75acae Mon Sep 17 00:00:00 2001 From: "qiumingming.2018" Date: Tue, 4 Dec 2018 15:40:14 +0800 Subject: [PATCH 4/4] tweak test --- .../metrics/TaskCountStatsMonitorTest.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java index 6cc620a40dee..24d89ab957a2 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java @@ -39,13 +39,13 @@ public void setUp() @Override public Map getSuccessfulTaskCount() { - return null; + return ImmutableMap.of("d1", 1L); } @Override public Map getFailedTaskCount() { - return null; + return ImmutableMap.of("d1", 1L); } @Override @@ -57,13 +57,13 @@ public Map getRunningTaskCount() @Override public Map getPendingTaskCount() { - return null; + return ImmutableMap.of("d1", 1L); } @Override public Map getWaitingTaskCount() { - return null; + return ImmutableMap.of("d1", 1L); } }; } @@ -74,8 +74,16 @@ public void testMonitor() final TaskCountStatsMonitor monitor = new TaskCountStatsMonitor(statsProvider); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); - Assert.assertEquals(1, emitter.getEvents().size()); - Assert.assertEquals("task/running/count", emitter.getEvents().get(0).toMap().get("metric")); + Assert.assertEquals(5, emitter.getEvents().size()); + Assert.assertEquals("task/success/count", emitter.getEvents().get(0).toMap().get("metric")); Assert.assertEquals(1L, emitter.getEvents().get(0).toMap().get("value")); + Assert.assertEquals("task/failed/count", emitter.getEvents().get(1).toMap().get("metric")); + Assert.assertEquals(1L, emitter.getEvents().get(1).toMap().get("value")); + Assert.assertEquals("task/running/count", emitter.getEvents().get(2).toMap().get("metric")); + Assert.assertEquals(1L, emitter.getEvents().get(2).toMap().get("value")); + Assert.assertEquals("task/pending/count", emitter.getEvents().get(3).toMap().get("metric")); + Assert.assertEquals(1L, emitter.getEvents().get(3).toMap().get("value")); + Assert.assertEquals("task/waiting/count", emitter.getEvents().get(4).toMap().get("metric")); + Assert.assertEquals(1L, emitter.getEvents().get(4).toMap().get("value")); } }