diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java index a6583db66fc9..60563d6daf44 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java @@ -405,6 +405,15 @@ private void submitController() controller.queryId() ); } + catch (Throwable e) { + log.error( + e, + "Controller failed for sqlQueryId[%s], controllerHost[%s]", + plannerContext.getSqlQueryId(), + controller.queryId() + ); + throw e; + } finally { controllerRegistry.deregister(controllerHolder); Thread.currentThread().setName(threadName); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 9c3d94094cbe..a68a58ae4763 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -343,10 +343,23 @@ public void run(final QueryListener queryListener) throws Exception try (final Closer closer = Closer.create()) { reportPayload = runInternal(queryListener, closer); } + catch (Throwable e) { + log.error(e, "Controller internal execution encountered exception."); + queryListener.onQueryComplete(makeStatusReportForException(e)); + throw e; + } // Call onQueryComplete after Closer is fully closed, ensuring no controller-related processing is ongoing. queryListener.onQueryComplete(reportPayload); } + + private MSQTaskReportPayload makeStatusReportForException(Throwable e) + { + MSQErrorReport errorReport = MSQErrorReport.fromFault(queryId(), null, null, UnknownFault.forException(e)); + MSQStatusReport statusReport = new MSQStatusReport(TaskState.FAILED, errorReport, null, null, 0, new HashMap<>(), 0, 0, null, null); + return new MSQTaskReportPayload(statusReport, null, null, null); + } + @Override public void stop(CancellationReason reason) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java index 1130eefa8d78..111489a74dc5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java @@ -52,6 +52,7 @@ import org.junit.Test; import java.util.List; +import java.util.Queue; import java.util.Set; import java.util.stream.Collectors; @@ -328,7 +329,7 @@ public void test_run_prioritizesOlderIntervals() emitter.verifyEmitted(UnusedSegmentsKiller.Metric.PROCESSED_KILL_JOBS, 10); // Verify that the kill intervals are sorted with the oldest interval first - final List events = + final Queue events = emitter.getMetricEvents().get(TaskMetrics.RUN_DURATION); final List killIntervals = events.stream().map(event -> { final String taskId = (String) event.getUserDims().get(DruidMetrics.TASK_ID); diff --git a/pom.xml b/pom.xml index cf282bc3cb2c..0b432865810c 100644 --- a/pom.xml +++ b/pom.xml @@ -1094,7 +1094,7 @@ org.junit junit-bom - 5.10.2 + 5.13.3 pom import diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java index 601dc4a61b44..2be52c091e78 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/HttpPostEmitterMonitorTest.java @@ -25,8 +25,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.List; import java.util.Map; +import java.util.Queue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -65,7 +65,7 @@ public void testDoMonitor() assertTrue(monitor.doMonitor(stubServiceEmitter)); - final Map> metricEvents = stubServiceEmitter.getMetricEvents(); + final Map> metricEvents = stubServiceEmitter.getMetricEvents(); assertMetricValue(metricEvents, "emitter/successfulSending/maxTimeMs", 0); assertMetricValue(metricEvents, "emitter/events/emitted/delta", 100L); @@ -83,8 +83,8 @@ public void testDoMonitor() assertMetricValue(metricEvents, "emitter/failedSending/maxTimeMs", 0L); } - private void assertMetricValue(Map> metricEvents, String metricName, Number expectedValue) + private void assertMetricValue(Map> metricEvents, String metricName, Number expectedValue) { - assertEquals(metricEvents.get(metricName).get(0).getMetricEvent().getValue().doubleValue(), expectedValue.doubleValue()); + assertEquals(metricEvents.get(metricName).peek().getMetricEvent().getValue().doubleValue(), expectedValue.doubleValue()); } } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java index 323d8cd308c9..55113b97ac20 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java @@ -24,11 +24,14 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; /** * Test implementation of {@link ServiceEmitter} that collects emitted metrics @@ -36,9 +39,9 @@ */ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifier { - private final List events = new ArrayList<>(); - private final List alertEvents = new ArrayList<>(); - private final ConcurrentHashMap> metricEvents = new ConcurrentHashMap<>(); + private final Queue events = new ConcurrentLinkedDeque<>(); + private final Queue alertEvents = new ConcurrentLinkedDeque<>(); + private final ConcurrentHashMap> metricEvents = new ConcurrentHashMap<>(); public StubServiceEmitter() { @@ -55,7 +58,7 @@ public void emit(Event event) { if (event instanceof ServiceMetricEvent) { ServiceMetricEvent metricEvent = (ServiceMetricEvent) event; - metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new ArrayList<>()) + metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new ConcurrentLinkedDeque<>()) .add(new ServiceMetricEventSnapshot(metricEvent)); } else if (event instanceof AlertEvent) { alertEvents.add((AlertEvent) event); @@ -68,7 +71,7 @@ public void emit(Event event) */ public List getEvents() { - return events; + return new ArrayList<>(events); } /** @@ -76,7 +79,7 @@ public List getEvents() * * @return Map from metric name to list of events emitted for that metric. */ - public Map> getMetricEvents() + public Map> getMetricEvents() { return metricEvents; } @@ -86,7 +89,7 @@ public Map> getMetricEvents() */ public List getAlerts() { - return alertEvents; + return new ArrayList<>(alertEvents); } @Override @@ -96,8 +99,8 @@ public List getMetricValues( ) { final List values = new ArrayList<>(); - final List events = - metricEvents.getOrDefault(metricName, Collections.emptyList()); + final Queue events = + metricEvents.getOrDefault(metricName, new ArrayDeque<>()); final Map filters = dimensionFilters == null ? Collections.emptyMap() : dimensionFilters; for (ServiceMetricEventSnapshot event : events) { diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java index fee0d2ca7576..463c4ed8c535 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java @@ -78,6 +78,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -162,7 +163,7 @@ public void testSimpleIngestion() throws Exception ).get(); Assert.assertEquals( ImmutableMap.of("x", "3"), - (Map) segmentsAndCommitMetadata.getCommitMetadata() + segmentsAndCommitMetadata.getCommitMetadata() ); Assert.assertEquals( IDENTIFIERS.subList(0, 2), @@ -2278,7 +2279,7 @@ public void testQueryBySegments() throws Exception private void verifySinkMetrics(StubServiceEmitter emitter, Set segmentIds) { - Map> events = emitter.getMetricEvents(); + Map> events = emitter.getMetricEvents(); int segments = segmentIds.size(); Assert.assertEquals(4, events.size()); Assert.assertTrue(events.containsKey("query/cpu/time")); diff --git a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java index f10248bf1e92..3505eb943a29 100644 --- a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java @@ -43,6 +43,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.TreeMap; @RunWith(MockitoJUnitRunner.class) @@ -91,14 +92,14 @@ public void testAuditMetricEventWithPayload() throws IOException final AuditEntry entry = createAuditEntry("testKey", "testType", DateTimes.nowUtc()); auditManager.doAudit(entry); - Map> metricEvents = serviceEmitter.getMetricEvents(); + Map> metricEvents = serviceEmitter.getMetricEvents(); Assert.assertEquals(1, metricEvents.size()); - List auditMetricEvents = metricEvents.get("config/audit"); + Queue auditMetricEvents = metricEvents.get("config/audit"); Assert.assertNotNull(auditMetricEvents); Assert.assertEquals(1, auditMetricEvents.size()); - ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent(); + ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent(); final AuditEntry dbEntry = lookupAuditEntryForKey("testKey"); Assert.assertNotNull(dbEntry); @@ -120,14 +121,14 @@ public void testCreateAuditEntry() throws IOException Assert.assertEquals(entry, dbEntry); // Verify emitted metrics - Map> metricEvents = serviceEmitter.getMetricEvents(); + Map> metricEvents = serviceEmitter.getMetricEvents(); Assert.assertEquals(1, metricEvents.size()); - List auditMetricEvents = metricEvents.get("config/audit"); + Queue auditMetricEvents = metricEvents.get("config/audit"); Assert.assertNotNull(auditMetricEvents); Assert.assertEquals(1, auditMetricEvents.size()); - ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent(); + ServiceMetricEvent metric = auditMetricEvents.peek().getMetricEvent(); Assert.assertEquals(dbEntry.getKey(), metric.getUserDims().get("key")); Assert.assertEquals(dbEntry.getType(), metric.getUserDims().get("type")); Assert.assertNull(metric.getUserDims().get("payload")); diff --git a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java index 87a5b612147c..aeed3c40ea96 100644 --- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java +++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java @@ -181,13 +181,14 @@ private void evaluateWaitConditions() return; } + List events = getEvents(); for (WaitCondition condition : conditionsToEvaluate) { - final int currentNumberOfEvents = getEvents().size(); + final int currentNumberOfEvents = events.size(); // Do not use an iterator over the list to avoid concurrent modification exceptions // Evaluate new events against this condition for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i) { - if (condition.predicate.test(getEvents().get(i))) { + if (condition.predicate.test(events.get(i))) { condition.countDownLatch.countDown(); } }