fe/fe-core/src/main/java/org/apache/doris/catalog/AzureResource.java
@@ -19,18 +19,20 @@

import org.apache.doris.backup.Status;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.remote.AzureFileSystem;
import org.apache.doris.fs.obj.AzureObjStorage;
import org.apache.doris.fs.obj.ObjStorage;
import org.apache.doris.fs.obj.RemoteObjects;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -78,21 +80,57 @@ protected void setProperties(Map<String, String> newProperties) throws DdlException
this.properties = newProperties;
}

private static void pingAzure(String bucketName, String rootPath,
protected static void pingAzure(String bucketName, String rootPath,
Map<String, String> newProperties) throws DdlException {
if (FeConstants.runningUnitTest) {
return;
}

Long timestamp = System.currentTimeMillis();
String testObj = "azure://" + bucketName + "/" + rootPath
+ "/doris-test-object-valid-" + timestamp.toString() + ".txt";

byte[] contentData = new byte[2 * ObjStorage.CHUNK_SIZE];
Arrays.fill(contentData, (byte) 'A');
AzureObjStorage azureObjStorage = new AzureObjStorage(newProperties);

Status status = azureObjStorage.putObject(testObj, new ByteArrayInputStream(contentData), contentData.length);
if (!Status.OK.equals(status)) {
throw new DdlException(
"ping azure failed(put), status: " + status + ", properties: " + new PrintableMap<>(
[Inline review thread on this error message]
Contributor: we may need to convert the raw message to something the user can understand, e.g. if status is 403, return "failed to test putting an object, lacking PUT permission"; if status is connection refused, return "failed to connect to Azure, please check your connection or endpoint".
Author: It is not necessary; keep it the same as before.
Author: Can you show me the code for what you expect it to deal with? (A sketch of one possible mapping appears after this file's diff, below.)

newProperties, "=", true, false, true, false));
}

String testFile = "azure://" + bucketName + "/" + rootPath + "/test-object-valid.txt";
AzureFileSystem fileSystem = new AzureFileSystem(newProperties);
Status status = fileSystem.exists(testFile);
if (status != Status.OK && status.getErrCode() != Status.ErrCode.NOT_FOUND) {
status = azureObjStorage.headObject(testObj);
if (!Status.OK.equals(status)) {
throw new DdlException(
"ping azure failed(head), status: " + status + ", properties: " + new PrintableMap<>(
newProperties, "=", true, false, true, false));
}
LOG.info("success to ping azure");

RemoteObjects remoteObjects = azureObjStorage.listObjects(testObj, null);
LOG.info("remoteObjects: {}", remoteObjects);
Preconditions.checkArgument(remoteObjects.getObjectList().size() == 1, "remoteObjects.size() must equal 1");

status = azureObjStorage.deleteObject(testObj);
if (!Status.OK.equals(status)) {
throw new DdlException(
"ping azure failed(delete), status: " + status + ", properties: " + new PrintableMap<>(
newProperties, "=", true, false, true, false));
}

status = azureObjStorage.multipartUpload(testObj,
new ByteArrayInputStream(contentData), contentData.length);
if (!Status.OK.equals(status)) {
throw new DdlException(
"ping azure failed(multiPartPut), status: " + status + ", properties: " + new PrintableMap<>(
newProperties, "=", true, false, true, false));
}

status = azureObjStorage.deleteObject(testObj);
if (!Status.OK.equals(status)) {
throw new DdlException(
"ping azure failed(delete), status: " + status + ", properties: " + new PrintableMap<>(
newProperties, "=", true, false, true, false));
}
LOG.info("Success to ping azure blob storage.");
}

@Override
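To make the question in the inline thread above concrete ("Can you show me the code..."), here is a minimal sketch of the kind of mapping the reviewer seems to be asking for. It is not part of this PR: the helper name and the substring checks against the raw status text are assumptions, and it only relies on `Status` members already visible in this diff (`Status.OK` and the implicit `toString()` used in the existing error strings).

```java
import org.apache.doris.backup.Status;

// Hypothetical helper, not part of this PR. It sketches the reviewer's idea of
// translating a raw Status into a friendlier hint before throwing DdlException.
public final class PingErrorHints {
    private PingErrorHints() {}

    static String toUserFacingMessage(String op, Status status) {
        String raw = String.valueOf(status).toLowerCase();
        if (raw.contains("403") || raw.contains("forbidden") || raw.contains("accessdenied")) {
            return "failed to " + op + " the test object: the credentials lack the required permission";
        }
        if (raw.contains("connection refused") || raw.contains("unknownhost")
                || raw.contains("connect timed out")) {
            return "failed to connect to the storage endpoint, please check your connection or endpoint";
        }
        // Fall back to the current behaviour: surface the raw status as-is.
        return "ping failed(" + op + "), status: " + status;
    }
}
```

Each throw site could then build its message as `toUserFacingMessage("put", status) + ", properties: " + new PrintableMap<>(...)`, keeping the raw status available for logs while giving the user a hint about the likely cause.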
84 changes: 53 additions & 31 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
@@ -19,12 +19,13 @@

import org.apache.doris.backup.Status;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.remote.S3FileSystem;
import org.apache.doris.fs.obj.ObjStorage;
import org.apache.doris.fs.obj.RemoteObjects;
import org.apache.doris.fs.obj.S3ObjStorage;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -33,6 +34,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -102,45 +104,65 @@ protected void setProperties(Map<String, String> properties) throws DdlException
}
String region = S3Properties.getRegionOfEndpoint(pingEndpoint);
properties.putIfAbsent(S3Properties.REGION, region);
String ak = properties.get(S3Properties.ACCESS_KEY);
String sk = properties.get(S3Properties.SECRET_KEY);
String token = properties.get(S3Properties.SESSION_TOKEN);
CloudCredentialWithEndpoint credential = new CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk, token);

if (needCheck) {
String bucketName = properties.get(S3Properties.BUCKET);
String rootPath = properties.get(S3Properties.ROOT_PATH);
pingS3(credential, bucketName, rootPath, properties);
pingS3(bucketName, rootPath, properties);
}
// optional
S3Properties.optionalS3Property(properties);
this.properties = properties;
}

private static void pingS3(CloudCredentialWithEndpoint credential, String bucketName, String rootPath,
Map<String, String> properties) throws DdlException {
S3FileSystem fileSystem = new S3FileSystem(properties);
String testFile = "s3://" + bucketName + "/" + rootPath + "/test-object-valid.txt";
String content = "doris will be better";
if (FeConstants.runningUnitTest) {
return;
}
protected static void pingS3(String bucketName, String rootPath, Map<String, String> newProperties)
[Inline review thread on this method]
Contributor: the same comments apply to this section as to AzureResource.java.
throws DdlException {

Long timestamp = System.currentTimeMillis();
String prefix = "s3://" + bucketName + "/" + rootPath;
String testObj = prefix + "/doris-test-object-valid-" + timestamp.toString() + ".txt";

byte[] contentData = new byte[2 * ObjStorage.CHUNK_SIZE];
Arrays.fill(contentData, (byte) 'A');
S3ObjStorage s3ObjStorage = new S3ObjStorage(newProperties);

Status status = s3ObjStorage.putObject(testObj, new ByteArrayInputStream(contentData), contentData.length);
if (!Status.OK.equals(status)) {
String errMsg = "pingS3 failed(put),"
+ " please check your endpoint, ak/sk or permissions(put/head/delete/list/multipartUpload),"
+ " status: " + status + ", properties: " + new PrintableMap<>(
newProperties, "=", true, false, true, false);
throw new DdlException(errMsg);
}
Status status = Status.OK;
try {
status = fileSystem.directUpload(content, testFile);
if (status != Status.OK) {
throw new DdlException(
"ping s3 failed(upload), status: " + status + ", properties: " + new PrintableMap<>(
properties, "=", true, false, true, false));
}
} finally {
if (status.ok()) {
Status delete = fileSystem.delete(testFile);
if (delete != Status.OK) {
LOG.warn("delete test file failed, status: {}, properties: {}", delete, new PrintableMap<>(
properties, "=", true, false, true, false));
}
}

status = s3ObjStorage.headObject(testObj);
if (!Status.OK.equals(status)) {
String errMsg = "pingS3 failed(head),"
+ " please check your endpoint, ak/sk or permissions(put/head/delete/list/multipartUpload),"
+ " status: " + status + ", properties: " + new PrintableMap<>(
newProperties, "=", true, false, true, false);
throw new DdlException(errMsg);
}

RemoteObjects remoteObjects = s3ObjStorage.listObjects(testObj, null);
LOG.info("remoteObjects: {}", remoteObjects);

status = s3ObjStorage.multipartUpload(testObj, new ByteArrayInputStream(contentData), contentData.length);
if (!Status.OK.equals(status)) {
String errMsg = "pingS3 failed(multipartUpload),"
+ " please check your endpoint, ak/sk or permissions(put/head/delete/list/multipartUpload),"
+ " status: " + status + ", properties: " + new PrintableMap<>(
newProperties, "=", true, false, true, false);
throw new DdlException(errMsg);
}

status = s3ObjStorage.deleteObject(testObj);
if (!Status.OK.equals(status)) {
String errMsg = "pingS3 failed(delete),"
+ " please check your endpoint, ak/sk or permissions(put/head/delete/list/multipartUpload),"
+ " status: " + status + ", properties: " + new PrintableMap<>(
newProperties, "=", true, false, true, false);
throw new DdlException(errMsg);
}

LOG.info("success to ping s3");
@@ -172,7 +194,7 @@ public void modifyProperties(Map<String, String> properties) throws DdlException
String rootPath = properties.getOrDefault(S3Properties.ROOT_PATH,
this.properties.get(S3Properties.ROOT_PATH));

pingS3(getS3PingCredentials(changedProperties), bucketName, rootPath, changedProperties);
pingS3(bucketName, rootPath, changedProperties);
}

// modify properties
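Both pingAzure and pingS3 now exercise the same put, head, list, multipartUpload, delete sequence against an object-storage client. The sketch below shows that shared shape factored into one helper. It is illustrative only: the PR keeps the two methods separate, and it assumes the methods used here (putObject, headObject, listObjects, multipartUpload, deleteObject) are all declared on the ObjStorage interface, which the visible part of this diff does not fully confirm.

```java
import org.apache.doris.backup.Status;
import org.apache.doris.common.DdlException;
import org.apache.doris.fs.obj.ObjStorage;

import java.io.ByteArrayInputStream;
import java.util.Arrays;

// Illustrative sketch of the probe sequence shared by pingAzure and pingS3;
// not code from this PR.
public final class StoragePingSketch {
    private StoragePingSketch() {}

    static void probe(ObjStorage<?> storage, String testObj) throws DdlException {
        // Same payload the ping methods use: 2 * CHUNK_SIZE, so the multipart
        // path has to stage more than one block.
        byte[] data = new byte[2 * ObjStorage.CHUNK_SIZE];
        Arrays.fill(data, (byte) 'A');

        checkOk(storage.putObject(testObj, new ByteArrayInputStream(data), data.length), "put");
        checkOk(storage.headObject(testObj), "head");
        storage.listObjects(testObj, null); // throws DdlException itself on failure
        checkOk(storage.multipartUpload(testObj, new ByteArrayInputStream(data), data.length),
                "multipartUpload");
        checkOk(storage.deleteObject(testObj), "delete");
    }

    private static void checkOk(Status status, String op) throws DdlException {
        if (!Status.OK.equals(status)) {
            throw new DdlException("ping failed(" + op + "), status: " + status);
        }
    }
}
```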
fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java
@@ -42,23 +42,27 @@
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.azure.storage.common.StorageSharedKeyCredential;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.http.HttpStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

public class AzureObjStorage implements ObjStorage<BlobServiceClient> {
private static final Logger LOG = LogManager.getLogger(AzureObjStorage.class);
@@ -165,10 +169,12 @@ public Status getObject(String remoteFilePath, File localFile) {
LOG.info("get file " + remoteFilePath + " success: " + properties.toString());
return Status.OK;
} catch (BlobStorageException e) {
LOG.warn("{} getObject exception:", remoteFilePath, e);
return new Status(
Status.ErrCode.COMMON_ERROR,
"get file from azure error: " + e.getServiceMessage());
} catch (UserException e) {
LOG.warn("{} getObject exception:", remoteFilePath, e);
return new Status(Status.ErrCode.COMMON_ERROR, "getObject "
+ remoteFilePath + " failed: " + e.getMessage());
}
@@ -182,10 +188,12 @@ public Status putObject(String remotePath, @Nullable InputStream content, long contentLength) {
blobClient.upload(content, contentLength);
return Status.OK;
} catch (BlobStorageException e) {
LOG.warn("{} putObject exception:", remotePath, e);
return new Status(
Status.ErrCode.COMMON_ERROR,
"Error occurred while copying the blob:: " + e.getServiceMessage());
} catch (UserException e) {
LOG.warn("{} putObject exception:", remotePath, e);
return new Status(Status.ErrCode.COMMON_ERROR, "putObject "
+ remotePath + " failed: " + e.getMessage());
}
@@ -276,8 +284,8 @@ public Status copyObject(String origFilePath, String destFilePath) {
@Override
public RemoteObjects listObjects(String remotePath, String continuationToken) throws DdlException {
try {
ListBlobsOptions options = new ListBlobsOptions().setPrefix(remotePath);
S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
ListBlobsOptions options = new ListBlobsOptions().setPrefix(uri.getKey());
PagedIterable<BlobItem> pagedBlobs = getClient().getBlobContainerClient(uri.getBucket())
.listBlobs(options, continuationToken, null);
PagedResponse<BlobItem> pagedResponse = pagedBlobs.iterableByPage().iterator().next();
@@ -408,4 +416,41 @@ public PagedResponse<BlobItem> getPagedBlobItems(BlobContainerClient client, ListBlobsOptions options, String newContinuationToken) {
PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, newContinuationToken, null);
return pagedBlobs.iterableByPage().iterator().next();
}


public Status multipartUpload(String remotePath, @Nullable InputStream inputStream, long totalBytes) {
Status st = Status.OK;
long uploadedBytes = 0;
int bytesRead = 0;
byte[] buffer = new byte[CHUNK_SIZE];
List<String> blockIds = new ArrayList<>();
BlockBlobClient blockBlobClient = null;


try {
S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
blockBlobClient = getClient().getBlobContainerClient(uri.getBucket())
.getBlobClient(uri.getKey()).getBlockBlobClient();
while (uploadedBytes < totalBytes && (bytesRead = inputStream.read(buffer)) != -1) {
String blockId = Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes());
blockIds.add(blockId);
blockBlobClient.stageBlock(blockId, new ByteArrayInputStream(buffer, 0, bytesRead), bytesRead);
uploadedBytes += bytesRead;
}
blockBlobClient.commitBlockList(blockIds);
} catch (Exception e) {
LOG.warn("remotePath:{}, ", remotePath, e);
st = new Status(Status.ErrCode.COMMON_ERROR, "Failed to multipartUpload " + remotePath
+ " reason: " + e.getMessage());

if (blockBlobClient != null) {
try {
blockBlobClient.delete();
} catch (Exception e1) {
LOG.warn("abort multipartUpload failed", e1);
}
}
}
return st;
}
}
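A minimal usage sketch for the new AzureObjStorage.multipartUpload. The container path is hypothetical and the property map is left empty on purpose: in practice it would carry the same endpoint and credential properties a CREATE RESOURCE statement supplies, which are deliberately elided here.

```java
import org.apache.doris.backup.Status;
import org.apache.doris.fs.obj.AzureObjStorage;
import org.apache.doris.fs.obj.ObjStorage;

import java.io.ByteArrayInputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class MultipartUploadSketch {
    public static void main(String[] args) {
        // Placeholder properties: fill in the endpoint/credential keys your
        // resource definition uses; they are intentionally not shown here.
        Map<String, String> props = new HashMap<>();
        AzureObjStorage storage = new AzureObjStorage(props);

        // 2 * CHUNK_SIZE bytes forces the loop in multipartUpload to stage two
        // blocks before commitBlockList is called.
        byte[] payload = new byte[2 * ObjStorage.CHUNK_SIZE];
        Arrays.fill(payload, (byte) 'A');

        Status status = storage.multipartUpload(
                "azure://my-container/tmp/multipart-demo.bin", // hypothetical path
                new ByteArrayInputStream(payload), payload.length);
        System.out.println("multipartUpload status: " + status);
    }
}
```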
fe/fe-core/src/main/java/org/apache/doris/fs/obj/ObjStorage.java
@@ -32,6 +32,10 @@
* @param <C> cloud SDK Client
*/
public interface ObjStorage<C> {

// CHUNK_SIZE for multi part upload
public static final int CHUNK_SIZE = 5 * 1024 * 1024;
[Inline review thread on CHUNK_SIZE]
Contributor: add a comment explaining what CHUNK_SIZE is and what it is for.
Author: done.


C getClient() throws UserException;

Triple<String, String, String> getStsToken() throws DdlException;
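CHUNK_SIZE is the number of bytes multipartUpload reads and stages per block, and the ping methods build a 2 * CHUNK_SIZE payload, presumably so the multipart path stages at least two blocks. A small arithmetic sketch (an assumption about the loop's read behaviour, not code from the PR) of how payload size maps to staged blocks:

```java
// Sketch only: ceiling division from payload size to staged blocks, assuming each
// read fills the CHUNK_SIZE buffer except possibly the last one.
static int expectedBlocks(long totalBytes, int chunkSize) {
    return (int) ((totalBytes + chunkSize - 1) / chunkSize);
}

// expectedBlocks(2L * 5 * 1024 * 1024, 5 * 1024 * 1024) == 2, matching the
// two-block payload used by pingAzure and pingS3.
```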