Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ public boolean run()
} else {
groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size());
}
JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), groupByJob);
JobHelper.setupClasspath(
JobHelper.distributedClassPath(config.getWorkingPath()),
JobHelper.distributedClassPath(config.makeIntermediatePath()),
groupByJob
);

config.addInputPaths(groupByJob);
config.intoConfiguration(groupByJob);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ public boolean run()
groupByJob.setOutputKeyClass(BytesWritable.class);
groupByJob.setOutputValueClass(NullWritable.class);
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), groupByJob);
JobHelper.setupClasspath(
JobHelper.distributedClassPath(config.getWorkingPath()),
JobHelper.distributedClassPath(config.makeIntermediatePath()),
groupByJob
);

config.addInputPaths(groupByJob);
config.intoConfiguration(groupByJob);
Expand Down Expand Up @@ -186,7 +190,11 @@ public boolean run()
dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
dimSelectionJob.setPartitionerClass(DeterminePartitionsDimSelectionPartitioner.class);
dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size());
JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), dimSelectionJob);
JobHelper.setupClasspath(
JobHelper.distributedClassPath(config.getWorkingPath()),
JobHelper.distributedClassPath(config.makeIntermediatePath()),
dimSelectionJob
);

config.intoConfiguration(dimSelectionJob);
FileOutputFormat.setOutputPath(dimSelectionJob, config.makeIntermediatePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,11 @@ public boolean run()

config.intoConfiguration(job);

JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), job);
JobHelper.setupClasspath(
JobHelper.distributedClassPath(config.getWorkingPath()),
JobHelper.distributedClassPath(config.makeIntermediatePath()),
job
);

job.submit();
log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());
Expand Down
209 changes: 174 additions & 35 deletions indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package io.druid.indexer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
Expand All @@ -33,22 +34,6 @@
import io.druid.segment.ProgressIndicator;
import io.druid.segment.SegmentUtils;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progressable;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
Expand All @@ -64,9 +49,24 @@
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progressable;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;

/**
*/
Expand All @@ -79,6 +79,7 @@ public class JobHelper
private static final int NUM_RETRIES = 8;
private static final int SECONDS_BETWEEN_RETRIES = 2;
private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB
private static final Pattern SNAPSHOT_JAR = Pattern.compile(".*\\-SNAPSHOT(-selfcontained)?\\.jar$");

public static Path distributedClassPath(String path)
{
Expand All @@ -90,9 +91,21 @@ public static Path distributedClassPath(Path base)
return new Path(base, "classpath");
}

/**
 * Uploads jar files to HDFS and configures the job classpath.
 * Snapshot jar files are uploaded to intermediateClassPath and are not shared across multiple jobs.
 * Non-snapshot jar files are uploaded to distributedClassPath and are shared across multiple jobs.
 *
 * @param distributedClassPath  classpath shared across multiple jobs
 * @param intermediateClassPath classpath exclusive to this job; used to upload SNAPSHOT jar files
 * @param job                   job to run
 *
 * @throws IOException if uploading or classpath configuration fails
 */
public static void setupClasspath(
Path distributedClassPath,
Job job
final Path distributedClassPath,
final Path intermediateClassPath,
final Job job
)
throws IOException
{
Expand All @@ -111,32 +124,158 @@ public static void setupClasspath(
}

for (String jarFilePath : jarFiles) {
File jarFile = new File(jarFilePath);

final File jarFile = new File(jarFilePath);
if (jarFile.getName().endsWith(".jar")) {
final Path hdfsPath = new Path(distributedClassPath, jarFile.getName());

if (!existing.contains(hdfsPath)) {
if (jarFile.getName().matches(".*SNAPSHOT(-selfcontained)?\\.jar$") || !fs.exists(hdfsPath)) {
log.info("Uploading jar to path[%s]", hdfsPath);
ByteStreams.copy(
Files.newInputStreamSupplier(jarFile),
new OutputSupplier<OutputStream>()
try {
RetryUtils.retry(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
@Override
public OutputStream getOutput() throws IOException
{
return fs.create(hdfsPath);
if (isSnapshot(jarFile)) {
addSnapshotJarToClassPath(jarFile, intermediateClassPath, fs, job);
} else {
addJarToClassPath(jarFile, distributedClassPath, intermediateClassPath, fs, job);
}
return true;
}
);
}
},
shouldRetryPredicate(),
NUM_RETRIES
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
}

existing.add(hdfsPath);
public static final Predicate<Throwable> shouldRetryPredicate()
{
return new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable input)
{
if (input == null) {
return false;
}
if (input instanceof IOException) {
return true;
}
return apply(input.getCause());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addJarToClassPath has

throw new ISE("File does not exist even after moving from[%s] to [%s]", intermediateHdfsPath, hdfsPath);

that means it won't be retried. Is that intentional?

why not retry here irrespective of what the exception type is, in the worst case there would be 3 retries instead of having to do a recursion?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will modify it to IOException though this should ideally never happen in any case.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

}
};
}

DistributedCache.addFileToClassPath(hdfsPath, conf, fs);
/**
 * Uploads a non-snapshot jar to the shared distributedClassPath (unless it is
 * already there) and adds it to the job's classpath.
 *
 * The jar is first uploaded to the job-private intermediateClassPath and then
 * renamed into place, so concurrent jobs never observe a partially written file
 * in the shared directory. If the rename fails but the destination exists,
 * another job won the race and the failure is ignored. The intermediate copy is
 * always deleted, and any cleanup failure is suppressed into the primary error.
 *
 * @param jarFile               local jar file to upload
 * @param distributedClassPath  HDFS classpath directory shared across jobs
 * @param intermediateClassPath HDFS classpath directory private to this job
 * @param fs                    filesystem containing both destination paths
 * @param job                   job whose classpath receives the jar
 *
 * @throws IOException if the upload or rename fails and no other job has provided the jar
 */
static void addJarToClassPath(
File jarFile,
Path distributedClassPath,
Path intermediateClassPath,
FileSystem fs,
Job job
)
throws IOException
{
// Create distributed directory if it does not exist.
// rename will always fail if destination does not exist.
fs.mkdirs(distributedClassPath);

// Non-snapshot jar files are uploaded to the shared classpath.
final Path hdfsPath = new Path(distributedClassPath, jarFile.getName());
if (!fs.exists(hdfsPath)) {
// Multiple jobs can try to upload the jar here. To avoid them overwriting each
// other's files, first upload to intermediateClassPath, then rename into distributedClassPath.
final Path intermediateHdfsPath = new Path(intermediateClassPath, jarFile.getName());
uploadJar(jarFile, intermediateHdfsPath, fs);
IOException exception = null;
try {
log.info("Renaming jar to path[%s]", hdfsPath);
fs.rename(intermediateHdfsPath, hdfsPath);
// rename may silently fail (e.g. returns false); verify the destination exists.
if (!fs.exists(hdfsPath)) {
throw new IOException(
String.format(
"File does not exist even after moving from[%s] to [%s]",
intermediateHdfsPath,
hdfsPath
)
);
}
}
catch (IOException e) {
// rename failed, possibly due to race condition. check if some other job has uploaded the jar file.
try {
if (!fs.exists(hdfsPath)) {
log.error(e, "IOException while Renaming jar file");
exception = e;
}
}
catch (IOException e1) {
// Could not even check for the destination; keep the original failure as primary.
e.addSuppressed(e1);
exception = e;
}
}
finally {
// Always try to remove the intermediate copy; fold any cleanup failure into the
// primary exception rather than masking it.
try {
if (fs.exists(intermediateHdfsPath)) {
fs.delete(intermediateHdfsPath, false);
}
}
catch (IOException e) {
if (exception == null) {
exception = e;
} else {
exception.addSuppressed(e);
}
}
if (exception != null) {
throw exception;
}
}
}
job.addFileToClassPath(hdfsPath);
}

/**
 * Uploads a snapshot jar to the job-private intermediateClassPath and adds it
 * to the job's classpath. Snapshot jars are never placed in the shared
 * classpath because their content changes between builds.
 *
 * @param jarFile               local snapshot jar to upload
 * @param intermediateClassPath HDFS classpath directory private to this job
 * @param fs                    filesystem containing the destination path
 * @param job                   job whose classpath receives the jar
 *
 * @throws IOException if the upload fails
 */
static void addSnapshotJarToClassPath(
File jarFile,
Path intermediateClassPath,
FileSystem fs,
Job job
) throws IOException
{
// Snapshot jars are uploaded to non shared intermediate directory.
final Path hdfsPath = new Path(intermediateClassPath, jarFile.getName());

// existing is used to prevent uploading file multiple times in same run.
// NOTE(review): `existing` is not declared in this method — presumably a
// collection of already-uploaded paths held by the caller/class; confirm it is in scope.
if (!existing.contains(hdfsPath)) {
uploadJar(jarFile, hdfsPath, fs);
existing.add(hdfsPath);
}
job.addFileToClassPath(hdfsPath);
}

static void uploadJar(File jarFile, final Path path, final FileSystem fs) throws IOException
{
log.info("Uploading jar to path[%s]", path);
ByteStreams.copy(
Files.newInputStreamSupplier(jarFile),
new OutputSupplier<OutputStream>()
{
@Override
public OutputStream getOutput() throws IOException
{
return fs.create(path);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this retry on transient errors? It seems like we would want a copy to be able to retry (and overwrite if prior was a partial success) when feasible.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you elaborate more on which transient error you are referring to ?
At present, this will overwrite any partial written file from previous run if any.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added Retry logic to JobHelper.setupClasspath

}
}
);
}

/**
 * Returns whether the given jar is a snapshot build, i.e. its file name
 * matches the SNAPSHOT_JAR pattern ({@code *-SNAPSHOT.jar} or
 * {@code *-SNAPSHOT-selfcontained.jar}).
 */
static boolean isSnapshot(File jarFile)
{
  final String fileName = jarFile.getName();
  return SNAPSHOT_JAR.matcher(fileName).matches();
}

public static void injectSystemProperties(Job job)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,17 @@ public static Path getTaskPath(JobID jobID, TaskAttemptID taskAttemptID, Path wo
return new Path(getJobPath(jobID, workingDirectory), taskAttemptID.toString());
}

/**
 * Returns the per-job classpath directory under the working directory, named
 * after the job with ':' characters stripped (':' is not legal in path names).
 *
 * @param jobName          job name used to derive the directory name
 * @param workingDirectory parent directory for the classpath directory
 *
 * @return path of the job's private classpath directory
 *
 * @throws IOException declared for interface compatibility
 */
public static Path getJobClassPathDir(String jobName, Path workingDirectory) throws IOException
{
  final String sanitizedJobName = jobName.replace(":", "");
  return new Path(workingDirectory, sanitizedJobName);
}

public static void cleanup(Job job) throws IOException
{
final Path jobDir = getJobPath(job.getJobID(), job.getWorkingDirectory());
final FileSystem fs = jobDir.getFileSystem(job.getConfiguration());
fs.delete(jobDir, true);
fs.delete(getJobClassPathDir(job.getJobName(), job.getWorkingDirectory()), true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method could use some extra assurances / retries as well. (non-blocking)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to do this for cleanup for other Hadoop Jobs as well, would like to do it in a separate PR. Opening a github issue

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}


Expand Down Expand Up @@ -231,7 +237,11 @@ public List<DataSegment> run() throws IOException
job.setMapSpeculativeExecution(false);
job.setOutputFormatClass(ConvertingOutputFormat.class);

JobHelper.setupClasspath(JobHelper.distributedClassPath(jobConf.getWorkingDirectory()), job);
JobHelper.setupClasspath(
JobHelper.distributedClassPath(jobConf.getWorkingDirectory()),
JobHelper.distributedClassPath(getJobClassPathDir(job.getJobName(), jobConf.getWorkingDirectory())),
job
);

Throwable throwable = null;
try {
Expand Down
Loading