From 914510f1cf7bc3f1de0a08a2e2127ffb807a04f2 Mon Sep 17 00:00:00 2001 From: zachary sherman Date: Wed, 4 Mar 2020 23:50:18 -0800 Subject: [PATCH 1/5] Ability to Delete task logs and segments from S3 * implement ability to delete all tasks logs or all task logs written before a particular date when written to S3 * implement ability to delete all segments from S3 deep storage * upgrade version of aws SDK in use --- .../druid/common/utils/TimeSupplier.java | 32 ++ .../druid/storage/s3/S3DataSegmentKiller.java | 62 +++- .../storage/s3/S3StorageDruidModule.java | 1 + .../apache/druid/storage/s3/S3TaskLogs.java | 76 ++++- .../druid/storage/s3/S3TaskLogsConfig.java | 6 + .../s3/ServerSideEncryptingAmazonS3.java | 6 + .../storage/s3/S3DataSegmentKillerTest.java | 183 +++++++++++ .../druid/storage/s3/S3TaskLogsTest.java | 298 +++++++++++++++++- .../apache/druid/storage/s3/S3TestUtils.java | 212 +++++++++++++ pom.xml | 2 +- 10 files changed, 853 insertions(+), 25 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/common/utils/TimeSupplier.java create mode 100644 extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java create mode 100644 extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java diff --git a/core/src/main/java/org/apache/druid/common/utils/TimeSupplier.java b/core/src/main/java/org/apache/druid/common/utils/TimeSupplier.java new file mode 100644 index 000000000000..08fa6b14ef60 --- /dev/null +++ b/core/src/main/java/org/apache/druid/common/utils/TimeSupplier.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.druid.common.utils; + +import java.util.function.Supplier; + +public class TimeSupplier implements Supplier +{ + @Override + public Long get() + { + return System.currentTimeMillis(); + } +} diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java index 6df4161d846b..edecbc9b35b3 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java @@ -20,6 +20,10 @@ package org.apache.druid.storage.s3; import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.inject.Inject; import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.java.util.common.logger.Logger; @@ -27,20 +31,32 @@ import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.timeline.DataSegment; +import java.io.IOException; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** + * */ public class S3DataSegmentKiller implements DataSegmentKiller { private static final Logger log = new Logger(S3DataSegmentKiller.class); private final ServerSideEncryptingAmazonS3 s3Client; + private final S3DataSegmentPusherConfig segmentPusherConfig; + private final S3InputDataConfig inputDataConfig; @Inject - public S3DataSegmentKiller(ServerSideEncryptingAmazonS3 s3Client) + public S3DataSegmentKiller( + ServerSideEncryptingAmazonS3 s3Client, + S3DataSegmentPusherConfig segmentPusherConfig, + S3InputDataConfig inputDataConfig + ) { this.s3Client = s3Client; + this.segmentPusherConfig = segmentPusherConfig; + this.inputDataConfig = inputDataConfig; } @Override @@ -69,8 +85,48 @@ public void kill(DataSegment segment) throws SegmentLoadingException } @Override - public void killAll() + public void killAll() throws IOException { - throw new UnsupportedOperationException("not implemented"); + try { + S3Utils.retryS3Operation( + () -> { + String bucketName = segmentPusherConfig.getBucket(); + String prefix = segmentPusherConfig.getBaseKey(); + int maxListingLength = inputDataConfig.getMaxListingLength(); + ListObjectsV2Result result; + String continuationToken = null; + do { + log.info("Deleting batch of %d segment files from s3 location [bucket: %s prefix: %s].", + maxListingLength, bucketName, prefix + ); + ListObjectsV2Request request = new ListObjectsV2Request() + .withBucketName(bucketName) + .withPrefix(prefix) + .withContinuationToken(continuationToken) + .withMaxKeys(maxListingLength); + + result = s3Client.listObjectsV2(request); + List objectSummaries = result.getObjectSummaries(); + + List keyVersionsToDelete = + objectSummaries.stream() + .map(x -> new DeleteObjectsRequest.KeyVersion(x.getKey())) + .collect(Collectors.toList()); + + DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucketName) + .withBucketName(bucketName) + .withKeys(keyVersionsToDelete); + s3Client.deleteObjects(deleteRequest); + + continuationToken = result.getContinuationToken(); + } while (result.isTruncated()); + return null; + } + ); + } + catch (Exception e) { + log.error("Error occurred while deleting segment files from s3. Error: %s", e.getMessage()); + throw new IOException(e); + } } } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java index 2676b6d4aa9a..723548b1fcbc 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java @@ -155,6 +155,7 @@ public void configure(Binder binder) .to(S3DataSegmentArchiver.class) .in(LazySingleton.class); Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(S3DataSegmentPusher.class).in(LazySingleton.class); + JsonConfigProvider.bind(binder, "druid.storage", S3InputDataConfig.class); JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class); JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class); JsonConfigProvider.bind(binder, "druid.storage", S3StorageConfig.class); diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java index 9cee08b177e9..19fcd5e72d12 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java @@ -21,12 +21,17 @@ import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.io.ByteSource; import com.google.inject.Inject; +import org.apache.druid.common.utils.TimeSupplier; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; @@ -35,6 +40,8 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.util.List; +import java.util.stream.Collectors; /** * Provides task logs archived on S3. @@ -45,12 +52,21 @@ public class S3TaskLogs implements TaskLogs private final ServerSideEncryptingAmazonS3 service; private final S3TaskLogsConfig config; + private final S3InputDataConfig inputDataConfig; + private final TimeSupplier timeSupplier; @Inject - public S3TaskLogs(ServerSideEncryptingAmazonS3 service, S3TaskLogsConfig config) + public S3TaskLogs( + ServerSideEncryptingAmazonS3 service, + S3TaskLogsConfig config, + S3InputDataConfig inputDataConfig, + TimeSupplier timeSupplier + ) { this.service = service; this.config = config; + this.inputDataConfig = inputDataConfig; + this.timeSupplier = timeSupplier; } @Override @@ -152,14 +168,64 @@ String getTaskLogKey(String taskid, String filename) } @Override - public void killAll() + public void killAll() throws IOException { - throw new UnsupportedOperationException("not implemented"); + log.info("Deleting all task logs from s3 location [bucket: %s prefix: %s].", + config.getS3Bucket(), config.getS3Prefix() + ); + + long now = timeSupplier.get(); + killOlderThan(now); } @Override - public void killOlderThan(long timestamp) + public void killOlderThan(long timestamp) throws IOException { - throw new UnsupportedOperationException("not implemented"); + try { + S3Utils.retryS3Operation( + () -> { + String bucketName = config.getS3Bucket(); + String prefix = config.getS3Prefix(); + int maxListingLength = inputDataConfig.getMaxListingLength(); + ListObjectsV2Result result; + String continuationToken = null; + do { + log.info("Deleting batch of %d task logs from s3 location [bucket: %s prefix: %s].", + maxListingLength, bucketName, prefix + ); + ListObjectsV2Request request = new ListObjectsV2Request() + .withBucketName(bucketName) + .withPrefix(prefix) + .withContinuationToken(continuationToken) + .withMaxKeys(maxListingLength); + + result = service.listObjectsV2(request); + List objectSummaries = result.getObjectSummaries(); + + List keyVersionsToDelete = + objectSummaries.stream() + .filter(x -> x.getLastModified().getTime() < timestamp) + .map(x -> new DeleteObjectsRequest.KeyVersion( + x.getKey())) + .collect( + Collectors.toList()); + + DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucketName) + .withBucketName(bucketName) + .withKeys(keyVersionsToDelete); + if (!deleteRequest.getKeys().isEmpty()) { + service.deleteObjects(deleteRequest); + } + + continuationToken = result.getContinuationToken(); + } while (result.isTruncated()); + return null; + } + ); + } + catch (Exception e) { + log.error("Error occurred while deleting task log files from s3. Error: %s", e.getMessage()); + throw new IOException(e); + } } } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogsConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogsConfig.java index e61dbf795925..6fc04113f5ac 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogsConfig.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogsConfig.java @@ -61,6 +61,12 @@ public String getS3Prefix() return s3Prefix; } + @VisibleForTesting + void setS3Prefix(String s3Prefix) + { + this.s3Prefix = s3Prefix; + } + public boolean getDisableAcl() { return disableAcl; diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java index 236cb2c8748e..6c353959dcd9 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java @@ -25,6 +25,7 @@ import com.amazonaws.services.s3.model.AccessControlList; import com.amazonaws.services.s3.model.CopyObjectRequest; import com.amazonaws.services.s3.model.CopyObjectResult; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.GetObjectMetadataRequest; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.ListObjectsV2Request; @@ -128,6 +129,11 @@ public void deleteObject(String bucket, String key) amazonS3.deleteObject(bucket, key); } + public void deleteObjects(DeleteObjectsRequest request) + { + amazonS3.deleteObjects(request); + } + public static class Builder { private AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3Client.builder(); diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java new file mode 100644 index 000000000000..453811da191b --- /dev/null +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.s3; + +import com.amazonaws.SdkClientException; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; +import org.easymock.Mock; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.io.IOException; + +@RunWith(EasyMockRunner.class) +public class S3DataSegmentKillerTest extends EasyMockSupport +{ + private static final String KEY_1 = "key1"; + private static final String KEY_2 = "key2"; + private static final String TEST_BUCKET = "test_bucket"; + private static final String TEST_PREFIX = "test_prefix"; + private static final String CONTINUATION_STRING = "continuationToken"; + private static final long TIME_0 = 0L; + private static final long TIME_1 = 1L; + private static final int MAX_KEYS = 1; + private static final Exception RECOVERABLE_EXCEPTION = new SdkClientException(new IOException()); + private static final Exception NON_RECOVERABLE_EXCEPTION = new SdkClientException(new NullPointerException()); + + @Mock private ServerSideEncryptingAmazonS3 s3Client; + @Mock private S3DataSegmentPusherConfig segmentPusherConfig; + @Mock private S3InputDataConfig inputDataConfig; + + private S3DataSegmentKiller segmentKiller; + + @Test + public void test_killAll_noException_deletesAllSegments() throws IOException + { + S3ObjectSummary objectSummary1 = S3TestUtils.mockS3ObjectSummary(TIME_0, KEY_1); + S3ObjectSummary objectSummary2 = S3TestUtils.mockS3ObjectSummary(TIME_1, KEY_2); + + + ListObjectsV2Request request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); + ListObjectsV2Result result1 = S3TestUtils.mockResult(CONTINUATION_STRING, true, ImmutableList.of(objectSummary1)); + + ListObjectsV2Request request2 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, CONTINUATION_STRING); + ListObjectsV2Result result2 = S3TestUtils.mockResult(null, false, ImmutableList.of(objectSummary2)); + + s3Client = S3TestUtils.mockS3ClientListObjectsV2( + ImmutableMap.of( + request1, result1, + request2, result2 + ), + ImmutableMap.of() + ); + + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); + DeleteObjectsRequest deleteRequest2 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_2) + )); + + S3TestUtils.mockS3ClientDeleteObjects(s3Client, ImmutableList.of(deleteRequest1, deleteRequest2), ImmutableList.of()); + + EasyMock.expect(segmentPusherConfig.getBucket()).andReturn(TEST_BUCKET); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(segmentPusherConfig.getBaseKey()).andReturn(TEST_PREFIX); + EasyMock.expectLastCall().anyTimes(); + + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + EasyMock.expectLastCall().anyTimes(); + + EasyMock.replay(objectSummary1, objectSummary2, request1, request2, result1, result2, s3Client, segmentPusherConfig, inputDataConfig); + + segmentKiller = new S3DataSegmentKiller(s3Client, segmentPusherConfig, inputDataConfig); + segmentKiller.killAll(); + EasyMock.verify(objectSummary1, objectSummary2, request1, request2, result1, result2, s3Client, segmentPusherConfig, inputDataConfig); + } + + @Test + public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllSegments() throws IOException + { + S3ObjectSummary objectSummary1 = S3TestUtils.mockS3ObjectSummary(TIME_0, KEY_1); + + + ListObjectsV2Request request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); + ListObjectsV2Result result1 = S3TestUtils.mockResult(null, false, ImmutableList.of(objectSummary1)); + + s3Client = S3TestUtils.mockS3ClientListObjectsV2( + ImmutableMap.of( + request1, result1 + ), + ImmutableMap.of(request1, RECOVERABLE_EXCEPTION) + ); + + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); + + S3TestUtils.mockS3ClientDeleteObjects(s3Client, ImmutableList.of(deleteRequest1), ImmutableList.of()); + + EasyMock.expect(segmentPusherConfig.getBucket()).andReturn(TEST_BUCKET); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(segmentPusherConfig.getBaseKey()).andReturn(TEST_PREFIX); + EasyMock.expectLastCall().anyTimes(); + + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + EasyMock.expectLastCall().anyTimes(); + + EasyMock.replay(objectSummary1, request1, result1, s3Client, segmentPusherConfig, inputDataConfig); + + segmentKiller = new S3DataSegmentKiller(s3Client, segmentPusherConfig, inputDataConfig); + segmentKiller.killAll(); + EasyMock.verify(objectSummary1, request1, result1, s3Client, segmentPusherConfig, inputDataConfig); + } + + @Test + public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSegments() + { + boolean ioExceptionThrown = false; + ListObjectsV2Request request1 = null; + try { + request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); + + s3Client = S3TestUtils.mockS3ClientListObjectsV2( + ImmutableMap.of(), + ImmutableMap.of(request1, NON_RECOVERABLE_EXCEPTION) + ); + + EasyMock.expect(segmentPusherConfig.getBucket()).andReturn(TEST_BUCKET); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(segmentPusherConfig.getBaseKey()).andReturn(TEST_PREFIX); + EasyMock.expectLastCall().anyTimes(); + + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); + EasyMock.expectLastCall().anyTimes(); + + EasyMock.replay(request1, s3Client, segmentPusherConfig, inputDataConfig); + + segmentKiller = new S3DataSegmentKiller(s3Client, segmentPusherConfig, inputDataConfig); + segmentKiller.killAll(); + } + catch (IOException e) { + ioExceptionThrown = true; + } + + Assert.assertTrue(ioExceptionThrown); + EasyMock.verify(request1, s3Client, segmentPusherConfig, inputDataConfig); + } + + + +} diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java index a342b3b0cb93..4282965e3845 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java @@ -19,24 +19,54 @@ package org.apache.druid.storage.s3; +import com.amazonaws.SdkClientException; import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.Grant; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.Owner; import com.amazonaws.services.s3.model.Permission; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.common.utils.TimeSupplier; import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; +import org.easymock.Mock; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; import java.io.File; +import java.io.IOException; import java.util.List; -public class S3TaskLogsTest +@RunWith(EasyMockRunner.class) +public class S3TaskLogsTest extends EasyMockSupport { + private static final String KEY_1 = "key1"; + private static final String KEY_2 = "key2"; + private static final String TEST_BUCKET = "test_bucket"; + private static final String TEST_PREFIX = "test_prefix"; + private static final String CONTINUATION_STRING = "continuationToken"; + private static final long TIME_0 = 0L; + private static final long TIME_1 = 1L; + private static final long TIME_NOW = 2L; + private static final long TIME_FUTURE = 3L; + private static final int MAX_KEYS = 1; + private static final Exception RECOVERABLE_EXCEPTION = new SdkClientException(new IOException()); + private static final Exception NON_RECOVERABLE_EXCEPTION = new SdkClientException(new NullPointerException()); + + @Mock private TimeSupplier timeSupplier; + @Mock private ServerSideEncryptingAmazonS3 s3Client; + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); @@ -48,7 +78,7 @@ public void testTaskLogsPushWithAclDisabled() throws Exception List grantList = testPushInternal(true, ownerId, ownerDisplayName); - Assert.assertTrue("Grant list should not be null", grantList != null); + Assert.assertNotNull("Grant list should not be null", grantList); Assert.assertEquals("Grant list should be empty as ACL is disabled", 0, grantList.size()); } @@ -60,38 +90,275 @@ public void testTaskLogsPushWithAclEnabled() throws Exception List grantList = testPushInternal(false, ownerId, ownerDisplayName); - Assert.assertTrue("Grant list should not be null", grantList != null); + Assert.assertNotNull("Grant list should not be null", grantList); Assert.assertEquals("Grant list size should be equal to 1", 1, grantList.size()); Grant grant = grantList.get(0); - Assert.assertEquals("The Grantee identifier should be test_owner", "test_owner", grant.getGrantee().getIdentifier()); + Assert.assertEquals( + "The Grantee identifier should be test_owner", + "test_owner", + grant.getGrantee().getIdentifier() + ); Assert.assertEquals("The Grant should have full control permission", Permission.FullControl, grant.getPermission()); } - private List testPushInternal(boolean disableAcl, String ownerId, String ownerDisplayName) throws Exception + @Test + public void test_killAll_noException_deletesAllTaskLogs() throws IOException + { + S3ObjectSummary objectSummary1 = S3TestUtils.mockS3ObjectSummary(TIME_0, KEY_1); + S3ObjectSummary objectSummary2 = S3TestUtils.mockS3ObjectSummary(TIME_1, KEY_2); + + EasyMock.expect(timeSupplier.get()).andReturn(TIME_NOW); + + ListObjectsV2Request request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); + ListObjectsV2Result result1 = S3TestUtils.mockResult(CONTINUATION_STRING, true, ImmutableList.of(objectSummary1)); + + ListObjectsV2Request request2 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, CONTINUATION_STRING); + ListObjectsV2Result result2 = S3TestUtils.mockResult(null, false, ImmutableList.of(objectSummary2)); + + s3Client = S3TestUtils.mockS3ClientListObjectsV2( + ImmutableMap.of( + request1, result1, + request2, result2 + ), + ImmutableMap.of() + ); + + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); + DeleteObjectsRequest deleteRequest2 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_2) + )); + + S3TestUtils.mockS3ClientDeleteObjects(s3Client, ImmutableList.of(deleteRequest1, deleteRequest2), ImmutableList.of()); + + EasyMock.replay(objectSummary1, objectSummary2, request1, request2, result1, result2, s3Client, timeSupplier); + + S3TaskLogsConfig config = new S3TaskLogsConfig(); + config.setS3Bucket(TEST_BUCKET); + config.setS3Prefix(TEST_PREFIX); + S3InputDataConfig inputDataConfig = new S3InputDataConfig(); + inputDataConfig.setMaxListingLength(MAX_KEYS); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); + s3TaskLogs.killAll(); + + EasyMock.verify(objectSummary1, objectSummary2, request1, request2, result1, result2, s3Client, timeSupplier); + } + + @Test + public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllTaskLogs() throws IOException + { + S3ObjectSummary objectSummary1 = S3TestUtils.mockS3ObjectSummary(TIME_0, KEY_1); + + EasyMock.expect(timeSupplier.get()).andReturn(TIME_NOW); + + ListObjectsV2Request request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); + ListObjectsV2Result result1 = S3TestUtils.mockResult(null, false, ImmutableList.of(objectSummary1)); + + s3Client = S3TestUtils.mockS3ClientListObjectsV2( + ImmutableMap.of( + request1, result1 + ), + ImmutableMap.of(request1, RECOVERABLE_EXCEPTION) + ); + + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); + + S3TestUtils.mockS3ClientDeleteObjects(s3Client, ImmutableList.of(deleteRequest1), ImmutableList.of()); + + EasyMock.replay(objectSummary1, request1, result1, s3Client, timeSupplier); + + S3TaskLogsConfig config = new S3TaskLogsConfig(); + config.setS3Bucket(TEST_BUCKET); + config.setS3Prefix(TEST_PREFIX); + S3InputDataConfig inputDataConfig = new S3InputDataConfig(); + inputDataConfig.setMaxListingLength(MAX_KEYS); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); + s3TaskLogs.killAll(); + + EasyMock.verify(objectSummary1, request1, result1, s3Client, timeSupplier); + } + + @Test + public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() + { + boolean ioExceptionThrown = false; + ListObjectsV2Request request1 = null; + try { + EasyMock.expect(timeSupplier.get()).andReturn(TIME_NOW); + + request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); + + s3Client = S3TestUtils.mockS3ClientListObjectsV2( + ImmutableMap.of(), + ImmutableMap.of(request1, NON_RECOVERABLE_EXCEPTION) + ); + + EasyMock.replay(request1, s3Client, timeSupplier); + + S3TaskLogsConfig config = new S3TaskLogsConfig(); + config.setS3Bucket(TEST_BUCKET); + config.setS3Prefix(TEST_PREFIX); + S3InputDataConfig inputDataConfig = new S3InputDataConfig(); + inputDataConfig.setMaxListingLength(MAX_KEYS); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); + s3TaskLogs.killAll(); + } + catch (IOException e) { + ioExceptionThrown = true; + } + + Assert.assertTrue(ioExceptionThrown); + + EasyMock.verify(request1, s3Client, timeSupplier); + } + + @Test + public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws IOException { - ServerSideEncryptingAmazonS3 s3Client = EasyMock.createMock(ServerSideEncryptingAmazonS3.class); + S3ObjectSummary objectSummary1 = S3TestUtils.mockS3ObjectSummary(TIME_0, KEY_1); + S3ObjectSummary objectSummary2 = S3TestUtils.mockS3ObjectSummary(TIME_FUTURE, KEY_2); + + ListObjectsV2Request request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); + ListObjectsV2Result result1 = S3TestUtils.mockResult(CONTINUATION_STRING, true, ImmutableList.of(objectSummary1)); + + ListObjectsV2Request request2 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, CONTINUATION_STRING); + ListObjectsV2Result result2 = S3TestUtils.mockResult(null, false, ImmutableList.of(objectSummary2)); + + s3Client = S3TestUtils.mockS3ClientListObjectsV2( + ImmutableMap.of( + request1, result1, + request2, result2 + ), + ImmutableMap.of() + ); + + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); + DeleteObjectsRequest deleteRequest2 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_2) + )); + S3TestUtils.mockS3ClientDeleteObjects(s3Client, ImmutableList.of(deleteRequest1), ImmutableList.of(deleteRequest2)); + + EasyMock.replay(objectSummary1, objectSummary2, request1, request2, result1, result2, s3Client, timeSupplier); + + S3TaskLogsConfig config = new S3TaskLogsConfig(); + config.setS3Bucket(TEST_BUCKET); + config.setS3Prefix(TEST_PREFIX); + S3InputDataConfig inputDataConfig = new S3InputDataConfig(); + inputDataConfig.setMaxListingLength(MAX_KEYS); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); + s3TaskLogs.killOlderThan(TIME_NOW); + + EasyMock.verify(objectSummary1, objectSummary2, request1, request2, result1, result2, s3Client, timeSupplier); + } + + @Test + public void test_killOlderThan_recoverableExceptionWhenListingObjects_deletesAllTaskLogs() throws IOException + { + S3ObjectSummary objectSummary1 = S3TestUtils.mockS3ObjectSummary(TIME_0, KEY_1); + + ListObjectsV2Request request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); + ListObjectsV2Result result1 = S3TestUtils.mockResult(null, false, ImmutableList.of(objectSummary1)); + + s3Client = S3TestUtils.mockS3ClientListObjectsV2( + ImmutableMap.of( + request1, result1 + ), + ImmutableMap.of(request1, RECOVERABLE_EXCEPTION) + ); + + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); + S3TestUtils.mockS3ClientDeleteObjects(s3Client, ImmutableList.of(deleteRequest1), ImmutableList.of()); + + EasyMock.replay(objectSummary1, request1, result1, s3Client, timeSupplier); + + S3TaskLogsConfig config = new S3TaskLogsConfig(); + config.setS3Bucket(TEST_BUCKET); + config.setS3Prefix(TEST_PREFIX); + S3InputDataConfig inputDataConfig = new S3InputDataConfig(); + inputDataConfig.setMaxListingLength(MAX_KEYS); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); + s3TaskLogs.killOlderThan(TIME_NOW); + + EasyMock.verify(objectSummary1, request1, result1, s3Client, timeSupplier); + } + + @Test + public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() + { + boolean ioExceptionThrown = false; + ListObjectsV2Request request1 = null; + try { + request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); + + s3Client = S3TestUtils.mockS3ClientListObjectsV2( + ImmutableMap.of(), + ImmutableMap.of(request1, NON_RECOVERABLE_EXCEPTION) + ); + + EasyMock.replay(request1, s3Client, timeSupplier); + + S3TaskLogsConfig config = new S3TaskLogsConfig(); + config.setS3Bucket(TEST_BUCKET); + config.setS3Prefix(TEST_PREFIX); + S3InputDataConfig inputDataConfig = new S3InputDataConfig(); + inputDataConfig.setMaxListingLength(MAX_KEYS); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); + s3TaskLogs.killOlderThan(TIME_NOW); + } + catch (IOException e) { + ioExceptionThrown = true; + } + + Assert.assertTrue(ioExceptionThrown); + + EasyMock.verify(request1, s3Client, timeSupplier); + } + + private List testPushInternal(boolean disableAcl, String ownerId, String ownerDisplayName) throws Exception + { EasyMock.expect(s3Client.putObject(EasyMock.anyObject())) - .andReturn(new PutObjectResult()) - .once(); + .andReturn(new PutObjectResult()) + .once(); AccessControlList aclExpected = new AccessControlList(); aclExpected.setOwner(new Owner(ownerId, ownerDisplayName)); - EasyMock.expect(s3Client.getBucketAcl("test_bucket")) - .andReturn(aclExpected) - .once(); + EasyMock.expect(s3Client.getBucketAcl(TEST_BUCKET)) + .andReturn(aclExpected) + .once(); EasyMock.expect(s3Client.putObject(EasyMock.anyObject(PutObjectRequest.class))) - .andReturn(new PutObjectResult()) - .once(); + .andReturn(new PutObjectResult()) + .once(); EasyMock.replay(s3Client); S3TaskLogsConfig config = new S3TaskLogsConfig(); config.setDisableAcl(disableAcl); - config.setS3Bucket("test_bucket"); - S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config); + config.setS3Bucket(TEST_BUCKET); + TimeSupplier timeSupplier = new TimeSupplier(); + S3InputDataConfig inputDataConfig = new S3InputDataConfig(); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); String taskId = "index_test-datasource_2019-06-18T13:30:28.887Z"; File logFile = tempFolder.newFile("test_log_file"); @@ -102,5 +369,4 @@ private List testPushInternal(boolean disableAcl, String ownerId, String return grantsAsList; } - } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java new file mode 100644 index 000000000000..2a7da78175d4 --- /dev/null +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.s3; + +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import junit.framework.AssertionFailedError; +import org.apache.commons.collections4.map.HashedMap; +import org.easymock.EasyMock; +import org.easymock.EasyMockSupport; +import org.easymock.IArgumentMatcher; +import org.easymock.IExpectationSetters; + +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class S3TestUtils extends EasyMockSupport +{ + public static ListObjectsV2Request listObjectsV2RequestArgumentMatcher(ListObjectsV2Request listObjectsV2Request) + { + EasyMock.reportMatcher(new IArgumentMatcher() + { + @Override + public boolean matches(Object argument) + { + + return argument instanceof ListObjectsV2Request + && listObjectsV2Request.getBucketName().equals(((ListObjectsV2Request) argument).getBucketName()) + && listObjectsV2Request.getPrefix().equals(((ListObjectsV2Request) argument).getPrefix()) + && ((listObjectsV2Request.getContinuationToken() == null + && ((ListObjectsV2Request) argument).getContinuationToken() == null) + || (listObjectsV2Request.getContinuationToken() + .equals(((ListObjectsV2Request) argument).getContinuationToken()))) + && listObjectsV2Request.getMaxKeys().equals(((ListObjectsV2Request) argument).getMaxKeys()); + } + + @Override + public void appendTo(StringBuffer buffer) + { + String str = "ListObjectsV2Request(\"bucketName:\" \"" + + listObjectsV2Request.getBucketName() + + "\", \"prefix:\"" + + listObjectsV2Request.getPrefix() + + "\", \"continuationToken:\"" + + listObjectsV2Request.getContinuationToken() + + "\", \"maxKeys:\"" + + listObjectsV2Request.getMaxKeys() + + "\")"; + buffer.append(str); + } + }); + return null; + } + + public static DeleteObjectsRequest deleteObjectsRequestArgumentMatcher(DeleteObjectsRequest deleteObjectsRequest) + { + EasyMock.reportMatcher(new IArgumentMatcher() + { + @Override + public boolean matches(Object argument) + { + + boolean matches = argument instanceof DeleteObjectsRequest + && deleteObjectsRequest.getBucketName() + .equals(((DeleteObjectsRequest) argument).getBucketName()) + && deleteObjectsRequest.getKeys().size() == ((DeleteObjectsRequest) argument).getKeys() + .size(); + if (matches) { + Map expectedKeysAndVersions = deleteObjectsRequest.getKeys().stream().collect( + Collectors.toMap(DeleteObjectsRequest.KeyVersion::getKey, x -> { + return x.getVersion() == null ? "null" : x.getVersion(); + })); + Map actualKeysAndVersions = ((DeleteObjectsRequest) argument).getKeys().stream().collect( + Collectors.toMap(DeleteObjectsRequest.KeyVersion::getKey, x -> { + return x.getVersion() == null ? "null" : x.getVersion(); + })); + matches = expectedKeysAndVersions.equals(actualKeysAndVersions); + } + return matches; + } + + @Override + public void appendTo(StringBuffer buffer) + { + String str = "DeleteObjectsRequest(\"bucketName:\" \"" + + deleteObjectsRequest.getBucketName() + + "\", \"keys:\"" + + deleteObjectsRequest.getKeys() + + "\")"; + buffer.append(str); + } + }); + return null; + } + + public static S3ObjectSummary mockS3ObjectSummary(long lastModified, String key) + { + S3ObjectSummary objectSummary = EasyMock.createMock(S3ObjectSummary.class); + EasyMock.expect(objectSummary.getLastModified()).andReturn(new Date(lastModified)); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(objectSummary.getKey()).andReturn(key); + EasyMock.expectLastCall().anyTimes(); + return objectSummary; + } + + public static ListObjectsV2Request mockRequest( + String bucket, + String prefix, + int maxKeys, + String continuationToken + ) + { + ListObjectsV2Request request = EasyMock.createMock(ListObjectsV2Request.class); + EasyMock.expect(request.getBucketName()).andReturn(bucket); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(request.getPrefix()).andReturn(prefix); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(request.getMaxKeys()).andReturn(maxKeys); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(request.getContinuationToken()).andReturn(continuationToken); + EasyMock.expectLastCall().anyTimes(); + return request; + } + + public static ListObjectsV2Result mockResult( + String continuationToken, + boolean isTruncated, + List objectSummaries + ) + { + ListObjectsV2Result result = EasyMock.createMock(ListObjectsV2Result.class); + EasyMock.expect(result.getContinuationToken()).andReturn(continuationToken); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(result.isTruncated()).andReturn(isTruncated); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(result.getObjectSummaries()).andReturn(objectSummaries); + return result; + } + + public static ServerSideEncryptingAmazonS3 mockS3ClientListObjectsV2( + Map requestsToResults, + Map requestsToExceptions + ) + { + Map> requestToResultExpectationSetter = new HashedMap<>(); + ServerSideEncryptingAmazonS3 s3Client = EasyMock.createMock(ServerSideEncryptingAmazonS3.class); + for (Map.Entry requestsAndErrors : requestsToExceptions.entrySet()) { + ListObjectsV2Request request = requestsAndErrors.getKey(); + Exception exception = requestsAndErrors.getValue(); + IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(request); + if (resultExpectationSetter == null) { + s3Client.listObjectsV2( + S3TestUtils.listObjectsV2RequestArgumentMatcher(request)); + resultExpectationSetter = EasyMock.expectLastCall().andThrow(exception); + requestToResultExpectationSetter.put(request, resultExpectationSetter); + } else { + resultExpectationSetter.andThrow(exception); + } + } + + for (Map.Entry requestsAndResults : requestsToResults.entrySet()) { + ListObjectsV2Request request = requestsAndResults.getKey(); + ListObjectsV2Result result = requestsAndResults.getValue(); + IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(request); + if (resultExpectationSetter == null) { + resultExpectationSetter = EasyMock.expect(s3Client.listObjectsV2( + S3TestUtils.listObjectsV2RequestArgumentMatcher(request))); + requestToResultExpectationSetter.put(request, resultExpectationSetter); + } + resultExpectationSetter.andReturn(result); + } + return s3Client; + } + + public static void mockS3ClientDeleteObjects( + ServerSideEncryptingAmazonS3 s3Client, + List deleteRequestsExpected, + List deleteRequestsNotExpected + ) + { + for (DeleteObjectsRequest deleteRequestExpected : deleteRequestsExpected) { + s3Client.deleteObjects(S3TestUtils.deleteObjectsRequestArgumentMatcher(deleteRequestExpected)); + EasyMock.expectLastCall(); + } + + for (DeleteObjectsRequest deleteRequestNotExpected : deleteRequestsNotExpected) { + s3Client.deleteObjects(S3TestUtils.deleteObjectsRequestArgumentMatcher(deleteRequestNotExpected)); + EasyMock.expectLastCall().andThrow(new AssertionFailedError()).anyTimes(); + } + } +} diff --git a/pom.xml b/pom.xml index f229d32dd74f..6368f232948f 100644 --- a/pom.xml +++ b/pom.xml @@ -104,7 +104,7 @@ 2.8.5 2.0.2 - 1.11.199 + 1.11.735 2.8.0 3.4.14 From 5d505a1d23d79c88261011d5abd9d670401b107b Mon Sep 17 00:00:00 2001 From: zachary sherman Date: Thu, 5 Mar 2020 00:14:22 -0800 Subject: [PATCH 2/5] * update licenses for updated AWS SDK version --- licenses.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/licenses.yaml b/licenses.yaml index c352faa57dd7..cd05c8b87aa5 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -140,7 +140,7 @@ name: AWS SDK for Java license_category: binary module: java-core license_name: Apache License version 2.0 -version: 1.11.199 +version: 1.11.735 libraries: - com.amazonaws: aws-java-sdk-core - com.amazonaws: aws-java-sdk-ec2 From 2af8070937c585eefa012a1dec96751fa0e562fc Mon Sep 17 00:00:00 2001 From: zachary sherman Date: Thu, 5 Mar 2020 16:57:34 -0800 Subject: [PATCH 3/5] * fix bug in iterating through results from S3 * revert back to original version of AWS SDK --- .../org/apache/druid/storage/s3/S3DataSegmentKiller.java | 2 +- .../main/java/org/apache/druid/storage/s3/S3TaskLogs.java | 7 ++++--- .../test/java/org/apache/druid/storage/s3/S3TestUtils.java | 2 +- licenses.yaml | 2 +- pom.xml | 2 +- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java index edecbc9b35b3..9fed147015fe 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java @@ -118,7 +118,7 @@ public void killAll() throws IOException .withKeys(keyVersionsToDelete); s3Client.deleteObjects(deleteRequest); - continuationToken = result.getContinuationToken(); + continuationToken = result.getNextContinuationToken(); } while (result.isTruncated()); return null; } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java index 19fcd5e72d12..5b5f50e6a284 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java @@ -40,6 +40,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.util.Date; import java.util.List; import java.util.stream.Collectors; @@ -190,8 +191,8 @@ public void killOlderThan(long timestamp) throws IOException ListObjectsV2Result result; String continuationToken = null; do { - log.info("Deleting batch of %d task logs from s3 location [bucket: %s prefix: %s].", - maxListingLength, bucketName, prefix + log.info("Deleting batch of %d task logs from s3 location [bucket: %s prefix: %s] older than %s", + maxListingLength, bucketName, prefix, new Date(timestamp) ); ListObjectsV2Request request = new ListObjectsV2Request() .withBucketName(bucketName) @@ -217,7 +218,7 @@ public void killOlderThan(long timestamp) throws IOException service.deleteObjects(deleteRequest); } - continuationToken = result.getContinuationToken(); + continuationToken = result.getNextContinuationToken(); } while (result.isTruncated()); return null; } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java index 2a7da78175d4..b605ee3d601f 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java @@ -150,7 +150,7 @@ public static ListObjectsV2Result mockResult( ) { ListObjectsV2Result result = EasyMock.createMock(ListObjectsV2Result.class); - EasyMock.expect(result.getContinuationToken()).andReturn(continuationToken); + EasyMock.expect(result.getNextContinuationToken()).andReturn(continuationToken); EasyMock.expectLastCall().anyTimes(); EasyMock.expect(result.isTruncated()).andReturn(isTruncated); EasyMock.expectLastCall().anyTimes(); diff --git a/licenses.yaml b/licenses.yaml index cd05c8b87aa5..c352faa57dd7 100644 --- a/licenses.yaml +++ b/licenses.yaml @@ -140,7 +140,7 @@ name: AWS SDK for Java license_category: binary module: java-core license_name: Apache License version 2.0 -version: 1.11.735 +version: 1.11.199 libraries: - com.amazonaws: aws-java-sdk-core - com.amazonaws: aws-java-sdk-ec2 diff --git a/pom.xml b/pom.xml index 6368f232948f..f229d32dd74f 100644 --- a/pom.xml +++ b/pom.xml @@ -104,7 +104,7 @@ 2.8.5 2.0.2 - 1.11.735 + 1.11.199 2.8.0 3.4.14 From 345c22e98636cb96bb500f07bb06d937b6c3c782 Mon Sep 17 00:00:00 2001 From: zachary sherman Date: Mon, 9 Mar 2020 17:52:51 -0700 Subject: [PATCH 4/5] * Address review comments --- ...er.java => CurrentTimeMillisSupplier.java} | 6 +- .../druid/storage/s3/S3DataSegmentKiller.java | 50 +---- .../apache/druid/storage/s3/S3TaskLogs.java | 62 ++---- .../org/apache/druid/storage/s3/S3Utils.java | 52 +++++ .../storage/s3/S3DataSegmentKillerTest.java | 101 +++++----- .../druid/storage/s3/S3TaskLogsTest.java | 187 +++++++++--------- .../apache/druid/storage/s3/S3TestUtils.java | 182 +++++++---------- 7 files changed, 302 insertions(+), 338 deletions(-) rename core/src/main/java/org/apache/druid/common/utils/{TimeSupplier.java => CurrentTimeMillisSupplier.java} (87%) diff --git a/core/src/main/java/org/apache/druid/common/utils/TimeSupplier.java b/core/src/main/java/org/apache/druid/common/utils/CurrentTimeMillisSupplier.java similarity index 87% rename from core/src/main/java/org/apache/druid/common/utils/TimeSupplier.java rename to core/src/main/java/org/apache/druid/common/utils/CurrentTimeMillisSupplier.java index 08fa6b14ef60..d540c781e06d 100644 --- a/core/src/main/java/org/apache/druid/common/utils/TimeSupplier.java +++ b/core/src/main/java/org/apache/druid/common/utils/CurrentTimeMillisSupplier.java @@ -20,12 +20,12 @@ package org.apache.druid.common.utils; -import java.util.function.Supplier; +import java.util.function.LongSupplier; -public class TimeSupplier implements Supplier +public class CurrentTimeMillisSupplier implements LongSupplier { @Override - public Long get() + public long getAsLong() { return System.currentTimeMillis(); } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java index 9fed147015fe..cffc3d36399f 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java @@ -20,10 +20,7 @@ package org.apache.druid.storage.s3; import com.amazonaws.AmazonServiceException; -import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.ListObjectsV2Request; -import com.amazonaws.services.s3.model.ListObjectsV2Result; -import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.google.common.base.Predicates; import com.google.inject.Inject; import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.java.util.common.logger.Logger; @@ -32,9 +29,7 @@ import org.apache.druid.timeline.DataSegment; import java.io.IOException; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; /** * @@ -87,41 +82,16 @@ public void kill(DataSegment segment) throws SegmentLoadingException @Override public void killAll() throws IOException { + log.info("Deleting all segment files from s3 location [bucket: '%s' prefix: '%s']", + segmentPusherConfig.getBucket(), segmentPusherConfig.getBaseKey() + ); try { - S3Utils.retryS3Operation( - () -> { - String bucketName = segmentPusherConfig.getBucket(); - String prefix = segmentPusherConfig.getBaseKey(); - int maxListingLength = inputDataConfig.getMaxListingLength(); - ListObjectsV2Result result; - String continuationToken = null; - do { - log.info("Deleting batch of %d segment files from s3 location [bucket: %s prefix: %s].", - maxListingLength, bucketName, prefix - ); - ListObjectsV2Request request = new ListObjectsV2Request() - .withBucketName(bucketName) - .withPrefix(prefix) - .withContinuationToken(continuationToken) - .withMaxKeys(maxListingLength); - - result = s3Client.listObjectsV2(request); - List objectSummaries = result.getObjectSummaries(); - - List keyVersionsToDelete = - objectSummaries.stream() - .map(x -> new DeleteObjectsRequest.KeyVersion(x.getKey())) - .collect(Collectors.toList()); - - DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucketName) - .withBucketName(bucketName) - .withKeys(keyVersionsToDelete); - s3Client.deleteObjects(deleteRequest); - - continuationToken = result.getNextContinuationToken(); - } while (result.isTruncated()); - return null; - } + S3Utils.deleteObjectsInPath( + s3Client, + inputDataConfig, + segmentPusherConfig.getBucket(), + segmentPusherConfig.getBaseKey(), + Predicates.alwaysTrue() ); } catch (Exception e) { diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java index 5b5f50e6a284..75fadf6e17ea 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java @@ -21,17 +21,13 @@ import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.model.AmazonS3Exception; -import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.ListObjectsV2Request; -import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.io.ByteSource; import com.google.inject.Inject; -import org.apache.druid.common.utils.TimeSupplier; +import org.apache.druid.common.utils.CurrentTimeMillisSupplier; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; @@ -41,8 +37,6 @@ import java.io.IOException; import java.io.InputStream; import java.util.Date; -import java.util.List; -import java.util.stream.Collectors; /** * Provides task logs archived on S3. @@ -54,14 +48,14 @@ public class S3TaskLogs implements TaskLogs private final ServerSideEncryptingAmazonS3 service; private final S3TaskLogsConfig config; private final S3InputDataConfig inputDataConfig; - private final TimeSupplier timeSupplier; + private final CurrentTimeMillisSupplier timeSupplier; @Inject public S3TaskLogs( ServerSideEncryptingAmazonS3 service, S3TaskLogsConfig config, S3InputDataConfig inputDataConfig, - TimeSupplier timeSupplier + CurrentTimeMillisSupplier timeSupplier ) { this.service = service; @@ -175,53 +169,23 @@ public void killAll() throws IOException config.getS3Bucket(), config.getS3Prefix() ); - long now = timeSupplier.get(); + long now = timeSupplier.getAsLong(); killOlderThan(now); } @Override public void killOlderThan(long timestamp) throws IOException { + log.info("Deleting all task logs from s3 location [bucket: '%s' prefix: '%s'] older than %s.", + config.getS3Bucket(), config.getS3Prefix(), new Date(timestamp) + ); try { - S3Utils.retryS3Operation( - () -> { - String bucketName = config.getS3Bucket(); - String prefix = config.getS3Prefix(); - int maxListingLength = inputDataConfig.getMaxListingLength(); - ListObjectsV2Result result; - String continuationToken = null; - do { - log.info("Deleting batch of %d task logs from s3 location [bucket: %s prefix: %s] older than %s", - maxListingLength, bucketName, prefix, new Date(timestamp) - ); - ListObjectsV2Request request = new ListObjectsV2Request() - .withBucketName(bucketName) - .withPrefix(prefix) - .withContinuationToken(continuationToken) - .withMaxKeys(maxListingLength); - - result = service.listObjectsV2(request); - List objectSummaries = result.getObjectSummaries(); - - List keyVersionsToDelete = - objectSummaries.stream() - .filter(x -> x.getLastModified().getTime() < timestamp) - .map(x -> new DeleteObjectsRequest.KeyVersion( - x.getKey())) - .collect( - Collectors.toList()); - - DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucketName) - .withBucketName(bucketName) - .withKeys(keyVersionsToDelete); - if (!deleteRequest.getKeys().isEmpty()) { - service.deleteObjects(deleteRequest); - } - - continuationToken = result.getNextContinuationToken(); - } while (result.isTruncated()); - return null; - } + S3Utils.deleteObjectsInPath( + service, + inputDataConfig, + config.getS3Bucket(), + config.getS3Prefix(), + (object) -> object.getLastModified().getTime() < timestamp ); } catch (Exception e) { diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java index eb704ed819e1..e19639a41d71 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java @@ -23,6 +23,7 @@ import com.amazonaws.services.s3.model.AccessControlList; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CanonicalGrantee; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.Grant; import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.ListObjectsV2Result; @@ -31,6 +32,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.base.Joiner; import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RetryUtils; @@ -41,7 +43,9 @@ import java.io.File; import java.io.IOException; import java.net.URI; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; /** * @@ -200,6 +204,54 @@ public static S3ObjectSummary getSingleObjectSummary(ServerSideEncryptingAmazonS return objectSummary; } + public static void deleteObjectsInPath( + ServerSideEncryptingAmazonS3 s3Client, + S3InputDataConfig config, + String bucket, + String prefix, + Predicate filter + ) + throws Exception + { + final List keysToDelete = new ArrayList<>(config.getMaxListingLength()); + final ObjectSummaryIterator iterator = new ObjectSummaryIterator( + s3Client, + ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("s3")), + config.getMaxListingLength() + ); + + while (iterator.hasNext()) { + final S3ObjectSummary nextObject = iterator.next(); + if (filter.apply(nextObject)) { + keysToDelete.add(new DeleteObjectsRequest.KeyVersion(nextObject.getKey())); + if (keysToDelete.size() == config.getMaxListingLength()) { + deleteBucketKeys(s3Client, bucket, keysToDelete); + log.info("Deleted %d files", keysToDelete.size()); + keysToDelete.clear(); + } + } + } + + if (keysToDelete.size() > 0) { + deleteBucketKeys(s3Client, bucket, keysToDelete); + log.info("Deleted %d files", keysToDelete.size()); + } + } + + public static void deleteBucketKeys( + ServerSideEncryptingAmazonS3 s3Client, + String bucket, + List keysToDelete + ) + throws Exception + { + DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keysToDelete); + S3Utils.retryS3Operation(() -> { + s3Client.deleteObjects(deleteRequest); + return null; + }); + } + /** * Uploads a file to S3 if possible. First trying to set ACL to give the bucket owner full control of the file before uploading. * diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java index 453811da191b..6260d3a25d26 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java @@ -21,11 +21,10 @@ import com.amazonaws.SdkClientException; import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.ListObjectsV2Request; -import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.StringUtils; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; @@ -35,6 +34,7 @@ import org.junit.runner.RunWith; import java.io.IOException; +import java.net.URI; @RunWith(EasyMockRunner.class) public class S3DataSegmentKillerTest extends EasyMockSupport @@ -43,38 +43,32 @@ public class S3DataSegmentKillerTest extends EasyMockSupport private static final String KEY_2 = "key2"; private static final String TEST_BUCKET = "test_bucket"; private static final String TEST_PREFIX = "test_prefix"; - private static final String CONTINUATION_STRING = "continuationToken"; + private static final URI PREFIX_URI = URI.create(StringUtils.format("s3://%s/%s", TEST_BUCKET, TEST_PREFIX)); private static final long TIME_0 = 0L; private static final long TIME_1 = 1L; private static final int MAX_KEYS = 1; private static final Exception RECOVERABLE_EXCEPTION = new SdkClientException(new IOException()); private static final Exception NON_RECOVERABLE_EXCEPTION = new SdkClientException(new NullPointerException()); - @Mock private ServerSideEncryptingAmazonS3 s3Client; - @Mock private S3DataSegmentPusherConfig segmentPusherConfig; - @Mock private S3InputDataConfig inputDataConfig; + @Mock + private ServerSideEncryptingAmazonS3 s3Client; + @Mock + private S3DataSegmentPusherConfig segmentPusherConfig; + @Mock + private S3InputDataConfig inputDataConfig; private S3DataSegmentKiller segmentKiller; @Test public void test_killAll_noException_deletesAllSegments() throws IOException { - S3ObjectSummary objectSummary1 = S3TestUtils.mockS3ObjectSummary(TIME_0, KEY_1); - S3ObjectSummary objectSummary2 = S3TestUtils.mockS3ObjectSummary(TIME_1, KEY_2); + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); + S3ObjectSummary objectSummary2 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_2, TIME_1); - - ListObjectsV2Request request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); - ListObjectsV2Result result1 = S3TestUtils.mockResult(CONTINUATION_STRING, true, ImmutableList.of(objectSummary1)); - - ListObjectsV2Request request2 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, CONTINUATION_STRING); - ListObjectsV2Result result2 = S3TestUtils.mockResult(null, false, ImmutableList.of(objectSummary2)); - - s3Client = S3TestUtils.mockS3ClientListObjectsV2( - ImmutableMap.of( - request1, result1, - request2, result2 - ), - ImmutableMap.of() + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1, objectSummary2) ); DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) @@ -88,7 +82,11 @@ public void test_killAll_noException_deletesAllSegments() throws IOException new DeleteObjectsRequest.KeyVersion(KEY_2) )); - S3TestUtils.mockS3ClientDeleteObjects(s3Client, ImmutableList.of(deleteRequest1, deleteRequest2), ImmutableList.of()); + S3TestUtils.mockS3ClientDeleteObjects( + s3Client, + ImmutableList.of(deleteRequest1, deleteRequest2), + ImmutableMap.of() + ); EasyMock.expect(segmentPusherConfig.getBucket()).andReturn(TEST_BUCKET); EasyMock.expectLastCall().anyTimes(); @@ -98,27 +96,22 @@ public void test_killAll_noException_deletesAllSegments() throws IOException EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); EasyMock.expectLastCall().anyTimes(); - EasyMock.replay(objectSummary1, objectSummary2, request1, request2, result1, result2, s3Client, segmentPusherConfig, inputDataConfig); + EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig); segmentKiller = new S3DataSegmentKiller(s3Client, segmentPusherConfig, inputDataConfig); segmentKiller.killAll(); - EasyMock.verify(objectSummary1, objectSummary2, request1, request2, result1, result2, s3Client, segmentPusherConfig, inputDataConfig); + EasyMock.verify(s3Client, segmentPusherConfig, inputDataConfig); } @Test public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllSegments() throws IOException { - S3ObjectSummary objectSummary1 = S3TestUtils.mockS3ObjectSummary(TIME_0, KEY_1); - - - ListObjectsV2Request request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); - ListObjectsV2Result result1 = S3TestUtils.mockResult(null, false, ImmutableList.of(objectSummary1)); + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); - s3Client = S3TestUtils.mockS3ClientListObjectsV2( - ImmutableMap.of( - request1, result1 - ), - ImmutableMap.of(request1, RECOVERABLE_EXCEPTION) + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1) ); DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) @@ -127,7 +120,11 @@ public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllSegmen new DeleteObjectsRequest.KeyVersion(KEY_1) )); - S3TestUtils.mockS3ClientDeleteObjects(s3Client, ImmutableList.of(deleteRequest1), ImmutableList.of()); + S3TestUtils.mockS3ClientDeleteObjects( + s3Client, + ImmutableList.of(deleteRequest1), + ImmutableMap.of(deleteRequest1, RECOVERABLE_EXCEPTION) + ); EasyMock.expect(segmentPusherConfig.getBucket()).andReturn(TEST_BUCKET); EasyMock.expectLastCall().anyTimes(); @@ -137,26 +134,39 @@ public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllSegmen EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); EasyMock.expectLastCall().anyTimes(); - EasyMock.replay(objectSummary1, request1, result1, s3Client, segmentPusherConfig, inputDataConfig); + EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig); segmentKiller = new S3DataSegmentKiller(s3Client, segmentPusherConfig, inputDataConfig); segmentKiller.killAll(); - EasyMock.verify(objectSummary1, request1, result1, s3Client, segmentPusherConfig, inputDataConfig); + EasyMock.verify(s3Client, segmentPusherConfig, inputDataConfig); } @Test public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSegments() { boolean ioExceptionThrown = false; - ListObjectsV2Request request1 = null; try { - request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); + + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1) + ); + + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); - s3Client = S3TestUtils.mockS3ClientListObjectsV2( - ImmutableMap.of(), - ImmutableMap.of(request1, NON_RECOVERABLE_EXCEPTION) + S3TestUtils.mockS3ClientDeleteObjects( + s3Client, + ImmutableList.of(), + ImmutableMap.of(deleteRequest1, NON_RECOVERABLE_EXCEPTION) ); + EasyMock.expect(segmentPusherConfig.getBucket()).andReturn(TEST_BUCKET); EasyMock.expectLastCall().anyTimes(); EasyMock.expect(segmentPusherConfig.getBaseKey()).andReturn(TEST_PREFIX); @@ -165,7 +175,7 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSeg EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS); EasyMock.expectLastCall().anyTimes(); - EasyMock.replay(request1, s3Client, segmentPusherConfig, inputDataConfig); + EasyMock.replay(s3Client, segmentPusherConfig, inputDataConfig); segmentKiller = new S3DataSegmentKiller(s3Client, segmentPusherConfig, inputDataConfig); segmentKiller.killAll(); @@ -175,9 +185,6 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSeg } Assert.assertTrue(ioExceptionThrown); - EasyMock.verify(request1, s3Client, segmentPusherConfig, inputDataConfig); + EasyMock.verify(s3Client, segmentPusherConfig, inputDataConfig); } - - - } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java index 4282965e3845..502897f01045 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java @@ -23,8 +23,6 @@ import com.amazonaws.services.s3.model.AccessControlList; import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.Grant; -import com.amazonaws.services.s3.model.ListObjectsV2Request; -import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.Owner; import com.amazonaws.services.s3.model.Permission; import com.amazonaws.services.s3.model.PutObjectRequest; @@ -32,7 +30,8 @@ import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import org.apache.druid.common.utils.TimeSupplier; +import org.apache.druid.common.utils.CurrentTimeMillisSupplier; +import org.apache.druid.java.util.common.StringUtils; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; @@ -45,6 +44,7 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.List; @RunWith(EasyMockRunner.class) @@ -55,7 +55,7 @@ public class S3TaskLogsTest extends EasyMockSupport private static final String KEY_2 = "key2"; private static final String TEST_BUCKET = "test_bucket"; private static final String TEST_PREFIX = "test_prefix"; - private static final String CONTINUATION_STRING = "continuationToken"; + private static final URI PREFIX_URI = URI.create(StringUtils.format("s3://%s/%s", TEST_BUCKET, TEST_PREFIX)); private static final long TIME_0 = 0L; private static final long TIME_1 = 1L; private static final long TIME_NOW = 2L; @@ -64,8 +64,10 @@ public class S3TaskLogsTest extends EasyMockSupport private static final Exception RECOVERABLE_EXCEPTION = new SdkClientException(new IOException()); private static final Exception NON_RECOVERABLE_EXCEPTION = new SdkClientException(new NullPointerException()); - @Mock private TimeSupplier timeSupplier; - @Mock private ServerSideEncryptingAmazonS3 s3Client; + @Mock + private CurrentTimeMillisSupplier timeSupplier; + @Mock + private ServerSideEncryptingAmazonS3 s3Client; @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); @@ -104,23 +106,15 @@ public void testTaskLogsPushWithAclEnabled() throws Exception @Test public void test_killAll_noException_deletesAllTaskLogs() throws IOException { - S3ObjectSummary objectSummary1 = S3TestUtils.mockS3ObjectSummary(TIME_0, KEY_1); - S3ObjectSummary objectSummary2 = S3TestUtils.mockS3ObjectSummary(TIME_1, KEY_2); + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); + S3ObjectSummary objectSummary2 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_2, TIME_1); - EasyMock.expect(timeSupplier.get()).andReturn(TIME_NOW); + EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); - ListObjectsV2Request request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); - ListObjectsV2Result result1 = S3TestUtils.mockResult(CONTINUATION_STRING, true, ImmutableList.of(objectSummary1)); - - ListObjectsV2Request request2 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, CONTINUATION_STRING); - ListObjectsV2Result result2 = S3TestUtils.mockResult(null, false, ImmutableList.of(objectSummary2)); - - s3Client = S3TestUtils.mockS3ClientListObjectsV2( - ImmutableMap.of( - request1, result1, - request2, result2 - ), - ImmutableMap.of() + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1, objectSummary2) ); DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) @@ -134,9 +128,13 @@ public void test_killAll_noException_deletesAllTaskLogs() throws IOException new DeleteObjectsRequest.KeyVersion(KEY_2) )); - S3TestUtils.mockS3ClientDeleteObjects(s3Client, ImmutableList.of(deleteRequest1, deleteRequest2), ImmutableList.of()); + S3TestUtils.mockS3ClientDeleteObjects( + s3Client, + ImmutableList.of(deleteRequest1, deleteRequest2), + ImmutableMap.of() + ); - EasyMock.replay(objectSummary1, objectSummary2, request1, request2, result1, result2, s3Client, timeSupplier); + EasyMock.replay(s3Client, timeSupplier); S3TaskLogsConfig config = new S3TaskLogsConfig(); config.setS3Bucket(TEST_BUCKET); @@ -146,24 +144,20 @@ public void test_killAll_noException_deletesAllTaskLogs() throws IOException S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); s3TaskLogs.killAll(); - EasyMock.verify(objectSummary1, objectSummary2, request1, request2, result1, result2, s3Client, timeSupplier); + EasyMock.verify(s3Client, timeSupplier); } @Test - public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllTaskLogs() throws IOException + public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws IOException { - S3ObjectSummary objectSummary1 = S3TestUtils.mockS3ObjectSummary(TIME_0, KEY_1); - - EasyMock.expect(timeSupplier.get()).andReturn(TIME_NOW); + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); - ListObjectsV2Request request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); - ListObjectsV2Result result1 = S3TestUtils.mockResult(null, false, ImmutableList.of(objectSummary1)); + EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); - s3Client = S3TestUtils.mockS3ClientListObjectsV2( - ImmutableMap.of( - request1, result1 - ), - ImmutableMap.of(request1, RECOVERABLE_EXCEPTION) + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1) ); DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) @@ -172,9 +166,13 @@ public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllTaskLo new DeleteObjectsRequest.KeyVersion(KEY_1) )); - S3TestUtils.mockS3ClientDeleteObjects(s3Client, ImmutableList.of(deleteRequest1), ImmutableList.of()); + S3TestUtils.mockS3ClientDeleteObjects( + s3Client, + ImmutableList.of(deleteRequest1), + ImmutableMap.of(deleteRequest1, RECOVERABLE_EXCEPTION) + ); - EasyMock.replay(objectSummary1, request1, result1, s3Client, timeSupplier); + EasyMock.replay(s3Client, timeSupplier); S3TaskLogsConfig config = new S3TaskLogsConfig(); config.setS3Bucket(TEST_BUCKET); @@ -184,25 +182,34 @@ public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllTaskLo S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); s3TaskLogs.killAll(); - EasyMock.verify(objectSummary1, request1, result1, s3Client, timeSupplier); + EasyMock.verify(s3Client, timeSupplier); } @Test public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() { boolean ioExceptionThrown = false; - ListObjectsV2Request request1 = null; try { - EasyMock.expect(timeSupplier.get()).andReturn(TIME_NOW); - - request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); + EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW); + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1) + ); - s3Client = S3TestUtils.mockS3ClientListObjectsV2( - ImmutableMap.of(), - ImmutableMap.of(request1, NON_RECOVERABLE_EXCEPTION) + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); + S3TestUtils.mockS3ClientDeleteObjects( + s3Client, + ImmutableList.of(), + ImmutableMap.of(deleteRequest1, NON_RECOVERABLE_EXCEPTION) ); - EasyMock.replay(request1, s3Client, timeSupplier); + EasyMock.replay(s3Client, timeSupplier); S3TaskLogsConfig config = new S3TaskLogsConfig(); config.setS3Bucket(TEST_BUCKET); @@ -218,27 +225,19 @@ public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteA Assert.assertTrue(ioExceptionThrown); - EasyMock.verify(request1, s3Client, timeSupplier); + EasyMock.verify(s3Client, timeSupplier); } @Test public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws IOException { - S3ObjectSummary objectSummary1 = S3TestUtils.mockS3ObjectSummary(TIME_0, KEY_1); - S3ObjectSummary objectSummary2 = S3TestUtils.mockS3ObjectSummary(TIME_FUTURE, KEY_2); - - ListObjectsV2Request request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); - ListObjectsV2Result result1 = S3TestUtils.mockResult(CONTINUATION_STRING, true, ImmutableList.of(objectSummary1)); - - ListObjectsV2Request request2 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, CONTINUATION_STRING); - ListObjectsV2Result result2 = S3TestUtils.mockResult(null, false, ImmutableList.of(objectSummary2)); + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); + S3ObjectSummary objectSummary2 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_2, TIME_FUTURE); - s3Client = S3TestUtils.mockS3ClientListObjectsV2( - ImmutableMap.of( - request1, result1, - request2, result2 - ), - ImmutableMap.of() + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1, objectSummary2) ); DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) @@ -246,14 +245,10 @@ public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws .withKeys(ImmutableList.of( new DeleteObjectsRequest.KeyVersion(KEY_1) )); - DeleteObjectsRequest deleteRequest2 = new DeleteObjectsRequest(TEST_BUCKET) - .withBucketName(TEST_BUCKET) - .withKeys(ImmutableList.of( - new DeleteObjectsRequest.KeyVersion(KEY_2) - )); - S3TestUtils.mockS3ClientDeleteObjects(s3Client, ImmutableList.of(deleteRequest1), ImmutableList.of(deleteRequest2)); - EasyMock.replay(objectSummary1, objectSummary2, request1, request2, result1, result2, s3Client, timeSupplier); + S3TestUtils.mockS3ClientDeleteObjects(s3Client, ImmutableList.of(deleteRequest1), ImmutableMap.of()); + + EasyMock.replay(s3Client, timeSupplier); S3TaskLogsConfig config = new S3TaskLogsConfig(); config.setS3Bucket(TEST_BUCKET); @@ -263,22 +258,18 @@ public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); s3TaskLogs.killOlderThan(TIME_NOW); - EasyMock.verify(objectSummary1, objectSummary2, request1, request2, result1, result2, s3Client, timeSupplier); + EasyMock.verify(s3Client, timeSupplier); } @Test public void test_killOlderThan_recoverableExceptionWhenListingObjects_deletesAllTaskLogs() throws IOException { - S3ObjectSummary objectSummary1 = S3TestUtils.mockS3ObjectSummary(TIME_0, KEY_1); - - ListObjectsV2Request request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); - ListObjectsV2Result result1 = S3TestUtils.mockResult(null, false, ImmutableList.of(objectSummary1)); + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); - s3Client = S3TestUtils.mockS3ClientListObjectsV2( - ImmutableMap.of( - request1, result1 - ), - ImmutableMap.of(request1, RECOVERABLE_EXCEPTION) + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1) ); DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) @@ -287,9 +278,13 @@ public void test_killOlderThan_recoverableExceptionWhenListingObjects_deletesAll new DeleteObjectsRequest.KeyVersion(KEY_1) )); - S3TestUtils.mockS3ClientDeleteObjects(s3Client, ImmutableList.of(deleteRequest1), ImmutableList.of()); + S3TestUtils.mockS3ClientDeleteObjects( + s3Client, + ImmutableList.of(deleteRequest1), + ImmutableMap.of(deleteRequest1, RECOVERABLE_EXCEPTION) + ); - EasyMock.replay(objectSummary1, request1, result1, s3Client, timeSupplier); + EasyMock.replay(s3Client, timeSupplier); S3TaskLogsConfig config = new S3TaskLogsConfig(); config.setS3Bucket(TEST_BUCKET); @@ -299,23 +294,33 @@ public void test_killOlderThan_recoverableExceptionWhenListingObjects_deletesAll S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); s3TaskLogs.killOlderThan(TIME_NOW); - EasyMock.verify(objectSummary1, request1, result1, s3Client, timeSupplier); + EasyMock.verify(s3Client, timeSupplier); } @Test public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() { boolean ioExceptionThrown = false; - ListObjectsV2Request request1 = null; try { - request1 = S3TestUtils.mockRequest(TEST_BUCKET, TEST_PREFIX, MAX_KEYS, null); + S3ObjectSummary objectSummary1 = S3TestUtils.newS3ObjectSummary(TEST_BUCKET, KEY_1, TIME_0); + S3TestUtils.expectListObjects( + s3Client, + PREFIX_URI, + ImmutableList.of(objectSummary1) + ); - s3Client = S3TestUtils.mockS3ClientListObjectsV2( - ImmutableMap.of(), - ImmutableMap.of(request1, NON_RECOVERABLE_EXCEPTION) + DeleteObjectsRequest deleteRequest1 = new DeleteObjectsRequest(TEST_BUCKET) + .withBucketName(TEST_BUCKET) + .withKeys(ImmutableList.of( + new DeleteObjectsRequest.KeyVersion(KEY_1) + )); + S3TestUtils.mockS3ClientDeleteObjects( + s3Client, + ImmutableList.of(), + ImmutableMap.of(deleteRequest1, NON_RECOVERABLE_EXCEPTION) ); - EasyMock.replay(request1, s3Client, timeSupplier); + EasyMock.replay(s3Client, timeSupplier); S3TaskLogsConfig config = new S3TaskLogsConfig(); config.setS3Bucket(TEST_BUCKET); @@ -331,7 +336,7 @@ public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntD Assert.assertTrue(ioExceptionThrown); - EasyMock.verify(request1, s3Client, timeSupplier); + EasyMock.verify(s3Client, timeSupplier); } private List testPushInternal(boolean disableAcl, String ownerId, String ownerDisplayName) throws Exception @@ -356,7 +361,7 @@ private List testPushInternal(boolean disableAcl, String ownerId, String S3TaskLogsConfig config = new S3TaskLogsConfig(); config.setDisableAcl(disableAcl); config.setS3Bucket(TEST_BUCKET); - TimeSupplier timeSupplier = new TimeSupplier(); + CurrentTimeMillisSupplier timeSupplier = new CurrentTimeMillisSupplier(); S3InputDataConfig inputDataConfig = new S3InputDataConfig(); S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); @@ -365,8 +370,6 @@ private List testPushInternal(boolean disableAcl, String ownerId, String s3TaskLogs.pushTaskLog(taskId, logFile); - List grantsAsList = aclExpected.getGrantsAsList(); - - return grantsAsList; + return aclExpected.getGrantsAsList(); } } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java index b605ee3d601f..165dec42f9b8 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java @@ -23,13 +23,16 @@ import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.S3ObjectSummary; -import junit.framework.AssertionFailedError; import org.apache.commons.collections4.map.HashedMap; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.easymock.IArgumentMatcher; import org.easymock.IExpectationSetters; +import org.joda.time.DateTime; +import java.net.URI; import java.util.Date; import java.util.List; import java.util.Map; @@ -37,41 +40,9 @@ public class S3TestUtils extends EasyMockSupport { - public static ListObjectsV2Request listObjectsV2RequestArgumentMatcher(ListObjectsV2Request listObjectsV2Request) - { - EasyMock.reportMatcher(new IArgumentMatcher() - { - @Override - public boolean matches(Object argument) - { - - return argument instanceof ListObjectsV2Request - && listObjectsV2Request.getBucketName().equals(((ListObjectsV2Request) argument).getBucketName()) - && listObjectsV2Request.getPrefix().equals(((ListObjectsV2Request) argument).getPrefix()) - && ((listObjectsV2Request.getContinuationToken() == null - && ((ListObjectsV2Request) argument).getContinuationToken() == null) - || (listObjectsV2Request.getContinuationToken() - .equals(((ListObjectsV2Request) argument).getContinuationToken()))) - && listObjectsV2Request.getMaxKeys().equals(((ListObjectsV2Request) argument).getMaxKeys()); - } - - @Override - public void appendTo(StringBuffer buffer) - { - String str = "ListObjectsV2Request(\"bucketName:\" \"" - + listObjectsV2Request.getBucketName() - + "\", \"prefix:\"" - + listObjectsV2Request.getPrefix() - + "\", \"continuationToken:\"" - + listObjectsV2Request.getContinuationToken() - + "\", \"maxKeys:\"" - + listObjectsV2Request.getMaxKeys() - + "\")"; - buffer.append(str); - } - }); - return null; - } + private static final DateTime NOW = DateTimes.nowUtc(); + private static final byte[] CONTENT = + StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis())); public static DeleteObjectsRequest deleteObjectsRequestArgumentMatcher(DeleteObjectsRequest deleteObjectsRequest) { @@ -114,99 +85,96 @@ public void appendTo(StringBuffer buffer) return null; } - public static S3ObjectSummary mockS3ObjectSummary(long lastModified, String key) + public static void expectListObjects( + ServerSideEncryptingAmazonS3 s3Client, + URI prefix, + List objectSummaries) { - S3ObjectSummary objectSummary = EasyMock.createMock(S3ObjectSummary.class); - EasyMock.expect(objectSummary.getLastModified()).andReturn(new Date(lastModified)); - EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(objectSummary.getKey()).andReturn(key); - EasyMock.expectLastCall().anyTimes(); - return objectSummary; - } + final ListObjectsV2Result result = new ListObjectsV2Result(); + result.setBucketName(prefix.getAuthority()); + result.setKeyCount(objectSummaries.size()); + for (S3ObjectSummary objectSummary : objectSummaries) { + result.getObjectSummaries().add(objectSummary); + } - public static ListObjectsV2Request mockRequest( - String bucket, - String prefix, - int maxKeys, - String continuationToken - ) - { - ListObjectsV2Request request = EasyMock.createMock(ListObjectsV2Request.class); - EasyMock.expect(request.getBucketName()).andReturn(bucket); - EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(request.getPrefix()).andReturn(prefix); - EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(request.getMaxKeys()).andReturn(maxKeys); - EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(request.getContinuationToken()).andReturn(continuationToken); - EasyMock.expectLastCall().anyTimes(); - return request; + EasyMock.expect( + s3Client.listObjectsV2(matchListObjectsRequest(prefix)) + ).andReturn(result).once(); } - public static ListObjectsV2Result mockResult( - String continuationToken, - boolean isTruncated, - List objectSummaries + public static void mockS3ClientDeleteObjects( + ServerSideEncryptingAmazonS3 s3Client, + List deleteRequestsExpected, + Map requestToException ) { - ListObjectsV2Result result = EasyMock.createMock(ListObjectsV2Result.class); - EasyMock.expect(result.getNextContinuationToken()).andReturn(continuationToken); - EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(result.isTruncated()).andReturn(isTruncated); - EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(result.getObjectSummaries()).andReturn(objectSummaries); - return result; - } + Map> requestToResultExpectationSetter = new HashedMap<>(); - public static ServerSideEncryptingAmazonS3 mockS3ClientListObjectsV2( - Map requestsToResults, - Map requestsToExceptions - ) - { - Map> requestToResultExpectationSetter = new HashedMap<>(); - ServerSideEncryptingAmazonS3 s3Client = EasyMock.createMock(ServerSideEncryptingAmazonS3.class); - for (Map.Entry requestsAndErrors : requestsToExceptions.entrySet()) { - ListObjectsV2Request request = requestsAndErrors.getKey(); + for (Map.Entry requestsAndErrors : requestToException.entrySet()) { + DeleteObjectsRequest request = requestsAndErrors.getKey(); Exception exception = requestsAndErrors.getValue(); - IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(request); + IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(request); if (resultExpectationSetter == null) { - s3Client.listObjectsV2( - S3TestUtils.listObjectsV2RequestArgumentMatcher(request)); - resultExpectationSetter = EasyMock.expectLastCall().andThrow(exception); + s3Client.deleteObjects( + S3TestUtils.deleteObjectsRequestArgumentMatcher(request)); + resultExpectationSetter = EasyMock.expectLastCall().andThrow(exception); requestToResultExpectationSetter.put(request, resultExpectationSetter); } else { resultExpectationSetter.andThrow(exception); } } - for (Map.Entry requestsAndResults : requestsToResults.entrySet()) { - ListObjectsV2Request request = requestsAndResults.getKey(); - ListObjectsV2Result result = requestsAndResults.getValue(); - IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(request); + for (DeleteObjectsRequest request : deleteRequestsExpected) { + IExpectationSetters resultExpectationSetter = requestToResultExpectationSetter.get(request); if (resultExpectationSetter == null) { - resultExpectationSetter = EasyMock.expect(s3Client.listObjectsV2( - S3TestUtils.listObjectsV2RequestArgumentMatcher(request))); + s3Client.deleteObjects(S3TestUtils.deleteObjectsRequestArgumentMatcher(request)); + resultExpectationSetter = EasyMock.expectLastCall(); requestToResultExpectationSetter.put(request, resultExpectationSetter); } - resultExpectationSetter.andReturn(result); + resultExpectationSetter.andVoid(); } - return s3Client; } - public static void mockS3ClientDeleteObjects( - ServerSideEncryptingAmazonS3 s3Client, - List deleteRequestsExpected, - List deleteRequestsNotExpected - ) + public static ListObjectsV2Request matchListObjectsRequest(final URI prefixUri) { - for (DeleteObjectsRequest deleteRequestExpected : deleteRequestsExpected) { - s3Client.deleteObjects(S3TestUtils.deleteObjectsRequestArgumentMatcher(deleteRequestExpected)); - EasyMock.expectLastCall(); - } + // Use an IArgumentMatcher to verify that the request has the correct bucket and prefix. + EasyMock.reportMatcher( + new IArgumentMatcher() + { + @Override + public boolean matches(Object argument) + { + if (!(argument instanceof ListObjectsV2Request)) { + return false; + } + + final ListObjectsV2Request request = (ListObjectsV2Request) argument; + return prefixUri.getAuthority().equals(request.getBucketName()) + && S3Utils.extractS3Key(prefixUri).equals(request.getPrefix()); + } + + @Override + public void appendTo(StringBuffer buffer) + { + buffer.append(""); + } + } + ); - for (DeleteObjectsRequest deleteRequestNotExpected : deleteRequestsNotExpected) { - s3Client.deleteObjects(S3TestUtils.deleteObjectsRequestArgumentMatcher(deleteRequestNotExpected)); - EasyMock.expectLastCall().andThrow(new AssertionFailedError()).anyTimes(); - } + return null; + } + + public static S3ObjectSummary newS3ObjectSummary( + String bucket, + String key, + long lastModifiedTimestamp) + { + S3ObjectSummary objectSummary = new S3ObjectSummary(); + objectSummary.setBucketName(bucket); + objectSummary.setKey(key); + objectSummary.setLastModified(new Date(lastModifiedTimestamp)); + objectSummary.setETag("etag"); + objectSummary.setSize(CONTENT.length); + return objectSummary; } } From 1c54e4cddfc0c2fa09259078c645e8465b61b12b Mon Sep 17 00:00:00 2001 From: zachary sherman Date: Tue, 10 Mar 2020 00:01:59 -0700 Subject: [PATCH 5/5] * Fix failing dependency check --- .../test/java/org/apache/druid/storage/s3/S3TestUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java index 165dec42f9b8..b178bb1014f6 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TestUtils.java @@ -23,7 +23,6 @@ import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.S3ObjectSummary; -import org.apache.commons.collections4.map.HashedMap; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.easymock.EasyMock; @@ -34,6 +33,7 @@ import java.net.URI; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -108,7 +108,7 @@ public static void mockS3ClientDeleteObjects( Map requestToException ) { - Map> requestToResultExpectationSetter = new HashedMap<>(); + Map> requestToResultExpectationSetter = new HashMap<>(); for (Map.Entry requestsAndErrors : requestToException.entrySet()) { DeleteObjectsRequest request = requestsAndErrors.getKey();