From 9733ff54a51c0aba100878b24b6b95c7c45cde94 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 15 Jul 2025 15:52:47 +0530 Subject: [PATCH 1/5] Minor clean up of LatchableEmitter / StubServiceEmitter --- .../common/KubernetesPeonClientTest.java | 2 +- .../duty/UnusedSegmentsKillerTest.java | 5 +- .../SeekableStreamSupervisorSpecTest.java | 9 +-- .../SeekableStreamSupervisorStateTest.java | 2 +- .../worker/shuffle/ShuffleMonitorTest.java | 29 +++---- .../util/metrics/CgroupCpuSetMonitorTest.java | 23 ++---- .../util/metrics/CgroupDiskMonitorTest.java | 4 +- .../util/metrics/CgroupMemoryMonitorTest.java | 5 +- .../util/metrics/CgroupV2CpuMonitorTest.java | 5 +- .../util/metrics/CgroupV2DiskMonitorTest.java | 4 +- .../util/metrics/CpuAcctDeltaMonitorTest.java | 4 +- .../metrics/HttpPostEmitterMonitorTest.java | 39 ++++----- .../java/util/metrics/OshiSysMonitorTest.java | 26 +++--- .../java/util/metrics/StubServiceEmitter.java | 14 +++- .../query/CPUTimeMetricQueryRunnerTest.java | 2 +- .../druid/query/DefaultQueryMetricsTest.java | 2 +- .../client/cache/MemcachedCacheTest.java | 12 +-- .../DruidConnectionStateListenerTest.java | 12 +-- .../appenderator/StreamAppenderatorTest.java | 17 ++-- .../server/ClientQuerySegmentWalkerTest.java | 43 +++++----- .../server/audit/SQLAuditManagerTest.java | 19 ++--- .../cache/LookupCoordinatorManagerTest.java | 2 +- .../metrics/GroupByStatsMonitorTest.java | 25 +++--- .../metrics/HistoricalMetricsMonitorTest.java | 73 +++++------------ .../server/metrics/LatchableEmitter.java | 80 ++++++++++--------- .../metrics/QueryCountStatsMonitorTest.java | 41 ++++------ .../metrics/ServiceStatusMonitorTest.java | 27 +++---- .../metrics/TaskCountStatsMonitorTest.java | 2 +- .../TaskSlotCountStatsMonitorTest.java | 2 +- .../WorkerTaskCountStatsMonitorTest.java | 8 +- .../AsyncQueryForwardingServletTest.java | 3 +- 31 files changed, 219 insertions(+), 322 deletions(-) diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java index 4d4e1cd9a1be..9c8312179146 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java @@ -93,7 +93,7 @@ void test_launchPeonJobAndWaitForStart() Pod peonPod = instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1, TimeUnit.SECONDS); Assertions.assertNotNull(peonPod); - Assertions.assertEquals(1, serviceEmitter.getEvents().size()); + Assertions.assertEquals(1, serviceEmitter.getNumEmittedEvents()); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java index 111489a74dc5..97c5f6bc1b98 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java @@ -52,7 +52,6 @@ import org.junit.Test; import java.util.List; -import java.util.Queue; import java.util.Set; import java.util.stream.Collectors; @@ -329,8 +328,8 @@ public void test_run_prioritizesOlderIntervals() emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10); // Verify that the kill intervals are sorted with the oldest interval first - final Queue events = - emitter.getMetricEvents().get(TaskMetrics.RUN_DURATION); + final List events = + emitter.getMetricEvents(TaskMetrics.RUN_DURATION); final List killIntervals = events.stream().map(event -> { final String taskId = (String) event.getUserDims().get(DruidMetrics.TASK_ID); String[] splits = taskId.split("_"); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index cfd49994262d..a663c7c19aa2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -780,8 +780,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc Assert.assertEquals(2, taskCountAfterScaleOut); Assert.assertTrue( dynamicActionEmitter - .getMetricEvents() - .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC) + .getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC) .stream() .map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)) .filter(Objects::nonNull) @@ -840,8 +839,7 @@ public int getActiveTaskGroupsCount() Assert.assertTrue( dynamicActionEmitter - .getMetricEvents() - .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC) + .getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC) .stream() .map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)) .filter(Objects::nonNull) @@ -1103,8 +1101,7 @@ public int getActiveTaskGroupsCount() Assert.assertTrue( dynamicActionEmitter - .getMetricEvents() - .get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC) + .getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC) .stream() .map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)) .filter(Objects::nonNull) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 4ae5a44c1c8f..feba656ee63f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -2521,7 +2521,7 @@ public void testStaleOffsetsNegativeLagNotEmitted() throws Exception latch.await(); supervisor.emitLag(); - Assert.assertEquals(0, emitter.getEvents().size()); + Assert.assertEquals(0, emitter.getNumEmittedEvents()); } private void validateSupervisorStateAfterResetOffsets( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java index 1174bc842ede..6ee98a8d309c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleMonitorTest.java @@ -21,14 +21,12 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics; -import org.apache.druid.java.util.emitter.core.Event; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; -import java.util.List; +import java.util.Map; public class ShuffleMonitorTest { @@ -46,23 +44,16 @@ public void testDoMonitor() final ShuffleMonitor monitor = new ShuffleMonitor(); monitor.setShuffleMetrics(shuffleMetrics); Assert.assertTrue(monitor.doMonitor(emitter)); - final List events = emitter.getEvents(); - Assert.assertEquals(2, events.size()); - Assert.assertSame(ServiceMetricEvent.class, events.get(0).getClass()); - ServiceMetricEvent event = (ServiceMetricEvent) events.get(0); - Assert.assertEquals(ShuffleMonitor.SHUFFLE_BYTES_KEY, event.getMetric()); - Assert.assertEquals(310L, event.getValue()); - Assert.assertEquals( - ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"), - event.getUserDims() + Assert.assertEquals(2, emitter.getNumEmittedEvents()); + emitter.verifyValue( + ShuffleMonitor.SHUFFLE_BYTES_KEY, + Map.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"), + 310L ); - Assert.assertSame(ServiceMetricEvent.class, events.get(1).getClass()); - event = (ServiceMetricEvent) events.get(1); - Assert.assertEquals(ShuffleMonitor.SHUFFLE_REQUESTS_KEY, event.getMetric()); - Assert.assertEquals(3, event.getValue()); - Assert.assertEquals( - ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"), - event.getUserDims() + emitter.verifyValue( + ShuffleMonitor.SHUFFLE_REQUESTS_KEY, + Map.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"), + 3 ); } } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java index 027bd995b339..b18aa138fa55 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupCpuSetMonitorTest.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.FileUtils; -import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.TestUtils; @@ -34,8 +33,6 @@ import java.io.File; import java.io.IOException; -import java.util.List; -import java.util.Map; public class CgroupCpuSetMonitorTest { @@ -72,19 +69,11 @@ public void testMonitor() final CgroupCpuSetMonitor monitor = new CgroupCpuSetMonitor(discoverer, ImmutableMap.of(), "some_feed"); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); Assert.assertTrue(monitor.doMonitor(emitter)); - final List actualEvents = emitter.getEvents(); - Assert.assertEquals(4, actualEvents.size()); - final Map cpusEvent = actualEvents.get(0).toMap(); - final Map effectiveCpusEvent = actualEvents.get(1).toMap(); - final Map memsEvent = actualEvents.get(2).toMap(); - final Map effectiveMemsEvent = actualEvents.get(3).toMap(); - Assert.assertEquals("cgroup/cpuset/cpu_count", cpusEvent.get("metric")); - Assert.assertEquals(8, cpusEvent.get("value")); - Assert.assertEquals("cgroup/cpuset/effective_cpu_count", effectiveCpusEvent.get("metric")); - Assert.assertEquals(7, effectiveCpusEvent.get("value")); - Assert.assertEquals("cgroup/cpuset/mems_count", memsEvent.get("metric")); - Assert.assertEquals(4, memsEvent.get("value")); - Assert.assertEquals("cgroup/cpuset/effective_mems_count", effectiveMemsEvent.get("metric")); - Assert.assertEquals(1, effectiveMemsEvent.get("value")); + Assert.assertEquals(4, emitter.getNumEmittedEvents()); + + emitter.verifyValue("cgroup/cpuset/cpu_count", 8); + emitter.verifyValue("cgroup/cpuset/effective_cpu_count", 7); + emitter.verifyValue("cgroup/cpuset/mems_count", 4); + emitter.verifyValue("cgroup/cpuset/effective_mems_count", 1); } } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java index 21a834587292..d75af320f125 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupDiskMonitorTest.java @@ -68,13 +68,13 @@ public void testMonitor() throws IOException final CgroupDiskMonitor monitor = new CgroupDiskMonitor(discoverer, ImmutableMap.of(), "some_feed"); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); Assert.assertTrue(monitor.doMonitor(emitter)); - Assert.assertEquals(0, emitter.getEvents().size()); + Assert.assertEquals(0, emitter.getNumEmittedEvents()); TestUtils.copyOrReplaceResource("/blkio.throttle.io_service_bytes-2", serviceBytesFile); TestUtils.copyOrReplaceResource("/blkio.throttle.io_serviced-2", servicedFile); Assert.assertTrue(monitor.doMonitor(emitter)); - Assert.assertEquals(8, emitter.getEvents().size()); + Assert.assertEquals(8, emitter.getNumEmittedEvents()); Assert.assertTrue( emitter .getEvents() diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java index 7827368ec9f5..6538cb9691a0 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupMemoryMonitorTest.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.FileUtils; -import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.TestUtils; @@ -34,7 +33,6 @@ import java.io.File; import java.io.IOException; -import java.util.List; public class CgroupMemoryMonitorTest { @@ -71,7 +69,6 @@ public void testMonitor() final CgroupMemoryMonitor monitor = new CgroupMemoryMonitor(discoverer, ImmutableMap.of(), "some_feed"); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); Assert.assertTrue(monitor.doMonitor(emitter)); - final List actualEvents = emitter.getEvents(); - Assert.assertEquals(46, actualEvents.size()); + Assert.assertEquals(46, emitter.getNumEmittedEvents()); } } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java index 7006de1edb08..07682643148a 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2CpuMonitorTest.java @@ -20,7 +20,6 @@ package org.apache.druid.java.util.metrics; import com.google.common.collect.ImmutableSet; -import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer; import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer; import org.apache.druid.java.util.metrics.cgroups.TestUtils; @@ -33,7 +32,6 @@ import java.io.File; import java.io.IOException; -import java.util.List; import java.util.stream.Collectors; public class CgroupV2CpuMonitorTest @@ -65,8 +63,7 @@ public void testMonitor() throws IOException, InterruptedException final CgroupV2CpuMonitor monitor = new CgroupV2CpuMonitor(discoverer); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); Assert.assertTrue(monitor.doMonitor(emitter)); - final List actualEvents = emitter.getEvents(); - Assert.assertEquals(0, actualEvents.size()); + Assert.assertEquals(0, emitter.getNumEmittedEvents()); emitter.flush(); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java index 40201a30feb0..f9a977edd92e 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CgroupV2DiskMonitorTest.java @@ -61,14 +61,14 @@ public void testMonitor() throws IOException final CgroupV2DiskMonitor monitor = new CgroupV2DiskMonitor(discoverer); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); Assert.assertTrue(monitor.doMonitor(emitter)); - Assert.assertEquals(0, emitter.getEvents().size()); + Assert.assertEquals(0, emitter.getNumEmittedEvents()); emitter.flush(); TestUtils.copyOrReplaceResource("/cgroupv2/io.stat-2", statFile); Assert.assertTrue(monitor.doMonitor(emitter)); - Assert.assertEquals(4, emitter.getEvents().size()); + Assert.assertEquals(4, emitter.getNumEmittedEvents()); Assert.assertTrue( emitter .getEvents() diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java index 97dd707299e1..94cb1d9b3aa5 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/CpuAcctDeltaMonitorTest.java @@ -94,10 +94,10 @@ public void testSimpleMonitor() throws Exception final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); Assert.assertFalse(monitor.doMonitor(emitter)); // First should just cache - Assert.assertEquals(0, emitter.getEvents().size()); + Assert.assertEquals(0, emitter.getNumEmittedEvents()); Assert.assertTrue(cpuacct.delete()); TestUtils.copyResource("/cpuacct.usage_all", cpuacct); Assert.assertTrue(monitor.doMonitor(emitter)); - Assert.assertEquals(2 * 128 + 1, emitter.getEvents().size()); + Assert.assertEquals(2 * 128 + 1, emitter.getNumEmittedEvents()); } } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java index 2be52c091e78..819c9ba50d76 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java @@ -25,10 +25,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.Map; -import java.util.Queue; - -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -65,26 +61,19 @@ public void testDoMonitor() assertTrue(monitor.doMonitor(stubServiceEmitter)); - final Map> metricEvents = stubServiceEmitter.getMetricEvents(); - - assertMetricValue(metricEvents, "emitter/successfulSending/maxTimeMs", 0); - assertMetricValue(metricEvents, "emitter/events/emitted/delta", 100L); - assertMetricValue(metricEvents, "emitter/successfulSending/minTimeMs", 0); - assertMetricValue(metricEvents, "emitter/buffers/emitQueue", 30); - assertMetricValue(metricEvents, "emitter/failedSending/minTimeMs", 0); - assertMetricValue(metricEvents, "emitter/buffers/allocated/delta", 20); - assertMetricValue(metricEvents, "emitter/batchFilling/maxTimeMs", 0); - assertMetricValue(metricEvents, "emitter/buffers/dropped/delta", 10); - assertMetricValue(metricEvents, "emitter/batchFilling/minTimeMs", 0); - assertMetricValue(metricEvents, "emitter/events/emitQueue", 200L); - assertMetricValue(metricEvents, "emitter/events/large/emitQueue", 75L); - assertMetricValue(metricEvents, "emitter/buffers/reuseQueue", 15); - assertMetricValue(metricEvents, "emitter/buffers/failed/delta", 5); - assertMetricValue(metricEvents, "emitter/failedSending/maxTimeMs", 0L); - } - - private void assertMetricValue(Map> metricEvents, String metricName, Number expectedValue) - { - assertEquals(metricEvents.get(metricName).peek().getMetricEvent().getValue().doubleValue(), expectedValue.doubleValue()); + stubServiceEmitter.verifyValue("emitter/successfulSending/maxTimeMs", 0); + stubServiceEmitter.verifyValue("emitter/events/emitted/delta", 100L); + stubServiceEmitter.verifyValue("emitter/successfulSending/minTimeMs", 0); + stubServiceEmitter.verifyValue("emitter/buffers/emitQueue", 30); + stubServiceEmitter.verifyValue("emitter/failedSending/minTimeMs", 0); + stubServiceEmitter.verifyValue("emitter/buffers/allocated/delta", 20); + stubServiceEmitter.verifyValue("emitter/batchFilling/maxTimeMs", 0); + stubServiceEmitter.verifyValue("emitter/buffers/dropped/delta", 10); + stubServiceEmitter.verifyValue("emitter/batchFilling/minTimeMs", 0); + stubServiceEmitter.verifyValue("emitter/events/emitQueue", 200L); + stubServiceEmitter.verifyValue("emitter/events/large/emitQueue", 75L); + stubServiceEmitter.verifyValue("emitter/buffers/reuseQueue", 15); + stubServiceEmitter.verifyValue("emitter/buffers/failed/delta", 5); + stubServiceEmitter.verifyValue("emitter/failedSending/maxTimeMs", 0); } } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java index ece54cd3fa64..c31ee8ea7763 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/OshiSysMonitorTest.java @@ -107,7 +107,7 @@ public void testMemStats() m.start(); m.monitorMemStats(emitter); m.stop(); - Assert.assertEquals(3, emitter.getEvents().size()); + Assert.assertEquals(3, emitter.getNumEmittedEvents()); emitter.verifyEmitted("sys/mem/max", 1); emitter.verifyEmitted("sys/mem/used", 1); emitter.verifyEmitted("sys/mem/free", 1); @@ -129,7 +129,7 @@ public void testMemStatsSkipOthers() m.start(); m.doMonitor(emitter); m.stop(); - Assert.assertEquals(3, emitter.getEvents().size()); + Assert.assertEquals(3, emitter.getNumEmittedEvents()); emitter.verifyEmitted("sys/mem/max", 1); emitter.verifyEmitted("sys/mem/used", 1); emitter.verifyEmitted("sys/mem/free", 1); @@ -153,7 +153,7 @@ public void testSwapStats() OshiSysMonitor m = createMonitor(si); m.start(); m.monitorSwapStats(emitter); - Assert.assertEquals(4, emitter.getEvents().size()); + Assert.assertEquals(4, emitter.getNumEmittedEvents()); emitter.verifyEmitted("sys/swap/pageIn", 1); emitter.verifyEmitted("sys/swap/pageOut", 1); emitter.verifyEmitted("sys/swap/max", 1); @@ -201,7 +201,7 @@ public void testFsStats() OshiSysMonitor m = createMonitor(si); m.start(); m.monitorFsStats(emitter); - Assert.assertEquals(8, emitter.getEvents().size()); + Assert.assertEquals(8, emitter.getNumEmittedEvents()); emitter.verifyEmitted("sys/fs/max", 2); emitter.verifyEmitted("sys/fs/used", 2); emitter.verifyEmitted("sys/fs/files/count", 2); @@ -272,7 +272,7 @@ public void testDiskStats() OshiSysMonitor m = createMonitor(si); m.start(); m.monitorDiskStats(emitter); - Assert.assertEquals(0, emitter.getEvents().size()); + Assert.assertEquals(0, emitter.getNumEmittedEvents()); Mockito.when(disk1.getReadBytes()).thenReturn(400L); Mockito.when(disk1.getReads()).thenReturn(220L); @@ -288,7 +288,7 @@ public void testDiskStats() Mockito.when(disk2.getTransferTime()).thenReturn(1100L); m.monitorDiskStats(emitter); - Assert.assertEquals(12, emitter.getEvents().size()); + Assert.assertEquals(12, emitter.getNumEmittedEvents()); Map userDims1 = ImmutableMap.of( "diskName", @@ -362,7 +362,7 @@ public void testNetStats() OshiSysMonitor m = createMonitor(si); m.start(); m.monitorNetStats(emitter); - Assert.assertEquals(0, emitter.getEvents().size()); + Assert.assertEquals(0, emitter.getNumEmittedEvents()); Mockito.when(net1.getBytesRecv()).thenReturn(400L); Mockito.when(net1.getPacketsRecv()).thenReturn(220L); @@ -375,7 +375,7 @@ public void testNetStats() m.monitorNetStats(emitter); - Assert.assertEquals(16, emitter.getEvents().size()); // 8 * 2 whitelisted ips + Assert.assertEquals(16, emitter.getNumEmittedEvents()); // 8 * 2 whitelisted ips Map userDims1 = ImmutableMap.of( "netName", @@ -460,7 +460,7 @@ public void testCpuStats() OshiSysMonitor m = createMonitor(si); m.start(); m.monitorCpuStats(emitter); - Assert.assertEquals(0, emitter.getEvents().size()); + Assert.assertEquals(0, emitter.getNumEmittedEvents()); long[][] procTicks2 = new long[][]{ {4L, 5L, 6L, 8L, 9L, 7L, 10L, 12L}, // Δtick1 {3,3,3,4,4,1,3,4} _total = 25, emitted percentage @@ -470,7 +470,7 @@ public void testCpuStats() m.monitorCpuStats(emitter); m.stop(); - Assert.assertEquals(16, emitter.getEvents().size()); // 8 ticktype * 2 processors + Assert.assertEquals(16, emitter.getNumEmittedEvents()); // 8 ticktype * 2 processors Map userDims = new HashMap<>(); userDims.put("cpuName", "0"); @@ -557,7 +557,7 @@ public void testSysStats() OshiSysMonitor m = createMonitor(si); m.start(); m.monitorSysStats(emitter); - Assert.assertEquals(4, emitter.getEvents().size()); + Assert.assertEquals(4, emitter.getNumEmittedEvents()); m.stop(); emitter.verifyEmitted("sys/uptime", 1); emitter.verifyEmitted("sys/la/1", 1); @@ -592,7 +592,7 @@ public void testTcpStats() m.start(); m.monitorTcpStats(emitter); - Assert.assertEquals(0, emitter.getEvents().size()); + Assert.assertEquals(0, emitter.getNumEmittedEvents()); Mockito.when(tcpv4.getConnectionsActive()).thenReturn(20L); Mockito.when(tcpv4.getConnectionsPassive()).thenReturn(25L); Mockito.when(tcpv4.getConnectionFailures()).thenReturn(8L); @@ -604,7 +604,7 @@ public void testTcpStats() Mockito.when(tcpv4.getSegmentsRetransmitted()).thenReturn(8L); m.monitorTcpStats(emitter); m.stop(); - Assert.assertEquals(9, emitter.getEvents().size()); + Assert.assertEquals(9, emitter.getNumEmittedEvents()); emitter.verifyValue("sys/tcpv4/activeOpens", 10L); emitter.verifyValue("sys/tcpv4/passiveOpens", 5L); emitter.verifyValue("sys/tcpv4/attemptFails", 3L); diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java index 55113b97ac20..c1a5413513c5 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java @@ -74,14 +74,20 @@ public List getEvents() return new ArrayList<>(events); } + public int getNumEmittedEvents() + { + return events.size(); + } + /** - * Gets all the metric events emitted since the previous {@link #flush()}. + * Gets all the metric events emitted for the given metric name since the previous {@link #flush()}. * - * @return Map from metric name to list of events emitted for that metric. + * @return List of events emitted for the given metric. */ - public Map> getMetricEvents() + public List getMetricEvents(String metricName) { - return metricEvents; + final Queue metricEventQueue = metricEvents.get(metricName); + return metricEventQueue == null ? List.of() : List.copyOf(metricEventQueue); } /** diff --git a/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java index 4647704fe279..32d9852d0c1b 100644 --- a/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/CPUTimeMetricQueryRunnerTest.java @@ -69,7 +69,7 @@ public void testCpuTimeMetric() ); Assert.assertEquals(expectedResults, results.toList()); - Assert.assertEquals(1, emitter.getEvents().size()); + Assert.assertEquals(1, emitter.getNumEmittedEvents()); final Event event = Iterables.getOnlyElement(emitter.getEvents()); diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java index ce7481c4e4a9..f7cd6887e7e0 100644 --- a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java @@ -138,7 +138,7 @@ public static void testQueryMetricsDefaultMetricNamesAndUnits( // Verify that Queried Segment Count does not get emitted by the DefaultQueryMetrics // and the total number of emitted metrics remains unchanged queryMetrics.reportQueriedSegmentCount(25).emit(serviceEmitter); - Assert.assertEquals(10, serviceEmitter.getEvents().size()); + Assert.assertEquals(10, serviceEmitter.getNumEmittedEvents()); } @Test diff --git a/server/src/test/java/org/apache/druid/client/cache/MemcachedCacheTest.java b/server/src/test/java/org/apache/druid/client/cache/MemcachedCacheTest.java index f87c7b679cc5..e8209a444cb8 100644 --- a/server/src/test/java/org/apache/druid/client/cache/MemcachedCacheTest.java +++ b/server/src/test/java/org/apache/druid/client/cache/MemcachedCacheTest.java @@ -19,7 +19,6 @@ package org.apache.druid.client.cache; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -50,11 +49,9 @@ import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.initialization.Initialization; -import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.AbstractMonitor; import org.apache.druid.java.util.metrics.StubServiceEmitter; @@ -206,16 +203,13 @@ public void testMonitor() throws Exception final MemcachedCache cache = MemcachedCache.create(memcachedCacheConfig); final StubServiceEmitter serviceEmitter = new StubServiceEmitter("service", "host"); - while (serviceEmitter.getEvents().isEmpty()) { + while (serviceEmitter.getNumEmittedEvents() <= 0) { Thread.sleep(memcachedCacheConfig.getTimeout()); cache.doMonitor(serviceEmitter); } - Assert.assertFalse(serviceEmitter.getEvents().isEmpty()); - ObjectMapper mapper = new DefaultObjectMapper(); - for (Event event : serviceEmitter.getEvents()) { - log.debug("Found event `%s`", mapper.writeValueAsString(event.toMap())); - } + Assert.assertTrue(serviceEmitter.getNumEmittedEvents() > 0); + Assert.assertFalse(serviceEmitter.getMetricEvents("query/cache/memcached/total").isEmpty()); } @Test diff --git a/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java b/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java index 86ff59c25ae1..4a9b7a4e36fc 100644 --- a/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java +++ b/server/src/test/java/org/apache/druid/curator/DruidConnectionStateListenerTest.java @@ -60,7 +60,7 @@ public void test_isConnected() public void test_doMonitor_init() { listener.doMonitor(emitter); - Assert.assertEquals(1, emitter.getEvents().size()); + Assert.assertEquals(1, emitter.getNumEmittedEvents()); emitter.verifyValue("zk/connected", 0); } @@ -69,7 +69,7 @@ public void test_doMonitor_connected() { listener.stateChanged(null, ConnectionState.CONNECTED); listener.doMonitor(emitter); - Assert.assertEquals(1, emitter.getEvents().size()); + Assert.assertEquals(1, emitter.getNumEmittedEvents()); emitter.verifyValue("zk/connected", 1); } @@ -79,7 +79,7 @@ public void test_doMonitor_notConnected() { listener.stateChanged(null, ConnectionState.SUSPENDED); listener.doMonitor(emitter); - Assert.assertEquals(2, emitter.getEvents().size()); // 2 because stateChanged emitted an alert + Assert.assertEquals(2, emitter.getNumEmittedEvents()); // 2 because stateChanged emitted an alert emitter.verifyValue("zk/connected", 0); } @@ -88,7 +88,7 @@ public void test_doMonitor_notConnected() public void test_suspendedAlert() { listener.stateChanged(null, ConnectionState.SUSPENDED); - Assert.assertEquals(1, emitter.getEvents().size()); + Assert.assertEquals(1, emitter.getNumEmittedEvents()); final AlertEvent alert = emitter.getAlerts().get(0); Assert.assertEquals("alerts", alert.getFeed()); @@ -99,10 +99,10 @@ public void test_suspendedAlert() public void test_reconnectedMetric() { listener.stateChanged(null, ConnectionState.SUSPENDED); - Assert.assertEquals(1, emitter.getEvents().size()); // the first stateChanged emits an alert + Assert.assertEquals(1, emitter.getNumEmittedEvents()); // the first stateChanged emits an alert listener.stateChanged(null, ConnectionState.RECONNECTED); - Assert.assertEquals(2, emitter.getEvents().size()); // the second stateChanged emits a metric + Assert.assertEquals(2, emitter.getNumEmittedEvents()); // the second stateChanged emits a metric long observedReconnectTime = emitter.getValue("zk/reconnect/time", null).longValue(); Assert.assertTrue(observedReconnectTime >= 0); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index 463c4ed8c535..f7e2bb0b57e8 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -78,7 +78,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -2279,17 +2278,15 @@ public void testQueryBySegments() throws Exception private void verifySinkMetrics(StubServiceEmitter emitter, Set segmentIds) { - Map> events = emitter.getMetricEvents(); int segments = segmentIds.size(); - Assert.assertEquals(4, events.size()); - Assert.assertTrue(events.containsKey("query/cpu/time")); - Assert.assertEquals(segments, events.get("query/segment/time").size()); - Assert.assertEquals(segments, events.get("query/segmentAndCache/time").size()); - Assert.assertEquals(segments, events.get("query/wait/time").size()); + emitter.verifyEmitted("query/cpu/time", 1); + Assert.assertEquals(segments, emitter.getMetricEvents("query/segment/time").size()); + Assert.assertEquals(segments, emitter.getMetricEvents("query/segmentAndCache/time").size()); + Assert.assertEquals(segments, emitter.getMetricEvents("query/wait/time").size()); for (String id : segmentIds) { - Assert.assertTrue(events.get("query/segment/time").stream().anyMatch(value -> value.getUserDims().containsValue(id))); - Assert.assertTrue(events.get("query/segmentAndCache/time").stream().anyMatch(value -> value.getUserDims().containsValue(id))); - Assert.assertTrue(events.get("query/wait/time").stream().anyMatch(value -> value.getUserDims().containsValue(id))); + Assert.assertTrue(emitter.getMetricEvents("query/segment/time").stream().anyMatch(value -> value.getUserDims().containsValue(id))); + Assert.assertTrue(emitter.getMetricEvents("query/segmentAndCache/time").stream().anyMatch(value -> value.getUserDims().containsValue(id))); + Assert.assertTrue(emitter.getMetricEvents("query/wait/time").stream().anyMatch(value -> value.getUserDims().containsValue(id))); } } diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index 484ae809fc07..e8ed5e3bf5a2 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -31,7 +31,6 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.core.EventMap; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.math.expr.ExprMacroTable; @@ -987,10 +986,11 @@ public void testMetricsWithMaxSubqueryRowsEnabled() ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L}) ); - List events = emitter.getEvents(); + List events = + emitter.getMetricEvents(ClientQuerySegmentWalker.ROWS_COUNT_METRIC); - for (Event event : events) { - EventMap map = event.toMap(); + for (StubServiceEmitter.ServiceMetricEventSnapshot event : events) { + EventMap map = event.getMetricEvent().toMap(); if (ClientQuerySegmentWalker.ROWS_COUNT_METRIC.equals(map.get("metric"))) { Assert.assertTrue(map.containsKey("host")); Assert.assertTrue(map.containsKey("service")); @@ -1038,23 +1038,26 @@ public void testMetricsWithMaxSubqueryBytesEnabled() ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L}) ); - List events = emitter.getEvents(); + List events + = emitter.getMetricEvents(ClientQuerySegmentWalker.ROWS_COUNT_METRIC); - for (Event event : events) { - EventMap map = event.toMap(); - if (ClientQuerySegmentWalker.ROWS_COUNT_METRIC.equals(map.get("metric"))) { - Assert.assertTrue(map.containsKey("host")); - Assert.assertTrue(map.containsKey("service")); - Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID)); - Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID)); - Assert.assertEquals(3, map.get("value")); - } else if (ClientQuerySegmentWalker.BYTES_COUNT_METRIC.equals(map.get("metric"))) { - Assert.assertTrue(map.containsKey("host")); - Assert.assertTrue(map.containsKey("service")); - Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID)); - Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID)); - Assert.assertEquals(43L, map.get("value")); - } + for (StubServiceEmitter.ServiceMetricEventSnapshot event : events) { + EventMap map = event.getMetricEvent().toMap(); + Assert.assertTrue(map.containsKey("host")); + Assert.assertTrue(map.containsKey("service")); + Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID)); + Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID)); + Assert.assertEquals(3, map.get("value")); + } + + events = emitter.getMetricEvents(ClientQuerySegmentWalker.BYTES_COUNT_METRIC); + for (StubServiceEmitter.ServiceMetricEventSnapshot event : events) { + EventMap map = event.getMetricEvent().toMap(); + Assert.assertTrue(map.containsKey("host")); + Assert.assertTrue(map.containsKey("service")); + Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID)); + Assert.assertEquals("1.1", map.get(DruidMetrics.SUBQUERY_ID)); + Assert.assertEquals(43L, map.get("value")); } } diff --git a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java index 3505eb943a29..5bbf84c322dd 100644 --- a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java @@ -28,7 +28,6 @@ import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.metadata.TestDerbyConnector; import org.joda.time.DateTime; @@ -43,7 +42,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.TreeMap; @RunWith(MockitoJUnitRunner.class) @@ -92,14 +90,11 @@ public void testAuditMetricEventWithPayload() throws IOException final AuditEntry entry = createAuditEntry("testKey", "testType", DateTimes.nowUtc()); auditManager.doAudit(entry); - Map> metricEvents = serviceEmitter.getMetricEvents(); - Assert.assertEquals(1, metricEvents.size()); - - Queue auditMetricEvents = metricEvents.get("config/audit"); - Assert.assertNotNull(auditMetricEvents); + List auditMetricEvents + = serviceEmitter.getMetricEvents("config/audit"); Assert.assertEquals(1, auditMetricEvents.size()); - ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent(); + StubServiceEmitter.ServiceMetricEventSnapshot metric = auditMetricEvents.get(0); final AuditEntry dbEntry = lookupAuditEntryForKey("testKey"); Assert.assertNotNull(dbEntry); @@ -121,14 +116,12 @@ public void testCreateAuditEntry() throws IOException Assert.assertEquals(entry, dbEntry); // Verify emitted metrics - Map> metricEvents = serviceEmitter.getMetricEvents(); - Assert.assertEquals(1, metricEvents.size()); - - Queue auditMetricEvents = metricEvents.get("config/audit"); + List auditMetricEvents + = serviceEmitter.getMetricEvents("config/audit"); Assert.assertNotNull(auditMetricEvents); Assert.assertEquals(1, auditMetricEvents.size()); - ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent(); + StubServiceEmitter.ServiceMetricEventSnapshot metric = auditMetricEvents.get(0); Assert.assertEquals(dbEntry.getKey(), metric.getUserDims().get("key")); Assert.assertEquals(dbEntry.getType(), metric.getUserDims().get("type")); Assert.assertNull(metric.getUserDims().get("payload")); diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManagerTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManagerTest.java index c94d8e436683..735d19bd95f1 100644 --- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManagerTest.java @@ -142,7 +142,7 @@ public void setUp() @After public void tearDown() { - Assert.assertEquals(0, SERVICE_EMITTER.getEvents().size()); + Assert.assertEquals(0, SERVICE_EMITTER.getNumEmittedEvents()); SERVICE_EMITTER.flush(); } diff --git a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java index f16acd0b060d..aa5955cd914f 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/GroupByStatsMonitorTest.java @@ -31,13 +31,11 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; public class GroupByStatsMonitorTest { @@ -83,20 +81,15 @@ public void testMonitor() emitter.flush(); // Trigger metric emission monitor.doMonitor(emitter); - Map resultMap = emitter.getEvents() - .stream() - .collect(Collectors.toMap( - event -> (String) event.toMap().get("metric"), - event -> (Long) event.toMap().get("value") - )); - Assert.assertEquals(7, resultMap.size()); - Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/pendingRequests")); - Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/used")); - Assert.assertEquals(1, (long) resultMap.get("mergeBuffer/queries")); - Assert.assertEquals(100, (long) resultMap.get("mergeBuffer/acquisitionTimeNs")); - Assert.assertEquals(2, (long) resultMap.get("groupBy/spilledQueries")); - Assert.assertEquals(200, (long) resultMap.get("groupBy/spilledBytes")); - Assert.assertEquals(300, (long) resultMap.get("groupBy/mergeDictionarySize")); + + Assert.assertEquals(7, emitter.getNumEmittedEvents()); + emitter.verifyValue("mergeBuffer/pendingRequests", 0L); + emitter.verifyValue("mergeBuffer/used", 0L); + emitter.verifyValue("mergeBuffer/queries", 1L); + emitter.verifyValue("mergeBuffer/acquisitionTimeNs", 100L); + emitter.verifyValue("groupBy/spilledQueries", 2L); + emitter.verifyValue("groupBy/spilledBytes", 200L); + emitter.verifyValue("groupBy/mergeDictionarySize", 300L); } @Test diff --git a/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java index 92c2a1064e77..05865e0a2dfb 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/HistoricalMetricsMonitorTest.java @@ -23,18 +23,15 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.client.DruidServerConfig; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordination.SegmentLoadDropHandler; import org.apache.druid.timeline.DataSegment; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.List; import java.util.Map; public class HistoricalMetricsMonitorTest extends EasyMockSupport @@ -95,54 +92,26 @@ public void testSimple() monitor.doMonitor(serviceEmitter); EasyMock.verify(druidServerConfig, segmentManager, segmentLoadDropMgr); - final List events = serviceEmitter.getEvents(); - - Assert.assertEquals(ImmutableMap.of( - "metric", "segment/max", - "value", maxSize - ), asMap(events.get(0))); - - Assert.assertEquals(ImmutableMap.of( - "dataSource", dataSource, - "metric", "segment/pendingDelete", - "priority", String.valueOf(priority), - "tier", tier, - "value", dataSegment.getSize() - ), asMap(events.get(1))); - - Assert.assertEquals(ImmutableMap.of( - "metric", "segment/used", - "value", dataSegment.getSize(), - "tier", tier, - "priority", String.valueOf(priority), - "dataSource", dataSource - ), asMap(events.get(2))); - - Assert.assertEquals(ImmutableMap.of( - "metric", "segment/usedPercent", - "value", dataSegment.getSize() * 1.0D / maxSize, - "tier", tier, - "priority", String.valueOf(priority), - "dataSource", dataSource - ), asMap(events.get(3))); - - Assert.assertEquals(ImmutableMap.of( - "metric", "segment/count", - "value", 1L, - "tier", tier, - "priority", String.valueOf(priority), - "dataSource", dataSource - ), asMap(events.get(4))); - } - - private Map asMap(Event event) - { - final Map map = event.toMap(); - Assert.assertNotNull(map.remove("feed")); - Assert.assertNotNull(map.remove("timestamp")); - Assert.assertNotNull(map.remove("service")); - Assert.assertNotNull(map.remove("host")); - - return map; + serviceEmitter.verifyValue("segment/max", maxSize); + serviceEmitter.verifyValue( + "segment/pendingDelete", + Map.of("tier", tier, "dataSource", dataSource, "priority", String.valueOf(priority)), + dataSegment.getSize() + ); + serviceEmitter.verifyValue( + "segment/used", + Map.of("tier", tier, "priority", String.valueOf(priority), "dataSource", dataSource), + dataSegment.getSize() + ); + serviceEmitter.verifyValue( + "segment/usedPercent", + Map.of("tier", tier, "priority", String.valueOf(priority), "dataSource", dataSource), + dataSegment.getSize() * 1.0D / maxSize + ); + serviceEmitter.verifyValue( + "segment/count", + Map.of("tier", tier, "priority", String.valueOf(priority), "dataSource", dataSource), + 1L + ); } } diff --git a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java index aeed3c40ea96..c7d8f1010cdd 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java +++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java @@ -37,7 +37,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; import java.util.function.UnaryOperator; @@ -60,7 +60,13 @@ public class LatchableEmitter extends StubServiceEmitter */ private final ScheduledExecutorService conditionEvaluateExecutor; private final Set waitConditions = new HashSet<>(); - private final ReentrantReadWriteLock eventReadWriteLock = new ReentrantReadWriteLock(true); + + private final ReentrantLock eventProcessingLock = new ReentrantLock(); + + /** + * Lists of events that have already been processed by {@link #evaluateWaitConditions(Event)}. + */ + private final List processedEvents = new ArrayList<>(); /** * Creates a {@link StubServiceEmitter} that may be used in embedded tests. @@ -75,27 +81,7 @@ public LatchableEmitter(String service, String host, ScheduledExecutorFactory ex public void emit(Event event) { super.emit(event); - triggerConditionEvaluations(); - } - - @Override - public void flush() - { - // flush() or close() is typically not called in tests until the test is complete - // but acquire a lock all the same for the sake of completeness - eventReadWriteLock.writeLock().lock(); - try { - super.flush(); - } - finally { - eventReadWriteLock.writeLock().unlock(); - } - } - - @Override - public void close() - { - flush(); + triggerConditionEvaluations(event); } /** @@ -107,9 +93,9 @@ public void close() public void waitForEvent(Predicate condition, long timeoutMillis) { final WaitCondition waitCondition = new WaitCondition(condition); + registerWaitCondition(waitCondition); waitConditions.add(waitCondition); - triggerConditionEvaluations(); try { final long awaitTime = timeoutMillis >= 0 ? timeoutMillis : Long.MAX_VALUE; if (!waitCondition.countDownLatch.await(awaitTime, TimeUnit.MILLISECONDS)) { @@ -158,12 +144,12 @@ public void waitForEventAggregate( ); } - private void triggerConditionEvaluations() + private void triggerConditionEvaluations(Event event) { if (conditionEvaluateExecutor == null) { throw new ISE("Cannot evaluate conditions as the 'conditionEvaluateExecutor' is null."); } else { - conditionEvaluateExecutor.submit(this::evaluateWaitConditions); + conditionEvaluateExecutor.submit(() -> evaluateWaitConditions(event)); } } @@ -171,9 +157,9 @@ private void triggerConditionEvaluations() * Evaluates wait conditions. This method must be invoked on the * {@link #conditionEvaluateExecutor} so that it does not block {@link #emit(Event)}. */ - private void evaluateWaitConditions() + private void evaluateWaitConditions(Event event) { - eventReadWriteLock.readLock().lock(); + eventProcessingLock.lock(); try { // Create a copy of the conditions for thread-safety final List conditionsToEvaluate = List.copyOf(waitConditions); @@ -181,25 +167,41 @@ private void evaluateWaitConditions() return; } - List events = getEvents(); for (WaitCondition condition : conditionsToEvaluate) { - final int currentNumberOfEvents = events.size(); - - // Do not use an iterator over the list to avoid concurrent modification exceptions - // Evaluate new events against this condition - for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i) { - if (condition.predicate.test(events.get(i))) { - condition.countDownLatch.countDown(); - } + if (condition.predicate.test(event)) { + condition.countDownLatch.countDown(); } - condition.processedUntil = currentNumberOfEvents; } } catch (Exception e) { log.error(e, "Error while evaluating wait conditions"); } finally { - eventReadWriteLock.readLock().unlock(); + processedEvents.add(event); + eventProcessingLock.unlock(); + } + } + + /** + * Evaluates the given new condition for all past events and then adds it to + * {@link #waitConditions}. + */ + private void registerWaitCondition(WaitCondition condition) + { + eventProcessingLock.lock(); + try { + for (Event event : processedEvents) { + if (condition.predicate.test(event)) { + condition.countDownLatch.countDown(); + } + } + waitConditions.add(condition); + } + catch (Exception e) { + throw new ISE(e, "Error while evaluating condition"); + } + finally { + eventProcessingLock.unlock(); } } diff --git a/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java index 58532d290d25..6f6377740a78 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/QueryCountStatsMonitorTest.java @@ -31,10 +31,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.stream.Collectors; public class QueryCountStatsMonitorTest { @@ -111,18 +109,12 @@ public void testMonitor() emitter.flush(); // Trigger metric emission monitor.doMonitor(emitter); - Map resultMap = emitter.getEvents() - .stream() - .collect(Collectors.toMap( - event -> (String) event.toMap().get("metric"), - event -> (Long) event.toMap().get("value") - )); - Assert.assertEquals(5, resultMap.size()); - Assert.assertEquals(1L, (long) resultMap.get("query/success/count")); - Assert.assertEquals(2L, (long) resultMap.get("query/failed/count")); - Assert.assertEquals(3L, (long) resultMap.get("query/interrupted/count")); - Assert.assertEquals(4L, (long) resultMap.get("query/timeout/count")); - Assert.assertEquals(10L, (long) resultMap.get("query/count")); + Assert.assertEquals(5, emitter.getNumEmittedEvents()); + emitter.verifyValue("query/success/count", 1L); + emitter.verifyValue("query/failed/count", 2L); + emitter.verifyValue("query/interrupted/count", 3L); + emitter.verifyValue("query/timeout/count", 4L); + emitter.verifyValue("query/count", 10L); } @Test @@ -137,18 +129,13 @@ public void testMonitor_emitPendingRequests() emitter.flush(); // Trigger metric emission monitor.doMonitor(emitter); - Map resultMap = emitter.getEvents() - .stream() - .collect(Collectors.toMap( - event -> (String) event.toMap().get("metric"), - event -> (Long) event.toMap().get("value") - )); - Assert.assertEquals(6, resultMap.size()); - Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/pendingRequests")); - Assert.assertEquals(1L, (long) resultMap.get("query/success/count")); - Assert.assertEquals(2L, (long) resultMap.get("query/failed/count")); - Assert.assertEquals(3L, (long) resultMap.get("query/interrupted/count")); - Assert.assertEquals(4L, (long) resultMap.get("query/timeout/count")); - Assert.assertEquals(10L, (long) resultMap.get("query/count")); + + Assert.assertEquals(6, emitter.getNumEmittedEvents()); + emitter.verifyValue("mergeBuffer/pendingRequests", 0L); + emitter.verifyValue("query/success/count", 1L); + emitter.verifyValue("query/failed/count", 2L); + emitter.verifyValue("query/interrupted/count", 3L); + emitter.verifyValue("query/timeout/count", 4L); + emitter.verifyValue("query/count", 10L); } } diff --git a/server/src/test/java/org/apache/druid/server/metrics/ServiceStatusMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/ServiceStatusMonitorTest.java index 88acb6dca269..748672a12b4c 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/ServiceStatusMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/ServiceStatusMonitorTest.java @@ -32,13 +32,15 @@ public class ServiceStatusMonitorTest { private ServiceStatusMonitor monitor; + private StubServiceEmitter emitter; private Map heartbeatTags; - private Supplier> heartbeatTagsSupplier = () -> heartbeatTags; - private static String HEARTBEAT_METRIC_KEY = "service/heartbeat"; + private final Supplier> heartbeatTagsSupplier = () -> heartbeatTags; + private static final String HEARTBEAT_METRIC_KEY = "service/heartbeat"; @Before public void setUp() { + emitter = new StubServiceEmitter(); monitor = new ServiceStatusMonitor(); heartbeatTags = new HashMap<>(); monitor.heartbeatTagsSupplier = heartbeatTagsSupplier; @@ -47,23 +49,18 @@ public void setUp() @Test public void testDefaultHeartbeatReported() { - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); - Assert.assertEquals(1, emitter.getEvents().size()); - Assert.assertEquals(HEARTBEAT_METRIC_KEY, emitter.getEvents().get(0).toMap().get("metric")); - Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value")); + Assert.assertEquals(1, emitter.getNumEmittedEvents()); + emitter.verifyValue(HEARTBEAT_METRIC_KEY, 1); } @Test public void testLeaderTag() { heartbeatTags.put("leader", 1); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); - Assert.assertEquals(1, emitter.getEvents().size()); - Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("leader")); - Assert.assertEquals(HEARTBEAT_METRIC_KEY, emitter.getEvents().get(0).toMap().get("metric")); - Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value")); + Assert.assertEquals(1, emitter.getNumEmittedEvents()); + emitter.verifyValue(HEARTBEAT_METRIC_KEY, Map.of("leader", 1), 1); } @Test @@ -71,12 +68,8 @@ public void testMoreThanOneTag() { heartbeatTags.put("leader", 1); heartbeatTags.put("taskRunner", "http"); - final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); - Assert.assertEquals(1, emitter.getEvents().size()); - Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("leader")); - Assert.assertEquals("http", emitter.getEvents().get(0).toMap().get("taskRunner")); - Assert.assertEquals(HEARTBEAT_METRIC_KEY, emitter.getEvents().get(0).toMap().get("metric")); - Assert.assertEquals(1, emitter.getEvents().get(0).toMap().get("value")); + Assert.assertEquals(1, emitter.getNumEmittedEvents()); + emitter.verifyValue(HEARTBEAT_METRIC_KEY, Map.of("leader", 1, "taskRunner", "http"), 1); } } diff --git a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java index 93687bc1be8d..2489d0334114 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java @@ -94,7 +94,7 @@ public void testMonitor() final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); - Assert.assertEquals(9, emitter.getEvents().size()); + Assert.assertEquals(9, emitter.getNumEmittedEvents()); emitter.verifyValue("task/success/count", Map.of("dataSource", "d1", "taskType", "index"), 1L); emitter.verifyValue("task/failed/count", Map.of("dataSource", "d1", "taskType", "index"), 1L); diff --git a/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java index 0fed1c9b6bc1..092a7b66d015 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/TaskSlotCountStatsMonitorTest.java @@ -74,7 +74,7 @@ public void testMonitor() final TaskSlotCountStatsMonitor monitor = new TaskSlotCountStatsMonitor(statsProvider); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); - Assert.assertEquals(5, emitter.getEvents().size()); + Assert.assertEquals(5, emitter.getNumEmittedEvents()); emitter.verifyValue("taskSlot/total/count", 1L); emitter.verifyValue("taskSlot/idle/count", 1L); emitter.verifyValue("taskSlot/used/count", 1L); diff --git a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java index a1930bef6379..05e10ab01424 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java +++ b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java @@ -222,7 +222,7 @@ public void testMonitor() new WorkerTaskCountStatsMonitor(injectorForMiddleManager, ImmutableSet.of(NodeRole.MIDDLE_MANAGER)); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); - Assert.assertEquals(5, emitter.getEvents().size()); + Assert.assertEquals(5, emitter.getNumEmittedEvents()); emitter.verifyValue( "worker/task/failed/count", ImmutableMap.of("category", "workerCategory", "workerVersion", "workerVersion"), @@ -257,7 +257,7 @@ public void testMonitorIndexer() new WorkerTaskCountStatsMonitor(injectorForIndexer, ImmutableSet.of(NodeRole.INDEXER)); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); - Assert.assertEquals(10, emitter.getEvents().size()); + Assert.assertEquals(10, emitter.getNumEmittedEvents()); emitter.verifyValue( "worker/task/running/count", ImmutableMap.of("dataSource", "wikipedia"), @@ -317,7 +317,7 @@ public void testMonitorWithNulls() new WorkerTaskCountStatsMonitor(injectorForMiddleManagerNullStats, ImmutableSet.of(NodeRole.MIDDLE_MANAGER)); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); - Assert.assertEquals(0, emitter.getEvents().size()); + Assert.assertEquals(0, emitter.getNumEmittedEvents()); } @Test @@ -327,6 +327,6 @@ public void testMonitorWithPeon() new WorkerTaskCountStatsMonitor(injectorForPeon, ImmutableSet.of(NodeRole.PEON)); final StubServiceEmitter emitter = new StubServiceEmitter("service", "host"); monitor.doMonitor(emitter); - Assert.assertEquals(0, emitter.getEvents().size()); + Assert.assertEquals(0, emitter.getNumEmittedEvents()); } } diff --git a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java index 6bcf87fdc4ad..f974d361b1f8 100644 --- a/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java +++ b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java @@ -683,7 +683,8 @@ protected void doService( } catch (NullPointerException ignored) { } - Assert.assertEquals("query/time", stubServiceEmitter.getEvents().get(0).toMap().get("metric")); + // Assert.assertEquals("query/time", stubServiceEmitter.getEvents().get(0).toMap().get("metric")); + stubServiceEmitter.verifyEmitted("query/time", 1); if (!isJDBCSql) { Assert.assertEquals("dummy", stubServiceEmitter.getEvents().get(0).toMap().get("id")); } From a6329b37bb2dea2d23072d86629ec8ddd68526e5 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 16 Jul 2025 10:35:15 +0530 Subject: [PATCH 2/5] Remove ServiceMetricEventSnapshot --- .../duty/UnusedSegmentsKillerTest.java | 4 +- .../emitter/service/ServiceMetricEvent.java | 8 ++++ .../service/ServiceMetricEventTest.java | 29 ++++++++++++- .../java/util/metrics/StubServiceEmitter.java | 42 ++++--------------- .../server/ClientQuerySegmentWalkerTest.java | 20 ++++----- .../server/audit/SQLAuditManagerTest.java | 10 ++--- .../server/metrics/LatchableEmitter.java | 22 +++++++++- 7 files changed, 77 insertions(+), 58 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java index 97c5f6bc1b98..0cc5f6f5f232 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Comparators; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.metadata.SegmentsMetadataManagerConfig; import org.apache.druid.metadata.UnusedSegmentKillerConfig; @@ -328,8 +329,7 @@ public void test_run_prioritizesOlderIntervals() emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10); // Verify that the kill intervals are sorted with the oldest interval first - final List events = - emitter.getMetricEvents(TaskMetrics.RUN_DURATION); + final List events = emitter.getMetricEvents(TaskMetrics.RUN_DURATION); final List killIntervals = events.stream().map(event -> { final String taskId = (String) event.getUserDims().get(DruidMetrics.TASK_ID); String[] splits = taskId.split("_"); diff --git a/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java b/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java index 3bab9a3ad04d..f7f3549ecf7d 100644 --- a/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java +++ b/processing/src/main/java/org/apache/druid/java/util/emitter/service/ServiceMetricEvent.java @@ -125,6 +125,14 @@ public EventMap toMap() .build(); } + /** + * Creates an immutable copy of this metric event. This is used only in tests. + */ + public ServiceMetricEvent copy() + { + return new ServiceMetricEvent(createdTime, serviceDims, Map.copyOf(userDims), feed, metric, value); + } + /** * Builder for a {@link ServiceMetricEvent}. This builder can be used for * building only one event. diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java index 5fbf4dc9c9e1..cf59f0f3c40e 100644 --- a/processing/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/service/ServiceMetricEventTest.java @@ -35,7 +35,7 @@ public class ServiceMetricEventTest { @Test - public void testStupidTest() + public void testBuilder() { ServiceMetricEvent builderEvent = new ServiceMetricEvent.Builder() .setDimension("user1", "a") @@ -317,4 +317,31 @@ public void testSetDimensionIfNotNullShouldNotSetNullDimension() Assert.assertTrue(target.getUserDims().isEmpty()); Assert.assertNull(target.getUserDims().get("userDimMap")); } + + @Test + public void test_copy_returnsAnImmutableInstance() + { + final ServiceMetricEvent.Builder eventBuilder = ServiceMetricEvent + .builder() + .setDimension("dim1", "v1") + .setMetric("m1", 100); + + final ServiceMetricEvent event1 = eventBuilder.build("coordinator", "localhost"); + final ServiceMetricEvent event1Copy = event1.copy(); + + Assert.assertEquals(Map.of("dim1", "v1"), event1.getUserDims()); + Assert.assertEquals(Map.of("dim1", "v1"), event1Copy.getUserDims()); + + final ServiceMetricEvent event2 = eventBuilder + .setDimension("dim2", "v2") + .setMetric("m2", 200) + .build("coordinator", "localhost"); + + // Verify that the original event gets changed dimensions + Assert.assertEquals(Map.of("dim1", "v1", "dim2", "v2"), event2.getUserDims()); + Assert.assertEquals(Map.of("dim1", "v1", "dim2", "v2"), event1.getUserDims()); + + // But the event copy still has the original dimensions + Assert.assertEquals(Map.of("dim1", "v1"), event1Copy.getUserDims()); + } } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java index c1a5413513c5..b330573c3621 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java @@ -41,7 +41,7 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie { private final Queue events = new ConcurrentLinkedDeque<>(); private final Queue alertEvents = new ConcurrentLinkedDeque<>(); - private final ConcurrentHashMap> metricEvents = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> metricEvents = new ConcurrentHashMap<>(); public StubServiceEmitter() { @@ -59,7 +59,7 @@ public void emit(Event event) if (event instanceof ServiceMetricEvent) { ServiceMetricEvent metricEvent = (ServiceMetricEvent) event; metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new ConcurrentLinkedDeque<>()) - .add(new ServiceMetricEventSnapshot(metricEvent)); + .add(metricEvent.copy()); } else if (event instanceof AlertEvent) { alertEvents.add((AlertEvent) event); } @@ -84,9 +84,9 @@ public int getNumEmittedEvents() * * @return List of events emitted for the given metric. */ - public List getMetricEvents(String metricName) + public List getMetricEvents(String metricName) { - final Queue metricEventQueue = metricEvents.get(metricName); + final Queue metricEventQueue = metricEvents.get(metricName); return metricEventQueue == null ? List.of() : List.copyOf(metricEventQueue); } @@ -105,18 +105,18 @@ public List getMetricValues( ) { final List values = new ArrayList<>(); - final Queue events = + final Queue events = metricEvents.getOrDefault(metricName, new ArrayDeque<>()); final Map filters = dimensionFilters == null ? Collections.emptyMap() : dimensionFilters; - for (ServiceMetricEventSnapshot event : events) { + for (ServiceMetricEvent event : events) { final Map userDims = event.getUserDims(); boolean match = filters.keySet().stream() .map(d -> filters.get(d).equals(userDims.get(d))) .reduce((a, b) -> a && b) .orElse(true); if (match) { - values.add(event.getMetricEvent().getValue()); + values.add(event.getValue()); } } @@ -140,32 +140,4 @@ public void flush() public void close() { } - - /** - * Helper class to encapsulate a ServiceMetricEvent and its user dimensions. - * Since {@link StubServiceEmitter} doesn't actually emit metrics and saves the emitted metrics in-memory, - * this helper class saves a copy of {@link ServiceMetricEvent#userDims} of emitted metrics - * via {@link ServiceMetricEvent#getUserDims()} as it can get mutated. - */ - public static class ServiceMetricEventSnapshot - { - private final ServiceMetricEvent metricEvent; - private final Map userDims; - - public ServiceMetricEventSnapshot(ServiceMetricEvent metricEvent) - { - this.metricEvent = metricEvent; - this.userDims = metricEvent.getUserDims(); - } - - public ServiceMetricEvent getMetricEvent() - { - return metricEvent; - } - - public Map getUserDims() - { - return userDims; - } - } } diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index e8ed5e3bf5a2..db1b97bce634 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.core.EventMap; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.DataSource; @@ -986,11 +987,8 @@ public void testMetricsWithMaxSubqueryRowsEnabled() ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L}) ); - List events = - emitter.getMetricEvents(ClientQuerySegmentWalker.ROWS_COUNT_METRIC); - - for (StubServiceEmitter.ServiceMetricEventSnapshot event : events) { - EventMap map = event.getMetricEvent().toMap(); + for (ServiceMetricEvent event : emitter.getMetricEvents(ClientQuerySegmentWalker.ROWS_COUNT_METRIC)) { + EventMap map = event.toMap(); if (ClientQuerySegmentWalker.ROWS_COUNT_METRIC.equals(map.get("metric"))) { Assert.assertTrue(map.containsKey("host")); Assert.assertTrue(map.containsKey("service")); @@ -1038,11 +1036,8 @@ public void testMetricsWithMaxSubqueryBytesEnabled() ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L}) ); - List events - = emitter.getMetricEvents(ClientQuerySegmentWalker.ROWS_COUNT_METRIC); - - for (StubServiceEmitter.ServiceMetricEventSnapshot event : events) { - EventMap map = event.getMetricEvent().toMap(); + for (ServiceMetricEvent event : emitter.getMetricEvents(ClientQuerySegmentWalker.ROWS_COUNT_METRIC)) { + EventMap map = event.toMap(); Assert.assertTrue(map.containsKey("host")); Assert.assertTrue(map.containsKey("service")); Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID)); @@ -1050,9 +1045,8 @@ public void testMetricsWithMaxSubqueryBytesEnabled() Assert.assertEquals(3, map.get("value")); } - events = emitter.getMetricEvents(ClientQuerySegmentWalker.BYTES_COUNT_METRIC); - for (StubServiceEmitter.ServiceMetricEventSnapshot event : events) { - EventMap map = event.getMetricEvent().toMap(); + for (ServiceMetricEvent event : emitter.getMetricEvents(ClientQuerySegmentWalker.BYTES_COUNT_METRIC)) { + EventMap map = event.toMap(); Assert.assertTrue(map.containsKey("host")); Assert.assertTrue(map.containsKey("service")); Assert.assertEquals(DUMMY_QUERY_ID, map.get(DruidMetrics.ID)); diff --git a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java index 5bbf84c322dd..650bf7d02cf2 100644 --- a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.metadata.TestDerbyConnector; import org.joda.time.DateTime; @@ -90,11 +91,10 @@ public void testAuditMetricEventWithPayload() throws IOException final AuditEntry entry = createAuditEntry("testKey", "testType", DateTimes.nowUtc()); auditManager.doAudit(entry); - List auditMetricEvents - = serviceEmitter.getMetricEvents("config/audit"); + List auditMetricEvents = serviceEmitter.getMetricEvents("config/audit"); Assert.assertEquals(1, auditMetricEvents.size()); - StubServiceEmitter.ServiceMetricEventSnapshot metric = auditMetricEvents.get(0); + ServiceMetricEvent metric = auditMetricEvents.get(0); final AuditEntry dbEntry = lookupAuditEntryForKey("testKey"); Assert.assertNotNull(dbEntry); @@ -116,12 +116,12 @@ public void testCreateAuditEntry() throws IOException Assert.assertEquals(entry, dbEntry); // Verify emitted metrics - List auditMetricEvents + List auditMetricEvents = serviceEmitter.getMetricEvents("config/audit"); Assert.assertNotNull(auditMetricEvents); Assert.assertEquals(1, auditMetricEvents.size()); - StubServiceEmitter.ServiceMetricEventSnapshot metric = auditMetricEvents.get(0); + ServiceMetricEvent metric = auditMetricEvents.get(0); Assert.assertEquals(dbEntry.getKey(), metric.getUserDims().get("key")); Assert.assertEquals(dbEntry.getType(), metric.getUserDims().get("type")); Assert.assertNull(metric.getUserDims().get("payload")); diff --git a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java index c7d8f1010cdd..b31cb1f68efe 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java +++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java @@ -84,6 +84,21 @@ public void emit(Event event) triggerConditionEvaluations(event); } + @Override + public void flush() + { + // TODO: some locking here and in close() + super.flush(); + processedEvents.clear(); + } + + @Override + public void close() + { + super.close(); + processedEvents.clear(); + } + /** * Waits until an event that satisfies the given predicate is emitted. * @@ -94,7 +109,6 @@ public void waitForEvent(Predicate condition, long timeoutMillis) { final WaitCondition waitCondition = new WaitCondition(condition); registerWaitCondition(waitCondition); - waitConditions.add(waitCondition); try { final long awaitTime = timeoutMillis >= 0 ? timeoutMillis : Long.MAX_VALUE; @@ -193,9 +207,13 @@ private void registerWaitCondition(WaitCondition condition) for (Event event : processedEvents) { if (condition.predicate.test(event)) { condition.countDownLatch.countDown(); + break; } } - waitConditions.add(condition); + + if (condition.countDownLatch.getCount() > 0) { + waitConditions.add(condition); + } } catch (Exception e) { throw new ISE(e, "Error while evaluating condition"); From c59b48e5b0715ec979d0e1f78440885f1337ef40 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 16 Jul 2025 13:24:55 +0530 Subject: [PATCH 3/5] Add locking around close, flush in LatchableEmitter --- .../server/metrics/LatchableEmitter.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java index b31cb1f68efe..5df770b1ce91 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java +++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java @@ -87,16 +87,27 @@ public void emit(Event event) @Override public void flush() { - // TODO: some locking here and in close() - super.flush(); - processedEvents.clear(); + eventProcessingLock.lock(); + try { + super.flush(); + processedEvents.clear(); + } + finally { + eventProcessingLock.unlock(); + } } @Override public void close() { - super.close(); - processedEvents.clear(); + eventProcessingLock.lock(); + try { + super.close(); + processedEvents.clear(); + } + finally { + eventProcessingLock.unlock(); + } } /** From f5e805459e20d2c908196c4355911d6e9514984b Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 16 Jul 2025 14:49:33 +0530 Subject: [PATCH 4/5] Evaluate conditions synchronously --- .../server/metrics/LatchableEmitter.java | 25 +++---------------- .../emitter/LatchableEmitterModule.java | 6 ++--- 2 files changed, 5 insertions(+), 26 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java index 5df770b1ce91..b493451ff552 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java +++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java @@ -20,7 +20,6 @@ package org.apache.druid.server.metrics; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; @@ -34,7 +33,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; @@ -55,10 +53,6 @@ public class LatchableEmitter extends StubServiceEmitter public static final String TYPE = "latching"; - /** - * Single-threaded executor to evaluate conditions. - */ - private final ScheduledExecutorService conditionEvaluateExecutor; private final Set waitConditions = new HashSet<>(); private final ReentrantLock eventProcessingLock = new ReentrantLock(); @@ -71,17 +65,16 @@ public class LatchableEmitter extends StubServiceEmitter /** * Creates a {@link StubServiceEmitter} that may be used in embedded tests. */ - public LatchableEmitter(String service, String host, ScheduledExecutorFactory executorFactory) + public LatchableEmitter(String service, String host) { super(service, host); - this.conditionEvaluateExecutor = executorFactory.create(1, "LatchingEmitter-eval-%d"); } @Override public void emit(Event event) { super.emit(event); - triggerConditionEvaluations(event); + evaluateWaitConditions(event); } @Override @@ -169,18 +162,8 @@ public void waitForEventAggregate( ); } - private void triggerConditionEvaluations(Event event) - { - if (conditionEvaluateExecutor == null) { - throw new ISE("Cannot evaluate conditions as the 'conditionEvaluateExecutor' is null."); - } else { - conditionEvaluateExecutor.submit(() -> evaluateWaitConditions(event)); - } - } - /** - * Evaluates wait conditions. This method must be invoked on the - * {@link #conditionEvaluateExecutor} so that it does not block {@link #emit(Event)}. + * Evaluates wait conditions for the given event. */ private void evaluateWaitConditions(Event event) { @@ -239,8 +222,6 @@ private static class WaitCondition private final Predicate predicate; private final CountDownLatch countDownLatch; - private int processedUntil; - private WaitCondition(Predicate predicate) { this.predicate = predicate; diff --git a/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java b/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java index 1333164449f7..71c228a96e55 100644 --- a/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java +++ b/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java @@ -26,7 +26,6 @@ import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.annotations.Self; import org.apache.druid.initialization.DruidModule; -import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.emitter.core.Emitter; import org.apache.druid.server.DruidNode; import org.apache.druid.server.metrics.LatchableEmitter; @@ -48,10 +47,9 @@ public void configure(Binder binder) @Provides @ManageLifecycle public LatchableEmitter makeEmitter( - @Self DruidNode selfNode, - ScheduledExecutorFactory executorFactory + @Self DruidNode selfNode ) { - return new LatchableEmitter(selfNode.getServiceName(), selfNode.getHost(), executorFactory); + return new LatchableEmitter(selfNode.getServiceName(), selfNode.getHost()); } } From 90569353dd87efdd3b5a25f57886bcafde593852 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 16 Jul 2025 14:56:42 +0530 Subject: [PATCH 5/5] throw exception if condition eval fails --- .../java/org/apache/druid/server/metrics/LatchableEmitter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java index b493451ff552..ef78930f65a9 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java +++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java @@ -182,7 +182,8 @@ private void evaluateWaitConditions(Event event) } } catch (Exception e) { - log.error(e, "Error while evaluating wait conditions"); + log.error(e, "Error while evaluating wait conditions for event[%s]", event.toMap()); + throw new ISE(e, "Error while evaluating wait conditions for event[%s]", event.toMap()); } finally { processedEvents.add(event);