Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private void initReader() throws IOException {
int[] projected = getProjected();
readBuilder.withProjection(projected);
readBuilder.withFilter(getPredicates());
reader = readBuilder.newRead().createReader(getSplit());
reader = readBuilder.newRead().executeFilter().createReader(getSplit());
paimonDataTypeList =
Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public abstract class ExternalScanNode extends ScanNode {
protected boolean needCheckColumnPriv;

protected final FederationBackendPolicy backendPolicy = (ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().enableFileCache)
&& (ConnectContext.get().getSessionVariable().enableFileCache
|| ConnectContext.get().getSessionVariable().getUseConsistentHashForExternalScan()))
? new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING)
: new FederationBackendPolicy();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ public void funnel(Backend backend, PrimitiveSink primitiveSink) {
private static class SplitHash implements Funnel<Split> {
@Override
public void funnel(Split split, PrimitiveSink primitiveSink) {
primitiveSink.putBytes(split.getPathString().getBytes(StandardCharsets.UTF_8));
primitiveSink.putBytes(split.getConsistentHashString().getBytes(StandardCharsets.UTF_8));
primitiveSink.putLong(split.getStart());
primitiveSink.putLong(split.getLength());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public class FileSplit implements Split {
// the location type for BE, eg: HDFS, LOCAL, S3
protected TFileType locationType;

// Raw weight of this split, used as the numerator in getSplitWeight().
// getSplitWeight() falls back to the standard weight while this is null.
public Long selfSplitWeight;
// Target split size assigned through setTargetSplitSize(); used as the divisor in getSplitWeight().
// getSplitWeight() falls back to the standard weight while this is null.
public Long targetSplitSize;

public FileSplit(LocationPath path, long start, long length, long fileLength,
long modificationTime, String[] hosts, List<String> partitionValues) {
this.path = path;
Expand Down Expand Up @@ -89,4 +92,20 @@ public Split create(LocationPath path, long start, long length, long fileLength,
return new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues);
}
}

/**
 * Records the target split size that {@code getSplitWeight()} divides by.
 *
 * @param targetSplitSize the target size; may be {@code null}, in which case
 *                        {@code getSplitWeight()} returns the standard weight
 */
@Override
public void setTargetSplitSize(final Long targetSplitSize) {
    this.targetSplitSize = targetSplitSize;
}

/**
 * Computes the scheduling weight of this split relative to the target split size.
 *
 * <p>The standard weight is returned when {@code selfSplitWeight} or {@code targetSplitSize}
 * is unset, or when the target size is not positive (the ratio would be meaningless, and
 * {@code 0 / 0} would yield NaN). Otherwise the result is
 * {@code selfSplitWeight / targetSplitSize} clamped to the range [0.01, 1.0].
 *
 * @return the weight of this split
 */
@Override
public SplitWeight getSplitWeight() {
    if (selfSplitWeight == null || targetSplitSize == null || targetSplitSize <= 0) {
        // NaN is propagated by both Math.max and Math.min, so a 0 / 0 ratio would reach
        // SplitWeight.fromProportion unclamped; fall back to the standard weight instead.
        return SplitWeight.standard();
    }
    double computedWeight = selfSplitWeight.doubleValue() / targetSplitSize;
    // Clamp the value to be between the minimum weight (0.01) and 1.0 (the standard weight).
    return SplitWeight.fromProportion(Math.min(Math.max(computedWeight, 0.01), 1.0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,34 @@
@Data
public class IcebergDeleteFileFilter {
private String deleteFilePath;
private long filesize;

public IcebergDeleteFileFilter(String deleteFilePath) {
public IcebergDeleteFileFilter(String deleteFilePath, long filesize) {
this.deleteFilePath = deleteFilePath;
this.filesize = filesize;
}

public static PositionDelete createPositionDelete(String deleteFilePath, Long positionLowerBound,
Long positionUpperBound) {
return new PositionDelete(deleteFilePath, positionLowerBound, positionUpperBound);
Long positionUpperBound, long filesize) {
return new PositionDelete(deleteFilePath, positionLowerBound, positionUpperBound, filesize);
}

public static EqualityDelete createEqualityDelete(String deleteFilePath, List<Integer> fieldIds) {
public static EqualityDelete createEqualityDelete(String deleteFilePath, List<Integer> fieldIds, long fileSize) {
// todo:
// Schema deleteSchema = TypeUtil.select(scan.schema(), new HashSet<>(fieldIds));
// StructLikeSet deleteSet = StructLikeSet.create(deleteSchema.asStruct());
// pass deleteSet to BE
// compare two StructLike value, if equals, filtered
return new EqualityDelete(deleteFilePath, fieldIds);
return new EqualityDelete(deleteFilePath, fieldIds, fileSize);
}

static class PositionDelete extends IcebergDeleteFileFilter {
private final Long positionLowerBound;
private final Long positionUpperBound;

public PositionDelete(String deleteFilePath, Long positionLowerBound,
Long positionUpperBound) {
super(deleteFilePath);
Long positionUpperBound, long fileSize) {
super(deleteFilePath, fileSize);
this.positionLowerBound = positionLowerBound;
this.positionUpperBound = positionUpperBound;
}
Expand All @@ -67,8 +69,8 @@ public OptionalLong getPositionUpperBound() {
static class EqualityDelete extends IcebergDeleteFileFilter {
private List<Integer> fieldIds;

public EqualityDelete(String deleteFilePath, List<Integer> fieldIds) {
super(deleteFilePath);
public EqualityDelete(String deleteFilePath, List<Integer> fieldIds, long fileSize) {
super(deleteFilePath, fileSize);
this.fieldIds = fieldIds;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ private List<Split> doGetSplits() throws UserException {
}

selectedPartitionNum = partitionPathSet.size();

splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
return splits;
}

Expand Down Expand Up @@ -315,10 +315,11 @@ private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask spitTask
.map(m -> m.get(MetadataColumns.DELETE_FILE_POS.fieldId()))
.map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L)));
positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L),
delete.fileSizeInBytes()));
} else if (delete.content() == FileContent.EQUALITY_DELETES) {
filters.add(IcebergDeleteFileFilter.createEqualityDelete(
delete.path().toString(), delete.equalityFieldIds()));
delete.path().toString(), delete.equalityFieldIds(), delete.fileSizeInBytes()));
} else {
throw new IllegalStateException("Unknown delete content: " + delete.content());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public IcebergSplit(LocationPath file, long start, long length, long fileLength,
this.formatVersion = formatVersion;
this.config = config;
this.originalPath = originalPath;
this.selfSplitWeight = length;
}

public long getRowCount() {
Expand All @@ -56,4 +57,9 @@ public long getRowCount() {
/**
 * Stores the row count for this split.
 *
 * @param rowCount the row count to record
 */
public void setRowCount(final long rowCount) {
    this.rowCount = rowCount;
}

/**
 * Attaches the delete-file filters to this split and adds the total size of those delete
 * files to {@code selfSplitWeight}.
 *
 * <p>The sizes are added on every call, so calling this more than once accumulates weight.
 *
 * @param deleteFileFilters the delete-file filters for this split; must not be {@code null}
 */
public void setDeleteFileFilters(List<IcebergDeleteFileFilter> deleteFileFilters) {
    this.deleteFileFilters = deleteFileFilters;
    long deleteBytes = 0L;
    for (IcebergDeleteFileFilter filter : deleteFileFilters) {
        deleteBytes += filter.getFilesize();
    }
    this.selfSplitWeight += deleteBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
Expand Down Expand Up @@ -101,9 +101,14 @@ public String toString() {
private int rawFileSplitNum = 0;
private int paimonSplitNum = 0;
private List<SplitStat> splitStats = new ArrayList<>();
private SessionVariable sessionVariable;

public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
public PaimonScanNode(PlanNodeId id,
TupleDescriptor desc,
boolean needCheckColumnPriv,
SessionVariable sessionVariable) {
super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv);
this.sessionVariable = sessionVariable;
}

@Override
Expand Down Expand Up @@ -176,7 +181,9 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit)

@Override
public List<Split> getSplits() throws UserException {
boolean forceJniScanner = ConnectContext.get().getSessionVariable().isForceJniScanner();
boolean forceJniScanner = sessionVariable.isForceJniScanner();
SessionVariable.IgnoreSplitType ignoreSplitType =
SessionVariable.IgnoreSplitType.valueOf(sessionVariable.getIgnoreSplitType());
List<Split> splits = new ArrayList<>();
int[] projected = desc.getSlots().stream().mapToInt(
slot -> (source.getPaimonTable().rowType().getFieldNames().indexOf(slot.getColumn().getName())))
Expand All @@ -196,7 +203,11 @@ public List<Split> getSplits() throws UserException {
selectedPartitionValues.add(partitionValue);
Optional<List<RawFile>> optRawFiles = dataSplit.convertToRawFiles();
Optional<List<DeletionFile>> optDeletionFiles = dataSplit.deletionFiles();

if (supportNativeReader(optRawFiles)) {
if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_NATIVE) {
continue;
}
splitStat.setType(SplitReadType.NATIVE);
splitStat.setRawFileConvertable(true);
List<RawFile> rawFiles = optRawFiles.get();
Expand Down Expand Up @@ -252,17 +263,25 @@ public List<Split> getSplits() throws UserException {
}
}
} else {
if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) {
continue;
}
splits.add(new PaimonSplit(split));
++paimonSplitNum;
}
} else {
if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) {
continue;
}
splits.add(new PaimonSplit(split));
++paimonSplitNum;
}
splitStats.add(splitStat);
}
this.selectedPartitionNum = selectedPartitionValues.size();
// TODO: get total partition number
// We should set fileSplitSize at the end because fileSplitSize may be modified in splitFile.
splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
return splits;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,51 @@
import org.apache.doris.datasource.TableFormatType;

import com.google.common.collect.Maps;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.Split;

import java.util.List;
import java.util.Optional;
import java.util.UUID;

public class PaimonSplit extends FileSplit {
private static final LocationPath DUMMY_PATH = new LocationPath("/dummyPath", Maps.newHashMap());
private Split split;
private TableFormatType tableFormatType;
private Optional<DeletionFile> optDeletionFile;


/**
 * Wraps a Paimon {@link Split} that has no raw-file location of its own.
 *
 * <p>The split starts on the shared {@code DUMMY_PATH}. For a {@link DataSplit} with at
 * least one data file, the path is replaced by the first data file's name and the weight is
 * the sum of the data file sizes. A {@link DataSplit} without data files keeps the dummy
 * path and gets weight 0. Any other split type is weighted by its row count.
 *
 * @param split the Paimon split to wrap
 */
public PaimonSplit(Split split) {
    super(DUMMY_PATH, 0, 0, 0, 0, null, null);
    this.split = split;
    this.tableFormatType = TableFormatType.PAIMON;
    this.optDeletionFile = Optional.empty();

    if (split instanceof DataSplit) {
        List<DataFileMeta> dataFileMetas = ((DataSplit) split).dataFiles();
        // Guard against an empty file list: get(0) would throw IndexOutOfBoundsException.
        if (!dataFileMetas.isEmpty()) {
            this.path = new LocationPath("/" + dataFileMetas.get(0).fileName());
        }
        this.selfSplitWeight = dataFileMetas.stream().mapToLong(DataFileMeta::fileSize).sum();
    } else {
        this.selfSplitWeight = split.rowCount();
    }
}

/**
 * Builds a split over a byte range of a concrete file; the split's weight is the range length.
 *
 * @param file             location of the file
 * @param start            start offset of the range
 * @param length           length of the range, also used as the split weight
 * @param fileLength       total length of the file
 * @param modificationTime modification time of the file
 * @param hosts            hosts passed through to the parent constructor
 * @param partitionList    partition values passed through to the parent constructor
 */
private PaimonSplit(LocationPath file, long start, long length, long fileLength, long modificationTime,
        String[] hosts, List<String> partitionList) {
    super(file, start, length, fileLength, modificationTime, hosts, partitionList);
    this.selfSplitWeight = length;
    this.optDeletionFile = Optional.empty();
    this.tableFormatType = TableFormatType.PAIMON;
}

/**
 * Returns the string used as this split's consistent-hash key.
 *
 * <p>A split that still holds the shared {@code DUMMY_PATH} instance has no real path, so a
 * fresh random UUID is returned on every call; otherwise the path string is used.
 *
 * @return a random UUID string for dummy-path splits, the path string otherwise
 */
@Override
public String getConsistentHashString() {
    // Reference comparison is deliberate: it checks for the shared DUMMY_PATH instance.
    return this.path == DUMMY_PATH ? UUID.randomUUID().toString() : getPathString();
}

public Split getSplit() {
Expand All @@ -66,6 +87,7 @@ public Optional<DeletionFile> getDeletionFile() {
}

/**
 * Attaches a deletion file to this split and adds its length to {@code selfSplitWeight}.
 *
 * <p>The length is added on every call, so calling this more than once accumulates weight.
 *
 * @param deletionFile the deletion file for this split; must not be {@code null}
 */
public void setDeletionFile(DeletionFile deletionFile) {
    long deletionBytes = deletionFile.length();
    this.selfSplitWeight += deletionBytes;
    this.optDeletionFile = Optional.of(deletionFile);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,8 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla
} else if (table instanceof IcebergExternalTable) {
scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
} else if (table instanceof PaimonExternalTable) {
scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false,
ConnectContext.get().getSessionVariable());
} else if (table instanceof TrinoConnectorExternalTable) {
scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
} else if (table instanceof MaxComputeExternalTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1986,7 +1986,8 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
break;
case PAIMON_EXTERNAL_TABLE:
scanNode = new PaimonScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
scanNode = new PaimonScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true,
ConnectContext.get().getSessionVariable());
break;
case TRINO_CONNECTOR_EXTERNAL_TABLE:
scanNode = new TrinoConnectorScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,26 @@ public class SessionVariable implements Serializable, Writable {
setter = "setPipelineTaskNum")
public int parallelPipelineTaskNum = 0;


/**
 * Kinds of split that a scan can be told to skip; the constant names are the accepted values
 * of the {@code ignore_split_type} session variable.
 */
public enum IgnoreSplitType {
// Skip nothing.
NONE,
// Skip splits that would be read through the JNI path.
IGNORE_JNI,
// Skip splits that would be read through the native reader.
IGNORE_NATIVE
}

// Session variable naming the kind of split to skip. The value is the name of an
// IgnoreSplitType constant, validated by checkIgnoreSplitType, and defaults to NONE.
public static final String IGNORE_SPLIT_TYPE = "ignore_split_type";
@VariableMgr.VarAttr(name = IGNORE_SPLIT_TYPE,
checker = "checkIgnoreSplitType",
options = {"NONE", "IGNORE_JNI", "IGNORE_NATIVE"},
description = {"忽略指定类型的split", "Ignore splits of the specified type"})
public String ignoreSplitType = IgnoreSplitType.NONE.toString();

// Session variable that turns on consistent hashing when distributing the splits of an
// external table scan; off by default.
public static final String USE_CONSISTENT_HASHING_FOR_EXTERNAL_SCAN = "use_consistent_hash_for_external_scan";
@VariableMgr.VarAttr(name = USE_CONSISTENT_HASHING_FOR_EXTERNAL_SCAN,
        description = {"对外表采用一致性hash的方式做split的分发",
                "Use consistent hashing to distribute splits for external table scans"})
public boolean useConsistentHashForExternalScan = false;

@VariableMgr.VarAttr(name = PROFILE_LEVEL, fuzzy = true)
public int profileLevel = 1;

Expand Down Expand Up @@ -4323,6 +4343,22 @@ public boolean isForceJniScanner() {
return forceJniScanner;
}

/**
 * Returns the current value of the {@code ignore_split_type} session variable.
 *
 * @return the configured value as a string
 */
public String getIgnoreSplitType() {
    return this.ignoreSplitType;
}

/**
 * Validates a candidate value for the {@code ignore_split_type} session variable.
 *
 * <p>The value must be the exact (case-sensitive) name of an {@code IgnoreSplitType} constant.
 *
 * @param value the value to validate
 * @throws UnsupportedOperationException if {@code value} is {@code null} or is not the name
 *                                       of an {@code IgnoreSplitType} constant
 */
public void checkIgnoreSplitType(String value) {
    try {
        IgnoreSplitType.valueOf(value);
    } catch (IllegalArgumentException | NullPointerException e) {
        // Enum.valueOf throws IllegalArgumentException for an unknown name and
        // NullPointerException for null; keep the cause so the rejected value is traceable.
        throw new UnsupportedOperationException(
                "We only support `NONE`, `IGNORE_JNI` and `IGNORE_NATIVE`", e);
    }
}

/**
 * Returns whether the {@code use_consistent_hash_for_external_scan} session variable is enabled.
 *
 * @return {@code true} if the variable is enabled
 */
public boolean getUseConsistentHashForExternalScan() {
    return this.useConsistentHashForExternalScan;
}

/**
 * Sets the {@code forceJniScanner} session flag.
 *
 * @param force the new value of the flag
 */
public void setForceJniScanner(final boolean force) {
    this.forceJniScanner = force;
}
Expand Down
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,9 @@ default boolean isRemotelyAccessible() {

void setAlternativeHosts(List<String> alternativeHosts);

/**
 * Returns the string that identifies this split for consistent hashing.
 *
 * <p>The default implementation uses the split's path string; implementations may override it.
 *
 * @return the consistent-hash key of this split
 */
default String getConsistentHashString() {
    return this.getPathString();
}

/**
 * Sets the target split size for this split.
 *
 * @param targetSplitSize the target split size
 */
void setTargetSplitSize(Long targetSplitSize);
}
Loading