Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
{
private static final Logger log = new Logger(HadoopIndexTask.class);
private static final String HADOOP_JOB_ID_FILENAME = "mapReduceJobId.json";
private TaskConfig taskConfig = null;

private static String getTheDataSource(HadoopIngestionSpec spec)
{
Expand Down Expand Up @@ -223,7 +222,7 @@ public String getClasspathPrefix()
return classpathPrefix;
}

public String getHadoopJobIdFileName()
private String getHadoopJobIdFileName(TaskConfig taskConfig)
{
return new File(taskConfig.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME).getAbsolutePath();
}
Expand All @@ -232,7 +231,6 @@ public String getHadoopJobIdFileName()
public TaskStatus run(TaskToolbox toolbox)
{
try {
taskConfig = toolbox.getConfig();
if (chatHandlerProvider.isPresent()) {
log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
chatHandlerProvider.get().register(getId(), this, false);
Expand Down Expand Up @@ -270,7 +268,7 @@ public TaskStatus run(TaskToolbox toolbox)
@SuppressWarnings("unchecked")
private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
{
String hadoopJobIdFile = getHadoopJobIdFileName();
String hadoopJobIdFile = getHadoopJobIdFileName(toolbox.getConfig());
final ClassLoader loader = buildClassLoader(toolbox);
boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();

Expand Down Expand Up @@ -432,7 +430,7 @@ public void stopGracefully(TaskConfig taskConfig)
// To avoid issue of kill command once the ingestion task is actually completed
if (!ingestionState.equals(IngestionState.COMPLETED)) {
final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
String hadoopJobIdFile = getHadoopJobIdFileName();
String hadoopJobIdFile = getHadoopJobIdFileName(taskConfig);

try {
ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,8 +596,9 @@ public void shutdown(final String taskid, String reason)
}

if (taskInfo.processHolder != null) {
// Will trigger normal failure mechanisms due to process exit
// Will cleanup the underlying running task if any and trigger normal failure mechanisms due to process exit
log.info("Killing process for task: %s", taskid);
taskInfo.getTask().stopGracefully(taskConfig);
taskInfo.processHolder.process.destroy();
}
}
Expand Down