Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/ingestion/input-sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,9 @@ Properties Object:

|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|accessKeyId|The [Password Provider](../operations/password-provider.md) or plain text string of this S3 input source access key|None|yes if secretAccessKey is given|
|secretAccessKey|The [Password Provider](../operations/password-provider.md) or plain text string of this S3 input source secret key|None|yes if accessKeyId is given|
|accessKeyId|The [Password Provider](../operations/password-provider.md) or plain text string of this S3 input source access key|None|Yes, if `secretAccessKey` or `sessionToken` is given.|
|secretAccessKey|The [Password Provider](../operations/password-provider.md) or plain text string of this S3 input source secret key|None|Yes, if `accessKeyId` or `sessionToken` is given.|
|sessionToken|The [Password Provider](../operations/password-provider.md) or plain text string of this S3 input source session token|None|no|
|assumeRoleArn|AWS ARN of the role to assume [see](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html). **assumeRoleArn** can be used either with the ingestion spec AWS credentials or with the default S3 credentials|None|no|
|assumeRoleExternalId|A unique identifier that might be required when you assume a role in another account [see](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html)|None|no|

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class S3InputSourceDefn extends FormattedInputSourceDefn
public static final String ACCESS_KEY_ID_PARAMETER = "accessKeyId";
public static final String SECRET_ACCESS_KEY_PARAMETER = "secretAccessKey";
public static final String ASSUME_ROLE_ARN_PARAMETER = "assumeRoleArn";
public static final String SESSION_TOKEN_PARAMETER = "sessionToken";

/**
* The {@code objectGlob} property exists in S3, but is not documented. The corresponding
Expand All @@ -102,7 +103,8 @@ public class S3InputSourceDefn extends FormattedInputSourceDefn
private static final List<ParameterDefn> SECURITY_PARAMS = Arrays.asList(
new Parameter(ACCESS_KEY_ID_PARAMETER, ParameterType.VARCHAR, true),
new Parameter(SECRET_ACCESS_KEY_PARAMETER, ParameterType.VARCHAR, true),
new Parameter(ASSUME_ROLE_ARN_PARAMETER, ParameterType.VARCHAR, true)
new Parameter(ASSUME_ROLE_ARN_PARAMETER, ParameterType.VARCHAR, true),
new Parameter(SESSION_TOKEN_PARAMETER, ParameterType.VARCHAR, true)
);

// Field names in the S3InputSource
Expand All @@ -114,6 +116,7 @@ public class S3InputSourceDefn extends FormattedInputSourceDefn
private static final String ACCESS_KEY_ID_FIELD = "accessKeyId";
private static final String SECRET_ACCESS_KEY_FIELD = "secretAccessKey";
private static final String ASSUME_ROLE_ARN_FIELD = "assumeRoleArn";
private static final String SESSION_TOKEN_FIELD = "sessionToken";

@Override
public String typeValue()
Expand Down Expand Up @@ -250,6 +253,7 @@ private void applySecurityParams(Map<String, Object> jsonMap, Map<String, Object
final String accessKeyId = CatalogUtils.getNonBlankString(args, ACCESS_KEY_ID_PARAMETER);
final String secretAccessKey = CatalogUtils.getNonBlankString(args, SECRET_ACCESS_KEY_PARAMETER);
final String assumeRoleArn = CatalogUtils.getNonBlankString(args, ASSUME_ROLE_ARN_PARAMETER);
final String sessionToken = CatalogUtils.getNonBlankString(args, SESSION_TOKEN_PARAMETER);
if (accessKeyId != null || secretAccessKey != null || assumeRoleArn != null) {
Map<String, Object> properties = new HashMap<>();
if (accessKeyId != null) {
Expand All @@ -258,6 +262,9 @@ private void applySecurityParams(Map<String, Object> jsonMap, Map<String, Object
if (secretAccessKey != null) {
properties.put(SECRET_ACCESS_KEY_FIELD, secretAccessKey);
}
if (sessionToken != null) {
properties.put(SESSION_TOKEN_FIELD, sessionToken);
}
if (assumeRoleArn != null) {
properties.put(ASSUME_ROLE_ARN_FIELD, assumeRoleArn);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSSessionCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.BasicSessionCredentials;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.model.ObjectMetadata;
Expand Down Expand Up @@ -282,12 +284,21 @@ private void applyAssumeRole(
@Nonnull
private AWSStaticCredentialsProvider createStaticCredentialsProvider(S3InputSourceConfig s3InputSourceConfig)
{
return new AWSStaticCredentialsProvider(
new BasicAWSCredentials(
s3InputSourceConfig.getAccessKeyId().getPassword(),
s3InputSourceConfig.getSecretAccessKey().getPassword()
)
);
if (s3InputSourceConfig.getSessionToken() != null) {
AWSSessionCredentials sessionCredentials = new BasicSessionCredentials(
s3InputSourceConfig.getAccessKeyId().getPassword(),
s3InputSourceConfig.getSecretAccessKey().getPassword(),
s3InputSourceConfig.getSessionToken().getPassword()
);
return new AWSStaticCredentialsProvider(sessionCredentials);
} else {
return new AWSStaticCredentialsProvider(
new BasicAWSCredentials(
s3InputSourceConfig.getAccessKeyId().getPassword(),
s3InputSourceConfig.getSecretAccessKey().getPassword()
)
);
}
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,27 @@ public class S3InputSourceConfig
private final String assumeRoleExternalId;
private final PasswordProvider accessKeyId;
private final PasswordProvider secretAccessKey;
private final PasswordProvider sessionToken;

@JsonCreator
public S3InputSourceConfig(
@JsonProperty("accessKeyId") @Nullable PasswordProvider accessKeyId,
@JsonProperty("secretAccessKey") @Nullable PasswordProvider secretAccessKey,
@JsonProperty("assumeRoleArn") @Nullable String assumeRoleArn,
@JsonProperty("assumeRoleExternalId") @Nullable String assumeRoleExternalId
@JsonProperty("assumeRoleExternalId") @Nullable String assumeRoleExternalId,
@JsonProperty("sessionToken") @Nullable PasswordProvider sessionToken
)
{
this.assumeRoleArn = assumeRoleArn;
this.assumeRoleExternalId = assumeRoleExternalId;
if (accessKeyId != null || secretAccessKey != null) {
this.accessKeyId = Preconditions.checkNotNull(accessKeyId, "accessKeyId cannot be null if secretAccessKey is given");
this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey, "secretAccessKey cannot be null if accessKeyId is given");
if (sessionToken != null || accessKeyId != null || secretAccessKey != null) {
this.accessKeyId = Preconditions.checkNotNull(accessKeyId, "'accessKeyId' cannot be null if 'secretAccessKey' or 'sessionToken' is given");
this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey, "'secretAccessKey' cannot be null if 'accessKeyId' or 'sessionToken' is given");
} else {
this.accessKeyId = null;
this.secretAccessKey = null;
}
this.sessionToken = sessionToken;
}

@Nullable
Expand Down Expand Up @@ -91,6 +94,14 @@ public PasswordProvider getSecretAccessKey()
return secretAccessKey;
}

@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public PasswordProvider getSessionToken()
{
return sessionToken;
}

@JsonIgnore
public boolean isCredentialsConfigured()
{
Expand All @@ -106,6 +117,7 @@ public String toString()
", secretAccessKey=" + secretAccessKey +
", assumeRoleArn=" + assumeRoleArn +
", assumeRoleExternalId=" + assumeRoleExternalId +
", sessionToken=" + sessionToken +
'}';
}

Expand All @@ -122,12 +134,13 @@ public boolean equals(Object o)
return Objects.equals(accessKeyId, that.accessKeyId) &&
Objects.equals(secretAccessKey, that.secretAccessKey) &&
Objects.equals(assumeRoleArn, that.assumeRoleArn) &&
Objects.equals(assumeRoleExternalId, that.assumeRoleExternalId);
Objects.equals(assumeRoleExternalId, that.assumeRoleExternalId) &&
Objects.equals(sessionToken, that.sessionToken);
}

@Override
public int hashCode()
{
return Objects.hash(accessKeyId, secretAccessKey, assumeRoleArn, assumeRoleExternalId);
return Objects.hash(accessKeyId, secretAccessKey, assumeRoleArn, assumeRoleExternalId, sessionToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.junit.Test;

import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -558,7 +557,7 @@ public void testAdHocPrefixPathConflict()
@Test
public void testFullTableSpecHappyPath()
{
S3InputSourceConfig config = new S3InputSourceConfig(null, null, "foo", null);
S3InputSourceConfig config = new S3InputSourceConfig(null, null, "foo", null, null);
S3InputSource s3InputSource = s3InputSource(
Arrays.asList("s3://foo/bar/", "s3://mumble/"), null, null, "*.csv", config);
TableMetadata table = TableBuilder.external("foo")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public void testSerdeAccessSecretKey() throws Exception
new DefaultPasswordProvider("the-access-key"),
new DefaultPasswordProvider("the-secret-key"),
null,
null
null,
new DefaultPasswordProvider("the-secret-token")
);

Assert.assertEquals(
Expand All @@ -53,7 +54,8 @@ public void testSerdeAssumeRole() throws Exception
null,
null,
"the-role-arn",
"the-role-external-id"
"the-role-external-id",
null
);

Assert.assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);

private static final S3InputSourceConfig CLOUD_CONFIG_PROPERTIES = new S3InputSourceConfig(
new DefaultPasswordProvider("myKey"), new DefaultPasswordProvider("mySecret"), null, null);
new DefaultPasswordProvider("myKey"), new DefaultPasswordProvider("mySecret"), null, null, null);
private static final S3InputSourceConfig CLOUD_CONFIG_PROPERTIES_WITH_SESSION_TOKEN = new S3InputSourceConfig(
new DefaultPasswordProvider("myKey"), new DefaultPasswordProvider("mySecret"), null, null,
new DefaultPasswordProvider("mySessionToken"));
private static final AWSEndpointConfig ENDPOINT_CONFIG = new AWSEndpointConfig();
private static final AWSProxyConfig PROXY_CONFIG = new AWSProxyConfig();
private static final AWSClientConfig CLIENT_CONFIG = new AWSClientConfig();
Expand Down Expand Up @@ -370,6 +373,152 @@ public void testSerdeWithCloudConfigPropertiesWithKeyAndSecret() throws Exceptio
EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
}

@Test
public void testSerdeWithCloudConfigPropertiesWithSessionToken() throws Exception
{
EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getAmazonS3ClientBuilder())
.andStubReturn(AMAZON_S3_CLIENT_BUILDER);
AMAZON_S3_CLIENT_BUILDER.withClientConfiguration(CLIENT_CONFIGURATION);
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build())
.andReturn(SERVICE);
EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
final S3InputSource withSessionToken = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
null,
EXPECTED_LOCATION,
null,
CLOUD_CONFIG_PROPERTIES_WITH_SESSION_TOKEN,
null,
null,
null
);
final S3InputSource serdeWithSessionToken =
MAPPER.readValue(MAPPER.writeValueAsString(withSessionToken), S3InputSource.class);
// This is to force the s3ClientSupplier to initialize the ServerSideEncryptingAmazonS3
serdeWithSessionToken.createEntity(new CloudObjectLocation("bucket", "path"));
Assert.assertEquals(withSessionToken, serdeWithSessionToken);
// Verify that the session token is properly set
Assert.assertNotNull(serdeWithSessionToken.getS3InputSourceConfig());
Assert.assertNotNull(serdeWithSessionToken.getS3InputSourceConfig().getSessionToken());
Assert.assertEquals("mySessionToken", serdeWithSessionToken.getS3InputSourceConfig().getSessionToken().getPassword());
EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
}

@Test
public void testGetSetSessionToken()
{
// Test that session token getter/setter work correctly
final S3InputSource inputSourceWithSessionToken = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
EXPECTED_URIS,
null,
null,
null,
CLOUD_CONFIG_PROPERTIES_WITH_SESSION_TOKEN,
null,
null,
null
);

Assert.assertNotNull(inputSourceWithSessionToken.getS3InputSourceConfig());
Assert.assertNotNull(inputSourceWithSessionToken.getS3InputSourceConfig().getSessionToken());
Assert.assertEquals(
"mySessionToken",
inputSourceWithSessionToken.getS3InputSourceConfig().getSessionToken().getPassword()
);

// Test without session token
final S3InputSource inputSourceWithoutSessionToken = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
EXPECTED_URIS,
null,
null,
null,
CLOUD_CONFIG_PROPERTIES,
null,
null,
null
);

Assert.assertNotNull(inputSourceWithoutSessionToken.getS3InputSourceConfig());
Assert.assertNull(inputSourceWithoutSessionToken.getS3InputSourceConfig().getSessionToken());
}

@Test
public void testSessionCredentialsUsedWhenSessionTokenProvided() throws IOException
{
// This test verifies that when session token is provided, the S3InputSource
// correctly uses BasicSessionCredentials instead of BasicAWSCredentials
EasyMock.reset(S3_CLIENT);
expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)), CONTENT);
expectGetObject(EXPECTED_URIS.get(0));
EasyMock.replay(S3_CLIENT);

EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getAmazonS3ClientBuilder())
.andStubReturn(AMAZON_S3_CLIENT_BUILDER);
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build())
.andReturn(SERVICE);
EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);

// Create S3InputSource with session token
S3InputSource inputSource = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
ImmutableList.of(PREFIXES.get(0)),
null,
null,
CLOUD_CONFIG_PROPERTIES_WITH_SESSION_TOKEN,
null,
null,
null
);

// Verify session token is set
Assert.assertNotNull(inputSource.getS3InputSourceConfig());
Assert.assertNotNull(inputSource.getS3InputSourceConfig().getSessionToken());
Assert.assertEquals(
"mySessionToken",
inputSource.getS3InputSourceConfig().getSessionToken().getPassword()
);

// Create a reader which will trigger the s3ClientSupplier and use the session credentials
InputRowSchema someSchema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
ColumnsFilter.all()
);

InputSourceReader reader = inputSource.reader(
someSchema,
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0, null),
temporaryFolder.newFolder()
);

// Read data - this exercises the session credentials path
CloseableIterator<InputRow> iterator = reader.read();

while (iterator.hasNext()) {
InputRow nextRow = iterator.next();
Assert.assertEquals(NOW, nextRow.getTimestamp());
Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0));
Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
}

EasyMock.verify(S3_CLIENT);
EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
}

@Test
public void testGetTypes()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
public class MSQTaskQueryMakerUtils
{

public static final Set<String> SENSISTIVE_JSON_KEYS = ImmutableSet.of("accessKeyId", "secretAccessKey");
public static final Set<String> SENSISTIVE_JSON_KEYS = ImmutableSet.of("accessKeyId", "secretAccessKey", "sessionToken");
public static final Set<Pattern> SENSITIVE_KEYS_REGEX_PATTERNS = SENSISTIVE_JSON_KEYS.stream()
.map(sensitiveKey ->
Pattern.compile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void maskSensitiveJsonKeys()
+ "OVERWRITE ALL\\n"
+ "WITH ext AS "
+ "(SELECT *\\nFROM TABLE(\\n "
+ "EXTERN(\\n '{\\\"type\\\":\\\"s3\\\",\\\"prefixes\\\":[\\\"s3://prefix\\\"],\\\"properties\\\":{\\\"accessKeyId\\\":{\\\"type\\\":\\\"default\\\",\\\"password\\\":\\\"secret_pass\\\"},\\\"secretAccessKey\\\":{\\\"type\\\":\\\"default\\\",\\\"password\\\":\\\"secret_pass\\\"}}}',\\n"
+ "EXTERN(\\n '{\\\"type\\\":\\\"s3\\\",\\\"prefixes\\\":[\\\"s3://prefix\\\"],\\\"properties\\\":{\\\"accessKeyId\\\":{\\\"type\\\":\\\"default\\\",\\\"password\\\":\\\"secret_pass\\\"},\\\"secretAccessKey\\\":{\\\"type\\\":\\\"default\\\",\\\"password\\\":\\\"secret_pass\\\"},\\\"sessionToken\\\":{\\\"type\\\":\\\"default\\\",\\\"password\\\":\\\"secret_pass\\\"}}}',\\n"
+ "'{\\\"type\\\":\\\"json\\\"}',\\n"
+ "'[{\\\"name\\\":\\\"time\\\",\\\"type\\\":\\\"string\\\"},{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\"}]'\\n )\\n))\\n"
+ "SELECT\\n TIME_PARSE(\\\"time\\\") AS __time,\\n name,\\n country "
Expand Down Expand Up @@ -113,7 +113,7 @@ public void maskSensitiveJsonKeys()
+ "OVERWRITE ALL\\n"
+ "WITH ext AS "
+ "(SELECT *\\nFROM TABLE(\\n "
+ "EXTERN(\\n '{\\\"type\\\":\\\"s3\\\",\\\"prefixes\\\":[\\\"s3://prefix\\\"],\\\"properties\\\":{\\\"accessKeyId\\\":<masked>,\\\"secretAccessKey\\\":<masked>}}',\\n"
+ "EXTERN(\\n '{\\\"type\\\":\\\"s3\\\",\\\"prefixes\\\":[\\\"s3://prefix\\\"],\\\"properties\\\":{\\\"accessKeyId\\\":<masked>,\\\"secretAccessKey\\\":<masked>,\\\"sessionToken\\\":<masked>}}',\\n"
+ "'{\\\"type\\\":\\\"json\\\"}',\\n"
+ "'[{\\\"name\\\":\\\"time\\\",\\\"type\\\":\\\"string\\\"},{\\\"name\\\":\\\"name\\\",\\\"type\\\":\\\"string\\\"}]'\\n )\\n))\\n"
+ "SELECT\\n TIME_PARSE(\\\"time\\\") AS __time,\\n name,\\n country "
Expand Down
Loading