Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,20 @@ default Status globList(String remotePath, List<RemoteFile> result) {
*/
Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly);

/**
 * List files in remotePath matching a glob pattern, bounded by a total-size and a
 * file-count limit. <br/>
 * The default implementation is unsupported; file systems that can enumerate objects
 * (e.g. S3) override this.
 * @param remotePath remote path, may contain a glob pattern
 * @param result All eligible files under the path
 * @param startFile start file name; listing resumes after this entry (null on the first call)
 * @param fileSizeLimit limit the total size of files to be listed.
 * @param fileNumLimit limit the total number of files to be listed.
 * @return the last file name examined by this call, intended to be passed back as
 *         {@code startFile} on the next call to continue the listing
 */
default String globListWithLimit(String remotePath, List<String> result,
String startFile, long fileSizeLimit, long fileNumLimit) {
throw new UnsupportedOperationException("Unsupported operation glob list with limit on current file system.");
}

/**
 * List directories under remotePath into {@code result}.
 * The default implementation is unsupported; concrete file systems override this.
 * @param remotePath remote path to inspect
 * @param result output set of directory names (exact form depends on the implementation)
 * @return status of the operation
 */
default Status listDirectories(String remotePath, Set<String> result) {
throw new UnsupportedOperationException("Unsupported operation list directories on current file system.");
}
Expand Down
139 changes: 139 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request.Builder;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
Expand Down Expand Up @@ -630,6 +631,144 @@ public Status globList(String remotePath, List<RemoteFile> result, boolean fileN
}
}


/**
 * List all files under the given path with glob pattern.
 * For example, if the path is "s3://bucket/path/to/*.csv",
 * it will list all files under "s3://bucket/path/to/" with ".csv" suffix.
 * <p>
 * Limit: Starting from startFile, listing stops once the total matched file size
 * exceeds fileSizeLimit or the number of matched files reaches fileNumLimit
 * (a limit of 0 disables that check, see {@link #reachLimit}).
 *
 * @param remotePath glob pattern, e.g. "s3://bucket/path/to/*.csv"
 * @param result output list of matched entries, formatted as "s3://bucket/key"
 * @param startFile resume point forwarded to S3 {@code startAfter} (exclusive); may be null
 * @param fileSizeLimit upper bound on the accumulated size of matched files; 0 means unlimited
 * @param fileNumLimit upper bound on the number of matched files; 0 means unlimited
 * @return The largest file name after listObject this time, i.e. the last object examined,
 *         usable as the next {@code startFile}; empty string if nothing was listed
 */
public String globListWithLimit(String remotePath, List<String> result, String startFile,
long fileSizeLimit, long fileNumLimit) {
long roundCnt = 0;
long elementCnt = 0;
long matchCnt = 0;
long matchFileSize = 0L;
long startTime = System.nanoTime();
try {
S3URI uri = S3URI.create(remotePath, isUsePathStyle, forceParsingByStandardUri);
if (uri.useS3DirectoryBucket()) {
throw new RuntimeException("Not support glob with limit for directory bucket");
}

String bucket = uri.getBucket();
String globPath = uri.getKey(); // eg: path/to/*.csv

if (LOG.isDebugEnabled()) {
LOG.debug("globList globPath:{}, remotePath:{}", globPath, remotePath);
}
java.nio.file.Path pathPattern = Paths.get(globPath);
PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pathPattern);
// Tracks "directory" prefixes already emitted so each is reported at most once.
HashSet<String> directorySet = new HashSet<>();

String listPrefix = S3Util.getLongestPrefix(globPath); // similar to Azure
if (LOG.isDebugEnabled()) {
LOG.debug("globList listPrefix: '{}' (from globPath: '{}')", listPrefix, globPath);
}

Builder builder = ListObjectsV2Request.builder();
builder.bucket(bucket)
.prefix(listPrefix);

if (startFile != null) {
builder.startAfter(startFile);
}

ListObjectsV2Request request = builder.build();

String currentMaxFile = "";
boolean reachedLimit = false;
boolean isTruncated = false;
do {
roundCnt++;
ListObjectsV2Response response = listObjectsV2(request);
for (S3Object obj : response.contents()) {
elementCnt++;
java.nio.file.Path objPath = Paths.get(obj.key());
// Record the resume point per object, so that stopping mid-page does not skip
// the keys between the last match and the end of the page on the next call.
// (Also avoids the empty-page crash of contents().get(size - 1).)
currentMaxFile = "s3://" + bucket + "/" + objPath;

// Walk the object key and its parent prefixes upward, matching each level
// against the glob pattern while it still lies under listPrefix.
boolean isPrefix = false;
while (objPath != null && objPath.normalize().toString().startsWith(listPrefix)) {
if (!matcher.matches(objPath)) {
isPrefix = true;
objPath = objPath.getParent();
continue;
}
if (directorySet.contains(objPath.normalize().toString())) {
break;
}
if (isPrefix) {
directorySet.add(objPath.normalize().toString());
}

matchCnt++;
matchFileSize += obj.size();
String remoteFileName = "s3://" + bucket + "/" + objPath;
result.add(remoteFileName);

if (reachLimit(result.size(), matchFileSize, fileSizeLimit, fileNumLimit)) {
// Stop the whole listing, not just the prefix walk; otherwise every
// further object keeps appending to result past the limit.
reachedLimit = true;
break;
}

objPath = objPath.getParent();
isPrefix = true;
}
if (reachedLimit) {
break;
}
}

// Continue paging only while S3 reports truncation and no limit has been hit.
isTruncated = !reachedLimit && response.isTruncated();
if (isTruncated) {
request = request.toBuilder()
.continuationToken(response.nextContinuationToken())
.build();
}
} while (isTruncated);

if (LOG.isDebugEnabled()) {
LOG.debug("remotePath:{}, result:{}", remotePath, result);
}
return currentMaxFile;
} catch (Exception e) {
LOG.warn("Errors while getting file status", e);
throw new RuntimeException(e);
} finally {
long endTime = System.nanoTime();
long duration = endTime - startTime;
if (LOG.isDebugEnabled()) {
LOG.debug("process {} elements under prefix {} for {} round, match {} elements, take {} ms",
elementCnt, remotePath, roundCnt, matchCnt,
duration / 1000 / 1000);
}
}
}

/**
 * Checks whether the accumulated match count or size has hit the configured limits.
 * A limit value of 0 disables the corresponding check; any negative input disables
 * both checks entirely.
 */
private static boolean reachLimit(int matchFileCnt, long matchFileSize, long sizeLimit, long fileNum) {
boolean invalidInput = matchFileCnt < 0 || sizeLimit < 0 || fileNum < 0;
if (invalidInput) {
return false;
}
boolean hitFileNum = fileNum > 0 && matchFileCnt >= fileNum;
if (hitFileNum) {
LOG.info("reach file num limit fileNum:{} objectFiles count:{}", fileNum, matchFileCnt);
return true;
}
boolean hitFileSize = sizeLimit > 0 && matchFileSize >= sizeLimit;
if (hitFileSize) {
LOG.info("reach size limit sizeLimit:{}, objectFilesSize:{}", sizeLimit, matchFileSize);
return true;
}
return false;
}

@Override
public synchronized void close() throws Exception {
if (client != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ public Status globList(String remotePath, List<RemoteFile> result, boolean fileN
return objStorage.globList(remotePath, result, fileNameOnly);
}

/**
 * Delegates bounded glob listing to the underlying S3 object storage.
 * See {@code S3ObjStorage#globListWithLimit} for the limit semantics.
 */
@Override
public String globListWithLimit(String remotePath, List<String> result, String startFile,
long fileSizeLimit, long fileNumLimit) {
S3ObjStorage storage = (S3ObjStorage) this.objStorage;
return storage.globListWithLimit(remotePath, result, startFile, fileSizeLimit, fileNumLimit);
}

@Override
public Status listDirectories(String remotePath, Set<String> result) {
S3ObjStorage objStorage = (S3ObjStorage) this.objStorage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private void validateStartTimeMs() {
if (timerDefinition.getStartTimeMs() == null) {
throw new IllegalArgumentException("startTimeMs cannot be null");
}
if (isImmediate()) {
if (isImmediate() || JobExecuteType.STREAMING.equals(executeType)) {
return;
}
if (timerDefinition.getStartTimeMs() < System.currentTimeMillis()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,8 @@ public TRow getTvfInfo() {
trow.addToColumnValue(new TCell().setStringVal(getComment()));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson()));
trow.addToColumnValue(new TCell().setStringVal(
loadStatistic == null ? FeConstants.null_string : loadStatistic.toJson()));
trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? FeConstants.null_string : failMsg.getMsg()));
return trow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.offset.Offset;
import org.apache.doris.job.offset.SourceOffsetProvider;
import org.apache.doris.job.offset.SourceOffsetProviderFactory;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.persist.gson.GsonUtils;
Expand All @@ -64,8 +66,6 @@

public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, Map<Object, Object>> implements
TxnStateChangeCallback {

@SerializedName("did")
private final long dbId;
private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
@SerializedName("fm")
Expand All @@ -79,7 +79,6 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
@Setter
protected long autoResumeCount;
@Getter
@SerializedName("jp")
private StreamingJobProperties jobProperties;
@Getter
StreamingInsertTask runningStreamTask;
Expand All @@ -101,12 +100,15 @@ public StreamingInsertJob(String jobName,
this.jobProperties = jobProperties;
String tvfType = parseTvfType();
this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
this.offsetProvider.init(getExecuteSql(), jobProperties);
}

/**
 * Extracts the source TVF name (e.g. "s3") from the job's insert SQL by parsing it
 * and taking the first table-valued function of the INSERT command.
 * Fails if the statement contains no TVF, since streaming jobs only support
 * insert SQL backed by a TVF source.
 */
private String parseTvfType() {
NereidsParser parser = new NereidsParser();
InsertIntoTableCommand command = (InsertIntoTableCommand) parser.parseSingle(getExecuteSql());
// Note: the stale `return command.getFirstTvfName();` line from the old revision is
// removed — it made the statements below unreachable.
UnboundTVFRelation firstTVF = command.getFirstTVF();
Preconditions.checkNotNull(firstTVF, "Only support insert sql with tvf");
return firstTVF.getFunctionName();
}

@Override
Expand Down Expand Up @@ -138,7 +140,8 @@ public List<StreamingJobSchedulerTask> createTasks(TaskType taskType, Map<Object
}

protected StreamingInsertTask createStreamingInsertTask() {
InsertIntoTableCommand command = offsetProvider.rewriteTvfParams(getExecuteSql());
Offset nextOffset = offsetProvider.getNextOffset();
InsertIntoTableCommand command = offsetProvider.rewriteTvfParams(nextOffset);
this.runningStreamTask = new StreamingInsertTask(getJobId(), AbstractTask.getNextTaskId(), command,
getCurrentDbName(), offsetProvider.getCurrentOffset(), jobProperties);
return this.runningStreamTask;
Expand Down Expand Up @@ -221,7 +224,8 @@ public TRow getTvfInfo() {
trow.addToColumnValue(new TCell().setStringVal(getComment()));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(jobStatistic.toJson()));
trow.addToColumnValue(new TCell().setStringVal(
jobStatistic == null ? FeConstants.null_string : jobStatistic.toJson()));
trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? FeConstants.null_string : failMsg.getMsg()));
return trow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ protected void executeCancelLogic(boolean needWaitCancelComplete) throws Excepti
@Override
public TRow getTvfInfo(String jobName) {
StreamingInsertTask runningTask = streamingInsertJob.getRunningStreamTask();
if (runningTask == null) {
return null;
}
TRow trow = new TRow();
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(runningTask.getTaskId())));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(runningTask.getJobId())));
Expand All @@ -117,6 +120,6 @@ public TRow getTvfInfo(String jobName) {
trow.addToColumnValue(new TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser()));
}
trow.addToColumnValue(new TCell().setStringVal(runningTask.getOffset().toJson()));
return null;
return trow;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,19 @@

package org.apache.doris.job.offset;

import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;

/**
* Interface for managing offsets and metadata of a data source.
*/
public interface SourceOffsetProvider {

/**
 * Initialize the provider from the job's insert SQL and streaming job properties.
 *
 * @param executeSql the job's insert SQL containing the source TVF
 * @param jobProperties properties of the streaming job
 */
void init(String executeSql, StreamingJobProperties jobProperties);

/**
* Get source type, e.g. s3, kafka
* @return
Expand All @@ -43,10 +50,10 @@ public interface SourceOffsetProvider {

/**
* Rewrite the TVF parameters in the SQL based on the current offset.
* @param sql
* @param nextOffset
* @return rewritten InsertIntoTableCommand
*/
InsertIntoTableCommand rewriteTvfParams(String sql);
InsertIntoTableCommand rewriteTvfParams(Offset nextOffset);

/**
* Update the offset of the source.
Expand All @@ -64,5 +71,6 @@ public interface SourceOffsetProvider {
* @return
*/
boolean hasMoreDataToConsume();

}

Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class SourceOffsetProviderFactory {

public static SourceOffsetProvider createSourceOffsetProvider(String sourceType) {
try {
Class<? extends SourceOffsetProvider> cla = map.get(sourceType.toUpperCase());
Class<? extends SourceOffsetProvider> cla = map.get(sourceType.toLowerCase());
if (cla == null) {
throw new JobException("Unsupported source type: " + sourceType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import org.apache.doris.persist.gson.GsonUtils;

import lombok.Getter;
import lombok.Setter;

import java.util.List;

@Getter
@Setter
public class S3Offset implements Offset {
String startFile;
String endFile;
@Getter
List<String> fileLists;

@Override
Expand Down
Loading
Loading