Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ OzoneVolume getVolumeDetails(String volumeName)
OzoneKeyDetails getS3KeyDetails(String bucketName, String keyName)
throws IOException;

/**
* Get OzoneKey in S3 context.
* @param bucketName Name of the Bucket
* @param keyName Key name
* @param partNumber Multipart-upload part number
* @return {@link OzoneKey}
* @throws IOException
*/
OzoneKeyDetails getS3KeyDetails(String bucketName, String keyName,
int partNumber)
throws IOException;

OzoneVolume buildOzoneVolume(OmVolumeArgs volume);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1646,7 +1646,6 @@ public OzoneKeyDetails getKeyDetails(
throws IOException {
OmKeyInfo keyInfo =
getKeyInfo(volumeName, bucketName, keyName, false);

return getOzoneKeyDetails(keyInfo);
}

Expand Down Expand Up @@ -1678,6 +1677,22 @@ public OzoneKeyDetails getS3KeyDetails(String bucketName, String keyName)
return getOzoneKeyDetails(keyInfo);
}

@Override
public OzoneKeyDetails getS3KeyDetails(String bucketName, String keyName,
int partNumber) throws IOException {
OmKeyInfo keyInfo = getS3KeyInfo(bucketName, keyName, false);
List<OmKeyLocationInfo> filteredKeyLocationInfo = keyInfo
.getLatestVersionLocations().getBlocksLatestVersionOnly().stream()
.filter(omKeyLocationInfo -> omKeyLocationInfo.getPartNumber() ==
partNumber)
.collect(Collectors.toList());
keyInfo.updateLocationInfoList(filteredKeyLocationInfo, false);
keyInfo.setDataSize(filteredKeyLocationInfo.stream()
.mapToLong(OmKeyLocationInfo::getLength)
.sum());
return getOzoneKeyDetails(keyInfo);
}

@NotNull
private OmKeyInfo getS3KeyInfo(
String bucketName, String keyName, boolean isHeadOp) throws IOException {
Expand Down
12 changes: 12 additions & 0 deletions hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ Test Multipart Upload Complete
Execute cat /tmp/part1 /tmp/part2 > /tmp/${PREFIX}-multipartKey1
Compare files /tmp/${PREFIX}-multipartKey1 /tmp/${PREFIX}-multipartKey1.result

${result} = Execute AWSS3ApiCli get-object --bucket ${BUCKET} --key ${PREFIX}/multipartKey1 --part-number 1 /tmp/${PREFIX}-multipartKey1-part1.result
Compare files /tmp/part1 /tmp/${PREFIX}-multipartKey1-part1.result

${result} = Execute AWSS3ApiCli get-object --bucket ${BUCKET} --key ${PREFIX}/multipartKey1 --part-number 2 /tmp/${PREFIX}-multipartKey1-part2.result
Compare files /tmp/part2 /tmp/${PREFIX}-multipartKey1-part2.result

Test Multipart Upload Complete Entity too small
${result} = Execute AWSS3APICli create-multipart-upload --bucket ${BUCKET} --key ${PREFIX}/multipartKey2
${uploadID} = Execute and checkrc echo '${result}' | jq -r '.UploadId' 0
Expand Down Expand Up @@ -183,6 +189,12 @@ Test Multipart Upload Complete Invalid part errors and complete mpu with few par
Execute cat /tmp/part1 /tmp/part3 > /tmp/${PREFIX}-multipartKey3
Compare files /tmp/${PREFIX}-multipartKey3 /tmp/${PREFIX}-multipartKey3.result

${result} = Execute AWSS3ApiCli get-object --bucket ${BUCKET} --key ${PREFIX}/multipartKey3 --part-number 1 /tmp/${PREFIX}-multipartKey3-part1.result
Compare files /tmp/part1 /tmp/${PREFIX}-multipartKey3-part1.result

${result} = Execute AWSS3ApiCli get-object --bucket ${BUCKET} --key ${PREFIX}/multipartKey3 --part-number 3 /tmp/${PREFIX}-multipartKey3-part3.result
Compare files /tmp/part3 /tmp/${PREFIX}-multipartKey3-part3.result

Test abort Multipart upload
${result} = Execute AWSS3APICli create-multipart-upload --bucket ${BUCKET} --key ${PREFIX}/multipartKey4 --storage-class REDUCED_REDUNDANCY
${uploadID} = Execute and checkrc echo '${result}' | jq -r '.UploadId' 0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone;

import org.apache.hadoop.hdds.conf.DefaultConfigManager;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.s3.endpoint.CompleteMultipartUploadRequest;
import org.apache.hadoop.ozone.s3.endpoint.CompleteMultipartUploadResponse;
import org.apache.hadoop.ozone.s3.endpoint.MultipartUploadInitiateResponse;
import org.apache.hadoop.ozone.s3.endpoint.ObjectEndpoint;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import java.io.IOException;
import java.io.ByteArrayInputStream;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.UUID;
import java.util.List;
import java.util.Base64;
import java.util.concurrent.TimeoutException;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.when;

/**
* Integration test class for testing 'ranged' GET request for the part
* specified.
*/
public class TestMultipartObjectGet {

public static final Logger LOG = LoggerFactory.getLogger(
TestMultipartObjectGet.class);
private static OzoneConfiguration conf;
private static String clusterId;
private static String scmId;
private static String omServiceId;
private static String scmServiceId;
private static final String BUCKET = OzoneConsts.BUCKET;
private static final String KEY = OzoneConsts.KEY;
private static MiniOzoneHAClusterImpl cluster;
private static OzoneClient client;
private static HttpHeaders headers;
private static ContainerRequestContext context;

private static final ObjectEndpoint REST = new ObjectEndpoint();

@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
clusterId = UUID.randomUUID().toString();
scmId = UUID.randomUUID().toString();
omServiceId = "om-service-test";
scmServiceId = "scm-service-test";

startCluster();
client = cluster.newClient();
client.getObjectStore().createS3Bucket(BUCKET);

headers = Mockito.mock(HttpHeaders.class);
when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn(
"STANDARD");

context = Mockito.mock(ContainerRequestContext.class);
Mockito.when(context.getUriInfo()).thenReturn(Mockito.mock(UriInfo.class));
Mockito.when(context.getUriInfo().getQueryParameters())
.thenReturn(new MultivaluedHashMap<>());

REST.setHeaders(headers);
REST.setClient(client);
REST.setOzoneConfiguration(conf);
REST.setContext(context);
}

private static void startCluster()
throws IOException, TimeoutException, InterruptedException {
OzoneManager.setTestSecureOmFlag(true);
MiniOzoneCluster.Builder builder = MiniOzoneCluster.newHABuilder(conf)
.setClusterId(clusterId)
.setSCMServiceId(scmServiceId)
.setOMServiceId(omServiceId)
.setScmId(scmId)
.setNumDatanodes(3)
.setNumOfStorageContainerManagers(3)
.setNumOfOzoneManagers(3);
cluster = (MiniOzoneHAClusterImpl) builder.build();
cluster.waitForClusterToBeReady();
}

@AfterClass
public static void stop() {
IOUtils.close(LOG, client);
if (cluster != null) {
cluster.stop();
}
DefaultConfigManager.clearDefaultConfigs();
}

private String initiateMultipartUpload() throws IOException, OS3Exception {
Response response = REST.initializeMultipartUpload(BUCKET, KEY);
MultipartUploadInitiateResponse multipartUploadInitiateResponse =
(MultipartUploadInitiateResponse) response.getEntity();
assertNotNull(multipartUploadInitiateResponse.getUploadID());
String uploadID = multipartUploadInitiateResponse.getUploadID();
assertEquals(200, response.getStatus());
return uploadID;
}

private CompleteMultipartUploadRequest.Part uploadPart(String uploadID,
int partNumber,
String content)
throws IOException, OS3Exception {
ByteArrayInputStream body =
new ByteArrayInputStream(content.getBytes(UTF_8));
Response response = REST.put(BUCKET, KEY, content.length(),
partNumber, uploadID, body);
assertEquals(200, response.getStatus());
assertNotNull(response.getHeaderString("ETag"));

CompleteMultipartUploadRequest.Part
part = new CompleteMultipartUploadRequest.Part();
part.seteTag(response.getHeaderString("ETag"));
part.setPartNumber(partNumber);
return part;
}

private void completeMultipartUpload(
CompleteMultipartUploadRequest completeMultipartUploadRequest,
String uploadID) throws IOException, OS3Exception {
Response response = REST.completeMultipartUpload(BUCKET, KEY, uploadID,
completeMultipartUploadRequest);
assertEquals(200, response.getStatus());

CompleteMultipartUploadResponse completeMultipartUploadResponse =
(CompleteMultipartUploadResponse) response.getEntity();
assertEquals(BUCKET, completeMultipartUploadResponse.getBucket());
assertEquals(KEY, completeMultipartUploadResponse.getKey());
assertEquals(BUCKET, completeMultipartUploadResponse.getLocation());
assertNotNull(completeMultipartUploadResponse.getETag());
}

private void getObjectMultipart(int partNumber, long bytes)
throws IOException, OS3Exception {
Response response =
REST.get(BUCKET, KEY, partNumber, null, 100, null);
assertEquals(200, response.getStatus());
assertEquals(bytes, response.getLength());
}

@Test
public void testMultipart() throws Exception {
String uploadID = initiateMultipartUpload();
List<CompleteMultipartUploadRequest.Part> partsList = new ArrayList<>();

String content1 = generateRandomContent(5);
int partNumber = 1;
CompleteMultipartUploadRequest.Part
part1 = uploadPart(uploadID, partNumber, content1);
partsList.add(part1);

String content2 = generateRandomContent(5);
partNumber = 2;
CompleteMultipartUploadRequest.Part
part2 = uploadPart(uploadID, partNumber, content2);
partsList.add(part2);

String content3 = generateRandomContent(1);
partNumber = 3;
CompleteMultipartUploadRequest.Part
part3 = uploadPart(uploadID, partNumber, content3);
partsList.add(part3);

CompleteMultipartUploadRequest completeMultipartUploadRequest = new
CompleteMultipartUploadRequest();
completeMultipartUploadRequest.setPartList(partsList);
completeMultipartUpload(completeMultipartUploadRequest, uploadID);

getObjectMultipart(0,
(content1 + content2 + content3).getBytes(UTF_8).length);
getObjectMultipart(1, content1.getBytes(UTF_8).length);
getObjectMultipart(2, content2.getBytes(UTF_8).length);
getObjectMultipart(3, content3.getBytes(UTF_8).length);
}

private static String generateRandomContent(int sizeInMB) {
int bytesToGenerate = sizeInMB * 1024 * 1024;
byte[] randomBytes = new byte[bytesToGenerate];
new SecureRandom().nextBytes(randomBytes);
return Base64.getEncoder().encodeToString(randomBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ public Response put(
public Response get(
@PathParam("bucket") String bucketName,
@PathParam("path") String keyPath,
@QueryParam("partNumber") int partNumber,
@QueryParam("uploadId") String uploadId,
@QueryParam("max-parts") @DefaultValue("1000") int maxParts,
@QueryParam("part-number-marker") String partNumberMarker)
Expand All @@ -393,8 +394,9 @@ public Response get(
partMarker, maxParts);
}

OzoneKeyDetails keyDetails = getClientProtocol()
.getS3KeyDetails(bucketName, keyPath);
OzoneKeyDetails keyDetails = (partNumber != 0) ?
getClientProtocol().getS3KeyDetails(bucketName, keyPath, partNumber) :
getClientProtocol().getS3KeyDetails(bucketName, keyPath);

isFile(keyPath, keyDetails);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ public OzoneKeyDetails getS3KeyDetails(String bucketName, String keyName)
return objectStoreStub.getS3Volume().getBucket(bucketName).getKey(keyName);
}

@Override
public OzoneKeyDetails getS3KeyDetails(String bucketName, String keyName,
int partNumber)
throws IOException {
return objectStoreStub.getS3Volume().getBucket(bucketName).getKey(keyName);
}

@Override
public OzoneVolume buildOzoneVolume(OmVolumeArgs volume) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public static void setUp() throws Exception {

@Test
public void testListParts() throws Exception {
Response response = REST.get(OzoneConsts.S3_BUCKET, OzoneConsts.KEY,
Response response = REST.get(OzoneConsts.S3_BUCKET, OzoneConsts.KEY, 0,
uploadID, 3, "0");

ListPartsResponse listPartsResponse =
Expand All @@ -106,7 +106,7 @@ public void testListParts() throws Exception {

@Test
public void testListPartsContinuation() throws Exception {
Response response = REST.get(OzoneConsts.S3_BUCKET, OzoneConsts.KEY,
Response response = REST.get(OzoneConsts.S3_BUCKET, OzoneConsts.KEY, 0,
uploadID, 2, "0");
ListPartsResponse listPartsResponse =
(ListPartsResponse) response.getEntity();
Expand All @@ -115,7 +115,7 @@ public void testListPartsContinuation() throws Exception {
assertEquals(2, listPartsResponse.getPartList().size());

// Continue
response = REST.get(OzoneConsts.S3_BUCKET, OzoneConsts.KEY, uploadID, 2,
response = REST.get(OzoneConsts.S3_BUCKET, OzoneConsts.KEY, 0, uploadID, 2,
Integer.toString(listPartsResponse.getNextPartNumberMarker()));
listPartsResponse = (ListPartsResponse) response.getEntity();

Expand All @@ -127,7 +127,7 @@ public void testListPartsContinuation() throws Exception {
@Test
public void testListPartsWithUnknownUploadID() throws Exception {
try {
REST.get(OzoneConsts.S3_BUCKET, OzoneConsts.KEY,
REST.get(OzoneConsts.S3_BUCKET, OzoneConsts.KEY, 0,
uploadID, 2, "0");
} catch (OS3Exception ex) {
Assert.assertEquals(S3ErrorTable.NO_SUCH_UPLOAD.getErrorMessage(),
Expand Down
Loading