Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@

public class FileUtils
{
public enum LinkOrCopyResult
{
LINK,
COPY
}

/**
* Useful for retry functionality that doesn't want to stop Throwables, but does want to retry on Exceptions
*/
Expand Down Expand Up @@ -461,6 +467,26 @@ public static void deleteDirectory(final File directory) throws IOException
org.apache.commons.io.FileUtils.deleteDirectory(directory);
}

/**
* Hard-link "src" as "dest", if possible. If not possible -- perhaps they are on separate filesystems -- then
* copy "src" to "dest".
*
* @return whether a link or copy was made. Can be safely ignored if you don't care.
*
* @throws IOException if something went wrong
*/
public static LinkOrCopyResult linkOrCopy(final File src, final File dest) throws IOException
{
try {
Files.createLink(dest.toPath(), src.toPath());
return LinkOrCopyResult.LINK;
}
catch (IOException e) {
Files.copy(src.toPath(), dest.toPath(), StandardCopyOption.REPLACE_EXISTING);
return LinkOrCopyResult.COPY;
}
}

public interface OutputStreamConsumer<T>
{
T apply(OutputStream outputStream) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,45 @@ public void testCopyLarge() throws IOException
Assert.assertEquals(data.length(), result);
Assert.assertEquals(data, StringUtils.fromUtf8(Files.readAllBytes(dstFile.toPath())));
}

@Test
public void testLinkOrCopy1() throws IOException
{
// Will be a LINK.

final File fromFile = temporaryFolder.newFile();
final File toDir = temporaryFolder.newFolder();
final File toFile = new File(toDir, "toFile");

Files.write(fromFile.toPath(), StringUtils.toUtf8("foo"));
final FileUtils.LinkOrCopyResult linkOrCopyResult = FileUtils.linkOrCopy(fromFile, toFile);

// Verify the new link.
Assert.assertEquals(FileUtils.LinkOrCopyResult.LINK, linkOrCopyResult);
Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath())));

// Verify they are actually the same file.
Files.write(fromFile.toPath(), StringUtils.toUtf8("bar"));
Assert.assertEquals("bar", StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath())));
}

@Test
public void testLinkOrCopy2() throws IOException
{
// Will be a COPY, because the destination file already exists and therefore Files.createLink fails.

final File fromFile = temporaryFolder.newFile();
final File toFile = temporaryFolder.newFile();

Files.write(fromFile.toPath(), StringUtils.toUtf8("foo"));
final FileUtils.LinkOrCopyResult linkOrCopyResult = FileUtils.linkOrCopy(fromFile, toFile);

// Verify the new link.
Assert.assertEquals(FileUtils.LinkOrCopyResult.COPY, linkOrCopyResult);
Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath())));

// Verify they are not the same file.
Files.write(fromFile.toPath(), StringUtils.toUtf8("bar"));
Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath())));
}
}
45 changes: 34 additions & 11 deletions docs/dependencies/deep-storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,52 @@ title: "Deep storage"

Deep storage is where segments are stored. It is a storage mechanism that Apache Druid does not provide. This deep storage infrastructure defines the level of durability of your data, as long as Druid processes can see this storage infrastructure and get at the segments stored on it, you will not lose data no matter how many Druid nodes you lose. If segments disappear from this storage layer, then you will lose whatever data those segments represented.

## Local Mount
## Local

A local mount can be used for storage of segments as well. This allows you to use just your local file system or anything else that can be mount locally like NFS, Ceph, etc. This is the default deep storage implementation.
Local storage is intended for use in the following situations:

In order to use a local mount for deep storage, you need to set the following configuration in your common configs.
- You have just one server.
- Or, you have multiple servers, and they all have access to a shared filesystem (for example: NFS).

In multi-server production clusters, rather than local storage with a shared filesystem, it is instead recommended to
use cloud-based deep storage ([Amazon S3](#amazon-s3-or-s3-compatible), [Google Cloud Storage](#google-cloud-storage),
or [Azure Blob Storage](#azure-blob-storage)), S3-compatible storage (like Minio), or [HDFS](#hdfs). These options are
generally more convenient, more scalable, and more robust than setting up a shared filesystem.

The following configurations in `common.runtime.properties` apply to local storage:

|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.storage.type`|local||Must be set.|
|`druid.storage.storageDirectory`||Directory for storing segments.|Must be set.|
|`druid.storage.type`|`local`||Must be set.|
|`druid.storage.storageDirectory`|any local directory|Directory for storing segments. Must be different from `druid.segmentCache.locations` and `druid.segmentCache.infoDir`.|`/tmp/druid/localStorage`|
|`druid.storage.zip`|`true`, `false`|Whether segments in `druid.storage.storageDirectory` are written as directories (`false`) or zip files (`true`).|`false`|
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I'd be down for just making the segment storage stop zipping up the files. It'd be good for a wide range of things, I think. The only risk is that it might create extra data on deep storage for people, but I'm not sure that's really the top-of-mind concern for most people.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this an option for risk-reduction purposes, and would prefer to keep it an option for now for that reason, but I am definitely down to make it always-not-zipped in the not-too-distant future.


For example:

```
druid.storage.type=local
druid.storage.storageDirectory=/tmp/druid/localStorage
```

The `druid.storage.storageDirectory` must be set to a different path than `druid.segmentCache.locations` or
`druid.segmentCache.infoDir`.

## Amazon S3 or S3-compatible

See [`druid-s3-extensions`](../development/extensions-core/s3.md).

Note that you should generally set `druid.storage.storageDirectory` to something different from `druid.segmentCache.locations` and `druid.segmentCache.infoDir`.
## Google Cloud Storage

If you are using the Hadoop indexer in local mode, then just give it a local file as your output directory and it will work.
See [`druid-google-extensions`](../development/extensions-core/google.md).

## S3-compatible
## Azure Blob Storage

See [druid-s3-extensions extension documentation](../development/extensions-core/s3.md).
See [`druid-azure-extensions`](../development/extensions-core/azure.md).

## HDFS

See [druid-hdfs-storage extension documentation](../development/extensions-core/hdfs.md).

## Additional Deep Stores
## Additional options

For additional deep stores, please see our [extensions list](../development/extensions.md).
For additional deep storage options, please see our [extensions list](../development/extensions.md).
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
private static final Logger log = new Logger(HdfsDataSegmentPusher.class);

private final Configuration hadoopConfig;
private final ObjectMapper jsonMapper;

// We lazily initialize fullQualifiedStorageDirectory to avoid potential issues with Hadoop namenode HA.
// Please see https://github.com/apache/druid/pull/5684
Expand All @@ -68,7 +67,6 @@ public HdfsDataSegmentPusher(
)
{
this.hadoopConfig = hadoopConfig;
this.jsonMapper = jsonMapper;
Path storageDir = new Path(config.getStorageDirectory());
this.fullyQualifiedStorageDirectory = Suppliers.memoize(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3159,6 +3159,7 @@ public void close()
};
final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
dataSegmentPusherConfig.zip = true;
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);

toolboxFactory = new TaskToolboxFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3124,6 +3124,7 @@ public void close()
};
final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
dataSegmentPusherConfig.zip = true;
final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);

toolboxFactory = new TaskToolboxFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
Expand Down Expand Up @@ -162,6 +163,7 @@
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -943,7 +945,8 @@ public DataSegment apply(String input)
List<File> segmentFiles = new ArrayList<>();
for (DataSegment segment : mdc.retrieveUnusedSegmentsForInterval("test_kill_task", Intervals.of("2011-04-01/P4D"))) {
File file = new File((String) segment.getLoadSpec().get("path"));
FileUtils.mkdirp(file);
FileUtils.mkdirp(file.getParentFile());
Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY);
segmentFiles.add(file);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.IOException;

/**
*
*/
public class LocalDataSegmentKiller implements DataSegmentKiller
{
Expand All @@ -51,9 +52,12 @@ public void kill(DataSegment segment) throws SegmentLoadingException
log.info("Deleting segment[%s] from directory[%s].", segment.getId(), path);

try {
if (path.getName().endsWith(".zip")) {
if ((path.getName().endsWith(".zip") && path.isFile()) ||
(path.getName().equals(LocalDataSegmentPusher.INDEX_DIR) && path.isDirectory())) {
// path format -- > .../dataSource/interval/version/partitionNum/xxx.zip
// or .../dataSource/interval/version/partitionNum/UUID/xxx.zip
// or -- > .../dataSource/interval/version/partitionNum/index/
// or .../dataSource/interval/version/partitionNum/UUID/index/

File parentDir = path.getParentFile();
FileUtils.deleteDirectory(parentDir);
Expand All @@ -62,13 +66,18 @@ public void kill(DataSegment segment) throws SegmentLoadingException
parentDir = parentDir.getParentFile();
int maxDepth = 4; // if for some reason there's no datasSource directory, stop recursing somewhere reasonable
while (parentDir != null && --maxDepth >= 0) {
if (!parentDir.delete() || segment.getDataSource().equals(parentDir.getName())) {
// parentDir.listFiles().length > 0 check not strictly necessary, because parentDir.delete() fails on
// nonempty directories. However, including it here is nice since it makes our intent very clear (only
// remove nonempty directories) and it prevents making delete syscalls that are doomed to failure.
if (parentDir.listFiles().length > 0
|| !parentDir.delete()
|| segment.getDataSource().equals(parentDir.getName())) {
break;
}

parentDir = parentDir.getParentFile();
}
} else {
} else if (path.exists()) {
throw new SegmentLoadingException("Unknown file type[%s]", path);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CancellationException;

Expand Down Expand Up @@ -125,40 +126,43 @@ public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoading
public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final File dir) throws SegmentLoadingException
{
if (sourceFile.isDirectory()) {
if (sourceFile.equals(dir)) {
log.info("Asked to load [%s] into itself, done!", dir);
return new FileUtils.FileCopyResult(sourceFile);
}
try {
final File[] files = sourceFile.listFiles();
if (files == null) {
throw new SegmentLoadingException("No files found in [%s]", sourceFile.getAbsolutePath());
}

final File[] files = sourceFile.listFiles();
if (files == null) {
throw new SegmentLoadingException("No files found in [%s]", sourceFile.getAbsolutePath());
}
final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult(sourceFile);
for (final File oldFile : files) {
if (oldFile.isDirectory()) {
log.info("[%s] is a child directory, skipping", oldFile.getAbsolutePath());
continue;
if (sourceFile.equals(dir)) {
log.info("Asked to load [%s] into itself, done!", dir);
return new FileUtils.FileCopyResult(Arrays.asList(files));
}

result.addFiles(
FileUtils.retryCopy(
Files.asByteSource(oldFile),
new File(dir, oldFile.getName()),
shouldRetryPredicate(),
DEFAULT_RETRY_COUNT
).getFiles()
final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult();
boolean link = true;
for (final File oldFile : files) {
if (oldFile.isDirectory()) {
log.info("[%s] is a child directory, skipping", oldFile.getAbsolutePath());
continue;
}

final File newFile = new File(dir, oldFile.getName());
final FileUtils.LinkOrCopyResult linkOrCopyResult = FileUtils.linkOrCopy(oldFile, newFile);
link = link && linkOrCopyResult == FileUtils.LinkOrCopyResult.LINK;
result.addFile(newFile);
}
log.info(
"%s %d bytes from [%s] to [%s]",
link ? "Linked" : "Copied",
result.size(),
sourceFile.getAbsolutePath(),
dir.getAbsolutePath()
);
return result;
}
log.info(
"Copied %d bytes from [%s] to [%s]",
result.size(),
sourceFile.getAbsolutePath(),
dir.getAbsolutePath()
);
return result;
}
if (CompressionUtils.isZip(sourceFile.getName())) {
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to load from local directory [%s]", sourceFile.getAbsolutePath());
}
} else if (CompressionUtils.isZip(sourceFile.getName())) {
try {
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
Files.asByteSource(sourceFile),
Expand All @@ -177,8 +181,7 @@ public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final Fil
catch (IOException e) {
throw new SegmentLoadingException(e, "Unable to unzip file [%s]", sourceFile.getAbsolutePath());
}
}
if (CompressionUtils.isGz(sourceFile.getName())) {
} else if (CompressionUtils.isGz(sourceFile.getName())) {
final File outFile = new File(dir, CompressionUtils.getGzBaseName(sourceFile.getName()));
final FileUtils.FileCopyResult result = CompressionUtils.gunzip(
Files.asByteSource(sourceFile),
Expand All @@ -192,8 +195,9 @@ public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final Fil
outFile.getAbsolutePath()
);
return result;
} else {
throw new SegmentLoadingException("Do not know how to handle source [%s]", sourceFile.getAbsolutePath());
}
throw new SegmentLoadingException("Do not know how to handle source [%s]", sourceFile.getAbsolutePath());
}


Expand Down
Loading