From 77264a948f7a6ceebc7f4ced8078d7cb41688759 Mon Sep 17 00:00:00 2001 From: Nishant Date: Wed, 22 Jun 2016 16:43:41 -0700 Subject: [PATCH 1/3] fix 1828 fixes https://github.com/druid-io/druid/issues/1828 --- .../main/java/io/druid/indexer/JobHelper.java | 23 +++++++++++++++++++ .../indexer/updater/HadoopConverterJob.java | 4 ++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index 0148cad7582e..1a6e36ea9933 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -44,6 +44,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; @@ -774,4 +775,26 @@ public void stopSection(String section) } }; } + + public static boolean deleteWithRetry(final FileSystem fs, final Path path, final boolean recursive) + { + try { + return RetryUtils.retry( + new Callable() + { + @Override + public Boolean call() throws Exception + { + return fs.delete(path, recursive); + } + }, + shouldRetryPredicate(), + NUM_RETRIES + ); + } + catch (Exception e) { + log.error(e, "Failed to cleanup path[%s]", path); + throw Throwables.propagate(e); + } + } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java index 451b1fa95614..de4e1ab2ff0a 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java @@ -147,8 +147,8 @@ public static void cleanup(Job job) throws IOException { final Path jobDir = getJobPath(job.getJobID(), job.getWorkingDirectory()); final FileSystem fs = jobDir.getFileSystem(job.getConfiguration()); - fs.delete(jobDir, true); - fs.delete(getJobClassPathDir(job.getJobName(), job.getWorkingDirectory()), true); + JobHelper.deleteWithRetry(fs, jobDir, true); + JobHelper.deleteWithRetry(fs, getJobClassPathDir(job.getJobName(), job.getWorkingDirectory()), true); } From 9c45ee148756d27a027746116ad1daeae30f83f7 Mon Sep 17 00:00:00 2001 From: Nishant Date: Wed, 22 Jun 2016 17:43:34 -0700 Subject: [PATCH 2/3] remove unused import --- indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java | 1 - 1 file changed, 1 deletion(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index 1a6e36ea9933..d4eda3aeaed4 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -44,7 +44,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryProxy; -import org.apache.hadoop.ipc.Server; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; From 2cdca4af45f014be9e28f0a9609cb980c1044b6f Mon Sep 17 00:00:00 2001 From: Nishant Date: Thu, 23 Jun 2016 10:48:14 -0700 Subject: [PATCH 3/3] Review comment --- .../indexer/updater/HadoopConverterJob.java | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java index de4e1ab2ff0a..db8d9ff6c725 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java @@ -147,8 +147,26 @@ public static void cleanup(Job job) throws IOException { final Path jobDir = getJobPath(job.getJobID(), job.getWorkingDirectory()); final FileSystem fs = jobDir.getFileSystem(job.getConfiguration()); - JobHelper.deleteWithRetry(fs, jobDir, true); - JobHelper.deleteWithRetry(fs, getJobClassPathDir(job.getJobName(), job.getWorkingDirectory()), true); + RuntimeException e = null; + try { + JobHelper.deleteWithRetry(fs, jobDir, true); + } + catch (RuntimeException ex) { + e = ex; + } + try { + JobHelper.deleteWithRetry(fs, getJobClassPathDir(job.getJobName(), job.getWorkingDirectory()), true); + } + catch (RuntimeException ex) { + if (e == null) { + e = ex; + } else { + e.addSuppressed(ex); + } + } + if (e != null) { + throw e; + } }