@@ -111,7 +111,7 @@ supportedJobStatement
commentSpec?
DO supportedDmlStatement #createScheduledJob
| PAUSE JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) #pauseJob
- | ALTER JOB FOR (jobNameKey=identifier) (propertyClause | supportedDmlStatement | propertyClause supportedDmlStatement) #alterJob
+ | ALTER JOB FOR (jobName=multipartIdentifier) (propertyClause | supportedDmlStatement | propertyClause supportedDmlStatement) #alterJob
| DROP JOB (IF EXISTS)? WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) #dropJob
| RESUME JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) #resumeJob
| CANCEL TASK WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) AND (taskIdKey=identifier) EQ (taskIdValue=INTEGER_VALUE) #cancelJobTask
@@ -289,7 +289,7 @@ public static TxnCommitAttachmentPB streamingTaskTxnCommitAttachmentToPb(Streami
.setFileSize(streamingTaskTxnCommitAttachment.getFileSize());

if (streamingTaskTxnCommitAttachment.getOffset() != null) {
- builder.setOffset(streamingTaskTxnCommitAttachment.getOffset().endOffset());
+ builder.setOffset(streamingTaskTxnCommitAttachment.getOffset());
}

attachementBuilder.setStreamingTaskTxnCommitAttachment(builder.build());
@@ -682,6 +682,7 @@ public String globListWithLimit(String remotePath, List<RemoteFile> result, Stri

String currentMaxFile = "";
boolean isTruncated = false;
+ boolean reachLimit = false;
do {
roundCnt++;
ListObjectsV2Response response = listObjectsV2(request);
@@ -716,12 +717,16 @@ public String globListWithLimit(String remotePath, List<RemoteFile> result, Stri
result.add(remoteFile);

if (reachLimit(result.size(), matchFileSize, fileSizeLimit, fileNumLimit)) {
+ reachLimit = true;
break;
}

objPath = objPath.getParent();
isPrefix = true;
}
+ if (reachLimit) {
+ break;
+ }
}
//record current last object file name
S3Object lastS3Object = response.contents().get(response.contents().size() - 1);
@@ -733,7 +738,7 @@ public String globListWithLimit(String remotePath, List<RemoteFile> result, Stri
.continuationToken(response.nextContinuationToken())
.build();
}
- } while (isTruncated);
+ } while (isTruncated && !reachLimit);

if (LOG.isDebugEnabled()) {
LOG.debug("remotePath:{}, result:{}", remotePath, result);
@@ -124,22 +124,7 @@ public List<Long> getTriggerDelayTimes(Long currentTimeMs, Long startTimeMs, Lon
return delayTimeSeconds;
}

- if (JobExecuteType.STREAMING.equals(executeType) && null != timerDefinition) {
- if (null == timerDefinition.getStartTimeMs() || null != timerDefinition.getLatestSchedulerTimeMs()) {
- return delayTimeSeconds;
- }
-
- // If the job is already executed or in the schedule queue, or not within this schedule window
- if (endTimeMs < timerDefinition.getStartTimeMs()) {
- return delayTimeSeconds;
- }
-
- delayTimeSeconds.add(queryDelayTimeSecond(currentTimeMs, timerDefinition.getStartTimeMs()));
- this.timerDefinition.setLatestSchedulerTimeMs(timerDefinition.getStartTimeMs());
- return delayTimeSeconds;
- }
-
- if (JobExecuteType.RECURRING.equals(executeType)) {
+ if (JobExecuteType.RECURRING.equals(executeType) || JobExecuteType.STREAMING.equals(executeType)) {
if (timerDefinition.getStartTimeMs() > endTimeMs || null != timerDefinition.getEndTimeMs()
&& timerDefinition.getEndTimeMs() < startTimeMs) {
return delayTimeSeconds;
@@ -27,17 +27,21 @@
import java.io.DataOutput;
import java.io.IOException;

- public class PauseReason implements Writable {
+ public class FailureReason implements Writable {
@SerializedName(value = "code")
private InternalErrorCode code;
@SerializedName(value = "msg")
private String msg;

- public PauseReason(InternalErrorCode errCode, String msg) {
+ public FailureReason(InternalErrorCode errCode, String msg) {
this.code = errCode;
this.msg = msg;
}

+ public FailureReason(String msg) {
+ this.msg = msg;
+ }
+
public InternalErrorCode getCode() {
return code;
}
@@ -99,6 +99,7 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
.addAll(COMMON_SCHEMA)
.add(new Column("Comment", ScalarType.createStringType()))
// only execute type = streaming need record
+ .add(new Column("Properties", ScalarType.createStringType()))
.add(new Column("Progress", ScalarType.createStringType()))
.add(new Column("RemoteOffset", ScalarType.createStringType()))
.add(new Column("LoadStatistic", ScalarType.createStringType()))
@@ -549,6 +550,7 @@ public TRow getTvfInfo() {
trow.addToColumnValue(new TCell().setStringVal(getComment()));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
+ trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(
loadStatistic == null ? FeConstants.null_string : loadStatistic.toJson()));
trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? FeConstants.null_string : failMsg.getMsg()));
@@ -68,7 +68,8 @@ public class InsertTask extends AbstractTask {
new Column("TrackingUrl", ScalarType.createStringType()),
new Column("LoadStatistic", ScalarType.createStringType()),
new Column("User", ScalarType.createStringType()),
- new Column("Offset", ScalarType.createStringType()));
+ new Column("Offset", ScalarType.createStringType()),
+ new Column("OtherMsg", ScalarType.createStringType()));

public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;

@@ -295,6 +296,7 @@ private TRow getPendingTaskTVFInfo(String jobName) {
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser()));
trow.addToColumnValue(new TCell().setStringVal(""));
+ trow.addToColumnValue(new TCell().setStringVal(""));
return trow;
}

@@ -24,18 +24,17 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
- import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.base.TimerDefinition;
+ import org.apache.doris.job.common.FailureReason;
import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
- import org.apache.doris.job.common.PauseReason;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
@@ -72,6 +71,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@Log4j2
@@ -80,7 +80,8 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
private final long dbId;
private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
@Getter
- protected PauseReason pauseReason;
+ @SerializedName("fr")
+ protected FailureReason failureReason;
@Getter
@Setter
protected long latestAutoResumeTimestamp;
@@ -225,6 +226,9 @@ public boolean hasMoreDataToConsume() {

@Override
public void onTaskFail(StreamingJobSchedulerTask task) throws JobException {
+ if (task.getErrMsg() != null) {
+ this.failureReason = new FailureReason(task.getErrMsg());
+ }
// Here is the failure of StreamingJobSchedulerTask, no processing is required
getRunningTasks().remove(task);
}
@@ -240,7 +244,7 @@ public void onStreamTaskFail(StreamingInsertTask task) throws JobException {
failedTaskCount.incrementAndGet();
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
- this.pauseReason = new PauseReason(InternalErrorCode.INTERNAL_ERR, task.getErrMsg());
+ this.failureReason = new FailureReason(task.getErrMsg());
}
} finally {
lock.writeLock().unlock();
@@ -262,11 +266,14 @@ public void onStreamTaskSuccess(StreamingInsertTask task) {
}

private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment) {
+ if (this.jobStatistic == null) {
+ this.jobStatistic = new StreamingJobStatistic();
+ }
this.jobStatistic.setScannedRows(this.jobStatistic.getScannedRows() + attachment.getScannedRows());
this.jobStatistic.setLoadBytes(this.jobStatistic.getLoadBytes() + attachment.getLoadBytes());
this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() + attachment.getFileNumber());
this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() + attachment.getFileSize());
- offsetProvider.updateOffset(attachment.getOffset());
+ offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
}

@Override
@@ -297,14 +304,16 @@ public TRow getTvfInfo() {
trow.addToColumnValue(new TCell().setStringVal(getJobName()));
trow.addToColumnValue(new TCell().setStringVal(getCreateUser().getQualifiedUser()));
trow.addToColumnValue(new TCell().setStringVal(getJobConfig().getExecuteType().name()));
- trow.addToColumnValue(new TCell().setStringVal(""));
+ trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(getJobStatus().name()));
trow.addToColumnValue(new TCell().setStringVal(getExecuteSql()));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getSucceedTaskCount().get())));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getFailedTaskCount().get())));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getCanceledTaskCount().get())));
trow.addToColumnValue(new TCell().setStringVal(getComment()));
+ trow.addToColumnValue(new TCell().setStringVal(properties != null
+ ? GsonUtils.GSON.toJson(properties) : FeConstants.null_string));

if (offsetProvider != null && offsetProvider.getSyncOffset() != null) {
trow.addToColumnValue(new TCell().setStringVal(offsetProvider.getSyncOffset()));
@@ -320,8 +329,8 @@

trow.addToColumnValue(new TCell().setStringVal(
jobStatistic == null ? FeConstants.null_string : jobStatistic.toJson()));
- trow.addToColumnValue(
- new TCell().setStringVal(pauseReason == null ? FeConstants.null_string : pauseReason.getMsg()));
+ trow.addToColumnValue(new TCell().setStringVal(failureReason == null
+ ? FeConstants.null_string : failureReason.getMsg()));
return trow;
}

@@ -359,6 +368,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
taskIds.add(runningStreamTask.getTaskId());
List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds);
if (loadJobs.size() != 1) {
+ shouldRealseLock = true;
throw new TransactionException("load job not found, insert job id is " + runningStreamTask.getTaskId());
}
LoadJob loadJob = loadJobs.get(0);
@@ -377,7 +387,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
loadStatistic.getLoadBytes(),
loadStatistic.getFileNumber(),
loadStatistic.getTotalFileSizeB(),
- runningStreamTask.getRunningOffset()));
+ runningStreamTask.getRunningOffset().toJson()));
} finally {
if (shouldRealseLock) {
lock.writeLock().unlock();
@@ -387,6 +397,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti

@Override
public void beforeAborted(TransactionState txnState) throws TransactionException {
+
}

@Override
@@ -403,6 +414,7 @@ public void replayOnCommitted(TransactionState txnState) {
StreamingTaskTxnCommitAttachment attachment =
(StreamingTaskTxnCommitAttachment) txnState.getTxnCommitAttachment();
updateJobStatisticAndOffset(attachment);
+ succeedTaskCount.incrementAndGet();
}

public void replayOnCloudMode() throws UserException {
@@ -462,5 +474,19 @@ public void gsonPostProcess() throws IOException {
if (jobProperties == null && properties != null) {
jobProperties = new StreamingJobProperties(properties);
}

+ if (null == getSucceedTaskCount()) {
+ setSucceedTaskCount(new AtomicLong(0));
+ }
+ if (null == getFailedTaskCount()) {
+ setFailedTaskCount(new AtomicLong(0));
+ }
+ if (null == getCanceledTaskCount()) {
+ setCanceledTaskCount(new AtomicLong(0));
+ }
+
+ if (null == lock) {
+ this.lock = new ReentrantReadWriteLock(true);
+ }
}
}
@@ -55,6 +55,8 @@ public class StreamingInsertTask {
@Setter
private TaskStatus status;
private String errMsg;
+ @Setter
+ private String otherMsg;
private Long createTimeMs;
private Long startTimeMs;
private Long finishTimeMs;
@@ -99,8 +101,8 @@ public void execute() throws JobException {
if (TaskStatus.CANCELED.equals(status)) {
return;
}
- onFail(e.getMessage());
log.warn("execute task error, job id is {}, task id is {}", jobId, taskId, e);
+ onFail(e.getMessage());
} finally {
// The cancel logic will call the closeOrReleased Resources method by itself.
// If it is also called here,