Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ void test_launchPeonJobAndWaitForStart()
Pod peonPod = instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1, TimeUnit.SECONDS);

Assertions.assertNotNull(peonPod);
Assertions.assertEquals(1, serviceEmitter.getEvents().size());
Assertions.assertEquals(1, serviceEmitter.getNumEmittedEvents());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.UnusedSegmentKillerConfig;
Expand All @@ -52,7 +53,6 @@
import org.junit.Test;

import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -329,8 +329,7 @@ public void test_run_prioritizesOlderIntervals()
emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10);

// Verify that the kill intervals are sorted with the oldest interval first
final Queue<StubServiceEmitter.ServiceMetricEventSnapshot> events =
emitter.getMetricEvents().get(TaskMetrics.RUN_DURATION);
final List<ServiceMetricEvent> events = emitter.getMetricEvents(TaskMetrics.RUN_DURATION);
final List<Interval> killIntervals = events.stream().map(event -> {
final String taskId = (String) event.getUserDims().get(DruidMetrics.TASK_ID);
String[] splits = taskId.split("_");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -780,8 +780,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc
Assert.assertEquals(2, taskCountAfterScaleOut);
Assert.assertTrue(
dynamicActionEmitter
.getMetricEvents()
.get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
.stream()
.map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
.filter(Objects::nonNull)
Expand Down Expand Up @@ -840,8 +839,7 @@ public int getActiveTaskGroupsCount()

Assert.assertTrue(
dynamicActionEmitter
.getMetricEvents()
.get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
.stream()
.map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
.filter(Objects::nonNull)
Expand Down Expand Up @@ -1103,8 +1101,7 @@ public int getActiveTaskGroupsCount()

Assert.assertTrue(
dynamicActionEmitter
.getMetricEvents()
.get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
.stream()
.map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
.filter(Objects::nonNull)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2521,7 +2521,7 @@ public void testStaleOffsetsNegativeLagNotEmitted() throws Exception
latch.await();

supervisor.emitLag();
Assert.assertEquals(0, emitter.getEvents().size());
Assert.assertEquals(0, emitter.getNumEmittedEvents());
}

private void validateSupervisorStateAfterResetOffsets(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@

import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.worker.shuffle.ShuffleMetrics.PerDatasourceShuffleMetrics;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.List;
import java.util.Map;

public class ShuffleMonitorTest
{
Expand All @@ -46,23 +44,16 @@ public void testDoMonitor()
final ShuffleMonitor monitor = new ShuffleMonitor();
monitor.setShuffleMetrics(shuffleMetrics);
Assert.assertTrue(monitor.doMonitor(emitter));
final List<Event> events = emitter.getEvents();
Assert.assertEquals(2, events.size());
Assert.assertSame(ServiceMetricEvent.class, events.get(0).getClass());
ServiceMetricEvent event = (ServiceMetricEvent) events.get(0);
Assert.assertEquals(ShuffleMonitor.SHUFFLE_BYTES_KEY, event.getMetric());
Assert.assertEquals(310L, event.getValue());
Assert.assertEquals(
ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"),
event.getUserDims()
Assert.assertEquals(2, emitter.getNumEmittedEvents());
emitter.verifyValue(
ShuffleMonitor.SHUFFLE_BYTES_KEY,
Map.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"),
310L
);
Assert.assertSame(ServiceMetricEvent.class, events.get(1).getClass());
event = (ServiceMetricEvent) events.get(1);
Assert.assertEquals(ShuffleMonitor.SHUFFLE_REQUESTS_KEY, event.getMetric());
Assert.assertEquals(3, event.getValue());
Assert.assertEquals(
ImmutableMap.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"),
event.getUserDims()
emitter.verifyValue(
ShuffleMonitor.SHUFFLE_REQUESTS_KEY,
Map.of(ShuffleMonitor.SUPERVISOR_TASK_ID_DIMENSION, "supervisor"),
3
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ public EventMap toMap()
.build();
}

/**
* Creates an immutable copy of this metric event. This is used only in tests.
*/
public ServiceMetricEvent copy()
{
return new ServiceMetricEvent(createdTime, serviceDims, Map.copyOf(userDims), feed, metric, value);
}

/**
* Builder for a {@link ServiceMetricEvent}. This builder can be used for
* building only one event.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
public class ServiceMetricEventTest
{
@Test
public void testStupidTest()
public void testBuilder()
{
ServiceMetricEvent builderEvent = new ServiceMetricEvent.Builder()
.setDimension("user1", "a")
Expand Down Expand Up @@ -317,4 +317,31 @@ public void testSetDimensionIfNotNullShouldNotSetNullDimension()
Assert.assertTrue(target.getUserDims().isEmpty());
Assert.assertNull(target.getUserDims().get("userDimMap"));
}

@Test
public void test_copy_returnsAnImmutableInstance()
{
final ServiceMetricEvent.Builder eventBuilder = ServiceMetricEvent
.builder()
.setDimension("dim1", "v1")
.setMetric("m1", 100);

final ServiceMetricEvent event1 = eventBuilder.build("coordinator", "localhost");
final ServiceMetricEvent event1Copy = event1.copy();

Assert.assertEquals(Map.of("dim1", "v1"), event1.getUserDims());
Assert.assertEquals(Map.of("dim1", "v1"), event1Copy.getUserDims());

final ServiceMetricEvent event2 = eventBuilder
.setDimension("dim2", "v2")
.setMetric("m2", 200)
.build("coordinator", "localhost");

// Verify that the original event gets changed dimensions
Assert.assertEquals(Map.of("dim1", "v1", "dim2", "v2"), event2.getUserDims());
Assert.assertEquals(Map.of("dim1", "v1", "dim2", "v2"), event1.getUserDims());

// But the event copy still has the original dimensions
Assert.assertEquals(Map.of("dim1", "v1"), event1Copy.getUserDims());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.TestUtils;
Expand All @@ -34,8 +33,6 @@

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;

public class CgroupCpuSetMonitorTest
{
Expand Down Expand Up @@ -72,19 +69,11 @@ public void testMonitor()
final CgroupCpuSetMonitor monitor = new CgroupCpuSetMonitor(discoverer, ImmutableMap.of(), "some_feed");
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
Assert.assertTrue(monitor.doMonitor(emitter));
final List<Event> actualEvents = emitter.getEvents();
Assert.assertEquals(4, actualEvents.size());
final Map<String, Object> cpusEvent = actualEvents.get(0).toMap();
final Map<String, Object> effectiveCpusEvent = actualEvents.get(1).toMap();
final Map<String, Object> memsEvent = actualEvents.get(2).toMap();
final Map<String, Object> effectiveMemsEvent = actualEvents.get(3).toMap();
Assert.assertEquals("cgroup/cpuset/cpu_count", cpusEvent.get("metric"));
Assert.assertEquals(8, cpusEvent.get("value"));
Assert.assertEquals("cgroup/cpuset/effective_cpu_count", effectiveCpusEvent.get("metric"));
Assert.assertEquals(7, effectiveCpusEvent.get("value"));
Assert.assertEquals("cgroup/cpuset/mems_count", memsEvent.get("metric"));
Assert.assertEquals(4, memsEvent.get("value"));
Assert.assertEquals("cgroup/cpuset/effective_mems_count", effectiveMemsEvent.get("metric"));
Assert.assertEquals(1, effectiveMemsEvent.get("value"));
Assert.assertEquals(4, emitter.getNumEmittedEvents());

emitter.verifyValue("cgroup/cpuset/cpu_count", 8);
emitter.verifyValue("cgroup/cpuset/effective_cpu_count", 7);
emitter.verifyValue("cgroup/cpuset/mems_count", 4);
emitter.verifyValue("cgroup/cpuset/effective_mems_count", 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ public void testMonitor() throws IOException
final CgroupDiskMonitor monitor = new CgroupDiskMonitor(discoverer, ImmutableMap.of(), "some_feed");
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
Assert.assertTrue(monitor.doMonitor(emitter));
Assert.assertEquals(0, emitter.getEvents().size());
Assert.assertEquals(0, emitter.getNumEmittedEvents());

TestUtils.copyOrReplaceResource("/blkio.throttle.io_service_bytes-2", serviceBytesFile);
TestUtils.copyOrReplaceResource("/blkio.throttle.io_serviced-2", servicedFile);

Assert.assertTrue(monitor.doMonitor(emitter));
Assert.assertEquals(8, emitter.getEvents().size());
Assert.assertEquals(8, emitter.getNumEmittedEvents());
Assert.assertTrue(
emitter
.getEvents()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.TestUtils;
Expand All @@ -34,7 +33,6 @@

import java.io.File;
import java.io.IOException;
import java.util.List;

public class CgroupMemoryMonitorTest
{
Expand Down Expand Up @@ -71,7 +69,6 @@ public void testMonitor()
final CgroupMemoryMonitor monitor = new CgroupMemoryMonitor(discoverer, ImmutableMap.of(), "some_feed");
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
Assert.assertTrue(monitor.doMonitor(emitter));
final List<Event> actualEvents = emitter.getEvents();
Assert.assertEquals(46, actualEvents.size());
Assert.assertEquals(46, emitter.getNumEmittedEvents());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.java.util.metrics;

import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.metrics.cgroups.CgroupDiscoverer;
import org.apache.druid.java.util.metrics.cgroups.ProcCgroupV2Discoverer;
import org.apache.druid.java.util.metrics.cgroups.TestUtils;
Expand All @@ -33,7 +32,6 @@

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

public class CgroupV2CpuMonitorTest
Expand Down Expand Up @@ -65,8 +63,7 @@ public void testMonitor() throws IOException, InterruptedException
final CgroupV2CpuMonitor monitor = new CgroupV2CpuMonitor(discoverer);
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
Assert.assertTrue(monitor.doMonitor(emitter));
final List<Event> actualEvents = emitter.getEvents();
Assert.assertEquals(0, actualEvents.size());
Assert.assertEquals(0, emitter.getNumEmittedEvents());

emitter.flush();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ public void testMonitor() throws IOException
final CgroupV2DiskMonitor monitor = new CgroupV2DiskMonitor(discoverer);
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
Assert.assertTrue(monitor.doMonitor(emitter));
Assert.assertEquals(0, emitter.getEvents().size());
Assert.assertEquals(0, emitter.getNumEmittedEvents());

emitter.flush();

TestUtils.copyOrReplaceResource("/cgroupv2/io.stat-2", statFile);

Assert.assertTrue(monitor.doMonitor(emitter));
Assert.assertEquals(4, emitter.getEvents().size());
Assert.assertEquals(4, emitter.getNumEmittedEvents());
Assert.assertTrue(
emitter
.getEvents()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ public void testSimpleMonitor() throws Exception
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
Assert.assertFalse(monitor.doMonitor(emitter));
// First should just cache
Assert.assertEquals(0, emitter.getEvents().size());
Assert.assertEquals(0, emitter.getNumEmittedEvents());
Assert.assertTrue(cpuacct.delete());
TestUtils.copyResource("/cpuacct.usage_all", cpuacct);
Assert.assertTrue(monitor.doMonitor(emitter));
Assert.assertEquals(2 * 128 + 1, emitter.getEvents().size());
Assert.assertEquals(2 * 128 + 1, emitter.getNumEmittedEvents());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.Queue;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -65,26 +61,19 @@ public void testDoMonitor()

assertTrue(monitor.doMonitor(stubServiceEmitter));

final Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = stubServiceEmitter.getMetricEvents();

assertMetricValue(metricEvents, "emitter/successfulSending/maxTimeMs", 0);
assertMetricValue(metricEvents, "emitter/events/emitted/delta", 100L);
assertMetricValue(metricEvents, "emitter/successfulSending/minTimeMs", 0);
assertMetricValue(metricEvents, "emitter/buffers/emitQueue", 30);
assertMetricValue(metricEvents, "emitter/failedSending/minTimeMs", 0);
assertMetricValue(metricEvents, "emitter/buffers/allocated/delta", 20);
assertMetricValue(metricEvents, "emitter/batchFilling/maxTimeMs", 0);
assertMetricValue(metricEvents, "emitter/buffers/dropped/delta", 10);
assertMetricValue(metricEvents, "emitter/batchFilling/minTimeMs", 0);
assertMetricValue(metricEvents, "emitter/events/emitQueue", 200L);
assertMetricValue(metricEvents, "emitter/events/large/emitQueue", 75L);
assertMetricValue(metricEvents, "emitter/buffers/reuseQueue", 15);
assertMetricValue(metricEvents, "emitter/buffers/failed/delta", 5);
assertMetricValue(metricEvents, "emitter/failedSending/maxTimeMs", 0L);
}

private void assertMetricValue(Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents, String metricName, Number expectedValue)
{
assertEquals(metricEvents.get(metricName).peek().getMetricEvent().getValue().doubleValue(), expectedValue.doubleValue());
stubServiceEmitter.verifyValue("emitter/successfulSending/maxTimeMs", 0);
stubServiceEmitter.verifyValue("emitter/events/emitted/delta", 100L);
stubServiceEmitter.verifyValue("emitter/successfulSending/minTimeMs", 0);
stubServiceEmitter.verifyValue("emitter/buffers/emitQueue", 30);
stubServiceEmitter.verifyValue("emitter/failedSending/minTimeMs", 0);
stubServiceEmitter.verifyValue("emitter/buffers/allocated/delta", 20);
stubServiceEmitter.verifyValue("emitter/batchFilling/maxTimeMs", 0);
stubServiceEmitter.verifyValue("emitter/buffers/dropped/delta", 10);
stubServiceEmitter.verifyValue("emitter/batchFilling/minTimeMs", 0);
stubServiceEmitter.verifyValue("emitter/events/emitQueue", 200L);
stubServiceEmitter.verifyValue("emitter/events/large/emitQueue", 75L);
stubServiceEmitter.verifyValue("emitter/buffers/reuseQueue", 15);
stubServiceEmitter.verifyValue("emitter/buffers/failed/delta", 5);
stubServiceEmitter.verifyValue("emitter/failedSending/maxTimeMs", 0);
}
}
Loading
Loading