diff --git a/api/src/main/java/io/druid/segment/loading/DataSegmentPuller.java b/api/src/main/java/io/druid/segment/loading/DataSegmentPuller.java index 46f051138a5f..d1bd80e3703a 100644 --- a/api/src/main/java/io/druid/segment/loading/DataSegmentPuller.java +++ b/api/src/main/java/io/druid/segment/loading/DataSegmentPuller.java @@ -36,5 +36,5 @@ public interface DataSegmentPuller * * @throws SegmentLoadingException if there are any errors */ - public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException; + public void getSegmentFiles(DataSegment segment, File dir, boolean cacheSegmentsLocally) throws SegmentLoadingException; } diff --git a/api/src/main/java/io/druid/segment/loading/LoadSpec.java b/api/src/main/java/io/druid/segment/loading/LoadSpec.java index 3adef9c4513d..f523221b00ee 100644 --- a/api/src/main/java/io/druid/segment/loading/LoadSpec.java +++ b/api/src/main/java/io/druid/segment/loading/LoadSpec.java @@ -34,7 +34,7 @@ public interface LoadSpec * @param destDir The destination directory * @return The byte count of data put in the destination directory */ - public LoadSpecResult loadSegment(File destDir) throws SegmentLoadingException; + public LoadSpecResult loadSegment(File destDir, boolean cacheSegmentsLocally) throws SegmentLoadingException; // Hold interesting data about the results of the segment load public static class LoadSpecResult{ diff --git a/docs/content/configuration/historical.md b/docs/content/configuration/historical.md index e8b8832a4786..47688bfaad2a 100644 --- a/docs/content/configuration/historical.md +++ b/docs/content/configuration/historical.md @@ -37,6 +37,7 @@ The historical node uses several of the global configs in [Configuration](../con |`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. 
Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)| |`druid.segmentCache.numLoadingThreads`|How many segments to load concurrently from from deep storage.|1| |`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently from local storage at startup.|1| +|`druid.segmentCache.cacheSegmentsLocally`|Enable downloading segments to a local temporary directory before uncompressing them.|false| ### Query Configs diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPuller.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPuller.java index 029301c5cf23..4829913a8907 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPuller.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPuller.java @@ -51,7 +51,8 @@ public AzureDataSegmentPuller( public io.druid.java.util.common.FileUtils.FileCopyResult getSegmentFiles( final String containerName, final String blobPath, - final File outDir + final File outDir, + final boolean cacheSegmentsLocally ) throws SegmentLoadingException { @@ -63,7 +64,7 @@ public io.druid.java.util.common.FileUtils.FileCopyResult getSegmentFiles( byteSource, outDir, AzureUtils.AZURE_RETRY, - true + cacheSegmentsLocally ); log.info("Loaded %d bytes from [%s] to [%s]", result.size(), blobPath, outDir.getAbsolutePath()); @@ -87,14 +88,14 @@ public io.druid.java.util.common.FileUtils.FileCopyResult getSegmentFiles( } @Override - public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException + public void getSegmentFiles(DataSegment segment, File outDir, boolean cacheSegmentsLocally) throws SegmentLoadingException { final Map loadSpec = segment.getLoadSpec(); final String containerName = MapUtils.getString(loadSpec, "containerName"); final String blobPath = MapUtils.getString(loadSpec,
"blobPath"); - getSegmentFiles(containerName, blobPath, outDir); + getSegmentFiles(containerName, blobPath, outDir, cacheSegmentsLocally); } @VisibleForTesting diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureLoadSpec.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureLoadSpec.java index 72d965003cb9..7c35802d6ea6 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureLoadSpec.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureLoadSpec.java @@ -56,8 +56,8 @@ public AzureLoadSpec( } @Override - public LoadSpecResult loadSegment(File file) throws SegmentLoadingException + public LoadSpecResult loadSegment(File file, boolean cacheSegmentsLocally) throws SegmentLoadingException { - return new LoadSpecResult(puller.getSegmentFiles(containerName, blobPath, file).size()); + return new LoadSpecResult(puller.getSegmentFiles(containerName, blobPath, file, cacheSegmentsLocally).size()); } } diff --git a/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPullerTest.java b/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPullerTest.java index 5a5eec038ceb..75f888ccba08 100644 --- a/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPullerTest.java +++ b/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPullerTest.java @@ -82,7 +82,7 @@ public void testZIPUncompress() throws SegmentLoadingException, URISyntaxExcepti AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage); - FileUtils.FileCopyResult result = puller.getSegmentFiles(containerName, blobPath, toDir); + FileUtils.FileCopyResult result = puller.getSegmentFiles(containerName, blobPath, toDir, false); File expected = new File(toDir, SEGMENT_FILE_NAME); assertEquals(value.length(), result.size()); @@ 
-118,7 +118,7 @@ public void testDeleteOutputDirectoryWhenErrorIsRaisedPullingSegmentFiles() AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage); - puller.getSegmentFiles(containerName, blobPath, outDir); + puller.getSegmentFiles(containerName, blobPath, outDir, false); assertFalse(outDir.exists()); @@ -138,13 +138,13 @@ public void getSegmentFilesTest() throws SegmentLoadingException final FileUtils.FileCopyResult result = createMock(FileUtils.FileCopyResult.class); final AzureDataSegmentPuller puller = createMockBuilder(AzureDataSegmentPuller.class).withConstructor( azureStorage - ).addMockedMethod("getSegmentFiles", String.class, String.class, File.class).createMock(); + ).addMockedMethod("getSegmentFiles", String.class, String.class, File.class, boolean.class).createMock(); - expect(puller.getSegmentFiles(containerName, blobPath, outDir)).andReturn(result); + expect(puller.getSegmentFiles(containerName, blobPath, outDir, false)).andReturn(result); replayAll(); - puller.getSegmentFiles(dataSegment, outDir); + puller.getSegmentFiles(dataSegment, outDir, false); verifyAll(); } diff --git a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java index 0b48d4964acf..0d8560c7472f 100644 --- a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java +++ b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPuller.java @@ -55,12 +55,16 @@ public CassandraDataSegmentPuller(CassandraDataSegmentConfig config) } @Override - public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException + public void getSegmentFiles(DataSegment segment, File outDir, boolean cacheSegmentsLocally) throws SegmentLoadingException { String key = (String) 
segment.getLoadSpec().get("key"); - getSegmentFiles(key, outDir); + getSegmentFiles(key, outDir, cacheSegmentsLocally); } - public io.druid.java.util.common.FileUtils.FileCopyResult getSegmentFiles(final String key, final File outDir) throws SegmentLoadingException{ + public io.druid.java.util.common.FileUtils.FileCopyResult getSegmentFiles( + final String key, + final File outDir, + final boolean cacheSegmentsLocally + ) throws SegmentLoadingException{ log.info("Pulling index from C* at path[%s] to outDir[%s]", key, outDir); if (!outDir.exists()) { outDir.mkdirs(); diff --git a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraLoadSpec.java b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraLoadSpec.java index 5667ff6e29f1..8ac2b8d42e0c 100644 --- a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraLoadSpec.java +++ b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraLoadSpec.java @@ -49,8 +49,8 @@ public CassandraLoadSpec( } @Override - public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException + public LoadSpecResult loadSegment(File outDir, boolean cacheSegmentsLocally) throws SegmentLoadingException { - return new LoadSpecResult(puller.getSegmentFiles(key, outDir).size()); + return new LoadSpecResult(puller.getSegmentFiles(key, outDir, cacheSegmentsLocally).size()); } } diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPuller.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPuller.java index 65117653bf04..97e70e15674e 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPuller.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPuller.java @@ 
-48,7 +48,11 @@ public CloudFilesDataSegmentPuller(final CloudFilesApi cloudFilesApi) } @Override - public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException + public void getSegmentFiles( + final DataSegment segment, + final File outDir, + final boolean cacheSegmentsLocally + ) throws SegmentLoadingException { final Map loadSpec = segment.getLoadSpec(); final String region = MapUtils.getString(loadSpec, "region"); @@ -57,10 +61,15 @@ public void getSegmentFiles(final DataSegment segment, final File outDir) throws log.info("Pulling index at path[%s] to outDir[%s]", path, outDir); prepareOutDir(outDir); - getSegmentFiles(region, container, path, outDir); + getSegmentFiles(region, container, path, outDir, cacheSegmentsLocally); } - public FileUtils.FileCopyResult getSegmentFiles(String region, String container, String path, File outDir) + public FileUtils.FileCopyResult getSegmentFiles( + String region, + String container, + String path, + File outDir, + boolean cacheSegmentsLocally) throws SegmentLoadingException { CloudFilesObjectApiProxy objectApi = new CloudFilesObjectApiProxy(cloudFilesApi, region, container); @@ -68,8 +77,10 @@ public FileUtils.FileCopyResult getSegmentFiles(String region, String container, try { final FileUtils.FileCopyResult result = CompressionUtils.unzip( - byteSource, outDir, - CloudFilesUtils.CLOUDFILESRETRY, true + byteSource, + outDir, + CloudFilesUtils.CLOUDFILESRETRY, + cacheSegmentsLocally ); log.info("Loaded %d bytes from [%s] to [%s]", result.size(), path, outDir.getAbsolutePath()); return result; diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesLoadSpec.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesLoadSpec.java index f13d412fc762..840f10d6bb19 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesLoadSpec.java +++ 
b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesLoadSpec.java @@ -60,8 +60,8 @@ public CloudFilesLoadSpec( } @Override - public LoadSpecResult loadSegment(File file) throws SegmentLoadingException + public LoadSpecResult loadSegment(File file, boolean cacheSegmentsLocally) throws SegmentLoadingException { - return new LoadSpecResult(puller.getSegmentFiles(region, container, path, file).size()); + return new LoadSpecResult(puller.getSegmentFiles(region, container, path, file, cacheSegmentsLocally).size()); } } diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPuller.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPuller.java index 3a778efb96ab..cb832f4c5c58 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPuller.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPuller.java @@ -50,17 +50,25 @@ public GoogleDataSegmentPuller(final GoogleStorage storage) } @Override - public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException + public void getSegmentFiles( + final DataSegment segment, + final File outDir, + final boolean cacheSegmentsLocally + ) throws SegmentLoadingException { final Map loadSpec = segment.getLoadSpec(); final String bucket = MapUtils.getString(loadSpec, "bucket"); final String path = MapUtils.getString(loadSpec, "path"); - getSegmentFiles(bucket, path, outDir); + getSegmentFiles(bucket, path, outDir, cacheSegmentsLocally); } - public FileUtils.FileCopyResult getSegmentFiles(final String bucket, final String path, File outDir) - throws SegmentLoadingException + public FileUtils.FileCopyResult getSegmentFiles( + final String bucket, + final String path, + File outDir, + boolean cacheSegmentsLocally + ) throws SegmentLoadingException { 
LOG.info("Pulling index at path[%s] to outDir[%s]", bucket, path, outDir.getAbsolutePath()); @@ -72,7 +80,7 @@ public FileUtils.FileCopyResult getSegmentFiles(final String bucket, final Strin byteSource, outDir, GoogleUtils.GOOGLE_RETRY, - true + cacheSegmentsLocally ); LOG.info("Loaded %d bytes from [%s] to [%s]", result.size(), path, outDir.getAbsolutePath()); return result; diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleLoadSpec.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleLoadSpec.java index ef6321b1e5f6..8159613b77c4 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleLoadSpec.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleLoadSpec.java @@ -55,8 +55,8 @@ public GoogleLoadSpec( } @Override - public LoadSpecResult loadSegment(File file) throws SegmentLoadingException + public LoadSpecResult loadSegment(File file, boolean cacheSegmentsLocally) throws SegmentLoadingException { - return new LoadSpecResult(puller.getSegmentFiles(bucket, path, file).size()); + return new LoadSpecResult(puller.getSegmentFiles(bucket, path, file, cacheSegmentsLocally).size()); } } diff --git a/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPullerTest.java b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPullerTest.java index a6d3facfd9cb..f3e68fc3ed75 100644 --- a/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPullerTest.java +++ b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPullerTest.java @@ -65,7 +65,7 @@ public void testDeleteOutputDirectoryWhenErrorIsRaisedPullingSegmentFiles() replayAll(); GoogleDataSegmentPuller puller = new GoogleDataSegmentPuller(storage); - puller.getSegmentFiles(bucket, path, outDir); + 
puller.getSegmentFiles(bucket, path, outDir, false); assertFalse(outDir.exists()); @@ -85,13 +85,13 @@ public void getSegmentFilesTest() throws SegmentLoadingException, IOException GoogleStorage storage = createMock(GoogleStorage.class); GoogleDataSegmentPuller puller = createMockBuilder(GoogleDataSegmentPuller.class).withConstructor( storage - ).addMockedMethod("getSegmentFiles", String.class, String.class, File.class).createMock(); + ).addMockedMethod("getSegmentFiles", String.class, String.class, File.class, boolean.class).createMock(); - expect(puller.getSegmentFiles(bucket, path, outDir)).andReturn(result); + expect(puller.getSegmentFiles(bucket, path, outDir, false)).andReturn(result); replayAll(); - puller.getSegmentFiles(dataSegment, outDir); + puller.getSegmentFiles(dataSegment, outDir, false); verifyAll(); } diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java index b265e9b228fb..d377b9a81f20 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPuller.java @@ -168,12 +168,15 @@ public HdfsDataSegmentPuller(final Configuration config) @Override - public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException + public void getSegmentFiles(DataSegment segment, File dir, boolean cacheSegmentsLocally) throws SegmentLoadingException { - getSegmentFiles(getPath(segment), dir); + getSegmentFiles(getPath(segment), dir, cacheSegmentsLocally); } - public FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) throws SegmentLoadingException + public FileUtils.FileCopyResult getSegmentFiles( + final Path path, + final File outDir, + final boolean cacheSegmentsLocally) throws SegmentLoadingException { try { final FileSystem fs = 
path.getFileSystem(config); @@ -283,12 +286,16 @@ public InputStream openStream() throws IOException } } - public FileUtils.FileCopyResult getSegmentFiles(URI uri, File outDir) throws SegmentLoadingException + public FileUtils.FileCopyResult getSegmentFiles( + URI uri, + File outDir, + boolean cacheSegmentsLocally + ) throws SegmentLoadingException { if (!uri.getScheme().equalsIgnoreCase(HdfsStorageDruidModule.SCHEME)) { throw new SegmentLoadingException("Don't know how to load SCHEME for URI [%s]", uri.toString()); } - return getSegmentFiles(new Path(uri), outDir); + return getSegmentFiles(new Path(uri), outDir, cacheSegmentsLocally); } public InputStream getInputStream(Path path) throws IOException diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java index 2d93fb9b5631..c1935467dad2 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsLoadSpec.java @@ -57,8 +57,8 @@ public final String getPathString() } @Override - public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException + public LoadSpecResult loadSegment(File outDir, boolean cacheSegmentsLocally) throws SegmentLoadingException { - return new LoadSpecResult(puller.getSegmentFiles(path, outDir).size()); + return new LoadSpecResult(puller.getSegmentFiles(path, outDir, cacheSegmentsLocally).size()); } } diff --git a/extensions-core/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentPullerTest.java b/extensions-core/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentPullerTest.java index be0ed7d9ae4c..908eae8f0a54 100644 --- a/extensions-core/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentPullerTest.java +++ 
b/extensions-core/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentPullerTest.java @@ -133,7 +133,7 @@ public void testZip() throws IOException, SegmentLoadingException } try { Assert.assertFalse(outFile.exists()); - puller.getSegmentFiles(uri, outTmpDir); + puller.getSegmentFiles(uri, outTmpDir, false); Assert.assertTrue(outFile.exists()); Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath())); @@ -174,7 +174,7 @@ public void testGZ() throws IOException, SegmentLoadingException } try { Assert.assertFalse(outFile.exists()); - puller.getSegmentFiles(uri, outTmpDir); + puller.getSegmentFiles(uri, outTmpDir, false); Assert.assertTrue(outFile.exists()); Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath())); @@ -208,7 +208,7 @@ public void testDir() throws IOException, SegmentLoadingException } try { Assert.assertFalse(outFile.exists()); - puller.getSegmentFiles(uri, outTmpDir); + puller.getSegmentFiles(uri, outTmpDir, false); Assert.assertTrue(outFile.exists()); Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath())); diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java index eb7ca2ddd2bd..11bc96fc5652 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java @@ -160,12 +160,12 @@ public S3DataSegmentPuller( } @Override - public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException + public void getSegmentFiles(final DataSegment segment, final File outDir, final boolean cacheSegmentsLocally) throws SegmentLoadingException { - getSegmentFiles(new S3Coords(segment), outDir); + getSegmentFiles(new S3Coords(segment), outDir, cacheSegmentsLocally); } - public 
FileUtils.FileCopyResult getSegmentFiles(final S3Coords s3Coords, final File outDir) + public FileUtils.FileCopyResult getSegmentFiles(final S3Coords s3Coords, final File outDir, final boolean cacheSegmentsLocally) throws SegmentLoadingException { @@ -202,7 +202,7 @@ public InputStream openStream() throws IOException byteSource, outDir, S3Utils.S3RETRY, - true + cacheSegmentsLocally ); log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outDir.getAbsolutePath()); return result; diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java index 8fead762a55f..876a16c27ced 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3LoadSpec.java @@ -55,9 +55,9 @@ public S3LoadSpec( } @Override - public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException + public LoadSpecResult loadSegment(File outDir, boolean cacheSegmentsLocally) throws SegmentLoadingException { - return new LoadSpecResult(puller.getSegmentFiles(new S3DataSegmentPuller.S3Coords(bucket, key), outDir).size()); + return new LoadSpecResult(puller.getSegmentFiles(new S3DataSegmentPuller.S3Coords(bucket, key), outDir, cacheSegmentsLocally).size()); } @JsonProperty(S3DataSegmentPuller.BUCKET) diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPullerTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPullerTest.java index 72358e55d049..3f55e7c1fea3 100644 --- a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPullerTest.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPullerTest.java @@ -114,7 +114,9 @@ public void testGZUncompress() throws ServiceException, IOException, SegmentLoad new 
S3DataSegmentPuller.S3Coords( bucket, object0.getKey() - ), tmpDir + ), + tmpDir, + false ); EasyMock.verify(s3Client); @@ -172,7 +174,9 @@ public void testGZUncompressRetries() throws ServiceException, IOException, Segm new S3DataSegmentPuller.S3Coords( bucket, object0.getKey() - ), tmpDir + ), + tmpDir, + false ); EasyMock.verify(s3Client); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index 7201bd119133..c88dfbc565a4 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -313,7 +313,7 @@ private void testIngestion( Assert.assertEquals(1, spec.getPartitions()); File tmpUnzippedSegmentDir = temporaryFolder.newFolder(); - new LocalDataSegmentPuller().getSegmentFiles(dataSegment, tmpUnzippedSegmentDir); + new LocalDataSegmentPuller().getSegmentFiles(dataSegment, tmpUnzippedSegmentDir, false); QueryableIndex index = INDEX_IO.loadIndex(tmpUnzippedSegmentDir); StorageAdapter adapter = new QueryableIndexStorageAdapter(index); diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPuller.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPuller.java index f2714948aa6f..85eb5c00fd46 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPuller.java +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPuller.java @@ -117,12 +117,16 @@ public boolean delete() private static final Logger log = new Logger(LocalDataSegmentPuller.class); @Override - public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException + public void getSegmentFiles(DataSegment segment, File dir, boolean cacheSegmentsLocally) throws SegmentLoadingException { - getSegmentFiles(getFile(segment), dir); + getSegmentFiles(getFile(segment), dir, cacheSegmentsLocally); } - 
public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final File dir) throws SegmentLoadingException + public FileUtils.FileCopyResult getSegmentFiles( + final File sourceFile, + final File dir, + boolean cacheSegmentsLocally + ) throws SegmentLoadingException { if (sourceFile.isDirectory()) { if (sourceFile.equals(dir)) { diff --git a/server/src/main/java/io/druid/segment/loading/LocalLoadSpec.java b/server/src/main/java/io/druid/segment/loading/LocalLoadSpec.java index cba6f296bca9..9216aa9cec63 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalLoadSpec.java +++ b/server/src/main/java/io/druid/segment/loading/LocalLoadSpec.java @@ -59,8 +59,8 @@ public String getPath() } @Override - public LoadSpecResult loadSegment(final File outDir) throws SegmentLoadingException + public LoadSpecResult loadSegment(final File outDir, boolean cacheSegmentsLocally) throws SegmentLoadingException { - return new LoadSpecResult(puller.getSegmentFiles(path.toFile(), outDir).size()); + return new LoadSpecResult(puller.getSegmentFiles(path.toFile(), outDir, cacheSegmentsLocally).size()); } } diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java index 3684b2744ccb..e146a2e7dfe0 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java @@ -52,6 +52,9 @@ public class SegmentLoaderConfig @JsonProperty private File infoDir = null; + @JsonProperty("cacheSegmentsLocally") + private boolean cacheSegmentsLocally = false; + public List getLocations() { return locations; @@ -99,6 +102,11 @@ public SegmentLoaderConfig withLocations(List locations) return retVal; } + public boolean isCacheSegmentsLocally() + { + return cacheSegmentsLocally; + } + @Override public String toString() { @@ -107,6 +115,7 @@ public String toString() ", deleteOnRemove=" + deleteOnRemove + 
", dropSegmentDelayMillis=" + dropSegmentDelayMillis + ", infoDir=" + infoDir + + ", cacheSegmentsLocally=" + cacheSegmentsLocally + '}'; } } diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 4995c3230274..44cedc6af925 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -203,7 +203,7 @@ private void loadInLocation(DataSegment segment, File storageDir) throws Segment { // LoadSpec isn't materialized until here so that any system can interpret Segment without having to have all the LoadSpec dependencies. final LoadSpec loadSpec = jsonMapper.convertValue(segment.getLoadSpec(), LoadSpec.class); - final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir); + final LoadSpec.LoadSpecResult result = loadSpec.loadSegment(storageDir, config.isCacheSegmentsLocally()); if (result.getSize() != segment.getSize()) { log.warn( "Segment [%s] is different than expected size. 
Expected [%d] found [%d]", diff --git a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPullerTest.java b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPullerTest.java index 641c737454f5..0e3c3f04f664 100644 --- a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPullerTest.java +++ b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPullerTest.java @@ -72,7 +72,7 @@ public void simpleZipTest() throws IOException, SegmentLoadingException Assert.assertFalse(file.exists()); Assert.assertTrue(zipFile.exists()); - puller.getSegmentFiles(zipFile, tmpDir); + puller.getSegmentFiles(zipFile, tmpDir, false); Assert.assertTrue(file.exists()); } @@ -97,7 +97,7 @@ public void simpleGZTest() throws IOException, SegmentLoadingException Assert.assertTrue(zipFile.exists()); Assert.assertFalse(unZipFile.exists()); - puller.getSegmentFiles(zipFile, tmpDir); + puller.getSegmentFiles(zipFile, tmpDir, false); Assert.assertTrue(unZipFile.exists()); } @@ -108,7 +108,7 @@ public void simpleDirectoryTest() throws IOException, SegmentLoadingException File tmpFile = File.createTempFile("test", "file", srcDir); File expectedOutput = new File(tmpDir, Files.getNameWithoutExtension(tmpFile.getAbsolutePath())); Assert.assertFalse(expectedOutput.exists()); - puller.getSegmentFiles(srcDir, tmpDir); + puller.getSegmentFiles(srcDir, tmpDir, false); Assert.assertTrue(expectedOutput.exists()); } }