Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions docs/ingestion/native-batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,47 @@ Sample specs:
...
```

```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "s3",
"uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
"properties": {
"accessKeyId": "KLJ78979SDFdS2",
"secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd"
}
},
"inputFormat": {
"type": "json"
},
...
},
...
```

```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "s3",
"uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
"properties": {
"accessKeyId": "KLJ78979SDFdS2",
"secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd",
"assumeRoleArn": "arn:aws:iam::2981002874992:role/role-s3"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe above are some dummy IDs and roles and not some actual ones.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. All fake.

}
},
"inputFormat": {
"type": "json"
},
...
},
...
```

|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be `s3`.|None|yes|
Expand All @@ -917,6 +958,8 @@ Properties Object:
|--------|-----------|-------|---------|
|accessKeyId|The [Password Provider](../operations/password-provider.md) or plain text string of this S3 InputSource's access key|None|yes if secretAccessKey is given|
|secretAccessKey|The [Password Provider](../operations/password-provider.md) or plain text string of this S3 InputSource's secret key|None|yes if accessKeyId is given|
|assumeRoleArn|AWS ARN of the role to assume [see](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html). **assumeRoleArn** can be used either with the ingestion spec AWS credentials or with the default S3 credentials|None|no|
|assumeRoleExternalId|A unique identifier that might be required when you assume a role in another account [see](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html)|None|no|

**Note :** *If accessKeyId and secretAccessKey are not given, the default [S3 credentials provider chain](../development/extensions-core/s3.md#s3-authentication-methods) is used.*

Expand Down
4 changes: 2 additions & 2 deletions extensions-core/s3-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<scope>provided</scope>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this change?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aws-common doesn't contain it, surprisingly Kinesis extension contains it, when Kinesis ext. is removed then the S3 ext. fails.

</dependency>
<version>${aws.sdk.version}</version>
</dependency>
<!-- Tests -->
<dependency>
<groupId>org.apache.druid</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@

package org.apache.druid.data.input.s3;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
Expand All @@ -35,6 +40,7 @@
import org.apache.druid.data.input.impl.CloudObjectInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.s3.S3InputDataConfig;
import org.apache.druid.storage.s3.S3StorageDruidModule;
import org.apache.druid.storage.s3.S3Utils;
Expand All @@ -47,6 +53,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -59,6 +66,7 @@ public class S3InputSource extends CloudObjectInputSource
@JsonProperty("properties")
private final S3InputSourceConfig s3InputSourceConfig;
private final S3InputDataConfig inputDataConfig;
private final AWSCredentialsProvider awsCredentialsProvider;

/**
* Constructor for S3InputSource
Expand All @@ -84,7 +92,8 @@ public S3InputSource(
@JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
@JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig
@JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig,
@JacksonInject AWSCredentialsProvider awsCredentialsProvider
)
{
super(S3StorageDruidModule.SCHEME, uris, prefixes, objects);
Expand All @@ -95,20 +104,76 @@ public S3InputSource(
() -> {
if (s3ClientBuilder != null && s3InputSourceConfig != null) {
if (s3InputSourceConfig.isCredentialsConfigured()) {
AWSStaticCredentialsProvider credentials = new AWSStaticCredentialsProvider(
new BasicAWSCredentials(
s3InputSourceConfig.getAccessKeyId().getPassword(),
s3InputSourceConfig.getSecretAccessKey().getPassword()
)
);
s3ClientBuilder.getAmazonS3ClientBuilder().withCredentials(credentials);
if (s3InputSourceConfig.getAssumeRoleArn() == null) {
s3ClientBuilder
.getAmazonS3ClientBuilder()
.withCredentials(createStaticCredentialsProvider(s3InputSourceConfig));
} else {
applyAssumeRole(
s3ClientBuilder,
s3InputSourceConfig,
createStaticCredentialsProvider(s3InputSourceConfig)
);
}
} else {
applyAssumeRole(s3ClientBuilder, s3InputSourceConfig, awsCredentialsProvider);
}
return s3ClientBuilder.build();
} else {
return s3Client;
}
}
);
this.awsCredentialsProvider = awsCredentialsProvider;
}

@VisibleForTesting
public S3InputSource(
ServerSideEncryptingAmazonS3 s3Client,
ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
S3InputDataConfig inputDataConfig,
List<URI> uris,
List<URI> prefixes,
List<CloudObjectLocation> objects,
S3InputSourceConfig s3InputSourceConfig
)
{
this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, s3InputSourceConfig, null);
}

private void applyAssumeRole(
ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
S3InputSourceConfig s3InputSourceConfig,
AWSCredentialsProvider awsCredentialsProvider
)
{
String assumeRoleArn = s3InputSourceConfig.getAssumeRoleArn();
if (assumeRoleArn != null) {
String roleSessionName = StringUtils.format("druid-s3-input-source-%s", UUID.randomUUID().toString());
AWSSecurityTokenService securityTokenService = AWSSecurityTokenServiceClientBuilder.standard()
.withCredentials(awsCredentialsProvider)
.build();
STSAssumeRoleSessionCredentialsProvider.Builder roleCredentialsProviderBuilder;
roleCredentialsProviderBuilder = new STSAssumeRoleSessionCredentialsProvider
.Builder(assumeRoleArn, roleSessionName).withStsClient(securityTokenService);

if (s3InputSourceConfig.getAssumeRoleExternalId() != null) {
roleCredentialsProviderBuilder.withExternalId(s3InputSourceConfig.getAssumeRoleExternalId());
}

s3ClientBuilder.getAmazonS3ClientBuilder().withCredentials(roleCredentialsProviderBuilder.build());
}
}

@Nonnull
private AWSStaticCredentialsProvider createStaticCredentialsProvider(S3InputSourceConfig s3InputSourceConfig)
{
return new AWSStaticCredentialsProvider(
new BasicAWSCredentials(
s3InputSourceConfig.getAccessKeyId().getPassword(),
s3InputSourceConfig.getSecretAccessKey().getPassword()
)
);
}

@Nullable
Expand Down Expand Up @@ -149,7 +214,8 @@ public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<Lis
null,
null,
split.get(),
getS3InputSourceConfig()
getS3InputSourceConfig(),
awsCredentialsProvider
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,30 +34,50 @@
*/
public class S3InputSourceConfig
{
@JsonProperty
private String assumeRoleArn;
@JsonProperty
private String assumeRoleExternalId;
@JsonProperty
private PasswordProvider accessKeyId;
@JsonProperty
private PasswordProvider secretAccessKey;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

accessKeyId and SecretAccessKey are also nullable, Can we add annotation these as well.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.


@JsonCreator
public S3InputSourceConfig(
@JsonProperty("accessKeyId") @Nullable PasswordProvider accessKeyId,
@JsonProperty("secretAccessKey") @Nullable PasswordProvider secretAccessKey
@JsonProperty("secretAccessKey") @Nullable PasswordProvider secretAccessKey,
@JsonProperty("assumeRoleArn") @Nullable String assumeRoleArn,
@JsonProperty("assumeRoleExternalId") @Nullable String assumeRoleExternalId
)
{
this.assumeRoleArn = assumeRoleArn;
this.assumeRoleExternalId = assumeRoleExternalId;
if (accessKeyId != null || secretAccessKey != null) {
this.accessKeyId = Preconditions.checkNotNull(accessKeyId, "accessKeyId cannot be null if secretAccessKey is given");
this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey, "secretAccessKey cannot be null if accessKeyId is given");
}
}

@JsonProperty
private PasswordProvider accessKeyId;

@JsonProperty
private PasswordProvider secretAccessKey;
@Nullable
public String getAssumeRoleArn()
{
return assumeRoleArn;
}

@Nullable
public String getAssumeRoleExternalId()
{
return assumeRoleExternalId;
}

@Nullable
public PasswordProvider getAccessKeyId()
{
return accessKeyId;
}

@Nullable
public PasswordProvider getSecretAccessKey()
{
return secretAccessKey;
Expand All @@ -76,6 +96,8 @@ public String toString()
return "S3InputSourceConfig{" +
"accessKeyId=" + accessKeyId +
", secretAccessKey=" + secretAccessKey +
", assumeRoleArn=" + assumeRoleArn +
", assumeRoleExternalId=" + assumeRoleExternalId +
'}';
}

Expand All @@ -90,12 +112,14 @@ public boolean equals(Object o)
}
S3InputSourceConfig that = (S3InputSourceConfig) o;
return Objects.equals(accessKeyId, that.accessKeyId) &&
Objects.equals(secretAccessKey, that.secretAccessKey);
Objects.equals(secretAccessKey, that.secretAccessKey) &&
Objects.equals(assumeRoleArn, that.assumeRoleArn) &&
Objects.equals(assumeRoleExternalId, that.assumeRoleExternalId);
}

@Override
public int hashCode()
{
return Objects.hash(accessKeyId, secretAccessKey);
return Objects.hash(accessKeyId, secretAccessKey, assumeRoleArn, assumeRoleExternalId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.data.input.s3;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
Expand All @@ -40,6 +41,7 @@
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;
import org.apache.druid.common.aws.AWSCredentialsUtils;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
Expand Down Expand Up @@ -120,7 +122,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);

private static final S3InputSourceConfig CLOUD_CONFIG_PROPERTIES = new S3InputSourceConfig(
new DefaultPasswordProvider("myKey"), new DefaultPasswordProvider("mySecret"));
new DefaultPasswordProvider("myKey"), new DefaultPasswordProvider("mySecret"), null, null);

private static final List<CloudObjectLocation> EXPECTED_LOCATION =
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv"));
Expand Down Expand Up @@ -221,6 +223,7 @@ public void testS3InputSourceUseDefaultPasswordWhenCloudConfigPropertiesWithoutC
{
S3InputSourceConfig mockConfigPropertiesWithoutKeyAndSecret = EasyMock.createMock(S3InputSourceConfig.class);
EasyMock.reset(mockConfigPropertiesWithoutKeyAndSecret);
EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.getAssumeRoleArn()).andStubReturn(null);
EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.isCredentialsConfigured())
.andStubReturn(false);
EasyMock.replay(mockConfigPropertiesWithoutKeyAndSecret);
Expand Down Expand Up @@ -279,6 +282,7 @@ public void testSerdeS3ClientLazyInitializedWithoutCrediential() throws Exceptio
null,
EXPECTED_LOCATION,
null

);
final S3InputSource serdeWithPrefixes =
MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
Expand Down Expand Up @@ -664,7 +668,25 @@ public static ObjectMapper createS3ObjectMapper()
DruidModule baseModule = new TestS3Module();
final Injector injector = Guice.createInjector(
new ObjectMapperModule(),
baseModule
baseModule,
new DruidModule()
{
@Provides
public AWSCredentialsProvider getAWSCredentialsProvider()
{
return AWSCredentialsUtils.defaultAWSCredentialsProviderChain(null);
}

@Override public List<? extends Module> getJacksonModules()
{
return Collections.emptyList();
}

@Override public void configure(Binder binder)
{

}
}
);
final ObjectMapper baseMapper = injector.getInstance(ObjectMapper.class);

Expand Down
3 changes: 3 additions & 0 deletions website/.spelling
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
ACL
APIs
AvroStorage
ARN
AWS
AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
AWS_CONTAINER_CREDENTIALS_FULL_URI
Expand Down Expand Up @@ -194,6 +195,8 @@ aggregator
aggregators
ambari
analytics
assumeRoleArn
assumeRoleExternalId
async
authorizer
authorizers
Expand Down