Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
package org.apache.druid.indexing.common.task;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.batch.parallel.LegacySinglePhaseSubTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentGenerateTask;
import org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentMergeTask;
Expand All @@ -48,21 +50,21 @@
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "kill", value = KillTask.class),
@JsonSubTypes.Type(name = "move", value = MoveTask.class),
@JsonSubTypes.Type(name = "archive", value = ArchiveTask.class),
@JsonSubTypes.Type(name = "restore", value = RestoreTask.class),
@JsonSubTypes.Type(name = "index", value = IndexTask.class),
@JsonSubTypes.Type(name = ParallelIndexSupervisorTask.TYPE, value = ParallelIndexSupervisorTask.class),
@JsonSubTypes.Type(name = SinglePhaseSubTask.TYPE, value = SinglePhaseSubTask.class),
@JsonSubTypes.Type(name = "index_sub", value = SinglePhaseSubTask.class), // for backward compatibility
@JsonSubTypes.Type(name = PartialSegmentGenerateTask.TYPE, value = PartialSegmentGenerateTask.class),
@JsonSubTypes.Type(name = PartialSegmentMergeTask.TYPE, value = PartialSegmentMergeTask.class),
@JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
@JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),
@JsonSubTypes.Type(name = "index_realtime_appenderator", value = AppenderatorDriverRealtimeIndexTask.class),
@JsonSubTypes.Type(name = "noop", value = NoopTask.class),
@JsonSubTypes.Type(name = "compact", value = CompactionTask.class)
@Type(name = "kill", value = KillTask.class),
@Type(name = "move", value = MoveTask.class),
@Type(name = "archive", value = ArchiveTask.class),
@Type(name = "restore", value = RestoreTask.class),
@Type(name = "index", value = IndexTask.class),
@Type(name = ParallelIndexSupervisorTask.TYPE, value = ParallelIndexSupervisorTask.class),
@Type(name = SinglePhaseSubTask.TYPE, value = SinglePhaseSubTask.class),
@Type(name = SinglePhaseSubTask.OLD_TYPE_NAME, value = LegacySinglePhaseSubTask.class), // for backward compatibility
@Type(name = PartialSegmentGenerateTask.TYPE, value = PartialSegmentGenerateTask.class),
@Type(name = PartialSegmentMergeTask.TYPE, value = PartialSegmentMergeTask.class),
@Type(name = "index_hadoop", value = HadoopIndexTask.class),
@Type(name = "index_realtime", value = RealtimeIndexTask.class),
@Type(name = "index_realtime_appenderator", value = AppenderatorDriverRealtimeIndexTask.class),
@Type(name = "noop", value = NoopTask.class),
@Type(name = "compact", value = CompactionTask.class)
})
public interface Task
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;

import javax.annotation.Nullable;
import java.util.Map;

/**
 * A {@link SinglePhaseSubTask} that reports {@link SinglePhaseSubTask#OLD_TYPE_NAME} from {@link #getType()}.
 * The constructor forwards every argument unchanged to the parent class; {@link #getType()} is the only
 * member overridden here.
 *
 * NOTE(review): presumably this exists so that sub tasks can still be submitted under the pre-rename type name
 * during a rolling update -- confirm against the callers.
 */
public class LegacySinglePhaseSubTask extends SinglePhaseSubTask
{
/**
 * Jackson creator. All arguments are passed through, in the same order, to the {@link SinglePhaseSubTask}
 * constructor.
 */
@JsonCreator
public LegacySinglePhaseSubTask(
@JsonProperty("id") @Nullable final String id,
@JsonProperty("groupId") final String groupId,
@JsonProperty("resource") final TaskResource taskResource,
@JsonProperty("supervisorTaskId") final String supervisorTaskId,
@JsonProperty("numAttempts") final int numAttempts, // zero-based counting
@JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
@JsonProperty("context") final Map<String, Object> context,
@JacksonInject IndexingServiceClient indexingServiceClient,
@JacksonInject IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> taskClientFactory,
@JacksonInject AppenderatorsManager appenderatorsManager
)
{
super(
id,
groupId,
taskResource,
supervisorTaskId,
numAttempts,
ingestionSchema,
context,
indexingServiceClient,
taskClientFactory,
appenderatorsManager
);
}

/**
 * Returns the old type name, {@link SinglePhaseSubTask#OLD_TYPE_NAME}, rather than the type of the parent class.
 */
@Override
public String getType()
{
return SinglePhaseSubTask.OLD_TYPE_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class ParallelIndexIOConfig extends IndexIOConfig
{
@JsonCreator
public ParallelIndexIOConfig(
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("firehose") @Nullable FirehoseFactory firehoseFactory,
@JsonProperty("inputSource") @Nullable InputSource inputSource,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("appendToExisting") @Nullable Boolean appendToExisting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
public class SinglePhaseSubTask extends AbstractBatchIndexTask
{
public static final String TYPE = "single_phase_sub_task";
public static final String OLD_TYPE_NAME = "index_sub";

private static final Logger LOG = new Logger(SinglePhaseSubTask.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,21 @@ public SinglePhaseSubTask newSubTask(int numAttempts)
new DummyForInjectionAppenderatorsManager()
);
}

/**
 * Creates the sub task as a {@link LegacySinglePhaseSubTask}, whose {@code getType()} returns
 * {@link SinglePhaseSubTask#OLD_TYPE_NAME}, instead of a plain {@link SinglePhaseSubTask}.
 *
 * @param numAttempts attempt counter forwarded to the created task
 * @return a new {@link LegacySinglePhaseSubTask} built from this spec
 */
@Override
public SinglePhaseSubTask newSubTaskWithBackwardCompatibleType(int numAttempts)
{
return new LegacySinglePhaseSubTask(
null, // id; NOTE(review): presumably generated by the task itself when null -- confirm
getGroupId(),
null, // task resource
getSupervisorTaskId(),
numAttempts,
getIngestionSpec(),
getContext(),
null, // indexingServiceClient
null, // taskClientFactory
new DummyForInjectionAppenderatorsManager()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,22 @@ public InputSplit getInputSplit()
return inputSplit;
}

/**
 * Creates a new task for this SubTaskSpec.
 *
 * @param numAttempts attempt counter handed to the created task
 *                    (NOTE(review): appears to be zero-based -- confirm against the task constructors)
 * @return the newly created sub task
 */
public abstract T newSubTask(int numAttempts);

/**
 * Creates a new task for this SubTaskSpec, but with a backward compatible type. This supports rolling updates
 * of parallel indexing tasks: subclasses should override this method if their type name has changed between
 * releases. See https://github.com/apache/incubator-druid/issues/8836 for more details.
 *
 * This method is intended to be called when submitting the task created by {@link #newSubTask} fails with an
 * {@link IllegalStateException} (possibly wrapped as the cause of another exception) whose error message
 * contains "Could not resolve type id". The failure of the task created by {@link #newSubTask} with this error
 * is NOT recorded as a failed attempt in {@link TaskHistory}.
 *
 * The default implementation simply delegates to {@link #newSubTask}, i.e., it assumes the type name has not
 * changed.
 *
 * @param numAttempts attempt counter handed to the created task
 * @return the newly created sub task
 */
public T newSubTaskWithBackwardCompatibleType(int numAttempts)
{
return newSubTask(numAttempts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,10 @@ public ListenableFuture<SubTaskCompleteEvent<T>> submit(SubTaskSpec<T> spec)
{
synchronized (startStopLock) {
if (!running) {
return Futures.immediateFailedFuture(new ISE("TaskMonitore is not running"));
return Futures.immediateFailedFuture(new ISE("TaskMonitor is not running"));
}
final T task = spec.newSubTask(0);
log.info("Submitting a new task[%s] for spec[%s]", task.getId(), spec.getId());
indexingServiceClient.runTask(task);
final T task = submitTask(spec, 0);
log.info("Submitted a new task[%s] for spec[%s]", task.getId(), spec.getId());
incrementNumRunningTasks();

final SettableFuture<SubTaskCompleteEvent<T>> taskFuture = SettableFuture.create();
Expand All @@ -248,9 +247,8 @@ private void retry(String subTaskSpecId, MonitorEntry monitorEntry, TaskStatusPl
synchronized (startStopLock) {
if (running) {
final SubTaskSpec<T> spec = monitorEntry.spec;
final T task = spec.newSubTask(monitorEntry.taskHistory.size() + 1);
log.info("Submitting a new task[%s] for retrying spec[%s]", task.getId(), spec.getId());
indexingServiceClient.runTask(task);
final T task = submitTask(spec, monitorEntry.taskHistory.size() + 1);
log.info("Submitted a new task[%s] for retrying spec[%s]", task.getId(), spec.getId());
incrementNumRunningTasks();

runningTasks.put(
Expand All @@ -265,6 +263,38 @@ private void retry(String subTaskSpecId, MonitorEntry monitorEntry, TaskStatusPl
}
}

private T submitTask(SubTaskSpec<T> spec, int numAttempts)
{
T task = spec.newSubTask(numAttempts);
try {
indexingServiceClient.runTask(task);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this approach be able to retry immediately, or will it have to exhaust retries in the indexingServiceClient first? (The loop in DruidLeaderClient)

Ideally, this detects a problem on the first submission, and doesn't need to exhaust retries before it moves on to trying the backwards-compatible type.

Could you check it, if you haven't already?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't retry immediately; it will first exhaust the retries in DruidLeaderClient. Yes, ideally it should be able to detect the problem before it retries, but I'm not sure whether that refactoring is worth doing because 1) it's not easy to teach the caller's logic to DruidLeaderClient, 2) it will happen only during a particular type of rolling update, and 3) the retries won't take much time compared to the total indexing time.

}
catch (Exception e) {
if (isUnknownTypeIdException(e)) {
log.warn(e, "Got an unknown type id error. Retrying with a backward compatible type.");
task = spec.newSubTaskWithBackwardCompatibleType(numAttempts);
indexingServiceClient.runTask(task);
} else {
throw e;
}
}
return task;
}

/**
 * Tells whether the given throwable, or any throwable reachable through its cause chain, is an
 * {@link IllegalStateException} whose message contains "Could not resolve type id".
 *
 * @param e the throwable to inspect, together with its causes
 * @return true if a matching exception is found anywhere in the chain, false otherwise
 */
private boolean isUnknownTypeIdException(Throwable e)
{
  Throwable current = e;
  // Walk the chain from the outermost throwable down to the root cause.
  do {
    if (current instanceof IllegalStateException) {
      final String message = current.getMessage();
      if (message != null && message.contains("Could not resolve type id")) {
        return true;
      }
    }
    current = current.getCause();
  } while (current != null);
  return false;
}

private void incrementNumRunningTasks()
{
synchronized (taskCountLock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.druid.data.input.impl.NoopInputSource;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.NoopIndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
Expand Down Expand Up @@ -95,6 +97,7 @@ public int columnCacheSizeBytes()
.addValue(AuthorizerMapper.class, new AuthorizerMapper(ImmutableMap.of()))
.addValue(AppenderatorsManager.class, new TestAppenderatorsManager())
.addValue(LocalDataSegmentPuller.class, new LocalDataSegmentPuller())
.addValue(IndexTaskClientFactory.class, new NoopIndexTaskClientFactory())
);

jsonMapper.registerModule(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.task;

import org.apache.druid.indexing.common.IndexTaskClient;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.joda.time.Duration;

/**
 * An {@link IndexTaskClientFactory} that cannot build clients: {@link #build} always throws
 * {@link UnsupportedOperationException}.
 *
 * NOTE(review): this implements the raw {@code IndexTaskClientFactory} type; consider parameterizing it once the
 * type bound of the interface is confirmed.
 */
public class NoopIndexTaskClientFactory implements IndexTaskClientFactory
{
/**
 * Always fails; none of the arguments are used.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
public IndexTaskClient build(
TaskInfoProvider taskInfoProvider,
String callerId,
int numThreads,
Duration httpTimeout,
long numRetries
)
{
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.Map;

/**
 * Checks the "type" field written to JSON for the sub tasks created by {@link SinglePhaseSubTaskSpec}:
 * {@link SinglePhaseSubTask#TYPE} for {@code newSubTask} and {@link SinglePhaseSubTask#OLD_TYPE_NAME} for
 * {@code newSubTaskWithBackwardCompatibleType}.
 */
public class SinglePhaseSubTaskSpecTest
{
  private static final DataSchema DATA_SCHEMA = new DataSchema(
      "dataSource",
      new TimestampSpec(null, null, null),
      new DimensionsSpec(null),
      new AggregatorFactory[0],
      null,
      null
  );

  private static final ParallelIndexIOConfig IO_CONFIG = new ParallelIndexIOConfig(
      null,
      new LocalInputSource(new File("baseDir"), "filter"),
      new JsonInputFormat(null, null),
      null
  );

  // Shared fixture; both tests create their sub task from this spec.
  private static final SinglePhaseSubTaskSpec SPEC = new SinglePhaseSubTaskSpec(
      "id",
      "groupId",
      "supervisorTaskId",
      new ParallelIndexIngestionSpec(DATA_SCHEMA, IO_CONFIG, null),
      null,
      new InputSplit<>("string split")
  );

  private ObjectMapper mapper;

  @Before
  public void setup()
  {
    mapper = new TestUtils().getTestObjectMapper();
  }

  @Test
  public void testNewSubTaskType() throws IOException
  {
    Assert.assertEquals(SinglePhaseSubTask.TYPE, serializedTypeOf(SPEC.newSubTask(0)));
  }

  @Test
  public void testNewSubTaskWithBackwardCompatibleType() throws IOException
  {
    Assert.assertEquals(
        SinglePhaseSubTask.OLD_TYPE_NAME,
        serializedTypeOf(SPEC.newSubTaskWithBackwardCompatibleType(0))
    );
  }

  /**
   * Serializes the task with the test mapper, reads the bytes back as a generic map, and returns the value
   * stored under the "type" key.
   */
  private Object serializedTypeOf(SinglePhaseSubTask task) throws IOException
  {
    final byte[] serialized = mapper.writeValueAsBytes(task);
    final Map<String, Object> fields = mapper.readValue(serialized, Map.class);
    return fields.get("type");
  }
}
Loading