diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java index 12d950ec1ab45a..db3f0513c545e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -192,6 +192,8 @@ public final class FeMetaVersion { public static final int VERSION_89 = 89; // for global variable persist public static final int VERSION_90 = 90; + // sparkLoadAppHandle + public static final int VERSION_91 = 91; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_90; + public static final int VERSION_CURRENT = VERSION_91; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java index bfe335ad3d8e64..22896556087920 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java @@ -17,29 +17,44 @@ package org.apache.doris.load.loadv2; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.Iterator; import java.util.List; -public class SparkLoadAppHandle { +public class SparkLoadAppHandle implements Writable { private static final Logger LOG = LogManager.getLogger(SparkLoadAppHandle.class); // 5min private static final long SUBMIT_APP_TIMEOUT_MS = 300 * 1000; private Process process; + @SerializedName("appId") private String appId; + @SerializedName("state") private State state; + @SerializedName("queue") private String queue; + @SerializedName("startTime") private long startTime; + @SerializedName("finalStatus") private FinalApplicationStatus finalStatus; + @SerializedName("trackingUrl") private String trackingUrl; + @SerializedName("user") private String user; + @SerializedName("logPath") private String logPath; private List listeners; @@ -76,6 +91,10 @@ public SparkLoadAppHandle(Process process) { this.process = process; } + public SparkLoadAppHandle() { + this.state = State.UNKNOWN; + } + public void addListener(Listener listener) { if (this.listeners == null) { this.listeners = Lists.newArrayList(); @@ -168,6 +187,16 @@ private void fireEvent(boolean isInfoChanged) { } } } + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + public static SparkLoadAppHandle read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, SparkLoadAppHandle.class); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index c0f4f4222054b1..f2460c00b0d6ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -45,6 +45,7 @@ import org.apache.doris.common.DataQualityException; import org.apache.doris.common.DdlException; import org.apache.doris.common.DuplicatedRequestException; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; @@ -103,8 +104,8 @@ /** * There are 4 steps in SparkLoadJob: * Step1: SparkLoadPendingTask will be created by unprotectedExecuteJob method and submit spark etl job. - * Step2: LoadEtlChecker will check spark etl job status periodly and send push tasks to be when spark etl job is finished. - * Step3: LoadLoadingChecker will check loading status periodly and commit transaction when push tasks are finished. + * Step2: LoadEtlChecker will check spark etl job status periodically and send push tasks to be when spark etl job is finished. + * Step3: LoadLoadingChecker will check loading status periodically and commit transaction when push tasks are finished. * Step4: PublishVersionDaemon will send publish version tasks to be and finish transaction. */ public class SparkLoadJob extends BulkLoadJob { @@ -126,7 +127,7 @@ public class SparkLoadJob extends BulkLoadJob { // --- members below not persist --- private ResourceDesc resourceDesc; // for spark standalone - private SparkLoadAppHandle sparkLoadAppHandle; + private SparkLoadAppHandle sparkLoadAppHandle = new SparkLoadAppHandle(); // for straggler wait long time to commit transaction private long quorumFinishTimestamp = -1; // below for push task @@ -715,13 +716,11 @@ protected long getEtlStartTimestamp() { } public void clearSparkLauncherLog() { - if (sparkLoadAppHandle != null) { - String logPath = sparkLoadAppHandle.getLogPath(); - if (!Strings.isNullOrEmpty(logPath)) { - File file = new File(logPath); - if (file.exists()) { - file.delete(); - } + String logPath = sparkLoadAppHandle.getLogPath(); + if (!Strings.isNullOrEmpty(logPath)) { + File file = new File(logPath); + if (file.exists()) { + file.delete(); } } } @@ -730,6 +729,7 @@ public void clearSparkLauncherLog() { public void write(DataOutput out) throws IOException { super.write(out); sparkResource.write(out); + sparkLoadAppHandle.write(out); out.writeLong(etlStartTimestamp); Text.writeString(out, appId); Text.writeString(out, etlOutputPath); @@ -744,6 +744,9 @@ public void write(DataOutput out) throws IOException { public void readFields(DataInput in) throws IOException { super.readFields(in); sparkResource = (SparkResource) Resource.read(in); + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_91) { + sparkLoadAppHandle = SparkLoadAppHandle.read(in); + } etlStartTimestamp = in.readLong(); appId = Text.readString(in); etlOutputPath = Text.readString(in); @@ -760,7 +763,7 @@ public void readFields(DataInput in) throws IOException { */ private void unprotectedLogUpdateStateInfo() { SparkLoadJobStateUpdateInfo info = new SparkLoadJobStateUpdateInfo( - id, state, transactionId, etlStartTimestamp, appId, etlOutputPath, + id, state, transactionId, sparkLoadAppHandle, etlStartTimestamp, appId, etlOutputPath, loadStartTimestamp, tabletMetaToFileInfo); Catalog.getCurrentCatalog().getEditLog().logUpdateLoadJob(info); } @@ -769,6 +772,7 @@ private void unprotectedLogUpdateStateInfo() { public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) { super.replayUpdateStateInfo(info); SparkLoadJobStateUpdateInfo sparkJobStateInfo = (SparkLoadJobStateUpdateInfo) info; + sparkLoadAppHandle = sparkJobStateInfo.getSparkLoadAppHandle(); etlStartTimestamp = sparkJobStateInfo.getEtlStartTimestamp(); appId = sparkJobStateInfo.getAppId(); etlOutputPath = sparkJobStateInfo.getEtlOutputPath(); @@ -792,6 +796,8 @@ public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) { * Used for spark load job journal log when job state changed to ETL or LOADING */ public static class SparkLoadJobStateUpdateInfo extends LoadJobStateUpdateInfo { + @SerializedName(value = "sparkLoadAppHandle") + private SparkLoadAppHandle sparkLoadAppHandle; @SerializedName(value = "etlStartTimestamp") private long etlStartTimestamp; @SerializedName(value = "appId") @@ -801,16 +807,21 @@ public static class SparkLoadJobStateUpdateInfo extends LoadJobStateUpdateInfo { @SerializedName(value = "tabletMetaToFileInfo") private Map> tabletMetaToFileInfo; - public SparkLoadJobStateUpdateInfo(long jobId, JobState state, long transactionId, long etlStartTimestamp, - String appId, String etlOutputPath, long loadStartTimestamp, + public SparkLoadJobStateUpdateInfo(long jobId, JobState state, long transactionId, SparkLoadAppHandle sparkLoadAppHandle, + long etlStartTimestamp, String appId, String etlOutputPath, long loadStartTimestamp, Map> tabletMetaToFileInfo) { super(jobId, state, transactionId, loadStartTimestamp); + this.sparkLoadAppHandle = sparkLoadAppHandle; this.etlStartTimestamp = etlStartTimestamp; this.appId = appId; this.etlOutputPath = etlOutputPath; this.tabletMetaToFileInfo = tabletMetaToFileInfo; } + public SparkLoadAppHandle getSparkLoadAppHandle() { + return sparkLoadAppHandle; + } + public long getEtlStartTimestamp() { return etlStartTimestamp; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java index 57bbd9a0eb3f3b..7f263d7a1615a0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java @@ -87,6 +87,7 @@ public class SparkLoadJobTest { private String broker; private long transactionId; private long pendingTaskId; + private SparkLoadAppHandle sparkLoadAppHandle; private String appId; private String etlOutputPath; private long tableId; @@ -107,6 +108,7 @@ public void setUp() { broker = "broker0"; transactionId = 2L; pendingTaskId = 3L; + sparkLoadAppHandle = new SparkLoadAppHandle(); appId = "application_15888888888_0088"; etlOutputPath = "hdfs://127.0.0.1:10000/tmp/doris/100/label/101"; tableId = 10L; @@ -448,7 +450,7 @@ public void testStateUpdateInfoPersist() throws IOException { file.createNewFile(); DataOutputStream out = new DataOutputStream(new FileOutputStream(file)); SparkLoadJobStateUpdateInfo info = new SparkLoadJobStateUpdateInfo( - id, state, transactionId, etlStartTimestamp, appId, etlOutputPath, + id, state, transactionId, sparkLoadAppHandle, etlStartTimestamp, appId, etlOutputPath, loadStartTimestamp, tabletMetaToFileInfo); info.write(out); out.flush(); @@ -480,8 +482,8 @@ public void testStateUpdateInfoPersist() throws IOException { } file.createNewFile(); out = new DataOutputStream(new FileOutputStream(file)); - info = new SparkLoadJobStateUpdateInfo(id, state, transactionId, etlStartTimestamp, appId, etlOutputPath, - loadStartTimestamp, tabletMetaToFileInfo); + info = new SparkLoadJobStateUpdateInfo(id, state, transactionId, sparkLoadAppHandle, etlStartTimestamp, + appId, etlOutputPath, loadStartTimestamp, tabletMetaToFileInfo); info.write(out); out.flush(); out.close();