From 1184d6d7a3d31d74c00cb742feab08b971353c2a Mon Sep 17 00:00:00 2001 From: David Lim Date: Sun, 29 Apr 2018 22:59:48 -0600 Subject: [PATCH] Use unique segment paths for Kafka indexing (#5692) * support unique segment file paths * forbiddenapis * code review changes * code review changes * code review changes * checkstyle fix --- .../segment/loading/DataSegmentFinder.java | 27 ++++ .../segment/loading/DataSegmentKiller.java | 33 +++- .../segment/loading/DataSegmentPusher.java | 55 ++++--- .../operations/insert-segment-to-db.md | 45 ++++-- .../storage/azure/AzureDataSegmentPusher.java | 18 +-- .../io/druid/storage/azure/AzureStorage.java | 18 +-- .../io/druid/storage/azure/AzureTaskLogs.java | 2 +- .../azure/AzureDataSegmentPusherTest.java | 36 +++-- .../storage/azure/AzureTaskLogsTest.java | 2 +- .../cassandra/CassandraDataSegmentPusher.java | 29 ++-- .../CloudFilesDataSegmentPusher.java | 38 +++-- .../CloudFilesDataSegmentPusherTest.java | 2 +- .../google/GoogleDataSegmentPusher.java | 16 +- .../google/GoogleDataSegmentPusherTest.java | 14 +- .../storage/hdfs/HdfsDataSegmentFinder.java | 50 ++++-- .../storage/hdfs/HdfsDataSegmentKiller.java | 85 +++++----- .../storage/hdfs/HdfsDataSegmentPusher.java | 53 ++++--- .../org/apache/hadoop/fs/HadoopFsWrapper.java | 7 +- .../loading/HdfsDataSegmentFinderTest.java | 25 +++ .../hdfs/HdfsDataSegmentKillerTest.java | 62 +++++--- .../hdfs/HdfsDataSegmentPusherTest.java | 148 +++++++++++------- .../druid/indexing/kafka/KafkaIndexTask.java | 1 + .../indexing/kafka/KafkaIndexTaskTest.java | 7 +- .../druid/storage/s3/S3DataSegmentFinder.java | 16 +- .../druid/storage/s3/S3DataSegmentMover.java | 7 +- .../druid/storage/s3/S3DataSegmentPusher.java | 20 +-- .../storage/s3/S3DataSegmentFinderTest.java | 91 ++++++++--- .../storage/s3/S3DataSegmentPusherTest.java | 23 ++- .../main/java/io/druid/indexer/JobHelper.java | 14 +- .../common/index/YeOldePlumberSchool.java | 5 +- .../common/task/ConvertSegmentTask.java | 5 +- .../druid/indexing/common/task/IndexTask.java | 3 +- .../indexing/common/task/MergeTaskBase.java | 5 +- .../indexing/common/task/IndexTaskTest.java | 20 ++- .../task/SameIntervalMergeTaskTest.java | 6 +- .../IngestSegmentFirehoseFactoryTest.java | 13 +- .../indexing/overlord/TaskLifecycleTest.java | 36 +++-- .../indexing/test/TestDataSegmentPusher.java | 3 +- .../loading/LocalDataSegmentFinder.java | 26 +-- .../loading/LocalDataSegmentKiller.java | 22 +-- .../loading/LocalDataSegmentPusher.java | 40 ++--- .../SegmentLoaderLocalCacheManager.java | 12 +- .../realtime/appenderator/Appenderator.java | 7 +- .../appenderator/AppenderatorImpl.java | 24 +-- .../appenderator/AppenderatorPlumber.java | 2 +- .../appenderator/BaseAppenderatorDriver.java | 32 +++- .../appenderator/BatchAppenderatorDriver.java | 23 +-- .../StreamAppenderatorDriver.java | 12 +- .../realtime/plumber/RealtimePlumber.java | 7 - .../loading/LocalDataSegmentFinderTest.java | 47 +++++- .../loading/LocalDataSegmentKillerTest.java | 23 +++ .../loading/LocalDataSegmentPusherTest.java | 40 ++--- .../appenderator/AppenderatorTest.java | 3 +- .../appenderator/AppenderatorTester.java | 2 +- .../BatchAppenderatorDriverTest.java | 14 +- .../StreamAppenderatorDriverFailTest.java | 110 ++++++++++++- .../StreamAppenderatorDriverTest.java | 39 +++-- .../java/io/druid/cli/CliRealtimeExample.java | 2 +- 58 files changed, 996 insertions(+), 531 deletions(-) diff --git a/api/src/main/java/io/druid/segment/loading/DataSegmentFinder.java 
b/api/src/main/java/io/druid/segment/loading/DataSegmentFinder.java index 937a42e72c23..66af8353dafb 100644 --- a/api/src/main/java/io/druid/segment/loading/DataSegmentFinder.java +++ b/api/src/main/java/io/druid/segment/loading/DataSegmentFinder.java @@ -20,8 +20,11 @@ package io.druid.segment.loading; import io.druid.guice.annotations.ExtensionPoint; +import io.druid.java.util.common.Pair; +import io.druid.java.util.common.logger.Logger; import io.druid.timeline.DataSegment; +import java.util.Map; import java.util.Set; /** @@ -31,6 +34,8 @@ @ExtensionPoint public interface DataSegmentFinder { + Logger log = new Logger(DataSegmentFinder.class); + /** * This method should first recursively look for descriptor.json (partitionNum_descriptor.json for HDFS data storage) underneath * workingDirPath and then verify that index.zip (partitionNum_index.zip for HDFS data storage) exists in the same folder. @@ -46,4 +51,26 @@ public interface DataSegmentFinder * @return a set of segments that were found underneath workingDirPath */ Set findSegments(String workingDirPath, boolean updateDescriptor) throws SegmentLoadingException; + + /** + * Adds dataSegment if it does not exist in timestampedSegments. If it exists, replaces entry if segmentModifiedAt is + * newer than stored timestamp. + * + * @param timestampedSegments map of > containing segments with modified time + * @param dataSegment segment to add + * @param segmentModifiedAt segment modified timestamp + */ + static void putInMapRetainingNewest( + Map> timestampedSegments, DataSegment dataSegment, long segmentModifiedAt + ) + { + timestampedSegments.merge( + dataSegment.getIdentifier(), + Pair.of(dataSegment, segmentModifiedAt), + (previous, current) -> { + log.warn("Multiple copies of segmentId [%s] found, using newest version", current.lhs.getIdentifier()); + return previous.rhs > current.rhs ? previous : current; + } + ); + } } diff --git a/api/src/main/java/io/druid/segment/loading/DataSegmentKiller.java b/api/src/main/java/io/druid/segment/loading/DataSegmentKiller.java index c26a73daeb10..96bbeb9362d0 100644 --- a/api/src/main/java/io/druid/segment/loading/DataSegmentKiller.java +++ b/api/src/main/java/io/druid/segment/loading/DataSegmentKiller.java @@ -20,16 +20,41 @@ package io.druid.segment.loading; import io.druid.guice.annotations.ExtensionPoint; +import io.druid.java.util.common.logger.Logger; import io.druid.timeline.DataSegment; import java.io.IOException; -/** - */ @ExtensionPoint public interface DataSegmentKiller { - void kill(DataSegment segments) throws SegmentLoadingException; - void killAll() throws IOException; + Logger log = new Logger(DataSegmentKiller.class); + + /** + * Removes segment files (index and metadata) from deep storage. + * @param segment the segment to kill + * @throws SegmentLoadingException if the segment could not be completely removed + */ + void kill(DataSegment segment) throws SegmentLoadingException; + /** + * A more stoic killer who doesn't throw a tantrum if things get messy. Use when killing segments for best-effort + * cleanup. + * @param segment the segment to kill + */ + default void killQuietly(DataSegment segment) + { + try { + kill(segment); + } + catch (Exception e) { + log.debug(e, "Failed to kill segment %s", segment); + } + } + + /** + * Like a nuke. Use wisely. Used by the 'reset-cluster' command, and of the built-in deep storage implementations, it + * is only implemented by local and HDFS. 
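The `killQuietly()` default above gives callers a best-effort cleanup path. A minimal sketch of how a caller might use it to remove segments that were pushed under unique paths but never committed to the metadata table (the class and method names below are hypothetical, not part of this patch):

```java
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.timeline.DataSegment;

import java.util.List;

// Hypothetical caller: best-effort removal of segments that were pushed to deep
// storage under unique paths but never recorded in the metadata table.
class AbandonedSegmentCleanup
{
  private final DataSegmentKiller killer;

  AbandonedSegmentCleanup(DataSegmentKiller killer)
  {
    this.killer = killer;
  }

  void cleanUp(List<DataSegment> pushedButUnpublished)
  {
    for (DataSegment segment : pushedButUnpublished) {
      // killQuietly() logs failures at DEBUG instead of throwing, so a failed
      // delete never masks the original reason the publish did not happen.
      killer.killQuietly(segment);
    }
  }
}
```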
+ */ + void killAll() throws IOException; } diff --git a/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java b/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java index 2ce046665977..7c4cead40cb2 100644 --- a/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java +++ b/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.UUID; @ExtensionPoint public interface DataSegmentPusher @@ -44,36 +45,48 @@ public interface DataSegmentPusher * Pushes index files and segment descriptor to deep storage. * @param file directory containing index files * @param segment segment descriptor - * @param replaceExisting overwrites existing objects if true, else leaves existing objects unchanged on conflict. - * The behavior of the indexer determines whether this should be true or false. For example, - * since Tranquility does not guarantee that replica tasks will generate indexes with the same - * data, the first segment pushed should be favored since otherwise multiple historicals may - * load segments with the same identifier but different contents which is a bad situation. On - * the other hand, indexers that maintain exactly-once semantics by storing checkpoint data can - * lose or repeat data if it fails to write a segment because it already exists and overwriting - * is not permitted. This situation can occur if a task fails after pushing to deep storage but - * before writing to the metadata storage, see: https://github.com/druid-io/druid/issues/5161. + * @param useUniquePath if true, pushes to a unique file path. This prevents situations where task failures or replica + * tasks can either overwrite or fail to overwrite existing segments leading to the possibility + * of different versions of the same segment ID containing different data. As an example, a Kafka + * indexing task starting at offset A and ending at offset B may push a segment to deep storage + * and then fail before writing the loadSpec to the metadata table, resulting in a replacement + * task being spawned. This replacement will also start at offset A but will read to offset C and + * will then push a segment to deep storage and write the loadSpec metadata. Without unique file + * paths, this can only work correctly if new segments overwrite existing segments. Suppose that + * at this point the task then fails so that the supervisor retries again from offset A. This 3rd + * attempt will overwrite the segments in deep storage before failing to write the loadSpec + * metadata, resulting in inconsistencies in the segment data now in deep storage and copies of + * the segment already loaded by historicals. * - * If replaceExisting is true, existing objects MUST be overwritten, since failure to do so - * will break exactly-once semantics. If replaceExisting is false, existing objects SHOULD be - * prioritized but it is acceptable if they are overwritten (deep storages may be eventually - * consistent or otherwise unable to support transactional writes). + * If unique paths are used, caller is responsible for cleaning up segments that were pushed but + * were not written to the metadata table (for example when using replica tasks). 
* @return segment descriptor * @throws IOException */ - DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException; + DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException; //use map instead of LoadSpec class to avoid dependency pollution. Map makeLoadSpec(URI finalIndexZipFilePath); + /** + * @deprecated backward-compatibility shim that should be removed on next major release; + * use {@link #getStorageDir(DataSegment, boolean)} instead. + */ + @Deprecated default String getStorageDir(DataSegment dataSegment) { - return getDefaultStorageDir(dataSegment); + return getStorageDir(dataSegment, false); + } + + default String getStorageDir(DataSegment dataSegment, boolean useUniquePath) + { + return getDefaultStorageDir(dataSegment, useUniquePath); + } default String makeIndexPathName(DataSegment dataSegment, String indexName) { - return StringUtils.format("./%s/%s", getStorageDir(dataSegment), indexName); + // This is only called from Hadoop batch which doesn't require unique segment paths so set useUniquePath=false + return StringUtils.format("./%s/%s", getStorageDir(dataSegment, false), indexName); } /** @@ -89,13 +102,19 @@ default List getAllowedPropertyPrefixesForHadoop() // If above format is ever changed, make sure to change it appropriately in other places // e.g. HDFSDataSegmentKiller uses this information to clean the version, interval and dataSource directories // on segment deletion if segment being deleted was the only segment - static String getDefaultStorageDir(DataSegment segment) + static String getDefaultStorageDir(DataSegment segment, boolean useUniquePath) { return JOINER.join( segment.getDataSource(), StringUtils.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()), segment.getVersion(), - segment.getShardSpec().getPartitionNum() + segment.getShardSpec().getPartitionNum(), + useUniquePath ? generateUniquePath() : null ); } + + static String generateUniquePath() + { + return UUID.randomUUID().toString(); + } } diff --git a/docs/content/operations/insert-segment-to-db.md b/docs/content/operations/insert-segment-to-db.md index 60549e69d6c2..cd5ba2264aeb 100644 --- a/docs/content/operations/insert-segment-to-db.md +++ b/docs/content/operations/insert-segment-to-db.md @@ -5,24 +5,43 @@ layout: doc_page `insert-segment-to-db` is a tool that can insert segments into Druid metadata storage. It is intended to be used to update the segment table in metadata storage after people manually migrate segments from one place to another. -It can also be used to insert missing segment into Druid, or even recover metadata storage by telling it where the +It can also be used to insert missing segments into Druid, or even recover metadata storage by telling it where the segments are stored. -Note: This tool expects users to have Druid cluster running in a "safe" mode, where there are no active tasks to interfere -the segments being inserted. Users can optionally bring down the cluster to make 100% sure nothing is interfering. +**Note:** This tool simply scans the deep storage directory to reconstruct the metadata entries used to locate and +identify each segment. It does not have any understanding about whether those segments _should actually_ be written to +the metadata storage. In certain cases, this can lead to undesired or inconsistent results. Some examples of things to +watch out for: + - Dropped datasources will be re-enabled.
+ - The latest version of each segment set will be loaded by Druid, which in some cases may not be the version you + actually want. An example of this is a bad compaction job that generates segments which need to be manually rolled + back by removing that version from the metadata table. If these segments are not also removed from deep storage, + they will be imported back into the metadata table and overshadow the correct version. + - Some indexers such as the Kafka indexing service have the potential to generate more than one set of segments that + have the same segment ID but different contents. When the metadata is first written, the correct set of segments is + referenced and the other set is normally deleted from deep storage. It is possible however that an unhandled + exception could result in multiple sets of segments with the same segment ID remaining in deep storage. Since this + tool does not know which one is the 'correct' one to use, it will simply select the newest segment set and ignore + the other versions. If the wrong segment set is picked, the exactly-once semantics of the Kafka indexing service + will no longer hold true and you may get duplicated or dropped events. + +With these considerations in mind, it is recommended that data migrations be done by exporting the original metadata +storage directly, since that is the definitive cluster state. This tool should be used as a last resort when a direct +export is not possible. + +**Note:** This tool expects users to have Druid cluster running in a "safe" mode, where there are no active tasks to interfere +with the segments being inserted. Users can optionally bring down the cluster to make 100% sure nothing is interfering. In order to make it work, user will have to provide metadata storage credentials and deep storage type through Java JVM argument -or runtime.properties file. Specifically, this tool needs to know +or runtime.properties file. Specifically, this tool needs to know: -`druid.metadata.storage.type` - -`druid.metadata.storage.connector.connectURI` - -`druid.metadata.storage.connector.user` - -`druid.metadata.storage.connector.password` - -`druid.storage.type` +``` +druid.metadata.storage.type +druid.metadata.storage.connector.connectURI +druid.metadata.storage.connector.user +druid.metadata.storage.connector.password +druid.storage.type +``` Besides the properties above, you also need to specify the location where the segments are stored and whether you want to update descriptor.json (`partitionNum_descriptor.json` for HDFS data storage). These two can be provided through command line arguments. 
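The storage layout that the new `useUniquePath` flag controls is the one built by `getDefaultStorageDir()` earlier in this patch: `dataSource/interval/version/partitionNum`, with one extra random path element appended in unique-path mode. The following is a small stand-alone sketch of that convention with made-up segment coordinates, assuming the pusher's `JOINER` is a slash joiner that skips nulls; it is illustrative only and not code from the patch:

```java
import com.google.common.base.Joiner;

import java.util.UUID;

// Illustrative only: mirrors the dataSource/interval/version/partitionNum[/UUID]
// layout that DataSegmentPusher.getDefaultStorageDir() produces in this patch.
public class StorageDirSketch
{
  private static final Joiner JOINER = Joiner.on("/").skipNulls();

  static String storageDir(
      String dataSource,
      String intervalStart,
      String intervalEnd,
      String version,
      int partitionNum,
      boolean useUniquePath
  )
  {
    return JOINER.join(
        dataSource,
        intervalStart + "_" + intervalEnd,
        version,
        partitionNum,
        // The only difference in unique-path mode is one extra random path element.
        useUniquePath ? UUID.randomUUID().toString() : null
    );
  }

  public static void main(String[] args)
  {
    // Same location on every attempt; retries must overwrite:
    System.out.println(storageDir("foo", "2015-01-01T00:00:00.000Z", "2016-01-01T00:00:00.000Z", "v1", 0, false));
    // A fresh location per push attempt; nothing is ever overwritten:
    System.out.println(storageDir("foo", "2015-01-01T00:00:00.000Z", "2016-01-01T00:00:00.000Z", "v1", 0, true));
  }
}
```

Because every push attempt in unique-path mode lands in a fresh location, a retried task can never clobber a segment that a historical may already have loaded; the trade-off, as noted above, is that orphaned copies must be cleaned up separately.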
diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java index 39ab34258e0d..592b956c260d 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java @@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.microsoft.azure.storage.StorageException; - import io.druid.java.util.common.CompressionUtils; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; @@ -92,9 +91,9 @@ public File createSegmentDescriptorFile(final ObjectMapper jsonMapper, final Dat return descriptorFile; } - public Map getAzurePaths(final DataSegment segment) + public Map getAzurePaths(final DataSegment segment, final boolean useUniquePath) { - final String storageDir = this.getStorageDir(segment); + final String storageDir = this.getStorageDir(segment, useUniquePath); return ImmutableMap.of( "index", StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME), @@ -109,13 +108,12 @@ public DataSegment uploadDataSegment( final long size, final File compressedSegmentData, final File descriptorFile, - final Map azurePaths, - final boolean replaceExisting + final Map azurePaths ) throws StorageException, IOException, URISyntaxException { - azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePaths.get("index"), replaceExisting); - azureStorage.uploadBlob(descriptorFile, config.getContainer(), azurePaths.get("descriptor"), replaceExisting); + azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePaths.get("index")); + azureStorage.uploadBlob(descriptorFile, config.getContainer(), azurePaths.get("descriptor")); final DataSegment outSegment = segment .withSize(size) @@ -132,7 +130,7 @@ public DataSegment uploadDataSegment( } @Override - public DataSegment push(final File indexFilesDir, final DataSegment segment, final boolean replaceExisting) + public DataSegment push(final File indexFilesDir, final DataSegment segment, final boolean useUniquePath) throws IOException { log.info("Uploading [%s] to Azure.", indexFilesDir); @@ -146,7 +144,7 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment, fin final long size = CompressionUtils.zip(indexFilesDir, zipOutFile); final File descFile = descriptorFile = createSegmentDescriptorFile(jsonMapper, segment); - final Map azurePaths = getAzurePaths(segment); + final Map azurePaths = getAzurePaths(segment, useUniquePath); return AzureUtils.retryAzureOperation( new Callable() @@ -154,7 +152,7 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment, fin @Override public DataSegment call() throws Exception { - return uploadDataSegment(segment, version, size, outFile, descFile, azurePaths, replaceExisting); + return uploadDataSegment(segment, version, size, outFile, descFile, azurePaths); } }, config.getMaxTries() diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorage.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorage.java index 28fa9dad56c6..25a4764ad845 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorage.java +++ 
b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorage.java @@ -23,7 +23,6 @@ import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; -import com.microsoft.azure.storage.blob.CloudBlockBlob; import com.microsoft.azure.storage.blob.ListBlobItem; import io.druid.java.util.common.logger.Logger; @@ -77,27 +76,14 @@ public List emptyCloudBlobDirectory(final String containerName, final St } return deletedFiles; - } - public void uploadBlob( - final File file, - final String containerName, - final String blobPath, - final boolean replaceExisting - ) + public void uploadBlob(final File file, final String containerName, final String blobPath) throws IOException, StorageException, URISyntaxException - { CloudBlobContainer container = getCloudBlobContainer(containerName); try (FileInputStream stream = new FileInputStream(file)) { - CloudBlockBlob blob = container.getBlockBlobReference(blobPath); - - if (!replaceExisting && blob.exists()) { - log.info("Skipping push because blob [%s] exists && replaceExisting == false", blobPath); - } else { - blob.upload(stream, file.length()); - } + container.getBlockBlobReference(blobPath).upload(stream, file.length()); } } diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java index 50092ceff62c..f4b6093ed760 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java @@ -59,7 +59,7 @@ public void pushTaskLog(final String taskid, final File logFile) throws IOExcept try { AzureUtils.retryAzureOperation( (Callable) () -> { - azureStorage.uploadBlob(logFile, config.getContainer(), taskKey, true); + azureStorage.uploadBlob(logFile, config.getContainer(), taskKey); return null; }, config.getMaxTries() diff --git a/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java b/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java index 30c571a4f163..3201c4afeca3 100644 --- a/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java +++ b/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java @@ -82,6 +82,17 @@ public void before() @Test public void testPush() throws Exception + { + testPushInternal(false, "foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/index\\.zip"); + } + + @Test + public void testPushUseUniquePath() throws Exception + { + testPushInternal(true, "foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip"); + } + + private void testPushInternal(boolean useUniquePath, String matcher) throws Exception { AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper); @@ -104,7 +115,12 @@ public void testPush() throws Exception size ); - DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true); + DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, useUniquePath); + + Assert.assertTrue( + segment.getLoadSpec().get("blobPath").toString(), + segment.getLoadSpec().get("blobPath").toString().matches(matcher) + ); 
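The two matcher strings in the test above capture the expected blob layouts: the default path ends in `.../0/0/index.zip`, while the unique-path variant inserts a 36-character UUID element before the file name. Below is a quick stand-alone check of those patterns against sample paths (the sample paths are made up, only the regexes come from the test):

```java
import java.util.UUID;

// Stand-alone illustration of the regexes used in the Azure pusher test above.
public class UniquePathRegexCheck
{
  public static void main(String[] args)
  {
    String fixed = "foo/2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z/0/0/index.zip";
    String unique = "foo/2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z/0/0/"
                    + UUID.randomUUID() + "/index.zip";

    String fixedPattern = "foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/index\\.zip";
    String uniquePattern = "foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip";

    System.out.println(fixed.matches(fixedPattern));    // true
    System.out.println(unique.matches(uniquePattern));  // true: UUID.toString() is 36 characters
    System.out.println(fixed.matches(uniquePattern));   // false: no UUID path element
  }
}
```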
Assert.assertEquals(segmentToPush.getSize(), segment.getSize()); } @@ -114,10 +130,13 @@ public void getAzurePathsTest() { AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper); - final String storageDir = pusher.getStorageDir(dataSegment); - Map paths = pusher.getAzurePaths(dataSegment); + final String storageDir = pusher.getStorageDir(dataSegment, false); + Map paths = pusher.getAzurePaths(dataSegment, false); - assertEquals(StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME), paths.get("index")); + assertEquals( + StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME), + paths.get("index") + ); assertEquals( StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.DESCRIPTOR_FILE_NAME), paths.get("descriptor") @@ -131,11 +150,11 @@ public void uploadDataSegmentTest() throws StorageException, IOException, URISyn final int version = 9; final File compressedSegmentData = new File("index.zip"); final File descriptorFile = new File("descriptor.json"); - final Map azurePaths = pusher.getAzurePaths(dataSegment); + final Map azurePaths = pusher.getAzurePaths(dataSegment, false); - azureStorage.uploadBlob(compressedSegmentData, containerName, azurePaths.get("index"), true); + azureStorage.uploadBlob(compressedSegmentData, containerName, azurePaths.get("index")); expectLastCall(); - azureStorage.uploadBlob(descriptorFile, containerName, azurePaths.get("descriptor"), true); + azureStorage.uploadBlob(descriptorFile, containerName, azurePaths.get("descriptor")); expectLastCall(); replayAll(); @@ -146,8 +165,7 @@ public void uploadDataSegmentTest() throws StorageException, IOException, URISyn 0, // empty file compressedSegmentData, descriptorFile, - azurePaths, - true + azurePaths ); assertEquals(compressedSegmentData.length(), pushedDataSegment.getSize()); diff --git a/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureTaskLogsTest.java b/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureTaskLogsTest.java index 5f8394a735a4..26ff677af177 100644 --- a/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureTaskLogsTest.java +++ b/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureTaskLogsTest.java @@ -65,7 +65,7 @@ public void testPushTaskLog() throws Exception try { final File logFile = new File(tmpDir, "log"); - azureStorage.uploadBlob(logFile, container, prefix + "/" + taskid + "/log", true); + azureStorage.uploadBlob(logFile, container, prefix + "/" + taskid + "/log"); expectLastCall(); replayAll(); diff --git a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java index 3b3ad8e5539b..16ff585327db 100644 --- a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java +++ b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java @@ -75,13 +75,12 @@ public String getPathForHadoop(String dataSource) } @Override - public DataSegment push(final File indexFilesDir, DataSegment segment, final boolean replaceExisting) - throws IOException + public DataSegment push(final File indexFilesDir, DataSegment segment, final boolean useUniquePath) throws IOException { log.info("Writing [%s] to C*", indexFilesDir); String 
key = JOINER.join( config.getKeyspace().isEmpty() ? null : config.getKeyspace(), - this.getStorageDir(segment) + this.getStorageDir(segment, useUniquePath) ); // Create index @@ -92,20 +91,16 @@ public DataSegment push(final File indexFilesDir, DataSegment segment, final boo int version = SegmentUtils.getVersionFromDir(indexFilesDir); try { - if (!replaceExisting && doesObjectExist(indexStorage, key)) { - log.info("Skipping push because key [%s] exists && replaceExisting == false", key); - } else { - long start = System.currentTimeMillis(); - ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile)) - .withConcurrencyLevel(CONCURRENCY).call(); - byte[] json = jsonMapper.writeValueAsBytes(segment); - MutationBatch mutation = this.keyspace.prepareMutationBatch(); - mutation.withRow(descriptorStorage, key) - .putColumn("lastmodified", System.currentTimeMillis(), null) - .putColumn("descriptor", json, null); - mutation.execute(); - log.info("Wrote index to C* in [%s] ms", System.currentTimeMillis() - start); - } + long start = System.currentTimeMillis(); + ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile)) + .withConcurrencyLevel(CONCURRENCY).call(); + byte[] json = jsonMapper.writeValueAsBytes(segment); + MutationBatch mutation = this.keyspace.prepareMutationBatch(); + mutation.withRow(descriptorStorage, key) + .putColumn("lastmodified", System.currentTimeMillis(), null) + .putColumn("descriptor", json, null); + mutation.execute(); + log.info("Wrote index to C* in [%s] ms", System.currentTimeMillis() - start); } catch (Exception e) { throw new IOException(e); diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java index e224a5b83b78..be3e25472354 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java @@ -31,7 +31,6 @@ import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi; import java.io.File; -import java.io.IOException; import java.net.URI; import java.nio.file.Files; import java.util.Map; @@ -74,10 +73,12 @@ public String getPathForHadoop(final String dataSource) } @Override - public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean replaceExisting) - throws IOException + public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean useUniquePath) { - final String segmentPath = CloudFilesUtils.buildCloudFilesPath(this.config.getBasePath(), getStorageDir(inSegment)); + final String segmentPath = CloudFilesUtils.buildCloudFilesPath( + this.config.getBasePath(), + getStorageDir(inSegment, useUniquePath) + ); File descriptorFile = null; File zipOutFile = null; @@ -100,22 +101,19 @@ public DataSegment call() throws Exception objectApi.getContainer() ); - if (!replaceExisting && objectApi.exists(segmentData.getPath())) { - log.info("Skipping push because object [%s] exists && replaceExisting == false", segmentData.getPath()); - } else { - log.info("Pushing %s.", segmentData.getPath()); - objectApi.put(segmentData); - - // Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in - // runtime, and because Guava deletes methods over time, that 
causes incompatibilities. - Files.write(descFile.toPath(), jsonMapper.writeValueAsBytes(inSegment)); - CloudFilesObject descriptorData = new CloudFilesObject( - segmentPath, descFile, - objectApi.getRegion(), objectApi.getContainer() - ); - log.info("Pushing %s.", descriptorData.getPath()); - objectApi.put(descriptorData); - } + log.info("Pushing %s.", segmentData.getPath()); + objectApi.put(segmentData); + + // Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in + // runtime, and because Guava deletes methods over time, that causes incompatibilities. + Files.write(descFile.toPath(), jsonMapper.writeValueAsBytes(inSegment)); + CloudFilesObject descriptorData = new CloudFilesObject( + segmentPath, descFile, + objectApi.getRegion(), objectApi.getContainer() + ); + log.info("Pushing %s.", descriptorData.getPath()); + objectApi.put(descriptorData); + final DataSegment outSegment = inSegment .withSize(indexSize) diff --git a/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusherTest.java b/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusherTest.java index 73be6ce0e1c1..2be6f584811b 100644 --- a/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusherTest.java +++ b/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusherTest.java @@ -84,7 +84,7 @@ public void testPush() throws Exception size ); - DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true); + DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, false); Assert.assertEquals(segmentToPush.getSize(), segment.getSize()); diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java index e8489ea8bd86..7a3581462d36 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java @@ -93,7 +93,7 @@ public File createDescriptorFile(final ObjectMapper jsonMapper, final DataSegmen return descriptorFile; } - public void insert(final File file, final String contentType, final String path, final boolean replaceExisting) + public void insert(final File file, final String contentType, final String path) throws IOException { LOG.info("Inserting [%s] to [%s]", file, path); @@ -103,15 +103,11 @@ public void insert(final File file, final String contentType, final String path, InputStreamContent mediaContent = new InputStreamContent(contentType, fileSteam); mediaContent.setLength(file.length()); - if (!replaceExisting && storage.exists(config.getBucket(), path)) { - LOG.info("Skipping push because path [%s] exists && replaceExisting == false", path); - } else { - storage.insert(config.getBucket(), path, mediaContent); - } + storage.insert(config.getBucket(), path, mediaContent); } @Override - public DataSegment push(final File indexFilesDir, final DataSegment segment, final boolean replaceExisting) + public DataSegment push(final File indexFilesDir, final DataSegment segment, final boolean useUniquePath) throws IOException { LOG.info("Uploading [%s] to Google.", indexFilesDir); @@ -123,7 +119,7 @@ public DataSegment push(final 
File indexFilesDir, final DataSegment segment, fin try { indexFile = File.createTempFile("index", ".zip"); final long indexSize = CompressionUtils.zip(indexFilesDir, indexFile); - final String storageDir = this.getStorageDir(segment); + final String storageDir = this.getStorageDir(segment, useUniquePath); final String indexPath = buildPath(storageDir + "/" + "index.zip"); final String descriptorPath = buildPath(storageDir + "/" + "descriptor.json"); @@ -134,8 +130,8 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment, fin descriptorFile = createDescriptorFile(jsonMapper, outSegment); - insert(indexFile, "application/zip", indexPath, replaceExisting); - insert(descriptorFile, "application/json", descriptorPath, replaceExisting); + insert(indexFile, "application/zip", indexPath); + insert(descriptorFile, "application/json", descriptorPath); return outSegment; } diff --git a/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java index 11372c243aa0..b90f0d5f560e 100644 --- a/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java +++ b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java @@ -89,7 +89,7 @@ public void testPush() throws Exception "foo", Intervals.of("2015/2016"), "0", - Maps.newHashMap(), + Maps.newHashMap(), Lists.newArrayList(), Lists.newArrayList(), new NoneShardSpec(), @@ -103,30 +103,28 @@ public void testPush() throws Exception storage, googleAccountConfig, jsonMapper - ).addMockedMethod("insert", File.class, String.class, String.class, boolean.class).createMock(); + ).addMockedMethod("insert", File.class, String.class, String.class).createMock(); - final String storageDir = pusher.getStorageDir(segmentToPush); + final String storageDir = pusher.getStorageDir(segmentToPush, false); final String indexPath = prefix + "/" + storageDir + "/" + "index.zip"; final String descriptorPath = prefix + "/" + storageDir + "/" + "descriptor.json"; pusher.insert( EasyMock.anyObject(File.class), EasyMock.eq("application/zip"), - EasyMock.eq(indexPath), - EasyMock.eq(true) + EasyMock.eq(indexPath) ); expectLastCall(); pusher.insert( EasyMock.anyObject(File.class), EasyMock.eq("application/json"), - EasyMock.eq(descriptorPath), - EasyMock.eq(true) + EasyMock.eq(descriptorPath) ); expectLastCall(); replayAll(); - DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true); + DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, false); Assert.assertEquals(segmentToPush.getSize(), segment.getSize()); Assert.assertEquals(segmentToPush, segment); diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentFinder.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentFinder.java index 6fba009cf865..8e75b36057c2 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentFinder.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentFinder.java @@ -20,13 +20,14 @@ package io.druid.storage.hdfs; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Sets; +import com.google.common.base.Preconditions; import com.google.inject.Inject; +import io.druid.java.util.common.Pair; +import 
io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; import io.druid.segment.loading.DataSegmentFinder; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; -import io.druid.java.util.common.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; @@ -34,14 +35,15 @@ import org.apache.hadoop.fs.RemoteIterator; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** */ public class HdfsDataSegmentFinder implements DataSegmentFinder { - private static final Logger log = new Logger(HdfsDataSegmentFinder.class); private final Configuration config; @@ -58,7 +60,7 @@ public HdfsDataSegmentFinder(Configuration config, ObjectMapper mapper) public Set findSegments(String workingDirPathStr, boolean updateDescriptor) throws SegmentLoadingException { - final Set segments = Sets.newHashSet(); + final Map> timestampedSegments = new HashMap<>(); final Path workingDirPath = new Path(workingDirPathStr); FileSystem fs; try { @@ -80,15 +82,31 @@ public Set findSegments(String workingDirPathStr, boolean updateDes final LocatedFileStatus locatedFileStatus = it.next(); final Path path = locatedFileStatus.getPath(); if (path.getName().endsWith("descriptor.json")) { - final Path indexZip; + + // There are 3 supported path formats: + // - hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum/descriptor.json + // - hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_descriptor.json + // - hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_UUID_descriptor.json final String descriptorParts[] = path.getName().split("_"); - if (descriptorParts.length == 2 - && descriptorParts[1].equals("descriptor.json") - && org.apache.commons.lang.StringUtils.isNumeric(descriptorParts[0])) { - indexZip = new Path(path.getParent(), StringUtils.format("%s_index.zip", descriptorParts[0])); - } else { - indexZip = new Path(path.getParent(), "index.zip"); + + Path indexZip = new Path(path.getParent(), "index.zip"); + if (descriptorParts.length > 1) { + Preconditions.checkState(descriptorParts.length <= 3 && + org.apache.commons.lang.StringUtils.isNumeric(descriptorParts[0]) && + "descriptor.json".equals(descriptorParts[descriptorParts.length - 1]), + "Unexpected descriptor filename format [%s]", path + ); + + indexZip = new Path( + path.getParent(), + StringUtils.format( + "%s_%sindex.zip", + descriptorParts[0], + descriptorParts.length == 2 ? 
"" : descriptorParts[1] + "_" + ) + ); } + if (fs.exists(indexZip)) { final DataSegment dataSegment = mapper.readValue(fs.open(path), DataSegment.class); log.info("Found segment [%s] located at [%s]", dataSegment.getIdentifier(), indexZip); @@ -105,7 +123,12 @@ public Set findSegments(String workingDirPathStr, boolean updateDes mapper.writeValue(fs.create(path, true), dataSegment); } } - segments.add(dataSegment); + + DataSegmentFinder.putInMapRetainingNewest( + timestampedSegments, + dataSegment, + locatedFileStatus.getModificationTime() + ); } else { throw new SegmentLoadingException( "index.zip didn't exist at [%s] while descripter.json exists!?", @@ -119,7 +142,6 @@ public Set findSegments(String workingDirPathStr, boolean updateDes throw new SegmentLoadingException(e, "Problems interacting with filesystem[%s].", workingDirPath); } - return segments; + return timestampedSegments.values().stream().map(x -> x.lhs).collect(Collectors.toSet()); } - } diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentKiller.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentKiller.java index 9ce604e7d203..6cbd1eccd7c3 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentKiller.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentKiller.java @@ -19,6 +19,7 @@ package io.druid.storage.hdfs; +import com.google.common.base.Preconditions; import com.google.inject.Inject; import io.druid.java.util.emitter.EmittingLogger; import io.druid.segment.loading.DataSegmentKiller; @@ -57,65 +58,53 @@ private static Path getPath(DataSegment segment) public void kill(DataSegment segment) throws SegmentLoadingException { final Path segmentPath = getPath(segment); - log.info("killing segment[%s] mapped to path[%s]", segment.getIdentifier(), segmentPath); + log.info("Killing segment[%s] mapped to path[%s]", segment.getIdentifier(), segmentPath); try { - String segmentLocation = segmentPath.getName(); + String filename = segmentPath.getName(); final FileSystem fs = segmentPath.getFileSystem(config); - if (!segmentLocation.endsWith(".zip")) { + if (!filename.endsWith(".zip")) { throw new SegmentLoadingException("Unknown file type[%s]", segmentPath); } else { if (!fs.exists(segmentPath)) { - log.warn("Segment Path [%s] does not exist. 
It appears to have been deleted already.", segmentPath); + log.warn("Segment path [%s] does not exist", segmentPath); return; } - String[] zipParts = segmentLocation.split("_"); - // for segments stored as hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_index.zip - if (zipParts.length == 2 - && zipParts[1].equals("index.zip") - && StringUtils.isNumeric(zipParts[0])) { - if (!fs.delete(segmentPath, false)) { - throw new SegmentLoadingException( - "Unable to kill segment, failed to delete [%s]", - segmentPath.toString() - ); - } - Path descriptorPath = new Path( + // There are 3 supported path formats: + // - hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum/index.zip + // - hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_index.zip + // - hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_UUID_index.zip + String[] zipParts = filename.split("_"); + + Path descriptorPath = new Path(segmentPath.getParent(), "descriptor.json"); + if (zipParts.length > 1) { + Preconditions.checkState(zipParts.length <= 3 && + StringUtils.isNumeric(zipParts[0]) && + "index.zip".equals(zipParts[zipParts.length - 1]), + "Unexpected segmentPath format [%s]", segmentPath + ); + + descriptorPath = new Path( segmentPath.getParent(), - io.druid.java.util.common.StringUtils.format("%s_descriptor.json", zipParts[0]) + io.druid.java.util.common.StringUtils.format( + "%s_%sdescriptor.json", + zipParts[0], + zipParts.length == 2 ? "" : zipParts[1] + "_" + ) ); - //delete partitionNumber_descriptor.json - if (!fs.delete(descriptorPath, false)) { - throw new SegmentLoadingException( - "Unable to kill segment, failed to delete [%s]", - descriptorPath.toString() - ); - } - //for segments stored as hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum_index.zip - // max depth to look is 2, i.e version directory and interval. - mayBeDeleteParentsUpto(fs, segmentPath, 2); - - } else { //for segments stored as hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum/ - // index.zip - if (!fs.delete(segmentPath, false)) { - throw new SegmentLoadingException( - "Unable to kill segment, failed to delete [%s]", - segmentPath.toString() - ); - } - Path descriptorPath = new Path(segmentPath.getParent(), "descriptor.json"); - if (!fs.delete(descriptorPath, false)) { - throw new SegmentLoadingException( - "Unable to kill segment, failed to delete [%s]", - descriptorPath.toString() - ); - } - //for segments stored as hdfs://nn1/hdfs_base_directory/data_source_name/interval/version/shardNum/index.zip - //max depth to look is 3, i.e partition number directory,version directory and interval. - mayBeDeleteParentsUpto(fs, segmentPath, 3); } + + if (!fs.delete(segmentPath, false)) { + throw new SegmentLoadingException("Unable to kill segment, failed to delete [%s]", segmentPath.toString()); + } + + if (!fs.delete(descriptorPath, false)) { + throw new SegmentLoadingException("Unable to kill segment, failed to delete [%s]", descriptorPath.toString()); + } + + removeEmptyParentDirectories(fs, segmentPath, zipParts.length > 1 ? 
2 : 3); } } catch (IOException e) { @@ -131,11 +120,11 @@ public void killAll() throws IOException fs.delete(storageDirectory, true); } - private void mayBeDeleteParentsUpto(final FileSystem fs, final Path segmentPath, final int maxDepthTobeDeleted) + private void removeEmptyParentDirectories(final FileSystem fs, final Path segmentPath, final int depth) { Path path = segmentPath; try { - for (int i = 1; i <= maxDepthTobeDeleted; i++) { + for (int i = 1; i <= depth; i++) { path = path.getParent(); if (fs.listStatus(path).length != 0 || !fs.delete(path, false)) { break; diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java index 3374d4333329..94b41e70e199 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -20,6 +20,7 @@ package io.druid.storage.hdfs; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSink; import com.google.common.io.ByteSource; @@ -57,11 +58,8 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher private final String fullyQualifiedStorageDirectory; @Inject - public HdfsDataSegmentPusher( - HdfsDataSegmentPusherConfig config, - Configuration hadoopConfig, - ObjectMapper jsonMapper - ) throws IOException + public HdfsDataSegmentPusher(HdfsDataSegmentPusherConfig config, Configuration hadoopConfig, ObjectMapper jsonMapper) + throws IOException { this.config = config; this.hadoopConfig = hadoopConfig; @@ -89,9 +87,11 @@ public String getPathForHadoop() } @Override - public DataSegment push(File inDir, DataSegment segment, boolean replaceExisting) throws IOException + public DataSegment push(final File inDir, final DataSegment segment, final boolean useUniquePath) throws IOException { - final String storageDir = this.getStorageDir(segment); + // For HDFS, useUniquePath does not affect the directory tree but instead affects the filename, which is of the form + // '{partitionNum}_index.zip' without unique paths and '{partitionNum}_{UUID}_index.zip' with unique paths. + final String storageDir = this.getStorageDir(segment, false); log.info( "Copying segment[%s] to HDFS at location[%s/%s]", @@ -116,17 +116,20 @@ public DataSegment push(File inDir, DataSegment segment, boolean replaceExisting final DataSegment dataSegment; try (FSDataOutputStream out = fs.create(tmpIndexFile)) { size = CompressionUtils.zip(inDir, out); + final String uniquePrefix = useUniquePath ? 
DataSegmentPusher.generateUniquePath() + "_" : ""; final Path outIndexFile = new Path(StringUtils.format( - "%s/%s/%d_index.zip", + "%s/%s/%d_%sindex.zip", fullyQualifiedStorageDirectory, storageDir, - segment.getShardSpec().getPartitionNum() + segment.getShardSpec().getPartitionNum(), + uniquePrefix )); final Path outDescriptorFile = new Path(StringUtils.format( - "%s/%s/%d_descriptor.json", + "%s/%s/%d_%sdescriptor.json", fullyQualifiedStorageDirectory, storageDir, - segment.getShardSpec().getPartitionNum() + segment.getShardSpec().getPartitionNum(), + uniquePrefix )); dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile.toUri())) @@ -145,8 +148,8 @@ public DataSegment push(File inDir, DataSegment segment, boolean replaceExisting // Create parent if it does not exist, recreation is not an error fs.mkdirs(outIndexFile.getParent()); - copyFilesWithChecks(fs, tmpDescriptorFile, outDescriptorFile, replaceExisting); - copyFilesWithChecks(fs, tmpIndexFile, outIndexFile, replaceExisting); + copyFilesWithChecks(fs, tmpDescriptorFile, outDescriptorFile); + copyFilesWithChecks(fs, tmpIndexFile, outIndexFile); } finally { try { @@ -162,19 +165,17 @@ public DataSegment push(File inDir, DataSegment segment, boolean replaceExisting return dataSegment; } - private void copyFilesWithChecks(final FileSystem fs, final Path from, final Path to, final boolean replaceExisting) - throws IOException + private void copyFilesWithChecks(final FileSystem fs, final Path from, final Path to) throws IOException { - if (!HadoopFsWrapper.rename(fs, from, to, replaceExisting)) { + if (!HadoopFsWrapper.rename(fs, from, to)) { if (fs.exists(to)) { log.info( - "Unable to rename temp Index file[%s] to final segment path [%s]. " - + "It is already pushed by a replica task.", + "Unable to rename temp file [%s] to segment path [%s], it may have already been pushed by a replica task.", from, to ); } else { - throw new IOE("Failed to rename temp Index file[%s] and final segment path[%s] is not present.", from, to); + throw new IOE("Failed to rename temp file [%s] and final segment path [%s] is not present.", from, to); } } } @@ -209,8 +210,17 @@ public Map makeLoadSpec(URI finalIndexZipFilePath) */ @Override - public String getStorageDir(DataSegment segment) + public String getStorageDir(DataSegment segment, boolean useUniquePath) { + // This is only called by HdfsDataSegmentPusher.push(), which will always set useUniquePath to false since any + // 'uniqueness' will be applied not to the directory but to the filename along with the shard number. This is done + // to avoid performance issues due to excessive HDFS directories. Hence useUniquePath is ignored here and we + // expect it to be false. 
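In other words, for HDFS the segment directory stays the same and any uniqueness is folded into the file name: `partitionNum_index.zip` versus `partitionNum_UUID_index.zip`. A small illustrative sketch of that naming, not code from the patch:

```java
import io.druid.java.util.common.StringUtils;

import java.util.UUID;

// Illustrative only: the HDFS naming above keeps a single directory per segment
// and puts any uniqueness into the file name.
public class HdfsSegmentFileNameSketch
{
  static String indexFileName(int partitionNum, boolean useUniquePath)
  {
    final String uniquePrefix = useUniquePath ? UUID.randomUUID().toString() + "_" : "";
    return StringUtils.format("%d_%sindex.zip", partitionNum, uniquePrefix);
  }

  public static void main(String[] args)
  {
    System.out.println(indexFileName(0, false)); // 0_index.zip
    System.out.println(indexFileName(0, true));  // e.g. 0_<UUID>_index.zip
  }
}
```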
+ Preconditions.checkArgument( + !useUniquePath, + "useUniquePath must be false for HdfsDataSegmentPusher.getStorageDir()" + ); + return JOINER.join( segment.getDataSource(), StringUtils.format( @@ -225,9 +235,10 @@ public String getStorageDir(DataSegment segment) @Override public String makeIndexPathName(DataSegment dataSegment, String indexName) { + // This is only called from Hadoop batch which doesn't require unique segment paths so set useUniquePath=false return StringUtils.format( "./%s/%d_%s", - this.getStorageDir(dataSegment), + this.getStorageDir(dataSegment, false), dataSegment.getShardSpec().getPartitionNum(), indexName ); diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/hadoop/fs/HadoopFsWrapper.java b/extensions-core/hdfs-storage/src/main/java/org/apache/hadoop/fs/HadoopFsWrapper.java index 4604241dd76a..913adb0977bc 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/hadoop/fs/HadoopFsWrapper.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/hadoop/fs/HadoopFsWrapper.java @@ -41,16 +41,15 @@ private HadoopFsWrapper() {} * * @param from * @param to - * @param replaceExisting if existing files should be overwritten * - * @return true if operation succeeded, false if replaceExisting == false and destination already exists + * @return true if operation succeeded, false if destination already exists * * @throws IOException if trying to overwrite a non-empty directory */ - public static boolean rename(FileSystem fs, Path from, Path to, boolean replaceExisting) throws IOException + public static boolean rename(FileSystem fs, Path from, Path to) throws IOException { try { - fs.rename(from, to, replaceExisting ? Options.Rename.OVERWRITE : Options.Rename.NONE); + fs.rename(from, to, Options.Rename.NONE); return true; } catch (FileAlreadyExistsException ex) { diff --git a/extensions-core/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentFinderTest.java b/extensions-core/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentFinderTest.java index 626124622527..5e052df1e6d1 100644 --- a/extensions-core/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentFinderTest.java +++ b/extensions-core/hdfs-storage/src/test/java/io/druid/segment/loading/HdfsDataSegmentFinderTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap; import io.druid.java.util.common.IOE; import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.StringUtils; import io.druid.segment.TestHelper; import io.druid.storage.hdfs.HdfsDataSegmentFinder; import io.druid.timeline.DataSegment; @@ -278,6 +279,30 @@ public void testFindSegmentsFail() throws Exception hdfsDataSegmentFinder.findSegments(dataSourceDir.toString(), false); } + @Test + public void testPreferNewestSegment() throws Exception + { + dataSourceDir = new Path(new Path(uriBase), "/usr/replicaDataSource"); + descriptor1 = new Path(dataSourceDir, StringUtils.format("interval1/v1/%d_%s_%s", 0, "older", DESCRIPTOR_JSON)); + descriptor2 = new Path(dataSourceDir, StringUtils.format("interval1/v1/%d_%s_%s", 0, "newer", DESCRIPTOR_JSON)); + indexZip1 = new Path(descriptor1.getParent(), StringUtils.format("%d_%s_%s", 0, "older", INDEX_ZIP)); + indexZip2 = new Path(descriptor2.getParent(), StringUtils.format("%d_%s_%s", 0, "newer", INDEX_ZIP)); + + mapper.writeValue(fs.create(descriptor1), SEGMENT_1); + mapper.writeValue(fs.create(descriptor2), SEGMENT_1); + + create(indexZip1); + Thread.sleep(1000); + create(indexZip2); + + final Set segments = 
new HdfsDataSegmentFinder(conf, mapper).findSegments( + dataSourceDir.toString(), false + ); + + Assert.assertEquals(1, segments.size()); + Assert.assertEquals(indexZip2.toUri().getPath(), segments.iterator().next().getLoadSpec().get("path")); + } + private String getDescriptorPath(DataSegment segment) { final Path indexzip = new Path(String.valueOf(segment.getLoadSpec().get("path"))); diff --git a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentKillerTest.java b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentKillerTest.java index bdf499dcd904..dba43507a699 100644 --- a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentKillerTest.java +++ b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentKillerTest.java @@ -26,13 +26,13 @@ import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Assert; import org.junit.Test; import java.io.IOException; +import java.util.UUID; /** */ @@ -129,7 +129,10 @@ public String getStorageDirectory() Path interval1Dir = new Path(dataSourceDir, "intervalNew"); Path version11Dir = new Path(interval1Dir, "v1"); - makePartitionDirWithIndexWitNewFormat(fs, version11Dir, 3); + Assert.assertTrue(fs.mkdirs(version11Dir)); + fs.createNewFile(new Path(version11Dir, StringUtils.format("%s_index.zip", 3))); + fs.createNewFile(new Path(version11Dir, StringUtils.format("%s_descriptor.json", 3))); + killer.kill(getSegmentWithPath(new Path(version11Dir, "3_index.zip").toString())); Assert.assertFalse(fs.exists(version11Dir)); @@ -141,7 +144,7 @@ public String getStorageDirectory() } @Test - public void testKillNonExistingSegment() throws Exception + public void testKillForSegmentWithUniquePath() throws Exception { Configuration config = new Configuration(); HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller( @@ -155,28 +158,51 @@ public String getStorageDirectory() } } ); - killer.kill(getSegmentWithPath(new Path("/xxx/", "index.zip").toString())); + + FileSystem fs = FileSystem.get(config); + Path dataSourceDir = new Path("/tmp/dataSourceNew"); + + Path interval1Dir = new Path(dataSourceDir, "intervalNew"); + Path version11Dir = new Path(interval1Dir, "v1"); + String uuid = UUID.randomUUID().toString().substring(0, 5); + + Assert.assertTrue(fs.mkdirs(version11Dir)); + fs.createNewFile(new Path(version11Dir, StringUtils.format("%s_%s_index.zip", 3, uuid))); + fs.createNewFile(new Path(version11Dir, StringUtils.format("%s_%s_descriptor.json", 3, uuid))); + + killer.kill(getSegmentWithPath(new Path(version11Dir, StringUtils.format("%s_%s_index.zip", 3, uuid)).toString())); + + Assert.assertFalse(fs.exists(version11Dir)); + Assert.assertFalse(fs.exists(interval1Dir)); + Assert.assertTrue(fs.exists(dataSourceDir)); + Assert.assertTrue(fs.exists(new Path("/tmp"))); + Assert.assertTrue(fs.exists(dataSourceDir)); + Assert.assertTrue(fs.delete(dataSourceDir, false)); } - private void makePartitionDirWithIndex(FileSystem fs, Path path) throws IOException + @Test + public void testKillNonExistingSegment() throws Exception { - Assert.assertTrue(fs.mkdirs(path)); - try (FSDataOutputStream os = fs.create(new Path(path, "index.zip")); FSDataOutputStream oos = fs.create(new Path( - path, - "descriptor.json" - ))) { - } + Configuration config = new 
Configuration(); + HdfsDataSegmentKiller killer = new HdfsDataSegmentKiller( + config, + new HdfsDataSegmentPusherConfig() + { + @Override + public String getStorageDirectory() + { + return "/tmp"; + } + } + ); + killer.kill(getSegmentWithPath(new Path("/xxx/", "index.zip").toString())); } - private void makePartitionDirWithIndexWitNewFormat(FileSystem fs, Path path, Integer partitionNumber) - throws IOException + private void makePartitionDirWithIndex(FileSystem fs, Path path) throws IOException { Assert.assertTrue(fs.mkdirs(path)); - try (FSDataOutputStream os = fs.create(new Path( - path, - StringUtils.format("%s_index.zip", partitionNumber) - )); FSDataOutputStream oos = fs.create(new Path(path, StringUtils.format("%s_descriptor.json", partitionNumber)))) { - } + fs.createNewFile(new Path(path, "index.zip")); + fs.createNewFile(new Path(path, "descriptor.json")); } private DataSegment getSegmentWithPath(String path) diff --git a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java index bfc119f6498a..9bb25f90707f 100644 --- a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java +++ b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java @@ -127,7 +127,8 @@ public void testPushWithMultipleSegments() throws Exception testUsingSchemeForMultipleSegments("file", 3); } - private void testUsingScheme(final String scheme) throws Exception + @Test + public void testUsingUniqueFilePath() throws Exception { Configuration conf = new Configuration(true); @@ -142,11 +143,7 @@ private void testUsingScheme(final String scheme) throws Exception HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig(); final File storageDirectory = tempFolder.newFolder(); - config.setStorageDirectory( - scheme != null - ? 
StringUtils.format("%s://%s", scheme, storageDirectory.getAbsolutePath()) - : storageDirectory.getAbsolutePath() - ); + config.setStorageDirectory(StringUtils.format("file://%s", storageDirectory.getAbsolutePath())); HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper()); DataSegment segmentToPush = new DataSegment( @@ -163,49 +160,11 @@ private void testUsingScheme(final String scheme) throws Exception DataSegment segment = pusher.push(segmentDir, segmentToPush, true); - - String indexUri = StringUtils.format( - "%s/%s/%d_index.zip", - FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(), - pusher.getStorageDir(segmentToPush), - segmentToPush.getShardSpec().getPartitionNum() + String matcher = ".*/foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0_[A-Za-z0-9-]{36}_index\\.zip"; + Assert.assertTrue( + segment.getLoadSpec().get("path").toString(), + segment.getLoadSpec().get("path").toString().matches(matcher) ); - - Assert.assertEquals(segmentToPush.getSize(), segment.getSize()); - Assert.assertEquals(segmentToPush, segment); - Assert.assertEquals(ImmutableMap.of( - "type", - "hdfs", - "path", - indexUri - ), segment.getLoadSpec()); - // rename directory after push - final String segmentPath = pusher.getStorageDir(segment); - - File indexFile = new File(StringUtils.format( - "%s/%s/%d_index.zip", - storageDirectory, - segmentPath, - segment.getShardSpec().getPartitionNum() - )); - Assert.assertTrue(indexFile.exists()); - File descriptorFile = new File(StringUtils.format( - "%s/%s/%d_descriptor.json", - storageDirectory, - segmentPath, - segment.getShardSpec().getPartitionNum() - )); - Assert.assertTrue(descriptorFile.exists()); - - // push twice will fail and temp dir cleaned - File outDir = new File(StringUtils.format("%s/%s", config.getStorageDirectory(), segmentPath)); - outDir.setReadOnly(); - try { - pusher.push(segmentDir, segmentToPush, true); - } - catch (IOException e) { - Assert.fail("should not throw exception"); - } } private void testUsingSchemeForMultipleSegments(final String scheme, final int numberOfSegments) throws Exception @@ -246,12 +205,12 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n } for (int i = 0; i < numberOfSegments; i++) { - final DataSegment pushedSegment = pusher.push(segmentDir, segments[i], true); + final DataSegment pushedSegment = pusher.push(segmentDir, segments[i], false); String indexUri = StringUtils.format( "%s/%s/%d_index.zip", FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(), - pusher.getStorageDir(segments[i]), + pusher.getStorageDir(segments[i], false), segments[i].getShardSpec().getPartitionNum() ); @@ -264,7 +223,7 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n indexUri ), pushedSegment.getLoadSpec()); // rename directory after push - String segmentPath = pusher.getStorageDir(pushedSegment); + String segmentPath = pusher.getStorageDir(pushedSegment, false); File indexFile = new File(StringUtils.format( "%s/%s/%d_index.zip", @@ -293,7 +252,7 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n indexUri ), fromDescriptorFileDataSegment.getLoadSpec()); // rename directory after push - segmentPath = pusher.getStorageDir(fromDescriptorFileDataSegment); + segmentPath = pusher.getStorageDir(fromDescriptorFileDataSegment, false); indexFile = new File(StringUtils.format( "%s/%s/%d_index.zip", @@ -308,7 
+267,7 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n File outDir = new File(StringUtils.format("%s/%s", config.getStorageDirectory(), segmentPath)); outDir.setReadOnly(); try { - pusher.push(segmentDir, segments[i], true); + pusher.push(segmentDir, segments[i], false); } catch (IOException e) { Assert.fail("should not throw exception"); @@ -316,6 +275,87 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n } } + private void testUsingScheme(final String scheme) throws Exception + { + Configuration conf = new Configuration(true); + + // Create a mock segment on disk + File segmentDir = tempFolder.newFolder(); + File tmp = new File(segmentDir, "version.bin"); + + final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1}; + Files.write(data, tmp); + final long size = data.length; + + HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig(); + final File storageDirectory = tempFolder.newFolder(); + + config.setStorageDirectory( + scheme != null + ? StringUtils.format("%s://%s", scheme, storageDirectory.getAbsolutePath()) + : storageDirectory.getAbsolutePath() + ); + HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper()); + + DataSegment segmentToPush = new DataSegment( + "foo", + Intervals.of("2015/2016"), + "0", + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + NoneShardSpec.instance(), + 0, + size + ); + + DataSegment segment = pusher.push(segmentDir, segmentToPush, false); + + + String indexUri = StringUtils.format( + "%s/%s/%d_index.zip", + FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(), + pusher.getStorageDir(segmentToPush, false), + segmentToPush.getShardSpec().getPartitionNum() + ); + + Assert.assertEquals(segmentToPush.getSize(), segment.getSize()); + Assert.assertEquals(segmentToPush, segment); + Assert.assertEquals(ImmutableMap.of( + "type", + "hdfs", + "path", + indexUri + ), segment.getLoadSpec()); + // rename directory after push + final String segmentPath = pusher.getStorageDir(segment, false); + + File indexFile = new File(StringUtils.format( + "%s/%s/%d_index.zip", + storageDirectory, + segmentPath, + segment.getShardSpec().getPartitionNum() + )); + Assert.assertTrue(indexFile.exists()); + File descriptorFile = new File(StringUtils.format( + "%s/%s/%d_descriptor.json", + storageDirectory, + segmentPath, + segment.getShardSpec().getPartitionNum() + )); + Assert.assertTrue(descriptorFile.exists()); + + // push twice will fail and temp dir cleaned + File outDir = new File(StringUtils.format("%s/%s", config.getStorageDirectory(), segmentPath)); + outDir.setReadOnly(); + try { + pusher.push(segmentDir, segmentToPush, false); + } + catch (IOException e) { + Assert.fail("should not throw exception"); + } + } + public static class TestObjectMapper extends ObjectMapper { public TestObjectMapper() @@ -371,7 +411,7 @@ public void shouldNotHaveColonsInHdfsStorageDir() throws Exception 1 ); - String storageDir = hdfsDataSegmentPusher.getStorageDir(segment); + String storageDir = hdfsDataSegmentPusher.getStorageDir(segment, false); Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version", storageDir); } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 17d742a7589c..d7bffedea17b 100644 --- 
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -1811,6 +1811,7 @@ private StreamAppenderatorDriver newDriver( new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema), toolbox.getSegmentHandoffNotifierFactory(), new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()), + toolbox.getDataSegmentKiller(), toolbox.getObjectMapper(), metrics ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index ad6459f526f6..45f3003638f0 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2001,9 +2001,9 @@ private File getSegmentDirectory() private List readSegmentColumn(final String column, final SegmentDescriptor descriptor) throws IOException { - File indexZip = new File( + File indexBasePath = new File( StringUtils.format( - "%s/%s/%s_%s/%s/%d/index.zip", + "%s/%s/%s_%s/%s/%d", getSegmentDirectory(), DATA_SCHEMA.getDataSource(), descriptor.getInterval().getStart(), @@ -2012,6 +2012,7 @@ private List readSegmentColumn(final String column, final SegmentDescrip descriptor.getPartitionNumber() ) ); + File outputLocation = new File( directory, StringUtils.format( @@ -2024,7 +2025,7 @@ private List readSegmentColumn(final String column, final SegmentDescrip ); outputLocation.mkdir(); CompressionUtils.unzip( - Files.asByteSource(indexZip), + Files.asByteSource(new File(indexBasePath.listFiles()[0], "index.zip")), outputLocation, Predicates.alwaysFalse(), false diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentFinder.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentFinder.java index d6d773640e8e..3f4529ec99a0 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentFinder.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentFinder.java @@ -21,9 +21,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; -import com.google.common.collect.Sets; import com.google.inject.Inject; - +import io.druid.java.util.common.Pair; import io.druid.java.util.common.logger.Logger; import io.druid.segment.loading.DataSegmentFinder; import io.druid.segment.loading.SegmentLoadingException; @@ -35,9 +34,11 @@ import java.io.IOException; import java.io.InputStream; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public class S3DataSegmentFinder implements DataSegmentFinder { @@ -62,7 +63,7 @@ public S3DataSegmentFinder( @Override public Set findSegments(String workingDirPath, boolean updateDescriptor) throws SegmentLoadingException { - final Set segments = Sets.newHashSet(); + final Map> timestampedSegments = new HashMap<>(); try { Iterator objectsIterator = S3Utils.storageObjectsIterator( @@ -103,7 +104,12 @@ public Set findSegments(String workingDirPath, boolean updateDescri s3Client.putObject(config.getBucket(), newDescJsonObject); } } - segments.add(dataSegment); + + DataSegmentFinder.putInMapRetainingNewest( + timestampedSegments, + dataSegment, 
+ indexObject.getLastModifiedDate() == null ? 0 : indexObject.getLastModifiedDate().getTime() + ); } } else { throw new SegmentLoadingException( @@ -124,6 +130,6 @@ public Set findSegments(String workingDirPath, boolean updateDescri Throwables.propagateIfInstanceOf(e, SegmentLoadingException.class); Throwables.propagate(e); } - return segments; + return timestampedSegments.values().stream().map(x -> x.lhs).collect(Collectors.toSet()); } } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java index 983837865e8b..643b30407d9f 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java @@ -72,8 +72,11 @@ public DataSegment move(DataSegment segment, Map targetLoadSpec) final String targetS3Bucket = MapUtils.getString(targetLoadSpec, "bucket"); final String targetS3BaseKey = MapUtils.getString(targetLoadSpec, "baseKey"); - final String targetS3Path = S3Utils.constructSegmentPath(targetS3BaseKey, DataSegmentPusher.getDefaultStorageDir(segment)); - String targetS3DescriptorPath = S3Utils.descriptorPathForSegmentPath(targetS3Path); + final String targetS3Path = S3Utils.constructSegmentPath( + targetS3BaseKey, + DataSegmentPusher.getDefaultStorageDir(segment, false) + ); + final String targetS3DescriptorPath = S3Utils.descriptorPathForSegmentPath(targetS3Path); if (targetS3Bucket.isEmpty()) { throw new SegmentLoadingException("Target S3 bucket is not specified"); diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java index 3473cffe4a80..eb5c4a9c10dd 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java @@ -24,9 +24,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; -import io.druid.java.util.emitter.EmittingLogger; import io.druid.java.util.common.CompressionUtils; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.emitter.EmittingLogger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; import io.druid.timeline.DataSegment; @@ -88,10 +88,10 @@ public List getAllowedPropertyPrefixesForHadoop() } @Override - public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean replaceExisting) + public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean useUniquePath) throws IOException { - final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), getStorageDir(inSegment)); + final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), getStorageDir(inSegment, useUniquePath)); log.info("Copying segment[%s] to S3 at location[%s]", inSegment.getIdentifier(), s3Path); @@ -106,7 +106,7 @@ public DataSegment push(final File indexFilesDir, final DataSegment inSegment, f public DataSegment call() throws Exception { S3Object toPush = new S3Object(zipOutFile); - putObject(config.getBucket(), s3Path, toPush, replaceExisting); + putObject(config.getBucket(), s3Path, toPush); final DataSegment outSegment = inSegment.withSize(indexSize) 
.withLoadSpec(makeLoadSpec(config.getBucket(), toPush.getKey())) @@ -121,8 +121,7 @@ public DataSegment call() throws Exception putObject( config.getBucket(), S3Utils.descriptorPathForSegmentPath(s3Path), - descriptorObject, - replaceExisting + descriptorObject ); log.info("Deleting zipped index File[%s]", zipOutFile); @@ -169,8 +168,7 @@ private Map makeLoadSpec(String bucket, String key) ); } - private void putObject(String bucketName, String path, S3Object object, boolean replaceExisting) - throws ServiceException + private void putObject(String bucketName, String path, S3Object object) throws ServiceException { object.setBucketName(bucketName); object.setKey(path); @@ -180,10 +178,6 @@ private void putObject(String bucketName, String path, S3Object object, boolean log.info("Pushing %s.", object); - if (!replaceExisting && S3Utils.isObjectInBucket(s3Client, bucketName, object.getKey())) { - log.info("Skipping push because key [%s] exists && replaceExisting == false", object.getKey()); - } else { - s3Client.putObject(bucketName, object); - } + s3Client.putObject(bucketName, object); } } diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentFinderTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentFinderTest.java index f0df427ccb29..55ad817003d2 100644 --- a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentFinderTest.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentFinderTest.java @@ -124,7 +124,6 @@ public class S3DataSegmentFinderTest private String indexZip4_1; - @BeforeClass public static void setUpStatic() { @@ -210,31 +209,51 @@ public void testFindSegments() throws Exception final String serializedSegment4_0 = mapper.writeValueAsString(updatedSegment4_0); final String serializedSegment4_1 = mapper.writeValueAsString(updatedSegment4_1); - Assert.assertNotEquals(serializedSegment1, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getDataInputStream())); - Assert.assertNotEquals(serializedSegment2, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getDataInputStream())); - Assert.assertNotEquals(serializedSegment3, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getDataInputStream())); - Assert.assertNotEquals(serializedSegment4_0, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getDataInputStream())); - Assert.assertNotEquals(serializedSegment4_1, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getDataInputStream())); + Assert.assertNotEquals( + serializedSegment1, + IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getDataInputStream()) + ); + Assert.assertNotEquals( + serializedSegment2, + IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getDataInputStream()) + ); + Assert.assertNotEquals( + serializedSegment3, + IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getDataInputStream()) + ); + Assert.assertNotEquals( + serializedSegment4_0, + IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getDataInputStream()) + ); + Assert.assertNotEquals( + serializedSegment4_1, + IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getDataInputStream()) + ); final Set segments2 = s3DataSegmentFinder.findSegments("", true); Assert.assertEquals(segments, segments2); - Assert.assertEquals(serializedSegment1, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getDataInputStream())); - 
Assert.assertEquals(serializedSegment2, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getDataInputStream())); - Assert.assertEquals(serializedSegment3, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getDataInputStream())); - Assert.assertEquals(serializedSegment4_0, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getDataInputStream())); - Assert.assertEquals(serializedSegment4_1, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getDataInputStream())); + Assert.assertEquals( + serializedSegment1, + IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getDataInputStream()) + ); + Assert.assertEquals( + serializedSegment2, + IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getDataInputStream()) + ); + Assert.assertEquals( + serializedSegment3, + IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getDataInputStream()) + ); + Assert.assertEquals( + serializedSegment4_0, + IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getDataInputStream()) + ); + Assert.assertEquals( + serializedSegment4_1, + IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getDataInputStream()) + ); } @Test(expected = SegmentLoadingException.class) @@ -268,9 +287,7 @@ public void testFindSegmentsWithworkingDirPath() throws SegmentLoadingException public void testFindSegmentsUpdateLoadSpec() throws Exception { config.setBucket("amazing"); - final DataSegment segmentMissingLoadSpec = DataSegment.builder(SEGMENT_1) - .loadSpec(ImmutableMap.of()) - .build(); + final DataSegment segmentMissingLoadSpec = DataSegment.builder(SEGMENT_1).loadSpec(ImmutableMap.of()).build(); final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper); final String segmentPath = baseKey + "/interval_missing_load_spec/v1/1/"; final String descriptorPath = S3Utils.descriptorPathForSegmentPath(segmentPath); @@ -304,6 +321,32 @@ public void testFindSegmentsUpdateLoadSpec() throws Exception Assert.assertEquals(indexPath, testLoadSpec.get("key")); } + @Test + public void testPreferNewestSegment() throws Exception + { + baseKey = "replicaDataSource"; + + config = new S3DataSegmentPusherConfig(); + config.setBucket(bucket); + config.setBaseKey(baseKey); + + descriptor1 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval10/v1/0/older/"); + descriptor2 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval10/v1/0/newer/"); + + indexZip1 = S3Utils.indexZipForSegmentPath(descriptor1); + indexZip2 = S3Utils.indexZipForSegmentPath(descriptor2); + + mockS3Client.putObject(bucket, new S3Object(descriptor1, mapper.writeValueAsString(SEGMENT_1))); + mockS3Client.putObject(bucket, new S3Object(indexZip1, "dummy")); + mockS3Client.putObject(bucket, new S3Object(descriptor2, mapper.writeValueAsString(SEGMENT_1))); + mockS3Client.putObject(bucket, new S3Object(indexZip2, "dummy")); + + final S3DataSegmentFinder s3DataSegmentFinder = new S3DataSegmentFinder(mockS3Client, config, mapper); + final Set segments = s3DataSegmentFinder.findSegments("", false); + + Assert.assertEquals(1, segments.size()); + } + private String getDescriptorPath(DataSegment segment) { return S3Utils.descriptorPathForSegmentPath(String.valueOf(segment.getLoadSpec().get("key"))); diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherTest.java index f26bd1610398..841f10e1c663 100644 --- 
a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherTest.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherTest.java @@ -64,6 +64,20 @@ public void setValue(T value) @Test public void testPush() throws Exception + { + testPushInternal(false, "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/index\\.zip"); + } + + @Test + public void testPushUseUniquePath() throws Exception + { + testPushInternal( + true, + "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip" + ); + } + + private void testPushInternal(boolean useUniquePath, String matcher) throws Exception { RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class); @@ -113,14 +127,15 @@ public S3Object answer() throws Throwable size ); - DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true); + DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, useUniquePath); Assert.assertEquals(segmentToPush.getSize(), segment.getSize()); Assert.assertEquals(1, (int) segment.getBinaryVersion()); Assert.assertEquals("bucket", segment.getLoadSpec().get("bucket")); - Assert.assertEquals( - "key/foo/2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z/0/0/index.zip", - segment.getLoadSpec().get("key")); + Assert.assertTrue( + segment.getLoadSpec().get("key").toString(), + segment.getLoadSpec().get("key").toString().matches(matcher) + ); Assert.assertEquals("s3_zip", segment.getLoadSpec().get("type")); // Verify that the pushed S3Object contains the correct data diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index 7ce678045c7f..8975c01d86a0 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -91,6 +91,7 @@ public static Path distributedClassPath(Path base) { return new Path(base, "classpath"); } + public static final String INDEX_ZIP = "index.zip"; public static final String DESCRIPTOR_JSON = "descriptor.json"; @@ -568,8 +569,10 @@ public static Path makeFileNamePath( DataSegmentPusher dataSegmentPusher ) { - return new Path(prependFSIfNullScheme(fs, basePath), - dataSegmentPusher.makeIndexPathName(segmentTemplate, baseFileName)); + return new Path( + prependFSIfNullScheme(fs, basePath), + dataSegmentPusher.makeIndexPathName(segmentTemplate, baseFileName) + ); } public static Path makeTmpPath( @@ -582,9 +585,10 @@ public static Path makeTmpPath( { return new Path( prependFSIfNullScheme(fs, basePath), - StringUtils.format("./%s.%d", - dataSegmentPusher.makeIndexPathName(segmentTemplate, JobHelper.INDEX_ZIP), - taskAttemptID.getId() + StringUtils.format( + "./%s.%d", + dataSegmentPusher.makeIndexPathName(segmentTemplate, JobHelper.INDEX_ZIP), + taskAttemptID.getId() ) ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index 0bba29fbe881..4364f50f40a8 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -199,10 +199,7 @@ public void finishJob() .withDimensions(ImmutableList.copyOf(mappedSegment.getAvailableDimensions())) .withBinaryVersion(SegmentUtils.getVersionFromDir(fileToUpload)); - // This plumber is only 
used in batch ingestion situations where you do not have replica tasks pushing - // segments with the same identifier but potentially different contents. In case of conflict, favor the most - // recently pushed segment (replaceExisting == true). - dataSegmentPusher.push(fileToUpload, segmentToUpload, true); + dataSegmentPusher.push(fileToUpload, segmentToUpload, false); log.info( "Uploaded segment[%s]", diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java index 0e752cb988ce..460225af1fe0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java @@ -441,10 +441,7 @@ private void convertSegment(TaskToolbox toolbox) throws SegmentLoadingException, // version, but is "newer" than said original version. DataSegment updatedSegment = segment.withVersion(StringUtils.format("%s_v%s", segment.getVersion(), outVersion)); - // The convert segment task does not support replicas where different tasks could generate segments with the - // same identifier but potentially different contents. In case of conflict, favor the most recently pushed - // segment (replaceExisting == true). - updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment, true); + updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment, false); actionClient.submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment))); } else { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index f0987b21ea8c..1d88733f9522 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -792,7 +792,8 @@ private static BatchAppenderatorDriver newDriver( return new BatchAppenderatorDriver( appenderator, segmentAllocator, - new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()) + new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()), + toolbox.getDataSegmentKiller() ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java index 747cf58d4162..454899f5c774 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java @@ -186,10 +186,7 @@ public String apply(DataSegment input) // Upload file - // The merge task does not support replicas where different tasks could generate segments with the - // same identifier but potentially different contents. In case of conflict, favor the most recently pushed - // segment (replaceExisting == true). 
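// A minimal sketch (not part of the patch) of what the new third argument to DataSegmentPusher.push()
// means for callers after this change: it is no longer "replaceExisting" but "useUniquePath".
// Variable names below are illustrative assumptions.
//
// useUniquePath == false: the segment lands at the deterministic path derived from its identifier,
// which is what merge/convert/batch tasks such as MergeTaskBase now request.
DataSegment merged = segmentPusher.push(mergedSegmentDir, mergedSegment, false);
//
// useUniquePath == true: the pusher mixes a random UUID into the path, so replica tasks pushing the
// same segment identifier never overwrite each other's files in deep storage.
DataSegment replica = segmentPusher.push(persistedSegmentDir, replicaSegment, true);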
- final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, mergedSegment, true); + final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, mergedSegment, false); emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart)); emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize())); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index f813220e25d2..7113efa6aa50 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -60,6 +60,7 @@ import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec; import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.segment.realtime.firehose.LocalFirehoseFactory; @@ -1006,7 +1007,7 @@ public String getPathForHadoop() } @Override - public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException + public DataSegment push(File file, DataSegment segment, boolean useUniquePath) { segments.add(segment); return segment; @@ -1019,12 +1020,27 @@ public Map makeLoadSpec(URI uri) } }; + final DataSegmentKiller killer = new DataSegmentKiller() + { + @Override + public void kill(DataSegment segment) + { + + } + + @Override + public void killAll() + { + + } + }; + final TaskToolbox box = new TaskToolbox( null, actionClient, null, pusher, - null, + killer, null, null, null, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java index e42f55d9f605..50932dbb2311 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java @@ -185,7 +185,8 @@ public RetType submit(TaskAction taskAction) throws IOExcepti return null; } }, - new NoopServiceEmitter(), new DataSegmentPusher() + new NoopServiceEmitter(), + new DataSegmentPusher() { @Deprecated @Override @@ -201,12 +202,13 @@ public String getPathForHadoop() } @Override - public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException + public DataSegment push(File file, DataSegment segment, boolean useUniquePath) { // the merged segment is pushed to storage segments.add(segment); return segment; } + @Override public Map makeLoadSpec(URI finalIndexZipFilePath) { diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 13f2d4f1e88a..9767ebf513ac 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -31,7 +31,6 @@ import com.google.common.io.Files; import com.google.inject.Binder; import com.google.inject.Module; -import 
io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.data.input.InputRow; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.InputRowParser; @@ -61,6 +60,7 @@ import io.druid.java.util.common.JodaUtils; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.math.expr.ExprMacroTable; import io.druid.metadata.IndexerSQLMetadataStorageCoordinator; import io.druid.query.aggregation.DoubleSumAggregatorFactory; @@ -184,7 +184,8 @@ public List getUsedSegmentsForInterval(String dataSource, Interval } @Override - public List getUsedSegmentsForIntervals(String dataSource, List interval) throws IOException + public List getUsedSegmentsForIntervals(String dataSource, List interval) + throws IOException { return ImmutableList.copyOf(segmentSet); } @@ -249,7 +250,7 @@ public String getPathForHadoop() } @Override - public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException + public DataSegment push(File file, DataSegment segment, boolean useUniquePath) { return segment; } @@ -537,7 +538,11 @@ public void simpleFirehoseReadingTest() throws IOException Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray()); Assert.assertArrayEquals(new String[]{DIM_VALUE}, row.getDimension(DIM_NAME).toArray()); Assert.assertEquals(METRIC_LONG_VALUE.longValue(), row.getMetric(METRIC_LONG_NAME)); - Assert.assertEquals(METRIC_FLOAT_VALUE, row.getMetric(METRIC_FLOAT_NAME).floatValue(), METRIC_FLOAT_VALUE * 0.0001); + Assert.assertEquals( + METRIC_FLOAT_VALUE, + row.getMetric(METRIC_FLOAT_NAME).floatValue(), + METRIC_FLOAT_VALUE * 0.0001 + ); ++rowcount; } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 6ce005867133..e583a85b2671 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -34,10 +34,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.util.concurrent.MoreExecutors; -import io.druid.java.util.emitter.EmittingLogger; -import io.druid.java.util.emitter.service.ServiceEmitter; -import io.druid.java.util.metrics.Monitor; -import io.druid.java.util.metrics.MonitorScheduler; import io.druid.client.cache.MapCache; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; @@ -82,6 +78,10 @@ import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.Comparators; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.emitter.service.ServiceEmitter; +import io.druid.java.util.metrics.Monitor; +import io.druid.java.util.metrics.MonitorScheduler; import io.druid.metadata.DerbyMetadataStorageActionHandlerFactory; import io.druid.metadata.TestDerbyConnector; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -485,7 +485,7 @@ public String getPathForHadoop(String dataSource) } @Override - public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException + public DataSegment push(File file, DataSegment segment, boolean useUniquePath) { pushedSegments++; return segment; @@ -527,8 +527,11 @@ private TaskToolboxFactory 
setUpTaskToolboxFactory( Preconditions.checkNotNull(emitter); taskLockbox = new TaskLockbox(taskStorage); - tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter, EasyMock.createMock( - SupervisorManager.class))); + tac = new LocalTaskActionClientFactory( + taskStorage, + new TaskActionToolbox(taskLockbox, mdc, emitter, EasyMock.createMock( + SupervisorManager.class)) + ); File tmpDir = temporaryFolder.newFolder(); taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null); @@ -1034,7 +1037,7 @@ public String getPathForHadoop() } @Override - public DataSegment push(File file, DataSegment dataSegment, boolean replaceExisting) throws IOException + public DataSegment push(File file, DataSegment dataSegment, boolean useUniquePath) { throw new RuntimeException("FAILURE"); } @@ -1094,7 +1097,22 @@ public void testResumeTasks() throws Exception mapper ), new IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTuningConfig(10000, 10, null, null, null, indexSpec, null, false, null, null, null, null, null, null) + new IndexTuningConfig( + 10000, + 10, + null, + null, + null, + indexSpec, + null, + false, + null, + null, + null, + null, + null, + null + ) ), null ); diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java index d518931a5faa..b9b27f48d07a 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java @@ -25,7 +25,6 @@ import io.druid.timeline.DataSegment; import java.io.File; -import java.io.IOException; import java.net.URI; import java.util.Map; import java.util.Set; @@ -48,7 +47,7 @@ public String getPathForHadoop() } @Override - public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException + public DataSegment push(File file, DataSegment segment, boolean useUniquePath) { pushedSegments.add(segment); return segment; diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentFinder.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentFinder.java index 018dcce3e281..177461c39f5f 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentFinder.java +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentFinder.java @@ -20,24 +20,24 @@ package io.druid.segment.loading; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Sets; import com.google.inject.Inject; - import io.druid.guice.LocalDataStorageDruidModule; +import io.druid.java.util.common.Pair; import io.druid.java.util.common.logger.Logger; import io.druid.timeline.DataSegment; import org.apache.commons.io.FileUtils; import java.io.File; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** */ public class LocalDataSegmentFinder implements DataSegmentFinder { - private static final Logger log = new Logger(LocalDataSegmentFinder.class); private final ObjectMapper mapper; @@ -49,25 +49,26 @@ public LocalDataSegmentFinder(ObjectMapper mapper) } @Override - public Set findSegments(String workingDirPath, boolean updateDescriptor) - throws SegmentLoadingException + public Set findSegments(String workingDirPath, boolean updateDescriptor) throws SegmentLoadingException { + final Map> 
timestampedSegments = new HashMap<>(); - final Set segments = Sets.newHashSet(); final File workingDir = new File(workingDirPath); if (!workingDir.isDirectory()) { throw new SegmentLoadingException("Working directory [%s] didn't exist !?", workingDir); } - recursiveSearchSegments(segments, workingDir, updateDescriptor); - return segments; + recursiveSearchSegments(timestampedSegments, workingDir, updateDescriptor); + + return timestampedSegments.values().stream().map(x -> x.lhs).collect(Collectors.toSet()); } - private void recursiveSearchSegments(Set segments, File workingDir, boolean updateDescriptor) - throws SegmentLoadingException + private void recursiveSearchSegments( + Map> timestampedSegments, File workingDir, boolean updateDescriptor + ) throws SegmentLoadingException { for (File file : workingDir.listFiles()) { if (file.isDirectory()) { - recursiveSearchSegments(segments, file, updateDescriptor); + recursiveSearchSegments(timestampedSegments, file, updateDescriptor); } else if (file.getName().equals("descriptor.json")) { final File indexZip = new File(file.getParentFile(), "index.zip"); if (indexZip.exists()) { @@ -88,7 +89,8 @@ private void recursiveSearchSegments(Set segments, File workingDir, FileUtils.writeStringToFile(file, mapper.writeValueAsString(dataSegment)); } } - segments.add(dataSegment); + + DataSegmentFinder.putInMapRetainingNewest(timestampedSegments, dataSegment, indexZip.lastModified()); } catch (IOException e) { throw new SegmentLoadingException( diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentKiller.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentKiller.java index 8c8715d1f9d9..770c89a4a023 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentKiller.java +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentKiller.java @@ -52,19 +52,21 @@ public void kill(DataSegment segment) throws SegmentLoadingException try { if (path.getName().endsWith(".zip")) { - // path format -- > .../dataSource/interval/version/partitionNum/xxx.zip - File partitionNumDir = path.getParentFile(); - FileUtils.deleteDirectory(partitionNumDir); + // or .../dataSource/interval/version/partitionNum/UUID/xxx.zip + + File parentDir = path.getParentFile(); + FileUtils.deleteDirectory(parentDir); - //try to delete other directories if possible - File versionDir = partitionNumDir.getParentFile(); - if (versionDir.delete()) { - File intervalDir = versionDir.getParentFile(); - if (intervalDir.delete()) { - File dataSourceDir = intervalDir.getParentFile(); - dataSourceDir.delete(); + // possibly recursively delete empty parent directories up to 'dataSource' + parentDir = parentDir.getParentFile(); + int maxDepth = 4; // if for some reason there's no datasSource directory, stop recursing somewhere reasonable + while (parentDir != null && --maxDepth >= 0) { + if (!parentDir.delete() || segment.getDataSource().equals(parentDir.getName())) { + break; } + + parentDir = parentDir.getParentFile(); } } else { throw new SegmentLoadingException("Unknown file type[%s]", path); diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java index f60228dfeb3e..830af44be3d9 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java @@ -32,7 +32,6 @@ import java.io.File; import java.io.IOException; import java.net.URI; -import 
java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.StandardOpenOption; import java.util.Map; @@ -71,10 +70,11 @@ public String getPathForHadoop(String dataSource) } @Override - public DataSegment push(File dataSegmentFile, DataSegment segment, boolean replaceExisting) throws IOException + public DataSegment push(final File dataSegmentFile, final DataSegment segment, final boolean useUniquePath) + throws IOException { final File baseStorageDir = config.getStorageDirectory(); - final File outDir = new File(baseStorageDir, this.getStorageDir(segment)); + final File outDir = new File(baseStorageDir, this.getStorageDir(segment, useUniquePath)); log.info("Copying segment[%s] to local filesystem at location[%s]", segment.getIdentifier(), outDir.toString()); @@ -109,31 +109,15 @@ public DataSegment push(File dataSegmentFile, DataSegment segment, boolean repla ); FileUtils.forceMkdir(outDir); - if (replaceExisting) { - final File indexFileTarget = new File(outDir, tmpIndexFile.getName()); - final File descriptorFileTarget = new File(outDir, tmpDescriptorFile.getName()); - - if (!tmpIndexFile.renameTo(indexFileTarget)) { - throw new IOE("Failed to rename [%s] to [%s]", tmpIndexFile, indexFileTarget); - } - - if (!tmpDescriptorFile.renameTo(descriptorFileTarget)) { - throw new IOE("Failed to rename [%s] to [%s]", tmpDescriptorFile, descriptorFileTarget); - } - } else { - try { - Files.move(tmpIndexFile.toPath(), outDir.toPath().resolve(tmpIndexFile.toPath().getFileName())); - } - catch (FileAlreadyExistsException e) { - log.info("[%s] already exists at [%s], ignore if replication is configured", INDEX_FILENAME, outDir); - } - try { - Files.move(tmpDescriptorFile.toPath(), outDir.toPath().resolve(tmpDescriptorFile.toPath().getFileName())); - } - catch (FileAlreadyExistsException e) { - log.info("[%s] already exists at [%s], ignore if replication is configured", DESCRIPTOR_FILENAME, outDir); - dataSegment = jsonMapper.readValue(new File(outDir, DESCRIPTOR_FILENAME), DataSegment.class); - } + final File indexFileTarget = new File(outDir, tmpIndexFile.getName()); + final File descriptorFileTarget = new File(outDir, tmpDescriptorFile.getName()); + + if (!tmpIndexFile.renameTo(indexFileTarget)) { + throw new IOE("Failed to rename [%s] to [%s]", tmpIndexFile, indexFileTarget); + } + + if (!tmpDescriptorFile.renameTo(descriptorFileTarget)) { + throw new IOE("Failed to rename [%s] to [%s]", tmpDescriptorFile, descriptorFileTarget); } return dataSegment; diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 83c95f595c7e..75377a463ec0 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -91,10 +91,10 @@ public boolean isSegmentLoaded(final DataSegment segment) return findStorageLocationIfLoaded(segment) != null; } - public StorageLocation findStorageLocationIfLoaded(final DataSegment segment) + private StorageLocation findStorageLocationIfLoaded(final DataSegment segment) { for (StorageLocation location : getSortedList(locations)) { - File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment)); + File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false)); if (localStorageDir.exists()) { return location; } @@ -127,7 +127,7 @@ 
public Segment getSegment(DataSegment segment) throws SegmentLoadingException public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException { StorageLocation loc = findStorageLocationIfLoaded(segment); - String storageDir = DataSegmentPusher.getDefaultStorageDir(segment); + String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false); if (loc == null) { loc = loadSegmentWithRetry(segment, storageDir); @@ -232,11 +232,11 @@ public void cleanup(DataSegment segment) throws SegmentLoadingException // in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not. // So we should always clean all possible locations here for (StorageLocation location : getSortedList(locations)) { - File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment)); + File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false)); if (localStorageDir.exists()) { // Druid creates folders of the form dataSource/interval/version/partitionNum. // We need to clean up all these directories if they are all empty. - File cacheFile = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment)); + File cacheFile = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false)); cleanupCacheFiles(location.getPath(), cacheFile); location.removeSegment(segment); } @@ -272,7 +272,7 @@ public void cleanupCacheFiles(File baseFile, File cacheFile) throws IOException } } - public List getSortedList(List locs) + private List getSortedList(List locs) { List locations = new ArrayList<>(locs); Collections.sort(locations, COMPARATOR); diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java index dbd1ed831378..51708f4f31cd 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderator.java @@ -194,11 +194,16 @@ default ListenableFuture persistAll(@Nullable Committer committer) * * @param identifiers list of segments to push * @param committer a committer associated with all data that has been added so far + * @param useUniquePath true if the segment should be written to a path with a unique identifier * * @return future that resolves when all segments have been pushed. The segment list will be the list of segments * that have been pushed and the commit metadata from the Committer. */ - ListenableFuture push(Collection identifiers, @Nullable Committer committer); + ListenableFuture push( + Collection identifiers, + @Nullable Committer committer, + boolean useUniquePath + ); /** * Stop any currently-running processing and clean up after ourselves. 
This allows currently running persists and pushes diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index 2f4a851bc57e..3d872e25e6af 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -506,7 +506,8 @@ public ListenableFuture persistAll(@Nullable final Committer committer) @Override public ListenableFuture push( final Collection identifiers, - @Nullable final Committer committer + @Nullable final Committer committer, + final boolean useUniquePath ) { final Map theSinks = Maps.newHashMap(); @@ -530,7 +531,7 @@ public ListenableFuture push( continue; } - final DataSegment dataSegment = mergeAndPush(entry.getKey(), entry.getValue()); + final DataSegment dataSegment = mergeAndPush(entry.getKey(), entry.getValue(), useUniquePath); if (dataSegment != null) { dataSegments.add(dataSegment); } else { @@ -560,17 +561,13 @@ private ListenableFuture pushBarrier() * Merge segment, push to deep storage. Should only be used on segments that have been fully persisted. Must only * be run in the single-threaded pushExecutor. * - * Note that this calls DataSegmentPusher.push() with replaceExisting == true which is appropriate for the indexing - * tasks it is currently being used for (local indexing and Kafka indexing). If this is going to be used by an - * indexing task type that requires replaceExisting == false, this setting will need to be pushed to the caller. - * * @param identifier sink identifier * @param sink sink to push + * @param useUniquePath true if the segment should be written to a path with a unique identifier * * @return segment descriptor, or null if the sink is no longer valid */ - - private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink sink) + private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink sink, final boolean useUniquePath) { // Bail out if this sink is null or otherwise not what we expect. if (sinks.get(identifier) != sink) { @@ -642,17 +639,12 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink // Retry pushing segments because uploading to deep storage might fail especially for cloud storage types final DataSegment segment = RetryUtils.retry( // The appenderator is currently being used for the local indexing task and the Kafka indexing task. For the - // Kafka indexing task, pushers MUST overwrite any existing objects in deep storage with the same identifier - // in order to maintain exactly-once semantics. If they do not and instead favor existing objects, in case of - // failure during publishing, the indexed data may not represent the checkpointed state and data loss or - // duplication may occur. See: https://github.com/druid-io/druid/issues/5161. The local indexing task does not - // support replicas where different tasks could generate segments with the same identifier but potentially - // different contents so it is okay if existing objects are overwritten. In both of these cases, we want to - // favor the most recently pushed segment so replaceExisting == true. + // Kafka indexing task, pushers must use unique file paths in deep storage in order to maintain exactly-once + // semantics. 
() -> dataSegmentPusher.push( mergedFile, sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)), - true + useUniquePath ), exception -> exception instanceof Exception, 5 diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java index 2edfa4ed6cbc..a3683d0950a2 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java @@ -463,7 +463,7 @@ public String apply(SegmentIdentifier input) // WARNING: Committers.nil() here means that on-disk data can get out of sync with committing. Futures.addCallback( - appenderator.push(segmentsToPush, Committers.nil()), + appenderator.push(segmentsToPush, Committers.nil(), false), new FutureCallback() { @Override diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index 24482d19c96c..c578a65d6cd6 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -39,6 +39,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.logger.Logger; +import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState; import org.joda.time.DateTime; @@ -128,6 +129,7 @@ Stream segmentStateStream() private final SegmentAllocator segmentAllocator; private final UsedSegmentChecker usedSegmentChecker; + private final DataSegmentKiller dataSegmentKiller; protected final Appenderator appenderator; // sequenceName -> segmentsForSequence @@ -141,12 +143,14 @@ Stream segmentStateStream() BaseAppenderatorDriver( Appenderator appenderator, SegmentAllocator segmentAllocator, - UsedSegmentChecker usedSegmentChecker + UsedSegmentChecker usedSegmentChecker, + DataSegmentKiller dataSegmentKiller ) { this.appenderator = Preconditions.checkNotNull(appenderator, "appenderator"); this.segmentAllocator = Preconditions.checkNotNull(segmentAllocator, "segmentAllocator"); this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker"); + this.dataSegmentKiller = Preconditions.checkNotNull(dataSegmentKiller, "dataSegmentKiller"); this.executor = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-%d")); } @@ -331,24 +335,32 @@ Stream getSegmentWithStates(Collection sequenceNames) * * @param wrappedCommitter should not be null if you want to persist intermediate states * @param segmentIdentifiers identifiers of the segments to be pushed + * @param useUniquePath true if the segment should be written to a path with a unique identifier * * @return a future for pushing segments */ ListenableFuture pushInBackground( @Nullable final WrappedCommitter wrappedCommitter, - final Collection segmentIdentifiers + final Collection segmentIdentifiers, + final boolean useUniquePath ) { log.info("Pushing segments in background: [%s]", Joiner.on(", ").join(segmentIdentifiers)); return Futures.transform( - appenderator.push(segmentIdentifiers, wrappedCommitter), + appenderator.push(segmentIdentifiers, wrappedCommitter, useUniquePath), (Function) segmentsAndMetadata -> { // Sanity check 
final Set pushedSegments = segmentsAndMetadata.getSegments().stream() .map(SegmentIdentifier::fromDataSegment) .collect(Collectors.toSet()); if (!pushedSegments.equals(Sets.newHashSet(segmentIdentifiers))) { + log.warn( + "Removing segments from deep storage because sanity check failed: %s", segmentsAndMetadata.getSegments() + ); + + segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly); + throw new ISE( "WTF?! Pushed different segments than requested. Pushed[%s], requested[%s].", pushedSegments, @@ -436,13 +448,25 @@ ListenableFuture publishInBackground( .collect(Collectors.toSet()); if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers) .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) { + log.info( + "Removing our segments from deep storage because someone else already published them: %s", + segmentsAndMetadata.getSegments() + ); + segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly); + log.info("Our segments really do exist, awaiting handoff."); } else { throw new ISE("Failed to publish segments[%s]", segmentsAndMetadata.getSegments()); } } } - catch (IOException e) { + catch (Exception e) { + log.warn( + "Removing segments from deep storage after failed publish: %s", + segmentsAndMetadata.getSegments() + ); + segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly); + throw Throwables.propagate(e); } } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java index c67b31054bf1..4ceb5264097f 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java @@ -26,6 +26,7 @@ import com.google.common.util.concurrent.ListenableFuture; import io.druid.data.input.InputRow; import io.druid.java.util.common.ISE; +import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState; import io.druid.timeline.DataSegment; @@ -41,11 +42,11 @@ /** * This class is specifialized for batch ingestion. In batch ingestion, the segment lifecycle is like: - * + *

 * <pre>
 * APPENDING -> PUSHED_AND_DROPPED -> PUBLISHED
 * </pre>
 *
 * <ul>
 * <li>APPENDING: Segment is available for appending.</li>
 * <li>PUSHED_AND_DROPPED: Segment is pushed to deep storage and dropped from the local storage.</li>
 * <li>PUBLISHED: Segment's metadata is published to the metadata store.</li>
 * </ul>
 */
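[Editorial aside on the BaseAppenderatorDriver hunks above, not part of the patch.] Once pushes go to unique paths, any pushed segment that fails the post-push sanity check, turns out to have been published by another task, or hits an exception during publishing has to be removed from deep storage rather than silently overwritten later; that is what the new dataSegmentKiller::killQuietly calls do. The overall shape of that logic, reduced to a self-contained sketch (the types and names below are stand-ins, not the Druid classes):

    import java.util.List;
    import java.util.function.Consumer;

    public class PublishWithCleanupSketch
    {
      interface Publisher
      {
        /** Returns true if the segment identifiers were committed to the metadata store. */
        boolean publish(List<String> segmentIds) throws Exception;
      }

      /**
       * Tries to publish already-pushed segments. If publishing fails or is rejected, the pushed
       * files are removed from deep storage on a best-effort basis so unique-path pushes do not leak.
       */
      static boolean publishOrCleanUp(List<String> pushedSegmentIds, Publisher publisher, Consumer<String> killQuietly)
      {
        try {
          if (publisher.publish(pushedSegmentIds)) {
            return true;
          }
          // Publish was rejected (for example, the transaction lost): delete what we pushed.
          pushedSegmentIds.forEach(killQuietly);
          return false;
        }
        catch (Exception e) {
          // Publish threw: same best-effort cleanup, then propagate.
          pushedSegmentIds.forEach(killQuietly);
          throw new RuntimeException(e);
        }
      }
    }

The real driver is more careful than this sketch: before deleting on a rejected publish it first asks the UsedSegmentChecker whether another task already published identical segments, and only then treats its own copies as redundant.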
  • @@ -57,22 +58,23 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver /** * Create a driver. * - * @param appenderator appenderator - * @param segmentAllocator segment allocator - * @param usedSegmentChecker used segment checker + * @param appenderator appenderator + * @param segmentAllocator segment allocator + * @param usedSegmentChecker used segment checker */ public BatchAppenderatorDriver( Appenderator appenderator, SegmentAllocator segmentAllocator, - UsedSegmentChecker usedSegmentChecker + UsedSegmentChecker usedSegmentChecker, + DataSegmentKiller dataSegmentKiller ) { - super(appenderator, segmentAllocator, usedSegmentChecker); + super(appenderator, segmentAllocator, usedSegmentChecker, dataSegmentKiller); } /** * This method always returns null because batch ingestion doesn't support restoring tasks on failures. - + * * @return always null */ @Override @@ -133,7 +135,7 @@ private SegmentsAndMetadata pushAndClear( .collect(Collectors.toList()); final ListenableFuture future = Futures.transform( - pushInBackground(null, segmentIdentifierList), + pushInBackground(null, segmentIdentifierList, false), this::dropInBackground ); @@ -198,7 +200,8 @@ public ListenableFuture publishAll(final TransactionalSegme .map(segmentWithState -> Preconditions.checkNotNull( segmentWithState.getDataSegment(), "dataSegment for segmentId[%s]", - segmentWithState.getSegmentIdentifier()) + segmentWithState.getSegmentIdentifier() + ) ) .collect(Collectors.toList()), null diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index a5b1a3fbdd65..4b0c171ef8a9 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -35,6 +35,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; import io.druid.query.SegmentDescriptor; +import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState; import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; @@ -54,11 +55,11 @@ /** * This class is specialized for streaming ingestion. In streaming ingestion, the segment lifecycle is like: - * + *

 * <pre>
 * APPENDING -> APPEND_FINISHED -> PUBLISHED
 * </pre>
 *
 * <ul>
 * <li>APPENDING: Segment is available for appending.</li>
 * <li>APPEND_FINISHED: Segment cannot be updated (data cannot be added anymore) and is waiting for being published.</li>
 * <li>PUBLISHED: Segment's metadata is published to the metadata store.</li>
 * </ul>
 */
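[Editorial aside, not part of the patch.] The batch and streaming lifecycle comments above describe a small state machine over SegmentWithState.SegmentState. Purely as an illustration of those two paths (this enum is a sketch, not the actual SegmentState type):

    import java.util.EnumSet;
    import java.util.Set;

    public class SegmentLifecycleSketch
    {
      enum State { APPENDING, APPEND_FINISHED, PUSHED_AND_DROPPED, PUBLISHED }

      /** Streaming ingestion moves through APPEND_FINISHED; batch ingestion through PUSHED_AND_DROPPED. */
      static Set<State> nextStates(State current)
      {
        switch (current) {
          case APPENDING:
            return EnumSet.of(State.APPEND_FINISHED, State.PUSHED_AND_DROPPED);
          case APPEND_FINISHED:
          case PUSHED_AND_DROPPED:
            return EnumSet.of(State.PUBLISHED);
          default:
            return EnumSet.noneOf(State.class);
        }
      }
    }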
    • @@ -89,11 +90,12 @@ public StreamAppenderatorDriver( SegmentAllocator segmentAllocator, SegmentHandoffNotifierFactory handoffNotifierFactory, UsedSegmentChecker usedSegmentChecker, + DataSegmentKiller dataSegmentKiller, ObjectMapper objectMapper, FireDepartmentMetrics metrics ) { - super(appenderator, segmentAllocator, usedSegmentChecker); + super(appenderator, segmentAllocator, usedSegmentChecker, dataSegmentKiller); this.handoffNotifier = Preconditions.checkNotNull(handoffNotifierFactory, "handoffNotifierFactory") .createSegmentHandoffNotifier(appenderator.getDataSource()); @@ -261,7 +263,9 @@ public ListenableFuture publish( .collect(Collectors.toList()); final ListenableFuture publishFuture = Futures.transform( - pushInBackground(wrapCommitter(committer), theSegments), + // useUniquePath=true prevents inconsistencies in segment data when task failures or replicas leads to a second + // version of a segment with the same identifier containing different data; see DataSegmentPusher.push() docs + pushInBackground(wrapCommitter(committer), theSegments, true), (AsyncFunction) segmentsAndMetadata -> publishInBackground( segmentsAndMetadata, publisher diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 2f6a4a1dd995..91ccae79407a 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -444,13 +444,6 @@ public void doRun() log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier()); - // The realtime plumber can generate segments with the same identifier (i.e. replica tasks) but does not - // have any strict requirement that the contents of these segments be identical. It is possible that two - // tasks generate a segment with the same identifier containing different data, and in this situation we - // want to favor the data from the task which pushed first. This is because it is possible that one - // historical could load the segment after the first task pushed and another historical load the same - // segment after the second task pushed. If the second task's segment overwrote the first one, the second - // historical node would be serving different data from the first. Hence set replaceExisting == false. 
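        // [Editorial note, not part of this patch] The comment removed above explained the old
        // replaceExisting == false choice. That parameter no longer exists after this patch (the
        // pusher signature now takes useUniquePath instead, and the plumber appears to keep passing
        // false, since the argument line is outside this hunk), so the explanation is dropped.
        // Concurrent pushes to a non-unique path are now exercised by
        // testLastPushWinsForConcurrentPushes further down.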
DataSegment segment = dataSegmentPusher.push( mergedFile, sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)), diff --git a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentFinderTest.java b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentFinderTest.java index 78b48a0ab6aa..b399bb8c5985 100644 --- a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentFinderTest.java +++ b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentFinderTest.java @@ -36,6 +36,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.util.Set; @@ -204,12 +205,6 @@ public void testFindSegments() throws SegmentLoadingException, IOException Assert.assertEquals(serializedSegment4_1, FileUtils.readFileToString(descriptor4_1)); } - private String getDescriptorPath(DataSegment segment) - { - final File indexzip = new File(String.valueOf(segment.getLoadSpec().get("path"))); - return indexzip.getParent() + "/" + DESCRIPTOR_JSON; - } - @Test(expected = SegmentLoadingException.class) public void testFindSegmentsFail() throws SegmentLoadingException { @@ -219,4 +214,44 @@ public void testFindSegmentsFail() throws SegmentLoadingException final LocalDataSegmentFinder localDataSegmentFinder = new LocalDataSegmentFinder(mapper); localDataSegmentFinder.findSegments(dataSourceDir.getAbsolutePath(), false); } + + @Test + public void testPreferNewestSegment() throws Exception + { + dataSourceDir = temporaryFolder.newFolder(); + descriptor1 = new File(dataSourceDir.getAbsolutePath() + "/interval10/v10/0/older", DESCRIPTOR_JSON); + descriptor2 = new File(dataSourceDir.getAbsolutePath() + "/interval10/v10/0/newer", DESCRIPTOR_JSON); + + descriptor1.getParentFile().mkdirs(); + descriptor2.getParentFile().mkdirs(); + + mapper.writeValue(descriptor1, SEGMENT_1); + mapper.writeValue(descriptor2, SEGMENT_1); + + indexZip1 = new File(descriptor1.getParentFile(), INDEX_ZIP); + indexZip2 = new File(descriptor2.getParentFile(), INDEX_ZIP); + + FileOutputStream fos1 = new FileOutputStream(indexZip1); + fos1.getFD().sync(); + fos1.close(); + + Thread.sleep(1000); + + FileOutputStream fos2 = new FileOutputStream(indexZip2); + fos2.getFD().sync(); + fos2.close(); + + final Set segments = new LocalDataSegmentFinder(mapper).findSegments( + dataSourceDir.getAbsolutePath(), false + ); + + Assert.assertEquals(1, segments.size()); + Assert.assertEquals(indexZip2.getAbsolutePath(), segments.iterator().next().getLoadSpec().get("path")); + } + + private String getDescriptorPath(DataSegment segment) + { + final File indexzip = new File(String.valueOf(segment.getLoadSpec().get("path"))); + return indexzip.getParent() + "/" + DESCRIPTOR_JSON; + } } diff --git a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentKillerTest.java b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentKillerTest.java index b5eaad7fc9c1..4c75b4d5f681 100644 --- a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentKillerTest.java +++ b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentKillerTest.java @@ -31,6 +31,7 @@ import java.io.File; import java.io.IOException; +import java.util.UUID; public class LocalDataSegmentKillerTest { @@ -93,6 +94,28 @@ public void testKill() throws Exception Assert.assertFalse(dataSourceDir.exists()); } + @Test + public void testKillUniquePath() throws Exception + { + final LocalDataSegmentKiller killer = new LocalDataSegmentKiller(new 
LocalDataSegmentPusherConfig()); + final String uuid = UUID.randomUUID().toString().substring(0, 5); + final File dataSourceDir = temporaryFolder.newFolder("dataSource"); + final File intervalDir = new File(dataSourceDir, "interval"); + final File versionDir = new File(intervalDir, "1"); + final File partitionDir = new File(versionDir, "0"); + final File uuidDir = new File(partitionDir, uuid); + + makePartitionDirWithIndex(uuidDir); + + killer.kill(getSegmentWithPath(new File(uuidDir, "index.zip").toString())); + + Assert.assertFalse(uuidDir.exists()); + Assert.assertFalse(partitionDir.exists()); + Assert.assertFalse(versionDir.exists()); + Assert.assertFalse(intervalDir.exists()); + Assert.assertFalse(dataSourceDir.exists()); + } + private void makePartitionDirWithIndex(File path) throws IOException { Assert.assertTrue(path.mkdirs()); diff --git a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java index e8168cb668fe..6ff0ab06ec63 100644 --- a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java +++ b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java @@ -92,8 +92,8 @@ public void testPush() throws IOException */ final DataSegment dataSegment2 = dataSegment.withVersion("v2"); - DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, true); - DataSegment returnSegment2 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment2, true); + DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, false); + DataSegment returnSegment2 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment2, false); Assert.assertNotNull(returnSegment1); Assert.assertEquals(dataSegment, returnSegment1); @@ -102,14 +102,14 @@ public void testPush() throws IOException Assert.assertEquals(dataSegment2, returnSegment2); Assert.assertNotEquals( - localDataSegmentPusher.getStorageDir(dataSegment), - localDataSegmentPusher.getStorageDir(dataSegment2) + localDataSegmentPusher.getStorageDir(dataSegment, false), + localDataSegmentPusher.getStorageDir(dataSegment2, false) ); for (DataSegment returnSegment : ImmutableList.of(returnSegment1, returnSegment2)) { File outDir = new File( config.getStorageDirectory(), - localDataSegmentPusher.getStorageDir(returnSegment) + localDataSegmentPusher.getStorageDir(returnSegment, false) ); File versionFile = new File(outDir, "index.zip"); File descriptorJson = new File(outDir, "descriptor.json"); @@ -119,33 +119,23 @@ public void testPush() throws IOException } @Test - public void testFirstPushWinsForConcurrentPushesWhenReplaceExistingFalse() throws IOException + public void testPushUseUniquePath() throws IOException { - File replicatedDataSegmentFiles = temporaryFolder.newFolder(); - Files.asByteSink(new File(replicatedDataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x8)); - DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, false); - DataSegment returnSegment2 = localDataSegmentPusher.push(replicatedDataSegmentFiles, dataSegment2, false); - - Assert.assertEquals(dataSegment.getDimensions(), returnSegment1.getDimensions()); - Assert.assertEquals(dataSegment.getDimensions(), returnSegment2.getDimensions()); - - File unzipDir = new File(config.storageDirectory, "unzip"); - FileUtils.forceMkdir(unzipDir); - CompressionUtils.unzip( - new File(config.storageDirectory, 
"/ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index.zip"), - unzipDir - ); + DataSegment segment = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, true); - Assert.assertEquals(0x9, Ints.fromByteArray(Files.toByteArray(new File(unzipDir, "version.bin")))); + String path = segment.getLoadSpec().get("path").toString(); + String matcher = ".*/ds/1970-01-01T00:00:00\\.000Z_1970-01-01T00:00:00\\.001Z/v1/0/[A-Za-z0-9-]{36}/index\\.zip"; + Assert.assertTrue(path, path.matches(matcher)); + Assert.assertTrue(new File(path).exists()); } @Test - public void testLastPushWinsForConcurrentPushesWhenReplaceExistingTrue() throws IOException + public void testLastPushWinsForConcurrentPushes() throws IOException { File replicatedDataSegmentFiles = temporaryFolder.newFolder(); Files.asByteSink(new File(replicatedDataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x8)); - DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, true); - DataSegment returnSegment2 = localDataSegmentPusher.push(replicatedDataSegmentFiles, dataSegment2, true); + DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, false); + DataSegment returnSegment2 = localDataSegmentPusher.push(replicatedDataSegmentFiles, dataSegment2, false); Assert.assertEquals(dataSegment.getDimensions(), returnSegment1.getDimensions()); Assert.assertEquals(dataSegment2.getDimensions(), returnSegment2.getDimensions()); @@ -168,7 +158,7 @@ public void testPushCannotCreateDirectory() throws IOException config.storageDirectory = new File(config.storageDirectory, "xxx"); Assert.assertTrue(config.storageDirectory.mkdir()); config.storageDirectory.setWritable(false); - localDataSegmentPusher.push(dataSegmentFiles, dataSegment, true); + localDataSegmentPusher.push(dataSegmentFiles, dataSegment, false); } @Test diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java index 8a5bbda46c19..85bcd22ced33 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java @@ -118,7 +118,8 @@ public void testSimpleIngestion() throws Exception // push all final SegmentsAndMetadata segmentsAndMetadata = appenderator.push( appenderator.getSegments(), - committerSupplier.get() + committerSupplier.get(), + false ).get(); Assert.assertEquals(ImmutableMap.of("x", "3"), (Map) segmentsAndMetadata.getCommitMetadata()); Assert.assertEquals( diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java index 91ea83cc449e..ac0af8334487 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -193,7 +193,7 @@ public String getPathForHadoop() } @Override - public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException + public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException { if (enablePushFailure && mustFail) { mustFail = false; diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java index 
c69c8e443a7d..5ca72100a945 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java @@ -27,10 +27,13 @@ import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.Intervals; import io.druid.java.util.common.granularity.Granularities; +import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence; import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator; import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState; import io.druid.timeline.partition.NumberedShardSpec; +import org.easymock.EasyMock; +import org.easymock.EasyMockSupport; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -41,7 +44,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -public class BatchAppenderatorDriverTest +public class BatchAppenderatorDriverTest extends EasyMockSupport { private static final String DATA_SOURCE = "foo"; private static final String VERSION = "abc123"; @@ -69,22 +72,29 @@ public class BatchAppenderatorDriverTest private SegmentAllocator allocator; private AppenderatorTester appenderatorTester; private BatchAppenderatorDriver driver; + private DataSegmentKiller dataSegmentKiller; @Before public void setup() { appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY); allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR); + dataSegmentKiller = createStrictMock(DataSegmentKiller.class); driver = new BatchAppenderatorDriver( appenderatorTester.getAppenderator(), allocator, - new TestUsedSegmentChecker(appenderatorTester) + new TestUsedSegmentChecker(appenderatorTester), + dataSegmentKiller ); + + EasyMock.replay(dataSegmentKiller); } @After public void tearDown() throws Exception { + EasyMock.verify(dataSegmentKiller); + driver.clear(); driver.close(); } diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index a249ccb7f088..a89d9565c8c8 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -34,16 +34,20 @@ import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; import io.druid.java.util.common.granularity.Granularities; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.SegmentDescriptor; -import io.druid.segment.incremental.IndexSizeExceededException; +import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestCommitterSupplier; import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator; import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentHandoffNotifierFactory; import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NumberedShardSpec; +import org.easymock.EasyMock; +import org.easymock.EasyMockSupport; import org.hamcrest.CoreMatchers; import 
org.joda.time.Interval; import org.junit.After; @@ -65,7 +69,7 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -public class StreamAppenderatorDriverFailTest +public class StreamAppenderatorDriverFailTest extends EasyMockSupport { private static final String DATA_SOURCE = "foo"; private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); @@ -92,6 +96,7 @@ public class StreamAppenderatorDriverFailTest SegmentAllocator allocator; TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory; StreamAppenderatorDriver driver; + DataSegmentKiller dataSegmentKiller; @Rule public ExpectedException expectedException = ExpectedException.none(); @@ -101,6 +106,7 @@ public void setUp() { allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR); segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory(); + dataSegmentKiller = createStrictMock(DataSegmentKiller.class); } @After @@ -126,6 +132,7 @@ public void testFailDuringPersist() throws IOException, InterruptedException, Ti allocator, segmentHandoffNotifierFactory, new NoopUsedSegmentChecker(), + dataSegmentKiller, OBJECT_MAPPER, new FireDepartmentMetrics() ); @@ -163,6 +170,7 @@ public void testFailDuringPush() throws IOException, InterruptedException, Timeo allocator, segmentHandoffNotifierFactory, new NoopUsedSegmentChecker(), + dataSegmentKiller, OBJECT_MAPPER, new FireDepartmentMetrics() ); @@ -200,6 +208,7 @@ public void testFailDuringDrop() throws IOException, InterruptedException, Timeo allocator, segmentHandoffNotifierFactory, new NoopUsedSegmentChecker(), + dataSegmentKiller, OBJECT_MAPPER, new FireDepartmentMetrics() ); @@ -225,6 +234,94 @@ public void testFailDuringDrop() throws IOException, InterruptedException, Timeo driver.registerHandoff(published).get(); } + @Test + public void testFailDuringPublish() throws Exception + { + expectedException.expect(ExecutionException.class); + expectedException.expectCause(CoreMatchers.instanceOf(ISE.class)); + expectedException.expectMessage( + "Failed to publish segments[[DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z, dataSource='foo', binaryVersion='0'}, DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z, dataSource='foo', binaryVersion='0'}]]"); + + testFailDuringPublishInternal(false); + } + + @Test + public void testFailWithExceptionDuringPublish() throws Exception + { + expectedException.expect(ExecutionException.class); + expectedException.expectCause(CoreMatchers.instanceOf(RuntimeException.class)); + expectedException.expectMessage("test"); + + testFailDuringPublishInternal(true); + } + + private void testFailDuringPublishInternal(boolean failWithException) throws Exception + { + driver = new StreamAppenderatorDriver( + new FailableAppenderator(), + allocator, + segmentHandoffNotifierFactory, + new NoopUsedSegmentChecker(), + dataSegmentKiller, + OBJECT_MAPPER, + new FireDepartmentMetrics() + ); + + driver.startJob(); + + final TestCommitterSupplier committerSupplier = new TestCommitterSupplier<>(); + segmentHandoffNotifierFactory.setHandoffDelay(100); + + Assert.assertNull(driver.startJob()); + + for (int i = 0; i < ROWS.size(); i++) { + committerSupplier.setMetadata(i + 1); + Assert.assertTrue(driver.add(ROWS.get(i), 
"dummy", committerSupplier, false, true).isOk()); + } + + dataSegmentKiller.killQuietly(new DataSegment( + "foo", + Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"), + "abc123", + ImmutableMap.of(), + ImmutableList.of(), + ImmutableList.of(), + new NumberedShardSpec(0, 0), + 0, + 0 + )); + EasyMock.expectLastCall().once(); + + dataSegmentKiller.killQuietly(new DataSegment( + "foo", + Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"), + "abc123", + ImmutableMap.of(), + ImmutableList.of(), + ImmutableList.of(), + new NumberedShardSpec(0, 0), + 0, + 0 + )); + EasyMock.expectLastCall().once(); + + EasyMock.replay(dataSegmentKiller); + + try { + driver.publish( + StreamAppenderatorDriverTest.makeFailingPublisher(failWithException), + committerSupplier.get(), + ImmutableList.of("dummy") + ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); + } + catch (Exception e) { + throw e; + } + finally { + EasyMock.verify(dataSegmentKiller); + } + } + private static class NoopUsedSegmentChecker implements UsedSegmentChecker { @Override @@ -305,8 +402,11 @@ public Object startJob() @Override public AppenderatorAddResult add( - SegmentIdentifier identifier, InputRow row, Supplier committerSupplier, boolean allowIncrementalPersists - ) throws IndexSizeExceededException, SegmentNotWritableException + SegmentIdentifier identifier, + InputRow row, + Supplier committerSupplier, + boolean allowIncrementalPersists + ) { rows.computeIfAbsent(identifier, k -> new ArrayList<>()).add(row); numRows++; @@ -368,7 +468,7 @@ public ListenableFuture persist( @Override public ListenableFuture push( - Collection identifiers, Committer committer + Collection identifiers, Committer committer, boolean useUniquePath ) { if (pushEnabled) { diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java index ee92d2e2d7d0..aff1e020c530 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java @@ -20,7 +20,6 @@ package io.druid.segment.realtime.appenderator; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -38,11 +37,14 @@ import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.granularity.Granularity; import io.druid.query.SegmentDescriptor; +import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NumberedShardSpec; +import org.easymock.EasyMock; +import org.easymock.EasyMockSupport; import org.joda.time.DateTime; import org.junit.After; import org.junit.Assert; @@ -61,7 +63,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -public class StreamAppenderatorDriverTest +public class StreamAppenderatorDriverTest extends EasyMockSupport { private static final String DATA_SOURCE = "foo"; private static final String VERSION = "abc123"; @@ -93,26 +95,33 @@ public class StreamAppenderatorDriverTest 
private AppenderatorTester appenderatorTester; private TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory; private StreamAppenderatorDriver driver; + private DataSegmentKiller dataSegmentKiller; @Before - public void setUp() + public void setUp() throws Exception { appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY); allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR); segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory(); + dataSegmentKiller = createStrictMock(DataSegmentKiller.class); driver = new StreamAppenderatorDriver( appenderatorTester.getAppenderator(), allocator, segmentHandoffNotifierFactory, new TestUsedSegmentChecker(appenderatorTester), + dataSegmentKiller, OBJECT_MAPPER, new FireDepartmentMetrics() ); + + EasyMock.replay(dataSegmentKiller); } @After public void tearDown() throws Exception { + EasyMock.verify(dataSegmentKiller); + driver.clear(); driver.close(); } @@ -345,19 +354,7 @@ public void testIncrementalHandoff() throws Exception private Set asIdentifiers(Iterable segments) { - return ImmutableSet.copyOf( - Iterables.transform( - segments, - new Function() - { - @Override - public SegmentIdentifier apply(DataSegment input) - { - return SegmentIdentifier.fromDataSegment(input); - } - } - ) - ); + return ImmutableSet.copyOf(Iterables.transform(segments, SegmentIdentifier::fromDataSegment)); } static TransactionalSegmentPublisher makeOkPublisher() @@ -372,6 +369,16 @@ public boolean publishSegments(Set segments, Object commitMetadata) }; } + static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException) + { + return (segments, commitMetadata) -> { + if (failWithException) { + throw new RuntimeException("test"); + } + return false; + }; + } + static class TestCommitterSupplier implements Supplier { private final AtomicReference metadata = new AtomicReference<>(); diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index a50f51bf0204..7ec1c32ac4db 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -155,7 +155,7 @@ public String getPathForHadoop(String dataSource) } @Override - public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException + public DataSegment push(File file, DataSegment segment, boolean useUniquePath) { return segment; }
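A closing editorial illustration, not part of the patch: the new LocalDataSegmentKillerTest#testKillUniquePath above expects kill() to delete the segment's index.zip and then prune every ancestor directory that becomes empty (the uuid, partition, version, interval, and dataSource directories). Under that assumption, the pruning amounts to roughly the following (a sketch with hypothetical names, not the LocalDataSegmentKiller implementation itself):

    import java.io.File;
    import java.io.IOException;
    import org.apache.commons.io.FileUtils;

    public class PruneEmptyParentsSketch
    {
      /**
       * Deletes the directory holding a segment's index.zip, then keeps deleting parent
       * directories for as long as they are empty, stopping at the storage root.
       */
      static void killSegmentDir(File segmentDir, File storageRoot) throws IOException
      {
        FileUtils.deleteDirectory(segmentDir);

        File parent = segmentDir.getParentFile();
        while (parent != null && !parent.equals(storageRoot)) {
          final String[] children = parent.list();
          if (children == null || children.length > 0 || !parent.delete()) {
            break; // stop at the first non-empty (or undeletable) ancestor
          }
          parent = parent.getParentFile();
        }
      }
    }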