From 8b9cc4d4b28635dca281ad1fa46700aa969e8f7f Mon Sep 17 00:00:00 2001 From: xy720 Date: Thu, 3 Sep 2020 21:23:11 +0800 Subject: [PATCH 1/5] save code --- .../apache/doris/load/loadv2/SparkEtlJobHandler.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java index f3e2f91150697b..62ed11604ce1ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java @@ -264,8 +264,18 @@ public EtlStatus getEtlJobStatus(SparkLoadAppHandle handle, String appId, long l } public void killEtlJob(SparkLoadAppHandle handle, String appId, long loadJobId, SparkResource resource) throws LoadException { - Preconditions.checkNotNull(appId); if (resource.isYarnMaster()) { + // When users kill the load job in PENDING phase, it is possible that the appId is empty. This is + // because the appId is parsed from the spark launcher process's output (spark launcher process + // submit job and then return appId). In this case, the spark job has still not been submitted, + // we only need to kill the spark launcher process. + if (Strings.isNullOrEmpty(appId)) { + appId = handle.getAppId(); + if (Strings.isNullOrEmpty(appId)) { + handle.kill(); + return; + } + } // prepare yarn config String configDir = resource.prepareYarnConfig(); // yarn client path From b9d338630bbc6c5448751fa827912cfb898f30c8 Mon Sep 17 00:00:00 2001 From: xy720 Date: Fri, 4 Sep 2020 10:28:58 +0800 Subject: [PATCH 2/5] save --- .../java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java index 62ed11604ce1ba..c88c22d83dbf81 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java @@ -265,7 +265,7 @@ public EtlStatus getEtlJobStatus(SparkLoadAppHandle handle, String appId, long l public void killEtlJob(SparkLoadAppHandle handle, String appId, long loadJobId, SparkResource resource) throws LoadException { if (resource.isYarnMaster()) { - // When users kill the load job in PENDING phase, it is possible that the appId is empty. This is + // When users kill a spark load in PENDING phase, it is possible that the appId is empty. This is // because the appId is parsed from the spark launcher process's output (spark launcher process // submit job and then return appId). In this case, the spark job has still not been submitted, // we only need to kill the spark launcher process. From a68ed24f8c93b77ada19d398eb7f6f60c81520d2 Mon Sep 17 00:00:00 2001 From: xy720 Date: Fri, 4 Sep 2020 16:29:19 +0800 Subject: [PATCH 3/5] save code --- .../doris/load/loadv2/SparkEtlJobHandler.java | 13 ++++++------- .../doris/load/loadv2/SparkLoadAppHandle.java | 4 ++++ .../org/apache/doris/load/loadv2/SparkLoadJob.java | 4 ++++ .../doris/load/loadv2/SparkLoadPendingTask.java | 11 ++++++----- .../doris/load/loadv2/SparkEtlJobHandlerTest.java | 4 ++-- 5 files changed, 22 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java index c88c22d83dbf81..22a0605edb3395 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java @@ -77,7 +77,7 @@ public class SparkEtlJobHandler { private static final String YARN_KILL_CMD = "%s --config %s application -kill %s"; public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobConfig, SparkResource resource, - BrokerDesc brokerDesc, SparkPendingTaskAttachment attachment) throws LoadException { + BrokerDesc brokerDesc, SparkLoadAppHandle handle, SparkPendingTaskAttachment attachment) throws LoadException { // delete outputPath deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc); @@ -142,13 +142,12 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo } // start app - SparkLoadAppHandle handle = null; State state = null; String appId = null; String errMsg = "start spark app failed. error: "; try { Process process = launcher.launch(); - handle = new SparkLoadAppHandle(process); + handle.setProcess(process); if (!FeConstants.runningUnitTest) { SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle); logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS); @@ -265,10 +264,10 @@ public EtlStatus getEtlJobStatus(SparkLoadAppHandle handle, String appId, long l public void killEtlJob(SparkLoadAppHandle handle, String appId, long loadJobId, SparkResource resource) throws LoadException { if (resource.isYarnMaster()) { - // When users kill a spark load in PENDING phase, it is possible that the appId is empty. This is - // because the appId is parsed from the spark launcher process's output (spark launcher process - // submit job and then return appId). In this case, the spark job has still not been submitted, - // we only need to kill the spark launcher process. + // The appId may be empty in when the load job is in PENDING phase. This is because the appId is + // parsed from the spark launcher process's output (spark launcher process submit job and then + // return appId). In this case, the spark job has still not been submitted, we only need to kill + // the spark launcher process. if (Strings.isNullOrEmpty(appId)) { appId = handle.getAppId(); if (Strings.isNullOrEmpty(appId)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java index 22896556087920..c732027e1646d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java @@ -134,6 +134,10 @@ public void kill() { public String getLogPath() { return this.logPath; } + public void setProcess(Process process) { + this.process = process; + } + public void setState(State state) { this.state = state; this.fireEvent(false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index f2460c00b0d6ab..9297295748a407 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -715,6 +715,10 @@ protected long getEtlStartTimestamp() { return etlStartTimestamp; } + public SparkLoadAppHandle getHandle() { + return sparkLoadAppHandle; + } + public void clearSparkLauncherLog() { String logPath = sparkLoadAppHandle.getLogPath(); if (!Strings.isNullOrEmpty(logPath)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java index 185e84a54958c9..dbf5247a435c02 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java @@ -57,16 +57,15 @@ import org.apache.doris.load.loadv2.etl.EtlJobConfig.FilePatternVersion; import org.apache.doris.load.loadv2.etl.EtlJobConfig.SourceType; import org.apache.doris.transaction.TransactionState; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Range; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.List; import java.util.Map; import java.util.Set; @@ -84,6 +83,7 @@ public class SparkLoadPendingTask extends LoadTask { private final long loadJobId; private final long transactionId; private EtlJobConfig etlJobConfig; + private SparkLoadAppHandle sparkLoadAppHandle; public SparkLoadPendingTask(SparkLoadJob loadTaskCallback, Map> aggKeyToBrokerFileGroups, @@ -98,6 +98,7 @@ public SparkLoadPendingTask(SparkLoadJob loadTaskCallback, this.loadJobId = loadTaskCallback.getId(); this.loadLabel = loadTaskCallback.getLabel(); this.transactionId = loadTaskCallback.getTransactionId(); + this.sparkLoadAppHandle = loadTaskCallback.getHandle(); this.failMsg = new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL); } @@ -115,7 +116,7 @@ private void submitEtlJob() throws LoadException { // handler submit etl job SparkEtlJobHandler handler = new SparkEtlJobHandler(); - handler.submitEtlJob(loadJobId, loadLabel, etlJobConfig, resource, brokerDesc, sparkAttachment); + handler.submitEtlJob(loadJobId, loadLabel, etlJobConfig, resource, brokerDesc, sparkLoadAppHandle, sparkAttachment); LOG.info("submit spark etl job success. load job id: {}, attachment: {}", loadJobId, sparkAttachment); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java index 904a8b1718ba00..73df62834bab54 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java @@ -167,7 +167,7 @@ public void testSubmitEtlJob(@Mocked BrokerUtil brokerUtil, @Mocked SparkLaunche BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap()); SparkPendingTaskAttachment attachment = new SparkPendingTaskAttachment(pendingTaskId); SparkEtlJobHandler handler = new SparkEtlJobHandler(); - handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, brokerDesc, attachment); + handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, brokerDesc, handle, attachment); // check submit etl job success Assert.assertEquals(appId, attachment.getAppId()); @@ -203,7 +203,7 @@ public void testSubmitEtlJobFailed(@Mocked BrokerUtil brokerUtil, @Mocked SparkL BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap()); SparkPendingTaskAttachment attachment = new SparkPendingTaskAttachment(pendingTaskId); SparkEtlJobHandler handler = new SparkEtlJobHandler(); - handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, brokerDesc, attachment); + handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, brokerDesc, handle, attachment); } @Test From 523b36e864478ac79683a7594ca14236a67c9802 Mon Sep 17 00:00:00 2001 From: xy720 Date: Fri, 4 Sep 2020 17:52:54 +0800 Subject: [PATCH 4/5] save code --- .../org/apache/doris/load/loadv2/SparkLauncherMonitor.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java index 9a664482ba7d5d..9af39c4f3ab4cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java @@ -106,6 +106,11 @@ public void setRedirectLogPath(String redirectLogPath) throws IOException { // UNKNOWN/SUBMITTED for a long time. @Override public void run() { + if (handle.getState() == SparkLoadAppHandle.State.KILLED) { + // If handle has been killed, kill the process + process.destroyForcibly(); + return; + } BufferedReader outReader = null; String line = null; long startTime = System.currentTimeMillis(); From 2f0772de7bddf85301c42ed83f067b390bf84ec7 Mon Sep 17 00:00:00 2001 From: xy720 Date: Fri, 4 Sep 2020 18:15:22 +0800 Subject: [PATCH 5/5] fix --- .../java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java index 22a0605edb3395..21f6431927de31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java @@ -264,7 +264,7 @@ public EtlStatus getEtlJobStatus(SparkLoadAppHandle handle, String appId, long l public void killEtlJob(SparkLoadAppHandle handle, String appId, long loadJobId, SparkResource resource) throws LoadException { if (resource.isYarnMaster()) { - // The appId may be empty in when the load job is in PENDING phase. This is because the appId is + // The appId may be empty when the load job is in PENDING phase. This is because the appId is // parsed from the spark launcher process's output (spark launcher process submit job and then // return appId). In this case, the spark job has still not been submitted, we only need to kill // the spark launcher process.