Merged
ExternalCatalog.java
@@ -592,6 +592,14 @@ public boolean useSelfSplitter() {
return ret;
}

+ public String bindBrokerName() {
+ Map<String, String> properties = catalogProperty.getProperties();
+ if (properties.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
+ return properties.get(HMSExternalCatalog.BIND_BROKER_NAME);
+ }
+ return null;
+ }
+
@Override
public Collection<DatabaseIf> getAllDbs() {
makeSureInitialized();
HMSExternalCatalog.java
@@ -61,6 +61,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
private long lastSyncedEventId = -1L;
public static final String ENABLE_SELF_SPLITTER = "enable.self.splitter";
public static final String FILE_META_CACHE_TTL_SECOND = "file.meta.cache.ttl-second";
+ // broker name for file split and query scan.
+ public static final String BIND_BROKER_NAME = "broker.name";
private static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = "ipc.client.fallback-to-simple-auth-allowed";

// -1 means file cache no ttl set
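Together with the `bindBrokerName()` accessor added to ExternalCatalog above, this constant wires up a catalog-level `broker.name` property. A minimal, self-contained sketch of the lookup behavior — the demo class and property map are hypothetical stand-ins, not the real Doris types:

```java
// Hypothetical stand-in showing how the "broker.name" property resolves:
// a missing key yields null, which callers treat as "no bound broker".
import java.util.HashMap;
import java.util.Map;

public class BindBrokerNameDemo {
    public static final String BIND_BROKER_NAME = "broker.name";

    // Mirrors ExternalCatalog.bindBrokerName() from the hunk above.
    static String bindBrokerName(Map<String, String> properties) {
        if (properties.containsKey(BIND_BROKER_NAME)) {
            return properties.get(BIND_BROKER_NAME);
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println(bindBrokerName(props));   // null -> direct file access
        props.put(BIND_BROKER_NAME, "hdfs_broker");
        System.out.println(bindBrokerName(props));   // hdfs_broker -> broker access
    }
}
```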
HiveMetaStoreCache.java
@@ -353,11 +353,13 @@ private Map<PartitionCacheKey, HivePartition> loadPartitions(Iterable<? extends
// Get File Status by using FileSystem API.
private FileCacheValue getFileCache(String location, InputFormat<?, ?> inputFormat,
JobConf jobConf,
- List<String> partitionValues) throws UserException {
+ List<String> partitionValues,
+ String bindBrokerName) throws UserException {
FileCacheValue result = new FileCacheValue();
- result.setSplittable(HiveUtil.isSplittable(inputFormat, new Path(location), jobConf));
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
- new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
+ new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(
+ location, bindBrokerName), jobConf, bindBrokerName));
+ result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
try {
// For Tez engine, it may generate subdirectories for "union" queries.
// So there may be files and directories in the table directory at the same time. eg:
@@ -419,7 +421,8 @@ private FileCacheValue loadFiles(FileCacheKey key) {
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
// TODO: This is a temp config, will remove it after the HiveSplitter is stable.
if (key.useSelfSplitter) {
- result = getFileCache(finalLocation, inputFormat, jobConf, key.getPartitionValues());
+ result = getFileCache(finalLocation, inputFormat, jobConf,
+ key.getPartitionValues(), key.bindBrokerName);
} else {
InputSplit[] splits;
String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
@@ -498,23 +501,23 @@ public HivePartitionValues getPartitionValues(PartitionValueCacheKey key) {
}

public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions,
- boolean useSelfSplitter) {
- return getFilesByPartitions(partitions, useSelfSplitter, true);
+ boolean useSelfSplitter, String bindBrokerName) {
+ return getFilesByPartitions(partitions, useSelfSplitter, true, bindBrokerName);
}

public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions,
- boolean useSelfSplitter) {
- return getFilesByPartitions(partitions, useSelfSplitter, false);
+ boolean useSelfSplitter, String bindBrokerName) {
+ return getFilesByPartitions(partitions, useSelfSplitter, false, bindBrokerName);
}

private List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
- boolean useSelfSplitter, boolean withCache) {
+ boolean useSelfSplitter, boolean withCache, String bindBrokerName) {
long start = System.currentTimeMillis();
List<FileCacheKey> keys = partitions.stream().map(p -> {
FileCacheKey fileCacheKey = p.isDummyPartition()
? FileCacheKey.createDummyCacheKey(p.getDbName(), p.getTblName(), p.getPath(),
- p.getInputFormat(), useSelfSplitter)
- : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues());
+ p.getInputFormat(), useSelfSplitter, bindBrokerName)
+ : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName);
fileCacheKey.setUseSelfSplitter(useSelfSplitter);
return fileCacheKey;
}).collect(Collectors.toList());
@@ -592,7 +595,7 @@ public void invalidateTableCache(String dbName, String tblName) {
HivePartition partition = partitionCache.getIfPresent(partKey);
if (partition != null) {
fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
- null, partition.getPartitionValues()));
+ null, partition.getPartitionValues(), null));
partitionCache.invalidate(partKey);
}
}
@@ -610,7 +613,7 @@ public void invalidateTableCache(String dbName, String tblName) {
* and FE will exit if some network problems occur.
* */
FileCacheKey fileCacheKey = FileCacheKey.createDummyCacheKey(
- dbName, tblName, null, null, false);
+ dbName, tblName, null, null, false, null);
fileCacheRef.get().invalidate(fileCacheKey);
}
}
@@ -625,7 +628,7 @@ public void invalidatePartitionCache(String dbName, String tblName, String parti
HivePartition partition = partitionCache.getIfPresent(partKey);
if (partition != null) {
fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
- null, partition.getPartitionValues()));
+ null, partition.getPartitionValues(), null));
partitionCache.invalidate(partKey);
}
}
@@ -771,7 +774,7 @@ public AtomicReference<LoadingCache<FileCacheKey, FileCacheValue>> getFileCacheR
}

public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
- boolean isFullAcid, long tableId) {
+ boolean isFullAcid, long tableId, String bindBrokerName) {
List<FileCacheValue> fileCacheValues = Lists.newArrayList();
String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
try {
@@ -802,7 +805,8 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
- FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString()), jobConf));
+ FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString(),
+ bindBrokerName), jobConf, bindBrokerName));
Status status = fs.exists(acidVersionPath);
if (status != Status.OK) {
if (status.getErrCode() == ErrCode.NOT_FOUND) {
@@ -823,7 +827,9 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
String location = delta.getPath().toString();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
- new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
+ new FileSystemCache.FileSystemCacheKey(
+ FileSystemFactory.getFSIdentity(location, bindBrokerName),
+ jobConf, bindBrokerName));
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
if (delta.isDeleteDelta()) {
List<String> deleteDeltaFileNames = locatedFiles.files().stream().map(f -> f.getName()).filter(
@@ -841,7 +847,9 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
if (directory.getBaseDirectory() != null) {
String location = directory.getBaseDirectory().toString();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
- new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
+ new FileSystemCache.FileSystemCacheKey(
+ FileSystemFactory.getFSIdentity(location, bindBrokerName),
+ jobConf, bindBrokerName));
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
locatedFiles.files().stream().filter(
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
@@ -939,6 +947,8 @@ public static class FileCacheKey {
private String location;
// not in key
private String inputFormat;
+ // Broker name for file split and file scan.
+ private String bindBrokerName;
// Temp variable, use self file splitter or use InputFormat.getSplits.
// Will remove after self splitter is stable.
private boolean useSelfSplitter;
@@ -947,16 +957,18 @@ public static class FileCacheKey {
// partitionValues would be ["part1", "part2"]
protected List<String> partitionValues;

- public FileCacheKey(String location, String inputFormat, List<String> partitionValues) {
+ public FileCacheKey(String location, String inputFormat, List<String> partitionValues, String bindBrokerName) {
this.location = location;
this.inputFormat = inputFormat;
this.partitionValues = partitionValues == null ? Lists.newArrayList() : partitionValues;
this.useSelfSplitter = true;
+ this.bindBrokerName = bindBrokerName;
}

public static FileCacheKey createDummyCacheKey(String dbName, String tblName, String location,
- String inputFormat, boolean useSelfSplitter) {
- FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null);
+ String inputFormat, boolean useSelfSplitter,
+ String bindBrokerName) {
+ FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null, bindBrokerName);
fileCacheKey.dummyKey = dbName + "." + tblName;
fileCacheKey.useSelfSplitter = useSelfSplitter;
return fileCacheKey;
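Net effect in HiveMetaStoreCache: every public entry point (getFilesByPartitionsWithCache, getFilesByPartitionsWithoutCache, getFilesByTransaction) now threads the bound broker name down to getFileCache and into the cache keys, while the invalidation paths pass null. A simplified, self-contained sketch of the patched FileCacheKey shape — a stand-in class, not the real Doris type:

```java
// Simplified stand-in for the patched FileCacheKey: the broker name rides
// along in the key object; null means "no broker bound" (the invalidation
// paths above pass null).
import java.util.ArrayList;
import java.util.List;

public class FileCacheKeySketch {
    String location;
    String inputFormat;       // "not in key", as the original comment notes
    String bindBrokerName;    // new field introduced by this patch
    boolean useSelfSplitter;
    List<String> partitionValues;
    String dummyKey;          // set only for whole-table (dummy) keys

    public FileCacheKeySketch(String location, String inputFormat,
            List<String> partitionValues, String bindBrokerName) {
        this.location = location;
        this.inputFormat = inputFormat;
        this.partitionValues = partitionValues == null ? new ArrayList<>() : partitionValues;
        this.useSelfSplitter = true;
        this.bindBrokerName = bindBrokerName;
    }

    public static FileCacheKeySketch createDummyCacheKey(String dbName, String tblName,
            String location, String inputFormat, boolean useSelfSplitter,
            String bindBrokerName) {
        FileCacheKeySketch key = new FileCacheKeySketch(location, inputFormat, null, bindBrokerName);
        key.dummyKey = dbName + "." + tblName;
        key.useSelfSplitter = useSelfSplitter;
        return key;
    }

    public static void main(String[] args) {
        FileCacheKeySketch partitionKey = new FileCacheKeySketch(
                "hdfs://ns1/warehouse/t/p=1", "orc", null, "hdfs_broker");
        FileCacheKeySketch tableKey = createDummyCacheKey(
                "db", "tbl", null, null, false, null);
        System.out.println(partitionKey.bindBrokerName); // hdfs_broker
        System.out.println(tableKey.dummyKey);           // db.tbl
    }
}
```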
HiveUtil.java
@@ -24,6 +24,8 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.fs.FileSystemFactory;
+ import org.apache.doris.fs.remote.BrokerFileSystem;
+ import org.apache.doris.fs.remote.RemoteFileSystem;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -188,12 +190,17 @@ private static Type convertHiveTypeToiveDoris(TypeInfo hiveTypeInfo) {
}
}

- public static boolean isSplittable(InputFormat<?, ?> inputFormat, Path path, JobConf jobConf) {
+ public static boolean isSplittable(RemoteFileSystem remoteFileSystem, InputFormat<?, ?> inputFormat,
+ String location, JobConf jobConf) throws UserException {
+ if (remoteFileSystem instanceof BrokerFileSystem) {
+ return ((BrokerFileSystem) remoteFileSystem)
+ .isSplittable(location, inputFormat.getClass().getCanonicalName());
+ }

// ORC uses a custom InputFormat but is always splittable
if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
return true;
}

// use reflection to get isSplitable method on FileInputFormat
// ATTN: the method name is actually "isSplitable", but the correct spelling is "isSplittable"
Method method = null;
@@ -209,6 +216,7 @@ public static boolean isSplittable(InputFormat<?, ?> inputFormat, Path path, Job
if (method == null) {
return false;
}
+ Path path = new Path(location);
try {
method.setAccessible(true);
return (boolean) method.invoke(inputFormat, FileSystemFactory.getNativeByPath(path, jobConf), path);
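The reworked isSplittable first lets a bound BrokerFileSystem answer splittability remotely, and only then falls back to the existing reflection probe against Hadoop's mapred API. A standalone sketch of that fallback path, assuming Hadoop on the classpath; the wrapper class name and the fs parameter are hypothetical (the real code resolves the FileSystem via FileSystemFactory.getNativeByPath):

```java
// Sketch of the reflection fallback: find the protected
// FileInputFormat.isSplitable(FileSystem, Path) override, if any, by walking
// up the input format's class hierarchy. Hadoop's method name really is the
// single-"t" "isSplitable".
import java.lang.reflect.Method;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputFormat;

public final class SplittableProbe {
    public static boolean isSplittable(InputFormat<?, ?> inputFormat,
            FileSystem fs, Path path) {
        // ORC uses a custom InputFormat but is always splittable.
        if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
            return true;
        }
        Method method = null;
        for (Class<?> clazz = inputFormat.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
            try {
                method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
                break;
            } catch (NoSuchMethodException ignored) {
                // keep walking up the hierarchy
            }
        }
        if (method == null) {
            return false; // conservatively treat unknown formats as unsplittable
        }
        try {
            method.setAccessible(true);
            return (boolean) method.invoke(inputFormat, fs, path);
        } catch (ReflectiveOperationException e) {
            return false;
        }
    }
}
```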
fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java (17 changes: 13 additions & 4 deletions)
@@ -53,7 +53,7 @@ public RemoteFileSystem load(FileSystemCacheKey key) {
}

private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) {
- return FileSystemFactory.getByType(key.type, key.conf);
+ return FileSystemFactory.getRemoteFileSystem(key.type, key.conf, key.bindBrokerName);
}

public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {
@@ -69,11 +69,13 @@ public static class FileSystemCacheKey {
// eg: hdfs://nameservices1
private final String fsIdent;
private final JobConf conf;
+ private final String bindBrokerName;

- public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf conf) {
+ public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf conf, String bindBrokerName) {
this.type = fs.first;
this.fsIdent = fs.second;
this.conf = conf;
+ this.bindBrokerName = bindBrokerName;
}

@Override
@@ -84,14 +86,21 @@ public boolean equals(Object obj) {
if (!(obj instanceof FileSystemCacheKey)) {
return false;
}
- return type.equals(((FileSystemCacheKey) obj).type)
+ boolean equalsWithoutBroker = type.equals(((FileSystemCacheKey) obj).type)
&& fsIdent.equals(((FileSystemCacheKey) obj).fsIdent)
&& conf == ((FileSystemCacheKey) obj).conf;
+ if (bindBrokerName == null) {
+ return equalsWithoutBroker;
+ }
+ return equalsWithoutBroker && bindBrokerName.equals(((FileSystemCacheKey) obj).bindBrokerName);
}

@Override
public int hashCode() {
- return Objects.hash(conf, fsIdent, type);
+ if (bindBrokerName == null) {
+ return Objects.hash(conf, fsIdent, type);
+ }
+ return Objects.hash(conf, fsIdent, type, bindBrokerName);
}
}
}
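The cache key stays backward compatible: a null bindBrokerName reproduces the old three-field equality and hash exactly, so existing non-broker entries keep their identity (note that conf is compared by reference, as before). A self-contained demo of these semantics, using a stand-in class rather than the real key:

```java
// Demo of the null-tolerant key equality above: two keys with no bound
// broker compare exactly as before; a broker name becomes part of identity.
import java.util.Objects;

public final class CacheKeySketch {
    final String type;          // stands in for FileSystemType
    final String fsIdent;       // eg: hdfs://nameservices1
    final Object conf;          // compared by reference, like JobConf in the patch
    final String bindBrokerName;

    CacheKeySketch(String type, String fsIdent, Object conf, String bindBrokerName) {
        this.type = type;
        this.fsIdent = fsIdent;
        this.conf = conf;
        this.bindBrokerName = bindBrokerName;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof CacheKeySketch)) {
            return false;
        }
        CacheKeySketch other = (CacheKeySketch) obj;
        boolean equalsWithoutBroker = type.equals(other.type)
                && fsIdent.equals(other.fsIdent)
                && conf == other.conf;
        if (bindBrokerName == null) {
            return equalsWithoutBroker;
        }
        return equalsWithoutBroker && bindBrokerName.equals(other.bindBrokerName);
    }

    @Override
    public int hashCode() {
        if (bindBrokerName == null) {
            return Objects.hash(conf, fsIdent, type);
        }
        return Objects.hash(conf, fsIdent, type, bindBrokerName);
    }

    public static void main(String[] args) {
        Object conf = new Object();
        CacheKeySketch a = new CacheKeySketch("DFS", "hdfs://ns1", conf, null);
        CacheKeySketch b = new CacheKeySketch("DFS", "hdfs://ns1", conf, null);
        CacheKeySketch c = new CacheKeySketch("BROKER", "hdfs://ns1", conf, "hdfs_broker");
        System.out.println(a.equals(b)); // true: old three-field identity
        System.out.println(a.equals(c)); // false: type and broker differ
    }
}
```

One subtlety: equality is asymmetric when exactly one side has a null bindBrokerName. In practice that should not cause collisions, since broker-bound keys also carry FileSystemType.BROKER from getFSIdentity below, so the type field already separates them from direct-access keys.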
FileSystemFactory.java
@@ -56,9 +56,11 @@ public static RemoteFileSystem get(String name, StorageBackend.StorageType type,
}
}

- public static Pair<FileSystemType, String> getFSIdentity(String location) {
+ public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) {
FileSystemType fsType;
- if (S3Util.isObjStorage(location)) {
+ if (bindBrokerName != null) {
+ fsType = FileSystemType.BROKER;
+ } else if (S3Util.isObjStorage(location)) {
if (S3Util.isHdfsOnOssEndpoint(location)) {
// if hdfs service is enabled on oss, use hdfs lib to access oss.
fsType = FileSystemType.DFS;
@@ -83,7 +85,8 @@ public static Pair<FileSystemType, String> getFSIdentity(String location) {
return Pair.of(fsType, fsIdent);
}

- public static RemoteFileSystem getByType(FileSystemType type, Configuration conf) {
+ public static RemoteFileSystem getRemoteFileSystem(FileSystemType type, Configuration conf,
+ String bindBrokerName) {
Map<String, String> properties = new HashMap<>();
conf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
switch (type) {
@@ -95,6 +98,8 @@ public static RemoteFileSystem getByType(FileSystemType type, Configuration conf
return new OFSFileSystem(properties);
case JFS:
return new JFSFileSystem(properties);
+ case BROKER:
+ return new BrokerFileSystem(bindBrokerName, properties);
default:
throw new IllegalStateException("Not supported file system type: " + type);
}
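Dispatch order matters here: a bound broker name overrides object-storage and scheme detection entirely, and the factory then instantiates a BrokerFileSystem under that name. A compact sketch of the resulting precedence — the class name and the helper predicates are hypothetical stand-ins for S3Util, and the non-broker branches are condensed:

```java
// Sketch of the new dispatch: a bound broker wins over everything else;
// otherwise fall through to object-storage / scheme detection as before.
enum FsType { S3, DFS, OFS, JFS, BROKER, FILE }

public final class FsDispatchSketch {
    static FsType fsTypeFor(String location, String bindBrokerName) {
        if (bindBrokerName != null) {
            return FsType.BROKER;            // new: broker overrides detection
        }
        if (isObjStorage(location)) {
            // if hdfs service is enabled on oss, use hdfs lib to access oss
            return isHdfsOnOss(location) ? FsType.DFS : FsType.S3;
        }
        return FsType.DFS;                   // default: treat as HDFS-compatible
    }

    static boolean isObjStorage(String location) {
        // placeholder; the real S3Util check covers more schemes
        return location.startsWith("s3://") || location.startsWith("oss://");
    }

    static boolean isHdfsOnOss(String location) {
        return false; // placeholder; the real check inspects the OSS endpoint
    }

    public static void main(String[] args) {
        System.out.println(fsTypeFor("s3://bucket/path", null));        // S3
        System.out.println(fsTypeFor("s3://bucket/path", "my_broker")); // BROKER
        System.out.println(fsTypeFor("hdfs://ns1/path", null));         // DFS
    }
}
```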
FileSystemType.java
@@ -22,5 +22,6 @@ public enum FileSystemType {
DFS,
OFS,
JFS,
+ BROKER,
FILE
}