diff --git a/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java b/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java
index b9bf810f72ce..2ce046665977 100644
--- a/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java
+++ b/api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java
@@ -39,7 +39,30 @@ public interface DataSegmentPusher
   @Deprecated
   String getPathForHadoop(String dataSource);
   String getPathForHadoop();
-  DataSegment push(File file, DataSegment segment) throws IOException;
+
+  /**
+   * Pushes index files and the segment descriptor to deep storage.
+   * @param file directory containing index files
+   * @param segment segment descriptor
+   * @param replaceExisting overwrites existing objects if true, else leaves existing objects unchanged on conflict.
+   *                        The behavior of the indexer determines whether this should be true or false. For example,
+   *                        since Tranquility does not guarantee that replica tasks will generate indexes with the same
+   *                        data, the first segment pushed should be favored; otherwise, multiple historicals may load
+   *                        segments with the same identifier but different contents. On the other hand, indexers that
+   *                        maintain exactly-once semantics by storing checkpoint data can lose or repeat data if they
+   *                        fail to write a segment because it already exists and overwriting is not permitted. This
+   *                        situation can occur if a task fails after pushing to deep storage but before writing to the
+   *                        metadata storage, see: https://github.com/druid-io/druid/issues/5161.
+   *
+   *                        If replaceExisting is true, existing objects MUST be overwritten, since failure to do so
+   *                        will break exactly-once semantics. If replaceExisting is false, existing objects SHOULD be
+   *                        prioritized but it is acceptable if they are overwritten (deep storages may be eventually
+   *                        consistent or otherwise unable to support transactional writes).
+   * @return segment descriptor
+   * @throws IOException
+   */
+  DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException;
+
   //use map instead of LoadSpec class to avoid dependency pollution.
   Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath);
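For illustration, a hedged caller-side sketch (not part of this diff) of how the two policies described in the javadoc play out; `pusher`, `segmentDir`, and `segment` are assumed to be in scope:

    // Replica-based streaming ingestion: replicas may produce different bytes for the
    // same segment identifier, so favor whichever replica pushed first.
    DataSegment firstWins = pusher.push(segmentDir, segment, false);

    // Checkpoint-based, exactly-once ingestion: a re-push after a partial failure must
    // overwrite, or deep storage can diverge from the checkpointed state.
    DataSegment lastWins = pusher.push(segmentDir, segment, true);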
diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java
index 97bc1a0167a7..39ab34258e0d 100644
--- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java
+++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureDataSegmentPusher.java
@@ -109,12 +109,13 @@ public DataSegment uploadDataSegment(
       final long size,
       final File compressedSegmentData,
       final File descriptorFile,
-      final Map<String, String> azurePaths
+      final Map<String, String> azurePaths,
+      final boolean replaceExisting
   ) throws StorageException, IOException, URISyntaxException
   {
-    azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePaths.get("index"));
-    azureStorage.uploadBlob(descriptorFile, config.getContainer(), azurePaths.get("descriptor"));
+    azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePaths.get("index"), replaceExisting);
+    azureStorage.uploadBlob(descriptorFile, config.getContainer(), azurePaths.get("descriptor"), replaceExisting);

     final DataSegment outSegment = segment
         .withSize(size)
@@ -131,9 +132,9 @@ public DataSegment uploadDataSegment(
   }

   @Override
-  public DataSegment push(final File indexFilesDir, final DataSegment segment) throws IOException
+  public DataSegment push(final File indexFilesDir, final DataSegment segment, final boolean replaceExisting)
+      throws IOException
   {
-
     log.info("Uploading [%s] to Azure.", indexFilesDir);

     final int version = SegmentUtils.getVersionFromDir(indexFilesDir);
@@ -153,7 +154,7 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment) thr
           @Override
           public DataSegment call() throws Exception
           {
-            return uploadDataSegment(segment, version, size, outFile, descFile, azurePaths);
+            return uploadDataSegment(segment, version, size, outFile, descFile, azurePaths, replaceExisting);
           }
         },
         config.getMaxTries()
diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorage.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorage.java
index 8585bf043797..28fa9dad56c6 100644
--- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorage.java
+++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureStorage.java
@@ -25,7 +25,6 @@
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
 import com.microsoft.azure.storage.blob.CloudBlockBlob;
 import com.microsoft.azure.storage.blob.ListBlobItem;
-
 import io.druid.java.util.common.logger.Logger;

 import java.io.File;
@@ -81,14 +80,24 @@ public List<String> emptyCloudBlobDirectory(final String containerName, final St
   }

-  public void uploadBlob(final File file, final String containerName, final String blobPath)
+  public void uploadBlob(
+      final File file,
+      final String containerName,
+      final String blobPath,
+      final boolean replaceExisting
+  )
       throws IOException, StorageException, URISyntaxException
   {
     CloudBlobContainer container = getCloudBlobContainer(containerName);
     try (FileInputStream stream = new FileInputStream(file)) {
       CloudBlockBlob blob = container.getBlockBlobReference(blobPath);
-      blob.upload(stream, file.length());
+
+      if (!replaceExisting && blob.exists()) {
+        log.info("Skipping push because blob [%s] exists && replaceExisting == false", blobPath);
+      } else {
+        blob.upload(stream, file.length());
+      }
     }
   }
diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java
index f4b6093ed760..50092ceff62c 100644
--- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java
+++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java
@@ -59,7 +59,7 @@ public void pushTaskLog(final String taskid, final File logFile) throws IOExcept
     try {
       AzureUtils.retryAzureOperation(
           (Callable<Void>) () -> {
-            azureStorage.uploadBlob(logFile, config.getContainer(), taskKey);
+            azureStorage.uploadBlob(logFile, config.getContainer(), taskKey, true);
             return null;
           },
           config.getMaxTries()
diff --git a/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java b/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java
index af76f3571424..30c571a4f163 100644
--- a/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java
+++ b/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureDataSegmentPusherTest.java
@@ -104,7 +104,7 @@ public void testPush() throws Exception
         size
     );

-    DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
+    DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true);

     Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
   }
@@ -133,9 +133,9 @@ public void uploadDataSegmentTest() throws StorageException, IOException, URISyn
     final File descriptorFile = new File("descriptor.json");
     final Map<String, String> azurePaths = pusher.getAzurePaths(dataSegment);

-    azureStorage.uploadBlob(compressedSegmentData, containerName, azurePaths.get("index"));
+    azureStorage.uploadBlob(compressedSegmentData, containerName, azurePaths.get("index"), true);
     expectLastCall();
-    azureStorage.uploadBlob(descriptorFile, containerName, azurePaths.get("descriptor"));
+    azureStorage.uploadBlob(descriptorFile, containerName, azurePaths.get("descriptor"), true);
     expectLastCall();

     replayAll();
@@ -146,7 +146,8 @@ public void uploadDataSegmentTest() throws StorageException, IOException, URISyn
         0, // empty file
         compressedSegmentData,
         descriptorFile,
-        azurePaths
+        azurePaths,
+        true
     );

     assertEquals(compressedSegmentData.length(), pushedDataSegment.getSize());
diff --git a/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureTaskLogsTest.java b/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureTaskLogsTest.java
index 26ff677af177..5f8394a735a4 100644
--- a/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureTaskLogsTest.java
+++ b/extensions-contrib/azure-extensions/src/test/java/io/druid/storage/azure/AzureTaskLogsTest.java
@@ -65,7 +65,7 @@ public void testPushTaskLog() throws Exception
     try {
       final File logFile = new File(tmpDir, "log");

-      azureStorage.uploadBlob(logFile, container, prefix + "/" + taskid + "/log");
+      azureStorage.uploadBlob(logFile, container, prefix + "/" + taskid + "/log", true);
      expectLastCall();

      replayAll();
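The uploadBlob() change above is the template the other deep-storage modules in this diff follow. A minimal standalone sketch of the same guarded-upload pattern, using the Azure SDK calls already present in AzureStorage (CloudBlobContainer, CloudBlockBlob); the method and variable names are illustrative:

    void uploadIfAbsent(CloudBlobContainer container, File file, String blobPath) throws Exception
    {
      CloudBlockBlob blob = container.getBlockBlobReference(blobPath);
      if (blob.exists()) {
        return; // replaceExisting == false: favor the object already in deep storage
      }
      try (FileInputStream stream = new FileInputStream(file)) {
        // Note: exists() followed by upload() is not atomic; a concurrent pusher can
        // still win the race, which the interface javadoc explicitly tolerates.
        blob.upload(stream, file.length());
      }
    }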
diff --git a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java
index 3595147ace85..3b3ad8e5539b 100644
--- a/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java
+++ b/extensions-contrib/cassandra-storage/src/main/java/io/druid/storage/cassandra/CassandraDataSegmentPusher.java
@@ -24,8 +24,9 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
 import com.netflix.astyanax.recipes.storage.ChunkedStorage;
-
+import com.netflix.astyanax.recipes.storage.ChunkedStorageProvider;
 import io.druid.java.util.common.CompressionUtils;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.segment.SegmentUtils;
@@ -53,7 +54,8 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data
   @Inject
   public CassandraDataSegmentPusher(
       CassandraDataSegmentConfig config,
-      ObjectMapper jsonMapper)
+      ObjectMapper jsonMapper
+  )
   {
     super(config);
     this.jsonMapper = jsonMapper;
@@ -73,13 +75,14 @@ public String getPathForHadoop(String dataSource)
   }

   @Override
-  public DataSegment push(final File indexFilesDir, DataSegment segment) throws IOException
+  public DataSegment push(final File indexFilesDir, DataSegment segment, final boolean replaceExisting)
+      throws IOException
   {
     log.info("Writing [%s] to C*", indexFilesDir);
     String key = JOINER.join(
         config.getKeyspace().isEmpty() ? null : config.getKeyspace(),
         this.getStorageDir(segment)
-        );
+    );

     // Create index
     final File compressedIndexFile = File.createTempFile("druid", "index.zip");
@@ -89,26 +92,28 @@ public DataSegment push(final File indexFilesDir, DataSegment segment) throws IO
     int version = SegmentUtils.getVersionFromDir(indexFilesDir);

     try {
-      long start = System.currentTimeMillis();
-      ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile))
-          .withConcurrencyLevel(CONCURRENCY).call();
-      byte[] json = jsonMapper.writeValueAsBytes(segment);
-      MutationBatch mutation = this.keyspace.prepareMutationBatch();
-      mutation.withRow(descriptorStorage, key)
-          .putColumn("lastmodified", System.currentTimeMillis(), null)
-          .putColumn("descriptor", json, null);
-      mutation.execute();
-      log.info("Wrote index to C* in [%s] ms", System.currentTimeMillis() - start);
+      if (!replaceExisting && doesObjectExist(indexStorage, key)) {
+        log.info("Skipping push because key [%s] exists && replaceExisting == false", key);
+      } else {
+        long start = System.currentTimeMillis();
+        ChunkedStorage.newWriter(indexStorage, key, new FileInputStream(compressedIndexFile))
+            .withConcurrencyLevel(CONCURRENCY).call();
+        byte[] json = jsonMapper.writeValueAsBytes(segment);
+        MutationBatch mutation = this.keyspace.prepareMutationBatch();
+        mutation.withRow(descriptorStorage, key)
+            .putColumn("lastmodified", System.currentTimeMillis(), null)
+            .putColumn("descriptor", json, null);
+        mutation.execute();
+        log.info("Wrote index to C* in [%s] ms", System.currentTimeMillis() - start);
+      }
     }
     catch (Exception e) {
       throw new IOException(e);
     }
of("type", "c*", "key", key) - ) - .withBinaryVersion(version); + .withLoadSpec(ImmutableMap.of("type", "c*", "key", key)) + .withBinaryVersion(version); log.info("Deleting zipped index File[%s]", compressedIndexFile); compressedIndexFile.delete(); @@ -120,4 +125,14 @@ public Map makeLoadSpec(URI uri) { throw new UnsupportedOperationException("not supported"); } + + private boolean doesObjectExist(ChunkedStorageProvider provider, String objectName) throws Exception + { + try { + return ChunkedStorage.newInfoReader(provider, objectName).call().isValidForRead(); + } + catch (NotFoundException e) { + return false; + } + } } diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java index a08105b0575e..e224a5b83b78 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java @@ -74,7 +74,8 @@ public String getPathForHadoop(final String dataSource) } @Override - public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException + public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean replaceExisting) + throws IOException { final String segmentPath = CloudFilesUtils.buildCloudFilesPath(this.config.getBasePath(), getStorageDir(inSegment)); @@ -98,18 +99,23 @@ public DataSegment call() throws Exception segmentPath, outFile, objectApi.getRegion(), objectApi.getContainer() ); - log.info("Pushing %s.", segmentData.getPath()); - objectApi.put(segmentData); - - // Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in - // runtime, and because Guava deletes methods over time, that causes incompatibilities. - Files.write(descFile.toPath(), jsonMapper.writeValueAsBytes(inSegment)); - CloudFilesObject descriptorData = new CloudFilesObject( - segmentPath, descFile, - objectApi.getRegion(), objectApi.getContainer() - ); - log.info("Pushing %s.", descriptorData.getPath()); - objectApi.put(descriptorData); + + if (!replaceExisting && objectApi.exists(segmentData.getPath())) { + log.info("Skipping push because object [%s] exists && replaceExisting == false", segmentData.getPath()); + } else { + log.info("Pushing %s.", segmentData.getPath()); + objectApi.put(segmentData); + + // Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in + // runtime, and because Guava deletes methods over time, that causes incompatibilities. 
diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java
index a08105b0575e..e224a5b83b78 100644
--- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java
+++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java
@@ -74,7 +74,8 @@ public String getPathForHadoop(final String dataSource)
   }

   @Override
-  public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
+  public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean replaceExisting)
+      throws IOException
   {
     final String segmentPath = CloudFilesUtils.buildCloudFilesPath(this.config.getBasePath(), getStorageDir(inSegment));

@@ -98,18 +99,23 @@ public DataSegment call() throws Exception
                 segmentPath, outFile, objectApi.getRegion(), objectApi.getContainer()
             );

-            log.info("Pushing %s.", segmentData.getPath());
-            objectApi.put(segmentData);
-
-            // Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in
-            // runtime, and because Guava deletes methods over time, that causes incompatibilities.
-            Files.write(descFile.toPath(), jsonMapper.writeValueAsBytes(inSegment));
-            CloudFilesObject descriptorData = new CloudFilesObject(
-                segmentPath, descFile,
-                objectApi.getRegion(), objectApi.getContainer()
-            );
-            log.info("Pushing %s.", descriptorData.getPath());
-            objectApi.put(descriptorData);
+
+            if (!replaceExisting && objectApi.exists(segmentData.getPath())) {
+              log.info("Skipping push because object [%s] exists && replaceExisting == false", segmentData.getPath());
+            } else {
+              log.info("Pushing %s.", segmentData.getPath());
+              objectApi.put(segmentData);
+
+              // Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in
+              // runtime, and because Guava deletes methods over time, that causes incompatibilities.
+              Files.write(descFile.toPath(), jsonMapper.writeValueAsBytes(inSegment));
+              CloudFilesObject descriptorData = new CloudFilesObject(
+                  segmentPath, descFile,
+                  objectApi.getRegion(), objectApi.getContainer()
+              );
+              log.info("Pushing %s.", descriptorData.getPath());
+              objectApi.put(descriptorData);
+            }

             final DataSegment outSegment = inSegment
                 .withSize(indexSize)
diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxy.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxy.java
index 7f41497b37c9..d495fcc1c77f 100644
--- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxy.java
+++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/storage/cloudfiles/CloudFilesObjectApiProxy.java
@@ -58,4 +58,9 @@ public CloudFilesObject get(String path)
     Payload payload = swiftObject.getPayload();
     return new CloudFilesObject(payload, this.region, this.container, path);
   }
+
+  public boolean exists(String path)
+  {
+    return objectApi.getWithoutBody(path) != null;
+  }
 }
diff --git a/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusherTest.java b/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusherTest.java
index b257efb1282a..73be6ce0e1c1 100644
--- a/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusherTest.java
+++ b/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/storage/cloudfiles/CloudFilesDataSegmentPusherTest.java
@@ -84,7 +84,7 @@ public void testPush() throws Exception
         size
     );

-    DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
+    DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true);

     Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
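The new exists() method relies on jclouds' ObjectApi.getWithoutBody(), which retrieves object metadata only (a HEAD-style request) and returns null when the object is absent. A brief hedged sketch of how the pusher above consumes it; names mirror the diff:

    if (!replaceExisting && objectApi.exists(segmentData.getPath())) {
      // Favor the existing object: skip both the index and the descriptor upload,
      // since the descriptor must describe the bytes actually stored in index.zip.
    } else {
      objectApi.put(segmentData);
    }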
diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java
index 527e7e14025e..e8489ea8bd86 100644
--- a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java
+++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleDataSegmentPusher.java
@@ -93,7 +93,8 @@ public File createDescriptorFile(final ObjectMapper jsonMapper, final DataSegmen
     return descriptorFile;
   }

-  public void insert(final File file, final String contentType, final String path) throws IOException
+  public void insert(final File file, final String contentType, final String path, final boolean replaceExisting)
+      throws IOException
   {
     LOG.info("Inserting [%s] to [%s]", file, path);

@@ -102,11 +103,16 @@ public void insert(final File file, final String contentType, final String path)
     InputStreamContent mediaContent = new InputStreamContent(contentType, fileSteam);
     mediaContent.setLength(file.length());

-    storage.insert(config.getBucket(), path, mediaContent);
+    if (!replaceExisting && storage.exists(config.getBucket(), path)) {
+      LOG.info("Skipping push because path [%s] exists && replaceExisting == false", path);
+    } else {
+      storage.insert(config.getBucket(), path, mediaContent);
+    }
   }

   @Override
-  public DataSegment push(final File indexFilesDir, final DataSegment segment) throws IOException
+  public DataSegment push(final File indexFilesDir, final DataSegment segment, final boolean replaceExisting)
+      throws IOException
   {
     LOG.info("Uploading [%s] to Google.", indexFilesDir);

@@ -128,8 +134,8 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment) thr

     descriptorFile = createDescriptorFile(jsonMapper, outSegment);

-    insert(indexFile, "application/zip", indexPath);
-    insert(descriptorFile, "application/json", descriptorPath);
+    insert(indexFile, "application/zip", indexPath, replaceExisting);
+    insert(descriptorFile, "application/json", descriptorPath, replaceExisting);

     return outSegment;
   }
diff --git a/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java
index 6c845d433c77..11372c243aa0 100644
--- a/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java
+++ b/extensions-contrib/google-extensions/src/test/java/io/druid/storage/google/GoogleDataSegmentPusherTest.java
@@ -103,20 +103,30 @@ public void testPush() throws Exception
         storage,
         googleAccountConfig,
         jsonMapper
-    ).addMockedMethod("insert", File.class, String.class, String.class).createMock();
+    ).addMockedMethod("insert", File.class, String.class, String.class, boolean.class).createMock();

     final String storageDir = pusher.getStorageDir(segmentToPush);
     final String indexPath = prefix + "/" + storageDir + "/" + "index.zip";
     final String descriptorPath = prefix + "/" + storageDir + "/" + "descriptor.json";

-    pusher.insert(EasyMock.anyObject(File.class), EasyMock.eq("application/zip"), EasyMock.eq(indexPath));
+    pusher.insert(
+        EasyMock.anyObject(File.class),
+        EasyMock.eq("application/zip"),
+        EasyMock.eq(indexPath),
+        EasyMock.eq(true)
+    );
     expectLastCall();
-    pusher.insert(EasyMock.anyObject(File.class), EasyMock.eq("application/json"), EasyMock.eq(descriptorPath));
+    pusher.insert(
+        EasyMock.anyObject(File.class),
+        EasyMock.eq("application/json"),
+        EasyMock.eq(descriptorPath),
+        EasyMock.eq(true)
+    );
     expectLastCall();

     replayAll();

-    DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
+    DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true);

     Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
     Assert.assertEquals(segmentToPush, segment);
diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
index 33a85ac2147a..3374d4333329 100644
--- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
+++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentPusher.java
@@ -89,7 +89,7 @@ public String getPathForHadoop()
   }

   @Override
-  public DataSegment push(File inDir, DataSegment segment) throws IOException
+  public DataSegment push(File inDir, DataSegment segment, boolean replaceExisting) throws IOException
   {
     final String storageDir = this.getStorageDir(segment);

@@ -145,8 +145,8 @@ public DataSegment push(File inDir, DataSegment segment) throws IOException
       // Create parent if it does not exist, recreation is not an error
       fs.mkdirs(outIndexFile.getParent());

-      copyFilesWithChecks(fs, tmpDescriptorFile, outDescriptorFile);
-      copyFilesWithChecks(fs, tmpIndexFile, outIndexFile);
+      copyFilesWithChecks(fs, tmpDescriptorFile, outDescriptorFile, replaceExisting);
+      copyFilesWithChecks(fs, tmpIndexFile, outIndexFile, replaceExisting);
     }
     finally {
       try {
@@ -162,9 +162,10 @@ public DataSegment push(File inDir, DataSegment segment) throws IOException
     return dataSegment;
   }

-  private void copyFilesWithChecks(final FileSystem fs, final Path from, final Path to) throws IOException
+  private void copyFilesWithChecks(final FileSystem fs, final Path from, final Path to, final boolean replaceExisting)
+      throws IOException
   {
-    if (!HadoopFsWrapper.rename(fs, from, to)) {
+    if (!HadoopFsWrapper.rename(fs, from, to, replaceExisting)) {
       if (fs.exists(to)) {
         log.info(
             "Unable to rename temp Index file[%s] to final segment path [%s]. "
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/hadoop/fs/HadoopFsWrapper.java b/extensions-core/hdfs-storage/src/main/java/org/apache/hadoop/fs/HadoopFsWrapper.java
index df00fdb7a0fa..4604241dd76a 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/hadoop/fs/HadoopFsWrapper.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/hadoop/fs/HadoopFsWrapper.java
@@ -36,23 +36,25 @@ public class HadoopFsWrapper
   private HadoopFsWrapper() {}

   /**
-   * Same as FileSystem.rename(from, to, Options.Rename.NONE) . That is,
-   * it returns "false" when "to" directory already exists. It is different from FileSystem.rename(from, to)
-   * which moves "from" directory inside "to" directory if it already exists.
+   * Same as FileSystem.rename(from, to, Options.Rename). It is different from FileSystem.rename(from, to), which
+   * moves the "from" directory inside the "to" directory if it already exists.
    *
    * @param from
    * @param to
-   * @return
-   * @throws IOException
+   * @param replaceExisting whether existing files should be overwritten
+   *
+   * @return true if the operation succeeded, false if replaceExisting == false and the destination already exists
+   *
+   * @throws IOException if trying to overwrite a non-empty directory
    */
-  public static boolean rename(FileSystem fs, Path from, Path to) throws IOException
+  public static boolean rename(FileSystem fs, Path from, Path to, boolean replaceExisting) throws IOException
   {
     try {
-      fs.rename(from, to, Options.Rename.NONE);
+      fs.rename(from, to, replaceExisting ? Options.Rename.OVERWRITE : Options.Rename.NONE);
       return true;
     }
-    catch (IOException ex) {
-      log.warn(ex, "Failed to rename [%s] to [%s].", from, to);
+    catch (FileAlreadyExistsException ex) {
+      log.info(ex, "Destination exists while renaming [%s] to [%s]", from, to);
       return false;
     }
   }
 }
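A hedged usage sketch of the reworked wrapper (not part of this diff). HadoopFsWrapper lives in the org.apache.hadoop.fs package so it can reach FileSystem's non-public rename(Path, Path, Options.Rename...) overload; `conf`, `tmpPath`, and `finalPath` are assumed in scope:

    FileSystem fs = FileSystem.get(conf);
    // Rename.NONE: returns false if finalPath exists; Rename.OVERWRITE: last writer wins
    if (!HadoopFsWrapper.rename(fs, tmpPath, finalPath, false)) {
      // Destination already present: with replaceExisting == false this is the
      // expected "first push wins" outcome, not an error.
    }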
diff --git a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
index 95dd85bf2b9e..bfc119f6498a 100644
--- a/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/io/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
@@ -161,7 +161,7 @@ private void testUsingScheme(final String scheme) throws Exception
         size
     );

-    DataSegment segment = pusher.push(segmentDir, segmentToPush);
+    DataSegment segment = pusher.push(segmentDir, segmentToPush, true);

     String indexUri = StringUtils.format(
@@ -201,7 +201,7 @@ private void testUsingScheme(final String scheme) throws Exception
     File outDir = new File(StringUtils.format("%s/%s", config.getStorageDirectory(), segmentPath));
     outDir.setReadOnly();
     try {
-      pusher.push(segmentDir, segmentToPush);
+      pusher.push(segmentDir, segmentToPush, true);
     }
     catch (IOException e) {
       Assert.fail("should not throw exception");
@@ -246,7 +246,7 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n
     }

     for (int i = 0; i < numberOfSegments; i++) {
-      final DataSegment pushedSegment = pusher.push(segmentDir, segments[i]);
+      final DataSegment pushedSegment = pusher.push(segmentDir, segments[i], true);

       String indexUri = StringUtils.format(
           "%s/%s/%d_index.zip",
@@ -308,7 +308,7 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n
     File outDir = new File(StringUtils.format("%s/%s", config.getStorageDirectory(), segmentPath));
     outDir.setReadOnly();
     try {
-      pusher.push(segmentDir, segments[i]);
+      pusher.push(segmentDir, segments[i], true);
     }
     catch (IOException e) {
       Assert.fail("should not throw exception");
diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java
index 1100287cb966..333e7fae5dd1 100644
--- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java
+++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java
@@ -88,7 +88,8 @@ public List<String> getAllowedPropertyPrefixesForHadoop()
   }

   @Override
-  public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
+  public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean replaceExisting)
+      throws IOException
   {
     final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), getStorageDir(inSegment));

@@ -105,21 +106,10 @@ public DataSegment push(final File indexFilesDir, final DataSegment inSegment) t
         public DataSegment call() throws Exception
         {
           S3Object toPush = new S3Object(zipOutFile);
-
-          final String outputBucket = config.getBucket();
-          final String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path);
-
-          toPush.setBucketName(outputBucket);
-          toPush.setKey(s3Path);
-          if (!config.getDisableAcl()) {
-            toPush.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
-          }
-
-          log.info("Pushing %s.", toPush);
-          s3Client.putObject(outputBucket, toPush);
+          putObject(config.getBucket(), s3Path, toPush, replaceExisting);

           final DataSegment outSegment = inSegment.withSize(indexSize)
-                                                  .withLoadSpec(makeLoadSpec(outputBucket, toPush.getKey()))
+                                                  .withLoadSpec(makeLoadSpec(config.getBucket(), toPush.getKey()))
                                                   .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));

           File descriptorFile = File.createTempFile("druid", "descriptor.json");
@@ -127,14 +117,13 @@ public DataSegment call() throws Exception
           // runtime, and because Guava deletes methods over time, that causes incompatibilities.
           Files.write(descriptorFile.toPath(), jsonMapper.writeValueAsBytes(outSegment));
           S3Object descriptorObject = new S3Object(descriptorFile);
-          descriptorObject.setBucketName(outputBucket);
-          descriptorObject.setKey(s3DescriptorPath);
-          if (!config.getDisableAcl()) {
-            descriptorObject.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
-          }

-          log.info("Pushing %s", descriptorObject);
-          s3Client.putObject(outputBucket, descriptorObject);
+          putObject(
+              config.getBucket(),
+              S3Utils.descriptorPathForSegmentPath(s3Path),
+              descriptorObject,
+              replaceExisting
+          );

           log.info("Deleting zipped index File[%s]", zipOutFile);
           zipOutFile.delete();
@@ -164,7 +153,6 @@ public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)

   /**
    * Any change in loadSpec need to be reflected {@link io.druid.indexer.JobHelper#getURIFromSegment()}
-   *
    */
   @SuppressWarnings("JavadocReference")
   private Map<String, Object> makeLoadSpec(String bucket, String key)
@@ -180,4 +168,22 @@ private Map<String, Object> makeLoadSpec(String bucket, String key)
         config.isUseS3aSchema() ? "s3a" : "s3n"
     );
   }
+
+  private void putObject(String bucketName, String path, S3Object object, boolean replaceExisting)
+      throws ServiceException
+  {
+    object.setBucketName(bucketName);
+    object.setKey(path);
+    if (!config.getDisableAcl()) {
+      object.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
+    }
+
+    log.info("Pushing %s.", object);
+
+    if (!replaceExisting && S3Utils.isObjectInBucket(s3Client, bucketName, object.getKey())) {
+      log.info("Skipping push because key [%s] exists && replaceExisting == false", object.getKey());
+    } else {
+      s3Client.putObject(bucketName, object);
+    }
+  }
 }
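Why the interface javadoc only says existing objects SHOULD survive when replaceExisting == false: the probe-then-put sequence in putObject() above is inherently racy, and S3 listings were eventually consistent at the time. A hedged sketch of that sequence; `s3Client`, `bucket`, `key`, and `object` are assumed in scope:

    // Two concurrent pushers can both observe "absent" and both write; the
    // contract tolerates this, so no stronger coordination is attempted here.
    if (!S3Utils.isObjectInBucket(s3Client, bucket, key)) {
      s3Client.putObject(bucket, object);
    }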
diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherTest.java
index 32818b17e487..f26bd1610398 100644
--- a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherTest.java
+++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherTest.java
@@ -113,7 +113,7 @@ public S3Object answer() throws Throwable
         size
     );

-    DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush);
+    DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush, true);

     Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
     Assert.assertEquals(1, (int) segment.getBinaryVersion());
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
index b6e2a26d88fc..0bba29fbe881 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
@@ -199,7 +199,10 @@ public void finishJob()
               .withDimensions(ImmutableList.copyOf(mappedSegment.getAvailableDimensions()))
               .withBinaryVersion(SegmentUtils.getVersionFromDir(fileToUpload));

-          dataSegmentPusher.push(fileToUpload, segmentToUpload);
+          // This plumber is only used in batch ingestion situations where you do not have replica tasks pushing
+          // segments with the same identifier but potentially different contents. In case of conflict, favor the most
+          // recently pushed segment (replaceExisting == true).
+          dataSegmentPusher.push(fileToUpload, segmentToUpload, true);

           log.info(
               "Uploaded segment[%s]",
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java
index 4605995ee613..0e752cb988ce 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java
@@ -440,7 +440,11 @@ private void convertSegment(TaskToolbox toolbox) throws SegmentLoadingException,
       // Appending to the version makes a new version that inherits most comparability parameters of the original
       // version, but is "newer" than said original version.
       DataSegment updatedSegment = segment.withVersion(StringUtils.format("%s_v%s", segment.getVersion(), outVersion));
-      updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment);
+
+      // The convert segment task does not support replicas where different tasks could generate segments with the
+      // same identifier but potentially different contents. In case of conflict, favor the most recently pushed
+      // segment (replaceExisting == true).
+      updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment, true);

       actionClient.submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
     } else {
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
index 8a6cb91dea52..2b5574127d65 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
@@ -185,7 +185,11 @@ public String apply(DataSegment input)
       long uploadStart = System.currentTimeMillis();

       // Upload file
-      final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, mergedSegment);
+
+      // The merge task does not support replicas where different tasks could generate segments with the
+      // same identifier but potentially different contents. In case of conflict, favor the most recently pushed
+      // segment (replaceExisting == true).
+      final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, mergedSegment, true);

       emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
       emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
index 79310c9771de..ece5c55c7eb1 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
@@ -1006,7 +1006,7 @@ public String getPathForHadoop()
       }

       @Override
-      public DataSegment push(File file, DataSegment segment) throws IOException
+      public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException
       {
         segments.add(segment);
         return segment;
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java
index 63389a804bae..e42f55d9f605 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java
@@ -201,7 +201,7 @@ public String getPathForHadoop()
       }

       @Override
-      public DataSegment push(File file, DataSegment segment) throws IOException
+      public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException
       {
         // the merged segment is pushed to storage
         segments.add(segment);
diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
index 624ddc4cc425..3a255ea1fd55 100644
--- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
@@ -249,7 +249,7 @@ public String getPathForHadoop()
       }

       @Override
-      public DataSegment push(File file, DataSegment segment) throws IOException
+      public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException
       {
         return segment;
       }
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
index e408bd28c2db..50912834680b 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
@@ -485,7 +485,7 @@ public String getPathForHadoop(String dataSource)
       }

       @Override
-      public DataSegment push(File file, DataSegment segment) throws IOException
+      public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException
       {
         pushedSegments++;
         return segment;
@@ -1034,7 +1034,7 @@ public String getPathForHadoop()
       }

       @Override
-      public DataSegment push(File file, DataSegment dataSegment) throws IOException
+      public DataSegment push(File file, DataSegment dataSegment, boolean replaceExisting) throws IOException
       {
         throw new RuntimeException("FAILURE");
       }
diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java
index 923a80299605..d518931a5faa 100644
--- a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java
+++ b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java
@@ -48,7 +48,7 @@ public String getPathForHadoop()
   }

   @Override
-  public DataSegment push(File file, DataSegment segment) throws IOException
+  public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException
   {
     pushedSegments.add(segment);
     return segment;
diff --git a/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java b/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java
index 08614ff8d759..fd7bc1e6ec57 100644
--- a/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java
+++ b/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java
@@ -48,36 +48,59 @@ public class CompressionUtils
 {
   private static final Logger log = new Logger(CompressionUtils.class);
   private static final int DEFAULT_RETRY_COUNT = 3;
-
-  public static final String GZ_SUFFIX = ".gz";
-  public static final String ZIP_SUFFIX = ".zip";
+  private static final String GZ_SUFFIX = ".gz";
+  private static final String ZIP_SUFFIX = ".zip";

   /**
    * Zip the contents of directory into the file indicated by outputZipFile. Sub directories are skipped
    *
    * @param directory     The directory whose contents should be added to the zip in the output stream.
    * @param outputZipFile The output file to write the zipped data to
+   * @param fsync         True if the output file should be fsynced to disk
    *
    * @return The number of bytes (uncompressed) read from the input directory.
    *
    * @throws IOException
    */
-  public static long zip(File directory, File outputZipFile) throws IOException
+  public static long zip(File directory, File outputZipFile, boolean fsync) throws IOException
   {
     if (!isZip(outputZipFile.getName())) {
       log.warn("No .zip suffix[%s], putting files from [%s] into it anyway.", outputZipFile, directory);
     }

     try (final FileOutputStream out = new FileOutputStream(outputZipFile)) {
-      return zip(directory, out);
+      long bytes = zip(directory, out);
+
+      // For explanation of why fsyncing here is a good practice:
+      // https://github.com/druid-io/druid/pull/5187#pullrequestreview-85188984
+      if (fsync) {
+        out.getChannel().force(true);
+      }
+
+      return bytes;
     }
   }

+  /**
+   * Zip the contents of directory into the file indicated by outputZipFile. Sub directories are skipped
+   *
+   * @param directory     The directory whose contents should be added to the zip in the output stream.
+   * @param outputZipFile The output file to write the zipped data to
+   *
+   * @return The number of bytes (uncompressed) read from the input directory.
+   *
+   * @throws IOException
+   */
+  public static long zip(File directory, File outputZipFile) throws IOException
+  {
+    return zip(directory, outputZipFile, false);
+  }
+
   /**
    * Zips the contents of the input directory to the output stream. Sub directories are skipped
    *
    * @param directory The directory whose contents should be added to the zip in the output stream.
-   * @param out       The output stream to write the zip data to. It is closed in the process
+   * @param out       The output stream to write the zip data to. Caller is responsible for closing this stream.
    *
    * @return The number of bytes (uncompressed) read from the input directory.
    *
@@ -88,23 +111,23 @@ public static long zip(File directory, OutputStream out) throws IOException
     if (!directory.isDirectory()) {
       throw new IOE("directory[%s] is not a directory", directory);
     }
-    final File[] files = directory.listFiles();
+
+    final ZipOutputStream zipOut = new ZipOutputStream(out);

     long totalSize = 0;
-    try (final ZipOutputStream zipOut = new ZipOutputStream(out)) {
-      for (File file : files) {
-        log.info("Adding file[%s] with size[%,d]. Total size so far[%,d]", file, file.length(), totalSize);
-        if (file.length() >= Integer.MAX_VALUE) {
-          zipOut.finish();
-          throw new IOE("file[%s] too large [%,d]", file, file.length());
-        }
-        zipOut.putNextEntry(new ZipEntry(file.getName()));
-        totalSize += Files.asByteSource(file).copyTo(zipOut);
+    for (File file : directory.listFiles()) {
+      log.info("Adding file[%s] with size[%,d]. Total size so far[%,d]", file, file.length(), totalSize);
+      if (file.length() >= Integer.MAX_VALUE) {
+        zipOut.finish();
+        throw new IOE("file[%s] too large [%,d]", file, file.length());
       }
-      zipOut.closeEntry();
-      // Workarround for http://hg.openjdk.java.net/jdk8/jdk8/jdk/rev/759aa847dcaf
-      zipOut.flush();
+      zipOut.putNextEntry(new ZipEntry(file.getName()));
+      totalSize += Files.asByteSource(file).copyTo(zipOut);
     }
+    zipOut.closeEntry();
+    // Workaround for http://hg.openjdk.java.net/jdk8/jdk8/jdk/rev/759aa847dcaf
+    zipOut.flush();
+    zipOut.finish();

     return totalSize;
   }
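A short hedged usage sketch of the new fsync flag (paths are illustrative). Forcing the FileChannel flushes the zipped bytes to the device before the push, so a machine crash between zipping and uploading is far less likely to leave a truncated index.zip that later gets pushed:

    File segmentDir = new File("/tmp/segment-working-dir");
    File zipOutFile = new File("/tmp/index.zip");
    long uncompressedBytes = CompressionUtils.zip(segmentDir, zipOutFile, true);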
diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java
index b6e52bd551d3..0c00e0aeb334 100644
--- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java
+++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import io.druid.java.util.common.CompressionUtils;
+import io.druid.java.util.common.IOE;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.segment.SegmentUtils;
 import io.druid.timeline.DataSegment;
@@ -33,23 +34,22 @@
 import java.net.URI;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
 import java.util.Map;
 import java.util.UUID;

-/**
- */
 public class LocalDataSegmentPusher implements DataSegmentPusher
 {
   private static final Logger log = new Logger(LocalDataSegmentPusher.class);

+  private static final String INDEX_FILENAME = "index.zip";
+  private static final String DESCRIPTOR_FILENAME = "descriptor.json";
+
   private final LocalDataSegmentPusherConfig config;
   private final ObjectMapper jsonMapper;

   @Inject
-  public LocalDataSegmentPusher(
-      LocalDataSegmentPusherConfig config,
-      ObjectMapper jsonMapper
-  )
+  public LocalDataSegmentPusher(LocalDataSegmentPusherConfig config, ObjectMapper jsonMapper)
   {
     this.config = config;
     this.jsonMapper = jsonMapper;
@@ -71,7 +71,7 @@ public String getPathForHadoop(String dataSource)
   }

   @Override
-  public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOException
+  public DataSegment push(File dataSegmentFile, DataSegment segment, boolean replaceExisting) throws IOException
   {
     final String storageDir = this.getStorageDir(segment);
     final File baseStorageDir = config.getStorageDirectory();
@@ -95,27 +95,53 @@ public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOExce
     final File tmpOutDir = new File(baseStorageDir, intermediateDirFor(storageDir));
     log.info("Creating intermediate directory[%s] for segment[%s]", tmpOutDir.toString(), segment.getIdentifier());
-    final long size = compressSegment(dataSegmentFile, tmpOutDir);
-
-    final DataSegment dataSegment = createDescriptorFile(
-        segment.withLoadSpec(makeLoadSpec(new File(outDir, "index.zip").toURI()))
-               .withSize(size)
-               .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)),
-        tmpOutDir
-    );
+    FileUtils.forceMkdir(tmpOutDir);

-    // moving the temporary directory to the final destination, once success the potentially concurrent push operations
-    // will be failed and will read the descriptor.json created by current push operation directly
-    FileUtils.forceMkdir(outDir.getParentFile());
     try {
-      Files.move(tmpOutDir.toPath(), outDir.toPath());
+      final File tmpIndexFile = new File(tmpOutDir, INDEX_FILENAME);
+      final long size = compressSegment(dataSegmentFile, tmpIndexFile);
+
+      final File tmpDescriptorFile = new File(tmpOutDir, DESCRIPTOR_FILENAME);
+      DataSegment dataSegment = createDescriptorFile(
+          segment.withLoadSpec(makeLoadSpec(new File(outDir, INDEX_FILENAME).toURI()))
+                 .withSize(size)
+                 .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)),
+          tmpDescriptorFile
+      );
+
+      FileUtils.forceMkdir(outDir);
+      if (replaceExisting) {
+        final File indexFileTarget = new File(outDir, tmpIndexFile.getName());
+        final File descriptorFileTarget = new File(outDir, tmpDescriptorFile.getName());
+
+        if (!tmpIndexFile.renameTo(indexFileTarget)) {
+          throw new IOE("Failed to rename [%s] to [%s]", tmpIndexFile, indexFileTarget);
+        }
+
+        if (!tmpDescriptorFile.renameTo(descriptorFileTarget)) {
+          throw new IOE("Failed to rename [%s] to [%s]", tmpDescriptorFile, descriptorFileTarget);
+        }
+      } else {
+        try {
+          Files.move(tmpIndexFile.toPath(), outDir.toPath().resolve(tmpIndexFile.toPath().getFileName()));
+        }
+        catch (FileAlreadyExistsException e) {
+          log.info("[%s] already exists at [%s], ignore if replication is configured", INDEX_FILENAME, outDir);
+        }
+        try {
+          Files.move(tmpDescriptorFile.toPath(), outDir.toPath().resolve(tmpDescriptorFile.toPath().getFileName()));
+        }
+        catch (FileAlreadyExistsException e) {
+          log.info("[%s] already exists at [%s], ignore if replication is configured", DESCRIPTOR_FILENAME, outDir);
+          dataSegment = jsonMapper.readValue(new File(outDir, DESCRIPTOR_FILENAME), DataSegment.class);
+        }
+      }
+
+      return dataSegment;
     }
-    catch (FileAlreadyExistsException e) {
-      log.warn("Push destination directory[%s] exists, ignore this message if replication is configured.", outDir);
+    finally {
       FileUtils.deleteDirectory(tmpOutDir);
-      return jsonMapper.readValue(new File(outDir, "descriptor.json"), DataSegment.class);
     }
-
-    return dataSegment;
   }

   @Override
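A condensed hedged sketch (not from the diff) of the two conflict policies the push() rework above implements, expressed with java.nio; `tmpFile` and `target` are assumed in scope. The PR itself uses File.renameTo() for the overwrite branch; StandardCopyOption.REPLACE_EXISTING is an equivalent alternative:

    try {
      Files.move(tmpFile.toPath(), target.toPath()); // first writer wins
    }
    catch (FileAlreadyExistsException e) {
      // replaceExisting == false: keep the existing file and its descriptor
    }

    // replaceExisting == true: last writer wins
    Files.move(tmpFile.toPath(), target.toPath(), StandardCopyOption.REPLACE_EXISTING);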
@@ -129,21 +155,21 @@ private String intermediateDirFor(String storageDir)
     return "intermediate_pushes/" + storageDir + "." + UUID.randomUUID().toString();
   }

-  private long compressSegment(File dataSegmentFile, File outDir) throws IOException
+  private long compressSegment(File dataSegmentFile, File dest) throws IOException
   {
-    FileUtils.forceMkdir(outDir);
-    File outFile = new File(outDir, "index.zip");
-    log.info("Compressing files from[%s] to [%s]", dataSegmentFile, outFile);
-    return CompressionUtils.zip(dataSegmentFile, outFile);
+    log.info("Compressing files from[%s] to [%s]", dataSegmentFile, dest);
+    return CompressionUtils.zip(dataSegmentFile, dest, true);
   }

-  private DataSegment createDescriptorFile(DataSegment segment, File outDir) throws IOException
+  private DataSegment createDescriptorFile(DataSegment segment, File dest) throws IOException
   {
-    File descriptorFile = new File(outDir, "descriptor.json");
-    log.info("Creating descriptor file at[%s]", descriptorFile);
+    log.info("Creating descriptor file at[%s]", dest);
     // Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in
     // runtime, and because Guava deletes methods over time, that causes incompatibilities.
-    Files.write(descriptorFile.toPath(), jsonMapper.writeValueAsBytes(segment));
+    Files.write(
+        dest.toPath(), jsonMapper.writeValueAsBytes(segment), StandardOpenOption.CREATE, StandardOpenOption.SYNC
+    );
+
     return segment;
   }
 }
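A hedged variant of the descriptor write above, with the options spelled out; `descriptorPath` and `bytes` are assumed in scope. StandardOpenOption.SYNC makes the write durable; TRUNCATE_EXISTING (added here, not in the diff) guards against stale trailing bytes in case the target file could already exist:

    Files.write(
        descriptorPath,
        bytes,
        StandardOpenOption.CREATE,
        StandardOpenOption.TRUNCATE_EXISTING,
        StandardOpenOption.SYNC
    );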
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index aec0f751a3b6..c8fd9c52d41d 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -556,6 +556,10 @@ private ListenableFuture<?> pushBarrier()
    * Merge segment, push to deep storage. Should only be used on segments that have been fully persisted. Must only
    * be run in the single-threaded pushExecutor.
    *
+   * Note that this calls DataSegmentPusher.push() with replaceExisting == true, which is appropriate for the indexing
+   * tasks it is currently being used for (local indexing and Kafka indexing). If this is going to be used by an
+   * indexing task type that requires replaceExisting == false, this setting will need to be pushed to the caller.
+   *
    * @param identifier sink identifier
    * @param sink       sink to push
    *
@@ -633,9 +637,18 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink

     // Retry pushing segments because uploading to deep storage might fail especially for cloud storage types
     final DataSegment segment = RetryUtils.retry(
+        // The appenderator is currently being used for the local indexing task and the Kafka indexing task. For the
+        // Kafka indexing task, pushers MUST overwrite any existing objects in deep storage with the same identifier
+        // in order to maintain exactly-once semantics. If they do not and instead favor existing objects, in case of
+        // failure during publishing, the indexed data may not represent the checkpointed state and data loss or
+        // duplication may occur. See: https://github.com/druid-io/druid/issues/5161. The local indexing task does not
+        // support replicas where different tasks could generate segments with the same identifier but potentially
+        // different contents, so it is okay if existing objects are overwritten. In both of these cases, we want to
+        // favor the most recently pushed segment, so replaceExisting == true.
         () -> dataSegmentPusher.push(
             mergedFile,
-            sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes))
+            sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)),
+            true
         ),
         exception -> exception instanceof Exception,
         5
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
index 3a19ed632161..345e687cf2e5 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -444,9 +444,17 @@ public void doRun()

               log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier());

+              // The realtime plumber can generate segments with the same identifier (i.e. replica tasks) but does not
+              // have any strict requirement that the contents of these segments be identical. It is possible that two
+              // tasks generate a segment with the same identifier containing different data, and in this situation we
+              // want to favor the data from the task which pushed first. This is because it is possible that one
+              // historical could load the segment after the first task pushed and another historical could load the
+              // same segment after the second task pushed. If the second task's segment overwrote the first one, the
+              // second historical would be serving different data from the first. Hence set replaceExisting == false.
               DataSegment segment = dataSegmentPusher.push(
                   mergedFile,
-                  sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes))
+                  sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)),
+                  false
               );
               log.info("Inserting [%s] to the metadata store", sink.getSegment().getIdentifier());
               segmentPublisher.publishSegment(segment);
diff --git a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java
index 2309735424d5..e8168cb668fe 100644
--- a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java
+++ b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java
@@ -23,11 +23,13 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.Files;
 import com.google.common.primitives.Ints;
+import io.druid.java.util.common.CompressionUtils;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.StringUtils;
 import io.druid.segment.TestHelper;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -54,7 +56,18 @@ public class LocalDataSegmentPusherTest
       Intervals.utc(0, 1),
       "v1",
       null,
+      ImmutableList.of("dim1"),
       null,
+      NoneShardSpec.instance(),
+      null,
+      -1
+  );
+  DataSegment dataSegment2 = new DataSegment(
+      "ds",
+      Intervals.utc(0, 1),
+      "v1",
+      null,
+      ImmutableList.of("dim2"),
       null,
       NoneShardSpec.instance(),
       null,
       -1
   );
@@ -79,8 +92,8 @@ public void testPush() throws IOException
      */
     final DataSegment dataSegment2 = dataSegment.withVersion("v2");

-    DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment);
-    DataSegment returnSegment2 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment2);
+    DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, true);
+    DataSegment returnSegment2 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment2, true);

     Assert.assertNotNull(returnSegment1);
     Assert.assertEquals(dataSegment, returnSegment1);
@@ -106,14 +119,45 @@ public void testPush() throws IOException
   }

   @Test
-  public void testFirstPushWinsForConcurrentPushes() throws IOException
+  public void testFirstPushWinsForConcurrentPushesWhenReplaceExistingFalse() throws IOException
   {
     File replicatedDataSegmentFiles = temporaryFolder.newFolder();
     Files.asByteSink(new File(replicatedDataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x8));
-    DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment);
-    DataSegment returnSegment2 = localDataSegmentPusher.push(replicatedDataSegmentFiles, dataSegment);
+    DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, false);
+    DataSegment returnSegment2 = localDataSegmentPusher.push(replicatedDataSegmentFiles, dataSegment2, false);
+
+    Assert.assertEquals(dataSegment.getDimensions(), returnSegment1.getDimensions());
+    Assert.assertEquals(dataSegment.getDimensions(), returnSegment2.getDimensions());
+
+    File unzipDir = new File(config.storageDirectory, "unzip");
+    FileUtils.forceMkdir(unzipDir);
+    CompressionUtils.unzip(
+        new File(config.storageDirectory, "/ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index.zip"),
+        unzipDir
+    );
+
+    Assert.assertEquals(0x9, Ints.fromByteArray(Files.toByteArray(new File(unzipDir, "version.bin"))));
+  }
+
+  @Test
+  public void testLastPushWinsForConcurrentPushesWhenReplaceExistingTrue() throws IOException
+  {
+    File replicatedDataSegmentFiles = temporaryFolder.newFolder();
+    Files.asByteSink(new File(replicatedDataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x8));
+    DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment, true);
+    DataSegment returnSegment2 = localDataSegmentPusher.push(replicatedDataSegmentFiles, dataSegment2, true);
+
+    Assert.assertEquals(dataSegment.getDimensions(), returnSegment1.getDimensions());
+    Assert.assertEquals(dataSegment2.getDimensions(), returnSegment2.getDimensions());
+
+    File unzipDir = new File(config.storageDirectory, "unzip");
+    FileUtils.forceMkdir(unzipDir);
+    CompressionUtils.unzip(
+        new File(config.storageDirectory, "/ds/1970-01-01T00:00:00.000Z_1970-01-01T00:00:00.001Z/v1/0/index.zip"),
+        unzipDir
+    );

-    Assert.assertEquals(returnSegment1, returnSegment2);
+    Assert.assertEquals(0x8, Ints.fromByteArray(Files.toByteArray(new File(unzipDir, "version.bin"))));
   }

   @Test
@@ -124,7 +168,7 @@ public void testPushCannotCreateDirectory() throws IOException
     config.storageDirectory = new File(config.storageDirectory, "xxx");
     Assert.assertTrue(config.storageDirectory.mkdir());
     config.storageDirectory.setWritable(false);
-    localDataSegmentPusher.push(dataSegmentFiles, dataSegment);
+    localDataSegmentPusher.push(dataSegmentFiles, dataSegment, true);
   }

   @Test
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
index 555e05d20673..186434c01167 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java
@@ -193,7 +193,7 @@ public String getPathForHadoop()
       }

       @Override
-      public DataSegment push(File file, DataSegment segment) throws IOException
+      public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException
       {
         if (enablePushFailure && mustFail) {
           mustFail = false;
diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java
index b4ebfc3eeee7..a50f51bf0204 100644
--- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java
+++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java
@@ -155,7 +155,7 @@ public String getPathForHadoop(String dataSource)
   }

   @Override
-  public DataSegment push(File file, DataSegment segment) throws IOException
+  public DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException
   {
     return segment;
   }