diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 5be81e97a73c..45db63a31af8 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -104,6 +104,52 @@
            <artifactId>hamcrest-all</artifactId>
            <scope>test</scope>
        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.compile.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+            <version>${hadoop.compile.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+            <version>${hadoop.compile.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-server-tests</artifactId>
+            <version>${hadoop.compile.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.druid</groupId>
+            <artifactId>druid-server</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.druid</groupId>
+            <artifactId>druid-processing</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derbyclient</artifactId>
+            <scope>test</scope>
+        </dependency>
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index 3233a1c44a1a..f4e96b5cc059 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -102,7 +102,7 @@ public boolean run()
} else {
groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size());
}
- JobHelper.setupClasspath(config, groupByJob);
+ JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), groupByJob);
config.addInputPaths(groupByJob);
config.addJobProperties(groupByJob);
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
index 7868edf88aa8..62dd90470ee1 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
@@ -135,7 +135,7 @@ public boolean run()
groupByJob.setOutputKeyClass(BytesWritable.class);
groupByJob.setOutputValueClass(NullWritable.class);
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
- JobHelper.setupClasspath(config, groupByJob);
+ JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), groupByJob);
config.addInputPaths(groupByJob);
config.addJobProperties(groupByJob);
@@ -186,7 +186,7 @@ public boolean run()
dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
dimSelectionJob.setPartitionerClass(DeterminePartitionsDimSelectionPartitioner.class);
dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size());
- JobHelper.setupClasspath(config, dimSelectionJob);
+ JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), dimSelectionJob);
config.addJobProperties(dimSelectionJob);
config.intoConfiguration(dimSelectionJob);
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index 09ab8db062d3..1202ab5dabcb 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -176,7 +176,7 @@ public boolean run()
config.intoConfiguration(job);
- JobHelper.setupClasspath(config, job);
+ JobHelper.setupClasspath(JobHelper.distributedClassPath(config.getWorkingPath()), job);
job.submit();
log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
index 36ee1ee80a9f..688bd8028898 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
@@ -26,6 +26,7 @@
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
+import io.druid.segment.ProgressIndicator;
import io.druid.segment.SegmentUtils;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
@@ -39,6 +40,7 @@
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -46,8 +48,10 @@
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;
+import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
@@ -58,6 +62,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
/**
@@ -68,12 +73,21 @@ public class JobHelper
private static final Set existing = Sets.newHashSet();
- private static final int NUM_RETRIES = 6;
- private static final int SECONDS_BETWEEN_RETRIES = 10;
+ private static final int NUM_RETRIES = 8;
+ private static final int SECONDS_BETWEEN_RETRIES = 2;
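+
+ // Jars for the job's classpath are staged under "<base>/classpath" on the distributed filesystem.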
+ public static Path distributedClassPath(String path)
+ {
+ return distributedClassPath(new Path(path));
+ }
+
+ public static Path distributedClassPath(Path base)
+ {
+ return new Path(base, "classpath");
+ }
public static void setupClasspath(
- HadoopDruidIndexerConfig config,
+ Path distributedClassPath,
Job job
)
throws IOException
@@ -86,7 +100,6 @@ public static void setupClasspath(
String[] jarFiles = classpathProperty.split(File.pathSeparator);
final Configuration conf = job.getConfiguration();
- final Path distributedClassPath = new Path(config.getWorkingPath(), "classpath");
final FileSystem fs = distributedClassPath.getFileSystem(conf);
if (fs instanceof LocalFileSystem) {
@@ -216,7 +229,7 @@ public static DataSegment serializeOutIndex(
DataPusher.class, new DataPusher()
{
@Override
- public void push() throws IOException
+ public long push() throws IOException
{
try (OutputStream outputStream = fileContext.create(
tmpPath,
@@ -231,9 +244,10 @@ public void push() throws IOException
log.error(exception, "Exception in retry loop");
throw exception;
}
+ return -1;
}
},
- RetryPolicies.retryUpToMaximumCountWithFixedSleep(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
+ RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
);
zipPusher.push();
log.info("Zipped %,d bytes to [%s]", size.get(), tmpPath.toUri());
@@ -294,7 +308,7 @@ public static void writeSegmentDescriptor(
DataPusher.class, new DataPusher()
{
@Override
- public void push() throws IOException
+ public long push() throws IOException
{
try {
progressable.progress();
@@ -317,9 +331,10 @@ public void push() throws IOException
log.info(ex, "Error in retry loop");
throw ex;
}
+ return -1;
}
},
- RetryPolicies.retryUpToMaximumCountWithFixedSleep(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
+ RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
);
descriptorPusher.push();
}
@@ -329,7 +344,7 @@ public void push() throws IOException
*/
public interface DataPusher
{
- void push() throws IOException;
+ long push() throws IOException;
}
public static long zipAndCopyDir(
@@ -425,4 +440,108 @@ public static Path prependFSIfNullScheme(FileSystem fs, Path path)
}
return path;
}
+
+ // TODO: Replace this whenever hadoop gets their act together and stops breaking with more recent versions of Guava
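+ // Streams a zipped segment out of the Hadoop filesystem and inflates it into outDir, calling
+ // progressable.progress() as it copies so the surrounding task keeps reporting liveness.
+ // Returns the number of uncompressed bytes written.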
+ public static long unzipNoGuava(
+ final Path zip,
+ final Configuration configuration,
+ final File outDir,
+ final Progressable progressable
+ ) throws IOException
+ {
+ final DataPusher zipPusher = (DataPusher) RetryProxy.create(
+ DataPusher.class, new DataPusher()
+ {
+ @Override
+ public long push() throws IOException
+ {
+ final FileContext context = FileContext.getFileContext(zip.toUri(), configuration);
+ long size = 0L;
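+ // Copy in 8 KiB chunks, reporting progress on every chunk read.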
+ final byte[] buffer = new byte[1 << 13];
+ progressable.progress();
+ try (ZipInputStream in = new ZipInputStream(context.open(zip, 1 << 13))) {
+ for (ZipEntry entry = in.getNextEntry(); entry != null; entry = in.getNextEntry()) {
+ final String fileName = entry.getName();
+ try (final OutputStream out = new BufferedOutputStream(
+ new FileOutputStream(
+ outDir.getAbsolutePath()
+ + File.separator
+ + fileName
+ ), 1 << 13
+ )) {
+ for (int len = in.read(buffer); len >= 0; len = in.read(buffer)) {
+ progressable.progress();
+ if (len == 0) {
+ continue;
+ }
+ size += len;
+ out.write(buffer, 0, len);
+ }
+ out.flush();
+ }
+ }
+ }
+ catch (IOException | RuntimeException exception) {
+ log.error(exception, "Exception in retry loop");
+ throw exception;
+ }
+ progressable.progress();
+ return size;
+ }
+ },
+ RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
+ );
+ return zipPusher.push();
+ }
+
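+ // Adapts Druid's ProgressIndicator to a Hadoop TaskAttemptContext so long-running index operations
+ // report progress and update the task status string.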
+ public static ProgressIndicator progressIndicatorForContext(
+ final TaskAttemptContext context
+ )
+ {
+ return new ProgressIndicator()
+ {
+
+ @Override
+ public void progress()
+ {
+ context.progress();
+ }
+
+ @Override
+ public void start()
+ {
+ context.progress();
+ context.setStatus("STARTED");
+ }
+
+ @Override
+ public void stop()
+ {
+ context.progress();
+ context.setStatus("STOPPED");
+ }
+
+ @Override
+ public void startSection(String section)
+ {
+ context.progress();
+ context.setStatus(String.format("STARTED [%s]", section));
+ }
+
+ @Override
+ public void progressSection(String section, String message)
+ {
+ log.info("Progress message for section [%s] : [%s]", section, message);
+ context.progress();
+ context.setStatus(String.format("PROGRESS [%s]", section));
+ }
+
+ @Override
+ public void stopSection(String section)
+ {
+ context.progress();
+ context.setStatus(String.format("STOPPED [%s]", section));
+ }
+ };
+ }
}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java
new file mode 100644
index 000000000000..c961031a685d
--- /dev/null
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java
@@ -0,0 +1,723 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexer.updater;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import com.metamx.common.IAE;
+import com.metamx.common.ISE;
+import com.metamx.common.logger.Logger;
+import io.druid.indexer.JobHelper;
+import io.druid.segment.IndexIO;
+import io.druid.segment.IndexMaker;
+import io.druid.timeline.DataSegment;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobPriority;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.util.Progressable;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class HadoopConverterJob
+{
+ private static final Logger log = new Logger(HadoopConverterJob.class);
+ private static final String COUNTER_GROUP = "Hadoop Druid Converter";
+ private static final String COUNTER_LOADED = "Loaded Bytes";
+ private static final String COUNTER_WRITTEN = "Written Bytes";
+
+ private static void setJobName(JobConf jobConf, List<DataSegment> segments)
+ {
+ if (segments.size() == 1) {
+ final DataSegment segment = segments.get(0);
+ jobConf.setJobName(
+ String.format(
+ "druid-convert-%s-%s-%s",
+ segment.getDataSource(),
+ segment.getInterval(),
+ segment.getVersion()
+ )
+ );
+ } else {
+ final Set<String> dataSources = Sets.newHashSet(
+ Iterables.transform(
+ segments,
+ new Function<DataSegment, String>()
+ {
+ @Override
+ public String apply(DataSegment input)
+ {
+ return input.getDataSource();
+ }
+ }
+ )
+ );
+ final Set<String> versions = Sets.newHashSet(
+ Iterables.transform(
+ segments,
+ new Function<DataSegment, String>()
+ {
+ @Override
+ public String apply(DataSegment input)
+ {
+ return input.getVersion();
+ }
+ }
+ )
+ );
+ jobConf.setJobName(
+ String.format(
+ "druid-convert-%s-%s",
+ Arrays.toString(dataSources.toArray()),
+ Arrays.toString(versions.toArray())
+ )
+ );
+ }
+ }
+
+ public static Path getJobPath(JobID jobID, Path workingDirectory)
+ {
+ return new Path(workingDirectory, jobID.toString());
+ }
+
+ public static Path getTaskPath(JobID jobID, TaskAttemptID taskAttemptID, Path workingDirectory)
+ {
+ return new Path(getJobPath(jobID, workingDirectory), taskAttemptID.toString());
+ }
+
+ public static void cleanup(Job job) throws IOException
+ {
+ final Path jobDir = getJobPath(job.getJobID(), job.getWorkingDirectory());
+ final FileSystem fs = jobDir.getFileSystem(job.getConfiguration());
+ fs.delete(jobDir, true);
+ }
+
+
+ public static HadoopDruidConverterConfig converterConfigFromConfiguration(Configuration configuration)
+ throws IOException
+ {
+ final String property = Preconditions.checkNotNull(
+ configuration.get(HadoopDruidConverterConfig.CONFIG_PROPERTY),
+ HadoopDruidConverterConfig.CONFIG_PROPERTY
+ );
+ return HadoopDruidConverterConfig.fromString(property);
+ }
+
+ public static void converterConfigIntoConfiguration(
+ HadoopDruidConverterConfig priorConfig,
+ List segments,
+ Configuration configuration
+ )
+ {
+ final HadoopDruidConverterConfig config = new HadoopDruidConverterConfig(
+ priorConfig.getDataSource(),
+ priorConfig.getInterval(),
+ priorConfig.getIndexSpec(),
+ segments,
+ priorConfig.isValidate(),
+ priorConfig.getDistributedSuccessCache(),
+ priorConfig.getHadoopProperties(),
+ priorConfig.getJobPriority(),
+ priorConfig.getSegmentOutputPath()
+ );
+ try {
+ configuration.set(
+ HadoopDruidConverterConfig.CONFIG_PROPERTY,
+ HadoopDruidConverterConfig.jsonMapper.writeValueAsString(config)
+ );
+ }
+ catch (JsonProcessingException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private final HadoopDruidConverterConfig converterConfig;
+ private long loadedBytes = 0L;
+ private long writtenBytes = 0L;
+
+ public HadoopConverterJob(
+ HadoopDruidConverterConfig converterConfig
+ )
+ {
+ this.converterConfig = converterConfig;
+ }
+
+ public List<DataSegment> run() throws IOException
+ {
+ final JobConf jobConf = new JobConf();
+ jobConf.setKeepFailedTaskFiles(false);
+ for (Map.Entry<String, String> entry : converterConfig.getHadoopProperties().entrySet()) {
+ jobConf.set(entry.getKey(), entry.getValue(), "converterConfig.getHadoopProperties()");
+ }
+ final List<DataSegment> segments = converterConfig.getSegments();
+ if (segments.isEmpty()) {
+ throw new IAE(
+ "No segments found for datasource [%s]",
+ converterConfig.getDataSource()
+ );
+ }
+ converterConfigIntoConfiguration(converterConfig, segments, jobConf);
+
+ jobConf.setNumReduceTasks(0);// Map only. Number of map tasks determined by input format
+ jobConf.setWorkingDirectory(new Path(converterConfig.getDistributedSuccessCache()));
+
+ setJobName(jobConf, segments);
+
+ if (converterConfig.getJobPriority() != null) {
+ jobConf.setJobPriority(JobPriority.valueOf(converterConfig.getJobPriority()));
+ }
+
+ final Job job = Job.getInstance(jobConf);
+
+ job.setInputFormatClass(ConfigInputFormat.class);
+ job.setMapperClass(ConvertingMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setMapSpeculativeExecution(false);
+ job.setOutputFormatClass(ConvertingOutputFormat.class);
+
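+ // Stage this process's classpath jars under the distributed success-cache directory so task JVMs can load them.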
+ JobHelper.setupClasspath(JobHelper.distributedClassPath(jobConf.getWorkingDirectory()), job);
+
+ Throwable throwable = null;
+ try {
+ job.submit();
+ log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());
+ final boolean success = job.waitForCompletion(true);
+ if (!success) {
+ final TaskReport[] reports = job.getTaskReports(TaskType.MAP);
+ if (reports != null) {
+ for (final TaskReport report : reports) {
+ log.error("Error in task [%s] : %s", report.getTaskId(), Arrays.toString(report.getDiagnostics()));
+ }
+ }
+ return null;
+ }
+ try {
+ loadedBytes = job.getCounters().findCounter(COUNTER_GROUP, COUNTER_LOADED).getValue();
+ writtenBytes = job.getCounters().findCounter(COUNTER_GROUP, COUNTER_WRITTEN).getValue();
+ }
+ catch (IOException ex) {
+ log.error(ex, "Could not fetch counters");
+ }
+ final JobID jobID = job.getJobID();
+
+ final Path jobDir = getJobPath(jobID, job.getWorkingDirectory());
+ final FileSystem fs = jobDir.getFileSystem(job.getConfiguration());
+ final RemoteIterator<LocatedFileStatus> it = fs.listFiles(jobDir, true);
+ final List<Path> goodPaths = new ArrayList<>();
+ while (it.hasNext()) {
+ final LocatedFileStatus locatedFileStatus = it.next();
+ if (locatedFileStatus.isFile()) {
+ final Path myPath = locatedFileStatus.getPath();
+ if (ConvertingOutputFormat.DATA_SUCCESS_KEY.equals(myPath.getName())) {
+ goodPaths.add(new Path(myPath.getParent(), ConvertingOutputFormat.DATA_FILE_KEY));
+ }
+ }
+ }
+ if (goodPaths.isEmpty()) {
+ log.warn("No good data found at [%s]", jobDir);
+ return null;
+ }
+ final List<DataSegment> returnList = ImmutableList.copyOf(
+ Lists.transform(
+ goodPaths, new Function<Path, DataSegment>()
+ {
+ @Nullable
+ @Override
+ public DataSegment apply(final Path input)
+ {
+ try {
+ if (!fs.exists(input)) {
+ throw new ISE(
+ "Somehow [%s] was found but [%s] is missing at [%s]",
+ ConvertingOutputFormat.DATA_SUCCESS_KEY,
+ ConvertingOutputFormat.DATA_FILE_KEY,
+ jobDir
+ );
+ }
+ }
+ catch (final IOException e) {
+ throw Throwables.propagate(e);
+ }
+ try (final InputStream stream = fs.open(input)) {
+ return HadoopDruidConverterConfig.jsonMapper.readValue(stream, DataSegment.class);
+ }
+ catch (final IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ }
+ )
+ );
+ if (returnList.size() == segments.size()) {
+ return returnList;
+ } else {
+ throw new ISE(
+ "Tasks reported success but result length did not match! Expected %d found %d at path [%s]",
+ segments.size(),
+ returnList.size(),
+ jobDir
+ );
+ }
+ }
+ catch (InterruptedException | ClassNotFoundException e) {
+ RuntimeException exception = Throwables.propagate(e);
+ throwable = exception;
+ throw exception;
+ }
+ catch (Throwable t) {
+ throwable = t;
+ throw t;
+ }
+ finally {
+ try {
+ cleanup(job);
+ }
+ catch (IOException e) {
+ if (throwable != null) {
+ throwable.addSuppressed(e);
+ } else {
+ log.error(e, "Could not clean up job [%s]", job.getJobID());
+ }
+ }
+ }
+ }
+
+ public long getLoadedBytes()
+ {
+ return loadedBytes;
+ }
+
+ public long getWrittenBytes()
+ {
+ return writtenBytes;
+ }
+
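+ // Writes no map output of its own; commitTask publishes the converted segment recorded under
+ // PUBLISHED_SEGMENT_KEY as a "result" file plus a "_SUCCESS" marker in the task attempt directory.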
+ public static class ConvertingOutputFormat extends OutputFormat<Text, Text>
+ {
+ protected static final String DATA_FILE_KEY = "result";
+ protected static final String DATA_SUCCESS_KEY = "_SUCCESS";
+ protected static final String PUBLISHED_SEGMENT_KEY = "io.druid.indexer.updater.converter.publishedSegment";
+ private static final Logger log = new Logger(ConvertingOutputFormat.class);
+
+ @Override
+ public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ return new RecordWriter<Text, Text>()
+ {
+ @Override
+ public void write(Text key, Text value) throws IOException, InterruptedException
+ {
+ // NOOP
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException
+ {
+ // NOOP
+ }
+ };
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException
+ {
+ // NOOP
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(final TaskAttemptContext context)
+ throws IOException, InterruptedException
+ {
+ return new OutputCommitter()
+ {
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException
+ {
+ // NOOP
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskContext) throws IOException
+ {
+ // NOOP
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException
+ {
+ return taskContext.getConfiguration().get(PUBLISHED_SEGMENT_KEY) != null;
+ }
+
+ @Override
+ public void commitTask(final TaskAttemptContext taskContext) throws IOException
+ {
+ final Progressable commitProgressable = new Progressable()
+ {
+ @Override
+ public void progress()
+ {
+ taskContext.progress();
+ }
+ };
+ final String finalSegmentString = taskContext.getConfiguration().get(PUBLISHED_SEGMENT_KEY);
+ if (finalSegmentString == null) {
+ throw new IOException("Could not read final segment");
+ }
+ final DataSegment newSegment = HadoopDruidConverterConfig.jsonMapper.readValue(
+ finalSegmentString,
+ DataSegment.class
+ );
+ log.info("Committing new segment [%s]", newSegment);
+ taskContext.progress();
+
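+ // Persist the segment metadata as "<taskAttemptDir>/result", then create an empty "_SUCCESS" marker.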
+ final FileSystem fs = taskContext.getWorkingDirectory().getFileSystem(taskContext.getConfiguration());
+ final Path taskAttemptDir = getTaskPath(
+ context.getJobID(),
+ context.getTaskAttemptID(),
+ taskContext.getWorkingDirectory()
+ );
+ final Path taskAttemptFile = new Path(taskAttemptDir, DATA_FILE_KEY);
+ final Path taskAttemptSuccess = new Path(taskAttemptDir, DATA_SUCCESS_KEY);
+ try (final OutputStream outputStream = fs.create(taskAttemptFile, false, 1 << 10, commitProgressable)) {
+ outputStream.write(HadoopDruidConverterConfig.jsonMapper.writeValueAsBytes(newSegment));
+ }
+
+ fs.create(taskAttemptSuccess, false).close();
+
+ taskContext.progress();
+ taskContext.setStatus("Committed");
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskContext) throws IOException
+ {
+ log.warn("Aborting task. Nothing to clean up.");
+ }
+ };
+ }
+ }
+
+
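+ // Map-only task body: download one segment, convert it with the configured IndexSpec,
+ // optionally validate the result against the original, and push the converted segment to the output path.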
+ public static class ConvertingMapper extends Mapper<String, String, Text, Text>
+ {
+ private static final Logger log = new Logger(ConvertingMapper.class);
+ private static final String TMP_FILE_LOC_KEY = "io.druid.indexer.updater.converter.reducer.tmpDir";
+
+ @Override
+ protected void map(
+ String key, String value,
+ final Context context
+ ) throws IOException, InterruptedException
+ {
+ final InputSplit split = context.getInputSplit();
+ if (!(split instanceof DataSegmentSplit)) {
+ throw new IAE(
+ "Unexpected split type. Expected [%s] was [%s]",
+ DataSegmentSplit.class.getCanonicalName(),
+ split.getClass().getCanonicalName()
+ );
+ }
+
+ final String tmpDirLoc = context.getConfiguration().get(TMP_FILE_LOC_KEY);
+ final File tmpDir = Paths.get(tmpDirLoc).toFile();
+
+ final DataSegment segment = ((DataSegmentSplit) split).getDataSegment();
+
+ final HadoopDruidConverterConfig config = converterConfigFromConfiguration(context.getConfiguration());
+
+ context.setStatus("DOWNLOADING");
+ context.progress();
+ final Path inPath = new Path(getURIFromSegment(segment));
+ final File inDir = new File(tmpDir, "in");
+
+ if (inDir.exists() && !inDir.delete()) {
+ log.warn("Could not delete [%s]", inDir);
+ }
+
+ if (!inDir.mkdir() && (!inDir.exists() || !inDir.isDirectory())) {
+ log.warn("Unable to create directory [%s]", inDir);
+ }
+
+ final long inSize = JobHelper.unzipNoGuava(inPath, context.getConfiguration(), inDir, context);
+ log.debug("Loaded %d bytes into [%s] for converting", inSize, inDir.getAbsolutePath());
+ context.getCounter(COUNTER_GROUP, COUNTER_LOADED).increment(inSize);
+
+ context.setStatus("CONVERTING");
+ context.progress();
+ final File outDir = new File(tmpDir, "out");
+ if (!outDir.mkdir() && (!outDir.exists() || !outDir.isDirectory())) {
+ throw new IOException(String.format("Could not create output directory [%s]", outDir));
+ }
+ IndexMaker.convert(
+ inDir,
+ outDir,
+ config.getIndexSpec(),
+ JobHelper.progressIndicatorForContext(context)
+ );
+ if (config.isValidate()) {
+ context.setStatus("Validating");
+ IndexIO.DefaultIndexIOHandler.validateTwoSegments(inDir, outDir);
+ }
+ context.progress();
+ context.setStatus("Starting PUSH");
+ final Path baseOutputPath = new Path(config.getSegmentOutputPath());
+ final FileSystem outputFS = baseOutputPath.getFileSystem(context.getConfiguration());
+ final DataSegment finalSegmentTemplate = segment.withVersion(
+ segment.getVersion()
+ + "_converted"
+ );
+ final DataSegment finalSegment = JobHelper.serializeOutIndex(
+ finalSegmentTemplate,
+ context.getConfiguration(),
+ context,
+ context.getTaskAttemptID(),
+ outDir,
+ JobHelper.makeSegmentOutputPath(
+ baseOutputPath,
+ outputFS,
+ finalSegmentTemplate.getDataSource(),
+ finalSegmentTemplate.getVersion(),
+ finalSegmentTemplate.getInterval(),
+ finalSegmentTemplate.getShardSpec().getPartitionNum()
+ )
+ );
+ context.progress();
+ context.setStatus("Finished PUSH");
+ final String finalSegmentString = HadoopDruidConverterConfig.jsonMapper.writeValueAsString(finalSegment);
+ context.getConfiguration().set(ConvertingOutputFormat.PUBLISHED_SEGMENT_KEY, finalSegmentString);
+ context.write(new Text("dataSegment"), new Text(finalSegmentString));
+
+ context.getCounter(COUNTER_GROUP, COUNTER_WRITTEN).increment(finalSegment.getSize());
+ context.progress();
+ context.setStatus("Ready To Commit");
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException
+ {
+ final File tmpFile = Files.createTempDir();
+ context.getConfiguration().set(TMP_FILE_LOC_KEY, tmpFile.getAbsolutePath());
+ }
+
+ private static URI getURIFromSegment(DataSegment dataSegment)
+ {
+ // There is no good way around this...
+ // TODO: add getURI() to URIDataPuller
+ final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
+ final String type = loadSpec.get("type").toString();
+ final URI segmentLocURI;
+ if ("s3_zip".equals(type)) {
+ segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key")));
+ } else if ("hdfs".equals(type)) {
+ segmentLocURI = URI.create(loadSpec.get("path").toString());
+ } else if ("local".equals(type)) {
+ try {
+ segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null);
+ }
+ catch (URISyntaxException e) {
+ throw new ISE(e, "Unable to form simple file uri");
+ }
+ } else {
+ try {
+ throw new IAE(
+ "Cannot figure out loadSpec %s",
+ HadoopDruidConverterConfig.jsonMapper.writeValueAsString(loadSpec)
+ );
+ }
+ catch (JsonProcessingException e) {
+ throw new ISE("Cannot write Map with json mapper");
+ }
+ }
+ return segmentLocURI;
+ }
+
+ @Override
+ protected void cleanup(
+ Context context
+ ) throws IOException, InterruptedException
+ {
+ final String tmpDirLoc = context.getConfiguration().get(TMP_FILE_LOC_KEY);
+ final File tmpDir = Paths.get(tmpDirLoc).toFile();
+ FileUtils.deleteDirectory(tmpDir);
+ context.progress();
+ context.setStatus("Clean");
+ }
+ }
+
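+ // InputSplit carrying exactly one DataSegment, written to and read from the split stream as JSON.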
+ public static class DataSegmentSplit extends InputSplit implements Writable
+ {
+ private DataSegment dataSegment = null;
+
+ public DataSegmentSplit()
+ {
+ // For serialization purposes
+ }
+
+ public DataSegmentSplit(@NotNull DataSegment dataSegment)
+ {
+ this.dataSegment = Preconditions.checkNotNull(dataSegment, "dataSegment");
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException
+ {
+ return dataSegment.getSize();
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException
+ {
+ return new String[]{};
+ }
+
+ protected DataSegment getDataSegment()
+ {
+ return dataSegment;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException
+ {
+ out.write(HadoopDruidConverterConfig.jsonMapper.writeValueAsString(dataSegment).getBytes());
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException
+ {
+ dataSegment = HadoopDruidConverterConfig.jsonMapper.readValue(in.readLine(), DataSegment.class);
+ }
+ }
+
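+ // Emits one DataSegmentSplit per segment in the converter config; the record reader produces a single
+ // synthetic key/value pair so each mapper invocation handles exactly one segment.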
+ public static class ConfigInputFormat extends InputFormat<String, String>
+ {
+ @Override
+ public List<InputSplit> getSplits(final JobContext jobContext) throws IOException, InterruptedException
+ {
+ final HadoopDruidConverterConfig config = converterConfigFromConfiguration(jobContext.getConfiguration());
+ final List<DataSegment> segments = config.getSegments();
+ if (segments == null) {
+ throw new IOException("Bad config, missing segments");
+ }
+ return Lists.transform(
+ segments, new Function<DataSegment, InputSplit>()
+ {
+ @Nullable
+ @Override
+ public InputSplit apply(DataSegment input)
+ {
+ return new DataSegmentSplit(input);
+ }
+ }
+ );
+ }
+
+ @Override
+ public RecordReader<String, String> createRecordReader(
+ final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext
+ ) throws IOException, InterruptedException
+ {
+ return new RecordReader<String, String>()
+ {
+ boolean readAnything = false;
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException
+ {
+ // NOOP
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException
+ {
+ return !readAnything;
+ }
+
+ @Override
+ public String getCurrentKey() throws IOException, InterruptedException
+ {
+ return "key";
+ }
+
+ @Override
+ public String getCurrentValue() throws IOException, InterruptedException
+ {
+ readAnything = true;
+ return "fakeValue";
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException
+ {
+ return readAnything ? 1.0F : 0.0F;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ // NOOP
+ }
+ };
+ }
+ }
+}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopDruidConverterConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopDruidConverterConfig.java
new file mode 100644
index 000000000000..3c822ca5ecea
--- /dev/null
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopDruidConverterConfig.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexer.updater;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Binder;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import io.druid.guice.GuiceInjectors;
+import io.druid.guice.JsonConfigProvider;
+import io.druid.guice.annotations.Self;
+import io.druid.initialization.Initialization;
+import io.druid.segment.IndexSpec;
+import io.druid.server.DruidNode;
+import io.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
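+// Converter job configuration, serialized as JSON into the Hadoop Configuration under CONFIG_PROPERTY
+// and read back inside tasks via fromString().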
+public class HadoopDruidConverterConfig
+{
+ public static final String CONFIG_PROPERTY = "io.druid.indexer.updater.converter";
+ public static final ObjectMapper jsonMapper;
+ private static final Injector injector = Initialization.makeInjectorWithModules(
+ GuiceInjectors.makeStartupInjector(),
+ ImmutableList.of(
+ new Module()
+ {
+ @Override
+ public void configure(Binder binder)
+ {
+ JsonConfigProvider.bindInstance(
+ binder, Key.get(DruidNode.class, Self.class), new DruidNode("hadoop-converter", null, null)
+ );
+ }
+ }
+ )
+ );
+
+ static {
+ jsonMapper = injector.getInstance(ObjectMapper.class);
+ jsonMapper.registerSubtypes(HadoopDruidConverterConfig.class);
+ }
+
+ private static final TypeReference