Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/development/extensions-core/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ To use S3 for Deep Storage, you must supply [connection information](#configurat
|`druid.storage.type`|Global deep storage provider. Must be set to `s3` to make use of this extension.|Must be set (likely `s3`).|
|`druid.storage.disableAcl`|Boolean flag for how object permissions are handled. To use ACLs, set this property to `false`. To use Object Ownership, set it to `true`. The permission requirements for ACLs and Object Ownership are different. For more information, see [S3 permissions settings](#s3-permissions-settings).|false|
|`druid.storage.useS3aSchema`|If true, use the "s3a" filesystem when using Hadoop-based ingestion. If false, the "s3n" filesystem will be used. Only affects Hadoop-based ingestion.|false|
|`druid.storage.zip`|Whether segments in `s3` are written as directories (`false`) or zip files (`true`).|`false`|
|`druid.storage.transfer.useTransferManager`| If true, use AWS S3 Transfer Manager to upload segments to S3.|true|
|`druid.storage.transfer.minimumUploadPartSize`| Minimum size (in bytes) of each part in a multipart upload.|20971520 (20 MB)|
|`druid.storage.transfer.multipartUploadThreshold`| The file size threshold (in bytes) above which a file upload is converted into a multipart upload instead of a single PUT request.| 20971520 (20 MB)|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public EmbeddedDruidCluster createCluster()
MSQExternalDataSourceModule.class
)
.addResource(storageResource)
.addCommonProperty("druid.storage.zip", "false")
.addServer(coordinator)
.addServer(overlord)
.addServer(indexer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -88,6 +91,8 @@ public void kill(List<DataSegment> segments) throws SegmentLoadingException
return;
}

final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();

// create a map of bucket to keys to delete
Map<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeysToDelete = new HashMap<>();
for (DataSegment segment : segments) {
Expand All @@ -97,11 +102,20 @@ public void kill(List<DataSegment> segments) throws SegmentLoadingException
s3Bucket,
k -> new ArrayList<>()
);
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(path));
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path)));
if (path.endsWith("/")) {
// segment is not compressed, list objects and add them all to delete list
final ListObjectsV2Result list = s3Client.listObjectsV2(
new ListObjectsV2Request().withBucketName(s3Bucket).withPrefix(path)
);
for (S3ObjectSummary objectSummary : list.getObjectSummaries()) {
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(objectSummary.getKey()));
}
} else {
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(path));
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path)));
}
}

final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
boolean shouldThrowException = false;
for (Map.Entry<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeys : bucketToKeysToDelete.entrySet()) {
String s3Bucket = bucketToKeys.getKey();
Expand Down Expand Up @@ -205,18 +219,29 @@ public void kill(DataSegment segment) throws SegmentLoadingException
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, S3DataSegmentPuller.BUCKET);
String s3Path = MapUtils.getString(loadSpec, S3DataSegmentPuller.KEY);
String s3DescriptorPath = DataSegmentKiller.descriptorPath(s3Path);

final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
if (s3Client.doesObjectExist(s3Bucket, s3Path)) {
log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path);
s3Client.deleteObject(s3Bucket, s3Path);
}
// descriptor.json is a file to store segment metadata in deep storage. This file is deprecated and not stored
// anymore, but we still delete it if it exists.
if (s3Client.doesObjectExist(s3Bucket, s3DescriptorPath)) {
log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath);
s3Client.deleteObject(s3Bucket, s3DescriptorPath);

if (s3Path.endsWith("/")) {
// segment is not compressed, list objects and delete them all
final ListObjectsV2Result list = s3Client.listObjectsV2(
new ListObjectsV2Request().withBucketName(s3Bucket).withPrefix(s3Path)
);
for (S3ObjectSummary objectSummary : list.getObjectSummaries()) {
log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, objectSummary.getKey());
s3Client.deleteObject(s3Bucket, objectSummary.getKey());
}
} else {
String s3DescriptorPath = DataSegmentKiller.descriptorPath(s3Path);
if (s3Client.doesObjectExist(s3Bucket, s3Path)) {
log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path);
s3Client.deleteObject(s3Bucket, s3Path);
}
// descriptor.json is a file to store segment metadata in deep storage. This file is deprecated and not stored
// anymore, but we still delete it if it exists.
if (s3Client.doesObjectExist(s3Bucket, s3DescriptorPath)) {
log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath);
s3Client.deleteObject(s3Bucket, s3DescriptorPath);
}
}
}
catch (AmazonServiceException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.druid.timeline.DataSegment;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Map;

public class S3DataSegmentMover implements DataSegmentMover
Expand Down Expand Up @@ -80,39 +81,63 @@ public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec)

final String targetS3Bucket = MapUtils.getString(targetLoadSpec, "bucket");
final String targetS3BaseKey = MapUtils.getString(targetLoadSpec, "baseKey");
final String targetS3Path;

final String targetS3Path = S3Utils.constructSegmentPath(
targetS3BaseKey,
DataSegmentPusher.getDefaultStorageDir(segment, false)
);
if (s3Path.endsWith("/")) {
// segment is not compressed, list objects and move them all
final ListObjectsV2Result list = s3ClientSupplier.get().listObjectsV2(
new ListObjectsV2Request().withBucketName(s3Bucket)
.withPrefix(s3Path + "/")
);
targetS3Path = S3Utils.constructSegmentBasePath(
targetS3BaseKey,
DataSegmentPusher.getDefaultStorageDir(segment, false)
);
for (S3ObjectSummary objectSummary : list.getObjectSummaries()) {
final String fileName = Paths.get(objectSummary.getKey()).getFileName().toString();
if (targetS3Bucket.isEmpty()) {
throw new SegmentLoadingException("Target S3 bucket is not specified");
}
if (targetS3Path.isEmpty()) {
throw new SegmentLoadingException("Target S3 baseKey is not specified");
}

if (targetS3Bucket.isEmpty()) {
throw new SegmentLoadingException("Target S3 bucket is not specified");
}
if (targetS3Path.isEmpty()) {
throw new SegmentLoadingException("Target S3 baseKey is not specified");
}
safeMove(s3Bucket, s3Path, targetS3Bucket, targetS3Path + "/" + fileName);
}
} else {
targetS3Path = S3Utils.constructSegmentPath(
targetS3BaseKey,
DataSegmentPusher.getDefaultStorageDir(segment, false)
);

safeMove(s3Bucket, s3Path, targetS3Bucket, targetS3Path);
if (targetS3Bucket.isEmpty()) {
throw new SegmentLoadingException("Target S3 bucket is not specified");
}
if (targetS3Path.isEmpty()) {
throw new SegmentLoadingException("Target S3 baseKey is not specified");
}

safeMove(s3Bucket, s3Path, targetS3Bucket, targetS3Path);
}

return segment.withLoadSpec(
ImmutableMap.<String, Object>builder()
.putAll(
Maps.filterKeys(
loadSpec,
new Predicate<>()
{
@Override
public boolean apply(String input)
{
return !("bucket".equals(input) || "key".equals(input));
}
}
)
)
.put("bucket", targetS3Bucket)
.put("key", targetS3Path)
.build()
.putAll(
Maps.filterKeys(
loadSpec,
new Predicate<>()
{
@Override
public boolean apply(String input)
{
return !("bucket".equals(input) || "key".equals(input));
}
}
)
)
.put("bucket", targetS3Bucket)
.put("key", targetS3Path)
.build()
);
}
catch (AmazonServiceException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.io.ByteSource;
Expand All @@ -43,6 +46,7 @@
import org.apache.druid.segment.loading.URIDataPuller;
import org.apache.druid.utils.CompressionUtils;

import javax.annotation.Nonnull;
import javax.tools.FileObject;
import java.io.File;
import java.io.FilterInputStream;
Expand All @@ -52,6 +56,7 @@
import java.io.Reader;
import java.io.Writer;
import java.net.URI;
import java.nio.file.Paths;

/**
* A data segment puller that also handles URI data pulls.
Expand All @@ -77,33 +82,15 @@

log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir);

if (!isObjectInBucket(s3Coords)) {
throw new SegmentLoadingException("IndexFile[%s] does not exist.", s3Coords);
}

try {
FileUtils.mkdirp(outDir);

final URI uri = s3Coords.toUri(S3StorageDruidModule.SCHEME);
final ByteSource byteSource = new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
try {
return buildFileObject(uri).openInputStream();
}
catch (AmazonServiceException e) {
if (e.getCause() != null) {
if (S3Utils.S3RETRY.apply(e)) {
throw new IOException("Recoverable exception", e);
}
}
throw new RuntimeException(e);
}
}
};
if (CompressionUtils.isZip(s3Coords.getPath())) {
if (!isObjectInBucket(s3Coords)) {
throw new SegmentLoadingException("IndexFile[%s] does not exist.", s3Coords);
}
final URI uri = s3Coords.toUri(S3StorageDruidModule.SCHEME);
final ByteSource byteSource = getByteSource(uri);
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
byteSource,
outDir,
Expand All @@ -112,16 +99,37 @@
);
log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outDir.getAbsolutePath());
return result;
}
if (CompressionUtils.isGz(s3Coords.getPath())) {
} else if (CompressionUtils.isGz(s3Coords.getPath())) {
if (!isObjectInBucket(s3Coords)) {
throw new SegmentLoadingException("IndexFile[%s] does not exist.", s3Coords);
}
final URI uri = s3Coords.toUri(S3StorageDruidModule.SCHEME);
final ByteSource byteSource = getByteSource(uri);
final String fname = Files.getNameWithoutExtension(uri.getPath());
final File outFile = new File(outDir, fname);

final FileUtils.FileCopyResult result = CompressionUtils.gunzip(byteSource, outFile, S3Utils.S3RETRY);
log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outFile.getAbsolutePath());
return result;
} else if (s3Coords.getPath().endsWith("/")) {
// segment is not compressed, list objects and pull them all
final ListObjectsV2Result list = s3Client.listObjectsV2(
new ListObjectsV2Request().withBucketName(s3Coords.getBucket())
.withPrefix(s3Coords.getPath())
);
FileUtils.FileCopyResult copyResult = new FileUtils.FileCopyResult();
for (S3ObjectSummary objectSummary : list.getObjectSummaries()) {
final CloudObjectLocation objectLocation = S3Utils.summaryToCloudObjectLocation(objectSummary);
final URI uri = objectLocation.toUri(S3StorageDruidModule.SCHEME);
final ByteSource byteSource = getByteSource(uri);
final File outFile = new File(outDir, Paths.get(objectLocation.getPath()).getFileName().toString());
outFile.createNewFile();
Comment thread Dismissed

Check notice

Code scanning / CodeQL

Ignored error status of call Note

Method getSegmentFiles ignores exceptional return value of File.createNewFile.
final FileUtils.FileCopyResult result = FileUtils.retryCopy(byteSource, outFile, S3Utils.S3RETRY, 3);
copyResult.addFiles(result.getFiles());
}
log.info("Loaded %d bytes from [%s] to [%s]", copyResult.size(), s3Coords.toString(), outDir.getAbsolutePath());
return copyResult;
}
throw new IAE("Do not know how to load file type at [%s]", uri.toString());
throw new IAE("Do not know how to load file type at [%s]", s3Coords.toUri(S3StorageDruidModule.SCHEME));
}
catch (Exception e) {
try {
Expand All @@ -139,6 +147,30 @@
}
}

@Nonnull
private ByteSource getByteSource(URI uri)
{
  // Wraps an S3 object read as a lazily-opened ByteSource, so callers (the
  // zip/gzip/plain-copy paths above) only open the stream when they consume it.
  return new ByteSource()
  {
    @Override
    public InputStream openStream() throws IOException
    {
      try {
        return buildFileObject(uri).openInputStream();
      }
      catch (AmazonServiceException e) {
        // Re-throw retryable S3 failures as IOException so retry wrappers
        // (e.g. S3Utils.S3RETRY-driven copy loops) can re-open the stream;
        // everything else is propagated as an unchecked failure.
        if (e.getCause() != null && S3Utils.S3RETRY.apply(e)) {
          throw new IOException("Recoverable exception", e);
        }
        throw new RuntimeException(e);
      }
    }
  };
}

@Override
public InputStream getInputStream(URI uri) throws IOException
{
Expand Down
Loading