Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.doris.job.offset.SourceOffsetProvider;
import org.apache.doris.job.offset.SourceOffsetProviderFactory;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
Expand Down Expand Up @@ -238,11 +237,11 @@ public void onTaskSuccess(StreamingJobSchedulerTask task) throws JobException {

public void onStreamTaskFail(StreamingInsertTask task) throws JobException {
try {
failedTaskCount.incrementAndGet();
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
this.pauseReason = new PauseReason(InternalErrorCode.INTERNAL_ERR, task.getErrMsg());
}
failedTaskCount.incrementAndGet();
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
this.pauseReason = new PauseReason(InternalErrorCode.INTERNAL_ERR, task.getErrMsg());
}
} finally {
lock.writeLock().unlock();
}
Expand Down Expand Up @@ -321,7 +320,8 @@ public TRow getTvfInfo() {

trow.addToColumnValue(new TCell().setStringVal(
jobStatistic == null ? FeConstants.null_string : jobStatistic.toJson()));
trow.addToColumnValue(new TCell().setStringVal(pauseReason == null ? FeConstants.null_string : pauseReason.getMsg()));
trow.addToColumnValue(
new TCell().setStringVal(pauseReason == null ? FeConstants.null_string : pauseReason.getMsg()));
return trow;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.SessionVariable;

import com.google.gson.annotations.SerializedName;
import lombok.Data;

import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import lombok.Getter;
import lombok.Setter;

import java.util.List;

@Getter
@Setter
public class S3Offset implements Offset {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public S3Offset getNextOffset(StreamingJobProperties jobProps, Map<String, Strin
String parentPath = rfiles.get(0).getParentPath();
String filePaths = rfiles.stream().map(RemoteFile::getName).collect(Collectors.joining(",", "{", "}"));
String finalFiles = String.format("s3://%s/%s/%s", bucket, parentPath, filePaths);
offset.setEndFile(String.format("s3://%s/%s/%s", bucket, parentPath, rfiles.get(rfiles.size() - 1).getName()));
offset.setEndFile(
String.format("s3://%s/%s/%s", bucket, parentPath, rfiles.get(rfiles.size() - 1).getName()));
offset.setFileLists(finalFiles);
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.base.JobProperties;
import org.apache.doris.job.base.TimerDefinition;
import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,10 @@ public long getCallbackId() {
return callbackId;
}

public void setCallbackId(long callbackId) {
this.callbackId = callbackId;
}

public long getTimeoutMs() {
return timeoutMs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,16 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.functions.table.S3;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.CreateDatabaseCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateFunctionCommand;
import org.apache.doris.nereids.trees.plans.commands.DropFunctionCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.utframe.DorisAssert;
import org.apache.doris.utframe.UtFrameUtils;

import com.google.common.collect.ImmutableList;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
Expand All @@ -44,7 +39,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;


Expand Down Expand Up @@ -79,19 +73,7 @@ public void testDropGlobalFunction() throws Exception {
InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) new NereidsParser().parseSingle(sql);
baseCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor(), false);
Map<String, String> map = new HashMap<>();
map.put("url" ,"s3:/xxxx/*.");
// rewrite plan
Plan rewritePlan = baseCommand.getLogicalQuery().rewriteUp(plan -> {
if (plan instanceof LogicalTVFRelation) {
LogicalTVFRelation originTvfRel = (LogicalTVFRelation) plan;
LogicalTVFRelation newRvfRel = new LogicalTVFRelation(
originTvfRel.getRelationId(), new S3(new Properties(map)), ImmutableList.of());
return newRvfRel;
}
return plan;
});
InsertIntoTableCommand s = new InsertIntoTableCommand((LogicalPlan) rewritePlan, Optional.empty(), Optional.empty(),
Optional.empty(), true, Optional.empty());
map.put("url", "s3:/xxxx/*.");

StmtExecutor stmtExecutor = new StmtExecutor(connectContext, sql);
if (logicalPlan instanceof CreateDatabaseCommand) {
Expand Down