diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 48539bb23e7f..77226fc754ce 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -894,6 +894,47 @@ Sample specs:
...
```
+```json
+...
+ "ioConfig": {
+ "type": "index_parallel",
+ "inputSource": {
+ "type": "s3",
+ "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
+ "properties": {
+ "accessKeyId": "KLJ78979SDFdS2",
+ "secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd"
+ }
+ },
+ "inputFormat": {
+ "type": "json"
+ },
+ ...
+ },
+...
+```
+
+```json
+...
+ "ioConfig": {
+ "type": "index_parallel",
+ "inputSource": {
+ "type": "s3",
+ "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
+ "properties": {
+ "accessKeyId": "KLJ78979SDFdS2",
+ "secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd",
+ "assumeRoleArn": "arn:aws:iam::2981002874992:role/role-s3"
+ }
+ },
+ "inputFormat": {
+ "type": "json"
+ },
+ ...
+ },
+...
+```
+
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be `s3`.|None|yes|
@@ -917,6 +958,8 @@ Properties Object:
|--------|-----------|-------|---------|
|accessKeyId|The [Password Provider](../operations/password-provider.md) or plain text string of this S3 InputSource's access key|None|yes if secretAccessKey is given|
|secretAccessKey|The [Password Provider](../operations/password-provider.md) or plain text string of this S3 InputSource's secret key|None|yes if accessKeyId is given|
+|assumeRoleArn|AWS ARN of the role to assume. See the [AWS documentation](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html) for details. **assumeRoleArn** can be used either with the AWS credentials given in the ingestion spec or with the default S3 credentials.|None|no|
+|assumeRoleExternalId|A unique identifier that might be required when you assume a role in another account. See the [AWS documentation](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html) for details.|None|no|
**Note :** *If accessKeyId and secretAccessKey are not given, the default [S3 credentials provider chain](../development/extensions-core/s3.md#s3-authentication-methods) is used.*
diff --git a/extensions-core/s3-extensions/pom.xml b/extensions-core/s3-extensions/pom.xml
index 30c63f672bbf..1a2d2fc39830 100644
--- a/extensions-core/s3-extensions/pom.xml
+++ b/extensions-core/s3-extensions/pom.xml
@@ -115,8 +115,8 @@
com.amazonaws
aws-java-sdk-sts
- provided
-
+ ${aws.sdk.version}
+
org.apache.druid
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
index 22c10680f809..24fd99cf5202 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
@@ -19,12 +19,17 @@
package org.apache.druid.data.input.s3;
+import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
@@ -35,6 +40,7 @@
import org.apache.druid.data.input.impl.CloudObjectInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.s3.S3InputDataConfig;
import org.apache.druid.storage.s3.S3StorageDruidModule;
import org.apache.druid.storage.s3.S3Utils;
@@ -47,6 +53,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
+import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -59,6 +66,7 @@ public class S3InputSource extends CloudObjectInputSource
@JsonProperty("properties")
private final S3InputSourceConfig s3InputSourceConfig;
private final S3InputDataConfig inputDataConfig;
+ private final AWSCredentialsProvider awsCredentialsProvider;
/**
* Constructor for S3InputSource
@@ -84,7 +92,8 @@ public S3InputSource(
@JsonProperty("uris") @Nullable List uris,
@JsonProperty("prefixes") @Nullable List prefixes,
@JsonProperty("objects") @Nullable List objects,
- @JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig
+ @JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig,
+ @JacksonInject AWSCredentialsProvider awsCredentialsProvider
)
{
super(S3StorageDruidModule.SCHEME, uris, prefixes, objects);
@@ -95,13 +104,19 @@ public S3InputSource(
() -> {
if (s3ClientBuilder != null && s3InputSourceConfig != null) {
if (s3InputSourceConfig.isCredentialsConfigured()) {
- AWSStaticCredentialsProvider credentials = new AWSStaticCredentialsProvider(
- new BasicAWSCredentials(
- s3InputSourceConfig.getAccessKeyId().getPassword(),
- s3InputSourceConfig.getSecretAccessKey().getPassword()
- )
- );
- s3ClientBuilder.getAmazonS3ClientBuilder().withCredentials(credentials);
+ if (s3InputSourceConfig.getAssumeRoleArn() == null) {
+ s3ClientBuilder
+ .getAmazonS3ClientBuilder()
+ .withCredentials(createStaticCredentialsProvider(s3InputSourceConfig));
+ } else {
+ applyAssumeRole(
+ s3ClientBuilder,
+ s3InputSourceConfig,
+ createStaticCredentialsProvider(s3InputSourceConfig)
+ );
+ }
+ } else {
+ applyAssumeRole(s3ClientBuilder, s3InputSourceConfig, awsCredentialsProvider);
}
return s3ClientBuilder.build();
} else {
@@ -109,6 +124,56 @@ public S3InputSource(
}
}
);
+ this.awsCredentialsProvider = awsCredentialsProvider;
+ }
+
+ @VisibleForTesting // Test-only overload: same arguments as the main constructor minus the injected AWSCredentialsProvider.
+ public S3InputSource(
+ ServerSideEncryptingAmazonS3 s3Client,
+ ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
+ S3InputDataConfig inputDataConfig,
+ List uris,
+ List prefixes,
+ List objects,
+ S3InputSourceConfig s3InputSourceConfig
+ )
+ {
+ this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, s3InputSourceConfig, null); // null provider: only reaches the STS client builder when assumeRoleArn is set and no access/secret keys are configured
+ }
+
+ private void applyAssumeRole( // Points the S3 client builder at STS assume-role credentials; does nothing when no role ARN is configured.
+ ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
+ S3InputSourceConfig s3InputSourceConfig,
+ AWSCredentialsProvider awsCredentialsProvider // credentials handed to the STS client, not to the S3 client directly
+ )
+ {
+ String assumeRoleArn = s3InputSourceConfig.getAssumeRoleArn();
+ if (assumeRoleArn != null) {
+ String roleSessionName = StringUtils.format("druid-s3-input-source-%s", UUID.randomUUID().toString()); // random UUID suffix gives each call its own session name
+ AWSSecurityTokenService securityTokenService = AWSSecurityTokenServiceClientBuilder.standard()
+ .withCredentials(awsCredentialsProvider)
+ .build();
+ STSAssumeRoleSessionCredentialsProvider.Builder roleCredentialsProviderBuilder;
+ roleCredentialsProviderBuilder = new STSAssumeRoleSessionCredentialsProvider
+ .Builder(assumeRoleArn, roleSessionName).withStsClient(securityTokenService);
+
+ if (s3InputSourceConfig.getAssumeRoleExternalId() != null) { // external ID is optional; only passed along when configured
+ roleCredentialsProviderBuilder.withExternalId(s3InputSourceConfig.getAssumeRoleExternalId());
+ }
+
+ s3ClientBuilder.getAmazonS3ClientBuilder().withCredentials(roleCredentialsProviderBuilder.build()); // replaces whatever credentials the S3 builder had with the assumed-role provider
+ }
+ }
+
+ @Nonnull
+ private AWSStaticCredentialsProvider createStaticCredentialsProvider(S3InputSourceConfig s3InputSourceConfig) // Builds a static credentials provider from the config's access key ID and secret access key.
+ {
+ return new AWSStaticCredentialsProvider(
+ new BasicAWSCredentials(
+ s3InputSourceConfig.getAccessKeyId().getPassword(), // callers in this file check isCredentialsConfigured() first; presumably both keys are non-null then — confirm
+ s3InputSourceConfig.getSecretAccessKey().getPassword()
+ )
+ );
 }
@Nullable
@@ -149,7 +214,8 @@ public SplittableInputSource> withSplit(InputSplit EXPECTED_LOCATION =
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv"));
@@ -221,6 +223,7 @@ public void testS3InputSourceUseDefaultPasswordWhenCloudConfigPropertiesWithoutC
{
S3InputSourceConfig mockConfigPropertiesWithoutKeyAndSecret = EasyMock.createMock(S3InputSourceConfig.class);
EasyMock.reset(mockConfigPropertiesWithoutKeyAndSecret);
+ EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.getAssumeRoleArn()).andStubReturn(null);
EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.isCredentialsConfigured())
.andStubReturn(false);
EasyMock.replay(mockConfigPropertiesWithoutKeyAndSecret);
@@ -279,6 +282,7 @@ public void testSerdeS3ClientLazyInitializedWithoutCrediential() throws Exceptio
null,
EXPECTED_LOCATION,
null
+
);
final S3InputSource serdeWithPrefixes =
MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
@@ -664,7 +668,25 @@ public static ObjectMapper createS3ObjectMapper()
DruidModule baseModule = new TestS3Module();
final Injector injector = Guice.createInjector(
new ObjectMapperModule(),
- baseModule
+ baseModule,
+ new DruidModule()
+ {
+ @Provides
+ public AWSCredentialsProvider getAWSCredentialsProvider()
+ {
+ return AWSCredentialsUtils.defaultAWSCredentialsProviderChain(null);
+ }
+
+ @Override public List extends Module> getJacksonModules()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override public void configure(Binder binder)
+ {
+
+ }
+ }
);
final ObjectMapper baseMapper = injector.getInstance(ObjectMapper.class);
diff --git a/website/.spelling b/website/.spelling
index d717fbf9ecf7..6d2ae63828db 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -24,6 +24,7 @@
ACL
APIs
AvroStorage
+ARN
AWS
AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
AWS_CONTAINER_CREDENTIALS_FULL_URI
@@ -194,6 +195,8 @@ aggregator
aggregators
ambari
analytics
+assumeRoleArn
+assumeRoleExternalId
async
authorizer
authorizers