Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,15 @@ private void submitController()
controller.queryId()
);
}
catch (Throwable e) {
log.error(
e,
"Controller failed for sqlQueryId[%s], controllerHost[%s]",
plannerContext.getSqlQueryId(),
controller.queryId()
);
throw e;
}
finally {
controllerRegistry.deregister(controllerHolder);
Thread.currentThread().setName(threadName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,23 @@ public void run(final QueryListener queryListener) throws Exception
try (final Closer closer = Closer.create()) {
reportPayload = runInternal(queryListener, closer);
}
catch (Throwable e) {
log.error(e, "Controller internal execution encountered exception.");
queryListener.onQueryComplete(makeStatusReportForException(e));
throw e;
}
// Call onQueryComplete after Closer is fully closed, ensuring no controller-related processing is ongoing.
queryListener.onQueryComplete(reportPayload);
}


private MSQTaskReportPayload makeStatusReportForException(Throwable e)
{
MSQErrorReport errorReport = MSQErrorReport.fromFault(queryId(), null, null, UnknownFault.forException(e));
MSQStatusReport statusReport = new MSQStatusReport(TaskState.FAILED, errorReport, null, null, 0, new HashMap<>(), 0, 0, null, null);
return new MSQTaskReportPayload(statusReport, null, null, null);
}

@Override
public void stop(CancellationReason reason)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.junit.Test;

import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -328,7 +329,7 @@ public void test_run_prioritizesOlderIntervals()
emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10);

// Verify that the kill intervals are sorted with the oldest interval first
final List<StubServiceEmitter.ServiceMetricEventSnapshot> events =
final Queue<StubServiceEmitter.ServiceMetricEventSnapshot> events =
emitter.getMetricEvents().get(TaskMetrics.RUN_DURATION);
final List<Interval> killIntervals = events.stream().map(event -> {
final String taskId = (String) event.getUserDims().get(DruidMetrics.TASK_ID);
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1094,7 +1094,7 @@
<dependency>
<groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId>
<version>5.10.2</version>
<version>5.13.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;
import java.util.Queue;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -65,7 +65,7 @@ public void testDoMonitor()

assertTrue(monitor.doMonitor(stubServiceEmitter));

final Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = stubServiceEmitter.getMetricEvents();
final Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = stubServiceEmitter.getMetricEvents();

assertMetricValue(metricEvents, "emitter/successfulSending/maxTimeMs", 0);
assertMetricValue(metricEvents, "emitter/events/emitted/delta", 100L);
Expand All @@ -83,8 +83,8 @@ public void testDoMonitor()
assertMetricValue(metricEvents, "emitter/failedSending/maxTimeMs", 0L);
}

private void assertMetricValue(Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents, String metricName, Number expectedValue)
private void assertMetricValue(Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents, String metricName, Number expectedValue)
{
assertEquals(metricEvents.get(metricName).get(0).getMetricEvent().getValue().doubleValue(), expectedValue.doubleValue());
assertEquals(metricEvents.get(metricName).peek().getMetricEvent().getValue().doubleValue(), expectedValue.doubleValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,24 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

/**
* Test implementation of {@link ServiceEmitter} that collects emitted metrics
* and alerts in lists.
*/
public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifier
{
private final List<Event> events = new ArrayList<>();
private final List<AlertEvent> alertEvents = new ArrayList<>();
private final ConcurrentHashMap<String, List<ServiceMetricEventSnapshot>> metricEvents = new ConcurrentHashMap<>();
private final Queue<Event> events = new ConcurrentLinkedDeque<>();
private final Queue<AlertEvent> alertEvents = new ConcurrentLinkedDeque<>();
private final ConcurrentHashMap<String, Queue<ServiceMetricEventSnapshot>> metricEvents = new ConcurrentHashMap<>();

public StubServiceEmitter()
{
Expand All @@ -55,7 +58,7 @@ public void emit(Event event)
{
if (event instanceof ServiceMetricEvent) {
ServiceMetricEvent metricEvent = (ServiceMetricEvent) event;
metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new ArrayList<>())
metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new ConcurrentLinkedDeque<>())
.add(new ServiceMetricEventSnapshot(metricEvent));
} else if (event instanceof AlertEvent) {
alertEvents.add((AlertEvent) event);
Expand All @@ -68,15 +71,15 @@ public void emit(Event event)
*/
public List<Event> getEvents()
{
return events;
return new ArrayList<>(events);
}

/**
* Gets all the metric events emitted since the previous {@link #flush()}.
*
* @return Map from metric name to list of events emitted for that metric.
*/
public Map<String, List<ServiceMetricEventSnapshot>> getMetricEvents()
public Map<String, Queue<ServiceMetricEventSnapshot>> getMetricEvents()
{
return metricEvents;
}
Expand All @@ -86,7 +89,7 @@ public Map<String, List<ServiceMetricEventSnapshot>> getMetricEvents()
*/
public List<AlertEvent> getAlerts()
{
return alertEvents;
return new ArrayList<>(alertEvents);
}

@Override
Expand All @@ -96,8 +99,8 @@ public List<Number> getMetricValues(
)
{
final List<Number> values = new ArrayList<>();
final List<ServiceMetricEventSnapshot> events =
metricEvents.getOrDefault(metricName, Collections.emptyList());
final Queue<ServiceMetricEventSnapshot> events =
metricEvents.getOrDefault(metricName, new ArrayDeque<>());
final Map<String, Object> filters =
dimensionFilters == null ? Collections.emptyMap() : dimensionFilters;
for (ServiceMetricEventSnapshot event : events) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -162,7 +163,7 @@ public void testSimpleIngestion() throws Exception
).get();
Assert.assertEquals(
ImmutableMap.of("x", "3"),
(Map<String, String>) segmentsAndCommitMetadata.getCommitMetadata()
segmentsAndCommitMetadata.getCommitMetadata()
);
Assert.assertEquals(
IDENTIFIERS.subList(0, 2),
Expand Down Expand Up @@ -2278,7 +2279,7 @@ public void testQueryBySegments() throws Exception

private void verifySinkMetrics(StubServiceEmitter emitter, Set<String> segmentIds)
{
Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> events = emitter.getMetricEvents();
Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> events = emitter.getMetricEvents();
int segments = segmentIds.size();
Assert.assertEquals(4, events.size());
Assert.assertTrue(events.containsKey("query/cpu/time"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

@RunWith(MockitoJUnitRunner.class)
Expand Down Expand Up @@ -91,14 +92,14 @@ public void testAuditMetricEventWithPayload() throws IOException
final AuditEntry entry = createAuditEntry("testKey", "testType", DateTimes.nowUtc());
auditManager.doAudit(entry);

Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents();
Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents();
Assert.assertEquals(1, metricEvents.size());

List<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit");
Queue<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit");
Assert.assertNotNull(auditMetricEvents);
Assert.assertEquals(1, auditMetricEvents.size());

ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent();
ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent();

final AuditEntry dbEntry = lookupAuditEntryForKey("testKey");
Assert.assertNotNull(dbEntry);
Expand All @@ -120,14 +121,14 @@ public void testCreateAuditEntry() throws IOException
Assert.assertEquals(entry, dbEntry);

// Verify emitted metrics
Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents();
Map<String, Queue<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents();
Assert.assertEquals(1, metricEvents.size());

List<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit");
Queue<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit");
Assert.assertNotNull(auditMetricEvents);
Assert.assertEquals(1, auditMetricEvents.size());

ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent();
ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent();
Assert.assertEquals(dbEntry.getKey(), metric.getUserDims().get("key"));
Assert.assertEquals(dbEntry.getType(), metric.getUserDims().get("type"));
Assert.assertNull(metric.getUserDims().get("payload"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,14 @@ private void evaluateWaitConditions()
return;
}

List<Event> events = getEvents();
for (WaitCondition condition : conditionsToEvaluate) {
final int currentNumberOfEvents = getEvents().size();
final int currentNumberOfEvents = events.size();

// Do not use an iterator over the list to avoid concurrent modification exceptions
// Evaluate new events against this condition
for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i) {
if (condition.predicate.test(getEvents().get(i))) {
if (condition.predicate.test(events.get(i))) {
condition.countDownLatch.countDown();
}
}
Expand Down
Loading