Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.doris.job.offset.SourceOffsetProviderFactory;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
Expand All @@ -48,6 +49,7 @@
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TxnStateChangeCallback;

import com.google.common.base.Preconditions;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -65,7 +67,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M

@SerializedName("did")
private final long dbId;
private LoadStatistic loadStatistic = new LoadStatistic();
private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
@SerializedName("fm")
private FailMsg failMsg;
@Getter
Expand Down Expand Up @@ -138,7 +140,7 @@ public List<StreamingJobSchedulerTask> createTasks(TaskType taskType, Map<Object
protected StreamingInsertTask createStreamingInsertTask() {
InsertIntoTableCommand command = offsetProvider.rewriteTvfParams(getExecuteSql());
this.runningStreamTask = new StreamingInsertTask(getJobId(), AbstractTask.getNextTaskId(), command,
loadStatistic, getCurrentDbName(), offsetProvider.getCurrentOffset(), jobProperties);
getCurrentDbName(), offsetProvider.getCurrentOffset(), jobProperties);
return this.runningStreamTask;
}

Expand Down Expand Up @@ -184,6 +186,13 @@ public void onStreamTaskSuccess(StreamingInsertTask task) {
Env.getCurrentEnv().getJobManager().getStreamingTaskScheduler().registerTask(runningStreamTask);
}

/**
 * Adds the counters carried by a commit attachment to the job-level statistic and
 * then passes the attachment's offset to the offset provider.
 *
 * @param attachment commit attachment whose scanned rows, load bytes, file number,
 *                   file size and offset are applied to this job
 */
private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment) {
    StreamingJobStatistic stat = this.jobStatistic;

    // Each counter is cumulative: current job total plus the value reported by the attachment.
    long totalScannedRows = stat.getScannedRows() + attachment.getScannedRows();
    stat.setScannedRows(totalScannedRows);

    long totalLoadBytes = stat.getLoadBytes() + attachment.getLoadBytes();
    stat.setLoadBytes(totalLoadBytes);

    long totalFileNumber = stat.getFileNumber() + attachment.getFileNumber();
    stat.setFileNumber(totalFileNumber);

    long totalFileSize = stat.getFileSize() + attachment.getFileSize();
    stat.setFileSize(totalFileSize);

    // The offset is replaced, not accumulated.
    offsetProvider.updateOffset(attachment.getOffset());
}

@Override
public ShowResultSetMetaData getTaskMetaData() {
Expand Down Expand Up @@ -212,7 +221,7 @@ public TRow getTvfInfo() {
trow.addToColumnValue(new TCell().setStringVal(getComment()));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson()));
trow.addToColumnValue(new TCell().setStringVal(jobStatistic.toJson()));
trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? FeConstants.null_string : failMsg.getMsg()));
return trow;
}
Expand Down Expand Up @@ -244,7 +253,20 @@ public long getId() {

/**
 * Attaches the running task's load statistics and offset to the transaction before it commits.
 *
 * <p>Looks up the load job registered under the running task's id, reads its scanned rows,
 * load bytes, file number and total file size, and stores them together with the task's
 * offset on {@code txnState} as a {@link StreamingTaskTxnCommitAttachment}.
 *
 * @param txnState transaction that is about to be committed
 * @throws TransactionException if there is no running task, or if the lookup does not
 *                              return exactly one load job
 */
@Override
public void beforeCommitted(TransactionState txnState) throws TransactionException {
    // Read the field once so the id used for the lookup and the offset attached below
    // come from the same task even if the field is reassigned in between.
    StreamingInsertTask task = runningStreamTask;
    if (task == null) {
        // Report through the declared exception type instead of failing with an NPE.
        throw new TransactionException("no running streaming task, job id is " + getJobId());
    }

    List<Long> taskIds = new ArrayList<>(1);
    taskIds.add(task.getTaskId());
    List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds);
    if (loadJobs.size() != 1) {
        throw new TransactionException("load job not found, insert job id is " + task.getTaskId());
    }

    LoadStatistic loadStatistic = loadJobs.get(0).getLoadStatistic();
    txnState.setTxnCommitAttachment(new StreamingTaskTxnCommitAttachment(
            loadStatistic.getScannedRows(),
            loadStatistic.getLoadBytes(),
            loadStatistic.getFileNumber(),
            loadStatistic.getTotalFileSizeB(),
            task.getOffset()));
}

@Override
Expand All @@ -254,12 +276,18 @@ public void beforeAborted(TransactionState txnState) throws TransactionException

/**
 * Applies the committed transaction's attachment to this job's statistic and offset.
 *
 * <p>NOTE(review): {@code txnOperated} is not consulted here; confirm that the statistic
 * and offset should advance regardless of its value.
 *
 * @param txnState    committed transaction; its commit attachment must not be null
 * @param txnOperated unused by this implementation
 * @throws UserException declared by the callback contract
 */
@Override
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws UserException {
    // checkNotNull returns the validated reference, so it can be cast directly.
    Object committed = Preconditions.checkNotNull(txnState.getTxnCommitAttachment(), txnState);
    updateJobStatisticAndOffset((StreamingTaskTxnCommitAttachment) committed);
}

/**
 * Replay counterpart of the commit callback: re-applies the attachment stored on the
 * transaction to this job's statistic and offset.
 *
 * @param txnState committed transaction being replayed; its commit attachment must not be null
 */
@Override
public void replayOnCommitted(TransactionState txnState) {
    // checkNotNull returns the validated reference, so it can be cast directly.
    Object replayed = Preconditions.checkNotNull(txnState.getTxnCommitAttachment(), txnState);
    updateJobStatisticAndOffset((StreamingTaskTxnCommitAttachment) replayed);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertTask;
import org.apache.doris.job.offset.Offset;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
Expand Down Expand Up @@ -59,22 +58,19 @@ public class StreamingInsertTask {
private String currentDb;
private UserIdentity userIdentity;
private ConnectContext ctx;
private LoadStatistic loadStatistic;
private Offset offset;
private AtomicBoolean isCanceled = new AtomicBoolean(false);
private StreamingJobProperties jobProperties;

public StreamingInsertTask(long jobId,
long taskId,
InsertIntoTableCommand command,
LoadStatistic loadStatistic,
String currentDb,
Offset offset,
StreamingJobProperties jobProperties) {
this.jobId = jobId;
this.taskId = taskId;
this.command = command;
this.loadStatistic = loadStatistic;
this.userIdentity = ctx.getCurrentUserIdentity();
this.currentDb = currentDb;
this.offset = offset;
Expand Down Expand Up @@ -127,7 +123,7 @@ private void run() throws JobException {
log.info("task has been canceled, task id is {}", getTaskId());
return;
}
command.runWithUpdateInfo(ctx, stmtExecutor, loadStatistic);
command.runWithUpdateInfo(ctx, stmtExecutor, null);
if (ctx.getState().getStateType() == QueryState.MysqlStateType.OK) {
return;
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.job.extensions.insert.streaming;

import org.apache.doris.persist.gson.GsonUtils;

import lombok.Getter;
import lombok.Setter;

/**
 * Mutable holder for four counters of a streaming job: scanned rows, load bytes,
 * file number and file size. Lombok generates a getter and a setter for every field.
 *
 * <p>NOTE(review): the fields carry no {@code @SerializedName}; confirm that
 * {@code GsonUtils.GSON} still includes them in the {@link #toJson()} output.
 */
@Getter
@Setter
public class StreamingJobStatistic {
    private long scannedRows;
    private long loadBytes;
    private long fileNumber;
    private long fileSize;

    /**
     * Serializes this object with the shared Gson instance.
     *
     * @return the JSON text produced by {@code GsonUtils.GSON} for this instance
     */
    public String toJson() {
        String json = GsonUtils.GSON.toJson(this);
        return json;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.job.extensions.insert.streaming;

import org.apache.doris.job.offset.Offset;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TxnCommitAttachment;

import com.google.gson.annotations.SerializedName;
import lombok.Getter;

/**
 * Transaction commit attachment for a streaming insert task. It carries the task's
 * scanned rows, load bytes, file number, file size and source offset, each exposed
 * through a Lombok-generated getter and serialized under a short Gson name.
 */
public class StreamingTaskTxnCommitAttachment extends TxnCommitAttachment {

    @SerializedName(value = "sr")
    @Getter
    private long scannedRows;
    @SerializedName(value = "lb")
    @Getter
    private long loadBytes;
    @SerializedName(value = "fn")
    @Getter
    private long fileNumber;
    @SerializedName(value = "fs")
    @Getter
    private long fileSize;
    @SerializedName(value = "of")
    @Getter
    private Offset offset;

    /**
     * Creates an attachment registered under
     * {@link TransactionState.LoadJobSourceType#BACKEND_STREAMING}.
     *
     * @param scannedRows value stored as the scanned-rows counter
     * @param loadBytes   value stored as the load-bytes counter
     * @param fileNumber  value stored as the file-number counter
     * @param fileSize    value stored as the file-size counter
     * @param offset      source offset stored with the counters
     */
    public StreamingTaskTxnCommitAttachment(long scannedRows, long loadBytes,
            long fileNumber, long fileSize, Offset offset) {
        super(TransactionState.LoadJobSourceType.BACKEND_STREAMING);
        this.scannedRows = scannedRows;
        this.loadBytes = loadBytes;
        this.fileNumber = fileNumber;
        this.fileSize = fileSize;
        this.offset = offset;
    }

    /**
     * Renders all five fields for logging.
     *
     * @return a single-line description of this attachment
     */
    @Override
    public String toString() {
        // Plain concatenation prints "null" for a missing offset; calling
        // offset.toString() explicitly would throw a NullPointerException instead.
        return "StreamingTaskTxnCommitAttachment: ["
                + "scannedRows=" + scannedRows
                + ", loadBytes=" + loadBytes
                + ", fileNumber=" + fileNumber
                + ", fileSize=" + fileSize
                + ", offset=" + offset
                + "]";
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ public interface SourceOffsetProvider {
InsertIntoTableCommand rewriteTvfParams(String sql);

/**
* Update the progress of the source.
* Update the offset of the source.
* @param offset
*/
void updateProgress(Offset offset);
void updateOffset(Offset offset);

/**
* Fetch remote meta information, such as listing files in S3 or getting latest offsets in Kafka.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,13 @@ public class S3Offset implements Offset {
public String toJson() {
return GsonUtils.GSON.toJson(this);
}

/**
 * Renders the start file, end file and file list of this offset for logging.
 *
 * @return a single-line description of this offset
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("S3Offset: [");
    text.append("startFile=").append(startFile);
    text.append(", endFile=").append(endFile);
    text.append(", fileLists=").append(fileLists);
    text.append("]");
    return text.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public InsertIntoTableCommand rewriteTvfParams(String sql) {
}

@Override
public void updateProgress(Offset offset) {
public void updateOffset(Offset offset) {
this.currentOffset = (S3Offset) offset;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ public Map<String, String> getCounters() {
return counters;
}

/**
 * Returns the current value of the {@code fileNum} field.
 *
 * @return the value of {@code fileNum}
 */
public int getFileNumber() {
    return this.fileNum;
}

/**
 * Returns the current value of the {@code totalFileSizeB} field.
 *
 * @return the value of {@code totalFileSizeB}
 */
public long getTotalFileSizeB() {
    return this.totalFileSizeB;
}

public synchronized String toJson() {
long total = 0;
for (long rows : counterTbl.values()) {
Expand Down
Loading