diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java index 829a03a68441..594841f5201f 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java @@ -102,7 +102,11 @@ public boolean run() } else { groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size()); } - JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), groupByJob); + JobHelper.setupClasspath( + JobHelper.distributedClassPath(config.getWorkingPath()), + JobHelper.distributedClassPath(config.makeIntermediatePath()), + groupByJob + ); config.addInputPaths(groupByJob); config.intoConfiguration(groupByJob); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java index a502c340c1b0..0fe4bb9c5106 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java @@ -136,7 +136,11 @@ public boolean run() groupByJob.setOutputKeyClass(BytesWritable.class); groupByJob.setOutputValueClass(NullWritable.class); groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class); - JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), groupByJob); + JobHelper.setupClasspath( + JobHelper.distributedClassPath(config.getWorkingPath()), + JobHelper.distributedClassPath(config.makeIntermediatePath()), + groupByJob + ); config.addInputPaths(groupByJob); config.intoConfiguration(groupByJob); @@ -186,7 +190,11 @@ public boolean run() dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class); dimSelectionJob.setPartitionerClass(DeterminePartitionsDimSelectionPartitioner.class); dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size()); - JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), dimSelectionJob); + JobHelper.setupClasspath( + JobHelper.distributedClassPath(config.getWorkingPath()), + JobHelper.distributedClassPath(config.makeIntermediatePath()), + dimSelectionJob + ); config.intoConfiguration(dimSelectionJob); FileOutputFormat.setOutputPath(dimSelectionJob, config.makeIntermediatePath()); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 16de192049f3..e137d8bae642 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -190,7 +190,11 @@ public boolean run() config.intoConfiguration(job); - JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), job); + JobHelper.setupClasspath( + JobHelper.distributedClassPath(config.getWorkingPath()), + JobHelper.distributedClassPath(config.makeIntermediatePath()), + job + ); job.submit(); log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL()); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index a9c282ddfadd..dc9c6e53c572 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -18,6 +18,7 @@ package io.druid.indexer; import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; @@ -33,22 +34,6 @@ import io.druid.segment.ProgressIndicator; import io.druid.segment.SegmentUtils; import io.druid.timeline.DataSegment; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryProxy; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.util.Progressable; -import org.joda.time.DateTime; -import org.joda.time.Interval; -import org.joda.time.format.ISODateTimeFormat; - import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; @@ -64,9 +49,24 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; import java.util.zip.ZipOutputStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.util.Progressable; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.joda.time.format.ISODateTimeFormat; /** */ @@ -79,6 +79,7 @@ public class JobHelper private static final int NUM_RETRIES = 8; private static final int SECONDS_BETWEEN_RETRIES = 2; private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KB + private static final Pattern SNAPSHOT_JAR = Pattern.compile(".*\\-SNAPSHOT(-selfcontained)?\\.jar$"); public static Path distributedClassPath(String path) { @@ -90,9 +91,21 @@ public static Path distributedClassPath(Path base) return new Path(base, "classpath"); } + /** + * Uploads jar files to hdfs and configures the classpath. + * Snapshot jar files are uploaded to intermediateClasspath and not shared across multiple jobs. + * Non-Snapshot jar files are uploaded to a distributedClasspath and shared across multiple jobs. + * + * @param distributedClassPath classpath shared across multiple jobs + * @param intermediateClassPath classpath exclusive for this job. used to upload SNAPSHOT jar files. + * @param job job to run + * + * @throws IOException + */ public static void setupClasspath( - Path distributedClassPath, - Job job + final Path distributedClassPath, + final Path intermediateClassPath, + final Job job ) throws IOException { @@ -111,32 +124,158 @@ public static void setupClasspath( } for (String jarFilePath : jarFiles) { - File jarFile = new File(jarFilePath); + + final File jarFile = new File(jarFilePath); if (jarFile.getName().endsWith(".jar")) { - final Path hdfsPath = new Path(distributedClassPath, jarFile.getName()); - - if (!existing.contains(hdfsPath)) { - if (jarFile.getName().matches(".*SNAPSHOT(-selfcontained)?\\.jar$") || !fs.exists(hdfsPath)) { - log.info("Uploading jar to path[%s]", hdfsPath); - ByteStreams.copy( - Files.newInputStreamSupplier(jarFile), - new OutputSupplier() + try { + RetryUtils.retry( + new Callable() + { + @Override + public Boolean call() throws Exception { - @Override - public OutputStream getOutput() throws IOException - { - return fs.create(hdfsPath); + if (isSnapshot(jarFile)) { + addSnapshotJarToClassPath(jarFile, intermediateClassPath, fs, job); + } else { + addJarToClassPath(jarFile, distributedClassPath, intermediateClassPath, fs, job); } + return true; } - ); - } + }, + shouldRetryPredicate(), + NUM_RETRIES + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + } - existing.add(hdfsPath); + public static final Predicate shouldRetryPredicate() + { + return new Predicate() + { + @Override + public boolean apply(Throwable input) + { + if (input == null) { + return false; + } + if (input instanceof IOException) { + return true; } + return apply(input.getCause()); + } + }; + } - DistributedCache.addFileToClassPath(hdfsPath, conf, fs); + static void addJarToClassPath( + File jarFile, + Path distributedClassPath, + Path intermediateClassPath, + FileSystem fs, + Job job + ) + throws IOException + { + // Create distributed directory if it does not exist. + // rename will always fail if destination does not exist. + fs.mkdirs(distributedClassPath); + + // Non-snapshot jar files are uploaded to the shared classpath. + final Path hdfsPath = new Path(distributedClassPath, jarFile.getName()); + if (!fs.exists(hdfsPath)) { + // Muliple jobs can try to upload the jar here, + // to avoid them from overwriting files, first upload to intermediateClassPath and then rename to the distributedClasspath. + final Path intermediateHdfsPath = new Path(intermediateClassPath, jarFile.getName()); + uploadJar(jarFile, intermediateHdfsPath, fs); + IOException exception = null; + try { + log.info("Renaming jar to path[%s]", hdfsPath); + fs.rename(intermediateHdfsPath, hdfsPath); + if (!fs.exists(hdfsPath)) { + throw new IOException( + String.format( + "File does not exist even after moving from[%s] to [%s]", + intermediateHdfsPath, + hdfsPath + ) + ); + } + } + catch (IOException e) { + // rename failed, possibly due to race condition. check if some other job has uploaded the jar file. + try { + if (!fs.exists(hdfsPath)) { + log.error(e, "IOException while Renaming jar file"); + exception = e; + } + } + catch (IOException e1) { + e.addSuppressed(e1); + exception = e; + } + } + finally { + try { + if (fs.exists(intermediateHdfsPath)) { + fs.delete(intermediateHdfsPath, false); + } + } + catch (IOException e) { + if (exception == null) { + exception = e; + } else { + exception.addSuppressed(e); + } + } + if (exception != null) { + throw exception; + } } } + job.addFileToClassPath(hdfsPath); + } + + static void addSnapshotJarToClassPath( + File jarFile, + Path intermediateClassPath, + FileSystem fs, + Job job + ) throws IOException + { + // Snapshot jars are uploaded to non shared intermediate directory. + final Path hdfsPath = new Path(intermediateClassPath, jarFile.getName()); + + // existing is used to prevent uploading file multiple times in same run. + if (!existing.contains(hdfsPath)) { + uploadJar(jarFile, hdfsPath, fs); + existing.add(hdfsPath); + } + job.addFileToClassPath(hdfsPath); + } + + static void uploadJar(File jarFile, final Path path, final FileSystem fs) throws IOException + { + log.info("Uploading jar to path[%s]", path); + ByteStreams.copy( + Files.newInputStreamSupplier(jarFile), + new OutputSupplier() + { + @Override + public OutputStream getOutput() throws IOException + { + return fs.create(path); + } + } + ); + } + + static boolean isSnapshot(File jarFile) + { + return SNAPSHOT_JAR.matcher(jarFile.getName()).matches(); } public static void injectSystemProperties(Job job) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java index a1a411bf3ffb..0e13ed00cf59 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java @@ -140,11 +140,17 @@ public static Path getTaskPath(JobID jobID, TaskAttemptID taskAttemptID, Path wo return new Path(getJobPath(jobID, workingDirectory), taskAttemptID.toString()); } + public static Path getJobClassPathDir(String jobName, Path workingDirectory) throws IOException + { + return new Path(workingDirectory, jobName.replace(":", "")); + } + public static void cleanup(Job job) throws IOException { final Path jobDir = getJobPath(job.getJobID(), job.getWorkingDirectory()); final FileSystem fs = jobDir.getFileSystem(job.getConfiguration()); fs.delete(jobDir, true); + fs.delete(getJobClassPathDir(job.getJobName(), job.getWorkingDirectory()), true); } @@ -231,7 +237,11 @@ public List run() throws IOException job.setMapSpeculativeExecution(false); job.setOutputFormatClass(ConvertingOutputFormat.class); - JobHelper.setupClasspath(JobHelper.distributedClassPath(jobConf.getWorkingDirectory()), job); + JobHelper.setupClasspath( + JobHelper.distributedClassPath(jobConf.getWorkingDirectory()), + JobHelper.distributedClassPath(getJobClassPathDir(job.getJobName(), jobConf.getWorkingDirectory())), + job + ); Throwable throwable = null; try { diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HdfsClasspathSetupTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HdfsClasspathSetupTest.java new file mode 100644 index 000000000000..6204c04d1de1 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HdfsClasspathSetupTest.java @@ -0,0 +1,203 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.metamx.common.StringUtils; +import io.druid.common.utils.UUIDUtils; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import junit.framework.Assert; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class HdfsClasspathSetupTest +{ + private static MiniDFSCluster miniCluster; + private static File hdfsTmpDir; + private static Configuration conf; + private static String dummyJarString = "This is a test jar file."; + private File dummyJarFile; + private Path finalClasspath; + private Path intermediatePath; + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + @BeforeClass + public static void setupStatic() throws IOException, ClassNotFoundException + { + hdfsTmpDir = File.createTempFile("hdfsClasspathSetupTest", "dir"); + hdfsTmpDir.deleteOnExit(); + if (!hdfsTmpDir.delete()) { + throw new IOException(String.format("Unable to delete hdfsTmpDir [%s]", hdfsTmpDir.getAbsolutePath())); + } + conf = new Configuration(true); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsTmpDir.getAbsolutePath()); + miniCluster = new MiniDFSCluster.Builder(conf).build(); + } + + @Before + public void setUp() throws IOException + { + // intermedatePath and finalClasspath are relative to hdfsTmpDir directory. + intermediatePath = new Path(String.format("/tmp/classpath/%s", UUIDUtils.generateUuid())); + finalClasspath = new Path(String.format("/tmp/intermediate/%s", UUIDUtils.generateUuid())); + dummyJarFile = tempFolder.newFile("dummy-test.jar"); + Files.copy( + new ByteArrayInputStream(StringUtils.toUtf8(dummyJarString)), + dummyJarFile.toPath(), + StandardCopyOption.REPLACE_EXISTING + ); + } + + @AfterClass + public static void tearDownStatic() throws IOException + { + if (miniCluster != null) { + miniCluster.shutdown(true); + } + } + + @After + public void tearDown() throws IOException + { + dummyJarFile.delete(); + Assert.assertFalse(dummyJarFile.exists()); + miniCluster.getFileSystem().delete(finalClasspath, true); + Assert.assertFalse(miniCluster.getFileSystem().exists(finalClasspath)); + miniCluster.getFileSystem().delete(intermediatePath, true); + Assert.assertFalse(miniCluster.getFileSystem().exists(intermediatePath)); + } + + @Test + public void testAddSnapshotJarToClasspath() throws IOException + { + Job job = Job.getInstance(conf, "test-job"); + DistributedFileSystem fs = miniCluster.getFileSystem(); + Path intermediatePath = new Path("/tmp/classpath"); + JobHelper.addSnapshotJarToClassPath(dummyJarFile, intermediatePath, fs, job); + Path expectedJarPath = new Path(intermediatePath, dummyJarFile.getName()); + // check file gets uploaded to HDFS + Assert.assertTrue(fs.exists(expectedJarPath)); + // check file gets added to the classpath + Assert.assertEquals(expectedJarPath.toString(), job.getConfiguration().get(MRJobConfig.CLASSPATH_FILES)); + Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(fs.open(expectedJarPath)))); + } + + @Test + public void testAddNonSnapshotJarToClasspath() throws IOException + { + Job job = Job.getInstance(conf, "test-job"); + DistributedFileSystem fs = miniCluster.getFileSystem(); + JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePath, fs, job); + Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName()); + // check file gets uploaded to final HDFS path + Assert.assertTrue(fs.exists(expectedJarPath)); + // check that the intermediate file gets deleted + Assert.assertFalse(fs.exists(new Path(intermediatePath, dummyJarFile.getName()))); + // check file gets added to the classpath + Assert.assertEquals(expectedJarPath.toString(), job.getConfiguration().get(MRJobConfig.CLASSPATH_FILES)); + Assert.assertEquals(dummyJarString, StringUtils.fromUtf8(IOUtils.toByteArray(fs.open(expectedJarPath)))); + } + + @Test + public void testIsSnapshot() + { + Assert.assertTrue(JobHelper.isSnapshot(new File("test-SNAPSHOT.jar"))); + Assert.assertTrue(JobHelper.isSnapshot(new File("test-SNAPSHOT-selfcontained.jar"))); + Assert.assertFalse(JobHelper.isSnapshot(new File("test.jar"))); + Assert.assertFalse(JobHelper.isSnapshot(new File("test-selfcontained.jar"))); + Assert.assertFalse(JobHelper.isSnapshot(new File("iAmNotSNAPSHOT.jar"))); + Assert.assertFalse(JobHelper.isSnapshot(new File("iAmNotSNAPSHOT-selfcontained.jar"))); + + } + + @Test + public void testConcurrentUpload() throws IOException, InterruptedException, ExecutionException, TimeoutException + { + final int concurrency = 10; + ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(concurrency)); + // barrier ensures that all jobs try to add files to classpath at same time. + final CyclicBarrier barrier = new CyclicBarrier(concurrency); + final DistributedFileSystem fs = miniCluster.getFileSystem(); + final Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName()); + List> futures = new ArrayList<>(); + + for (int i = 0; i < concurrency; i++) { + futures.add( + pool.submit( + new Callable() + { + @Override + public Boolean call() throws Exception + { + int id = barrier.await(); + Job job = Job.getInstance(conf, "test-job-" + id); + Path intermediatePathForJob = new Path(intermediatePath, "job-" + id); + JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePathForJob, fs, job); + // check file gets uploaded to final HDFS path + Assert.assertTrue(fs.exists(expectedJarPath)); + // check that the intermediate file is not present + Assert.assertFalse(fs.exists(new Path(intermediatePathForJob, dummyJarFile.getName()))); + // check file gets added to the classpath + Assert.assertEquals( + expectedJarPath.toString(), + job.getConfiguration().get(MRJobConfig.CLASSPATH_FILES) + ); + return true; + } + } + ) + ); + } + + Futures.allAsList(futures).get(30, TimeUnit.SECONDS); + + pool.shutdownNow(); + } + +}