Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/ingestion/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ Each task type has a different default lock priority. The below table shows the
|task type|default priority|
|---------|----------------|
|Realtime index task|75|
|Batch index task|50|
|Batch index tasks, including [native batch](native-batch.md), [SQL](../multi-stage-query/index.md), and [Hadoop-based](hadoop.md)|50|
|Merge/Append/Compaction task|25|
|Other tasks|0|

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
Expand Down Expand Up @@ -1090,30 +1091,17 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
.submit(new MarkSegmentsAsUnusedAction(task.getDataSource(), interval));
}
} else {
try {
context.taskActionClient()
.submit(SegmentTransactionalInsertAction.overwriteAction(null, segmentsToDrop, segments));
}
catch (Exception e) {
if (isTaskLockPreemptedException(e)) {
throw new MSQException(e, InsertLockPreemptedFault.instance());
} else {
throw e;
}
}
performSegmentPublish(
context.taskActionClient(),
SegmentTransactionalInsertAction.overwriteAction(null, segmentsToDrop, segments)
);
}
} else if (!segments.isEmpty()) {
// Append mode.
try {
context.taskActionClient().submit(new SegmentInsertAction(segments));
}
catch (Exception e) {
if (isTaskLockPreemptedException(e)) {
throw new MSQException(e, InsertLockPreemptedFault.instance());
} else {
throw e;
}
}
performSegmentPublish(
context.taskActionClient(),
SegmentTransactionalInsertAction.appendAction(segments, null, null)
);
}
}

Expand Down Expand Up @@ -1863,6 +1851,32 @@ private static Map<Integer, Interval> copyOfStageRuntimesEndingAtCurrentTime(
return retVal;
}

/**
* Performs a particular {@link SegmentTransactionalInsertAction}, publishing segments.
*
* Throws {@link MSQException} with {@link InsertLockPreemptedFault} if the action fails due to lock preemption.
*/
static void performSegmentPublish(
final TaskActionClient client,
final SegmentTransactionalInsertAction action
) throws IOException
{
try {
final SegmentPublishResult result = client.submit(action);

if (!result.isSuccess()) {
throw new MSQException(InsertLockPreemptedFault.instance());
}
}
catch (Exception e) {
if (isTaskLockPreemptedException(e)) {
throw new MSQException(e, InsertLockPreemptedFault.instance());
} else {
throw e;
}
}
}

/**
* Method that determines whether an exception was raised due to the task lock for the controller task being
* preempted. Uses string comparison, because the relevant Overlord APIs do not have a more reliable way of
Expand All @@ -1874,6 +1888,9 @@ private static Map<Integer, Interval> copyOfStageRuntimesEndingAtCurrentTime(
private static boolean isTaskLockPreemptedException(Exception e)
{
final String exceptionMsg = e.getMessage();
if (exceptionMsg == null) {
return false;
}
final List<String> validExceptionExcerpts = ImmutableList.of(
"are not covered by locks" /* From TaskLocks */,
"is preempted and no longer valid" /* From SegmentAllocateAction */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ public void stopGracefully(final TaskConfig taskConfig)
}
}

@Override
public int getPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 updated.

}

private static String getDataSourceForTaskMetadata(final MSQSpec querySpec)
{
final MSQDestination destination = querySpec.getDestination();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.exec.Worker;
import org.apache.druid.msq.exec.WorkerContext;
Expand Down Expand Up @@ -112,4 +113,10 @@ public void stopGracefully(TaskConfig taskConfig)
worker.stopGracefully();
}
}

@Override
public int getPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public class InsertLockPreemptedFault extends BaseMSQFault
{
super(
CODE,
"Insert lock preempted while trying to ingest the data."
+ " This can occur if there are higher priority jobs like real-time ingestion running on same time chunks."
"Lock preempted while trying to ingest the data. This can occur if there are higher priority tasks, such as "
+ "real-time ingestion, running on the same time chunks."
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.exec;

import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
import org.apache.druid.msq.indexing.error.MSQException;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.Collections;

public class ControllerImplTest
{
@Test
public void test_performSegmentPublish_ok() throws IOException
{
final SegmentTransactionalInsertAction action =
SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null);

final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class);
EasyMock.expect(taskActionClient.submit(action)).andReturn(SegmentPublishResult.ok(Collections.emptySet()));
EasyMock.replay(taskActionClient);

// All OK.
ControllerImpl.performSegmentPublish(taskActionClient, action);
}

@Test
public void test_performSegmentPublish_publishFail() throws IOException
{
final SegmentTransactionalInsertAction action =
SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null);

final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class);
EasyMock.expect(taskActionClient.submit(action)).andReturn(SegmentPublishResult.fail("oops"));
EasyMock.replay(taskActionClient);

final MSQException e = Assert.assertThrows(
MSQException.class,
() -> ControllerImpl.performSegmentPublish(taskActionClient, action)
);

Assert.assertEquals(InsertLockPreemptedFault.instance(), e.getFault());
}

@Test
public void test_performSegmentPublish_publishException() throws IOException
{
final SegmentTransactionalInsertAction action =
SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null);

final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class);
EasyMock.expect(taskActionClient.submit(action)).andThrow(new ISE("oops"));
EasyMock.replay(taskActionClient);

final ISE e = Assert.assertThrows(
ISE.class,
() -> ControllerImpl.performSegmentPublish(taskActionClient, action)
);

Assert.assertEquals("oops", e.getMessage());
}

@Test
public void test_performSegmentPublish_publishLockPreemptedException() throws IOException
{
final SegmentTransactionalInsertAction action =
SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null);

final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class);
EasyMock.expect(taskActionClient.submit(action)).andThrow(new ISE("are not covered by locks"));
EasyMock.replay(taskActionClient);

final MSQException e = Assert.assertThrows(
MSQException.class,
() -> ControllerImpl.performSegmentPublish(taskActionClient, action)
);

Assert.assertEquals(InsertLockPreemptedFault.instance(), e.getFault());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,20 @@
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFaultTest;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -91,6 +95,10 @@ public <RetType> RetType submit(TaskAction<RetType> taskAction)
));
} else if (taskAction instanceof RetrieveUsedSegmentsAction) {
return (RetType) ImmutableSet.of();
} else if (taskAction instanceof SegmentTransactionalInsertAction) {
// Always OK.
final Set<DataSegment> segments = ((SegmentTransactionalInsertAction) taskAction).getSegments();
return (RetType) SegmentPublishResult.ok(segments);
} else {
return null;
}
Expand Down