Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ public interface DataSegmentPuller
*
* @throws SegmentLoadingException if there are any errors
*/
public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException;
public void getSegmentFiles(DataSegment segment, File dir, boolean cacheSegmentsLocally) throws SegmentLoadingException;
}
2 changes: 1 addition & 1 deletion api/src/main/java/io/druid/segment/loading/LoadSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface LoadSpec
* @param destDir The destination directory
* @return The byte count of data put in the destination directory
*/
public LoadSpecResult loadSegment(File destDir) throws SegmentLoadingException;
public LoadSpecResult loadSegment(File destDir, boolean cacheSegmentsLocally) throws SegmentLoadingException;

// Hold interesting data about the results of the segment load
public static class LoadSpecResult{
Expand Down
1 change: 1 addition & 0 deletions docs/content/configuration/historical.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ The historical node uses several of the global configs in [Configuration](../con
|`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)|
|`druid.segmentCache.numLoadingThreads`|How many segments to load concurrently from deep storage.|1|
|`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently from local storage at startup.|1|
|`druid.segmentCache.cacheSegmentsLocally`|If true, segments are first downloaded to a local temporary directory and then uncompressed from there, instead of being uncompressed while streaming from deep storage.|false|

### Query Configs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public AzureDataSegmentPuller(
public io.druid.java.util.common.FileUtils.FileCopyResult getSegmentFiles(
final String containerName,
final String blobPath,
final File outDir
final File outDir,
final boolean cacheSegmentsLocally
)
throws SegmentLoadingException
{
Expand All @@ -63,7 +64,7 @@ public io.druid.java.util.common.FileUtils.FileCopyResult getSegmentFiles(
byteSource,
outDir,
AzureUtils.AZURE_RETRY,
true
cacheSegmentsLocally
);

log.info("Loaded %d bytes from [%s] to [%s]", result.size(), blobPath, outDir.getAbsolutePath());
Expand All @@ -87,14 +88,14 @@ public io.druid.java.util.common.FileUtils.FileCopyResult getSegmentFiles(
}

@Override
public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException
public void getSegmentFiles(DataSegment segment, File outDir, boolean cacheSegmentsLocally) throws SegmentLoadingException
{

final Map<String, Object> loadSpec = segment.getLoadSpec();
final String containerName = MapUtils.getString(loadSpec, "containerName");
final String blobPath = MapUtils.getString(loadSpec, "blobPath");

getSegmentFiles(containerName, blobPath, outDir);
getSegmentFiles(containerName, blobPath, outDir, cacheSegmentsLocally);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public AzureLoadSpec(
}

@Override
public LoadSpecResult loadSegment(File file) throws SegmentLoadingException
public LoadSpecResult loadSegment(File file, boolean cacheSegmentsLocally) throws SegmentLoadingException
{
return new LoadSpecResult(puller.getSegmentFiles(containerName, blobPath, file).size());
return new LoadSpecResult(puller.getSegmentFiles(containerName, blobPath, file, cacheSegmentsLocally).size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testZIPUncompress() throws SegmentLoadingException, URISyntaxExcepti

AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);

FileUtils.FileCopyResult result = puller.getSegmentFiles(containerName, blobPath, toDir);
FileUtils.FileCopyResult result = puller.getSegmentFiles(containerName, blobPath, toDir, false);

File expected = new File(toDir, SEGMENT_FILE_NAME);
assertEquals(value.length(), result.size());
Expand Down Expand Up @@ -118,7 +118,7 @@ public void testDeleteOutputDirectoryWhenErrorIsRaisedPullingSegmentFiles()

AzureDataSegmentPuller puller = new AzureDataSegmentPuller(azureStorage);

puller.getSegmentFiles(containerName, blobPath, outDir);
puller.getSegmentFiles(containerName, blobPath, outDir, false);

assertFalse(outDir.exists());

Expand All @@ -138,13 +138,13 @@ public void getSegmentFilesTest() throws SegmentLoadingException
final FileUtils.FileCopyResult result = createMock(FileUtils.FileCopyResult.class);
final AzureDataSegmentPuller puller = createMockBuilder(AzureDataSegmentPuller.class).withConstructor(
azureStorage
).addMockedMethod("getSegmentFiles", String.class, String.class, File.class).createMock();
).addMockedMethod("getSegmentFiles", String.class, String.class, File.class, boolean.class).createMock();

expect(puller.getSegmentFiles(containerName, blobPath, outDir)).andReturn(result);
expect(puller.getSegmentFiles(containerName, blobPath, outDir, false)).andReturn(result);

replayAll();

puller.getSegmentFiles(dataSegment, outDir);
puller.getSegmentFiles(dataSegment, outDir, false);

verifyAll();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,16 @@ public CassandraDataSegmentPuller(CassandraDataSegmentConfig config)
}

@Override
public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException
public void getSegmentFiles(DataSegment segment, File outDir, boolean cacheSegmentsLocally) throws SegmentLoadingException
{
String key = (String) segment.getLoadSpec().get("key");
getSegmentFiles(key, outDir);
getSegmentFiles(key, outDir, cacheSegmentsLocally);
}
public io.druid.java.util.common.FileUtils.FileCopyResult getSegmentFiles(final String key, final File outDir) throws SegmentLoadingException{
public io.druid.java.util.common.FileUtils.FileCopyResult getSegmentFiles(
final String key,
final File outDir,
final boolean cacheSegmentsLocally
) throws SegmentLoadingException{
log.info("Pulling index from C* at path[%s] to outDir[%s]", key, outDir);
if (!outDir.exists()) {
outDir.mkdirs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public CassandraLoadSpec(
}

@Override
public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException
public LoadSpecResult loadSegment(File outDir, boolean cacheSegmentsLocally) throws SegmentLoadingException
{
return new LoadSpecResult(puller.getSegmentFiles(key, outDir).size());
return new LoadSpecResult(puller.getSegmentFiles(key, outDir, cacheSegmentsLocally).size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ public CloudFilesDataSegmentPuller(final CloudFilesApi cloudFilesApi)
}

@Override
public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException
public void getSegmentFiles(
final DataSegment segment,
final File outDir,
final boolean cacheSegmentsLocally
) throws SegmentLoadingException
{
final Map<String, Object> loadSpec = segment.getLoadSpec();
final String region = MapUtils.getString(loadSpec, "region");
Expand All @@ -57,19 +61,26 @@ public void getSegmentFiles(final DataSegment segment, final File outDir) throws

log.info("Pulling index at path[%s] to outDir[%s]", path, outDir);
prepareOutDir(outDir);
getSegmentFiles(region, container, path, outDir);
getSegmentFiles(region, container, path, outDir, cacheSegmentsLocally);
}

public FileUtils.FileCopyResult getSegmentFiles(String region, String container, String path, File outDir)
public FileUtils.FileCopyResult getSegmentFiles(
String region,
String container,
String path,
File outDir,
boolean cacheSegmentsLocally)
throws SegmentLoadingException
{
CloudFilesObjectApiProxy objectApi = new CloudFilesObjectApiProxy(cloudFilesApi, region, container);
final CloudFilesByteSource byteSource = new CloudFilesByteSource(objectApi, path);

try {
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
byteSource, outDir,
CloudFilesUtils.CLOUDFILESRETRY, true
byteSource,
outDir,
CloudFilesUtils.CLOUDFILESRETRY,
cacheSegmentsLocally
);
log.info("Loaded %d bytes from [%s] to [%s]", result.size(), path, outDir.getAbsolutePath());
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public CloudFilesLoadSpec(
}

@Override
public LoadSpecResult loadSegment(File file) throws SegmentLoadingException
public LoadSpecResult loadSegment(File file, boolean cacheSegmentsLocally) throws SegmentLoadingException
{
return new LoadSpecResult(puller.getSegmentFiles(region, container, path, file).size());
return new LoadSpecResult(puller.getSegmentFiles(region, container, path, file, cacheSegmentsLocally).size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,25 @@ public GoogleDataSegmentPuller(final GoogleStorage storage)
}

@Override
public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException
public void getSegmentFiles(
final DataSegment segment,
final File outDir,
final boolean cacheSegmentsLocally
) throws SegmentLoadingException
{
final Map<String, Object> loadSpec = segment.getLoadSpec();
final String bucket = MapUtils.getString(loadSpec, "bucket");
final String path = MapUtils.getString(loadSpec, "path");

getSegmentFiles(bucket, path, outDir);
getSegmentFiles(bucket, path, outDir, cacheSegmentsLocally);
}

public FileUtils.FileCopyResult getSegmentFiles(final String bucket, final String path, File outDir)
throws SegmentLoadingException
public FileUtils.FileCopyResult getSegmentFiles(
final String bucket,
final String path,
File outDir,
boolean cacheSegmentsLocally
) throws SegmentLoadingException
{
LOG.info("Pulling index at path[%s] to outDir[%s]", bucket, path, outDir.getAbsolutePath());

Expand All @@ -72,7 +80,7 @@ public FileUtils.FileCopyResult getSegmentFiles(final String bucket, final Strin
byteSource,
outDir,
GoogleUtils.GOOGLE_RETRY,
true
cacheSegmentsLocally
);
LOG.info("Loaded %d bytes from [%s] to [%s]", result.size(), path, outDir.getAbsolutePath());
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public GoogleLoadSpec(
}

@Override
public LoadSpecResult loadSegment(File file) throws SegmentLoadingException
public LoadSpecResult loadSegment(File file, boolean cacheSegmentsLocally) throws SegmentLoadingException
{
return new LoadSpecResult(puller.getSegmentFiles(bucket, path, file).size());
return new LoadSpecResult(puller.getSegmentFiles(bucket, path, file, cacheSegmentsLocally).size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void testDeleteOutputDirectoryWhenErrorIsRaisedPullingSegmentFiles()
replayAll();

GoogleDataSegmentPuller puller = new GoogleDataSegmentPuller(storage);
puller.getSegmentFiles(bucket, path, outDir);
puller.getSegmentFiles(bucket, path, outDir, false);

assertFalse(outDir.exists());

Expand All @@ -85,13 +85,13 @@ public void getSegmentFilesTest() throws SegmentLoadingException, IOException
GoogleStorage storage = createMock(GoogleStorage.class);
GoogleDataSegmentPuller puller = createMockBuilder(GoogleDataSegmentPuller.class).withConstructor(
storage
).addMockedMethod("getSegmentFiles", String.class, String.class, File.class).createMock();
).addMockedMethod("getSegmentFiles", String.class, String.class, File.class, boolean.class).createMock();

expect(puller.getSegmentFiles(bucket, path, outDir)).andReturn(result);
expect(puller.getSegmentFiles(bucket, path, outDir, false)).andReturn(result);

replayAll();

puller.getSegmentFiles(dataSegment, outDir);
puller.getSegmentFiles(dataSegment, outDir, false);

verifyAll();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,15 @@ public HdfsDataSegmentPuller(final Configuration config)


@Override
public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException
public void getSegmentFiles(DataSegment segment, File dir, boolean cacheSegmentsLocally) throws SegmentLoadingException
{
getSegmentFiles(getPath(segment), dir);
getSegmentFiles(getPath(segment), dir, cacheSegmentsLocally);
}

public FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) throws SegmentLoadingException
public FileUtils.FileCopyResult getSegmentFiles(
final Path path,
final File outDir,
final boolean cacheSegmentsLocally) throws SegmentLoadingException
{
try {
final FileSystem fs = path.getFileSystem(config);
Expand Down Expand Up @@ -283,12 +286,16 @@ public InputStream openStream() throws IOException
}
}

public FileUtils.FileCopyResult getSegmentFiles(URI uri, File outDir) throws SegmentLoadingException
public FileUtils.FileCopyResult getSegmentFiles(
URI uri,
File outDir,
boolean cacheSegmentsLocally
) throws SegmentLoadingException
{
if (!uri.getScheme().equalsIgnoreCase(HdfsStorageDruidModule.SCHEME)) {
throw new SegmentLoadingException("Don't know how to load SCHEME for URI [%s]", uri.toString());
}
return getSegmentFiles(new Path(uri), outDir);
return getSegmentFiles(new Path(uri), outDir, cacheSegmentsLocally);
}

public InputStream getInputStream(Path path) throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ public final String getPathString()
}

@Override
public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException
public LoadSpecResult loadSegment(File outDir, boolean cacheSegmentsLocally) throws SegmentLoadingException
{
return new LoadSpecResult(puller.getSegmentFiles(path, outDir).size());
return new LoadSpecResult(puller.getSegmentFiles(path, outDir, cacheSegmentsLocally).size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void testZip() throws IOException, SegmentLoadingException
}
try {
Assert.assertFalse(outFile.exists());
puller.getSegmentFiles(uri, outTmpDir);
puller.getSegmentFiles(uri, outTmpDir, false);
Assert.assertTrue(outFile.exists());

Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath()));
Expand Down Expand Up @@ -174,7 +174,7 @@ public void testGZ() throws IOException, SegmentLoadingException
}
try {
Assert.assertFalse(outFile.exists());
puller.getSegmentFiles(uri, outTmpDir);
puller.getSegmentFiles(uri, outTmpDir, false);
Assert.assertTrue(outFile.exists());

Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath()));
Expand Down Expand Up @@ -208,7 +208,7 @@ public void testDir() throws IOException, SegmentLoadingException
}
try {
Assert.assertFalse(outFile.exists());
puller.getSegmentFiles(uri, outTmpDir);
puller.getSegmentFiles(uri, outTmpDir, false);
Assert.assertTrue(outFile.exists());

Assert.assertArrayEquals(pathByteContents, Files.readAllBytes(outFile.toPath()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,12 @@ public S3DataSegmentPuller(
}

@Override
public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException
public void getSegmentFiles(final DataSegment segment, final File outDir, final boolean cacheSegmentsLocally) throws SegmentLoadingException
{
getSegmentFiles(new S3Coords(segment), outDir);
getSegmentFiles(new S3Coords(segment), outDir, cacheSegmentsLocally);
}

public FileUtils.FileCopyResult getSegmentFiles(final S3Coords s3Coords, final File outDir)
public FileUtils.FileCopyResult getSegmentFiles(final S3Coords s3Coords, final File outDir, final boolean cacheSegmentsLocally)
throws SegmentLoadingException
{

Expand Down Expand Up @@ -202,7 +202,7 @@ public InputStream openStream() throws IOException
byteSource,
outDir,
S3Utils.S3RETRY,
true
cacheSegmentsLocally
);
log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outDir.getAbsolutePath());
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public S3LoadSpec(
}

@Override
public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException
public LoadSpecResult loadSegment(File outDir, boolean cacheSegmentsLocally) throws SegmentLoadingException
{
return new LoadSpecResult(puller.getSegmentFiles(new S3DataSegmentPuller.S3Coords(bucket, key), outDir).size());
return new LoadSpecResult(puller.getSegmentFiles(new S3DataSegmentPuller.S3Coords(bucket, key), outDir, cacheSegmentsLocally).size());
}

@JsonProperty(S3DataSegmentPuller.BUCKET)
Expand Down
Loading