@@ -26,6 +26,7 @@
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.constants.AzureProperties;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;

@@ -72,6 +73,12 @@ public void analyzeResourceType() throws UserException {
if (type == null) {
throw new AnalysisException("Resource type can't be null");
}

if (AzureProperties.checkAzureProviderPropertyExist(properties)) {
resourceType = ResourceType.AZURE;
return;
}

resourceType = ResourceType.fromString(type);
if (resourceType == ResourceType.UNKNOWN) {
throw new AnalysisException("Unsupported resource type: " + type);
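A minimal sketch of the provider dispatch added above, assuming checkAzureProviderPropertyExist simply looks for a provider-style key whose value is "azure" (the actual key names live in AzureProperties and may differ):

import java.util.Map;

// Hypothetical stand-in for AzureProperties.checkAzureProviderPropertyExist:
// returns true when an assumed provider-style key says "azure".
final class ProviderDispatchSketch {
    static boolean looksLikeAzure(Map<String, String> properties) {
        String provider = properties.getOrDefault("provider", properties.get("s3.provider"));
        return provider != null && provider.equalsIgnoreCase("azure");
    }

    public static void main(String[] args) {
        // a CREATE RESOURCE statement typed as "s3" but pointing at Azure
        Map<String, String> props = Map.of("type", "s3", "provider", "azure");
        System.out.println(looksLikeAzure(props) ? "AZURE" : "fromString(type)");
    }
}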
@@ -20,7 +20,6 @@
import org.apache.doris.backup.Status;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.constants.S3Properties;
@@ -39,9 +38,8 @@
import java.util.Optional;

public class AzureResource extends Resource {

private static final Logger LOG = LogManager.getLogger(AzureResource.class);
private Map<String, String> properties;
private Map<String, String> properties = Maps.newHashMap();

public AzureResource() {
super();
@@ -52,89 +50,84 @@ public AzureResource(String name) {
}

@Override
protected void setProperties(Map<String, String> properties) throws DdlException {
Preconditions.checkState(properties != null);
protected void setProperties(Map<String, String> newProperties) throws DdlException {
Preconditions.checkState(newProperties != null);
// check properties
S3Properties.requiredS3PingProperties(properties);
S3Properties.requiredS3PingProperties(newProperties);
// by default the resource conf is validated, so unit tests and regression cases must account for this
boolean needCheck = isNeedCheck(properties);
boolean needCheck = isNeedCheck(newProperties);
if (LOG.isDebugEnabled()) {
LOG.debug("azure info need check validity : {}", needCheck);
}

// the endpoint used for ping needs a URI scheme.
String pingEndpoint = properties.get(S3Properties.ENDPOINT);
String pingEndpoint = newProperties.get(S3Properties.ENDPOINT);
if (!pingEndpoint.startsWith("http://")) {
pingEndpoint = "http://" + properties.get(S3Properties.ENDPOINT);
properties.put(S3Properties.ENDPOINT, pingEndpoint);
properties.put(S3Properties.Env.ENDPOINT, pingEndpoint);
pingEndpoint = "http://" + newProperties.get(S3Properties.ENDPOINT);
newProperties.put(S3Properties.ENDPOINT, pingEndpoint);
newProperties.put(S3Properties.Env.ENDPOINT, pingEndpoint);
}
String region = S3Properties.getRegionOfEndpoint(pingEndpoint);
properties.putIfAbsent(S3Properties.REGION, region);
String ak = properties.get(S3Properties.ACCESS_KEY);
String sk = properties.get(S3Properties.SECRET_KEY);
String token = properties.get(S3Properties.SESSION_TOKEN);
CloudCredentialWithEndpoint credential = new CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk, token);

if (needCheck) {
String bucketName = properties.get(S3Properties.BUCKET);
String rootPath = properties.get(S3Properties.ROOT_PATH);
pingAzure(credential, bucketName, rootPath, properties);
String bucketName = newProperties.get(S3Properties.BUCKET);
String rootPath = newProperties.get(S3Properties.ROOT_PATH);
pingAzure(bucketName, rootPath, newProperties);
}
// optional
S3Properties.optionalS3Property(properties);
this.properties = properties;
S3Properties.optionalS3Property(newProperties);
this.properties = newProperties;
}

private static void pingAzure(CloudCredentialWithEndpoint credential, String bucketName, String rootPath,
Map<String, String> properties) throws DdlException {
AzureFileSystem fileSystem = new AzureFileSystem(properties);
String testFile = rootPath + "/test-object-valid.txt";
private static void pingAzure(String bucketName, String rootPath,
Map<String, String> newProperties) throws DdlException {
if (FeConstants.runningUnitTest) {
return;
}

String testFile = "azure://" + bucketName + "/" + rootPath + "/test-object-valid.txt";
AzureFileSystem fileSystem = new AzureFileSystem(newProperties);
Status status = fileSystem.exists(testFile);
if (status != Status.OK || status.getErrCode() != Status.ErrCode.NOT_FOUND) {
if (status != Status.OK && status.getErrCode() != Status.ErrCode.NOT_FOUND) {
throw new DdlException(
"ping azure failed(head), status: " + status + ", properties: " + new PrintableMap<>(
properties, "=", true, false, true, false));
newProperties, "=", true, false, true, false));
}

LOG.info("success to ping azure");
}

@Override
public void modifyProperties(Map<String, String> properties) throws DdlException {
public void modifyProperties(Map<String, String> newProperties) throws DdlException {
if (references.containsValue(ReferenceType.POLICY)) {
// cannot be changed, because the remote fs uses this info to locate data.
List<String> cantChangeProperties = Arrays.asList(S3Properties.ENDPOINT, S3Properties.REGION,
S3Properties.ROOT_PATH, S3Properties.BUCKET, S3Properties.Env.ENDPOINT, S3Properties.Env.REGION,
S3Properties.Env.ROOT_PATH, S3Properties.Env.BUCKET);
Optional<String> any = cantChangeProperties.stream().filter(properties::containsKey).findAny();
Optional<String> any = cantChangeProperties.stream().filter(newProperties::containsKey).findAny();
if (any.isPresent()) {
throw new DdlException("current not support modify property : " + any.get());
}
}
// compatible with old versions; convert if the modified properties map uses old property names.
S3Properties.convertToStdProperties(properties);
boolean needCheck = isNeedCheck(properties);
S3Properties.convertToStdProperties(newProperties);
boolean needCheck = isNeedCheck(newProperties);
if (LOG.isDebugEnabled()) {
LOG.debug("s3 info need check validity : {}", needCheck);
}
if (needCheck) {
S3Properties.requiredS3PingProperties(this.properties);
Map<String, String> changedProperties = new HashMap<>(this.properties);
changedProperties.putAll(properties);
String bucketName = properties.getOrDefault(S3Properties.BUCKET, this.properties.get(S3Properties.BUCKET));
String rootPath = properties.getOrDefault(S3Properties.ROOT_PATH,
changedProperties.putAll(newProperties);
String bucketName = newProperties.getOrDefault(S3Properties.BUCKET,
this.properties.get(S3Properties.BUCKET));
String rootPath = newProperties.getOrDefault(S3Properties.ROOT_PATH,
this.properties.get(S3Properties.ROOT_PATH));

pingAzure(getS3PingCredentials(changedProperties), bucketName, rootPath, changedProperties);
pingAzure(bucketName, rootPath, changedProperties);
}

// modify properties
writeLock();
for (Map.Entry<String, String> kv : properties.entrySet()) {
for (Map.Entry<String, String> kv : newProperties.entrySet()) {
replaceIfEffectiveValue(this.properties, kv.getKey(), kv.getValue());
if (kv.getKey().equals(S3Properties.Env.TOKEN)
|| kv.getKey().equals(S3Properties.SESSION_TOKEN)) {
@@ -143,19 +136,7 @@ public void modifyProperties(Map<String, String> properties) throws DdlException
}
++version;
writeUnlock();
super.modifyProperties(properties);
}

private CloudCredentialWithEndpoint getS3PingCredentials(Map<String, String> properties) {
String ak = properties.getOrDefault(S3Properties.ACCESS_KEY, this.properties.get(S3Properties.ACCESS_KEY));
String sk = properties.getOrDefault(S3Properties.SECRET_KEY, this.properties.get(S3Properties.SECRET_KEY));
String token = properties.getOrDefault(S3Properties.SESSION_TOKEN,
this.properties.get(S3Properties.SESSION_TOKEN));
String endpoint = properties.getOrDefault(S3Properties.ENDPOINT, this.properties.get(S3Properties.ENDPOINT));
String pingEndpoint = "http://" + endpoint;
String region = S3Properties.getRegionOfEndpoint(pingEndpoint);
properties.putIfAbsent(S3Properties.REGION, region);
return new CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk, token);
super.modifyProperties(newProperties);
}

private boolean isNeedCheck(Map<String, String> newProperties) {
@@ -169,7 +150,7 @@ private boolean isNeedCheck(Map<String, String> newProperties) {

@Override
public Map<String, String> getCopiedProperties() {
return Maps.newHashMap(properties);
return Maps.newHashMap(this.properties);
}

@Override
@@ -178,7 +159,7 @@ protected void getProcNodeData(BaseProcResult result) {
result.addRow(Lists.newArrayList(name, lowerCaseType, "id", String.valueOf(id)));
readLock();
result.addRow(Lists.newArrayList(name, lowerCaseType, "version", String.valueOf(version)));
for (Map.Entry<String, String> entry : properties.entrySet()) {
for (Map.Entry<String, String> entry : this.properties.entrySet()) {
if (PrintableMap.HIDDEN_KEY.contains(entry.getKey())) {
continue;
}
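For reference, a small self-contained sketch of the ping decision after this change, using a stub error-code enum in place of org.apache.doris.backup.Status: the test object path is built from bucket and root path, and the ping only fails on a real access error, since a missing test object (NOT_FOUND) still proves the storage is reachable.

// Stub enum standing in for Status.ErrCode; the real check compares the Status
// object and its error code exactly as in the new pingAzure condition.
final class AzurePingSketch {
    enum ErrCode { OK, NOT_FOUND, COMMON_ERROR }

    static String testObjectPath(String bucket, String rootPath) {
        // mirrors the new path construction: azure://<bucket>/<rootPath>/test-object-valid.txt
        return "azure://" + bucket + "/" + rootPath + "/test-object-valid.txt";
    }

    static boolean pingSucceeds(ErrCode code) {
        // OK or NOT_FOUND both mean the endpoint and credentials work
        return code == ErrCode.OK || code == ErrCode.NOT_FOUND;
    }

    public static void main(String[] args) {
        System.out.println(testObjectPath("my-container", "warehouse"));
        System.out.println(pingSucceeds(ErrCode.NOT_FOUND));    // true
        System.out.println(pingSucceeds(ErrCode.COMMON_ERROR)); // false
    }
}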
@@ -119,9 +119,8 @@ protected void setProperties(Map<String, String> properties) throws DdlException

private static void pingS3(CloudCredentialWithEndpoint credential, String bucketName, String rootPath,
Map<String, String> properties) throws DdlException {
String bucket = "s3://" + bucketName + "/";
S3FileSystem fileSystem = new S3FileSystem(properties);
String testFile = bucket + rootPath + "/test-object-valid.txt";
String testFile = "s3://" + bucketName + "/" + rootPath + "/test-object-valid.txt";
String content = "doris will be better";
if (FeConstants.runningUnitTest) {
return;
@@ -28,8 +28,6 @@

import com.google.common.base.Strings;
import com.google.protobuf.TextFormat;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
@@ -38,7 +36,6 @@
import java.util.stream.IntStream;

public abstract class StorageVault {
private static final Logger LOG = LogManager.getLogger(StorageVault.class);
public static final String REFERENCE_SPLIT = "@";
public static final String INCLUDE_DATABASE_LIST = "include_database_list";
public static final String EXCLUDE_DATABASE_LIST = "exclude_database_list";
@@ -149,6 +146,7 @@ public void setId(String id) {
if (!stmt.getProperties().containsKey(PropertyConverter.USE_PATH_STYLE)) {
stmt.getProperties().put(PropertyConverter.USE_PATH_STYLE, "true");
}

CreateResourceStmt resourceStmt =
new CreateResourceStmt(false, ifNotExists, name, stmt.getProperties());
resourceStmt.analyzeResourceType();
@@ -215,6 +213,10 @@ public static List<String> convertToShowStorageVaultProperties(Cloud.StorageVaul
builder.mergeFrom(vault.getObjInfo());
builder.clearId();
builder.setSk("xxxxxxx");
if (!vault.getObjInfo().hasUsePathStyle()) {
// There is no `use_path_style` field in old versions; treat `use_path_style` as false
builder.setUsePathStyle(false);
}
row.add(printer.shortDebugString(builder));
}
row.add("false");
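A short sketch of the two use_path_style defaults involved here, assuming the property key is literally "use_path_style" (the real constant is PropertyConverter.USE_PATH_STYLE): newly created vaults get "true" when the user did not set it, while old vault records that predate the protobuf field are displayed as false.

import java.util.HashMap;
import java.util.Map;

final class UsePathStyleSketch {
    public static void main(String[] args) {
        // creation path: default the property to "true" if the user omitted it
        Map<String, String> createProps = new HashMap<>();
        createProps.putIfAbsent("use_path_style", "true");
        System.out.println(createProps); // {use_path_style=true}

        // display path: old vault records predate the field entirely, so the
        // show statement falls back to false instead of omitting the entry
        boolean hasUsePathStyle = false; // stand-in for builder.hasUsePathStyle()
        System.out.println(hasUsePathStyle ? "stored value" : "false");
    }
}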
@@ -71,10 +71,10 @@ public class S3URI {
public static final String SCHEME_DELIM = "://";
public static final String PATH_DELIM = "/";
private static final Set<String> VALID_SCHEMES = ImmutableSet.of("http", "https", "s3", "s3a", "s3n",
"bos", "oss", "cos", "cosn", "obs");
"bos", "oss", "cos", "cosn", "obs", "azure");

private static final Set<String> OS_SCHEMES = ImmutableSet.of("s3", "s3a", "s3n",
"bos", "oss", "cos", "cosn", "obs");
"bos", "oss", "cos", "cosn", "obs", "azure");

private URI uri;

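A quick sketch of how an azure:// location splits into bucket and key once the scheme is accepted, assuming S3URI handles it like the other object-store schemes (authority as bucket, path as key); plain java.net.URI is enough to illustrate:

import java.net.URI;

final class AzureUriSketch {
    public static void main(String[] args) {
        URI uri = URI.create("azure://my-container/warehouse/test-object-valid.txt");
        String bucket = uri.getAuthority();                  // my-container
        String key = uri.getPath().replaceFirst("^/", "");   // warehouse/test-object-valid.txt
        System.out.println(bucket + " -> " + key);
    }
}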
@@ -137,8 +137,8 @@ public Status headObject(String remotePath) {
try {
S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
BlobClient blobClient = getClient().getBlobContainerClient(uri.getBucket()).getBlobClient(uri.getKey());
BlobProperties properties = blobClient.getProperties();
LOG.info("head file {} success: {}", remotePath, properties.toString());
LOG.info("headObject remotePath:{} bucket:{} key:{} properties:{}",
remotePath, uri.getBucket(), uri.getKey(), blobClient.getProperties());
return Status.OK;
} catch (BlobStorageException e) {
if (e.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
@@ -24,15 +24,11 @@

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FileSystem;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;

public class AzureFileSystem extends ObjFileSystem {
private static final Logger LOG = LogManager.getLogger(AzureFileSystem.class);

public AzureFileSystem(Map<String, String> properties) {
super(StorageType.AZURE.name(), StorageType.S3, new AzureObjStorage(properties));
initFsProperties();