4 changes: 4 additions & 0 deletions fe/fe-core/pom.xml
@@ -102,6 +102,10 @@ under the License.
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
</dependency>
<dependency>
<groupId>com.google.re2j</groupId>
<artifactId>re2j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
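Aside on the new com.google.re2j dependency added above: re2j is a Java port of RE2, a regular-expression engine that runs in time linear in the input length and cannot backtrack catastrophically. The call site is not shown in this diff; presumably it backs pattern matching in the new storage-properties layer. A minimal, self-contained usage sketch (the pattern is illustrative only):

    import com.google.re2j.Matcher;
    import com.google.re2j.Pattern;

    public class Re2jSketch {
        public static void main(String[] args) {
            // re2j mirrors the java.util.regex API surface.
            Pattern p = Pattern.compile("^(s3|oss|cos|obs)://\\S+");
            Matcher m = p.matcher("s3://bucket/key");
            System.out.println(m.matches()); // true, matched in linear time
        }
    }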
fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
@@ -20,17 +20,19 @@
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.datasource.property.constants.BosProperties;
import org.apache.doris.fs.PersistentFileSystem;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TFileType;

import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -53,6 +55,7 @@ public class BrokerDesc extends StorageDesc implements Writable {
// just for multi load
public static final String MULTI_LOAD_BROKER = "__DORIS_MULTI_LOAD_BROKER__";
public static final String MULTI_LOAD_BROKER_BACKEND_KEY = "__DORIS_MULTI_LOAD_BROKER_BACKEND__";
@Deprecated
@SerializedName("cts3")
private boolean convertedToS3 = false;

@@ -75,42 +78,56 @@ public BrokerDesc(String name, Map<String, String> properties) {
if (properties != null) {
this.properties.putAll(properties);
}
// Assume the storage type is BROKER by default
// If it's a multi-load broker, override the storage type to LOCAL
if (isMultiLoadBroker()) {
this.storageType = StorageBackend.StorageType.LOCAL;
} else {
this.storageType = StorageBackend.StorageType.BROKER;
}
this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
this.convertedToS3 = BosProperties.tryConvertBosToS3(this.properties, this.storageType);
if (this.convertedToS3) {
this.storageType = StorageBackend.StorageType.S3;

// Try to determine the actual storage type from properties if available
if (MapUtils.isNotEmpty(this.properties)) {
try {
// Create primary storage properties from the given configuration
this.storageProperties = StorageProperties.createPrimary(this.properties);
// Override the storage type based on property configuration
this.storageType = StorageBackend.StorageType.valueOf(storageProperties.getStorageName());
} catch (StoragePropertiesException e) {
// Currently ignored: these properties might be broker-specific.
// Support for broker properties will be added in the future.
LOG.info("Failed to create storage properties for broker: {}, properties: {}", name, properties, e);
}
}
// When no broker name was given and the storage type is not BROKER,
// default the name to the storage type (e.g. "S3")
if (StringUtils.isBlank(this.name) && (this.getStorageType() != StorageType.BROKER)) {
this.name = this.storageType.name();
}
}
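Net effect of this constructor change: the storage type is now inferred from the properties themselves, via StorageProperties.createPrimary, rather than from the old BOS-to-S3 conversion flag. A hypothetical usage sketch — the property key names are illustrative assumptions, not confirmed by this diff:

    Map<String, String> s3Props = new HashMap<>();
    s3Props.put("s3.endpoint", "s3.us-east-1.amazonaws.com"); // illustrative key
    s3Props.put("s3.access_key", "ak");                       // illustrative key
    s3Props.put("s3.secret_key", "sk");                       // illustrative key

    BrokerDesc desc = new BrokerDesc(null, s3Props);
    // createPrimary() recognizes the S3 keys, getStorageName() yields "S3",
    // and the type is overridden from the BROKER default:
    assert desc.getStorageType() == StorageBackend.StorageType.S3;
    // With a blank broker name and a non-BROKER type, the name defaults to "S3".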

public BrokerDesc(String name, StorageBackend.StorageType storageType, Map<String, String> properties) {
this.name = name;
this.properties = Maps.newHashMap();
this.storageType = storageType;
if (properties != null) {
this.properties.putAll(properties);
}
this.storageType = storageType;
this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
this.convertedToS3 = BosProperties.tryConvertBosToS3(this.properties, this.storageType);
if (this.convertedToS3) {
this.storageType = StorageBackend.StorageType.S3;
if (MapUtils.isNotEmpty(this.properties) && StorageType.REFACTOR_STORAGE_TYPES.contains(storageType)) {
this.storageProperties = StorageProperties.createPrimary(properties);
}

}

public String getFileLocation(String location) {
return this.convertedToS3 ? BosProperties.convertPathToS3(location) : location;
public String getFileLocation(String location) throws UserException {
return (null != storageProperties) ? storageProperties.validateAndNormalizeUri(location) : location;
}
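getFileLocation() previously rewrote bos:// paths to s3:// only when the BOS conversion had fired; it now delegates validation and normalization to the resolved StorageProperties, and passes the location through unchanged when none were resolved. A sketch reusing the illustrative s3Props map from above (the method throws UserException, so callers must declare or handle it):

    BrokerDesc s3Desc = new BrokerDesc(null, s3Props);
    String normalized = s3Desc.getFileLocation("s3://bucket/dir/file.csv");
    // -> result of storageProperties.validateAndNormalizeUri(...)

    BrokerDesc plain = new BrokerDesc("my_broker", StorageType.BROKER, null);
    String passThrough = plain.getFileLocation("hdfs://nn:8020/user/x.csv");
    // -> returned unchanged: no storageProperties were resolved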

public static BrokerDesc createForStreamLoad() {
return new BrokerDesc("", StorageType.STREAM, null);
}

public boolean isMultiLoadBroker() {
return this.name.equalsIgnoreCase(MULTI_LOAD_BROKER);
return StringUtils.isNotBlank(this.name) && this.name.equalsIgnoreCase(MULTI_LOAD_BROKER);
}

public TFileType getFileType() {
@@ -150,16 +167,18 @@ public void readFields(DataInput in) throws IOException {
final String val = Text.readString(in);
properties.put(key, val);
}
StorageBackend.StorageType st = StorageBackend.StorageType.BROKER;
String typeStr = properties.remove(PersistentFileSystem.STORAGE_TYPE);
if (typeStr != null) {
if (MapUtils.isNotEmpty(properties)) {
try {
st = StorageBackend.StorageType.valueOf(typeStr);
} catch (IllegalArgumentException e) {
LOG.warn("set to BROKER, because of exception", e);
this.storageProperties = StorageProperties.createPrimary(properties);
this.storageType = StorageBackend.StorageType.valueOf(storageProperties.getStorageName());
} catch (RuntimeException e) {
// Currently ignored: these properties might be broker-specific.
// Support for broker properties will be added in the future.
LOG.warn("Failed to create storage properties for broker: {}, properties: {}", name, properties, e);
this.storageType = StorageBackend.StorageType.BROKER;
}

}
storageType = st;
}

public static BrokerDesc read(DataInput in) throws IOException {
116 changes: 15 additions & 101 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -21,29 +21,26 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.cloud.proto.Cloud.ObjectStoreInfoPB;
import org.apache.doris.cloud.security.SecurityChecker;
import org.apache.doris.cloud.storage.RemoteBase;
import org.apache.doris.cloud.storage.RemoteBase.ObjectInfo;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.constants.AzureProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.datasource.property.storage.ObjectStorageProperties;
import org.apache.doris.fsv2.FileSystemFactory;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFileType;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -108,10 +105,6 @@ public class LoadStmt extends DdlStmt implements NotFallbackInParser {
public static final String BOS_ACCESSKEY = "bos_accesskey";
public static final String BOS_SECRET_ACCESSKEY = "bos_secret_accesskey";

// for S3 load check
public static final List<String> PROVIDERS =
new ArrayList<>(Arrays.asList("cos", "oss", "s3", "obs", "bos", "azure"));

// mini load params
public static final String KEY_IN_PARAM_COLUMNS = "columns";
public static final String KEY_IN_PARAM_SET = "set";
@@ -455,8 +448,6 @@ public void analyze(Analyzer analyzer) throws UserException {
for (int i = 0; i < dataDescription.getFilePaths().size(); i++) {
String location = brokerDesc.getFileLocation(dataDescription.getFilePaths().get(i));
dataDescription.getFilePaths().set(i, location);
StorageBackend.checkPath(dataDescription.getFilePaths().get(i),
brokerDesc.getStorageType(), "DATA INFILE must be specified.");
dataDescription.getFilePaths().set(i, dataDescription.getFilePaths().get(i));
}
}
@@ -523,31 +514,6 @@ public void analyze(Analyzer analyzer) throws UserException {
user = ConnectContext.get().getQualifiedUser();
}

private String getProviderFromEndpoint() {
Map<String, String> properties = brokerDesc.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().equalsIgnoreCase(S3Properties.PROVIDER)) {
// S3 Provider properties should be case insensitive.
return entry.getValue().toUpperCase();
}
}
return S3Properties.S3_PROVIDER;
}

private Pair<String, String> getBucketAndObjectFromPath(String filePath) throws UserException {
String[] parts = filePath.split("\\/\\/");
if (parts.length < 2) {
throw new UserException("Invalid file path format: " + filePath);
}

String[] bucketAndObject = parts[1].split("\\/", 2);
if (bucketAndObject.length < 2) {
throw new UserException("Cannot extract bucket and object from path: " + filePath);
}

return Pair.of(bucketAndObject[0], bucketAndObject[1]);
}

public String getComment() {
return comment;
}
@@ -606,8 +572,9 @@ public RedirectStatus getRedirectStatus() {
private void checkEndpoint(String endpoint) throws UserException {
HttpURLConnection connection = null;
try {
SecurityChecker.getInstance().startSSRFChecking(endpoint);
URL url = new URL(endpoint);
String urlStr = "http://" + endpoint;
SecurityChecker.getInstance().startSSRFChecking(urlStr);
URL url = new URL(urlStr);
connection = (HttpURLConnection) url.openConnection();
connection.setConnectTimeout(10000);
connection.connect();
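The contract change here: the endpoint now arrives scheme-less, so the method prepends http:// itself before the SSRF check and the reachability probe (the old code passed the raw endpoint straight to new URL()). A hedged usage sketch:

    // checkEndpoint() now expects a bare host, e.g. from
    // ObjectStorageProperties.getEndpoint() — presumably scheme-less,
    // though this diff does not show that getter.
    checkEndpoint("s3.us-east-1.amazonaws.com");
    // probes http://s3.us-east-1.amazonaws.com; a caller passing a full
    // URL would yield the malformed "http://https://host" and fail.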
@@ -634,29 +601,21 @@ private void checkEndpoint(String endpoint) throws UserException {
}

public void checkS3Param() throws UserException {
Map<String, String> brokerDescProperties = brokerDesc.getProperties();
if (brokerDescProperties.containsKey(S3Properties.Env.ENDPOINT)
&& brokerDescProperties.containsKey(S3Properties.Env.ACCESS_KEY)
&& brokerDescProperties.containsKey(S3Properties.Env.SECRET_KEY)
&& brokerDescProperties.containsKey(S3Properties.Env.REGION)) {
String endpoint = brokerDescProperties.get(S3Properties.Env.ENDPOINT);
if (brokerDesc.getFileType() != null && brokerDesc.getFileType().equals(TFileType.FILE_S3)) {

ObjectStorageProperties storageProperties = (ObjectStorageProperties) brokerDesc.getStorageProperties();
String endpoint = storageProperties.getEndpoint();
checkEndpoint(endpoint);
checkWhiteList(endpoint);
// Add default protocol if not specified
if (!endpoint.startsWith("http://") && !endpoint.startsWith("https://")) {
endpoint = "http://" + endpoint;
// Connectivity test: verify the object storage is actually reachable
boolean connectivityTest = FileSystemFactory.get(brokerDesc.getStorageProperties()).connectivityTest();
if (!connectivityTest) {
throw new UserException("Failed to access object storage, message=connectivity test failed");
}
brokerDescProperties.put(S3Properties.Env.ENDPOINT, endpoint);
if (AzureProperties.checkAzureProviderPropertyExist(brokerDescProperties)) {
return;
}
checkEndpoint(endpoint);
checkAkSk();
}
}
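checkS3Param() now gates on the resolved file type rather than on the presence of raw S3 property keys, and the per-object AK/SK probing (checkAkSk, removed below) is replaced by one connectivity test against the resolved file system. Condensed, the new flow is:

    // Condensed sketch of the new validation order.
    if (brokerDesc.getFileType() == TFileType.FILE_S3) {
        ObjectStorageProperties sp =
                (ObjectStorageProperties) brokerDesc.getStorageProperties();
        String endpoint = sp.getEndpoint();
        checkEndpoint(endpoint);  // SSRF guard + HTTP reachability probe
        checkWhiteList(endpoint); // allow-list from Config.s3_load_endpoint_white_list
        if (!FileSystemFactory.get(sp).connectivityTest()) {
            throw new UserException("Failed to access object storage, "
                    + "message=connectivity test failed");
        }
    }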

public void checkWhiteList(String endpoint) throws UserException {
endpoint = endpoint.replaceFirst("^http://", "");
endpoint = endpoint.replaceFirst("^https://", "");
List<String> whiteList = new ArrayList<>(Arrays.asList(Config.s3_load_endpoint_white_list));
whiteList.removeIf(String::isEmpty);
if (!whiteList.isEmpty() && !whiteList.contains(endpoint)) {
@@ -665,51 +624,6 @@ public void checkWhiteList(String endpoint) throws UserException {
}
}
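Since the comparison strips any http:// or https:// prefix first, callers may pass the endpoint with or without a scheme. For example, with a hypothetical allow-list value:

    // Hypothetical config: s3_load_endpoint_white_list = "cos.ap-beijing.myqcloud.com"
    checkWhiteList("https://cos.ap-beijing.myqcloud.com"); // passes: scheme stripped
    checkWhiteList("oss-cn-hangzhou.aliyuncs.com");        // throws: not on the list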

private void checkAkSk() throws UserException {
RemoteBase remote = null;
ObjectInfo objectInfo = null;
String curFile = null;
try {
Map<String, String> brokerDescProperties = brokerDesc.getProperties();
String provider = getProviderFromEndpoint();
for (DataDescription dataDescription : dataDescriptions) {
for (String filePath : dataDescription.getFilePaths()) {
curFile = filePath;
Pair<String, String> pair = getBucketAndObjectFromPath(filePath);
String bucket = pair.getLeft();
String object = pair.getRight();
objectInfo = new ObjectInfo(ObjectStoreInfoPB.Provider.valueOf(provider.toUpperCase()),
brokerDescProperties.get(S3Properties.Env.ACCESS_KEY),
brokerDescProperties.get(S3Properties.Env.SECRET_KEY),
bucket, brokerDescProperties.get(S3Properties.Env.ENDPOINT),
brokerDescProperties.get(S3Properties.Env.REGION), "");
remote = RemoteBase.newInstance(objectInfo);
// Verify read permissions by calling headObject() on the S3 object.
// RemoteBase#headObject does not throw exception if key does not exist.
remote.headObject(object);
// Verify list permissions by calling listObjects() on the S3 bucket.
remote.listObjects(null);
remote.close();
}
}
} catch (Exception e) {
LOG.warn("Failed to access object storage, file={}, proto={}, err={}", curFile, objectInfo, e.toString());
String msg;
if (e instanceof UserException) {
msg = ((UserException) e).getDetailMessage();
} else {
msg = e.getMessage();
}
throw new UserException(InternalErrorCode.GET_REMOTE_DATA_ERROR,
"Failed to access object storage, message=" + msg, e);
} finally {
if (remote != null) {
remote.close();
}
}

}

@Override
public StmtType stmtType() {
return StmtType.LOAD;