diff --git a/docs/development/extensions-core/s3.md b/docs/development/extensions-core/s3.md index 20bf52d7c3f9..ed33d4337e0a 100644 --- a/docs/development/extensions-core/s3.md +++ b/docs/development/extensions-core/s3.md @@ -57,6 +57,7 @@ To use S3 for Deep Storage, you must supply [connection information](#configurat |`druid.storage.type`|Global deep storage provider. Must be set to `s3` to make use of this extension.|Must be set (likely `s3`).| |`druid.storage.disableAcl`|Boolean flag for how object permissions are handled. To use ACLs, set this property to `false`. To use Object Ownership, set it to `true`. The permission requirements for ACLs and Object Ownership are different. For more information, see [S3 permissions settings](#s3-permissions-settings).|false| |`druid.storage.useS3aSchema`|If true, use the "s3a" filesystem when using Hadoop-based ingestion. If false, the "s3n" filesystem will be used. Only affects Hadoop-based ingestion.|false| +|`druid.storage.zip`|Whether segments are written to `s3` as zip files (`true`) or as uncompressed directories (`false`).|true| |`druid.storage.transfer.useTransferManager`| If true, use AWS S3 Transfer Manager to upload segments to S3.|true| |`druid.storage.transfer.minimumUploadPartSize`| Minimum size (in bytes) of each part in a multipart upload.|20971520 (20 MB)| |`druid.storage.transfer.multipartUploadThreshold`| The file size threshold (in bytes) above which a file upload is converted into a multipart upload instead of a single PUT request.| 20971520 (20 MB)| diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java index 18599cd990c6..1cfcad2896e4 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java +++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java @@ -117,6 +117,7 @@ public EmbeddedDruidCluster createCluster() MSQExternalDataSourceModule.class ) .addResource(storageResource) + .addCommonProperty("druid.storage.zip", "false") .addServer(coordinator) .addServer(overlord) .addServer(indexer) diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java index 79435869e7b1..e108143cee55 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java @@ -21,7 +21,10 @@ import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.MultiObjectDeleteException; +import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.base.Predicates; import com.google.common.base.Supplier; import com.google.common.collect.Lists; @@ -88,6 +91,8 @@ public void kill(List segments) throws SegmentLoadingException return; } + final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get(); + // create a map of bucket to keys to delete Map> bucketToKeysToDelete = new HashMap<>(); for (DataSegment segment : segments) { @@ -97,11 +102,20 @@ public void kill(List segments) throws SegmentLoadingException s3Bucket, k -> new ArrayList<>() ); - keysToDelete.add(new DeleteObjectsRequest.KeyVersion(path)); - keysToDelete.add(new DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path))); + if (path.endsWith("/")) { + // segment is not compressed, list objects and add them all to delete list + final 
ListObjectsV2Result list = s3Client.listObjectsV2( + new ListObjectsV2Request().withBucketName(s3Bucket).withPrefix(path) + ); + for (S3ObjectSummary objectSummary : list.getObjectSummaries()) { + keysToDelete.add(new DeleteObjectsRequest.KeyVersion(objectSummary.getKey())); + } + } else { + keysToDelete.add(new DeleteObjectsRequest.KeyVersion(path)); + keysToDelete.add(new DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path))); + } } - final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get(); boolean shouldThrowException = false; for (Map.Entry> bucketToKeys : bucketToKeysToDelete.entrySet()) { String s3Bucket = bucketToKeys.getKey(); @@ -205,18 +219,29 @@ public void kill(DataSegment segment) throws SegmentLoadingException Map loadSpec = segment.getLoadSpec(); String s3Bucket = MapUtils.getString(loadSpec, S3DataSegmentPuller.BUCKET); String s3Path = MapUtils.getString(loadSpec, S3DataSegmentPuller.KEY); - String s3DescriptorPath = DataSegmentKiller.descriptorPath(s3Path); - final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get(); - if (s3Client.doesObjectExist(s3Bucket, s3Path)) { - log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path); - s3Client.deleteObject(s3Bucket, s3Path); - } - // descriptor.json is a file to store segment metadata in deep storage. This file is deprecated and not stored - // anymore, but we still delete them if exists. 
- if (s3Client.doesObjectExist(s3Bucket, s3DescriptorPath)) { - log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath); - s3Client.deleteObject(s3Bucket, s3DescriptorPath); + + if (s3Path.endsWith("/")) { + // segment is not compressed, list objects and delete them all + final ListObjectsV2Result list = s3Client.listObjectsV2( + new ListObjectsV2Request().withBucketName(s3Bucket).withPrefix(s3Path) + ); + for (S3ObjectSummary objectSummary : list.getObjectSummaries()) { + log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, objectSummary.getKey()); + s3Client.deleteObject(s3Bucket, objectSummary.getKey()); + } + } else { + String s3DescriptorPath = DataSegmentKiller.descriptorPath(s3Path); + if (s3Client.doesObjectExist(s3Bucket, s3Path)) { + log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path); + s3Client.deleteObject(s3Bucket, s3Path); + } + // descriptor.json is a file to store segment metadata in deep storage. This file is deprecated and not stored + // anymore, but we still delete them if exists. 
+ if (s3Client.doesObjectExist(s3Bucket, s3DescriptorPath)) { + log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath); + s3Client.deleteObject(s3Bucket, s3DescriptorPath); + } } } catch (AmazonServiceException e) { diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentMover.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentMover.java index 770fe356e498..ee4de1dbe8b9 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentMover.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentMover.java @@ -43,6 +43,7 @@ import org.apache.druid.timeline.DataSegment; import java.io.IOException; +import java.nio.file.Paths; import java.util.Map; public class S3DataSegmentMover implements DataSegmentMover @@ -80,39 +81,63 @@ public DataSegment move(DataSegment segment, Map targetLoadSpec) final String targetS3Bucket = MapUtils.getString(targetLoadSpec, "bucket"); final String targetS3BaseKey = MapUtils.getString(targetLoadSpec, "baseKey"); + final String targetS3Path; - final String targetS3Path = S3Utils.constructSegmentPath( - targetS3BaseKey, - DataSegmentPusher.getDefaultStorageDir(segment, false) - ); + if (s3Path.endsWith("/")) { + // segment is not compressed, list objects and move them all + final ListObjectsV2Result list = s3ClientSupplier.get().listObjectsV2( + new ListObjectsV2Request().withBucketName(s3Bucket) + .withPrefix(s3Path + "/") + ); + targetS3Path = S3Utils.constructSegmentBasePath( + targetS3BaseKey, + DataSegmentPusher.getDefaultStorageDir(segment, false) + ); + for (S3ObjectSummary objectSummary : list.getObjectSummaries()) { + final String fileName = Paths.get(objectSummary.getKey()).getFileName().toString(); + if (targetS3Bucket.isEmpty()) { + throw new SegmentLoadingException("Target S3 bucket is not specified"); + } + if (targetS3Path.isEmpty()) { + 
throw new SegmentLoadingException("Target S3 baseKey is not specified"); + } - if (targetS3Bucket.isEmpty()) { - throw new SegmentLoadingException("Target S3 bucket is not specified"); - } - if (targetS3Path.isEmpty()) { - throw new SegmentLoadingException("Target S3 baseKey is not specified"); - } + safeMove(s3Bucket, s3Path, targetS3Bucket, targetS3Path + "/" + fileName); + } + } else { + targetS3Path = S3Utils.constructSegmentPath( + targetS3BaseKey, + DataSegmentPusher.getDefaultStorageDir(segment, false) + ); - safeMove(s3Bucket, s3Path, targetS3Bucket, targetS3Path); + if (targetS3Bucket.isEmpty()) { + throw new SegmentLoadingException("Target S3 bucket is not specified"); + } + if (targetS3Path.isEmpty()) { + throw new SegmentLoadingException("Target S3 baseKey is not specified"); + } + + safeMove(s3Bucket, s3Path, targetS3Bucket, targetS3Path); + } return segment.withLoadSpec( ImmutableMap.builder() - .putAll( - Maps.filterKeys( - loadSpec, - new Predicate<>() - { - @Override - public boolean apply(String input) - { - return !("bucket".equals(input) || "key".equals(input)); - } - } - ) - ) - .put("bucket", targetS3Bucket) - .put("key", targetS3Path) - .build() + .putAll( + Maps.filterKeys( + loadSpec, + new Predicate<>() + { + @Override + public boolean apply(String input) + { + return !("bucket".equals(input) || "key".equals(input)); + } + } + ) + ) + .put("bucket", targetS3Bucket) + .put("key", targetS3Path) + .build() ); } catch (AmazonServiceException e) { diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPuller.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPuller.java index 1ad07422a16d..918bd781ec23 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPuller.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPuller.java @@ -22,8 +22,11 @@ import 
com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.io.ByteSource; @@ -43,6 +46,7 @@ import org.apache.druid.segment.loading.URIDataPuller; import org.apache.druid.utils.CompressionUtils; +import javax.annotation.Nonnull; import javax.tools.FileObject; import java.io.File; import java.io.FilterInputStream; @@ -52,6 +56,7 @@ import java.io.Reader; import java.io.Writer; import java.net.URI; +import java.nio.file.Paths; /** * A data segment puller that also hanldes URI data pulls. @@ -77,33 +82,15 @@ FileUtils.FileCopyResult getSegmentFiles(final CloudObjectLocation s3Coords, fin log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir); - if (!isObjectInBucket(s3Coords)) { - throw new SegmentLoadingException("IndexFile[%s] does not exist.", s3Coords); - } - try { FileUtils.mkdirp(outDir); - final URI uri = s3Coords.toUri(S3StorageDruidModule.SCHEME); - final ByteSource byteSource = new ByteSource() - { - @Override - public InputStream openStream() throws IOException - { - try { - return buildFileObject(uri).openInputStream(); - } - catch (AmazonServiceException e) { - if (e.getCause() != null) { - if (S3Utils.S3RETRY.apply(e)) { - throw new IOException("Recoverable exception", e); - } - } - throw new RuntimeException(e); - } - } - }; if (CompressionUtils.isZip(s3Coords.getPath())) { + if (!isObjectInBucket(s3Coords)) { + throw new SegmentLoadingException("IndexFile[%s] does not exist.", s3Coords); + } + final URI uri = s3Coords.toUri(S3StorageDruidModule.SCHEME); + final ByteSource 
byteSource = getByteSource(uri); final FileUtils.FileCopyResult result = CompressionUtils.unzip( byteSource, outDir, @@ -112,16 +99,37 @@ public InputStream openStream() throws IOException ); log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outDir.getAbsolutePath()); return result; - } - if (CompressionUtils.isGz(s3Coords.getPath())) { + } else if (CompressionUtils.isGz(s3Coords.getPath())) { + if (!isObjectInBucket(s3Coords)) { + throw new SegmentLoadingException("IndexFile[%s] does not exist.", s3Coords); + } + final URI uri = s3Coords.toUri(S3StorageDruidModule.SCHEME); + final ByteSource byteSource = getByteSource(uri); final String fname = Files.getNameWithoutExtension(uri.getPath()); final File outFile = new File(outDir, fname); - final FileUtils.FileCopyResult result = CompressionUtils.gunzip(byteSource, outFile, S3Utils.S3RETRY); log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outFile.getAbsolutePath()); return result; + } else if (s3Coords.getPath().endsWith("/")) { + // segment is not compressed, list objects and pull them all + final ListObjectsV2Result list = s3Client.listObjectsV2( + new ListObjectsV2Request().withBucketName(s3Coords.getBucket()) + .withPrefix(s3Coords.getPath()) + ); + FileUtils.FileCopyResult copyResult = new FileUtils.FileCopyResult(); + for (S3ObjectSummary objectSummary : list.getObjectSummaries()) { + final CloudObjectLocation objectLocation = S3Utils.summaryToCloudObjectLocation(objectSummary); + final URI uri = objectLocation.toUri(S3StorageDruidModule.SCHEME); + final ByteSource byteSource = getByteSource(uri); + final File outFile = new File(outDir, Paths.get(objectLocation.getPath()).getFileName().toString()); + outFile.createNewFile(); + final FileUtils.FileCopyResult result = FileUtils.retryCopy(byteSource, outFile, S3Utils.S3RETRY, 3); + copyResult.addFiles(result.getFiles()); + } + log.info("Loaded %d bytes from [%s] to [%s]", copyResult.size(), 
s3Coords.toString(), outDir.getAbsolutePath()); + return copyResult; } - throw new IAE("Do not know how to load file type at [%s]", uri.toString()); + throw new IAE("Do not know how to load file type at [%s]", s3Coords.toUri(S3StorageDruidModule.SCHEME)); } catch (Exception e) { try { @@ -139,6 +147,30 @@ public InputStream openStream() throws IOException } } + @Nonnull + private ByteSource getByteSource(URI uri) + { + final ByteSource byteSource = new ByteSource() + { + @Override + public InputStream openStream() throws IOException + { + try { + return buildFileObject(uri).openInputStream(); + } + catch (AmazonServiceException e) { + if (e.getCause() != null) { + if (S3Utils.S3RETRY.apply(e)) { + throw new IOException("Recoverable exception", e); + } + } + throw new RuntimeException(e); + } + } + }; + return byteSource; + } + @Override public InputStream getInputStream(URI uri) throws IOException { diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java index 7b701ab56552..6f0822ecd117 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.SegmentUtils; @@ -86,15 +87,25 @@ public DataSegment push(final File indexFilesDir, final DataSegment inSegment, f @Override public DataSegment pushToPath(File indexFilesDir, DataSegment inSegment, String storageDirSuffix) throws IOException { - final String s3Path = 
S3Utils.constructSegmentPath(config.getBaseKey(), storageDirSuffix); - log.debug("Copying segment[%s] to S3 at location[%s]", inSegment.getId(), s3Path); + if (config.isZip()) { + final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), storageDirSuffix); + log.debug("Copying segment[%s] to S3 at location[%s]", inSegment.getId(), s3Path); + return pushZip(indexFilesDir, inSegment, s3Path); + } else { + final String s3Path = S3Utils.constructSegmentBasePath(config.getBaseKey(), storageDirSuffix); + log.debug("Copying segment[%s] files to S3 at location[%s]", inSegment.getId(), s3Path); + return pushNoZip(indexFilesDir, inSegment, s3Path); + } + } + private DataSegment pushZip(File indexFilesDir, DataSegment baseSegment, String s3Path) throws IOException + { final File zipOutFile = File.createTempFile("druid", "index.zip"); final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile); - final DataSegment outSegment = inSegment.withSize(indexSize) - .withLoadSpec(makeLoadSpec(config.getBucket(), s3Path)) - .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir)); + final DataSegment outSegment = baseSegment.withSize(indexSize) + .withLoadSpec(makeLoadSpec(config.getBucket(), s3Path)) + .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir)); try { return S3Utils.retryS3Operation( @@ -106,20 +117,7 @@ public DataSegment pushToPath(File indexFilesDir, DataSegment inSegment, String ); } catch (AmazonServiceException e) { - if (S3Utils.ERROR_ENTITY_TOO_LARGE.equals(S3Utils.getS3ErrorCode(e))) { - throw DruidException - .forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build( - e, - "Got error[%s] from S3 when uploading segment of size[%,d] bytes. This typically happens when segment " - + "size is above 5GB. 
Try reducing your segment size by lowering the target number of rows per " - + "segment.", - S3Utils.ERROR_ENTITY_TOO_LARGE, - indexSize - ); - } - throw new IOException(e); + throw handlePushServiceException(e, indexSize); } catch (Exception e) { throw new RuntimeException(e); @@ -130,6 +128,43 @@ public DataSegment pushToPath(File indexFilesDir, DataSegment inSegment, String } } + private DataSegment pushNoZip(File indexFilesDir, DataSegment baseSegment, String s3Path) throws IOException + { + final File[] files = indexFilesDir.listFiles(); + if (files == null) { + throw new IOE("Cannot list directory [%s]", indexFilesDir); + } + + long size = 0; + for (final File file : files) { + if (file.isFile()) { + size += file.length(); + + try { + S3Utils.retryS3Operation( + () -> { + S3Utils.uploadFileIfPossible(s3Client, config.getDisableAcl(), config.getBucket(), s3Path + file.getName(), file); + return null; + } + ); + } + catch (AmazonServiceException e) { + throw handlePushServiceException(e, file.length()); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } else { + // Segment directories are expected to be flat. + throw new IOE("Unexpected subdirectory [%s]", file.getName()); + } + } + + return baseSegment.withSize(size) + .withLoadSpec(makeLoadSpec(config.getBucket(), s3Path)) + .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir)); + } + @Override public Map makeLoadSpec(URI finalIndexZipFilePath) { @@ -155,4 +190,21 @@ private Map makeLoadSpec(String bucket, String key) ); } + private static IOException handlePushServiceException(AmazonServiceException e, long indexSize) + { + if (S3Utils.ERROR_ENTITY_TOO_LARGE.equals(S3Utils.getS3ErrorCode(e))) { + throw DruidException + .forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build( + e, + "Got error[%s] from S3 when uploading segment of size[%,d] bytes. This typically happens when segment " + + "size is above 5GB. 
Try reducing your segment size by lowering the target number of rows per " + + "segment.", + S3Utils.ERROR_ENTITY_TOO_LARGE, + indexSize + ); + } + return new IOException(e); + } } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusherConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusherConfig.java index 7c6deb555058..55e65a043c86 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusherConfig.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusherConfig.java @@ -43,6 +43,9 @@ public class S3DataSegmentPusherConfig @JsonProperty private boolean useS3aSchema = false; + @JsonProperty + private boolean zip = true; + public void setBucket(String bucket) { this.bucket = bucket; @@ -92,4 +95,9 @@ public int getMaxListingLength() { return maxListingLength; } + + public boolean isZip() + { + return zip; + } } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java index 1eba9907ab36..2a7d4c58179e 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java @@ -212,11 +212,16 @@ public static CloudObjectLocation summaryToCloudObjectLocation(S3ObjectSummary o } static String constructSegmentPath(String baseKey, String storageDir) + { + return constructSegmentBasePath(baseKey, storageDir) + "index.zip"; + } + + static String constructSegmentBasePath(String baseKey, String storageDir) { return JOINER.join( baseKey.isEmpty() ? 
null : baseKey, storageDir - ) + "/index.zip"; + ) + "/"; } static AccessControlList grantFullControlToBucketOwner(ServerSideEncryptingAmazonS3 s3Client, String bucket) diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java index 7164c38e3023..1d15ac06a6a2 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java @@ -23,6 +23,8 @@ import com.amazonaws.AmazonServiceException; import com.amazonaws.SdkClientException; import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.MultiObjectDeleteException; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.base.Suppliers; @@ -37,6 +39,7 @@ import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; +import org.easymock.LogicalOperator; import org.easymock.Mock; import org.junit.Assert; import org.junit.Test; @@ -44,15 +47,16 @@ import java.io.IOException; import java.net.URI; +import java.util.List; @RunWith(EasyMockRunner.class) public class S3DataSegmentKillerTest extends EasyMockSupport { private static final String KEY_1 = "key1"; - private static final String KEY_1_PATH = KEY_1 + "/"; - private static final String KEY_1_DESCRIPTOR_PATH = KEY_1_PATH + "descriptor.json"; + private static final String KEY_1_PATH = KEY_1 + "/index.zip"; + private static final String KEY_1_DESCRIPTOR_PATH = KEY_1 + "/descriptor.json"; private static final String KEY_2 = "key2"; - private static final String KEY_2_PATH = KEY_2 + "/"; + private static final String KEY_2_PATH = KEY_2 + 
"/index.zip"; private static final String TEST_BUCKET = "test_bucket"; private static final String TEST_PREFIX = "test_prefix"; private static final URI PREFIX_URI = URI.create(StringUtils.format("s3://%s/%s", TEST_BUCKET, TEST_PREFIX)); @@ -86,6 +90,30 @@ public class S3DataSegmentKillerTest extends EasyMockSupport 1 ); + private static final DataSegment DATA_SEGMENT_1_NO_ZIP = new DataSegment( + "test", + Intervals.of("2015-04-12/2015-04-13"), + "1", + ImmutableMap.of("bucket", TEST_BUCKET, "key", KEY_1 + "/"), + null, + null, + NoneShardSpec.instance(), + 0, + 1 + ); + + private static final DataSegment DATA_SEGMENT_2_NO_ZIP = new DataSegment( + "test", + Intervals.of("2015-04-13/2015-04-14"), + "1", + ImmutableMap.of("bucket", TEST_BUCKET, "key", KEY_2 + "/"), + null, + null, + NoneShardSpec.instance(), + 0, + 1 + ); + @Mock private ServerSideEncryptingAmazonS3 s3Client; @Mock @@ -449,4 +477,136 @@ public void test_kill_listOfSegments_unexpectedExceptionIsThrown() ); Assert.assertEquals("Couldn't delete segments from S3. 
See the task logs for more details.", thrown.getMessage()); } + + @Test + public void test_kill_not_zipped() throws SegmentLoadingException + { + ListObjectsV2Result list = EasyMock.createMock(ListObjectsV2Result.class); + S3ObjectSummary objectSummary = EasyMock.createMock(S3ObjectSummary.class); + EasyMock.expect(objectSummary.getBucketName()).andReturn(TEST_BUCKET).anyTimes(); + EasyMock.expect(objectSummary.getKey()).andReturn(KEY_1 + "/meta.smoosh").anyTimes(); + S3ObjectSummary objectSummary2 = EasyMock.createMock(S3ObjectSummary.class); + EasyMock.expect(objectSummary2.getBucketName()).andReturn(TEST_BUCKET).anyTimes(); + EasyMock.expect(objectSummary2.getKey()).andReturn(KEY_1 + "/00000.smoosh").anyTimes(); + EasyMock.expect(list.getObjectSummaries()).andReturn(List.of(objectSummary, objectSummary2)).once(); + EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject())).andReturn(list).once(); + s3Client.deleteObject(TEST_BUCKET, KEY_1 + "/00000.smoosh"); + EasyMock.expectLastCall().andVoid(); + s3Client.deleteObject(TEST_BUCKET, KEY_1 + "/meta.smoosh"); + EasyMock.expectLastCall().andVoid(); + + EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig, objectSummary, objectSummary2, list); + segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig); + segmentKiller.kill(List.of(DATA_SEGMENT_1_NO_ZIP)); + EasyMock.verify(s3Client, segmentPusherConfig, inputDataConfig, objectSummary, objectSummary2, list); + } + + @Test + public void test_kill_not_zipped_multi() throws SegmentLoadingException + { + ListObjectsV2Result list = EasyMock.createMock(ListObjectsV2Result.class); + S3ObjectSummary objectSummary11 = EasyMock.createMock(S3ObjectSummary.class); + EasyMock.expect(objectSummary11.getBucketName()).andReturn(TEST_BUCKET).anyTimes(); + EasyMock.expect(objectSummary11.getKey()).andReturn(KEY_1 + "/meta.smoosh").anyTimes(); + S3ObjectSummary objectSummary12 = 
EasyMock.createMock(S3ObjectSummary.class); + EasyMock.expect(objectSummary12.getBucketName()).andReturn(TEST_BUCKET).anyTimes(); + EasyMock.expect(objectSummary12.getKey()).andReturn(KEY_1 + "/00000.smoosh").anyTimes(); + EasyMock.expect(list.getObjectSummaries()).andReturn(List.of(objectSummary11, objectSummary12)).once(); + + ListObjectsV2Result list2 = EasyMock.createMock(ListObjectsV2Result.class); + S3ObjectSummary objectSummary21 = EasyMock.createMock(S3ObjectSummary.class); + EasyMock.expect(objectSummary21.getBucketName()).andReturn(TEST_BUCKET).anyTimes(); + EasyMock.expect(objectSummary21.getKey()).andReturn(KEY_2 + "/meta.smoosh").anyTimes(); + S3ObjectSummary objectSummary22 = EasyMock.createMock(S3ObjectSummary.class); + EasyMock.expect(objectSummary22.getBucketName()).andReturn(TEST_BUCKET).anyTimes(); + EasyMock.expect(objectSummary22.getKey()).andReturn(KEY_2 + "/00000.smoosh").anyTimes(); + EasyMock.expect(list2.getObjectSummaries()).andReturn(List.of(objectSummary21, objectSummary22)).once(); + EasyMock.expect( + s3Client.listObjectsV2( + EasyMock.cmp( + new ListObjectsV2Request().withBucketName(TEST_BUCKET) + .withPrefix(KEY_1 + "/"), + (o1, o2) -> { + if (!o1.getBucketName().equals(o2.getBucketName())) { + return o1.getBucketName().compareTo(o2.getBucketName()); + } + return o1.getPrefix().compareTo(o2.getPrefix()); + }, + LogicalOperator.EQUAL + ) + ) + ).andReturn(list).once(); + EasyMock.expect( + s3Client.listObjectsV2( + EasyMock.cmp( + new ListObjectsV2Request().withBucketName(TEST_BUCKET) + .withPrefix(KEY_2 + "/"), + (o1, o2) -> { + if (!o1.getBucketName().equals(o2.getBucketName())) { + return o1.getBucketName().compareTo(o2.getBucketName()); + } + return o1.getPrefix().compareTo(o2.getPrefix()); + }, + LogicalOperator.EQUAL + ) + ) + ).andReturn(list2).once(); + + DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(TEST_BUCKET); + deleteObjectsRequest.withKeys(KEY_1_PATH, KEY_1_PATH); + + 
s3Client.deleteObjects(EasyMock.cmp( + new DeleteObjectsRequest(TEST_BUCKET).withKeys( + KEY_1 + "/00000.smoosh", + KEY_1 + "/meta.smoosh", + KEY_2 + "/00000.smoosh", + KEY_2 + "/meta.smoosh" + ), + (o1, o2) -> { + if (!o1.getBucketName().equals(o2.getBucketName())) { + return o1.getBucketName().compareTo(o2.getBucketName()); + } + + for (DeleteObjectsRequest.KeyVersion key : o1.getKeys()) { + boolean found = false; + for (DeleteObjectsRequest.KeyVersion key2 : o2.getKeys()) { + if (key.getKey().equals(key2.getKey())) { + found = true; + } + } + if (!found) { + return -1; + } + } + return 0; + }, + LogicalOperator.EQUAL + )); + EasyMock.expectLastCall().andVoid().once(); + + EasyMock.replay( + s3Client, + segmentPusherConfig, + inputDataConfig, + objectSummary11, + objectSummary12, + list, + objectSummary21, + objectSummary22, + list2 + ); + segmentKiller = new S3DataSegmentKiller(Suppliers.ofInstance(s3Client), segmentPusherConfig, inputDataConfig); + segmentKiller.kill(List.of(DATA_SEGMENT_1_NO_ZIP, DATA_SEGMENT_2_NO_ZIP)); + EasyMock.verify( + s3Client, + segmentPusherConfig, + inputDataConfig, + objectSummary11, + objectSummary12, + list, + objectSummary21, + objectSummary22, + list2 + ); + } } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java index 55773f0f2a1a..01b307deaba0 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java @@ -45,6 +45,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Date; +import java.util.List; import java.util.zip.GZIPOutputStream; /** @@ -315,4 +316,74 @@ public void testS3ObjectModifiedDate() throws IOException EasyMock.verify(s3Client); Assert.assertEquals(0, 
modifiedDate); } + + @Test + public void testGetNozip() throws IOException, SegmentLoadingException + { + final String bucket = "bucket"; + final String keyPrefix = "prefix/dir/0/"; + final ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class); + final byte[] value = bucket.getBytes(StandardCharsets.UTF_8); + + final File tmpFile = temporaryFolder.newFile("meta.smoosh"); + final File tmpFile2 = temporaryFolder.newFile("00000.smoosh"); + + try (OutputStream outputStream = new FileOutputStream(tmpFile)) { + outputStream.write(value); + } + try (OutputStream outputStream = new FileOutputStream(tmpFile2)) { + outputStream.write(value); + } + + ListObjectsV2Result list = EasyMock.createMock(ListObjectsV2Result.class); + S3ObjectSummary objectSummary1 = EasyMock.createMock(S3ObjectSummary.class); + EasyMock.expect(objectSummary1.getBucketName()).andReturn(bucket).anyTimes(); + EasyMock.expect(objectSummary1.getKey()).andReturn(keyPrefix + "meta.smoosh").anyTimes(); + S3ObjectSummary objectSummary2 = EasyMock.createMock(S3ObjectSummary.class); + EasyMock.expect(objectSummary2.getBucketName()).andReturn(bucket).anyTimes(); + EasyMock.expect(objectSummary2.getKey()).andReturn(keyPrefix + "00000.smoosh").anyTimes(); + EasyMock.expect(list.getObjectSummaries()).andReturn(List.of(objectSummary1, objectSummary2)).once(); + EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject())).andReturn(list).once(); + + final S3Object object1 = new S3Object(); + object1.setBucketName(bucket); + object1.setKey(keyPrefix + "meta.smoosh"); + object1.getObjectMetadata().setLastModified(new Date(0)); + object1.setObjectContent(new FileInputStream(tmpFile)); + + final S3Object object2 = new S3Object(); + object2.setBucketName(bucket); + object2.setKey(keyPrefix + "00000.smoosh"); + object2.getObjectMetadata().setLastModified(new Date(0)); + object2.setObjectContent(new FileInputStream(tmpFile)); + + + final File tmpDir = 
temporaryFolder.newFolder("noZipTestDir"); + + EasyMock.expect(s3Client.getObject(EasyMock.eq(object1.getBucketName()), EasyMock.eq(object1.getKey()))) + .andReturn(object1) + .once(); + EasyMock.expect(s3Client.getObject(EasyMock.eq(object2.getBucketName()), EasyMock.eq(object2.getKey()))) + .andReturn(object2) + .once(); + S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client); + + EasyMock.replay(s3Client, list, objectSummary1, objectSummary2); + FileUtils.FileCopyResult result = puller.getSegmentFiles( + new CloudObjectLocation( + bucket, + keyPrefix + ), + tmpDir + ); + EasyMock.verify(s3Client, list, objectSummary1, objectSummary2); + + Assert.assertEquals(value.length + value.length, result.size()); + File expected = new File(tmpDir, "meta.smoosh"); + Assert.assertTrue(expected.exists()); + Assert.assertEquals(value.length, expected.length()); + expected = new File(tmpDir, "00000.smoosh"); + Assert.assertTrue(expected.exists()); + Assert.assertEquals(value.length, expected.length()); + } } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherConfigTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherConfigTest.java index baa71fac068e..0a4389ed7d84 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherConfigTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherConfigTest.java @@ -40,7 +40,7 @@ public class S3DataSegmentPusherConfigTest public void testSerialization() throws IOException { String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\"," - + "\"disableAcl\":false,\"maxListingLength\":2000,\"useS3aSchema\":false}"; + + "\"disableAcl\":false,\"maxListingLength\":2000,\"useS3aSchema\":false,\"zip\":true}"; S3DataSegmentPusherConfig config = JSON_MAPPER.readValue(jsonConfig, S3DataSegmentPusherConfig.class); Map expected = 
JSON_MAPPER.readValue(jsonConfig, Map.class); @@ -53,7 +53,7 @@ public void testSerializationWithDefaults() throws IOException { String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\"}"; String expectedJsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\"," - + "\"disableAcl\":false,\"maxListingLength\":1024,\"useS3aSchema\":false}"; + + "\"disableAcl\":false,\"maxListingLength\":1024,\"useS3aSchema\":false,\"zip\":true}"; S3DataSegmentPusherConfig config = JSON_MAPPER.readValue(jsonConfig, S3DataSegmentPusherConfig.class); Map expected = JSON_MAPPER.readValue(expectedJsonConfig, Map.class); Map actual = JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(config), Map.class); @@ -64,7 +64,7 @@ public void testSerializationWithDefaults() throws IOException public void testSerializationValidatingMaxListingLength() throws IOException { String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\"," - + "\"disableAcl\":false,\"maxListingLength\":-1}"; + + "\"disableAcl\":false,\"maxListingLength\":-1,\"zip\":true}"; Validator validator = Validation.buildDefaultValidatorFactory().getValidator(); S3DataSegmentPusherConfig config = JSON_MAPPER.readValue(jsonConfig, S3DataSegmentPusherConfig.class); diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherTest.java index 698f9d6e63f4..2a271caddb1c 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherTest.java @@ -90,6 +90,34 @@ public void testEntityTooLarge() ); } + @Test + public void testPushNoZip() throws Exception + { + ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class); + + final AccessControlList acl = new 
AccessControlList(); + acl.setOwner(new Owner("ownerId", "owner")); + acl.grantAllPermissions(new Grant(new CanonicalGrantee(acl.getOwner().getId()), Permission.FullControl)); + EasyMock.expect(s3Client.getBucketAcl(EasyMock.eq("bucket"))).andReturn(acl).once(); + + s3Client.upload(EasyMock.anyObject(PutObjectRequest.class)); + EasyMock.expectLastCall().once(); + + EasyMock.replay(s3Client); + + S3DataSegmentPusherConfig config = new S3DataSegmentPusherConfig() + { + @Override + public boolean isZip() + { + return false; + } + }; + config.setBucket("bucket"); + config.setBaseKey("key"); + validate(false, "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/", s3Client, config); + } + private void testPushInternal(boolean useUniquePath, String matcher) throws Exception { ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class); @@ -132,7 +160,11 @@ private void validate(boolean useUniquePath, String matcher, ServerSideEncryptin S3DataSegmentPusherConfig config = new S3DataSegmentPusherConfig(); config.setBucket("bucket"); config.setBaseKey("key"); + validate(useUniquePath, matcher, s3Client, config); + } + private void validate(boolean useUniquePath, String matcher, ServerSideEncryptingAmazonS3 s3Client, S3DataSegmentPusherConfig config) throws IOException + { S3DataSegmentPusher pusher = new S3DataSegmentPusher(s3Client, config); // Create a mock segment on disk