diff --git a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java index 2ca3944d844a..d2c7a6d1a9b2 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/FileUtils.java @@ -53,6 +53,12 @@ public class FileUtils { + public enum LinkOrCopyResult + { + LINK, + COPY + } + /** * Useful for retry functionality that doesn't want to stop Throwables, but does want to retry on Exceptions */ @@ -461,6 +467,26 @@ public static void deleteDirectory(final File directory) throws IOException org.apache.commons.io.FileUtils.deleteDirectory(directory); } + /** + * Hard-link "src" as "dest", if possible. If not possible -- perhaps they are on separate filesystems -- then + * copy "src" to "dest". + * + * @return whether a link or copy was made. Can be safely ignored if you don't care. + * + * @throws IOException if something went wrong + */ + public static LinkOrCopyResult linkOrCopy(final File src, final File dest) throws IOException + { + try { + Files.createLink(dest.toPath(), src.toPath()); + return LinkOrCopyResult.LINK; + } + catch (IOException e) { + Files.copy(src.toPath(), dest.toPath(), StandardCopyOption.REPLACE_EXISTING); + return LinkOrCopyResult.COPY; + } + } + public interface OutputStreamConsumer { T apply(OutputStream outputStream) throws IOException; diff --git a/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java b/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java index f76f2e53d203..86909a26210f 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/FileUtilsTest.java @@ -244,4 +244,45 @@ public void testCopyLarge() throws IOException Assert.assertEquals(data.length(), result); Assert.assertEquals(data, StringUtils.fromUtf8(Files.readAllBytes(dstFile.toPath()))); } + + @Test + public void testLinkOrCopy1() throws IOException + { + // Will be a LINK. + + final File fromFile = temporaryFolder.newFile(); + final File toDir = temporaryFolder.newFolder(); + final File toFile = new File(toDir, "toFile"); + + Files.write(fromFile.toPath(), StringUtils.toUtf8("foo")); + final FileUtils.LinkOrCopyResult linkOrCopyResult = FileUtils.linkOrCopy(fromFile, toFile); + + // Verify the new link. + Assert.assertEquals(FileUtils.LinkOrCopyResult.LINK, linkOrCopyResult); + Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath()))); + + // Verify they are actually the same file. + Files.write(fromFile.toPath(), StringUtils.toUtf8("bar")); + Assert.assertEquals("bar", StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath()))); + } + + @Test + public void testLinkOrCopy2() throws IOException + { + // Will be a COPY, because the destination file already exists and therefore Files.createLink fails. + + final File fromFile = temporaryFolder.newFile(); + final File toFile = temporaryFolder.newFile(); + + Files.write(fromFile.toPath(), StringUtils.toUtf8("foo")); + final FileUtils.LinkOrCopyResult linkOrCopyResult = FileUtils.linkOrCopy(fromFile, toFile); + + // Verify the new link. + Assert.assertEquals(FileUtils.LinkOrCopyResult.COPY, linkOrCopyResult); + Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath()))); + + // Verify they are not the same file. + Files.write(fromFile.toPath(), StringUtils.toUtf8("bar")); + Assert.assertEquals("foo", StringUtils.fromUtf8(Files.readAllBytes(toFile.toPath()))); + } } diff --git a/docs/dependencies/deep-storage.md b/docs/dependencies/deep-storage.md index 77ae9b27dab0..b63f968bf540 100644 --- a/docs/dependencies/deep-storage.md +++ b/docs/dependencies/deep-storage.md @@ -25,29 +25,52 @@ title: "Deep storage" Deep storage is where segments are stored. It is a storage mechanism that Apache Druid does not provide. This deep storage infrastructure defines the level of durability of your data, as long as Druid processes can see this storage infrastructure and get at the segments stored on it, you will not lose data no matter how many Druid nodes you lose. If segments disappear from this storage layer, then you will lose whatever data those segments represented. -## Local Mount +## Local -A local mount can be used for storage of segments as well. This allows you to use just your local file system or anything else that can be mount locally like NFS, Ceph, etc. This is the default deep storage implementation. +Local storage is intended for use in the following situations: -In order to use a local mount for deep storage, you need to set the following configuration in your common configs. +- You have just one server. +- Or, you have multiple servers, and they all have access to a shared filesystem (for example: NFS). + +In multi-server production clusters, rather than local storage with a shared filesystem, it is instead recommended to +use cloud-based deep storage ([Amazon S3](#amazon-s3-or-s3-compatible), [Google Cloud Storage](#google-cloud-storage), +or [Azure Blob Storage](#azure-blob-storage)), S3-compatible storage (like Minio), or [HDFS](#hdfs). These options are +generally more convenient, more scalable, and more robust than setting up a shared filesystem. + +The following configurations in `common.runtime.properties` apply to local storage: |Property|Possible Values|Description|Default| |--------|---------------|-----------|-------| -|`druid.storage.type`|local||Must be set.| -|`druid.storage.storageDirectory`||Directory for storing segments.|Must be set.| +|`druid.storage.type`|`local`||Must be set.| +|`druid.storage.storageDirectory`|any local directory|Directory for storing segments. Must be different from `druid.segmentCache.locations` and `druid.segmentCache.infoDir`.|`/tmp/druid/localStorage`| +|`druid.storage.zip`|`true`, `false`|Whether segments in `druid.storage.storageDirectory` are written as directories (`false`) or zip files (`true`).|`false`| + +For example: + +``` +druid.storage.type=local +druid.storage.storageDirectory=/tmp/druid/localStorage +``` + +The `druid.storage.storageDirectory` must be set to a different path than `druid.segmentCache.locations` or +`druid.segmentCache.infoDir`. + +## Amazon S3 or S3-compatible + +See [`druid-s3-extensions`](../development/extensions-core/s3.md). -Note that you should generally set `druid.storage.storageDirectory` to something different from `druid.segmentCache.locations` and `druid.segmentCache.infoDir`. +## Google Cloud Storage -If you are using the Hadoop indexer in local mode, then just give it a local file as your output directory and it will work. +See [`druid-google-extensions`](../development/extensions-core/google.md). -## S3-compatible +## Azure Blob Storage -See [druid-s3-extensions extension documentation](../development/extensions-core/s3.md). +See [`druid-azure-extensions`](../development/extensions-core/azure.md). ## HDFS See [druid-hdfs-storage extension documentation](../development/extensions-core/hdfs.md). -## Additional Deep Stores +## Additional options -For additional deep stores, please see our [extensions list](../development/extensions.md). +For additional deep storage options, please see our [extensions list](../development/extensions.md). diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java index e262b40da707..280483c11d3b 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -54,7 +54,6 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher private static final Logger log = new Logger(HdfsDataSegmentPusher.class); private final Configuration hadoopConfig; - private final ObjectMapper jsonMapper; // We lazily initialize fullQualifiedStorageDirectory to avoid potential issues with Hadoop namenode HA. // Please see https://github.com/apache/druid/pull/5684 @@ -68,7 +67,6 @@ public HdfsDataSegmentPusher( ) { this.hadoopConfig = hadoopConfig; - this.jsonMapper = jsonMapper; Path storageDir = new Path(config.getStorageDirectory()); this.fullyQualifiedStorageDirectory = Suppliers.memoize( () -> { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index a83bbaf51534..3a9000c3d6e9 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -3159,6 +3159,7 @@ public void close() }; final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig(); dataSegmentPusherConfig.storageDirectory = getSegmentDirectory(); + dataSegmentPusherConfig.zip = true; final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig); toolboxFactory = new TaskToolboxFactory( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 553b601f7fc8..99f939433f7d 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -3124,6 +3124,7 @@ public void close() }; final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig(); dataSegmentPusherConfig.storageDirectory = getSegmentDirectory(); + dataSegmentPusherConfig.zip = true; final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig); toolboxFactory = new TaskToolboxFactory( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 443f2c6d1e01..fd2b68ef39ea 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -30,6 +30,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; +import it.unimi.dsi.fastutil.bytes.ByteArrays; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.client.cache.MapCache; @@ -162,6 +163,7 @@ import java.io.File; import java.io.IOException; import java.net.URI; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -943,7 +945,8 @@ public DataSegment apply(String input) List segmentFiles = new ArrayList<>(); for (DataSegment segment : mdc.retrieveUnusedSegmentsForInterval("test_kill_task", Intervals.of("2011-04-01/P4D"))) { File file = new File((String) segment.getLoadSpec().get("path")); - FileUtils.mkdirp(file); + FileUtils.mkdirp(file.getParentFile()); + Files.write(file.toPath(), ByteArrays.EMPTY_ARRAY); segmentFiles.add(file); } diff --git a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentKiller.java b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentKiller.java index f7156073cf02..923467c65bf5 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentKiller.java +++ b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentKiller.java @@ -29,6 +29,7 @@ import java.io.IOException; /** + * */ public class LocalDataSegmentKiller implements DataSegmentKiller { @@ -51,9 +52,12 @@ public void kill(DataSegment segment) throws SegmentLoadingException log.info("Deleting segment[%s] from directory[%s].", segment.getId(), path); try { - if (path.getName().endsWith(".zip")) { + if ((path.getName().endsWith(".zip") && path.isFile()) || + (path.getName().equals(LocalDataSegmentPusher.INDEX_DIR) && path.isDirectory())) { // path format -- > .../dataSource/interval/version/partitionNum/xxx.zip // or .../dataSource/interval/version/partitionNum/UUID/xxx.zip + // or -- > .../dataSource/interval/version/partitionNum/index/ + // or .../dataSource/interval/version/partitionNum/UUID/index/ File parentDir = path.getParentFile(); FileUtils.deleteDirectory(parentDir); @@ -62,13 +66,18 @@ public void kill(DataSegment segment) throws SegmentLoadingException parentDir = parentDir.getParentFile(); int maxDepth = 4; // if for some reason there's no datasSource directory, stop recursing somewhere reasonable while (parentDir != null && --maxDepth >= 0) { - if (!parentDir.delete() || segment.getDataSource().equals(parentDir.getName())) { + // parentDir.listFiles().length > 0 check not strictly necessary, because parentDir.delete() fails on + // nonempty directories. However, including it here is nice since it makes our intent very clear (only + // remove nonempty directories) and it prevents making delete syscalls that are doomed to failure. + if (parentDir.listFiles().length > 0 + || !parentDir.delete() + || segment.getDataSource().equals(parentDir.getName())) { break; } parentDir = parentDir.getParentFile(); } - } else { + } else if (path.exists()) { throw new SegmentLoadingException("Unknown file type[%s]", path); } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPuller.java b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPuller.java index 5f63557f98eb..0f27dac9e1bb 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPuller.java +++ b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPuller.java @@ -43,6 +43,7 @@ import java.nio.charset.Charset; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.CancellationException; @@ -125,40 +126,43 @@ public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoading public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final File dir) throws SegmentLoadingException { if (sourceFile.isDirectory()) { - if (sourceFile.equals(dir)) { - log.info("Asked to load [%s] into itself, done!", dir); - return new FileUtils.FileCopyResult(sourceFile); - } + try { + final File[] files = sourceFile.listFiles(); + if (files == null) { + throw new SegmentLoadingException("No files found in [%s]", sourceFile.getAbsolutePath()); + } - final File[] files = sourceFile.listFiles(); - if (files == null) { - throw new SegmentLoadingException("No files found in [%s]", sourceFile.getAbsolutePath()); - } - final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult(sourceFile); - for (final File oldFile : files) { - if (oldFile.isDirectory()) { - log.info("[%s] is a child directory, skipping", oldFile.getAbsolutePath()); - continue; + if (sourceFile.equals(dir)) { + log.info("Asked to load [%s] into itself, done!", dir); + return new FileUtils.FileCopyResult(Arrays.asList(files)); } - result.addFiles( - FileUtils.retryCopy( - Files.asByteSource(oldFile), - new File(dir, oldFile.getName()), - shouldRetryPredicate(), - DEFAULT_RETRY_COUNT - ).getFiles() + final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult(); + boolean link = true; + for (final File oldFile : files) { + if (oldFile.isDirectory()) { + log.info("[%s] is a child directory, skipping", oldFile.getAbsolutePath()); + continue; + } + + final File newFile = new File(dir, oldFile.getName()); + final FileUtils.LinkOrCopyResult linkOrCopyResult = FileUtils.linkOrCopy(oldFile, newFile); + link = link && linkOrCopyResult == FileUtils.LinkOrCopyResult.LINK; + result.addFile(newFile); + } + log.info( + "%s %d bytes from [%s] to [%s]", + link ? "Linked" : "Copied", + result.size(), + sourceFile.getAbsolutePath(), + dir.getAbsolutePath() ); + return result; } - log.info( - "Copied %d bytes from [%s] to [%s]", - result.size(), - sourceFile.getAbsolutePath(), - dir.getAbsolutePath() - ); - return result; - } - if (CompressionUtils.isZip(sourceFile.getName())) { + catch (IOException e) { + throw new SegmentLoadingException(e, "Unable to load from local directory [%s]", sourceFile.getAbsolutePath()); + } + } else if (CompressionUtils.isZip(sourceFile.getName())) { try { final FileUtils.FileCopyResult result = CompressionUtils.unzip( Files.asByteSource(sourceFile), @@ -177,8 +181,7 @@ public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final Fil catch (IOException e) { throw new SegmentLoadingException(e, "Unable to unzip file [%s]", sourceFile.getAbsolutePath()); } - } - if (CompressionUtils.isGz(sourceFile.getName())) { + } else if (CompressionUtils.isGz(sourceFile.getName())) { final File outFile = new File(dir, CompressionUtils.getGzBaseName(sourceFile.getName())); final FileUtils.FileCopyResult result = CompressionUtils.gunzip( Files.asByteSource(sourceFile), @@ -192,8 +195,9 @@ public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final Fil outFile.getAbsolutePath() ); return result; + } else { + throw new SegmentLoadingException("Do not know how to handle source [%s]", sourceFile.getAbsolutePath()); } - throw new SegmentLoadingException("Do not know how to handle source [%s]", sourceFile.getAbsolutePath()); } diff --git a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java index 07222ff0b3b9..a783d5c31bc6 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java +++ b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java @@ -23,6 +23,7 @@ import com.google.inject.Inject; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.IOE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; @@ -31,6 +32,8 @@ import java.io.File; import java.io.IOException; import java.net.URI; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; import java.util.Map; import java.util.UUID; @@ -38,7 +41,8 @@ public class LocalDataSegmentPusher implements DataSegmentPusher { private static final Logger log = new Logger(LocalDataSegmentPusher.class); - private static final String INDEX_FILENAME = "index.zip"; + public static final String INDEX_DIR = "index"; + public static final String INDEX_ZIP_FILENAME = "index.zip"; private final LocalDataSegmentPusherConfig config; @@ -76,7 +80,11 @@ public DataSegment pushToPath(File dataSegmentFile, DataSegment segment, String log.debug("Copying segment[%s] to local filesystem at location[%s]", segment.getId(), outDir.toString()); + // Add binary version to the DataSegment object. + segment = segment.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)); + if (dataSegmentFile.equals(outDir)) { + // Input and output directories are the same. Compute size, build a loadSpec, and return. long size = 0; for (File file : dataSegmentFile.listFiles()) { size += file.length(); @@ -85,19 +93,35 @@ public DataSegment pushToPath(File dataSegmentFile, DataSegment segment, String return segment.withLoadSpec(makeLoadSpec(outDir.toURI())) .withSize(size) .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)); + } else if (config.isZip()) { + return pushZip(dataSegmentFile, outDir, segment); + } else { + return pushNoZip(dataSegmentFile, outDir, segment); } + } - final File tmpOutDir = new File(config.getStorageDirectory(), makeIntermediateDir()); - log.debug("Creating intermediate directory[%s] for segment[%s].", tmpOutDir.toString(), segment.getId()); - FileUtils.mkdirp(tmpOutDir); + @Override + public Map makeLoadSpec(URI finalIndexZipFilePath) + { + return ImmutableMap.of("type", "local", "path", finalIndexZipFilePath.getPath()); + } - try { - final File tmpIndexFile = new File(tmpOutDir, INDEX_FILENAME); - final long size = compressSegment(dataSegmentFile, tmpIndexFile); + private String makeIntermediateDir() + { + return "intermediate_pushes/" + UUID.randomUUID(); + } - final DataSegment dataSegment = segment.withLoadSpec(makeLoadSpec(new File(outDir, INDEX_FILENAME).toURI())) - .withSize(size) - .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)); + private DataSegment pushZip(final File inDir, final File outDir, final DataSegment baseSegment) throws IOException + { + final File tmpSegmentDir = new File(config.getStorageDirectory(), makeIntermediateDir()); + final File tmpIndexFile = new File(tmpSegmentDir, INDEX_ZIP_FILENAME); + + log.debug("Creating intermediate directory[%s] for segment[%s].", tmpSegmentDir.toString(), baseSegment.getId()); + FileUtils.mkdirp(tmpSegmentDir); + + try { + log.debug("Compressing files from[%s] to [%s]", inDir, tmpIndexFile); + final long size = CompressionUtils.zip(inDir, tmpIndexFile, true); FileUtils.mkdirp(outDir); final File indexFileTarget = new File(outDir, tmpIndexFile.getName()); @@ -106,27 +130,61 @@ public DataSegment pushToPath(File dataSegmentFile, DataSegment segment, String throw new IOE("Failed to rename [%s] to [%s]", tmpIndexFile, indexFileTarget); } - return dataSegment; + return baseSegment.withLoadSpec(makeLoadSpec(new File(outDir, INDEX_ZIP_FILENAME).toURI())) + .withSize(size); } finally { - FileUtils.deleteDirectory(tmpOutDir); + FileUtils.deleteDirectory(tmpSegmentDir); } } - @Override - public Map makeLoadSpec(URI finalIndexZipFilePath) + private DataSegment pushNoZip(final File inDir, final File outDir, final DataSegment baseSegment) throws IOException { - return ImmutableMap.of("type", "local", "path", finalIndexZipFilePath.getPath()); - } + final File tmpSegmentDir = new File(config.getStorageDirectory(), makeIntermediateDir()); + FileUtils.mkdirp(tmpSegmentDir); - private String makeIntermediateDir() - { - return "intermediate_pushes/" + UUID.randomUUID(); - } + try { + final File[] files = inDir.listFiles(); + if (files == null) { + throw new IOE("Cannot list directory [%s]", inDir); + } - private long compressSegment(File dataSegmentFile, File dest) throws IOException - { - log.debug("Compressing files from[%s] to [%s]", dataSegmentFile, dest); - return CompressionUtils.zip(dataSegmentFile, dest, true); + long size = 0; + for (final File file : files) { + if (file.isFile()) { + size += file.length(); + FileUtils.linkOrCopy(file, new File(tmpSegmentDir, file.getName())); + } else { + // Segment directories are expected to be flat. + throw new IOE("Unexpected subdirectory [%s]", file.getName()); + } + } + + final File segmentDir = new File(outDir, INDEX_DIR); + FileUtils.mkdirp(outDir); + + try { + Files.move(tmpSegmentDir.toPath(), segmentDir.toPath(), StandardCopyOption.ATOMIC_MOVE); + } + catch (IOException e) { + if (segmentDir.exists()) { + // Move old directory out of the way, then try again. This makes the latest push win when we push to the + // same directory twice, so behavior is compatible with the zip style of pushing. + Files.move( + segmentDir.toPath(), + new File(outDir, StringUtils.format("%s_old_%s", INDEX_DIR, UUID.randomUUID())).toPath(), + StandardCopyOption.ATOMIC_MOVE + ); + + Files.move(tmpSegmentDir.toPath(), segmentDir.toPath(), StandardCopyOption.ATOMIC_MOVE); + } + } + + return baseSegment.withLoadSpec(makeLoadSpec(new File(outDir, INDEX_DIR).toURI())) + .withSize(size); + } + finally { + FileUtils.deleteDirectory(tmpSegmentDir); + } } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusherConfig.java b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusherConfig.java index 15868da4de18..e539ad20af66 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusherConfig.java +++ b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusherConfig.java @@ -30,8 +30,16 @@ public class LocalDataSegmentPusherConfig @JsonProperty public File storageDirectory = new File("/tmp/druid/localStorage"); + @JsonProperty + public boolean zip = false; + public File getStorageDirectory() { return storageDirectory; } + + public boolean isZip() + { + return zip; + } } diff --git a/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentKillerTest.java b/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentKillerTest.java index bc17677f36ab..2dd6667165ac 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentKillerTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentKillerTest.java @@ -29,13 +29,30 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.File; import java.io.IOException; import java.util.UUID; +@RunWith(Parameterized.class) public class LocalDataSegmentKillerTest { + private static final String DATASOURCE_NAME = "ds"; + + private final boolean zip; + + public LocalDataSegmentKillerTest(boolean zip) + { + this.zip = zip; + } + + @Parameterized.Parameters(name = "zip = {0}") + public static Iterable constructorFeeder() + { + return ImmutableList.of(new Object[]{false}, new Object[]{true}); + } @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -46,12 +63,12 @@ public void testKill() throws Exception LocalDataSegmentKiller killer = new LocalDataSegmentKiller(new LocalDataSegmentPusherConfig()); // Create following segments and then delete them in this order and assert directory deletions - // /tmp/dataSource/interval1/v1/0/index.zip - // /tmp/dataSource/interval1/v1/1/index.zip - // /tmp/dataSource/interval1/v2/0/index.zip - // /tmp/dataSource/interval2/v1/0/index.zip + // /tmp/dataSource/interval1/v1/0/ + // /tmp/dataSource/interval1/v1/1/ + // /tmp/dataSource/interval1/v2/0/ + // /tmp/dataSource/interval2/v1/0/ - final File dataSourceDir = temporaryFolder.newFolder(); + final File dataSourceDir = temporaryFolder.newFolder(DATASOURCE_NAME); File interval1Dir = new File(dataSourceDir, "interval1"); File version11Dir = new File(interval1Dir, "v1"); @@ -72,27 +89,28 @@ public void testKill() throws Exception makePartitionDirWithIndex(partition012Dir); - killer.kill(getSegmentWithPath(new File(partition011Dir, "index.zip").toString())); + killer.kill(getSegmentWithPath(partition011Dir)); Assert.assertFalse(partition011Dir.exists()); Assert.assertTrue(partition111Dir.exists()); Assert.assertTrue(partition021Dir.exists()); Assert.assertTrue(partition012Dir.exists()); - killer.kill(getSegmentWithPath(new File(partition111Dir, "index.zip").toString())); + killer.kill(getSegmentWithPath(partition111Dir)); Assert.assertFalse(version11Dir.exists()); Assert.assertTrue(partition021Dir.exists()); Assert.assertTrue(partition012Dir.exists()); - killer.kill(getSegmentWithPath(new File(partition021Dir, "index.zip").toString())); + killer.kill(getSegmentWithPath(partition021Dir)); Assert.assertFalse(interval1Dir.exists()); Assert.assertTrue(partition012Dir.exists()); - killer.kill(getSegmentWithPath(new File(partition012Dir, "index.zip").toString())); + killer.kill(getSegmentWithPath(partition012Dir)); Assert.assertFalse(dataSourceDir.exists()); + Assert.assertTrue(dataSourceDir.getParentFile().exists()); } @Test @@ -100,7 +118,8 @@ public void testKillUniquePath() throws Exception { final LocalDataSegmentKiller killer = new LocalDataSegmentKiller(new LocalDataSegmentPusherConfig()); final String uuid = UUID.randomUUID().toString().substring(0, 5); - final File dataSourceDir = temporaryFolder.newFolder("dataSource"); + final File emptyParentDir = temporaryFolder.newFolder(); + final File dataSourceDir = new File(emptyParentDir, DATASOURCE_NAME); final File intervalDir = new File(dataSourceDir, "interval"); final File versionDir = new File(intervalDir, "1"); final File partitionDir = new File(versionDir, "0"); @@ -108,30 +127,69 @@ public void testKillUniquePath() throws Exception makePartitionDirWithIndex(uuidDir); - killer.kill(getSegmentWithPath(new File(uuidDir, "index.zip").toString())); + killer.kill(getSegmentWithPath(uuidDir)); Assert.assertFalse(uuidDir.exists()); Assert.assertFalse(partitionDir.exists()); Assert.assertFalse(versionDir.exists()); Assert.assertFalse(intervalDir.exists()); Assert.assertFalse(dataSourceDir.exists()); + + // Verify that we stop after the datasource dir, even though the parent is empty. + Assert.assertTrue(emptyParentDir.exists()); + Assert.assertEquals(0, emptyParentDir.listFiles().length); + } + + @Test + public void testKillUniquePathWrongDataSourceNameInDirectory() throws Exception + { + // Verify that + final LocalDataSegmentKiller killer = new LocalDataSegmentKiller(new LocalDataSegmentPusherConfig()); + final String uuid = UUID.randomUUID().toString().substring(0, 5); + final File emptyParentDir = temporaryFolder.newFolder(); + final File dataSourceDir = new File(emptyParentDir, DATASOURCE_NAME + "_wrong"); + final File intervalDir = new File(dataSourceDir, "interval"); + final File versionDir = new File(intervalDir, "1"); + final File partitionDir = new File(versionDir, "0"); + final File uuidDir = new File(partitionDir, uuid); + + makePartitionDirWithIndex(uuidDir); + + killer.kill(getSegmentWithPath(uuidDir)); + + Assert.assertFalse(uuidDir.exists()); + Assert.assertFalse(partitionDir.exists()); + Assert.assertFalse(versionDir.exists()); + Assert.assertFalse(intervalDir.exists()); + Assert.assertFalse(dataSourceDir.exists()); + + // Verify that we stop at 4 pruned paths, even if we don't encounter the datasource-named directory. + Assert.assertTrue(emptyParentDir.exists()); + Assert.assertEquals(0, emptyParentDir.listFiles().length); } private void makePartitionDirWithIndex(File path) throws IOException { FileUtils.mkdirp(path); - Assert.assertTrue(new File(path, "index.zip").createNewFile()); + + if (zip) { + Assert.assertTrue(new File(path, LocalDataSegmentPusher.INDEX_ZIP_FILENAME).createNewFile()); + } else { + Assert.assertTrue(new File(path, LocalDataSegmentPusher.INDEX_DIR).mkdir()); + } } - private DataSegment getSegmentWithPath(String path) + private DataSegment getSegmentWithPath(File baseDirectory) { + final String fileName = zip ? LocalDataSegmentPusher.INDEX_ZIP_FILENAME : LocalDataSegmentPusher.INDEX_DIR; + final File path = new File(baseDirectory, fileName); return new DataSegment( - "dataSource", + DATASOURCE_NAME, Intervals.of("2000/3000"), "ver", ImmutableMap.of( "type", "local", - "path", path + "path", path.toURI().getPath() ), ImmutableList.of("product"), ImmutableList.of("visited_sum", "unique_hosts"), diff --git a/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentPusherTest.java b/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentPusherTest.java index 739075e5e124..8651ac7a8e0a 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentPusherTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentPusherTest.java @@ -48,7 +48,9 @@ public class LocalDataSegmentPusherTest public ExpectedException exception = ExpectedException.none(); LocalDataSegmentPusher localDataSegmentPusher; + LocalDataSegmentPusher localDataSegmentPusherZip; LocalDataSegmentPusherConfig config; + LocalDataSegmentPusherConfig configZip; File dataSegmentFiles; DataSegment dataSegment = new DataSegment( "ds", @@ -77,14 +79,53 @@ public class LocalDataSegmentPusherTest public void setUp() throws IOException { config = new LocalDataSegmentPusherConfig(); + config.zip = false; config.storageDirectory = temporaryFolder.newFolder(); localDataSegmentPusher = new LocalDataSegmentPusher(config); + + configZip = new LocalDataSegmentPusherConfig(); + configZip.zip = true; + configZip.storageDirectory = temporaryFolder.newFolder(); + localDataSegmentPusherZip = new LocalDataSegmentPusher(configZip); + dataSegmentFiles = temporaryFolder.newFolder(); Files.asByteSink(new File(dataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x9)); } @Test - public void testPush() throws IOException + public void testPushZip() throws IOException + { + /* DataSegment - Used to create LoadSpec and Create outDir (Local Deep Storage location in this case) + File dataSegmentFile - Used to get location of segment files like version.bin, meta.smoosh and xxxxx.smoosh + */ + final DataSegment dataSegment2 = dataSegment.withVersion("v2"); + + DataSegment returnSegment1 = localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment, false); + DataSegment returnSegment2 = localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment2, false); + + Assert.assertNotNull(returnSegment1); + Assert.assertEquals(dataSegment, returnSegment1); + + Assert.assertNotNull(returnSegment2); + Assert.assertEquals(dataSegment2, returnSegment2); + + Assert.assertNotEquals( + localDataSegmentPusherZip.getStorageDir(dataSegment, false), + localDataSegmentPusherZip.getStorageDir(dataSegment2, false) + ); + + for (DataSegment returnSegment : ImmutableList.of(returnSegment1, returnSegment2)) { + File outDir = new File( + configZip.getStorageDirectory(), + localDataSegmentPusherZip.getStorageDir(returnSegment, false) + ); + File versionFile = new File(outDir, "index.zip"); + Assert.assertTrue(versionFile.exists()); + } + } + + @Test + public void testPushNoZip() throws IOException { /* DataSegment - Used to create LoadSpec and Create outDir (Local Deep Storage location in this case) File dataSegmentFile - Used to get location of segment files like version.bin, meta.smoosh and xxxxx.smoosh @@ -107,19 +148,43 @@ public void testPush() throws IOException for (DataSegment returnSegment : ImmutableList.of(returnSegment1, returnSegment2)) { File outDir = new File( - config.getStorageDirectory(), - localDataSegmentPusher.getStorageDir(returnSegment, false) + new File( + config.getStorageDirectory(), + localDataSegmentPusher.getStorageDir(returnSegment, false) + ), + "index" ); - File versionFile = new File(outDir, "index.zip"); + + // Check against loadSpec. + Assert.assertEquals( + outDir.toURI().getPath(), + returnSegment.getLoadSpec().get("path") + ); + + // Check for version.bin. + File versionFile = new File(outDir, "version.bin"); Assert.assertTrue(versionFile.exists()); } } @Test - public void testPushUseUniquePath() throws IOException + public void testPushNoZipUseUniquePath() throws IOException { DataSegment segment = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, true); + String path = segment.getLoadSpec().get("path").toString(); + Pattern pattern = Pattern.compile( + ".*/ds/1970-01-01T00:00:00\\.000Z_1970-01-01T00:00:00\\.001Z/v1/0/[A-Za-z0-9-]{36}/index/$" + ); + Assert.assertTrue(path, pattern.matcher(path).matches()); + Assert.assertTrue(new File(path).exists()); + } + + @Test + public void testPushZipUseUniquePath() throws IOException + { + DataSegment segment = localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment, true); + String path = segment.getLoadSpec().get("path").toString(); Pattern pattern = Pattern.compile( ".*/ds/1970-01-01T00:00:00\\.000Z_1970-01-01T00:00:00\\.001Z/v1/0/[A-Za-z0-9-]{36}/index\\.zip" @@ -129,8 +194,12 @@ public void testPushUseUniquePath() throws IOException } @Test - public void testLastPushWinsForConcurrentPushes() throws IOException + public void testLastPushWinsForConcurrentNoZipPushes() throws IOException { + // Behavioral difference between zip and no-zip pushes when the same segment identifier is pushed twice: + // Later zip pushes overwrite earlier ones. Later no-zip pushes throw errors. In situations where the same + // segment may be pushed twice, we expect "useUniquePath" to be set on the pusher. + File replicatedDataSegmentFiles = temporaryFolder.newFolder(); Files.asByteSink(new File(replicatedDataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x8)); DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, false); @@ -139,10 +208,38 @@ public void testLastPushWinsForConcurrentPushes() throws IOException Assert.assertEquals(dataSegment.getDimensions(), returnSegment1.getDimensions()); Assert.assertEquals(dataSegment2.getDimensions(), returnSegment2.getDimensions()); - File unzipDir = new File(config.storageDirectory, "unzip"); + final String expectedPath = StringUtils.format( + "%s/%s", + config.storageDirectory, + "ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index/" + ); + + Assert.assertEquals(expectedPath, returnSegment1.getLoadSpec().get("path")); + Assert.assertEquals(expectedPath, returnSegment2.getLoadSpec().get("path")); + + final File versionFile = new File(expectedPath, "version.bin"); + Assert.assertEquals(0x8, Ints.fromByteArray(Files.toByteArray(versionFile))); + } + + @Test + public void testLastPushWinsForConcurrentZipPushes() throws IOException + { + // Behavioral difference between zip and no-zip pushes when the same segment identifier is pushed twice: + // Later zip pushes overwrite earlier ones. Later no-zip pushes throw errors. In situations where the same + // segment may be pushed twice, we expect "useUniquePath" to be set on the pusher. + + File replicatedDataSegmentFiles = temporaryFolder.newFolder(); + Files.asByteSink(new File(replicatedDataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x8)); + DataSegment returnSegment1 = localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment, false); + DataSegment returnSegment2 = localDataSegmentPusherZip.push(replicatedDataSegmentFiles, dataSegment2, false); + + Assert.assertEquals(dataSegment.getDimensions(), returnSegment1.getDimensions()); + Assert.assertEquals(dataSegment2.getDimensions(), returnSegment2.getDimensions()); + + File unzipDir = new File(configZip.storageDirectory, "unzip"); FileUtils.mkdirp(unzipDir); CompressionUtils.unzip( - new File(config.storageDirectory, "/ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index.zip"), + new File(configZip.storageDirectory, "/ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index.zip"), unzipDir ); @@ -160,27 +257,38 @@ public void testPushCannotCreateDirectory() throws IOException localDataSegmentPusher.push(dataSegmentFiles, dataSegment, false); } + @Test + public void testPushZipCannotCreateDirectory() throws IOException + { + exception.expect(IOException.class); + exception.expectMessage("Cannot create directory"); + configZip.storageDirectory = new File(configZip.storageDirectory, "xxx"); + Assert.assertTrue(configZip.storageDirectory.mkdir()); + configZip.storageDirectory.setWritable(false); + localDataSegmentPusherZip.push(dataSegmentFiles, dataSegment, false); + } + @Test public void testPathForHadoopAbsolute() { - config.storageDirectory = new File("/druid"); + configZip.storageDirectory = new File("/druid"); // If this test fails because the path is returned as "file:/druid/", this can happen // when a /druid directory exists on the local filesystem. Assert.assertEquals( "file:/druid", - new LocalDataSegmentPusher(config).getPathForHadoop() + new LocalDataSegmentPusher(configZip).getPathForHadoop() ); } @Test public void testPathForHadoopRelative() { - config.storageDirectory = new File("druid"); + configZip.storageDirectory = new File("druid"); Assert.assertEquals( StringUtils.format("file:%s/druid", System.getProperty("user.dir")), - new LocalDataSegmentPusher(config).getPathForHadoop() + new LocalDataSegmentPusher(configZip).getPathForHadoop() ); } } diff --git a/website/.spelling b/website/.spelling index a3212dc0ff38..a279efe2a96c 100644 --- a/website/.spelling +++ b/website/.spelling @@ -738,6 +738,8 @@ appenders druid-hdfs-storage druid-s3-extensions druid.sql.planner.maxNumericInFilters +Minio +multi-server - ../docs/dependencies/metadata-storage.md BasicDataSource - ../docs/dependencies/zookeeper.md