diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml index e6a018c7e8..c259c7afb3 100644 --- a/tajo-project/pom.xml +++ b/tajo-project/pom.xml @@ -41,6 +41,8 @@ 6.1.26 1.8.1 0.10.0.1 + 1.7.4 + 4.5 ${project.parent.relativePath}/.. src/main/hadoop-${hadoop.version} diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index 2785de4ea4..dd7db31a9f 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -128,13 +128,13 @@ protected void storageInit() throws IOException { @Override public long getTableVolume(TableDesc table, Optional filter) { Path path = new Path(table.getUri()); - ContentSummary summary; + long totalVolume = 0L; try { - summary = fs.getContentSummary(path); + totalVolume = calculateSize(path); } catch (IOException e) { throw new TajoInternalError(e); } - return summary.getLength(); + return totalVolume; } @Override @@ -246,6 +246,13 @@ public static FileFragment[] splitNG(Configuration conf, String tableName, Table return tablets; } + /** + * Calculate the total size of all files in the indicated Path + * + * @param path to use + * @return calculated size + * @throws IOException + */ public long calculateSize(Path tablePath) throws IOException { FileSystem fs = tablePath.getFileSystem(conf); long totalSize = 0; diff --git a/tajo-storage/tajo-storage-s3/pom.xml b/tajo-storage/tajo-storage-s3/pom.xml index a9a541aed1..69564111f9 100644 --- a/tajo-storage/tajo-storage-s3/pom.xml +++ b/tajo-storage/tajo-storage-s3/pom.xml @@ -61,13 +61,6 @@ org.apache.rat apache-rat-plugin - - - src/test/resources/dataset/** - src/test/resources/queries/** - src/test/resources/results/** - - verify @@ -89,13 +82,9 @@ - org.apache.maven.plugins - maven-surefire-plugin - - true - + maven-surefire-report-plugin @@ -116,83 +105,40 @@ tajo-storage-hdfs provided - org.apache.hadoop hadoop-common provided - - - zookeeper - org.apache.zookeeper - - - slf4j-api - org.slf4j - - - jersey-json - com.sun.jersey - - org.apache.hadoop hadoop-hdfs provided - - - commons-el - commons-el - - - tomcat - jasper-runtime - - - tomcat - jasper-compiler - - - org.mortbay.jetty - jsp-2.1-jetty - - - com.sun.jersey.jersey-test-framework - jersey-test-framework-grizzly2 - - - netty-all - io.netty - - + + + com.amazonaws + aws-java-sdk + ${aws-java-sdk.version} + provided + + + + org.apache.httpcomponents + httpclient + ${httpclient.version} + test + + junit junit test + - - - test-storage-s3 - - - - org.apache.maven.plugins - maven-surefire-plugin - - - TRUE - - -Xms128m -Xmx1024m -Dfile.encoding=UTF-8 - - - - - docs diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/AnonymousAWSCredentialsProvider.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/AnonymousAWSCredentialsProvider.java new file mode 100644 index 0000000000..a884ca89b7 --- /dev/null +++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/AnonymousAWSCredentialsProvider.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.s3; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AnonymousAWSCredentials; +import com.amazonaws.auth.AWSCredentials; + +/** + * Borrow from org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider. + * + */ +public class AnonymousAWSCredentialsProvider implements AWSCredentialsProvider { + public AWSCredentials getCredentials() { + return new AnonymousAWSCredentials(); + } + + public void refresh() {} + + @Override + public String toString() { + return getClass().getSimpleName(); + } + +} diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/BasicAWSCredentialsProvider.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/BasicAWSCredentialsProvider.java new file mode 100644 index 0000000000..0f4fbde763 --- /dev/null +++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/BasicAWSCredentialsProvider.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import org.apache.commons.lang.StringUtils; + +/** + * Borrow from org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider. + * + */ +public class BasicAWSCredentialsProvider implements AWSCredentialsProvider { + private final String accessKey; + private final String secretKey; + + public BasicAWSCredentialsProvider(String accessKey, String secretKey) { + this.accessKey = accessKey; + this.secretKey = secretKey; + } + + public AWSCredentials getCredentials() { + if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) { + return new BasicAWSCredentials(accessKey, secretKey); + } + throw new AmazonClientException( + "Access key or secret key is null"); + } + + public void refresh() {} + + @Override + public String toString() { + return getClass().getSimpleName(); + } + +} diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java index 4bcdb60a68..ee6af31103 100644 --- a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java +++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/S3TableSpace.java @@ -18,14 +18,225 @@ package org.apache.tajo.storage.s3; +import java.io.IOException; import java.net.URI; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.auth.*; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.storage.FileTablespace; import net.minidev.json.JSONObject; +import static org.apache.tajo.storage.s3.TajoS3Constants.*; + public class S3TableSpace extends FileTablespace { + private final static Log LOG = LogFactory.getLog(S3TableSpace.class); + + private AmazonS3 s3; + private boolean s3Enabled; + private int maxKeys; + public S3TableSpace(String spaceName, URI uri, JSONObject config) { super(spaceName, uri, config); } + + @Override + public void init(TajoConf tajoConf) throws IOException { + super.init(tajoConf); + + try { + // Try to get our credentials or just connect anonymously + String accessKey = conf.get(ACCESS_KEY, null); + String secretKey = conf.get(SECRET_KEY, null); + + String userInfo = uri.getUserInfo(); + if (userInfo != null) { + int index = userInfo.indexOf(':'); + if (index != -1) { + accessKey = userInfo.substring(0, index); + secretKey = userInfo.substring(index + 1); + } else { + accessKey = userInfo; + } + } + + AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain( + new BasicAWSCredentialsProvider(accessKey, secretKey), + new InstanceProfileCredentialsProvider(), + new AnonymousAWSCredentialsProvider() + ); + + ClientConfiguration awsConf = new ClientConfiguration(); + awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS, DEFAULT_MAXIMUM_CONNECTIONS)); + boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS); + awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); + awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES, DEFAULT_MAX_ERROR_RETRIES)); + awsConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT, DEFAULT_ESTABLISH_TIMEOUT)); + awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT)); + + String proxyHost = conf.getTrimmed(PROXY_HOST,""); + int proxyPort = conf.getInt(PROXY_PORT, -1); + if (!proxyHost.isEmpty()) { + awsConf.setProxyHost(proxyHost); + if (proxyPort >= 0) { + awsConf.setProxyPort(proxyPort); + } else { + if (secureConnections) { + LOG.warn("Proxy host set without port. Using HTTPS default 443"); + awsConf.setProxyPort(443); + } else { + LOG.warn("Proxy host set without port. Using HTTP default 80"); + awsConf.setProxyPort(80); + } + } + String proxyUsername = conf.getTrimmed(PROXY_USERNAME); + String proxyPassword = conf.getTrimmed(PROXY_PASSWORD); + if ((proxyUsername == null) != (proxyPassword == null)) { + String msg = "Proxy error: " + PROXY_USERNAME + " or " + PROXY_PASSWORD + " set without the other."; + LOG.error(msg); + } + awsConf.setProxyUsername(proxyUsername); + awsConf.setProxyPassword(proxyPassword); + awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN)); + awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION)); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Using proxy server %s:%d as user %s with password %s on domain %s as workstation " + + "%s", awsConf.getProxyHost(), awsConf.getProxyPort(), awsConf.getProxyUsername(), + awsConf.getProxyPassword(), awsConf.getProxyDomain(), awsConf.getProxyWorkstation())); + } + } else if (proxyPort >= 0) { + String msg = "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST; + LOG.error(msg); + } + + s3 = new AmazonS3Client(credentials, awsConf); + String endPoint = conf.getTrimmed(ENDPOINT,""); + if (!endPoint.isEmpty()) { + try { + s3.setEndpoint(endPoint); + } catch (IllegalArgumentException e) { + String msg = "Incorrect endpoint: " + e.getMessage(); + LOG.error(msg); + } + } + + maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS); + s3Enabled = true; + } catch (NoClassDefFoundError e) { + // If the version of hadoop is less than 2.6.0, hadoop doesn't include aws dependencies because it doesn't provide + // S3AFileSystem. In this case, tajo never uses aws s3 api directly. + LOG.warn(e); + s3Enabled = false; + } catch (Exception e) { + throw new TajoInternalError(e); + } + } + + /** + * Calculate the total size of all objects in the indicated bucket + * + * @param path to use + * @return calculated size + * @throws IOException + */ + @Override + public long calculateSize(Path path) throws IOException { + long totalBucketSize = 0L; + + if (s3Enabled) { + String key = pathToKey(path); + + final FileStatus fileStatus = fs.getFileStatus(path); + + if (fileStatus.isDirectory()) { + if (!key.isEmpty()) { + key = key + "/"; + } + + ListObjectsRequest request = new ListObjectsRequest(); + request.setBucketName(uri.getHost()); + request.setPrefix(key); + request.setMaxKeys(maxKeys); + + if (LOG.isDebugEnabled()) { + LOG.debug("listStatus: doing listObjects for directory " + key); + } + + ObjectListing objects = s3.listObjects(request); + + while (true) { + for (S3ObjectSummary summary : objects.getObjectSummaries()) { + Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, fs.getWorkingDirectory()); + + // Skip over keys that are ourselves and old S3N _$folder$ files + if (keyPath.equals(path) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring: " + keyPath); + } + continue; + } + + if (!objectRepresentsDirectory(summary.getKey(), summary.getSize())) { + totalBucketSize += summary.getSize(); + } + } + + if (objects.isTruncated()) { + if (LOG.isDebugEnabled()) { + LOG.debug("listStatus: list truncated - getting next batch"); + } + objects = s3.listNextBatchOfObjects(objects); + } else { + break; + } + } + } else { + return fileStatus.getLen(); + } + } else { + totalBucketSize = fs.getContentSummary(path).getLength(); + } + + return totalBucketSize; + } + + private boolean objectRepresentsDirectory(final String name, final long size) { + return !name.isEmpty() && name.charAt(name.length() - 1) == '/' && size == 0L; + } + + private Path keyToPath(String key) { + return new Path("/" + key); + } + + /* Turns a path (relative or otherwise) into an S3 key + */ + private String pathToKey(Path path) { + if (!path.isAbsolute()) { + path = new Path(fs.getWorkingDirectory(), path); + } + + if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) { + return ""; + } + + return path.toUri().getPath().substring(1); + } + + @VisibleForTesting + public void setAmazonS3Client(AmazonS3 s3) { + this.s3 = s3; + } } diff --git a/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/TajoS3Constants.java b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/TajoS3Constants.java new file mode 100644 index 0000000000..48f76b882b --- /dev/null +++ b/tajo-storage/tajo-storage-s3/src/main/java/org/apache/tajo/storage/s3/TajoS3Constants.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.s3; + +/** + * Borrow from org.apache.hadoop.fs.s3a.TajoS3Constants. + * + */ +public class TajoS3Constants { + // s3 access key + public static final String ACCESS_KEY = "fs.s3a.access.key"; + + // s3 secret key + public static final String SECRET_KEY = "fs.s3a.secret.key"; + + //use a custom endpoint? + public static final String ENDPOINT = "fs.s3a.endpoint"; + + // number of times we should retry errors + public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum"; + public static final int DEFAULT_MAX_ERROR_RETRIES = 10; + + // connect to s3 over ssl? + public static final String SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled"; + public static final boolean DEFAULT_SECURE_CONNECTIONS = true; + + // seconds until we give up trying to establish a connection to s3 + public static final String ESTABLISH_TIMEOUT = "fs.s3a.connection.establish.timeout"; + public static final int DEFAULT_ESTABLISH_TIMEOUT = 50000; + + // seconds until we give up on a connection to s3 + public static final String SOCKET_TIMEOUT = "fs.s3a.connection.timeout"; + public static final int DEFAULT_SOCKET_TIMEOUT = 50000; + + // number of simultaneous connections to s3 + public static final String MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum"; + public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15; + + // number of records to get while paging through a directory listing + public static final String MAX_PAGING_KEYS = "fs.s3a.paging.maximum"; + public static final int DEFAULT_MAX_PAGING_KEYS = 5000; + + public static final String S3N_FOLDER_SUFFIX = "_$folder$"; + + //connect to s3 through a proxy server? + public static final String PROXY_HOST = "fs.s3a.proxy.host"; + public static final String PROXY_PORT = "fs.s3a.proxy.port"; + public static final String PROXY_USERNAME = "fs.s3a.proxy.username"; + public static final String PROXY_PASSWORD = "fs.s3a.proxy.password"; + public static final String PROXY_DOMAIN = "fs.s3a.proxy.domain"; + public static final String PROXY_WORKSTATION = "fs.s3a.proxy.workstation"; + +} diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockAmazonS3.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockAmazonS3.java new file mode 100644 index 0000000000..a0d8c2675d --- /dev/null +++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockAmazonS3.java @@ -0,0 +1,616 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.AmazonWebServiceRequest; +import com.amazonaws.HttpMethod; +import com.amazonaws.regions.Region; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.S3ClientOptions; +import com.amazonaws.services.s3.S3ResponseMetadata; +import com.amazonaws.services.s3.model.*; +import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.exception.TajoRuntimeException; +import org.apache.tajo.exception.UnsupportedException; + +import java.io.File; +import java.io.InputStream; +import java.net.URL; +import java.util.Date; +import java.util.List; + +import static org.apache.http.HttpStatus.SC_OK; + +public class MockAmazonS3 implements AmazonS3 { + private int getObjectHttpCode = SC_OK; + private int getObjectMetadataHttpCode = SC_OK; + + @Override + public void setEndpoint(String endpoint) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setRegion(Region region) + throws IllegalArgumentException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setS3ClientOptions(S3ClientOptions clientOptions) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void changeObjectStorageClass(String bucketName, String key, StorageClass newStorageClass) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setObjectRedirectLocation(String bucketName, String key, String newRedirectLocation) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public ObjectListing listObjects(String bucketName) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public ObjectListing listObjects(String bucketName, String prefix) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public ObjectListing listObjects(ListObjectsRequest listObjectsRequest) + throws AmazonClientException { + if (listObjectsRequest.getBucketName().equals("tajo-test") && listObjectsRequest.getPrefix().equals("test/")) { + MockObjectListing objectListing = new MockObjectListing(); + return objectListing; + } else { + throw new TajoInternalError(new UnsupportedException()); + } + } + + @Override + public ObjectListing listNextBatchOfObjects(ObjectListing previousObjectListing) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public VersionListing listVersions(String bucketName, String prefix) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public VersionListing listNextBatchOfVersions(VersionListing previousVersionListing) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public VersionListing listVersions(String bucketName, String prefix, String keyMarker, String versionIdMarker, + String delimiter, Integer maxResults) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public VersionListing listVersions(ListVersionsRequest listVersionsRequest) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public Owner getS3AccountOwner() + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public boolean doesBucketExist(String bucketName) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public List listBuckets() + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public List listBuckets(ListBucketsRequest listBucketsRequest) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public String getBucketLocation(String bucketName) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public String getBucketLocation(GetBucketLocationRequest getBucketLocationRequest) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public Bucket createBucket(CreateBucketRequest createBucketRequest) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public Bucket createBucket(String bucketName) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public Bucket createBucket(String bucketName, com.amazonaws.services.s3.model.Region region) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public Bucket createBucket(String bucketName, String region) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public AccessControlList getObjectAcl(String bucketName, String key) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public AccessControlList getObjectAcl(String bucketName, String key, String versionId) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setObjectAcl(String bucketName, String key, AccessControlList acl) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setObjectAcl(String bucketName, String key, CannedAccessControlList acl) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setObjectAcl(String bucketName, String key, String versionId, AccessControlList acl) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setObjectAcl(String bucketName, String key, String versionId, CannedAccessControlList acl) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public AccessControlList getBucketAcl(String bucketName) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setBucketAcl(SetBucketAclRequest setBucketAclRequest) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public AccessControlList getBucketAcl(GetBucketAclRequest getBucketAclRequest) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setBucketAcl(String bucketName, AccessControlList acl) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setBucketAcl(String bucketName, CannedAccessControlList acl) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public ObjectMetadata getObjectMetadata(String bucketName, String key) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetadataRequest) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public S3Object getObject(String bucketName, String key) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public S3Object getObject(GetObjectRequest getObjectRequest) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public ObjectMetadata getObject(GetObjectRequest getObjectRequest, File destinationFile) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void deleteBucket(DeleteBucketRequest deleteBucketRequest) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void deleteBucket(String bucketName) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public PutObjectResult putObject(PutObjectRequest putObjectRequest) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public PutObjectResult putObject(String bucketName, String key, File file) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public PutObjectResult putObject(String bucketName, String key, InputStream input, ObjectMetadata metadata) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public CopyObjectResult copyObject(String sourceBucketName, String sourceKey, String destinationBucketName, + String destinationKey) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public CopyPartResult copyPart(CopyPartRequest copyPartRequest) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void deleteObject(String bucketName, String key) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void deleteObject(DeleteObjectRequest deleteObjectRequest) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteObjectsRequest) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void deleteVersion(String bucketName, String key, String versionId) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void deleteVersion(DeleteVersionRequest deleteVersionRequest) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public BucketLoggingConfiguration getBucketLoggingConfiguration(String bucketName) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setBucketLoggingConfiguration(SetBucketLoggingConfigurationRequest setBucketLoggingConfigurationRequest) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public BucketVersioningConfiguration getBucketVersioningConfiguration(String bucketName) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setBucketVersioningConfiguration(SetBucketVersioningConfigurationRequest + setBucketVersioningConfigurationRequest) throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public BucketLifecycleConfiguration getBucketLifecycleConfiguration(String bucketName) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setBucketLifecycleConfiguration(String bucketName, + BucketLifecycleConfiguration bucketLifecycleConfiguration) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setBucketLifecycleConfiguration(SetBucketLifecycleConfigurationRequest + setBucketLifecycleConfigurationRequest) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void deleteBucketLifecycleConfiguration(String bucketName) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void deleteBucketLifecycleConfiguration(DeleteBucketLifecycleConfigurationRequest + deleteBucketLifecycleConfigurationRequest) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public BucketCrossOriginConfiguration getBucketCrossOriginConfiguration(String bucketName) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setBucketCrossOriginConfiguration(String bucketName, BucketCrossOriginConfiguration + bucketCrossOriginConfiguration) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setBucketCrossOriginConfiguration(SetBucketCrossOriginConfigurationRequest + setBucketCrossOriginConfigurationRequest) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void deleteBucketCrossOriginConfiguration(String bucketName) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void deleteBucketCrossOriginConfiguration(DeleteBucketCrossOriginConfigurationRequest + deleteBucketCrossOriginConfigurationRequest) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public BucketTaggingConfiguration getBucketTaggingConfiguration(String bucketName) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setBucketTaggingConfiguration(String bucketName, BucketTaggingConfiguration bucketTaggingConfiguration) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setBucketTaggingConfiguration(SetBucketTaggingConfigurationRequest setBucketTaggingConfigurationRequest) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void deleteBucketTaggingConfiguration(String bucketName) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void deleteBucketTaggingConfiguration(DeleteBucketTaggingConfigurationRequest + deleteBucketTaggingConfigurationRequest) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public BucketNotificationConfiguration getBucketNotificationConfiguration(String bucketName) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setBucketNotificationConfiguration(SetBucketNotificationConfigurationRequest + setBucketNotificationConfigurationRequest) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setBucketNotificationConfiguration(String bucketName, BucketNotificationConfiguration + bucketNotificationConfiguration) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public BucketWebsiteConfiguration getBucketWebsiteConfiguration(String bucketName) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public BucketWebsiteConfiguration getBucketWebsiteConfiguration(GetBucketWebsiteConfigurationRequest + getBucketWebsiteConfigurationRequest) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setBucketWebsiteConfiguration(String bucketName, BucketWebsiteConfiguration configuration) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setBucketWebsiteConfiguration(SetBucketWebsiteConfigurationRequest setBucketWebsiteConfigurationRequest) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void deleteBucketWebsiteConfiguration(String bucketName) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void deleteBucketWebsiteConfiguration(DeleteBucketWebsiteConfigurationRequest + deleteBucketWebsiteConfigurationRequest) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public BucketPolicy getBucketPolicy(String bucketName) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public BucketPolicy getBucketPolicy(GetBucketPolicyRequest getBucketPolicyRequest) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setBucketPolicy(String bucketName, String policyText) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void setBucketPolicy(SetBucketPolicyRequest setBucketPolicyRequest) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void deleteBucketPolicy(String bucketName) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void deleteBucketPolicy(DeleteBucketPolicyRequest deleteBucketPolicyRequest) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public URL generatePresignedUrl(String bucketName, String key, Date expiration) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public URL generatePresignedUrl(String bucketName, String key, Date expiration, HttpMethod method) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public URL generatePresignedUrl(GeneratePresignedUrlRequest generatePresignedUrlRequest) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public InitiateMultipartUploadResult initiateMultipartUpload(InitiateMultipartUploadRequest request) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public UploadPartResult uploadPart(UploadPartRequest request) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public PartListing listParts(ListPartsRequest request) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void abortMultipartUpload(AbortMultipartUploadRequest request) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest request) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public MultipartUploadListing listMultipartUploads(ListMultipartUploadsRequest request) + throws AmazonClientException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public S3ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void restoreObject(RestoreObjectRequest request) + throws AmazonServiceException { + throw new TajoInternalError(new UnsupportedException()); + } + + @Override + public void restoreObject(String bucketName, String key, int expirationInDays) + throws AmazonServiceException { + throw new TajoInternalError(new UnsupportedException()); + } +} \ No newline at end of file diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockObjectListing.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockObjectListing.java new file mode 100644 index 0000000000..96a7f809df --- /dev/null +++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockObjectListing.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.s3; + +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.google.common.collect.Lists; + +import java.util.List; + +public class MockObjectListing extends ObjectListing { + + @Override + public List getObjectSummaries() { + final String bucketName = "tajo-test"; + + List objectSummaries = Lists.newArrayList(); + objectSummaries.add(getS3ObjectSummary(bucketName, "test/data01", 10L)); + objectSummaries.add(getS3ObjectSummary(bucketName, "test/data02", 10L)); + objectSummaries.add(getS3ObjectSummary(bucketName, "test/data03", 10L)); + + return objectSummaries; + } + + private S3ObjectSummary getS3ObjectSummary(String bucketName, String key, long size) { + S3ObjectSummary objectSummary = new S3ObjectSummary(); + objectSummary.setBucketName(bucketName); + objectSummary.setKey(key); + objectSummary.setSize(size); + return objectSummary; + } +} diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java index 15c6a337f6..6526269731 100644 --- a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java +++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/MockS3FileSystem.java @@ -21,6 +21,8 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Progressable; +import org.apache.tajo.exception.TajoInternalError; +import org.apache.tajo.exception.UnsupportedException; import java.io.IOException; import java.net.URI; @@ -36,7 +38,6 @@ public void initialize(URI uri, Configuration conf) throws IOException { this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); } - /** * Return the protocol scheme for the FileSystem. *

@@ -60,18 +61,18 @@ public Path makeQualified(Path path) { @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { - return null; + throw new TajoInternalError(new UnsupportedException()); } @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { - return null; + throw new TajoInternalError(new UnsupportedException()); } @Override public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { - return null; + throw new TajoInternalError(new UnsupportedException()); } @Override @@ -95,7 +96,7 @@ public void setWorkingDirectory(Path new_dir) { @Override public Path getWorkingDirectory() { - return null; + return new Path(uri); } @Override @@ -105,6 +106,12 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException { @Override public FileStatus getFileStatus(Path f) throws IOException { - return null; + if (f.equals(new Path(TestS3TableSpace.S3_URI, "test")) + || f.equals(new Path(TestS3TableSpace.S3N_URI, "test")) + || f.equals(new Path(TestS3TableSpace.S3A_URI, "test"))) { + return new FileStatus(0, true, 1, 0, 0, f); + } else { + throw new TajoInternalError(new UnsupportedException()); + } } -} +} \ No newline at end of file diff --git a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java index 2d0677885c..2b630c034f 100644 --- a/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java +++ b/tajo-storage/tajo-storage-s3/src/test/java/org/apache/tajo/storage/s3/TestS3TableSpace.java @@ -19,6 +19,7 @@ package org.apache.tajo.storage.s3; import net.minidev.json.JSONObject; +import org.apache.hadoop.fs.Path; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.storage.TablespaceManager; import org.junit.AfterClass; @@ -28,35 +29,103 @@ import java.io.IOException; import java.net.URI; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class TestS3TableSpace { - public static final String SPACENAME = "s3_cluster"; + public static final String S3_SPACENAME = "s3_cluster"; + public static final String S3N_SPACENAME = "s3N_cluster"; + public static final String S3A_SPACENAME = "s3A_cluster"; + public static final String S3_URI = "s3://tajo-test/"; + public static final String S3N_URI = "s3n://tajo-test/"; + public static final String S3A_URI = "s3a://tajo-test/"; @BeforeClass public static void setUp() throws Exception { - S3TableSpace tablespace = new S3TableSpace(SPACENAME, URI.create(S3_URI), new JSONObject()); - + // Add tablespace for s3 prefix + S3TableSpace s3TableSpace = new S3TableSpace(S3_SPACENAME, URI.create(S3_URI), new JSONObject()); TajoConf tajoConf = new TajoConf(); tajoConf.set("fs.s3.impl", MockS3FileSystem.class.getName()); - tablespace.init(tajoConf); + tajoConf.set("fs.s3.awsAccessKeyId", "test_access_key_id"); + tajoConf.set("fs.s3.awsSecretAccessKey", "test_secret_access_key"); + s3TableSpace.init(tajoConf); + TablespaceManager.addTableSpaceForTest(s3TableSpace); - TablespaceManager.addTableSpaceForTest(tablespace); + // Add tablespace for s3n prefix + S3TableSpace s3nTableSpace = new S3TableSpace(S3N_SPACENAME, URI.create(S3N_URI), new JSONObject()); + tajoConf = new TajoConf(); + tajoConf.set("fs.s3n.impl", MockS3FileSystem.class.getName()); + tajoConf.set("fs.s3n.awsAccessKeyId", "test_access_key_id"); + tajoConf.set("fs.s3n.awsSecretAccessKey", "test_secret_access_key"); + s3nTableSpace.init(tajoConf); + TablespaceManager.addTableSpaceForTest(s3nTableSpace); + + // Add tablespace for s3a prefix + S3TableSpace s3aTableSpace = new S3TableSpace(S3A_SPACENAME, URI.create(S3A_URI), new JSONObject()); + tajoConf = new TajoConf(); + tajoConf.set("fs.s3a.impl", MockS3FileSystem.class.getName()); + tajoConf.set("fs.s3a.access.key", "test_access_key_id"); + tajoConf.set("fs.s3a.secret.key", "test_secret_access_key"); + s3aTableSpace.init(tajoConf); + TablespaceManager.addTableSpaceForTest(s3aTableSpace); } @AfterClass public static void tearDown() throws IOException { - TablespaceManager.removeTablespaceForTest(SPACENAME); + TablespaceManager.removeTablespaceForTest(S3_SPACENAME); + TablespaceManager.removeTablespaceForTest(S3N_SPACENAME); + TablespaceManager.removeTablespaceForTest(S3A_SPACENAME); } @Test public void testTablespaceHandler() throws Exception { - assertTrue((TablespaceManager.getByName(SPACENAME)) instanceof S3TableSpace); - assertEquals(SPACENAME, (TablespaceManager.getByName(SPACENAME).getName())); - + // Verify the tablespace for s3 prefix + assertTrue((TablespaceManager.getByName(S3_SPACENAME)) instanceof S3TableSpace); + assertEquals(S3_SPACENAME, (TablespaceManager.getByName(S3_SPACENAME).getName())); assertTrue((TablespaceManager.get(S3_URI)) instanceof S3TableSpace); assertEquals(S3_URI, TablespaceManager.get(S3_URI).getUri().toASCIIString()); + + // Verify the tablespace for s3n prefix + assertTrue((TablespaceManager.getByName(S3N_SPACENAME)) instanceof S3TableSpace); + assertEquals(S3N_SPACENAME, (TablespaceManager.getByName(S3N_SPACENAME).getName())); + assertTrue((TablespaceManager.get(S3N_URI)) instanceof S3TableSpace); + assertEquals(S3N_URI, TablespaceManager.get(S3N_URI).getUri().toASCIIString()); + + // Verify the tablespace for s3a prefix + assertTrue((TablespaceManager.getByName(S3A_SPACENAME)) instanceof S3TableSpace); + assertEquals(S3A_SPACENAME, (TablespaceManager.getByName(S3A_SPACENAME).getName())); + assertTrue((TablespaceManager.get(S3A_URI)) instanceof S3TableSpace); + assertEquals(S3A_URI, TablespaceManager.get(S3A_URI).getUri().toASCIIString()); + } + + @Test + public void testCalculateSizeWithS3Prefix() throws Exception { + Path path = new Path(S3_URI, "test"); + assertTrue((TablespaceManager.getByName(S3_SPACENAME)) instanceof S3TableSpace); + S3TableSpace tableSpace = TablespaceManager.get(path.toUri()); + tableSpace.setAmazonS3Client(new MockAmazonS3()); + long size = tableSpace.calculateSize(path); + assertEquals(30L, size); + } + + @Test + public void testCalculateSizeWithS3NPrefix() throws Exception { + Path path = new Path(S3N_URI, "test"); + assertTrue((TablespaceManager.getByName(S3N_SPACENAME)) instanceof S3TableSpace); + S3TableSpace tableSpace = TablespaceManager.get(path.toUri()); + tableSpace.setAmazonS3Client(new MockAmazonS3()); + long size = tableSpace.calculateSize(path); + assertEquals(30L, size); } + + @Test + public void testCalculateSizeWithS3APrefix() throws Exception { + Path path = new Path(S3A_URI, "test"); + assertTrue((TablespaceManager.getByName(S3A_SPACENAME)) instanceof S3TableSpace); + S3TableSpace tableSpace = TablespaceManager.get(path.toUri()); + tableSpace.setAmazonS3Client(new MockAmazonS3()); + long size = tableSpace.calculateSize(path); + assertEquals(30L, size); + } + }