433 changes: 220 additions & 213 deletions fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java

Large diffs are not rendered by default.
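Since the LocationPath.java rewrite itself is collapsed above, here is a minimal sketch of the public surface the rest of this diff depends on. Every method name below is inferred from call sites later in this PR (the constructor taking catalog properties, get(), getPath(), toStorageLocation(), isBindBroker(), getTFileTypeForBE()); the bodies are simplified placeholders, not the actual implementation.

import org.apache.doris.thrift.TFileType;
import org.apache.hadoop.fs.Path;

import java.util.Map;

// Sketch of the surface only; bodies are simplified stand-ins and the real
// class additionally normalizes schemes using the catalog properties.
public class LocationPath {
    private final String location;

    public LocationPath(String location, Map<String, String> catalogProperties) {
        this.location = location;
    }

    // Raw location string, used e.g. for compress-type inference.
    public String get() {
        return location;
    }

    // Hadoop Path view, used to pull the URI scheme/authority for HDFS.
    public Path getPath() {
        return new Path(location);
    }

    // Location rewritten to the form the BE should actually read.
    public Path toStorageLocation() {
        return new Path(location);
    }

    // Whether reads must go through a broker.
    public boolean isBindBroker() {
        return false;
    }

    // Scheme-to-TFileType mapping reported to the BE (simplified).
    public TFileType getTFileTypeForBE() {
        return location.startsWith("hdfs://") ? TFileType.FILE_HDFS : TFileType.FILE_S3;
    }
}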

@@ -35,10 +35,8 @@
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.hive.AcidInfo;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hive.source.HiveSplit;
import org.apache.doris.datasource.iceberg.source.IcebergSplit;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
@@ -268,7 +266,7 @@ public void createScanRangeLocations() throws UserException {
boolean isWal = fileFormatType == TFileFormatType.FORMAT_WAL;
if (isCsvOrJson || isWal) {
params.setFileAttributes(getFileAttributes());
if (getLocationType() == TFileType.FILE_STREAM) {
if (isFileStreamType()) {
params.setFileType(TFileType.FILE_STREAM);
FunctionGenTable table = (FunctionGenTable) this.desc.getTable();
ExternalFileTableValuedFunction tableValuedFunction = (ExternalFileTableValuedFunction) table.getTvf();
@@ -309,19 +307,13 @@ public void createScanRangeLocations() throws UserException {
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
if (splitAssignment.getSampleSplit() == null && !(getLocationType() == TFileType.FILE_STREAM)) {
if (splitAssignment.getSampleSplit() == null && !isFileStreamType()) {
return;
}
selectedSplitNum = numApproximateSplits();

TFileType locationType;
FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit();
if (fileSplit instanceof IcebergSplit
&& ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
locationType = TFileType.FILE_BROKER;
} else {
locationType = getLocationType(fileSplit.getPath().toString());
}
TFileType locationType = fileSplit.getLocationType();
totalFileSize = fileSplit.getLength() * selectedSplitNum;
long maxWaitTime = ConnectContext.get().getSessionVariable().getFetchSplitsMaxWaitTime();
// Not accurate, only used to estimate concurrency.
@@ -351,7 +343,7 @@ public void createScanRangeLocations() throws UserException {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
selectedSplitNum = inputSplits.size();
if (inputSplits.isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) {
if (inputSplits.isEmpty() && !isFileStreamType()) {
return;
}
Multimap<Backend, Split> assignment = backendPolicy.computeScanRangeAssignment(inputSplits);
@@ -379,14 +371,6 @@ private TScanRangeLocations splitToScanRange(
Split split,
List<String> pathPartitionKeys) throws UserException {
FileSplit fileSplit = (FileSplit) split;
TFileType locationType;
if (fileSplit instanceof IcebergSplit
&& ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
locationType = TFileType.FILE_BROKER;
} else {
locationType = getLocationType(fileSplit.getPath().toString());
}

TScanRangeLocations curLocations = newLocations();
// If fileSplit has partition values, use the values collected from hive partitions.
// Otherwise, use the values in file path.
@@ -396,41 +380,42 @@ private TScanRangeLocations splitToScanRange(
isACID = hiveSplit.isACID();
}
List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys,
? BrokerUtil.parseColumnsFromPath(fileSplit.getPathString(), pathPartitionKeys,
false, isACID) : fileSplit.getPartitionValues();

TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys,
locationType);
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
TFileCompressType fileCompressType = getFileCompressType(fileSplit);
rangeDesc.setCompressType(fileCompressType);
if (isACID) {
HiveSplit hiveSplit = (HiveSplit) fileSplit;
hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc();
transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>();
for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) {
TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc();
deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
deleteDeltaDescs.add(deleteDeltaDesc);
if (fileSplit instanceof HiveSplit) {
if (isACID) {
HiveSplit hiveSplit = (HiveSplit) fileSplit;
hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc();
transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>();
for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) {
TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc();
deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
deleteDeltaDescs.add(deleteDeltaDesc);
}
transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
} else {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(TableFormatType.HIVE.value());
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
} else if (fileSplit instanceof HiveSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(TableFormatType.HIVE.value());
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}

setScanParams(rangeDesc, fileSplit);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
TScanRangeLocation location = new TScanRangeLocation();
setLocationPropertiesIfNecessary(backend, locationType, locationProperties);
setLocationPropertiesIfNecessary(backend, fileSplit.getLocationType(), locationProperties);
location.setBackendId(backend.getId());
location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
curLocations.addToLocations(location);
@@ -493,8 +478,7 @@ private TScanRangeLocations newLocations() {
}

private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,
List<String> columnsFromPathKeys, TFileType locationType)
throws UserException {
List<String> columnsFromPathKeys) {
TFileRangeDesc rangeDesc = new TFileRangeDesc();
rangeDesc.setStartOffset(fileSplit.getStart());
rangeDesc.setSize(fileSplit.getLength());
@@ -504,10 +488,10 @@ private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> col
rangeDesc.setColumnsFromPath(columnsFromPath);
rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);

rangeDesc.setFileType(locationType);
rangeDesc.setPath(fileSplit.getPath().toString());
if (locationType == TFileType.FILE_HDFS) {
URI fileUri = fileSplit.getPath().toUri();
rangeDesc.setFileType(fileSplit.getLocationType());
rangeDesc.setPath(fileSplit.getPath().toStorageLocation().toString());
if (fileSplit.getLocationType() == TFileType.FILE_HDFS) {
URI fileUri = fileSplit.getPath().getPath().toUri();
rangeDesc.setFsName(fileUri.getScheme() + "://" + fileUri.getAuthority());
}
rangeDesc.setModificationTime(fileSplit.getModificationTime());
@@ -555,14 +539,16 @@ public int getNumInstances() {
return scanRangeLocations.size();
}

protected abstract TFileType getLocationType() throws UserException;

protected abstract TFileType getLocationType(String location) throws UserException;
// Returns true if this scan node reads a TFileType.FILE_STREAM source.
// Currently, only TVFScanNode may use TFileType.FILE_STREAM.
protected boolean isFileStreamType() throws UserException {
return false;
}

protected abstract TFileFormatType getFileFormatType() throws UserException;

protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
return Util.inferFileCompressTypeByPath(fileSplit.getPath().toString());
return Util.inferFileCompressTypeByPath(fileSplit.getPathString());
}

protected TFileAttributes getFileAttributes() throws UserException {
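The isFileStreamType() hook added above defaults to false; per its comment, only TVFScanNode is expected to return true. A plausible override is sketched below, assuming the TVF scan node already holds its table-valued function and that the function exposes its TFileType through a getter (both names are assumptions, not taken from this PR).

// Plausible TVFScanNode override (sketch; the tableValuedFunction field and
// its getTFileType() getter are assumptions, not confirmed by this diff).
@Override
protected boolean isFileStreamType() throws UserException {
    return tableValuedFunction.getTFileType() == TFileType.FILE_STREAM;
}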
@@ -25,6 +25,7 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
@@ -46,7 +47,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -241,14 +241,14 @@ protected void setDefaultValueExprs(TableIf tbl,
}
}

protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length,
protected List<Split> splitFile(LocationPath path, long blockSize, BlockLocation[] blockLocations, long length,
long modificationTime, boolean splittable, List<String> partitionValues, SplitCreator splitCreator)
throws IOException {
if (blockLocations == null) {
blockLocations = new BlockLocation[0];
}
List<Split> result = Lists.newArrayList();
TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.toString());
TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.get());
if (!splittable || compressType != TFileCompressType.PLAIN) {
if (LOG.isDebugEnabled()) {
LOG.debug("Path {} is not splittable.", path);
@@ -17,16 +17,17 @@

package org.apache.doris.datasource;

import org.apache.doris.common.util.LocationPath;
import org.apache.doris.spi.Split;
import org.apache.doris.thrift.TFileType;

import lombok.Data;
import org.apache.hadoop.fs.Path;

import java.util.List;

@Data
public class FileSplit implements Split {
public Path path;
public LocationPath path;
public long start;
// length of this split, in bytes
public long length;
@@ -43,27 +44,30 @@ public class FileSplit implements Split {
public List<String> partitionValues;

public List<String> alternativeHosts;
// the location type for BE, e.g. HDFS, LOCAL, S3
protected TFileType locationType;

public FileSplit(Path path, long start, long length, long fileLength,
public FileSplit(LocationPath path, long start, long length, long fileLength,
long modificationTime, String[] hosts, List<String> partitionValues) {
this.path = path;
this.start = start;
this.length = length;
this.fileLength = fileLength;
this.modificationTime = modificationTime;
// BE requires modification time to be non-negative.
this.modificationTime = modificationTime < 0 ? 0 : modificationTime;
this.hosts = hosts == null ? new String[0] : hosts;
this.partitionValues = partitionValues;
}

public FileSplit(Path path, long start, long length, long fileLength,
String[] hosts, List<String> partitionValues) {
this(path, start, length, fileLength, 0, hosts, partitionValues);
this.locationType = path.isBindBroker() ? TFileType.FILE_BROKER : path.getTFileTypeForBE();
}

public String[] getHosts() {
return hosts;
}

public TFileType getLocationType() {
return locationType;
}

@Override
public Object getInfo() {
return null;
@@ -79,7 +83,8 @@ public static class FileSplitCreator implements SplitCreator {
public static final FileSplitCreator DEFAULT = new FileSplitCreator();

@Override
public Split create(Path path, long start, long length, long fileLength, long modificationTime, String[] hosts,
public Split create(LocationPath path, long start, long length, long fileLength,
long modificationTime, String[] hosts,
List<String> partitionValues) {
return new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues);
}
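With SplitCreator now taking a LocationPath (next file below), a caller wraps the raw location once and the resulting FileSplit derives its TFileType itself instead of the scan node recomputing it per split. A hypothetical call shape follows; all names in it are illustrative and not part of this PR.

// Hypothetical helper showing the new call shape; names are illustrative,
// not part of this PR.
static Split buildWholeFileSplit(String rawLocation, Map<String, String> catalogProps,
        long fileLength, long modificationTime, List<String> partitionValues) {
    LocationPath locationPath = new LocationPath(rawLocation, catalogProps);
    return FileSplitCreator.DEFAULT.create(
            locationPath,
            0,                   // start offset
            fileLength,          // split length: the whole file
            fileLength,          // total file length
            modificationTime,    // negative values are clamped to 0 by the new constructor
            new String[0],       // no preferred hosts
            partitionValues);    // partition values parsed elsewhere
}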
@@ -17,13 +17,12 @@

package org.apache.doris.datasource;

import org.apache.doris.common.util.LocationPath;
import org.apache.doris.spi.Split;

import org.apache.hadoop.fs.Path;

import java.util.List;

public interface SplitCreator {
Split create(Path path, long start, long length, long fileLength,
Split create(LocationPath path, long start, long length, long fileLength,
long modificationTime, String[] hosts, List<String> partitionValues);
}
@@ -369,11 +369,7 @@ private FileCacheValue getFileCache(String location, String inputFormat,
for (RemoteFile remoteFile : remoteFiles) {
String srcPath = remoteFile.getPath().toString();
LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties());
Path convertedPath = locationPath.toStorageLocation();
if (!convertedPath.toString().equals(srcPath)) {
remoteFile.setPath(convertedPath);
}
result.addFile(remoteFile);
result.addFile(remoteFile, locationPath);
}
} else if (status.getErrCode().equals(ErrCode.NOT_FOUND)) {
// User may manually remove partition under HDFS, in this case,
@@ -813,14 +809,17 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
if (status.ok()) {
if (delta.isDeleteDelta()) {
List<String> deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter(
name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.collect(Collectors.toList());
deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames));
continue;
}
remoteFiles.stream().filter(
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.forEach(fileCacheValue::addFile);
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> {
LocationPath path = new LocationPath(file.getPath().toString(),
catalog.getProperties());
fileCacheValue.addFile(file, path);
});
} else {
throw new RuntimeException(status.getErrMsg());
}
@@ -837,8 +836,12 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
remoteFiles.stream().filter(
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.forEach(fileCacheValue::addFile);
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.forEach(file -> {
LocationPath path = new LocationPath(file.getPath().toString(),
catalog.getProperties());
fileCacheValue.addFile(file, path);
});
} else {
throw new RuntimeException(status.getErrMsg());
}
@@ -998,11 +1001,11 @@ public static class FileCacheValue {

private AcidInfo acidInfo;

public void addFile(RemoteFile file) {
public void addFile(RemoteFile file, LocationPath locationPath) {
if (isFileVisible(file.getPath())) {
HiveFileStatus status = new HiveFileStatus();
status.setBlockLocations(file.getBlockLocations());
status.setPath(file.getPath());
status.setPath(locationPath);
status.length = file.getSize();
status.blockSize = file.getBlockSize();
status.modificationTime = file.getModificationTime();
@@ -1014,7 +1017,6 @@ public int getValuesSize() {
return partitionValues == null ? 0 : partitionValues.size();
}


public AcidInfo getAcidInfo() {
return acidInfo;
}
@@ -1062,7 +1064,7 @@ private static boolean isGeneratedPath(String name) {
@Data
public static class HiveFileStatus {
BlockLocation[] blockLocations;
Path path;
LocationPath path;
long length;
long blockSize;
long modificationTime;