diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java index f3e2f91150697b..21f6431927de31 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java @@ -77,7 +77,7 @@ public class SparkEtlJobHandler { private static final String YARN_KILL_CMD = "%s --config %s application -kill %s"; public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobConfig, SparkResource resource, - BrokerDesc brokerDesc, SparkPendingTaskAttachment attachment) throws LoadException { + BrokerDesc brokerDesc, SparkLoadAppHandle handle, SparkPendingTaskAttachment attachment) throws LoadException { // delete outputPath deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc); @@ -142,13 +142,12 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo } // start app - SparkLoadAppHandle handle = null; State state = null; String appId = null; String errMsg = "start spark app failed. error: "; try { Process process = launcher.launch(); - handle = new SparkLoadAppHandle(process); + handle.setProcess(process); if (!FeConstants.runningUnitTest) { SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle); logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS); @@ -264,8 +263,18 @@ public EtlStatus getEtlJobStatus(SparkLoadAppHandle handle, String appId, long l } public void killEtlJob(SparkLoadAppHandle handle, String appId, long loadJobId, SparkResource resource) throws LoadException { - Preconditions.checkNotNull(appId); if (resource.isYarnMaster()) { + // The appId may be empty when the load job is in the PENDING phase. This is because the appId is + // parsed from the spark launcher process's output (the spark launcher process submits the job and then + // returns the appId). 
In this case, the spark job has still not been submitted, so we only need to kill + the spark launcher process. + if (Strings.isNullOrEmpty(appId)) { + appId = handle.getAppId(); + if (Strings.isNullOrEmpty(appId)) { + handle.kill(); + return; + } + } // prepare yarn config String configDir = resource.prepareYarnConfig(); // yarn client path diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java index 9a664482ba7d5d..9af39c4f3ab4cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java @@ -106,6 +106,11 @@ public void setRedirectLogPath(String redirectLogPath) throws IOException { // UNKNOWN/SUBMITTED for a long time. @Override public void run() { + if (handle.getState() == SparkLoadAppHandle.State.KILLED) { + // If the handle has already been killed, kill the launcher process + process.destroyForcibly(); + return; + } BufferedReader outReader = null; String line = null; long startTime = System.currentTimeMillis(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java index 22896556087920..c732027e1646d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java @@ -134,6 +134,10 @@ public void kill() { public String getLogPath() { return this.logPath; } + public void setProcess(Process process) { + this.process = process; + } + public void setState(State state) { this.state = state; this.fireEvent(false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index f2460c00b0d6ab..9297295748a407 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -715,6 +715,10 @@ protected long getEtlStartTimestamp() { return etlStartTimestamp; } + public SparkLoadAppHandle getHandle() { + return sparkLoadAppHandle; + } + public void clearSparkLauncherLog() { String logPath = sparkLoadAppHandle.getLogPath(); if (!Strings.isNullOrEmpty(logPath)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java index 185e84a54958c9..dbf5247a435c02 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java @@ -57,16 +57,15 @@ import org.apache.doris.load.loadv2.etl.EtlJobConfig.FilePatternVersion; import org.apache.doris.load.loadv2.etl.EtlJobConfig.SourceType; import org.apache.doris.transaction.TransactionState; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Range; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.List; import java.util.Map; import java.util.Set; @@ -84,6 +83,7 @@ public class SparkLoadPendingTask extends LoadTask { private final long loadJobId; private final long transactionId; private EtlJobConfig etlJobConfig; + private SparkLoadAppHandle sparkLoadAppHandle; public SparkLoadPendingTask(SparkLoadJob loadTaskCallback, Map> aggKeyToBrokerFileGroups, @@ -98,6 +98,7 @@ public SparkLoadPendingTask(SparkLoadJob loadTaskCallback, this.loadJobId = loadTaskCallback.getId(); this.loadLabel = loadTaskCallback.getLabel(); this.transactionId = 
loadTaskCallback.getTransactionId(); + this.sparkLoadAppHandle = loadTaskCallback.getHandle(); this.failMsg = new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL); } @@ -115,7 +116,7 @@ private void submitEtlJob() throws LoadException { // handler submit etl job SparkEtlJobHandler handler = new SparkEtlJobHandler(); - handler.submitEtlJob(loadJobId, loadLabel, etlJobConfig, resource, brokerDesc, sparkAttachment); + handler.submitEtlJob(loadJobId, loadLabel, etlJobConfig, resource, brokerDesc, sparkLoadAppHandle, sparkAttachment); LOG.info("submit spark etl job success. load job id: {}, attachment: {}", loadJobId, sparkAttachment); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java index 904a8b1718ba00..73df62834bab54 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java @@ -167,7 +167,7 @@ public void testSubmitEtlJob(@Mocked BrokerUtil brokerUtil, @Mocked SparkLaunche BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap()); SparkPendingTaskAttachment attachment = new SparkPendingTaskAttachment(pendingTaskId); SparkEtlJobHandler handler = new SparkEtlJobHandler(); - handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, brokerDesc, attachment); + handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, brokerDesc, handle, attachment); // check submit etl job success Assert.assertEquals(appId, attachment.getAppId()); @@ -203,7 +203,7 @@ public void testSubmitEtlJobFailed(@Mocked BrokerUtil brokerUtil, @Mocked SparkL BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap()); SparkPendingTaskAttachment attachment = new SparkPendingTaskAttachment(pendingTaskId); SparkEtlJobHandler handler = new SparkEtlJobHandler(); - handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, 
brokerDesc, attachment); + handler.submitEtlJob(loadJobId, label, etlJobConfig, resource, brokerDesc, handle, attachment); } @Test