diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 27bd524c1eb082..1ed6bd027aaa64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -592,6 +592,14 @@ public boolean useSelfSplitter() {
         return ret;
     }
 
+    public String bindBrokerName() {
+        Map<String, String> properties = catalogProperty.getProperties();
+        if (properties.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
+            return properties.get(HMSExternalCatalog.BIND_BROKER_NAME);
+        }
+        return null;
+    }
+
     @Override
     public Collection getAllDbs() {
         makeSureInitialized();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 47562ebb1f1a5f..6e3543dfccef94 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -61,6 +61,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
     private long lastSyncedEventId = -1L;
     public static final String ENABLE_SELF_SPLITTER = "enable.self.splitter";
     public static final String FILE_META_CACHE_TTL_SECOND = "file.meta.cache.ttl-second";
+    // broker name for file split and query scan.
+    public static final String BIND_BROKER_NAME = "broker.name";
 
     private static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = "ipc.client.fallback-to-simple-auth-allowed";
 
     // -1 means file cache no ttl set
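
The `broker.name` property is read through the new `bindBrokerName()` accessor, which returns null when no broker is bound. For illustration only (not part of the patch), the containsKey/get pair has the same semantics as a single `getOrDefault` lookup:

// Illustration, not in the patch: equivalent to bindBrokerName() above.
public String bindBrokerName() {
    return catalogProperty.getProperties()
            .getOrDefault(HMSExternalCatalog.BIND_BROKER_NAME, null);
}
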
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 850619cb246430..bac891eb920225 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -353,11 +353,13 @@ private Map loadPartitions(Iterable
             inputFormat, JobConf jobConf,
-            List<String> partitionValues) throws UserException {
+            List<String> partitionValues,
+            String bindBrokerName) throws UserException {
         FileCacheValue result = new FileCacheValue();
-        result.setSplittable(HiveUtil.isSplittable(inputFormat, new Path(location), jobConf));
         RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
+                new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(
+                        location, bindBrokerName), jobConf, bindBrokerName));
+        result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
         try {
             // For Tez engine, it may generate subdirectoies for "union" query.
             // So there may be files and directories in the table directory at the same time. eg:
@@ -419,7 +421,8 @@ private FileCacheValue loadFiles(FileCacheKey key) {
             InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
             // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
             if (key.useSelfSplitter) {
-                result = getFileCache(finalLocation, inputFormat, jobConf, key.getPartitionValues());
+                result = getFileCache(finalLocation, inputFormat, jobConf,
+                        key.getPartitionValues(), key.bindBrokerName);
             } else {
                 InputSplit[] splits;
                 String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
@@ -498,23 +501,23 @@ public HivePartitionValues getPartitionValues(PartitionValueCacheKey key) {
     }
 
     public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions,
-            boolean useSelfSplitter) {
-        return getFilesByPartitions(partitions, useSelfSplitter, true);
+            boolean useSelfSplitter, String bindBrokerName) {
+        return getFilesByPartitions(partitions, useSelfSplitter, true, bindBrokerName);
     }
 
     public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions,
-            boolean useSelfSplitter) {
-        return getFilesByPartitions(partitions, useSelfSplitter, false);
+            boolean useSelfSplitter, String bindBrokerName) {
+        return getFilesByPartitions(partitions, useSelfSplitter, false, bindBrokerName);
     }
 
     private List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
-            boolean useSelfSplitter, boolean withCache) {
+            boolean useSelfSplitter, boolean withCache, String bindBrokerName) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = partitions.stream().map(p -> {
             FileCacheKey fileCacheKey = p.isDummyPartition()
                     ? FileCacheKey.createDummyCacheKey(p.getDbName(), p.getTblName(), p.getPath(),
-                    p.getInputFormat(), useSelfSplitter)
-                    : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues());
+                    p.getInputFormat(), useSelfSplitter, bindBrokerName)
+                    : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName);
             fileCacheKey.setUseSelfSplitter(useSelfSplitter);
             return fileCacheKey;
         }).collect(Collectors.toList());
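
From a caller's point of view the widened cache API simply threads the catalog's bound broker name through; null keeps the old direct-access behavior. A hedged sketch, assuming a resolved `cache`, a `catalog`, and a `partitions` list:

// Sketch: fetch cached file lists, routing reads through the bound broker
// when one is configured (bindBrokerName == null means direct access).
String bindBrokerName = catalog.bindBrokerName();
List<FileCacheValue> files = cache.getFilesByPartitionsWithCache(
        partitions, /* useSelfSplitter */ true, bindBrokerName);
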
@@ -592,7 +595,7 @@ public void invalidateTableCache(String dbName, String tblName) {
             HivePartition partition = partitionCache.getIfPresent(partKey);
             if (partition != null) {
                 fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
-                        null, partition.getPartitionValues()));
+                        null, partition.getPartitionValues(), null));
                 partitionCache.invalidate(partKey);
             }
         }
@@ -610,7 +613,7 @@ public void invalidateTableCache(String dbName, String tblName) {
              * and FE will exit if some network problems occur.
              * */
             FileCacheKey fileCacheKey = FileCacheKey.createDummyCacheKey(
-                    dbName, tblName, null, null, false);
+                    dbName, tblName, null, null, false, null);
             fileCacheRef.get().invalidate(fileCacheKey);
         }
     }
@@ -625,7 +628,7 @@ public void invalidatePartitionCache(String dbName, String tblName, String parti
             HivePartition partition = partitionCache.getIfPresent(partKey);
             if (partition != null) {
                 fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
-                        null, partition.getPartitionValues()));
+                        null, partition.getPartitionValues(), null));
                 partitionCache.invalidate(partKey);
             }
         }
@@ -771,7 +774,7 @@ public AtomicReference<LoadingCache<FileCacheKey, FileCacheValue>> getFileCacheR
     }
 
     public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
-            boolean isFullAcid, long tableId) {
+            boolean isFullAcid, long tableId, String bindBrokerName) {
         List<FileCacheValue> fileCacheValues = Lists.newArrayList();
         String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
         try {
@@ -802,7 +805,8 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
                     String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
                     RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                             new FileSystemCache.FileSystemCacheKey(
-                                    FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString()), jobConf));
+                                    FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString(),
+                                            bindBrokerName), jobConf, bindBrokerName));
                     Status status = fs.exists(acidVersionPath);
                     if (status != Status.OK) {
                         if (status.getErrCode() == ErrCode.NOT_FOUND) {
@@ -823,7 +827,9 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
                 for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
                     String location = delta.getPath().toString();
                     RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                            new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
+                            new FileSystemCache.FileSystemCacheKey(
+                                    FileSystemFactory.getFSIdentity(location, bindBrokerName),
+                                    jobConf, bindBrokerName));
                     RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
                     if (delta.isDeleteDelta()) {
                         List<String> deleteDeltaFileNames = locatedFiles.files().stream().map(f -> f.getName()).filter(
@@ -841,7 +847,9 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
                 if (directory.getBaseDirectory() != null) {
                     String location = directory.getBaseDirectory().toString();
                     RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                            new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
+                            new FileSystemCache.FileSystemCacheKey(
+                                    FileSystemFactory.getFSIdentity(location, bindBrokerName),
+                                    jobConf, bindBrokerName));
                     RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
                     locatedFiles.files().stream().filter(
                             f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
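
The same three-line file-system resolution now recurs at every read site in this class. A hypothetical helper (not in the patch) that would factor out the repetition:

// Hypothetical refactor: one place to turn (location, broker) into a
// RemoteFileSystem via the shared FileSystemCache.
private RemoteFileSystem resolveFileSystem(String location, String bindBrokerName) {
    return Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
            new FileSystemCache.FileSystemCacheKey(
                    FileSystemFactory.getFSIdentity(location, bindBrokerName),
                    jobConf, bindBrokerName));
}
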
@@ -939,6 +947,8 @@ public static class FileCacheKey {
         private String location;
         // not in key
         private String inputFormat;
+        // Broker name for file split and file scan.
+        private String bindBrokerName;
         // Temp variable, use self file splitter or use InputFormat.getSplits.
         // Will remove after self splitter is stable.
         private boolean useSelfSplitter;
@@ -947,16 +957,18 @@ public static class FileCacheKey {
         // partitionValues would be ["part1", "part2"]
         protected List<String> partitionValues;
 
-        public FileCacheKey(String location, String inputFormat, List<String> partitionValues) {
+        public FileCacheKey(String location, String inputFormat, List<String> partitionValues, String bindBrokerName) {
             this.location = location;
             this.inputFormat = inputFormat;
             this.partitionValues = partitionValues == null ? Lists.newArrayList() : partitionValues;
             this.useSelfSplitter = true;
+            this.bindBrokerName = bindBrokerName;
         }
 
         public static FileCacheKey createDummyCacheKey(String dbName, String tblName, String location,
-                String inputFormat, boolean useSelfSplitter) {
-            FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null);
+                String inputFormat, boolean useSelfSplitter,
+                String bindBrokerName) {
+            FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null, bindBrokerName);
             fileCacheKey.dummyKey = dbName + "." + tblName;
             fileCacheKey.useSelfSplitter = useSelfSplitter;
             return fileCacheKey;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
index 4bf01910f826ff..deb048b59439ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
@@ -24,6 +24,8 @@
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.fs.remote.BrokerFileSystem;
+import org.apache.doris.fs.remote.RemoteFileSystem;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -188,12 +190,17 @@ private static Type convertHiveTypeToiveDoris(TypeInfo hiveTypeInfo) {
         }
     }
 
-    public static boolean isSplittable(InputFormat<?, ?> inputFormat, Path path, JobConf jobConf) {
+    public static boolean isSplittable(RemoteFileSystem remoteFileSystem, InputFormat<?, ?> inputFormat,
+            String location, JobConf jobConf) throws UserException {
+        if (remoteFileSystem instanceof BrokerFileSystem) {
+            return ((BrokerFileSystem) remoteFileSystem)
+                    .isSplittable(location, inputFormat.getClass().getCanonicalName());
+        }
+
         // ORC uses a custom InputFormat but is always splittable
         if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
             return true;
         }
-
         // use reflection to get isSplitable method on FileInputFormat
         // ATTN: the method name is actually "isSplitable", but the right spell is "isSplittable"
         Method method = null;
@@ -209,6 +216,7 @@ public static boolean isSplittable(InputFormat inputFormat, Path path, Job
         if (method == null) {
             return false;
         }
+        Path path = new Path(location);
         try {
             method.setAccessible(true);
             return (boolean) method.invoke(inputFormat, FileSystemFactory.getNativeByPath(path, jobConf), path);
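
Splittability is now decided by the file system that will actually read the data. A hedged caller sketch (variables assumed from the surrounding cache code):

// Sketch: resolve the file system first, then let it answer.
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
        new FileSystemCache.FileSystemCacheKey(
                FileSystemFactory.getFSIdentity(location, bindBrokerName),
                jobConf, bindBrokerName));
// BrokerFileSystem -> TBrokerIsSplittableRequest RPC to the broker;
// anything else -> the reflective FileInputFormat.isSplitable probe above.
boolean splittable = HiveUtil.isSplittable(fs, inputFormat, location, jobConf);
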
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
index edc746ebe24e53..7946dd5e8a7cea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
@@ -53,7 +53,7 @@ public RemoteFileSystem load(FileSystemCacheKey key) {
     }
 
     private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) {
-        return FileSystemFactory.getByType(key.type, key.conf);
+        return FileSystemFactory.getRemoteFileSystem(key.type, key.conf,
+                key.bindBrokerName);
     }
 
     public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {
@@ -69,11 +69,13 @@ public static class FileSystemCacheKey {
         // eg: hdfs://nameservices1
         private final String fsIdent;
         private final JobConf conf;
+        private final String bindBrokerName;
 
-        public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf conf) {
+        public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf conf, String bindBrokerName) {
             this.type = fs.first;
             this.fsIdent = fs.second;
             this.conf = conf;
+            this.bindBrokerName = bindBrokerName;
         }
 
         @Override
@@ -84,14 +86,21 @@ public boolean equals(Object obj) {
             if (!(obj instanceof FileSystemCacheKey)) {
                 return false;
             }
-            return type.equals(((FileSystemCacheKey) obj).type)
+            boolean equalsWithoutBroker = type.equals(((FileSystemCacheKey) obj).type)
                     && fsIdent.equals(((FileSystemCacheKey) obj).fsIdent)
                     && conf == ((FileSystemCacheKey) obj).conf;
+            if (bindBrokerName == null) {
+                return equalsWithoutBroker;
+            }
+            return equalsWithoutBroker && bindBrokerName.equals(((FileSystemCacheKey) obj).bindBrokerName);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(conf, fsIdent, type);
+            if (bindBrokerName == null) {
+                return Objects.hash(conf, fsIdent, type);
+            }
+            return Objects.hash(conf, fsIdent, type, bindBrokerName);
         }
     }
 }
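
Note that this equals/hashCode pair is asymmetric: a key with a null bindBrokerName compares equal to one whose broker name is set, the reverse comparison fails, and the two keys hash to different buckets. If strict key identity were wanted instead, a symmetric variant would look like this (a sketch, not the patch's behavior):

// Symmetric alternative: treat bindBrokerName as an ordinary nullable field.
@Override
public boolean equals(Object obj) {
    if (obj == this) {
        return true;
    }
    if (!(obj instanceof FileSystemCacheKey)) {
        return false;
    }
    FileSystemCacheKey other = (FileSystemCacheKey) obj;
    return type.equals(other.type)
            && fsIdent.equals(other.fsIdent)
            && conf == other.conf
            && Objects.equals(bindBrokerName, other.bindBrokerName);
}

@Override
public int hashCode() {
    return Objects.hash(conf, fsIdent, type, bindBrokerName);
}
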
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
index 3837a7eb95be50..e54a73bbff3062 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
@@ -56,9 +56,11 @@ public static RemoteFileSystem get(String name, StorageBackend.StorageType type,
         }
     }
 
-    public static Pair<FileSystemType, String> getFSIdentity(String location) {
+    public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) {
         FileSystemType fsType;
-        if (S3Util.isObjStorage(location)) {
+        if (bindBrokerName != null) {
+            fsType = FileSystemType.BROKER;
+        } else if (S3Util.isObjStorage(location)) {
             if (S3Util.isHdfsOnOssEndpoint(location)) {
                 // if hdfs service is enabled on oss, use hdfs lib to access oss.
                 fsType = FileSystemType.DFS;
@@ -83,7 +85,8 @@ public static Pair<FileSystemType, String> getFSIdentity(String location) {
         return Pair.of(fsType, fsIdent);
     }
 
-    public static RemoteFileSystem getByType(FileSystemType type, Configuration conf) {
+    public static RemoteFileSystem getRemoteFileSystem(FileSystemType type, Configuration conf,
+            String bindBrokerName) {
         Map<String, String> properties = new HashMap<>();
         conf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
         switch (type) {
@@ -95,6 +98,8 @@ public static RemoteFileSystem getByType(FileSystemType type, Configuration conf
                 return new OFSFileSystem(properties);
             case JFS:
                 return new JFSFileSystem(properties);
+            case BROKER:
+                return new BrokerFileSystem(bindBrokerName, properties);
             default:
                 throw new IllegalStateException("Not supported file system type: " + type);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java
index 5ddea01174441b..018130f0c14d58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java
@@ -22,5 +22,6 @@ public enum FileSystemType {
     DFS,
     OFS,
     JFS,
+    BROKER,
     FILE
 }
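
End to end, a bound broker short-circuits file-system resolution before any URI sniffing. A hedged sketch of the flow ("my_broker" and the path are placeholders; `jobConf` is assumed):

// With a broker bound, the location's scheme is irrelevant: the identity is
// tagged BROKER and the factory returns a BrokerFileSystem for that broker.
Pair<FileSystemType, String> identity = FileSystemFactory.getFSIdentity(
        "hdfs://nameservices1/warehouse/t", "my_broker"); // -> BROKER
RemoteFileSystem fs = FileSystemFactory.getRemoteFileSystem(
        identity.first, jobConf, "my_broker");            // -> BrokerFileSystem
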
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
index cb87150928869f..ef8d484bda99b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
@@ -24,8 +24,10 @@
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.fs.RemoteFiles;
 import org.apache.doris.fs.operations.BrokerFileOperations;
 import org.apache.doris.fs.operations.OpParams;
 import org.apache.doris.service.FrontendOptions;
@@ -34,6 +36,8 @@
 import org.apache.doris.thrift.TBrokerDeletePathRequest;
 import org.apache.doris.thrift.TBrokerFD;
 import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TBrokerIsSplittableRequest;
+import org.apache.doris.thrift.TBrokerIsSplittableResponse;
 import org.apache.doris.thrift.TBrokerListPathRequest;
 import org.apache.doris.thrift.TBrokerListResponse;
 import org.apache.doris.thrift.TBrokerOperationStatus;
@@ -65,6 +69,7 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -556,6 +561,88 @@ public Status delete(String remotePath) {
         return Status.OK;
     }
 
+    @Override
+    public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException {
+        // get a proper broker
+        Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
+        if (pair == null) {
+            throw new UserException("failed to get broker client");
+        }
+        TPaloBrokerService.Client client = pair.first;
+        TNetworkAddress address = pair.second;
+
+        // invoke broker 'listLocatedFiles' interface
+        boolean needReturn = true;
+        try {
+            TBrokerListPathRequest req = new TBrokerListPathRequest(TBrokerVersion.VERSION_ONE, remotePath,
+                    recursive, properties);
+            req.setOnlyFiles(onlyFiles);
+            TBrokerListResponse response = client.listLocatedFiles(req);
+            TBrokerOperationStatus operationStatus = response.getOpStatus();
+            if (operationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
+                throw new UserException("failed to listLocatedFiles, remote path: " + remotePath + ". msg: "
+                        + operationStatus.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
+            }
+            List<RemoteFile> result = new ArrayList<>();
+            List<TBrokerFileStatus> fileStatus = response.getFiles();
+            for (TBrokerFileStatus tFile : fileStatus) {
+                org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(tFile.path);
+                RemoteFile file = new RemoteFile(path.getName(), path, !tFile.isDir, tFile.isDir, tFile.size,
+                        tFile.getBlockSize(), tFile.getModificationTime(), null /* blockLocations is null*/);
+                result.add(file);
+            }
+            LOG.info("finished to listLocatedFiles, remote path {}. get files: {}", remotePath, result);
+            return new RemoteFiles(result);
+        } catch (TException e) {
+            needReturn = false;
+            throw new UserException("failed to listLocatedFiles, remote path: "
+                    + remotePath + ". msg: " + e.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
+        } finally {
+            if (needReturn) {
+                ClientPool.brokerPool.returnObject(address, client);
+            } else {
+                ClientPool.brokerPool.invalidateObject(address, client);
+            }
+        }
+    }
+
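The new RPC wrappers in this class follow the same client-pool discipline: return the client on success, invalidate it when the transport throws. A condensed sketch of the idiom (`doRpc` is a hypothetical stand-in for the thrift call):

// A TException means the connection itself is suspect, so the client is
// dropped from the pool instead of being recycled.
boolean needReturn = true;
try {
    return doRpc(client); // hypothetical thrift call
} catch (TException e) {
    needReturn = false;
    throw new UserException("broker rpc failed: " + e.getMessage());
} finally {
    if (needReturn) {
        ClientPool.brokerPool.returnObject(address, client);
    } else {
        ClientPool.brokerPool.invalidateObject(address, client);
    }
}
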
msg: " + e.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address)); + } finally { + if (needReturn) { + ClientPool.brokerPool.returnObject(address, client); + } else { + ClientPool.brokerPool.invalidateObject(address, client); + } + } + } + // List files in remotePath @Override public Status list(String remotePath, List result, boolean fileNameOnly) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index 1ba77fa5f9c09f..d41418104871a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -196,8 +196,14 @@ protected List getSplits() throws UserException { HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog()); boolean useSelfSplitter = hmsTable.getCatalog().useSelfSplitter(); + String bindBrokerName = hmsTable.getCatalog().bindBrokerName(); + if (bindBrokerName != null && useSelfSplitter == false) { + // useSelfSplitter must be true if bindBrokerName is set. + throw new UserException(HMSExternalCatalog.ENABLE_SELF_SPLITTER + " should be true if " + + HMSExternalCatalog.BIND_BROKER_NAME + " is set"); + } List allFiles = Lists.newArrayList(); - getFileSplitByPartitions(cache, getPartitions(), allFiles, useSelfSplitter); + getFileSplitByPartitions(cache, getPartitions(), allFiles, useSelfSplitter, bindBrokerName); LOG.debug("get #{} files for table: {}.{}, cost: {} ms", allFiles.size(), hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start)); return allFiles; @@ -210,12 +216,13 @@ protected List getSplits() throws UserException { } private void getFileSplitByPartitions(HiveMetaStoreCache cache, List partitions, - List allFiles, boolean useSelfSplitter) throws IOException { + List allFiles, boolean useSelfSplitter, + String bindBrokerName) throws IOException { List fileCaches; if (hiveTransaction != null) { - fileCaches = getFileSplitByTransaction(cache, partitions); + fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName); } else { - fileCaches = cache.getFilesByPartitionsWithCache(partitions, useSelfSplitter); + fileCaches = cache.getFilesByPartitionsWithCache(partitions, useSelfSplitter, bindBrokerName); } if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime(); @@ -287,7 +294,8 @@ private List selectFiles(List return fileList.subList(0, index); } - private List getFileSplitByTransaction(HiveMetaStoreCache cache, List partitions) { + private List getFileSplitByTransaction(HiveMetaStoreCache cache, List partitions, + String bindBrokerName) { for (HivePartition partition : partitions) { if (partition.getPartitionValues() == null || partition.getPartitionValues().isEmpty()) { // this is unpartitioned table. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index 1ba77fa5f9c09f..d41418104871a9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -196,8 +196,14 @@ protected List<Split> getSplits() throws UserException {
             HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
                     .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
             boolean useSelfSplitter = hmsTable.getCatalog().useSelfSplitter();
+            String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
+            if (bindBrokerName != null && useSelfSplitter == false) {
+                // useSelfSplitter must be true if bindBrokerName is set.
+                throw new UserException(HMSExternalCatalog.ENABLE_SELF_SPLITTER + " should be true if "
+                        + HMSExternalCatalog.BIND_BROKER_NAME + " is set");
+            }
             List<Split> allFiles = Lists.newArrayList();
-            getFileSplitByPartitions(cache, getPartitions(), allFiles, useSelfSplitter);
+            getFileSplitByPartitions(cache, getPartitions(), allFiles, useSelfSplitter, bindBrokerName);
             LOG.debug("get #{} files for table: {}.{}, cost: {} ms",
                     allFiles.size(), hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start));
             return allFiles;
@@ -210,12 +216,13 @@ protected List<Split> getSplits() throws UserException {
     }
 
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
-            List<Split> allFiles, boolean useSelfSplitter) throws IOException {
+            List<Split> allFiles, boolean useSelfSplitter,
+            String bindBrokerName) throws IOException {
         List<FileCacheValue> fileCaches;
         if (hiveTransaction != null) {
-            fileCaches = getFileSplitByTransaction(cache, partitions);
+            fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName);
        } else {
-            fileCaches = cache.getFilesByPartitionsWithCache(partitions, useSelfSplitter);
+            fileCaches = cache.getFilesByPartitionsWithCache(partitions, useSelfSplitter, bindBrokerName);
         }
         if (ConnectContext.get().getExecutor() != null) {
             ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
@@ -287,7 +294,8 @@ private List selectFiles(List
         return fileList.subList(0, index);
     }
 
-    private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache cache, List<HivePartition> partitions) {
+    private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache cache, List<HivePartition> partitions,
+            String bindBrokerName) {
         for (HivePartition partition : partitions) {
             if (partition.getPartitionValues() == null || partition.getPartitionValues().isEmpty()) {
                 // this is unpartitioned table.
@@ -297,7 +305,8 @@ private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache cache,
         }
         ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds(
                 ((HMSExternalCatalog) hmsTable.getCatalog()).getClient());
-        return cache.getFilesByTransaction(partitions, validWriteIds, hiveTransaction.isFullAcid(), hmsTable.getId());
+        return cache.getFilesByTransaction(partitions, validWriteIds,
+                hiveTransaction.isFullAcid(), hmsTable.getId(), bindBrokerName);
     }
 
     @Override
@@ -319,6 +328,10 @@ protected TFileType getLocationType() throws UserException {
 
     @Override
     protected TFileType getLocationType(String location) throws UserException {
+        String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
+        if (bindBrokerName != null) {
+            return TFileType.FILE_BROKER;
+        }
         return getTFileType(location).orElseThrow(() ->
                 new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName()));
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 927e556c91af67..f2c77026312d6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -702,7 +702,8 @@ public static List getFilesForPartitions(
                     table.getRemoteTable().getSd().getLocation(), null));
         }
         // Get files for all partitions.
-        return cache.getFilesByPartitionsWithoutCache(hivePartitions, true);
+        String bindBrokerName = table.getCatalog().bindBrokerName();
+        return cache.getFilesByPartitionsWithoutCache(hivePartitions, true, bindBrokerName);
     }
 
     /**
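
Because broker-side scanning leans entirely on the FE's own splitter, the two catalog properties must agree, and the new check in getSplits() rejects the inconsistent combination. A hedged example of a property set that passes it (the broker name is a placeholder):

// Hypothetical catalog properties: binding a broker requires the self splitter.
Map<String, String> props = new HashMap<>();
props.put(HMSExternalCatalog.ENABLE_SELF_SPLITTER, "true");  // must be "true"
props.put(HMSExternalCatalog.BIND_BROKER_NAME, "my_broker"); // else UserException
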
diff --git a/fs_brokers/apache_hdfs_broker/pom.xml b/fs_brokers/apache_hdfs_broker/pom.xml
index bbd58e5d5d4240..2cb8d892dee7f6 100644
--- a/fs_brokers/apache_hdfs_broker/pom.xml
+++ b/fs_brokers/apache_hdfs_broker/pom.xml
@@ -69,9 +69,10 @@ under the License.
         1.8
         2.18.0
         github
-        <hadoop.version>2.10.2</hadoop.version>
+        <hadoop.version>3.3.6</hadoop.version>
         4.1.65.Final
         hadoop2-2.2.15
+        <doris.hive.catalog.shade.version>1.0.1</doris.hive.catalog.shade.version>
@@ -224,6 +225,29 @@ under the License.
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>hive-catalog-shade</artifactId>
+            <version>${doris.hive.catalog.shade.version}</version>
+        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 55ba457404a01e..22be82e34c2edc 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -25,17 +25,19 @@
 import com.google.common.base.Strings;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Logger;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -1061,6 +1063,49 @@ public BrokerFileSystem getGooseFSFileSystem(String path, Map<String, String> pr
         }
     }
 
+    public List<TBrokerFileStatus> listLocatedFiles(String path, boolean onlyFiles,
+            boolean recursive, Map<String, String> properties) {
+        List<TBrokerFileStatus> resultFileStatus = null;
+        BrokerFileSystem fileSystem = getFileSystem(path, properties);
+        Path locatedPath = new Path(path);
+        try {
+            FileSystem innerFileSystem = fileSystem.getDFSFileSystem();
+            RemoteIterator<LocatedFileStatus> locatedFiles = onlyFiles ? innerFileSystem.listFiles(locatedPath, recursive)
+                    : innerFileSystem.listLocatedStatus(locatedPath);
+            return getFileLocations(locatedFiles);
+        } catch (FileNotFoundException e) {
+            logger.info("file not found: " + e.getMessage());
+            throw new BrokerException(TBrokerOperationStatusCode.FILE_NOT_FOUND,
+                    e, "file not found");
+        } catch (Exception e) {
+            logger.error("errors while get file status ", e);
+            fileSystem.closeFileSystem();
+            throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
+                    e, "unknown error when listLocatedFiles");
+        }
+    }
+
+    private List<TBrokerFileStatus> getFileLocations(RemoteIterator<LocatedFileStatus> locatedFiles) throws IOException {
+        List<TBrokerFileStatus> locations = new ArrayList<>();
+        while (locatedFiles.hasNext()) {
+            LocatedFileStatus fileStatus = locatedFiles.next();
+            TBrokerFileStatus brokerFileStatus = new TBrokerFileStatus();
+            brokerFileStatus.setPath(fileStatus.getPath().toString());
+            brokerFileStatus.setIsDir(fileStatus.isDirectory());
+            if (fileStatus.isDirectory()) {
+                brokerFileStatus.setIsSplitable(false);
+                brokerFileStatus.setSize(-1);
+            } else {
+                brokerFileStatus.setSize(fileStatus.getLen());
+                brokerFileStatus.setIsSplitable(true);
+            }
+            brokerFileStatus.setModificationTime(fileStatus.getModificationTime());
+            brokerFileStatus.setBlockSize(fileStatus.getBlockSize());
+            locations.add(brokerFileStatus);
+        }
+        return locations;
+    }
+
     public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) {
         List<TBrokerFileStatus> resultFileStatus = null;
         WildcardURI pathUri = new WildcardURI(path);
@@ -1282,13 +1327,7 @@ public void pwrite(TBrokerFD fd, long offset, byte[] data) {
         FSDataOutputStream fsDataOutputStream = clientContextManager.getFsDataOutputStream(fd);
         synchronized (fsDataOutputStream) {
             long currentStreamOffset;
-            try {
-                currentStreamOffset = fsDataOutputStream.getPos();
-            } catch (IOException e) {
-                logger.error("errors while get file pos from output stream", e);
-                throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
-                        "errors while get file pos from output stream");
-            }
+            currentStreamOffset = fsDataOutputStream.getPos();
             if (currentStreamOffset != offset) {
                 // it's ok, it means that last pwrite succeed finally
                 if (currentStreamOffset == offset + data.length) {
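
The simplified pwrite() tracks the Hadoop bump in this same patch (2.10.2 to 3.3.6): FSDataOutputStream.getPos() stopped declaring IOException, so the old wrapper became dead weight. For reference (signatures as understood from the two Hadoop lines, an assumption worth verifying against the exact releases):

// Hadoop 2.x: public long getPos() throws IOException;
// Hadoop 3.x: public long getPos();  // no checked exception anymore
long currentStreamOffset = fsDataOutputStream.getPos(); // bare call is fine
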
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/HDFSBrokerServiceImpl.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/HDFSBrokerServiceImpl.java
index 14ff74dd41e62e..816462ecb340e7 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/HDFSBrokerServiceImpl.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/HDFSBrokerServiceImpl.java
@@ -19,6 +19,7 @@
 
 import com.google.common.base.Stopwatch;
 import org.apache.doris.common.BrokerPerfMonitor;
+import org.apache.doris.common.HiveUtils;
 import org.apache.doris.thrift.TBrokerCheckPathExistRequest;
 import org.apache.doris.thrift.TBrokerCheckPathExistResponse;
 import org.apache.doris.thrift.TBrokerCloseReaderRequest;
@@ -28,6 +29,8 @@
 import org.apache.doris.thrift.TBrokerFileSizeRequest;
 import org.apache.doris.thrift.TBrokerFileSizeResponse;
 import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TBrokerIsSplittableResponse;
+import org.apache.doris.thrift.TBrokerIsSplittableRequest;
 import org.apache.doris.thrift.TBrokerListPathRequest;
 import org.apache.doris.thrift.TBrokerListResponse;
 import org.apache.doris.thrift.TBrokerOpenReaderRequest;
@@ -86,6 +89,47 @@ public TBrokerListResponse listPath(TBrokerListPathRequest request)
         }
     }
 
+    @Override
+    public TBrokerListResponse listLocatedFiles(TBrokerListPathRequest request)
+            throws TException {
+        logger.info("received a listLocatedFiles request, request detail: " + request);
+        TBrokerListResponse response = new TBrokerListResponse();
+        try {
+            boolean recursive = request.isIsRecursive();
+            boolean onlyFiles = false;
+            if (request.isSetOnlyFiles()) {
+                onlyFiles = request.isOnlyFiles();
+            }
+            List<TBrokerFileStatus> fileStatuses = fileSystemManager.listLocatedFiles(request.path,
+                    onlyFiles, recursive, request.properties);
+            response.setOpStatus(generateOKStatus());
+            response.setFiles(fileStatuses);
+            return response;
+        } catch (BrokerException e) {
+            logger.warn("failed to list path: " + request.path, e);
+            TBrokerOperationStatus errorStatus = e.generateFailedOperationStatus();
+            response.setOpStatus(errorStatus);
+            return response;
+        }
+    }
+
+    @Override
+    public TBrokerIsSplittableResponse isSplittable(TBrokerIsSplittableRequest request) throws TException {
+        logger.info("received a isSplittable request, request detail: " + request);
+        TBrokerIsSplittableResponse response = new TBrokerIsSplittableResponse();
+        try {
+            boolean isSplittable = HiveUtils.isSplittable(request.path, request.inputFormat, request.properties);
+            response.setOpStatus(generateOKStatus());
+            response.setSplittable(isSplittable);
+            return response;
+        } catch (BrokerException e) {
+            logger.warn("failed to get isSplitable with path: " + request.path, e);
+            TBrokerOperationStatus errorStatus = e.generateFailedOperationStatus();
+            response.setOpStatus(errorStatus);
+            return response;
+        }
+    }
+
     @Override
     public TBrokerOperationStatus deletePath(TBrokerDeletePathRequest request)
             throws TException {
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/HiveUtils.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/HiveUtils.java
new file mode 100644
index 00000000000000..f2211eb202662a
--- /dev/null
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/HiveUtils.java
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import org.apache.doris.broker.hdfs.BrokerException;
+import org.apache.doris.thrift.TBrokerOperationStatusCode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+public class HiveUtils {
+    private static final Logger logger = Logger.getLogger(HiveUtils.class.getName());
+
+    public static boolean isSplittable(String path, String inputFormatName,
+            Map<String, String> properties) throws BrokerException {
+        JobConf jobConf = getJobConf(properties);
+        InputFormat<?, ?> inputFormat = getInputFormat(jobConf, inputFormatName);
+        return isSplittableInternal(inputFormat, new Path(path), jobConf);
+    }
+
+    private static JobConf getJobConf(Map<String, String> properties) {
+        Configuration configuration = new Configuration();
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            configuration.set(entry.getKey(), entry.getValue());
+        }
+        return new JobConf(configuration);
+    }
+
+    private static InputFormat<?, ?> getInputFormat(JobConf jobConf, String inputFormatName) throws BrokerException {
+        try {
+            Class<? extends InputFormat<?, ?>> inputFormatClass = getInputFormatClass(jobConf, inputFormatName);
+            if (inputFormatClass == SymlinkTextInputFormat.class) {
+                // symlink targets are always TextInputFormat
+                inputFormatClass = TextInputFormat.class;
+            }
+
+            return ReflectionUtils.newInstance(inputFormatClass, jobConf);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                    "Unable to create input format " + inputFormatName, e);
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "RedundantCast"})
+    private static Class<? extends InputFormat<?, ?>> getInputFormatClass(JobConf conf, String inputFormatName)
+            throws ClassNotFoundException {
+        // CDH uses different names for Parquet
+        if ("parquet.hive.DeprecatedParquetInputFormat".equals(inputFormatName)
+                || "parquet.hive.MapredParquetInputFormat".equals(inputFormatName)) {
+            return MapredParquetInputFormat.class;
+        }
+
+        Class<?> clazz = conf.getClassByName(inputFormatName);
+        return (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
+    }
+
+    private static boolean isSplittableInternal(InputFormat<?, ?> inputFormat, Path path, JobConf jobConf) {
+        // ORC uses a custom InputFormat but is always splittable
+        if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
+            return true;
+        }
+
+        // use reflection to get isSplittable method on FileInputFormat
+        Method method = null;
+        for (Class<?> clazz = inputFormat.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
+            try {
+                method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
+                break;
+            } catch (NoSuchMethodException ignored) {
+                logger.warn(LoggerMessageFormat.format("Class {} doesn't contain isSplitable method", clazz));
+            }
+        }
+
+        if (method == null) {
+            return false;
+        }
+        try {
+            method.setAccessible(true);
+            return (boolean) method.invoke(inputFormat, path.getFileSystem(jobConf), path);
+        } catch (InvocationTargetException | IllegalAccessException | IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
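
On the broker side the whole check is a static entry point. A hedged usage sketch (path and input format are examples only):

// Example broker-side probe: is a Parquet file on HDFS splittable?
Map<String, String> props = new HashMap<>(); // extra hadoop conf, if any
boolean splittable = HiveUtils.isSplittable(
        "hdfs://nameservices1/warehouse/t/part-00000",
        "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
        props);
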
diff --git a/gensrc/thrift/PaloBrokerService.thrift b/gensrc/thrift/PaloBrokerService.thrift
index 308c606544e939..e4bc60a201fa3e 100644
--- a/gensrc/thrift/PaloBrokerService.thrift
+++ b/gensrc/thrift/PaloBrokerService.thrift
@@ -91,12 +91,25 @@ struct TBrokerCheckPathExistResponse {
     2: required bool isPathExist;
 }
 
+struct TBrokerIsSplittableResponse {
+    1: optional TBrokerOperationStatus opStatus;
+    2: optional bool splittable;
+}
+
 struct TBrokerListPathRequest {
     1: required TBrokerVersion version;
     2: required string path;
     3: required bool isRecursive;
     4: required map<string, string> properties;
     5: optional bool fileNameOnly;
+    6: optional bool onlyFiles;
+}
+
+struct TBrokerIsSplittableRequest {
+    1: optional TBrokerVersion version;
+    2: optional string path;
+    3: optional string inputFormat;
+    4: optional map<string, string> properties;
 }
 
 struct TBrokerDeletePathRequest {
@@ -184,6 +197,13 @@ service TPaloBrokerService {
 
     // return a list of files under a path
     TBrokerListResponse listPath(1: TBrokerListPathRequest request);
+
+    // return located files of a given path. A broker implementation refers to
+    // 'org.apache.doris.fs.remote.RemoteFileSystem#listLocatedFiles' in fe-core.
+    TBrokerListResponse listLocatedFiles(1: TBrokerListPathRequest request);
+
+    // return whether the path with specified input format is splittable.
+    TBrokerIsSplittableResponse isSplittable(1: TBrokerIsSplittableRequest request);
 
     // delete a file, if the deletion of the file fails, the status code will return an error message
     // input: