From f7e309f44595c92148950b328ec7eec526d290ef Mon Sep 17 00:00:00 2001
From: Slim Bouguerra
Date: Wed, 15 Feb 2017 10:37:43 -0800
Subject: [PATCH 1/3] Add the s3a scheme and its implementation to the hdfs
 storage module.

---
 extensions-core/hdfs-storage/pom.xml                      | 7 ++++++-
 .../src/main/java/io/druid/indexer/JobHelper.java         | 5 ++++-
 .../java/io/druid/indexing/common/config/TaskConfig.java  | 2 +-
 pom.xml                                                   | 2 +-
 4 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/extensions-core/hdfs-storage/pom.xml b/extensions-core/hdfs-storage/pom.xml
index 9b01d7de3304..d2159278e88d 100644
--- a/extensions-core/hdfs-storage/pom.xml
+++ b/extensions-core/hdfs-storage/pom.xml
@@ -140,12 +140,17 @@
       <artifactId>emitter</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>${hadoop.compile.version}</version>
+    </dependency>
     <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
       <scope>provided</scope>
     </dependency>
-
+
     <dependency>
       <groupId>junit</groupId>

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
index 8cb750422ce7..fe43a1073d8e 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
@@ -421,6 +421,7 @@ public long push() throws IOException
           case "hdfs":
           case "viewfs":
           case "maprfs":
+          case "s3a":
             loadSpec = ImmutableMap.<String, Object>of(
                 "type", "hdfs",
                 "path", indexOutURI.toString()
@@ -583,7 +584,9 @@ public static Path makeSegmentOutputPath(
       DataSegment segment
   )
   {
-    String segmentDir = "hdfs".equals(fileSystem.getScheme()) || "viewfs".equals(fileSystem.getScheme())
+    String segmentDir = "hdfs".equals(fileSystem.getScheme())
+                        || "viewfs".equals(fileSystem.getScheme())
+                        || "s3a".equals(fileSystem.getScheme())
                         ? DataSegmentPusherUtil.getHdfsStorageDir(segment)
                         : DataSegmentPusherUtil.getStorageDir(segment);
     return new Path(prependFSIfNullScheme(fileSystem, basePath), String.format("./%s", segmentDir));

diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java
index e3fb7be6f9d7..eff0df46f384 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java
@@ -31,7 +31,7 @@ public class TaskConfig
 {
   public static final List<String> DEFAULT_DEFAULT_HADOOP_COORDINATES = ImmutableList.of(
-      "org.apache.hadoop:hadoop-client:2.3.0"
+      "org.apache.hadoop:hadoop-client:2.7.0"
   );

   private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M");

diff --git a/pom.xml b/pom.xml
index f28a2c396101..79232154c79e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,7 +71,7 @@
         <netty.version>4.1.6.Final</netty.version>
         <slf4j.version>1.7.12</slf4j.version>
-        <hadoop.compile.version>2.3.0</hadoop.compile.version>
+        <hadoop.compile.version>2.7.0</hadoop.compile.version>
         <hive.version>2.0.0</hive.version>
         <powermock.version>1.6.6</powermock.version>
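
For readers trying this out: with patch 1 applied, an index file written to an s3a:// location is registered with an "hdfs"-type loadSpec, exactly as for hdfs, viewfs, and maprfs outputs. A minimal, self-contained Java sketch of that mapping follows; the bucket and path below are made up, only the "type"/"path" shape comes from the patch.

    import com.google.common.collect.ImmutableMap;
    import java.net.URI;
    import java.util.Map;

    public class S3aLoadSpecExample
    {
      public static void main(String[] args)
      {
        // Hypothetical segment location produced by the indexer on an s3a filesystem.
        URI indexOutURI = URI.create("s3a://my-bucket/druid/segments/ds/2017-01-01/0/index.zip");
        // Patch 1 routes the "s3a" scheme through the same branch as "hdfs", "viewfs",
        // and "maprfs", so the loadSpec keeps type "hdfs" and the full URI as the path.
        Map<String, Object> loadSpec = ImmutableMap.<String, Object>of(
            "type", "hdfs",
            "path", indexOutURI.toString()
        );
        System.out.println(loadSpec); // {type=hdfs, path=s3a://my-bucket/...}
      }
    }
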
"org.apache.hadoop:hadoop-client:2.7.3" ); private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M"); diff --git a/pom.xml b/pom.xml index 79232154c79e..cc2831a12074 100644 --- a/pom.xml +++ b/pom.xml @@ -71,7 +71,7 @@ 4.1.6.Final 1.7.12 - 2.7.0 + 2.7.3 2.0.0 1.6.6 From 6c7bd1a263867ba279f0ee46c94b73a0db589a69 Mon Sep 17 00:00:00 2001 From: Slim Bouguerra Date: Thu, 23 Feb 2017 08:28:50 -0800 Subject: [PATCH 3/3] use segment pusher to make loadspec --- .../segment/loading/DataSegmentPusher.java | 4 ++ .../storage/azure/AzureDataSegmentPusher.java | 14 ++++++ .../cassandra/CassandraDataSegmentPusher.java | 13 +++++- .../CloudFilesDataSegmentPusher.java | 17 ++++++++ .../google/GoogleDataSegmentPusher.java | 23 ++++++++-- .../storage/hdfs/HdfsDataSegmentPusher.java | 15 ++++--- .../druid/storage/s3/S3DataSegmentPusher.java | 12 ++++++ .../indexer/HadoopDruidIndexerConfig.java | 6 ++- .../io/druid/indexer/IndexGeneratorJob.java | 3 +- .../main/java/io/druid/indexer/JobHelper.java | 43 ++----------------- .../indexer/updater/HadoopConverterJob.java | 3 +- .../updater/HadoopDruidConverterConfig.java | 3 ++ .../indexing/common/task/IndexTaskTest.java | 7 +++ .../IngestSegmentFirehoseFactoryTest.java | 7 +++ .../indexing/overlord/TaskLifecycleTest.java | 13 ++++++ .../indexing/test/TestDataSegmentPusher.java | 8 ++++ .../loading/LocalDataSegmentPusher.java | 17 +++++--- .../appenderator/AppenderatorTester.java | 7 +++ .../java/io/druid/cli/CliRealtimeExample.java | 9 ++++ 19 files changed, 164 insertions(+), 60 deletions(-) diff --git a/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java b/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java index f77aa198c8a6..4a235588c7e6 100644 --- a/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java +++ b/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java @@ -23,6 +23,8 @@ import java.io.File; import java.io.IOException; +import java.net.URI; +import java.util.Map; public interface DataSegmentPusher { @@ -30,4 +32,6 @@ public interface DataSegmentPusher String getPathForHadoop(String dataSource); String getPathForHadoop(); DataSegment push(File file, DataSegment segment) throws IOException; + //use map instead of LoadSpec class to avoid dependency pollution. 
From 6c7bd1a263867ba279f0ee46c94b73a0db589a69 Mon Sep 17 00:00:00 2001
From: Slim Bouguerra
Date: Thu, 23 Feb 2017 08:28:50 -0800
Subject: [PATCH 3/3] Use the segment pusher to build the loadSpec.

---
 .../segment/loading/DataSegmentPusher.java      |  4 ++
 .../storage/azure/AzureDataSegmentPusher.java   | 14 ++++++
 .../cassandra/CassandraDataSegmentPusher.java   | 13 +++++-
 .../CloudFilesDataSegmentPusher.java            | 17 ++++++++
 .../google/GoogleDataSegmentPusher.java         | 23 ++++++++--
 .../storage/hdfs/HdfsDataSegmentPusher.java     | 15 ++++---
 .../druid/storage/s3/S3DataSegmentPusher.java   | 12 ++++++
 .../indexer/HadoopDruidIndexerConfig.java       |  6 ++-
 .../io/druid/indexer/IndexGeneratorJob.java     |  3 +-
 .../main/java/io/druid/indexer/JobHelper.java   | 43 ++-----------------
 .../indexer/updater/HadoopConverterJob.java     |  3 +-
 .../updater/HadoopDruidConverterConfig.java     |  3 ++
 .../indexing/common/task/IndexTaskTest.java     |  7 +++
 .../IngestSegmentFirehoseFactoryTest.java       |  7 +++
 .../indexing/overlord/TaskLifecycleTest.java    | 13 ++++++
 .../indexing/test/TestDataSegmentPusher.java    |  8 ++++
 .../loading/LocalDataSegmentPusher.java         | 17 +++++---
 .../appenderator/AppenderatorTester.java        |  7 +++
 .../java/io/druid/cli/CliRealtimeExample.java   |  9 ++++
 19 files changed, 164 insertions(+), 60 deletions(-)

diff --git a/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java b/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java
index f77aa198c8a6..4a235588c7e6 100644
--- a/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java
+++ b/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java
@@ -23,6 +23,8 @@

 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.util.Map;

 public interface DataSegmentPusher
 {
@@ -30,4 +32,6 @@ public interface DataSegmentPusher
   String getPathForHadoop(String dataSource);
   String getPathForHadoop();
   DataSegment push(File file, DataSegment segment) throws IOException;
+  // use a Map instead of the LoadSpec class to avoid dependency pollution.
+  Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath);
 }

diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java
index 5e42b951779d..20bca6c124ea 100644
--- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java
+++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java
@@ -35,6 +35,7 @@
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -174,4 +175,17 @@ public DataSegment call() throws Exception
       }
     }
   }
+
+  @Override
+  public Map<String, Object> makeLoadSpec(URI uri)
+  {
+    return ImmutableMap.<String, Object>of(
+        "type",
+        AzureStorageDruidModule.SCHEME,
+        "containerName",
+        config.getContainer(),
+        "blobPath",
+        uri.toString()
+    );
+  }
 }
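
To make the new contract concrete: every pusher now owns the mapping from a final index URI to a JSON-serializable loadSpec map, instead of JobHelper guessing from the filesystem scheme. Below is a minimal sketch of a custom pusher against the extended interface; the class name and the trivial method bodies are hypothetical illustration, not part of the patch.

    import com.google.common.collect.ImmutableMap;
    import io.druid.segment.loading.DataSegmentPusher;
    import io.druid.timeline.DataSegment;

    import java.io.File;
    import java.io.IOException;
    import java.net.URI;
    import java.util.Map;

    public class ExampleDataSegmentPusher implements DataSegmentPusher
    {
      @Override
      public String getPathForHadoop(String dataSource)
      {
        return "/tmp/" + dataSource; // illustrative only
      }

      @Override
      public String getPathForHadoop()
      {
        return "/tmp"; // illustrative only
      }

      @Override
      public DataSegment push(File file, DataSegment segment) throws IOException
      {
        // A real pusher copies the file to deep storage first, then attaches the loadSpec.
        return segment.withLoadSpec(makeLoadSpec(file.toURI()));
      }

      @Override
      public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
      {
        // The map is the JSON-serializable equivalent of a LoadSpec; the "type" key
        // selects the matching segment puller on the loading side.
        return ImmutableMap.<String, Object>of("type", "local", "path", finalIndexZipFilePath.getPath());
      }
    }
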
of("type", "c*", "key", key) compressedIndexFile.delete(); return segment; } + + @Override + public Map makeLoadSpec(URI uri) + { + throw new IAE("not supported"); + } } diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java index fbef1bfe01f5..1a7eb05cff4f 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java @@ -34,6 +34,8 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; +import java.util.Map; import java.util.concurrent.Callable; public class CloudFilesDataSegmentPusher implements DataSegmentPusher @@ -146,4 +148,19 @@ public DataSegment call() throws Exception } } } + + @Override + public Map makeLoadSpec(URI uri) + { + return ImmutableMap.of( + "type", + CloudFilesStorageDruidModule.SCHEME, + "region", + objectApi.getRegion(), + "container", + objectApi.getContainer(), + "path", + uri.toString() + ); + } } diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java index 8bd9fbb8ee7c..868182cf9a58 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java @@ -35,6 +35,8 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; +import java.util.Map; public class GoogleDataSegmentPusher implements DataSegmentPusher { @@ -82,7 +84,8 @@ public File createDescriptorFile(final ObjectMapper jsonMapper, final DataSegmen return descriptorFile; } - public void insert(final File file, final String contentType, final String path) throws IOException { + public void insert(final File file, final String contentType, final String path) throws IOException + { LOG.info("Inserting [%s] to [%s]", file, path); FileInputStream fileSteam = new FileInputStream(file); @@ -117,7 +120,7 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment) thr "bucket", config.getBucket(), "path", indexPath ) - ) + ) .withBinaryVersion(version); descriptorFile = createDescriptorFile(jsonMapper, outSegment); @@ -129,7 +132,8 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment) thr } catch (Exception e) { throw Throwables.propagate(e); - } finally { + } + finally { if (indexFile != null) { LOG.info("Deleting file [%s]", indexFile); indexFile.delete(); @@ -142,6 +146,19 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment) thr } } + @Override + public Map makeLoadSpec(URI finalIndexZipFilePath) + { + return ImmutableMap.of( + "type", + GoogleStorageDruidModule.SCHEME, + "bucket", + config.getBucket(), + "path", + finalIndexZipFilePath.getPath().substring(1) // remove the leading "/" + ); + } + public String buildPath(final String path) { if (config.getPrefix() != "") { diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java 
diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
index 47dd748962dd..047fecdbdb12 100644
--- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
+++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
@@ -40,6 +40,8 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.URI;
+import java.util.Map;

 /**
  */
@@ -114,7 +116,7 @@ public DataSegment push(File inDir, DataSegment segment) throws IOException
       ));
       final Path outDir = outFile.getParent();
       dataSegment = createDescriptorFile(
-          segment.withLoadSpec(makeLoadSpec(outFile))
+          segment.withLoadSpec(makeLoadSpec(outFile.toUri()))
                  .withSize(size)
                  .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir)),
           tmpFile.getParent(),
@@ -153,6 +155,12 @@ public DataSegment push(File inDir, DataSegment segment) throws IOException
     return dataSegment;
   }

+  @Override
+  public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
+  {
+    return ImmutableMap.<String, Object>of("type", "hdfs", "path", finalIndexZipFilePath.toString());
+  }
+
   private DataSegment createDescriptorFile(DataSegment segment, Path outDir, final FileSystem fs) throws IOException
   {
     final Path descriptorFile = new Path(outDir, "descriptor.json");
@@ -163,11 +171,6 @@ private DataSegment createDescriptorFile(DataSegment segment, Path outDir, final
     return segment;
   }

-  private ImmutableMap<String, Object> makeLoadSpec(Path outFile)
-  {
-    return ImmutableMap.of("type", "hdfs", "path", outFile.toUri().toString());
-  }
-
   private static class HdfsOutputStreamSupplier extends ByteSink
   {
     private final FileSystem fs;

diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java
index bd3fe6d57889..6e2dc8fd71ab 100644
--- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java
+++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java
@@ -38,6 +38,8 @@

 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
 import java.util.concurrent.Callable;

 public class S3DataSegmentPusher implements DataSegmentPusher
@@ -149,4 +151,14 @@ public DataSegment call() throws Exception
       throw Throwables.propagate(e);
     }
   }
+
+  @Override
+  public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
+  {
+    return ImmutableMap.<String, Object>of(
+        "type", "s3_zip",
+        "bucket", finalIndexZipFilePath.getHost(),
+        "key", finalIndexZipFilePath.getPath().substring(1) // remove the leading "/"
+    );
+  }
 }
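
A sketch of what the S3 mapping produces, plus one caveat worth knowing about java.net.URI that affects any makeLoadSpec built on getHost(): getHost() returns null when the authority is not a syntactically valid hostname, for example S3 bucket names containing underscores, where getAuthority() still returns the raw value. Bucket and key below are made up.

    import com.google.common.collect.ImmutableMap;
    import java.net.URI;
    import java.util.Map;

    public class S3LoadSpecExample
    {
      public static void main(String[] args)
      {
        // Mirrors the shape of S3DataSegmentPusher.makeLoadSpec: bucket from the URI
        // authority, key from the path with its leading "/" removed.
        URI uri = URI.create("s3n://example-bucket/druid/segments/index.zip");
        Map<String, Object> loadSpec = ImmutableMap.<String, Object>of(
            "type", "s3_zip",
            "bucket", uri.getHost(),
            "key", uri.getPath().substring(1)
        );
        System.out.println(loadSpec); // {type=s3_zip, bucket=example-bucket, key=druid/segments/index.zip}

        // Caveat: getHost() is null for authorities that are not valid hostnames.
        System.out.println(URI.create("s3n://my_bucket/key").getHost());      // null
        System.out.println(URI.create("s3n://my_bucket/key").getAuthority()); // my_bucket
      }
    }
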
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
index 170cae5027d6..04aff62bc6d9 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
@@ -54,6 +54,7 @@
 import io.druid.segment.IndexMergerV9;
 import io.druid.segment.IndexSpec;
 import io.druid.segment.indexing.granularity.GranularitySpec;
+import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.server.DruidNode;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.ShardSpec;
@@ -92,9 +93,11 @@ public class HadoopDruidIndexerConfig
   public static final IndexMerger INDEX_MERGER;
   public static final IndexMergerV9 INDEX_MERGER_V9;
   public static final HadoopKerberosConfig HADOOP_KERBEROS_CONFIG;
-
+  public static final DataSegmentPusher DATA_SEGMENT_PUSHER;
   private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";

+
+
   static {
     injector = Initialization.makeInjectorWithModules(
         GuiceInjectors.makeStartupInjector(),
@@ -118,6 +121,7 @@ public void configure(Binder binder)
     INDEX_MERGER = injector.getInstance(IndexMerger.class);
     INDEX_MERGER_V9 = injector.getInstance(IndexMergerV9.class);
     HADOOP_KERBEROS_CONFIG = injector.getInstance(HadoopKerberosConfig.class);
+    DATA_SEGMENT_PUSHER = injector.getInstance(DataSegmentPusher.class);
   }

   public static enum IndexJobCounters

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index 7de1194b3994..61d3a57aed26 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -741,7 +741,8 @@ indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator
                 new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
                 outputFS,
                 segmentTemplate
-            )
+            ),
+            config.DATA_SEGMENT_PUSHER
         );

         Path descriptorPath = config.makeDescriptorInfoPath(segment);
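
HadoopDruidIndexerConfig resolves the pusher once in its static initializer so that code running inside map/reduce tasks can reach it as a process-wide constant rather than serializing it through the job configuration. One readability remark: config.DATA_SEGMENT_PUSHER above reads like an instance field but is that static constant; HadoopDruidIndexerConfig.DATA_SEGMENT_PUSHER would say the same thing more plainly. A stripped-down sketch of the pattern, with all names illustrative rather than Druid's:

    import com.google.inject.AbstractModule;
    import com.google.inject.Guice;
    import com.google.inject.Injector;

    public class StaticInjectionSketch
    {
      interface Pusher
      {
        String name();
      }

      static final Pusher PUSHER;

      static {
        // Resolve the singleton once; afterwards any code in this JVM reads the constant.
        Injector injector = Guice.createInjector(new AbstractModule()
        {
          @Override
          protected void configure()
          {
            bind(Pusher.class).toInstance(() -> "noop");
          }
        });
        PUSHER = injector.getInstance(Pusher.class);
      }

      public static void main(String[] args)
      {
        System.out.println(PUSHER.name()); // noop
      }
    }
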
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
index fe43a1073d8e..3fa4f437d2ec 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java
@@ -23,7 +23,6 @@
 import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 import com.google.common.io.OutputSupplier;
@@ -36,6 +35,7 @@
 import io.druid.java.util.common.logger.Logger;
 import io.druid.segment.ProgressIndicator;
 import io.druid.segment.SegmentUtils;
+import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.loading.DataSegmentPusherUtil;
 import io.druid.timeline.DataSegment;
 import org.apache.hadoop.conf.Configuration;
@@ -379,7 +379,8 @@ public static DataSegment serializeOutIndex(
     final Progressable progressable,
     final TaskAttemptID taskAttemptID,
     final File mergedBase,
-    final Path segmentBasePath
+    final Path segmentBasePath,
+    DataSegmentPusher dataSegmentPusher
 )
     throws IOException
 {
@@ -415,44 +416,8 @@ public long push() throws IOException

   final Path finalIndexZipFilePath = new Path(segmentBasePath, "index.zip");
   final URI indexOutURI = finalIndexZipFilePath.toUri();
-  final ImmutableMap<String, Object> loadSpec;
-  // TODO: Make this a part of Pushers or Pullers
-  switch (outputFS.getScheme()) {
-    case "hdfs":
-    case "viewfs":
-    case "maprfs":
-    case "s3a":
-      loadSpec = ImmutableMap.<String, Object>of(
-          "type", "hdfs",
-          "path", indexOutURI.toString()
-      );
-      break;
-    case "gs":
-      loadSpec = ImmutableMap.<String, Object>of(
-          "type", "google",
-          "bucket", indexOutURI.getHost(),
-          "path", indexOutURI.getPath().substring(1) // remove the leading "/"
-      );
-      break;
-    case "s3":
-    case "s3n":
-      loadSpec = ImmutableMap.<String, Object>of(
-          "type", "s3_zip",
-          "bucket", indexOutURI.getHost(),
-          "key", indexOutURI.getPath().substring(1) // remove the leading "/"
-      );
-      break;
-    case "file":
-      loadSpec = ImmutableMap.<String, Object>of(
-          "type", "local",
-          "path", indexOutURI.getPath()
-      );
-      break;
-    default:
-      throw new IAE("Unknown file system scheme [%s]", outputFS.getScheme());
-  }
   final DataSegment finalSegment = segmentTemplate
-      .withLoadSpec(loadSpec)
+      .withLoadSpec(dataSegmentPusher.makeLoadSpec(indexOutURI))
       .withSize(size.get())
       .withBinaryVersion(SegmentUtils.getVersionFromDir(mergedBase));

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java
index 4c0c65a5a8fe..8458229949a3 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java
@@ -559,7 +559,8 @@ protected void map(
             baseOutputPath,
             outputFS,
             finalSegmentTemplate
-        )
+        ),
+        config.DATA_SEGMENT_PUSHER
     );
     context.progress();
     context.setStatus("Finished PUSH");

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopDruidConverterConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopDruidConverterConfig.java
index 57f2b8f8c51c..a3574f16cb25 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopDruidConverterConfig.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopDruidConverterConfig.java
@@ -37,6 +37,7 @@
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMerger;
 import io.druid.segment.IndexSpec;
+import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.server.DruidNode;
 import io.druid.timeline.DataSegment;
 import org.joda.time.Interval;
@@ -53,6 +54,7 @@ public class HadoopDruidConverterConfig
   public static final ObjectMapper jsonMapper;
   public static final IndexIO INDEX_IO;
   public static final IndexMerger INDEX_MERGER;
+  public static final DataSegmentPusher DATA_SEGMENT_PUSHER;

   private static final Injector injector = Initialization.makeInjectorWithModules(
       GuiceInjectors.makeStartupInjector(),
@@ -75,6 +77,7 @@ public void configure(Binder binder)
     jsonMapper.registerSubtypes(HadoopDruidConverterConfig.class);
     INDEX_IO = injector.getInstance(IndexIO.class);
     INDEX_MERGER = injector.getInstance(IndexMerger.class);
+    DATA_SEGMENT_PUSHER = injector.getInstance(DataSegmentPusher.class);
   }

   private static final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>()

diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
index 6ce78e6db8db..cbabaf24e008 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
@@ -66,6 +66,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -423,6 +424,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException
             segments.add(segment);
             return segment;
           }
+
+          @Override
+          public Map<String, Object> makeLoadSpec(URI uri)
+          {
+            throw new UnsupportedOperationException();
+          }
         }, null, null, null, null, null, null, null, null, null, jsonMapper, temporaryFolder.newFolder(),
         indexMerger, indexIO, null, null, indexMergerV9
     )
diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
index 96ec7f44d050..f93a2cc1d19e 100644
--- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
@@ -93,6 +93,7 @@

 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -231,6 +232,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException
           {
             return segment;
           }
+
+          @Override
+          public Map<String, Object> makeLoadSpec(URI uri)
+          {
+            throw new UnsupportedOperationException();
+          }
         },
         new DataSegmentKiller()
         {

diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
index d65177c600bf..83eab76c932f 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
@@ -122,6 +122,7 @@

 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
@@ -477,6 +478,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException
         pushedSegments++;
         return segment;
       }
+
+      @Override
+      public Map<String, Object> makeLoadSpec(URI uri)
+      {
+        throw new UnsupportedOperationException();
+      }
     };
   }
@@ -1022,6 +1029,12 @@ public DataSegment push(File file, DataSegment dataSegment) throws IOException
       {
         throw new RuntimeException("FAILURE");
       }
+
+      @Override
+      public Map<String, Object> makeLoadSpec(URI uri)
+      {
+        throw new UnsupportedOperationException();
+      }
     };

     tb = setUpTaskToolboxFactory(dataSegmentPusher, handoffNotifierFactory, mdc);

diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java
index ad99827052d2..923a80299605 100644
--- a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java
+++ b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java
@@ -26,6 +26,8 @@

 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
 import java.util.Set;

 public class TestDataSegmentPusher implements DataSegmentPusher
@@ -52,6 +54,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException
     return segment;
   }

+  @Override
+  public Map<String, Object> makeLoadSpec(URI uri)
+  {
+    throw new UnsupportedOperationException();
+  }
+
   public Set<DataSegment> getPushedSegments()
   {
     return ImmutableSet.copyOf(pushedSegments);
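
The test doubles above all stub makeLoadSpec with UnsupportedOperationException on purpose: none of the covered code paths should ever build a loadSpec, so an unexpected call fails fast instead of silently producing a bogus segment descriptor. A minimal JUnit 4 sketch of that contract, assuming TestDataSegmentPusher's no-arg constructor (the test class itself is illustrative, not part of the patch):

    import io.druid.indexing.test.TestDataSegmentPusher;
    import java.net.URI;
    import org.junit.Test;

    public class TestDataSegmentPusherMakeLoadSpecTest
    {
      // Fail-fast contract: these test doubles never build loadSpecs.
      @Test(expected = UnsupportedOperationException.class)
      public void makeLoadSpecIsUnsupported()
      {
        new TestDataSegmentPusher().makeLoadSpec(URI.create("file:///tmp/index.zip"));
      }
    }
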
diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java
index 45269d084786..d1af155b0b17 100644
--- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java
+++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java
@@ -33,7 +33,9 @@

 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.nio.file.FileAlreadyExistsException;
+import java.util.Map;
 import java.util.UUID;

 /**
@@ -86,7 +88,7 @@ public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOExce
       }

       return createDescriptorFile(
-          segment.withLoadSpec(makeLoadSpec(outDir))
+          segment.withLoadSpec(makeLoadSpec(outDir.toURI()))
                  .withSize(size)
                  .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)),
           outDir
@@ -98,7 +100,7 @@ public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOExce
     final long size = compressSegment(dataSegmentFile, tmpOutDir);

     final DataSegment dataSegment = createDescriptorFile(
-        segment.withLoadSpec(makeLoadSpec(new File(outDir, "index.zip")))
+        segment.withLoadSpec(makeLoadSpec(new File(outDir, "index.zip").toURI()))
                .withSize(size)
                .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)),
         tmpOutDir
@@ -118,6 +120,12 @@ public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOExce
     return dataSegment;
   }

+  @Override
+  public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
+  {
+    return ImmutableMap.<String, Object>of("type", "local", "path", finalIndexZipFilePath.getPath());
+  }
+
   private void createDirectoryIfNotExists(File directory) throws IOException
   {
     if (!directory.mkdirs() && !directory.isDirectory()) {
@@ -145,9 +153,4 @@ private DataSegment createDescriptorFile(DataSegment segment, File outDir) throw
     Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile);
     return segment;
   }
-
-  private ImmutableMap<String, Object> makeLoadSpec(File outFile)
-  {
-    return ImmutableMap.of("type", "local", "path", outFile.toString());
-  }
 }

diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
index 692cabcbdeee..cd446f2fde18 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
@@ -62,6 +62,7 @@

 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -186,6 +187,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException
         pushedSegments.add(segment);
         return segment;
       }
+
+      @Override
+      public Map<String, Object> makeLoadSpec(URI uri)
+      {
+        throw new UnsupportedOperationException();
+      }
     };
     appenderator = Appenderators.createRealtime(
         schema,

diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java
index 9d9122d8a435..60af89176083 100644
--- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java
+++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java
@@ -20,6 +20,7 @@
 package io.druid.cli;

 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.inject.Binder;
 import com.google.inject.Inject;
 import com.google.inject.Module;
@@ -40,7 +41,9 @@

 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Executor;
@@ -141,6 +144,12 @@ public DataSegment push(File file, DataSegment segment) throws IOException
     {
       return segment;
     }
+
+    @Override
+    public Map<String, Object> makeLoadSpec(URI uri)
+    {
+      return ImmutableMap.of();
+    }
   }

   private static class NoopDataSegmentAnnouncer implements DataSegmentAnnouncer