From 2640e03ebf0efb31b8687634a22af5a9d8db83dc Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 28 Oct 2022 16:41:56 -0700 Subject: [PATCH 1/4] MSQ: Fix task lock checking during publish, fix lock priority. Fixes two issues: 1) ControllerImpl did not properly check the return value of SegmentTransactionalInsertAction when doing a REPLACE. This could cause it to not realize that its locks were preempted. 2) Task lock priority was the default of 0. It should be the higher batch default of 50. The low priority made it possible for MSQ tasks to be preempted by compaction tasks, which is not desired. --- .../apache/druid/msq/exec/ControllerImpl.java | 19 +++++++++++++++---- .../druid/msq/indexing/MSQControllerTask.java | 6 ++++++ .../error/InsertLockPreemptedFault.java | 4 ++-- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 8686abbc2d06..b045d6af4d22 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -59,8 +59,8 @@ import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; -import org.apache.druid.indexing.common.actions.SegmentInsertAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; @@ -1090,8 +1090,13 @@ private void publishAllSegments(final Set segments) throws IOExcept } } else { try { - context.taskActionClient() - .submit(SegmentTransactionalInsertAction.overwriteAction(null, segmentsToDrop, segments)); + final SegmentPublishResult result = + context.taskActionClient() + .submit(SegmentTransactionalInsertAction.overwriteAction(null, segmentsToDrop, segments)); + + if (!result.isSuccess()) { + throw new MSQException(InsertLockPreemptedFault.instance()); + } } catch (Exception e) { if (isTaskLockPreemptedException(e)) { @@ -1104,7 +1109,13 @@ private void publishAllSegments(final Set segments) throws IOExcept } else if (!segments.isEmpty()) { // Append mode. try { - context.taskActionClient().submit(new SegmentInsertAction(segments)); + final SegmentPublishResult result = + context.taskActionClient() + .submit(SegmentTransactionalInsertAction.appendAction(segments, null, null)); + + if (!result.isSuccess()) { + throw new MSQException(InsertLockPreemptedFault.instance()); + } } catch (Exception e) { if (isTaskLockPreemptedException(e)) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index 3e733168170b..b3ecc258efdf 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -204,6 +204,12 @@ public void stopGracefully(final TaskConfig taskConfig) } } + @Override + public int getPriority() + { + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); + } + private static String getDataSourceForTaskMetadata(final MSQSpec querySpec) { final MSQDestination destination = querySpec.getDestination(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFault.java index 83bd9ad8e62a..c355dc3ff673 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFault.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/InsertLockPreemptedFault.java @@ -32,8 +32,8 @@ public class InsertLockPreemptedFault extends BaseMSQFault { super( CODE, - "Insert lock preempted while trying to ingest the data." - + " This can occur if there are higher priority jobs like real-time ingestion running on same time chunks." + "Lock preempted while trying to ingest the data. This can occur if there are higher priority tasks, such as " + + "real-time ingestion, running on the same time chunks." ); } From d3538176d84cee13533e7c31cd404a3ed387155b Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 3 Nov 2022 07:48:00 -0700 Subject: [PATCH 2/4] Restructuring, add docs. --- docs/ingestion/tasks.md | 2 +- .../apache/druid/msq/exec/ControllerImpl.java | 67 ++++++++++--------- .../druid/msq/indexing/MSQWorkerTask.java | 7 ++ 3 files changed, 43 insertions(+), 33 deletions(-) diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index dbb4f2b65c76..1a6d58d0633b 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -332,7 +332,7 @@ Each task type has a different default lock priority. The below table shows the |task type|default priority| |---------|----------------| |Realtime index task|75| -|Batch index task|50| +|Batch index tasks, including [native batch](native-batch.md), [SQL](../multi-stage-query/index.md), and [Hadoop-based](hadoop.md)|50| |Merge/Append/Compaction task|25| |Other tasks|0| diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 4f2be099c86d..f81dd553895a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -60,6 +60,7 @@ import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.DateTimes; @@ -1090,41 +1091,17 @@ private void publishAllSegments(final Set segments) throws IOExcept .submit(new MarkSegmentsAsUnusedAction(task.getDataSource(), interval)); } } else { - try { - final SegmentPublishResult result = - context.taskActionClient() - .submit(SegmentTransactionalInsertAction.overwriteAction(null, segmentsToDrop, segments)); - - if (!result.isSuccess()) { - throw new MSQException(InsertLockPreemptedFault.instance()); - } - } - catch (Exception e) { - if (isTaskLockPreemptedException(e)) { - throw new MSQException(e, InsertLockPreemptedFault.instance()); - } else { - throw e; - } - } + performSegmentPublish( + context.taskActionClient(), + SegmentTransactionalInsertAction.overwriteAction(null, segmentsToDrop, segments) + ); } } else if (!segments.isEmpty()) { // Append mode. - try { - final SegmentPublishResult result = - context.taskActionClient() - .submit(SegmentTransactionalInsertAction.appendAction(segments, null, null)); - - if (!result.isSuccess()) { - throw new MSQException(InsertLockPreemptedFault.instance()); - } - } - catch (Exception e) { - if (isTaskLockPreemptedException(e)) { - throw new MSQException(e, InsertLockPreemptedFault.instance()); - } else { - throw e; - } - } + performSegmentPublish( + context.taskActionClient(), + SegmentTransactionalInsertAction.appendAction(segments, null, null) + ); } } @@ -1874,6 +1851,32 @@ private static Map copyOfStageRuntimesEndingAtCurrentTime( return retVal; } + /** + * Performs a particular {@link SegmentTransactionalInsertAction}, publishing segments. + * + * Throws {@link MSQException} with {@link InsertLockPreemptedFault} if the action fails due to lock preemption. + */ + private static void performSegmentPublish( + final TaskActionClient client, + final SegmentTransactionalInsertAction action + ) throws IOException + { + try { + final SegmentPublishResult result = client.submit(action); + + if (!result.isSuccess()) { + throw new MSQException(InsertLockPreemptedFault.instance()); + } + } + catch (Exception e) { + if (isTaskLockPreemptedException(e)) { + throw new MSQException(e, InsertLockPreemptedFault.instance()); + } else { + throw e; + } + } + } + /** * Method that determines whether an exception was raised due to the task lock for the controller task being * preempted. Uses string comparison, because the relevant Overlord APIs do not have a more reliable way of diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java index 04eafc49cc42..e8117a015f53 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java @@ -30,6 +30,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.msq.exec.MSQTasks; import org.apache.druid.msq.exec.Worker; import org.apache.druid.msq.exec.WorkerContext; @@ -112,4 +113,10 @@ public void stopGracefully(TaskConfig taskConfig) worker.stopGracefully(); } } + + @Override + public int getPriority() + { + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); + } } From 9bb007e680b397456ebef59528e77db38b00a3e9 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 3 Nov 2022 16:24:37 -0700 Subject: [PATCH 3/4] Add performSegmentPublish tests. --- .../apache/druid/msq/exec/ControllerImpl.java | 2 +- .../druid/msq/exec/ControllerImplTest.java | 104 ++++++++++++++++++ 2 files changed, 105 insertions(+), 1 deletion(-) create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerImplTest.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index f81dd553895a..01a1b535a442 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1856,7 +1856,7 @@ private static Map copyOfStageRuntimesEndingAtCurrentTime( * * Throws {@link MSQException} with {@link InsertLockPreemptedFault} if the action fails due to lock preemption. */ - private static void performSegmentPublish( + static void performSegmentPublish( final TaskActionClient client, final SegmentTransactionalInsertAction action ) throws IOException diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerImplTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerImplTest.java new file mode 100644 index 000000000000..b7b0c4b825fa --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerImplTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.exec; + +import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault; +import org.apache.druid.msq.indexing.error.MSQException; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; + +public class ControllerImplTest +{ + @Test + public void test_performSegmentPublish_ok() throws IOException + { + final SegmentTransactionalInsertAction action = + SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null); + + final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class); + EasyMock.expect(taskActionClient.submit(action)).andReturn(SegmentPublishResult.ok(Collections.emptySet())); + EasyMock.replay(taskActionClient); + + // All OK. + ControllerImpl.performSegmentPublish(taskActionClient, action); + } + + @Test + public void test_performSegmentPublish_publishFail() throws IOException + { + final SegmentTransactionalInsertAction action = + SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null); + + final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class); + EasyMock.expect(taskActionClient.submit(action)).andReturn(SegmentPublishResult.fail("oops")); + EasyMock.replay(taskActionClient); + + final MSQException e = Assert.assertThrows( + MSQException.class, + () -> ControllerImpl.performSegmentPublish(taskActionClient, action) + ); + + Assert.assertEquals(InsertLockPreemptedFault.instance(), e.getFault()); + } + + @Test + public void test_performSegmentPublish_publishException() throws IOException + { + final SegmentTransactionalInsertAction action = + SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null); + + final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class); + EasyMock.expect(taskActionClient.submit(action)).andThrow(new ISE("oops")); + EasyMock.replay(taskActionClient); + + final ISE e = Assert.assertThrows( + ISE.class, + () -> ControllerImpl.performSegmentPublish(taskActionClient, action) + ); + + Assert.assertEquals("oops", e.getMessage()); + } + + @Test + public void test_performSegmentPublish_publishLockPreemptedException() throws IOException + { + final SegmentTransactionalInsertAction action = + SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null); + + final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class); + EasyMock.expect(taskActionClient.submit(action)).andThrow(new ISE("are not covered by locks")); + EasyMock.replay(taskActionClient); + + final MSQException e = Assert.assertThrows( + MSQException.class, + () -> ControllerImpl.performSegmentPublish(taskActionClient, action) + ); + + Assert.assertEquals(InsertLockPreemptedFault.instance(), e.getFault()); + } +} From d39609b3697946af71009b5169a38355d5b91964 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 7 Nov 2022 08:32:24 -0800 Subject: [PATCH 4/4] Fix tests. --- .../java/org/apache/druid/msq/exec/ControllerImpl.java | 3 +++ .../apache/druid/msq/test/MSQTestTaskActionClient.java | 8 ++++++++ 2 files changed, 11 insertions(+) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 01a1b535a442..5b58c91448d2 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1888,6 +1888,9 @@ static void performSegmentPublish( private static boolean isTaskLockPreemptedException(Exception e) { final String exceptionMsg = e.getMessage(); + if (exceptionMsg == null) { + return false; + } final List validExceptionExcerpts = ImmutableList.of( "are not covered by locks" /* From TaskLocks */, "is preempted and no longer valid" /* From SegmentAllocateAction */ diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java index 5022cfe92916..2e6ee4a9bc6f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java @@ -27,16 +27,20 @@ import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; +import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.msq.indexing.error.InsertLockPreemptedFaultTest; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.joda.time.Interval; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -91,6 +95,10 @@ public RetType submit(TaskAction taskAction) )); } else if (taskAction instanceof RetrieveUsedSegmentsAction) { return (RetType) ImmutableSet.of(); + } else if (taskAction instanceof SegmentTransactionalInsertAction) { + // Always OK. + final Set segments = ((SegmentTransactionalInsertAction) taskAction).getSegments(); + return (RetType) SegmentPublishResult.ok(segments); } else { return null; }