diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index ea4d936095..ae75891368 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -236,6 +236,14 @@ public static enum ConfVars implements ConfigKey { // for RCFile HIVEUSEEXPLICITRCFILEHEADER("tajo.exec.rcfile.use.explicit.header", true, Validators.bool()), + // S3 Configuration -------------------------------------------------- + S3_MAX_ERROR_RETRIES("tajo.s3.max-error-retries", 50), + S3_SSL_ENABLED("tajo.s3.ssl.enabled", true), + S3_CONNECT_TIMEOUT("tajo.s3.connect-timeout", "5m"), + S3_SOCKET_TIMEOUT("tajo.s3.socket-timeout", "5m"), + S3_MAX_CONNECTIONS("tajo.s3.max-connections", 500), + S3_USE_INSTANCE_CREDENTIALS("tajo.s3.use-instance-credentials", true), + // RPC -------------------------------------------------------------------- // Internal RPC Client INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM("tajo.internal.rpc.client.worker-thread-num", diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml index 095f128809..616f9e48f1 100644 --- a/tajo-dist/pom.xml +++ b/tajo-dist/pom.xml @@ -146,10 +146,12 @@ run cp -r $ROOT/tajo-sql-parser/target/tajo-sql-parser-${project.version}/* . run cp -r $ROOT/tajo-storage/tajo-storage-jdbc/target/tajo-storage-jdbc-${project.version}.jar . run cp -r $ROOT/tajo-storage/tajo-storage-pgsql/target/tajo-storage-pgsql-${project.version}.jar . + run cp -r $ROOT/tajo-storage/tajo-storage-s3/target/tajo-storage-s3-${project.version}.jar . run cp -r $ROOT/tajo-pullserver/target/tajo-pullserver-${project.version}.jar . run cp -r $ROOT/tajo-metrics/target/tajo-metrics-${project.version}.jar . run cp -r $ROOT/tajo-core/target/tajo-core-${project.version}.jar . run cp -r $ROOT/tajo-core/target/lib . + run cp -r $ROOT/tajo-storage/tajo-storage-s3/target/lib/* lib/ run cp -r ${project.basedir}/src/main/bin . 
run cp -r ${project.basedir}/src/main/conf . run rm -rf lib/tajo-*-${project.version}.jar diff --git a/tajo-storage/tajo-storage-s3/pom.xml b/tajo-storage/tajo-storage-s3/pom.xml index a9a541aed1..53a9444460 100644 --- a/tajo-storage/tajo-storage-s3/pom.xml +++ b/tajo-storage/tajo-storage-s3/pom.xml @@ -34,6 +34,8 @@ UTF-8 UTF-8 + 1.7.4 + 0.97 @@ -97,6 +99,28 @@ true + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + package + + copy-dependencies + + + runtime + ${project.build.directory}/lib + false + false + true + + + + + @@ -172,6 +196,66 @@ junit test + + + org.apache.hadoop + hadoop-aws + provided + ${hadoop.version} + + + + org.weakref + jmxutils + 1.18 + + + + org.apache.httpcomponents + httpclient + 4.2.5 + + + org.apache.httpcomponents + httpcore + 4.2.5 + + + + junit + junit + test + + + + org.testng + testng + 6.9.6 + test + + + + io.airlift + testing + ${airlft.version} + test + + + com.google.guava + guava + + + javax.validation + validation-api + + + org.apache.bval + bval-jsr303 + + + + + diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/TajoS3FileSystem.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/TajoS3FileSystem.java new file mode 100644 index 0000000000..7a14e35ffe --- /dev/null +++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/TajoS3FileSystem.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.InstanceProfileCredentialsProvider;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.*;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterators;
+import com.google.common.primitives.Ints;
+import io.airlift.units.DataSize;
+import io.airlift.units.Duration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.s3.S3Credentials;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.tajo.conf.TajoConf;
+import org.weakref.jmx.internal.guava.collect.AbstractSequentialIterator;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.nullToEmpty;
+import static com.google.common.collect.Iterables.toArray;
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link S3AFileSystem} subclass that serves directory listings through its own
+ * {@link AmazonS3} client, configured from the {@code tajo.s3.*} settings of
+ * {@link TajoConf}.
+ */
+public class TajoS3FileSystem extends S3AFileSystem {
+  /** Block size reported for every object (non-directory entry) of a listing. */
+  private static final DataSize BLOCK_SIZE = new DataSize(32, MEGABYTE);
+
+  /** Scheme and authority of the URI given to {@link #initialize}; the path part is dropped. */
+  private URI uri;
+  /** Client used by the listing methods of this class. */
+  private AmazonS3 s3;
+  /** Whether instance-profile credentials may be used when no static credentials are found. */
+  private boolean useInstanceCredentials;
+
+  private TajoConf conf;
+
+  /**
+   * Initializes the parent file system and then builds the S3 client used for listings.
+   *
+   * @param uri  file system URI; only its scheme and authority are retained
+   * @param conf Hadoop configuration; wrapped in a {@link TajoConf} to read the S3 settings
+   * @throws IOException propagated from the parent initialization
+   * @throws RuntimeException if no credentials can be resolved for the client
+   */
+  @Override
+  public void initialize(URI uri, Configuration conf) throws IOException {
+    requireNonNull(uri, "uri is null");
+    requireNonNull(conf, "conf is null");
+    super.initialize(uri, conf);
+    setConf(conf);
+
+    this.conf = new TajoConf(conf);
+
+    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+
+    int maxErrorRetries = this.conf.getIntVar(TajoConf.ConfVars.S3_MAX_ERROR_RETRIES);
+    boolean sslEnabled = this.conf.getBoolVar(TajoConf.ConfVars.S3_SSL_ENABLED);
+
+    Duration connectTimeout = Duration.valueOf(this.conf.getVar(TajoConf.ConfVars.S3_CONNECT_TIMEOUT));
+    Duration socketTimeout = Duration.valueOf(this.conf.getVar(TajoConf.ConfVars.S3_SOCKET_TIMEOUT));
+    int maxConnections = this.conf.getIntVar(TajoConf.ConfVars.S3_MAX_CONNECTIONS);
+
+    // Must be set before the client is created: the credentials lookup below reads it.
+    this.useInstanceCredentials = this.conf.getBoolVar(TajoConf.ConfVars.S3_USE_INSTANCE_CREDENTIALS);
+
+    // checkedCast fails loudly if a configured timeout does not fit into an int of milliseconds.
+    ClientConfiguration configuration = new ClientConfiguration()
+        .withMaxErrorRetry(maxErrorRetries)
+        .withProtocol(sslEnabled ? Protocol.HTTPS : Protocol.HTTP)
+        .withConnectionTimeout(Ints.checkedCast(connectTimeout.toMillis()))
+        .withSocketTimeout(Ints.checkedCast(socketTimeout.toMillis()))
+        .withMaxConnections(maxConnections);
+
+    this.s3 = createAmazonS3Client(uri, conf, configuration);
+  }
+
+  /** Creates the S3 client from the resolved credentials provider and the client settings. */
+  private AmazonS3Client createAmazonS3Client(URI uri, Configuration hadoopConfig, ClientConfiguration clientConfig) {
+    AWSCredentialsProvider credentials = getAwsCredentialsProvider(uri, hadoopConfig);
+    return new AmazonS3Client(credentials, clientConfig);
+  }
+
+  /**
+   * Resolves the credentials provider: static credentials from the URI or the configuration
+   * first, then instance-profile credentials when they are enabled.
+   *
+   * @throws RuntimeException if neither source is available; the failed static lookup is the cause
+   */
+  private AWSCredentialsProvider getAwsCredentialsProvider(URI uri, Configuration conf) {
+    IllegalArgumentException staticLookupFailure;
+    try {
+      return new StaticCredentialsProvider(getAwsCredentials(uri, conf));
+    } catch (IllegalArgumentException e) {
+      // Not fatal on its own, the fallback below may still apply; kept so the reason is not lost.
+      staticLookupFailure = e;
+    }
+
+    if (useInstanceCredentials) {
+      return new InstanceProfileCredentialsProvider();
+    }
+
+    throw new RuntimeException("S3 credentials not configured", staticLookupFailure);
+  }
+
+  /** Reads the access key and secret key through {@link S3Credentials}. */
+  private static AWSCredentials getAwsCredentials(URI uri, Configuration conf) {
+    S3Credentials credentials = new S3Credentials();
+    credentials.initialize(uri, conf);
+    return new BasicAWSCredentials(credentials.getAccessKey(), credentials.getSecretAccessKey());
+  }
+
+  /**
+   * Lists the direct children of {@code path} by draining {@link #listLocatedStatus(Path)}.
+   *
+   * @throws IOException if an S3 request or a block location lookup fails
+   */
+  @Override
+  public FileStatus[] listStatus(Path path) throws IOException {
+    List<LocatedFileStatus> list = new ArrayList<>();
+    RemoteIterator<LocatedFileStatus> iterator = listLocatedStatus(path);
+    while (iterator.hasNext()) {
+      list.add(iterator.next());
+    }
+    return toArray(list, LocatedFileStatus.class);
+  }
+
+  /**
+   * Returns an iterator over the direct children of {@code path}. No S3 request is sent until
+   * the iterator is first used, and every failure while iterating surfaces as an
+   * {@link IOException}.
+   */
+  @Override
+  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path path) {
+    return new RemoteIterator<LocatedFileStatus>() {
+      // Created on first use: building it sends the first listObjects request, and that
+      // request has to run inside the exception translation of hasNext()/next().
+      private Iterator<LocatedFileStatus> iterator;
+
+      @Override
+      public boolean hasNext() throws IOException {
+        try {
+          return delegate().hasNext();
+        } catch (AmazonClientException e) {
+          throw new IOException(e);
+        } catch (UncheckedIOException e) {
+          // Raised by createLocatedFileStatus; hand the original IOException back to the caller.
+          throw e.getCause();
+        }
+      }
+
+      @Override
+      public LocatedFileStatus next() throws IOException {
+        try {
+          return delegate().next();
+        } catch (AmazonClientException e) {
+          throw new IOException(e);
+        } catch (UncheckedIOException e) {
+          throw e.getCause();
+        }
+      }
+
+      private Iterator<LocatedFileStatus> delegate() {
+        if (iterator == null) {
+          iterator = listPrefix(path);
+        }
+        return iterator;
+      }
+    };
+  }
+
+  /**
+   * Lists everything directly below {@code path}, using "/" as delimiter, and follows truncated
+   * listings page by page. The first page is requested immediately.
+   */
+  private Iterator<LocatedFileStatus> listPrefix(Path path) {
+    String key = keyFromPath(path);
+    if (!key.isEmpty()) {
+      key += "/";
+    }
+
+    ListObjectsRequest request = new ListObjectsRequest()
+        .withBucketName(uri.getHost())
+        .withPrefix(key)
+        .withDelimiter("/");
+
+    Iterator<ObjectListing> listings = new AbstractSequentialIterator<ObjectListing>(s3.listObjects(request)) {
+      @Override
+      protected ObjectListing computeNext(ObjectListing previous) {
+        // Returning null ends the sequence once the last page has been delivered.
+        if (!previous.isTruncated()) {
+          return null;
+        }
+        return s3.listNextBatchOfObjects(previous);
+      }
+    };
+
+    return Iterators.concat(Iterators.transform(listings, this::statusFromListing));
+  }
+
+  /**
+   * Converts an absolute path to an S3 key by removing one leading and one trailing slash.
+   *
+   * @throws IllegalArgumentException if {@code path} is not absolute
+   */
+  private static String keyFromPath(Path path) {
+    checkArgument(path.isAbsolute(), "Path is not absolute: %s", path);
+    String key = nullToEmpty(path.toUri().getPath());
+    if (key.startsWith("/")) {
+      key = key.substring(1);
+    }
+    if (key.endsWith("/")) {
+      key = key.substring(0, key.length() - 1);
+    }
+    return key;
+  }
+
+  /** Statuses of one listing page: common prefixes (directories) first, then objects. */
+  private Iterator<LocatedFileStatus> statusFromListing(ObjectListing listing) {
+    return Iterators.concat(
+        statusFromPrefixes(listing.getCommonPrefixes()),
+        statusFromObjects(listing.getObjectSummaries()));
+  }
+
+  /** Maps every common prefix to a directory status with zero length and modification time. */
+  private Iterator<LocatedFileStatus> statusFromPrefixes(List<String> prefixes) {
+    List<LocatedFileStatus> list = new ArrayList<>();
+    for (String prefix : prefixes) {
+      Path path = qualifiedPath(new Path("/" + prefix));
+      FileStatus status = new FileStatus(0, true, 1, 0, 0, path);
+      list.add(createLocatedFileStatus(status));
+    }
+    return list.iterator();
+  }
+
+  /** Maps every object to a file status; keys ending in "/" are skipped. */
+  private Iterator<LocatedFileStatus> statusFromObjects(List<S3ObjectSummary> objects) {
+    return objects.stream()
+        .filter(object -> !object.getKey().endsWith("/"))
+        .map(object -> new FileStatus(
+            object.getSize(),
+            false,
+            1,
+            BLOCK_SIZE.toBytes(),
+            object.getLastModified().getTime(),
+            qualifiedPath(new Path("/" + object.getKey()))))
+        .map(this::createLocatedFileStatus)
+        .iterator();
+  }
+
+  /** Qualifies {@code path} against this file system's URI and working directory. */
+  private Path qualifiedPath(Path path) {
+    return path.makeQualified(this.uri, getWorkingDirectory());
+  }
+
+  /**
+   * Attaches the block locations reported by {@code getFileBlockLocations} to {@code status}.
+   *
+   * @throws UncheckedIOException if the lookup fails; it is used inside iterator and stream
+   *         pipelines that cannot throw checked exceptions, and unwrapped again by the
+   *         iterator returned from {@link #listLocatedStatus(Path)}
+   */
+  private LocatedFileStatus createLocatedFileStatus(FileStatus status) {
+    try {
+      BlockLocation[] fakeLocation = getFileBlockLocations(status, 0, status.getLen());
+      return new LocatedFileStatus(status, fakeLocation);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @VisibleForTesting
+  AmazonS3 getS3Client() {
+    return s3;
+  }
+
+  @VisibleForTesting
+  void setS3Client(AmazonS3 client) {
+    s3 = client;
+  }
+}
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestTajoS3FileSystem.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestTajoS3FileSystem.java
new file mode 100644
index 0000000000..3de3d5bc4b
--- /dev/null
+++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestTajoS3FileSystem.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.s3;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.InstanceProfileCredentialsProvider;
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URI;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.airlift.testing.Assertions.assertInstanceOf;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for the initialization and credential selection of {@link TajoS3FileSystem}. */
+public class TestTajoS3FileSystem {
+
+  /** The client built for an {@code s3n} URI must use the instance-profile provider. */
+  @Test
+  public void testInstanceCredentialsEnabled() throws Exception {
+    // NOTE(review): getConfiguration() only sets fs.s3a.* keys; presumably the static lookup
+    // for the s3n scheme finds nothing, so the instance-profile fallback is taken - confirm.
+    try (TajoS3FileSystem fs = new TajoS3FileSystem()) {
+      fs.initialize(new URI("s3n://test-bucket/"), getConfiguration());
+      assertInstanceOf(getAwsCredentialsProvider(fs), InstanceProfileCredentialsProvider.class);
+    }
+  }
+
+  /** The file system URI keeps scheme and authority (including user info) and drops the path. */
+  @Test
+  public void testInitialization() throws IOException {
+    initializationTest("s3://a:b@c", "s3://a:b@c");
+    initializationTest("s3://a:b@c/", "s3://a:b@c");
+    initializationTest("s3://a:b@c/path", "s3://a:b@c");
+    initializationTest("s3://a@c", "s3://a@c");
+    initializationTest("s3://a@c/", "s3://a@c");
+    initializationTest("s3://a@c/path", "s3://a@c");
+    initializationTest("s3://c", "s3://c");
+    initializationTest("s3://c/", "s3://c");
+    initializationTest("s3://c/path", "s3://c");
+  }
+
+  /**
+   * Initializes a fresh file system with {@code initializationUri} and checks the URI it reports.
+   * The file system is closed afterwards so that repeated calls do not leak instances.
+   */
+  private void initializationTest(String initializationUri, String expectedUri)
+      throws IOException {
+    try (TajoS3FileSystem fs = new TajoS3FileSystem()) {
+      fs.initialize(URI.create(initializationUri), getConfiguration());
+      assertEquals(URI.create(expectedUri), fs.getUri());
+    }
+  }
+
+  /** Configuration carrying dummy {@code fs.s3a.*} access and secret keys. */
+  private Configuration getConfiguration() {
+    Configuration config = new Configuration();
+    config.set("fs.s3a.access.key", "test_access_key_id");
+    config.set("fs.s3a.secret.key", "test_secret_access_key");
+
+    return config;
+  }
+
+  /** Reads the credentials provider out of the file system's S3 client by reflection. */
+  private static AWSCredentialsProvider getAwsCredentialsProvider(TajoS3FileSystem fs) {
+    return getFieldValue(fs.getS3Client(), "awsCredentialsProvider", AWSCredentialsProvider.class);
+  }
+
+  /**
+   * Returns the value of the field {@code name} declared by the runtime class of {@code instance}.
+   *
+   * @throws IllegalArgumentException if the declared field type is not exactly {@code type}
+   */
+  // The cast is safe: the declared field type is checked against 'type' before the value is read.
+  @SuppressWarnings("unchecked")
+  private static <T> T getFieldValue(Object instance, String name, Class<T> type) {
+    try {
+      Field field = instance.getClass().getDeclaredField(name);
+      checkArgument(field.getType() == type, "expected %s but found %s", type, field.getType());
+      field.setAccessible(true);
+      return (T) field.get(instance);
+    } catch (ReflectiveOperationException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}
\ No newline at end of file