From a613a069f4bf0116972b6c9d62a9ea504ece1165 Mon Sep 17 00:00:00 2001 From: xy720 Date: Wed, 2 Sep 2020 21:12:07 +0800 Subject: [PATCH 1/4] persist sparkLoadAppHandle --- .../apache/doris/common/FeMetaVersion.java | 4 ++- .../doris/load/loadv2/SparkLoadAppHandle.java | 27 ++++++++++++++++++- .../doris/load/loadv2/SparkLoadJob.java | 23 ++++++++++++---- 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java index 12d950ec1ab45a..db3f0513c545e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -192,6 +192,8 @@ public final class FeMetaVersion { public static final int VERSION_89 = 89; // for global variable persist public static final int VERSION_90 = 90; + // sparkLoadAppHandle + public static final int VERSION_91 = 91; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_90; + public static final int VERSION_CURRENT = VERSION_91; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java index bfe335ad3d8e64..8393da3a1176bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java @@ -17,29 +17,44 @@ package org.apache.doris.load.loadv2; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.Iterator; import java.util.List; -public class SparkLoadAppHandle { +public class SparkLoadAppHandle implements Writable { private static final Logger LOG = LogManager.getLogger(SparkLoadAppHandle.class); // 5min private static final long SUBMIT_APP_TIMEOUT_MS = 300 * 1000; private Process process; + @SerializedName("appId") private String appId; + @SerializedName("state") private State state; + @SerializedName("queue") private String queue; + @SerializedName("startTime") private long startTime; + @SerializedName("finalStatus") private FinalApplicationStatus finalStatus; + @SerializedName("trackingUrl") private String trackingUrl; + @SerializedName("user") private String user; + @SerializedName("logPath") private String logPath; private List listeners; @@ -168,6 +183,16 @@ private void fireEvent(boolean isInfoChanged) { } } } + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + public static SparkLoadAppHandle read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, SparkLoadAppHandle.class); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index c0f4f4222054b1..cda9bf13c10afd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -45,6 +45,7 @@ import org.apache.doris.common.DataQualityException; import org.apache.doris.common.DdlException; import org.apache.doris.common.DuplicatedRequestException; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; @@ -103,8 +104,8 @@ /** * There are 4 steps in SparkLoadJob: * Step1: SparkLoadPendingTask will be created by unprotectedExecuteJob method and submit spark etl job. - * Step2: LoadEtlChecker will check spark etl job status periodly and send push tasks to be when spark etl job is finished. - * Step3: LoadLoadingChecker will check loading status periodly and commit transaction when push tasks are finished. + * Step2: LoadEtlChecker will check spark etl job status periodically and send push tasks to be when spark etl job is finished. + * Step3: LoadLoadingChecker will check loading status periodically and commit transaction when push tasks are finished. * Step4: PublishVersionDaemon will send publish version tasks to be and finish transaction. */ public class SparkLoadJob extends BulkLoadJob { @@ -730,6 +731,7 @@ public void clearSparkLauncherLog() { public void write(DataOutput out) throws IOException { super.write(out); sparkResource.write(out); + sparkLoadAppHandle.write(out); out.writeLong(etlStartTimestamp); Text.writeString(out, appId); Text.writeString(out, etlOutputPath); @@ -744,6 +746,9 @@ public void write(DataOutput out) throws IOException { public void readFields(DataInput in) throws IOException { super.readFields(in); sparkResource = (SparkResource) Resource.read(in); + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_91) { + sparkLoadAppHandle = SparkLoadAppHandle.read(in); + } etlStartTimestamp = in.readLong(); appId = Text.readString(in); etlOutputPath = Text.readString(in); @@ -760,7 +765,7 @@ public void readFields(DataInput in) throws IOException { */ private void unprotectedLogUpdateStateInfo() { SparkLoadJobStateUpdateInfo info = new SparkLoadJobStateUpdateInfo( - id, state, transactionId, etlStartTimestamp, appId, etlOutputPath, + id, state, transactionId, sparkLoadAppHandle, etlStartTimestamp, appId, etlOutputPath, loadStartTimestamp, tabletMetaToFileInfo); Catalog.getCurrentCatalog().getEditLog().logUpdateLoadJob(info); } @@ -769,6 +774,7 @@ private void unprotectedLogUpdateStateInfo() { public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) { super.replayUpdateStateInfo(info); SparkLoadJobStateUpdateInfo sparkJobStateInfo = (SparkLoadJobStateUpdateInfo) info; + sparkLoadAppHandle = sparkJobStateInfo.getSparkLoadAppHandle(); etlStartTimestamp = sparkJobStateInfo.getEtlStartTimestamp(); appId = sparkJobStateInfo.getAppId(); etlOutputPath = sparkJobStateInfo.getEtlOutputPath(); @@ -792,6 +798,8 @@ public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) { * Used for spark load job journal log when job state changed to ETL or LOADING */ public static class SparkLoadJobStateUpdateInfo extends LoadJobStateUpdateInfo { + @SerializedName(value = "sparkLoadAppHandle") + private SparkLoadAppHandle sparkLoadAppHandle; @SerializedName(value = "etlStartTimestamp") private long etlStartTimestamp; @SerializedName(value = "appId") @@ -801,16 +809,21 @@ public static class SparkLoadJobStateUpdateInfo extends LoadJobStateUpdateInfo { @SerializedName(value = "tabletMetaToFileInfo") private Map> tabletMetaToFileInfo; - public SparkLoadJobStateUpdateInfo(long jobId, JobState state, long transactionId, long etlStartTimestamp, - String appId, String etlOutputPath, long loadStartTimestamp, + public SparkLoadJobStateUpdateInfo(long jobId, JobState state, long transactionId, SparkLoadAppHandle sparkLoadAppHandle, + long etlStartTimestamp, String appId, String etlOutputPath, long loadStartTimestamp, Map> tabletMetaToFileInfo) { super(jobId, state, transactionId, loadStartTimestamp); + this.sparkLoadAppHandle = sparkLoadAppHandle; this.etlStartTimestamp = etlStartTimestamp; this.appId = appId; this.etlOutputPath = etlOutputPath; this.tabletMetaToFileInfo = tabletMetaToFileInfo; } + public SparkLoadAppHandle getSparkLoadAppHandle() { + return sparkLoadAppHandle; + } + public long getEtlStartTimestamp() { return etlStartTimestamp; } From 672fcf940c20c7b2af1a9ed041f1930922d44fe1 Mon Sep 17 00:00:00 2001 From: xy720 Date: Wed, 2 Sep 2020 21:38:33 +0800 Subject: [PATCH 2/4] fix ut --- .../org/apache/doris/load/loadv2/SparkLoadAppHandle.java | 3 +++ .../org/apache/doris/load/loadv2/SparkLoadJobTest.java | 8 +++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java index 8393da3a1176bf..4ee355fbac666f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java @@ -91,6 +91,9 @@ public SparkLoadAppHandle(Process process) { this.process = process; } + // only for unit test + public SparkLoadAppHandle() { } + public void addListener(Listener listener) { if (this.listeners == null) { this.listeners = Lists.newArrayList(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java index 57bbd9a0eb3f3b..7f263d7a1615a0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java @@ -87,6 +87,7 @@ public class SparkLoadJobTest { private String broker; private long transactionId; private long pendingTaskId; + private SparkLoadAppHandle sparkLoadAppHandle; private String appId; private String etlOutputPath; private long tableId; @@ -107,6 +108,7 @@ public void setUp() { broker = "broker0"; transactionId = 2L; pendingTaskId = 3L; + sparkLoadAppHandle = new SparkLoadAppHandle(); appId = "application_15888888888_0088"; etlOutputPath = "hdfs://127.0.0.1:10000/tmp/doris/100/label/101"; tableId = 10L; @@ -448,7 +450,7 @@ public void testStateUpdateInfoPersist() throws IOException { file.createNewFile(); DataOutputStream out = new DataOutputStream(new FileOutputStream(file)); SparkLoadJobStateUpdateInfo info = new SparkLoadJobStateUpdateInfo( - id, state, transactionId, etlStartTimestamp, appId, etlOutputPath, + id, state, transactionId, sparkLoadAppHandle, etlStartTimestamp, appId, etlOutputPath, loadStartTimestamp, tabletMetaToFileInfo); info.write(out); out.flush(); @@ -480,8 +482,8 @@ public void testStateUpdateInfoPersist() throws IOException { } file.createNewFile(); out = new DataOutputStream(new FileOutputStream(file)); - info = new SparkLoadJobStateUpdateInfo(id, state, transactionId, etlStartTimestamp, appId, etlOutputPath, - loadStartTimestamp, tabletMetaToFileInfo); + info = new SparkLoadJobStateUpdateInfo(id, state, transactionId, sparkLoadAppHandle, etlStartTimestamp, + appId, etlOutputPath, loadStartTimestamp, tabletMetaToFileInfo); info.write(out); out.flush(); out.close(); From 21b1a1a01146beb9c5ad664b6c70df1e4ff8ebf7 Mon Sep 17 00:00:00 2001 From: xy720 Date: Wed, 2 Sep 2020 22:32:55 +0800 Subject: [PATCH 3/4] log --- .../doris/load/loadv2/SparkLoadAppHandle.java | 5 +++-- .../org/apache/doris/load/loadv2/SparkLoadJob.java | 13 ++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java index 4ee355fbac666f..22896556087920 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java @@ -91,8 +91,9 @@ public SparkLoadAppHandle(Process process) { this.process = process; } - // only for unit test - public SparkLoadAppHandle() { } + public SparkLoadAppHandle() { + this.state = State.UNKNOWN; + } public void addListener(Listener listener) { if (this.listeners == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index cda9bf13c10afd..f7d7e26776fe1f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -149,6 +149,7 @@ public SparkLoadJob(long dbId, String label, ResourceDesc resourceDesc, OriginSt throws MetaNotFoundException { super(dbId, label, originStmt); this.resourceDesc = resourceDesc; + this.sparkLoadAppHandle = new SparkLoadAppHandle(); timeoutSecond = Config.spark_load_default_timeout_second; jobType = EtlJobType.SPARK; } @@ -716,13 +717,11 @@ protected long getEtlStartTimestamp() { } public void clearSparkLauncherLog() { - if (sparkLoadAppHandle != null) { - String logPath = sparkLoadAppHandle.getLogPath(); - if (!Strings.isNullOrEmpty(logPath)) { - File file = new File(logPath); - if (file.exists()) { - file.delete(); - } + String logPath = sparkLoadAppHandle.getLogPath(); + if (!Strings.isNullOrEmpty(logPath)) { + File file = new File(logPath); + if (file.exists()) { + file.delete(); } } } From cc961ceaaf2ad04b50179adf48631c90c73b2fb3 Mon Sep 17 00:00:00 2001 From: xy720 Date: Wed, 2 Sep 2020 22:57:55 +0800 Subject: [PATCH 4/4] fix --- .../main/java/org/apache/doris/load/loadv2/SparkLoadJob.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index f7d7e26776fe1f..f2460c00b0d6ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -127,7 +127,7 @@ public class SparkLoadJob extends BulkLoadJob { // --- members below not persist --- private ResourceDesc resourceDesc; // for spark standalone - private SparkLoadAppHandle sparkLoadAppHandle; + private SparkLoadAppHandle sparkLoadAppHandle = new SparkLoadAppHandle(); // for straggler wait long time to commit transaction private long quorumFinishTimestamp = -1; // below for push task @@ -149,7 +149,6 @@ public SparkLoadJob(long dbId, String label, ResourceDesc resourceDesc, OriginSt throws MetaNotFoundException { super(dbId, label, originStmt); this.resourceDesc = resourceDesc; - this.sparkLoadAppHandle = new SparkLoadAppHandle(); timeoutSecond = Config.spark_load_default_timeout_second; jobType = EtlJobType.SPARK; }