diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
index 578cd97e2c05..25672e85cad8 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
@@ -53,9 +53,7 @@ import io.druid.timeline.partition.ShardSpec;
 import io.druid.timeline.partition.ShardSpecLookup;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.joda.time.DateTime;
@@ -246,7 +244,8 @@ public PartitionsSpec getPartitionsSpec()
     return schema.getTuningConfig().getPartitionsSpec();
   }

-  public IndexSpec getIndexSpec() {
+  public IndexSpec getIndexSpec()
+  {
     return schema.getTuningConfig().getIndexSpec();
   }

@@ -488,35 +487,6 @@ public Path makeDescriptorInfoPath(DataSegment segment)
     return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", "")));
   }

-  public Path makeSegmentOutputPath(FileSystem fileSystem, Bucket bucket)
-  {
-    final Interval bucketInterval = schema.getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get();
-    if (fileSystem instanceof DistributedFileSystem) {
-      return new Path(
-          String.format(
-              "%s/%s/%s_%s/%s/%s",
-              schema.getIOConfig().getSegmentOutputPath(),
-              schema.getDataSchema().getDataSource(),
-              bucketInterval.getStart().toString(ISODateTimeFormat.basicDateTime()),
-              bucketInterval.getEnd().toString(ISODateTimeFormat.basicDateTime()),
-              schema.getTuningConfig().getVersion().replace(":", "_"),
-              bucket.partitionNum
-          )
-      );
-    }
-    return new Path(
-        String.format(
-            "%s/%s/%s_%s/%s/%s",
-            schema.getIOConfig().getSegmentOutputPath(),
-            schema.getDataSchema().getDataSource(),
-            bucketInterval.getStart().toString(),
-            bucketInterval.getEnd().toString(),
-            schema.getTuningConfig().getVersion(),
-            bucket.partitionNum
-        )
-    );
-  }
-
   public void addJobProperties(Job job)
   {
     Configuration conf = job.getConfiguration();
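
Aside: the method removed above is not dropped outright; an equivalent static helper, JobHelper.makeSegmentOutputPath, is added further down in this diff and the callers (including the tests) are switched over to it. A minimal sketch of how the same segment base path is built after this change, assuming only a Hadoop Configuration; the base path, datasource, version and interval values are made up for illustration:

import io.druid.indexer.JobHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.IOException;

class SegmentPathSketch
{
  static Path segmentBasePath(Configuration conf) throws IOException
  {
    final Path basePath = new Path("hdfs://namenode:9000/druid/segments"); // hypothetical base path
    final FileSystem fs = basePath.getFileSystem(conf);
    // For an "hdfs" scheme the interval is rendered with basicDateTime and ":" in the version
    // is replaced with "_"; other schemes keep the ISO interval and the version verbatim.
    return JobHelper.makeSegmentOutputPath(
        basePath,
        fs,
        "example_datasource",
        "2015-03-01T00:00:00.000Z", // segment version
        new Interval(new DateTime("2012-07-10T05:00:00Z"), new DateTime("2012-07-10T06:00:00Z")),
        0 // partition number
    );
  }
}
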
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index f36ac384e960..09ab8db062d3 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -22,17 +22,14 @@
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
-import com.google.common.io.Closeables;
 import com.google.common.primitives.Longs;
 import com.metamx.common.IAE;
 import com.metamx.common.ISE;
-import com.metamx.common.guava.CloseQuietly;
 import com.metamx.common.logger.Logger;
 import com.metamx.common.parsers.ParseException;
 import io.druid.collections.StupidPool;
@@ -43,11 +40,9 @@
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMaker;
-import io.druid.segment.IndexSpec;
 import io.druid.segment.LoggingProgressIndicator;
 import io.druid.segment.ProgressIndicator;
 import io.druid.segment.QueryableIndex;
-import io.druid.segment.SegmentUtils;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
 import io.druid.segment.incremental.OffheapIncrementalIndex;
@@ -56,7 +51,7 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -71,21 +66,13 @@
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.joda.time.DateTime;
 import org.joda.time.Interval;

-import java.io.BufferedOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipOutputStream;

 /**
  */
@@ -160,7 +147,7 @@ public boolean run()
     SortableBytes.useSortableBytesAsMapOutputKey(job);

     int numReducers = Iterables.size(config.getAllBuckets().get());
-    if(numReducers == 0) {
+    if (numReducers == 0) {
       throw new RuntimeException("No buckets?? seems there is no data to index.");
     }
     job.setNumReduceTasks(numReducers);
@@ -179,8 +166,8 @@ public boolean run()
     // once IndexIO doesn't rely on globally injected properties, we can move this into the HadoopTuningConfig.
     final String bitmapProperty = "druid.processing.bitmap.type";
     final String bitmapType = HadoopDruidIndexerConfig.properties.getProperty(bitmapProperty);
-    if(bitmapType != null) {
-      for(String property : new String[] {"mapreduce.reduce.java.opts", "mapreduce.map.java.opts"}) {
+    if (bitmapType != null) {
+      for (String property : new String[]{"mapreduce.reduce.java.opts", "mapreduce.map.java.opts"}) {
        // prepend property to allow overriding using hadoop.xxx properties by JobHelper.injectSystemProperties above
        String value = Strings.nullToEmpty(job.getConfiguration().get(property));
        job.getConfiguration().set(property, String.format("-D%s=%s %s", bitmapProperty, bitmapType, value));
@@ -376,7 +363,8 @@ protected void reduce(
           allDimensionNames.addAll(inputRow.getDimensions());

           numRows = index.add(inputRow);
-        } catch (ParseException e) {
+        }
+        catch (ParseException e) {
           if (config.isIgnoreInvalidRows()) {
             log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
             context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
@@ -437,245 +425,57 @@ protected void reduce(
               indexes, aggs, new File(baseFlushFile, "merged"), progressIndicator
           );
         }
-        serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames));
-        for (File file : toMerge) {
-          FileUtils.deleteDirectory(file);
-        }
-      }
-      finally {
-        index.close();
-      }
-    }
-
-    private void serializeOutIndex(Context context, Bucket bucket, File mergedBase, List<String> dimensionNames)
-        throws IOException
-    {
-      Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
-
-      int attemptNumber = context.getTaskAttemptID().getId();
-
-      final FileSystem intermediateFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
-      final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath()).getFileSystem(
-          context.getConfiguration()
-      );
-      final Path indexBasePath = config.makeSegmentOutputPath(outputFS, bucket);
-      final Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
-
-      outputFS.mkdirs(indexBasePath);
-
-      Exception caughtException = null;
-      ZipOutputStream out = null;
-      long size = 0;
-      try {
-        out = new ZipOutputStream(new BufferedOutputStream(outputFS.create(indexZipFilePath), 256 * 1024));
-
-        List<String> filesToCopy = Arrays.asList(mergedBase.list());
-
-        for (String file : filesToCopy) {
-          size += copyFile(context, out, mergedBase, file);
-        }
-      }
-      catch (Exception e) {
-        caughtException = e;
-      }
-      finally {
-        if (caughtException == null) {
-          Closeables.close(out, false);
-        } else {
-          CloseQuietly.close(out);
-          throw Throwables.propagate(caughtException);
-        }
-      }
-
-      Path finalIndexZipFilePath = new Path(indexBasePath, "index.zip");
-      final URI indexOutURI = finalIndexZipFilePath.toUri();
-      ImmutableMap loadSpec;
-
-      // We do String comparison instead of instanceof checks here because in Hadoop 2.6.0
-      // NativeS3FileSystem got moved to a separate jar (hadoop-aws) that is not guaranteed
-      // to be part of the core code anymore. The instanceof check requires that the class exist
-      // but we do not have any guarantee that it will exist, so instead we must pull out
-      // the String name of it and verify that. We do a full package-qualified test in order
-      // to be as explicit as possible.
-      String fsClazz = outputFS.getClass().getName();
-      if ("org.apache.hadoop.fs.s3native.NativeS3FileSystem".equals(fsClazz)) {
-        loadSpec = ImmutableMap.of(
-            "type", "s3_zip",
-            "bucket", indexOutURI.getHost(),
-            "key", indexOutURI.getPath().substring(1) // remove the leading "/"
-        );
-      } else if ("org.apache.hadoop.fs.LocalFileSystem".equals(fsClazz)) {
-        loadSpec = ImmutableMap.of(
-            "type", "local",
-            "path", indexOutURI.getPath()
+      final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath())
+          .getFileSystem(context.getConfiguration());
+      final DataSegment segment = JobHelper.serializeOutIndex(
+          new DataSegment(
+              config.getDataSource(),
+              interval,
+              config.getSchema().getTuningConfig().getVersion(),
+              null,
+              ImmutableList.copyOf(allDimensionNames),
+              metricNames,
+              config.getShardSpec(bucket).getActualSpec(),
+              -1,
+              -1
+          ),
+          context.getConfiguration(),
+          context,
+          context.getTaskAttemptID(),
+          mergedBase,
+          JobHelper.makeSegmentOutputPath(
+              new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
+              outputFS,
+              config.getSchema().getDataSchema().getDataSource(),
+              config.getSchema().getTuningConfig().getVersion(),
+              config.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
+              bucket.partitionNum
+          )
       );
-      } else if ("org.apache.hadoop.hdfs.DistributedFileSystem".equals(fsClazz)) {
-        loadSpec = ImmutableMap.of(
-            "type", "hdfs",
-            "path", indexOutURI.toString()
-        );
-      } else {
-        throw new ISE("Unknown file system[%s]", fsClazz);
-      }
-
-      DataSegment segment = new DataSegment(
-          config.getDataSource(),
-          interval,
-          config.getSchema().getTuningConfig().getVersion(),
-          loadSpec,
-          dimensionNames,
-          metricNames,
-          config.getShardSpec(bucket).getActualSpec(),
-          SegmentUtils.getVersionFromDir(mergedBase),
-          size
-      );
-
-      // retry 1 minute
-      boolean success = false;
-      for (int i = 0; i < 6; i++) {
-        if (renameIndexFiles(intermediateFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
-          log.info("Successfully renamed [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath);
-          success = true;
-          break;
-        } else {
-          log.info("Failed to rename [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath);
-          try {
-            Thread.sleep(10000);
-            context.progress();
-          }
-          catch (InterruptedException e) {
-            throw new ISE(
-                "Thread error in retry loop for renaming [%s] to [%s]",
-                indexZipFilePath.toUri().getPath(),
-                finalIndexZipFilePath.toUri().getPath()
-            );
-          }
-        }
-      }
-
-      if (!success) {
-        if (!outputFS.exists(indexZipFilePath)) {
-          throw new ISE("File [%s] does not exist after retry loop.", indexZipFilePath.toUri().getPath());
-        }
-        if (outputFS.getFileStatus(indexZipFilePath).getLen() == outputFS.getFileStatus(finalIndexZipFilePath)
-            .getLen()) {
-          outputFS.delete(indexZipFilePath, true);
-        } else {
-          outputFS.delete(finalIndexZipFilePath, true);
-          if (!renameIndexFiles(intermediateFS, outputFS, indexBasePath, indexZipFilePath, finalIndexZipFilePath, segment)) {
-            throw new ISE(
-                "Files [%s] and [%s] are different, but still cannot rename after retry loop",
-                indexZipFilePath.toUri().getPath(),
-                finalIndexZipFilePath.toUri().getPath()
-            );
-          }
-        }
-      }
-    }
-
-    private boolean renameIndexFiles(
-        FileSystem intermediateFS,
-        FileSystem outputFS,
-        Path indexBasePath,
-        Path indexZipFilePath,
-        Path finalIndexZipFilePath,
-        DataSegment segment
-    )
-        throws IOException
-    {
-      final boolean needRename;
-
-      if (outputFS.exists(finalIndexZipFilePath)) {
-        // NativeS3FileSystem.rename won't overwrite, so we might need to delete the old index first
-        final FileStatus zipFile = outputFS.getFileStatus(indexZipFilePath);
-        final FileStatus finalIndexZipFile = outputFS.getFileStatus(finalIndexZipFilePath);
-
-        if (zipFile.getModificationTime() >= finalIndexZipFile.getModificationTime()
-            || zipFile.getLen() != finalIndexZipFile.getLen()) {
-          log.info(
-              "File[%s / %s / %sB] existed, but wasn't the same as [%s / %s / %sB]",
-              finalIndexZipFile.getPath(),
-              new DateTime(finalIndexZipFile.getModificationTime()),
-              finalIndexZipFile.getLen(),
-              zipFile.getPath(),
-              new DateTime(zipFile.getModificationTime()),
-              zipFile.getLen()
-          );
-          outputFS.delete(finalIndexZipFilePath, false);
-          needRename = true;
-        } else {
-          log.info(
-              "File[%s / %s / %sB] existed and will be kept",
-              finalIndexZipFile.getPath(),
-              new DateTime(finalIndexZipFile.getModificationTime()),
-              finalIndexZipFile.getLen()
-          );
-          needRename = false;
-        }
-      } else {
-        needRename = true;
-      }
-
-      if (needRename && !outputFS.rename(indexZipFilePath, finalIndexZipFilePath)) {
-        return false;
-      }
-
-      writeSegmentDescriptor(outputFS, segment, new Path(indexBasePath, "descriptor.json"));
-      final Path descriptorPath = config.makeDescriptorInfoPath(segment);
-      log.info("Writing descriptor to path[%s]", descriptorPath);
-      intermediateFS.mkdirs(descriptorPath.getParent());
-      writeSegmentDescriptor(intermediateFS, segment, descriptorPath);
-
-      return true;
-    }
-
-    private void writeSegmentDescriptor(FileSystem outputFS, DataSegment segment, Path descriptorPath)
-        throws IOException
-    {
-      if (outputFS.exists(descriptorPath)) {
-        outputFS.delete(descriptorPath, false);
-      }
-
-      final FSDataOutputStream descriptorOut = outputFS.create(descriptorPath);
-      try {
-        HadoopDruidIndexerConfig.jsonMapper.writeValue(descriptorOut, segment);
-      }
-      finally {
-        descriptorOut.close();
-      }
-    }
-
-    private long copyFile(
-        Context context, ZipOutputStream out, File mergedBase, final String filename
-    ) throws IOException
-    {
-      createNewZipEntry(out, filename);
-      long numRead = 0;
-
-      InputStream in = null;
-      try {
-        in = new FileInputStream(new File(mergedBase, filename));
-        byte[] buf = new byte[0x10000];
-        int read;
-        while (true) {
-          read = in.read(buf);
-          if (read == -1) {
-            break;
-          }
+      Path descriptorPath = config.makeDescriptorInfoPath(segment);
+      descriptorPath = JobHelper.prependFSIfNullScheme(
+          FileSystem.get(
+              descriptorPath.toUri(),
+              context.getConfiguration()
+          ), descriptorPath
+      );

-          out.write(buf, 0, read);
-          numRead += read;
-          context.progress();
+      log.info("Writing descriptor to path[%s]", descriptorPath);
+      JobHelper.writeSegmentDescriptor(
+          config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()),
+          segment,
+          descriptorPath,
+          FileContext.getFileContext(descriptorPath.toUri(), context.getConfiguration()),
+          context
+      );
+      for (File file : toMerge) {
+        FileUtils.deleteDirectory(file);
       }
     }
     finally {
-      CloseQuietly.close(in);
+      index.close();
     }
-    out.closeEntry();
-    context.progress();
-
-    return numRead;
   }

   private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs, StupidPool<ByteBuffer> bufferPool)
@@ -702,12 +502,6 @@ private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactor
         );
     }
   }
-
-  private void createNewZipEntry(ZipOutputStream out, String name) throws IOException
-  {
-    log.info("Creating new ZipEntry[%s]", name);
-    out.putNextEntry(new ZipEntry(name));
-  }
 }

 public static class IndexGeneratorOutputFormat extends TextOutputFormat
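
Aside: in the new reducer flow above, the DataSegment handed to JobHelper.serializeOutIndex is only a template; its loadSpec is null and its binaryVersion and size are -1, and the helper returns a finalized copy via withLoadSpec()/withBinaryVersion()/withSize() once the zip has actually been pushed. A rough sketch of such a template, with made-up datasource, dimension and metric names; NoneShardSpec stands in for whatever shard spec the indexer config actually produces:

import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
import java.util.Arrays;

class SegmentTemplateSketch
{
  static DataSegment template(Interval interval, String version)
  {
    return new DataSegment(
        "example_datasource",
        interval,
        version,
        null,                            // loadSpec: filled in later from the output FS scheme
        Arrays.asList("dim1", "dim2"),   // dimensions
        Arrays.asList("count"),          // metrics
        new NoneShardSpec(),
        -1,                              // binaryVersion: read from the merged segment dir
        -1                               // size: bytes written to index.zip
    );
  }
}
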
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
index 8c169e8785bc..36ee1ee80a9f 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
@@ -18,26 +18,47 @@
 package io.druid.indexer;

 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 import com.google.common.io.OutputSupplier;
+import com.metamx.common.IAE;
 import com.metamx.common.ISE;
 import com.metamx.common.logger.Logger;
+import io.druid.segment.SegmentUtils;
+import io.druid.timeline.DataSegment;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.Progressable;
+import org.joda.time.Interval;
+import org.joda.time.format.ISODateTimeFormat;

 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;

 /**
  */
@@ -47,6 +68,9 @@ public class JobHelper

   private static final Set existing = Sets.newHashSet();

+  private static final int NUM_RETRIES = 6;
+  private static final int SECONDS_BETWEEN_RETRIES = 10;
+
   public static void setupClasspath(
@@ -103,7 +127,8 @@ public static void injectSystemProperties(Job job)
     injectSystemProperties(job.getConfiguration());
   }

-  public static Configuration injectSystemProperties(Configuration conf) {
+  public static Configuration injectSystemProperties(Configuration conf)
+  {
     for (String propName : System.getProperties().stringPropertyNames()) {
       if (propName.startsWith("hadoop.")) {
         conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
@@ -164,7 +189,7 @@ public static boolean runJobs(List jobs, HadoopDruidIndexerConfig config)

   public static void setInputFormat(Job job, HadoopDruidIndexerConfig indexerConfig)
   {
-    if(indexerConfig.getInputFormatClass() != null) {
+    if (indexerConfig.getInputFormatClass() != null) {
       job.setInputFormatClass(indexerConfig.getInputFormatClass());
     } else if (indexerConfig.isCombineText()) {
       job.setInputFormatClass(CombineTextInputFormat.class);
@@ -172,4 +197,232 @@ public static void setInputFormat(Job job, HadoopDruidIndexerConfig indexerConfi
       job.setInputFormatClass(TextInputFormat.class);
     }
   }
+
+  public static DataSegment serializeOutIndex(
+      final DataSegment segmentTemplate,
+      final Configuration configuration,
+      final Progressable progressable,
+      final TaskAttemptID taskAttemptID,
+      final File mergedBase,
+      final Path segmentBasePath
+  )
+      throws IOException
+  {
+    final FileSystem outputFS = FileSystem.get(segmentBasePath.toUri(), configuration);
+    final FileContext fileContext = FileContext.getFileContext(segmentBasePath.toUri(), configuration);
+    final Path tmpPath = new Path(segmentBasePath, String.format("index.zip.%d", taskAttemptID.getId()));
+    final AtomicLong size = new AtomicLong(0L);
+    final DataPusher zipPusher = (DataPusher) RetryProxy.create(
+        DataPusher.class, new DataPusher()
+        {
+          @Override
+          public void push() throws IOException
+          {
+            try (OutputStream outputStream = fileContext.create(
+                tmpPath,
+                EnumSet.of(CreateFlag.OVERWRITE, CreateFlag.CREATE),
+                Options.CreateOpts.createParent(),
+                Options.CreateOpts.bufferSize(256 * 1024)
+            )) {
+              size.set(zipAndCopyDir(mergedBase, outputStream, progressable));
+              outputStream.flush();
+            }
+            catch (IOException | RuntimeException exception) {
+              log.error(exception, "Exception in retry loop");
+              throw exception;
+            }
+          }
+        },
+        RetryPolicies.retryUpToMaximumCountWithFixedSleep(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
+    );
+    zipPusher.push();
+    log.info("Zipped %,d bytes to [%s]", size.get(), tmpPath.toUri());
+
+    final Path finalIndexZipFilePath = new Path(segmentBasePath, "index.zip");
+    final URI indexOutURI = finalIndexZipFilePath.toUri();
+    final ImmutableMap loadSpec;
+    // TODO: Make this a part of Pushers or Pullers
+    switch (outputFS.getScheme()) {
+      case "hdfs":
+        loadSpec = ImmutableMap.of(
+            "type", "hdfs",
+            "path", indexOutURI.toString()
+        );
+        break;
+      case "s3":
+      case "s3n":
+        loadSpec = ImmutableMap.of(
+            "type", "s3_zip",
+            "bucket", indexOutURI.getHost(),
+            "key", indexOutURI.getPath().substring(1) // remove the leading "/"
+        );
+        break;
+      case "file":
+        loadSpec = ImmutableMap.of(
+            "type", "local",
+            "path", indexOutURI.getPath()
+        );
+        break;
+      default:
+        throw new IAE("Unknown file system scheme [%s]", outputFS.getScheme());
+    }
+    final DataSegment finalSegment = segmentTemplate
+        .withLoadSpec(loadSpec)
+        .withSize(size.get())
+        .withBinaryVersion(SegmentUtils.getVersionFromDir(mergedBase));
+    fileContext.rename(tmpPath, finalIndexZipFilePath, Options.Rename.OVERWRITE);
+    writeSegmentDescriptor(
+        outputFS,
+        finalSegment,
+        new Path(segmentBasePath, "descriptor.json"),
+        fileContext,
+        progressable
+    );
+    return finalSegment;
+  }
+
+  public static void writeSegmentDescriptor(
+      final FileSystem outputFS,
+      final DataSegment segment,
+      final Path descriptorPath,
+      final FileContext fileContext,
+      final Progressable progressable
+  )
+      throws IOException
+  {
+    final DataPusher descriptorPusher = (DataPusher) RetryProxy.create(
+        DataPusher.class, new DataPusher()
+        {
+          @Override
+          public void push() throws IOException
+          {
+            try {
+              progressable.progress();
+              if (outputFS.exists(descriptorPath)) {
+                if (!fileContext.delete(descriptorPath, false)) {
+                  throw new IOException(String.format("Failed to delete descriptor at [%s]", descriptorPath));
+                }
+              }
+              try (final OutputStream descriptorOut = fileContext.create(
+                  descriptorPath,
+                  EnumSet.of(CreateFlag.OVERWRITE, CreateFlag.CREATE),
+                  Options.CreateOpts.bufferSize(256 * 1024),
+                  Options.CreateOpts.createParent()
+              )) {
+                HadoopDruidIndexerConfig.jsonMapper.writeValue(descriptorOut, segment);
+                descriptorOut.flush();
+              }
+            }
+            catch (RuntimeException | IOException ex) {
+              log.info(ex, "Error in retry loop");
+              throw ex;
+            }
+          }
+        },
+        RetryPolicies.retryUpToMaximumCountWithFixedSleep(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS)
+    );
+    descriptorPusher.push();
+  }
+
+  /**
+   * Simple interface for retry operations
+   */
+  public interface DataPusher
+  {
+    void push() throws IOException;
+  }
+
+  public static long zipAndCopyDir(
+      File baseDir,
+      OutputStream baseOutputStream,
+      Progressable progressable
+  ) throws IOException
+  {
+    long size = 0L;
+    try (ZipOutputStream outputStream = new ZipOutputStream(baseOutputStream)) {
+      List<String> filesToCopy = Arrays.asList(baseDir.list());
+      for (String fileName : filesToCopy) {
+        final File fileToCopy = new File(baseDir, fileName);
+        if (java.nio.file.Files.isRegularFile(fileToCopy.toPath())) {
+          size += copyFileToZipStream(fileToCopy, outputStream, progressable);
+        } else {
+          log.warn("File at [%s] is not a regular file! skipping as part of zip", fileToCopy.getPath());
+        }
+      }
+      outputStream.flush();
+    }
+    return size;
+  }
+
+  public static long copyFileToZipStream(
+      File file,
+      ZipOutputStream zipOutputStream,
+      Progressable progressable
+  ) throws IOException
+  {
+    createNewZipEntry(zipOutputStream, file);
+    long numRead = 0;
+    try (FileInputStream inputStream = new FileInputStream(file)) {
+      byte[] buf = new byte[0x10000];
+      for (int bytesRead = inputStream.read(buf); bytesRead >= 0; bytesRead = inputStream.read(buf)) {
+        progressable.progress();
+        if (bytesRead == 0) {
+          continue;
+        }
+        zipOutputStream.write(buf, 0, bytesRead);
+        progressable.progress();
+        numRead += bytesRead;
+      }
+    }
+    zipOutputStream.closeEntry();
+    progressable.progress();
+    return numRead;
+  }
+
+  private static void createNewZipEntry(ZipOutputStream out, File file) throws IOException
+  {
+    log.info("Creating new ZipEntry[%s]", file.getName());
+    out.putNextEntry(new ZipEntry(file.getName()));
+  }
+
+  public static Path makeSegmentOutputPath(
+      Path basePath,
+      FileSystem fileSystem,
+      String dataSource,
+      String version,
+      Interval interval,
+      int partitionNum
+  )
+  {
+    Path outputPath = new Path(prependFSIfNullScheme(fileSystem, basePath), "./" + dataSource);
+    if ("hdfs".equals(fileSystem.getScheme())) {
+      outputPath = new Path(
+          outputPath, String.format(
+              "./%s_%s",
+              interval.getStart().toString(ISODateTimeFormat.basicDateTime()),
+              interval.getEnd().toString(ISODateTimeFormat.basicDateTime())
+          )
+      );
+      outputPath = new Path(outputPath, version.replace(":", "_"));
+    } else {
+      outputPath = new Path(
+          outputPath, String.format(
+              "./%s_%s",
+              interval.getStart().toString(),
+              interval.getEnd().toString()
+          )
+      );
+      outputPath = new Path(outputPath, String.format("./%s", version));
+    }
+    outputPath = new Path(outputPath, Integer.toString(partitionNum));
+    return outputPath;
+  }
+
+  public static Path prependFSIfNullScheme(FileSystem fs, Path path)
+  {
+    if (path.toUri().getScheme() == null) {
+      path = new Path(fs.getUri().toString(), String.format("./%s", path));
+    }
+    return path;
+  }
 }
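
Aside: both serializeOutIndex and writeSegmentDescriptor above wrap their one-shot push logic in Hadoop's RetryProxy so that a transient filesystem failure does not immediately fail the task. A minimal sketch of the same pattern in isolation, assuming only the JobHelper.DataPusher interface from this patch; the policy values mirror NUM_RETRIES/SECONDS_BETWEEN_RETRIES but are written out literally here:

import io.druid.indexer.JobHelper;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

class RetryingPushSketch
{
  static void pushWithRetries(final JobHelper.DataPusher delegate) throws IOException
  {
    // RetryProxy returns a dynamic proxy; when push() throws, the policy decides whether to
    // sleep and invoke the delegate again, here up to 6 attempts spaced 10 seconds apart.
    final JobHelper.DataPusher retryingPusher = (JobHelper.DataPusher) RetryProxy.create(
        JobHelper.DataPusher.class,
        delegate,
        RetryPolicies.retryUpToMaximumCountWithFixedSleep(6, 10, TimeUnit.SECONDS)
    );
    retryingPusher.push();
  }
}
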
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
index 2e9359b6ea89..b3aca9d251b3 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -96,7 +96,14 @@ public void shouldMakeHDFSCompliantSegmentOutputPath()
     );

     Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
-    Path path = cfg.makeSegmentOutputPath(new DistributedFileSystem(), bucket);
+    Path path = JobHelper.makeSegmentOutputPath(
+        new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
+        new DistributedFileSystem(),
+        cfg.getSchema().getDataSchema().getDataSource(),
+        cfg.getSchema().getTuningConfig().getVersion(),
+        cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
+        bucket.partitionNum
+    );
     Assert.assertEquals(
         "hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version/4712",
         path.toString()
@@ -142,9 +149,16 @@ public void shouldMakeDefaultSegmentOutputPathIfNotHDFS()
     );

     Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
-    Path path = cfg.makeSegmentOutputPath(new LocalFileSystem(), bucket);
+    Path path = JobHelper.makeSegmentOutputPath(
+        new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
+        new LocalFileSystem(),
+        cfg.getSchema().getDataSchema().getDataSource(),
+        cfg.getSchema().getTuningConfig().getVersion(),
+        cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
+        bucket.partitionNum
+    );

     Assert.assertEquals(
-        "/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712",
+        "file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712",
         path.toString()
     );
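
Aside: the updated expectation above gains a "file:" prefix because makeSegmentOutputPath now runs the base path through JobHelper.prependFSIfNullScheme, which qualifies a scheme-less path with the filesystem's own URI. A rough illustration of that behaviour, assuming an already-obtained local FileSystem and a made-up /tmp path:

import io.druid.indexer.JobHelper;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class PrependSchemeSketch
{
  static Path qualify(FileSystem localFS)
  {
    // "/tmp/segments" carries no scheme, so the helper prepends the filesystem URI;
    // for a local filesystem the result reads roughly as file:/tmp/segments,
    // matching the "file:" prefix in the updated test expectation.
    Path base = new Path("/tmp/segments");
    return JobHelper.prependFSIfNullScheme(localFS, base);
  }
}
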