diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java index 4f0f95ea6456ca..0614bd34ebf9de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java @@ -19,16 +19,19 @@ import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.ResourceDesc; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.LoadException; import org.apache.doris.common.Pair; import org.apache.doris.common.proc.BaseProcResult; import org.apache.doris.load.loadv2.SparkRepository; +import org.apache.doris.load.loadv2.SparkYarnConfigFiles; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; +import java.io.File; import java.util.Map; /** @@ -66,6 +69,7 @@ public class SparkResource extends Resource { private static final String SPARK_CONFIG_PREFIX = "spark."; private static final String BROKER_PROPERTY_PREFIX = "broker."; // spark uses hadoop configs in the form of spark.hadoop.* + private static final String SPARK_HADOOP_CONFIG_PREFIX = "spark.hadoop."; private static final String SPARK_YARN_RESOURCE_MANAGER_ADDRESS = "spark.hadoop.yarn.resourcemanager.address"; private static final String SPARK_FS_DEFAULT_FS = "spark.hadoop.fs.defaultFS"; private static final String YARN_RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.address"; @@ -166,6 +170,23 @@ public synchronized SparkRepository.SparkArchive prepareArchive() throws LoadExc return archive; } + // Each SparkResource has one and only one yarn config used to run yarn commands. + // This method writes every configuration entry starting with "spark.hadoop."
into config files in a specific directory + public synchronized String prepareYarnConfig() throws LoadException { + SparkYarnConfigFiles yarnConfigFiles = new SparkYarnConfigFiles(name, getSparkHadoopConfig(sparkConfigs)); + yarnConfigFiles.prepare(); + return yarnConfigFiles.getConfigDir(); + } + + public String getYarnClientPath() throws LoadException { + String yarnClientPath = Config.yarn_client_path; + File file = new File(yarnClientPath); + if (!file.exists() || !file.isFile()) { + throw new LoadException("yarn client does not exist in path: " + yarnClientPath); + } + return yarnClientPath; + } + public boolean isYarnMaster() { return getMaster().equalsIgnoreCase(YARN_MASTER); } @@ -182,7 +203,7 @@ public void update(ResourceDesc resourceDesc) throws DdlException { if (properties.containsKey(SPARK_MASTER)) { throw new DdlException("Cannot change spark master"); } - sparkConfigs.putAll(getSparkConfigs(properties)); + sparkConfigs.putAll(getSparkConfig(properties)); // update working dir and broker if (properties.containsKey(WORKING_DIR)) { @@ -199,7 +220,7 @@ protected void setProperties(Map<String, String> properties) throws DdlException Preconditions.checkState(properties != null); // get spark configs - sparkConfigs = getSparkConfigs(properties); + sparkConfigs = getSparkConfig(properties); // check master and deploy mode if (getMaster() == null) { throw new DdlException("Missing " + SPARK_MASTER + " in properties"); } @@ -233,14 +254,24 @@ && isYarnMaster()) { brokerProperties = getBrokerProperties(properties); } - private Map<String, String> getSparkConfigs(Map<String, String> properties) { - Map<String, String> sparkConfigs = Maps.newHashMap(); + private Map<String, String> getSparkConfig(Map<String, String> properties) { + Map<String, String> sparkConfig = Maps.newHashMap(); for (Map.Entry<String, String> entry : properties.entrySet()) { if (entry.getKey().startsWith(SPARK_CONFIG_PREFIX)) { - sparkConfigs.put(entry.getKey(), entry.getValue()); + sparkConfig.put(entry.getKey(), entry.getValue()); } } - return sparkConfigs; + return sparkConfig; + } + + private Map<String, String> getSparkHadoopConfig(Map<String, String> properties) { + Map<String, String> sparkConfig = Maps.newHashMap(); + for (Map.Entry<String, String> entry : properties.entrySet()) { + if (entry.getKey().startsWith(SPARK_HADOOP_CONFIG_PREFIX)) { + sparkConfig.put(entry.getKey(), entry.getValue()); + } + } + return sparkConfig; } private Map<String, String> getBrokerProperties(Map<String, String> properties) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 26fccc0c42541f..2b4ee22868f021 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -543,6 +543,20 @@ public class Config extends ConfigBase { @ConfField public static String spark_resource_path = ""; + /** + * Default yarn client path + */ + @ConfField + public static String yarn_client_path = PaloFe.DORIS_HOME_DIR + "/lib/yarn-client/hadoop/bin/yarn"; + + /** + * Default yarn config file directory + * Each time before running the yarn command, we need to check that the + * config files exist under this path, and if not, create them. + */ + @ConfField + public static String yarn_config_dir = PaloFe.DORIS_HOME_DIR + "/lib/yarn-config"; + /** * Default number of waiting jobs for routine load and version 2 of load * This is a desired number.
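Taken together, the changes above wire up an external yarn client: `getSparkHadoopConfig` keeps only the `spark.hadoop.*` entries of a resource, `SparkYarnConfigFiles` (added below) strips that prefix and writes the surviving `hadoop.*` and `yarn.*` keys into `core-site.xml` and `yarn-site.xml` under `yarn_config_dir`, and the binary at `yarn_client_path` is then invoked as `yarn --config <configDir> application -status <appId>`. Below is a minimal, self-contained sketch of that prefix routing; the class name, the `main` driver, and the `hadoop.security.authentication` key are illustrative only, not part of the patch:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch (not part of the patch): route "spark.hadoop.*" entries
// into the two yarn config files by their prefix after "spark.hadoop." is stripped.
public class YarnConfigRoutingSketch {
    private static final String SPARK_HADOOP_PREFIX = "spark.hadoop.";

    // Keep entries whose key, once "spark.hadoop." is stripped, starts with prefix.
    static Map<String, String> byPrefix(Map<String, String> props, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (!e.getKey().startsWith(SPARK_HADOOP_PREFIX)) {
                continue; // plain spark config, not a hadoop config
            }
            String key = e.getKey().substring(SPARK_HADOOP_PREFIX.length());
            if (key.startsWith(prefix)) {
                result.put(key, e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> sparkConfigs = new HashMap<>();
        sparkConfigs.put("spark.master", "yarn");
        sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999");
        sparkConfigs.put("spark.hadoop.hadoop.security.authentication", "simple"); // hypothetical key

        // yarn.* entries end up in yarn-site.xml, hadoop.* entries in core-site.xml
        System.out.println("yarn-site.xml: " + byPrefix(sparkConfigs, "yarn."));
        System.out.println("core-site.xml: " + byPrefix(sparkConfigs, "hadoop."));
    }
}
```

Note that only keys which still start with `yarn.` or `hadoop.` after stripping survive the split; the real `getPropertiesByPrefix` in `SparkYarnConfigFiles` below additionally removes matched entries from the input map.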
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ConfigFile.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ConfigFile.java new file mode 100644 index 00000000000000..17384a72cbd07f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ConfigFile.java @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.common.LoadException; + +// A config file required to run the yarn command. +// Each time before running the yarn command, we need to check that the +// config file exists in the specified path, and if not, create it. +public interface ConfigFile { + public void createFile() throws LoadException; + public String getFilePath(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java index 428e88c3cae693..1d9eedc5ae7d3b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java @@ -20,11 +20,14 @@ import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.catalog.SparkResource; import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.LoadException; -import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.common.util.CommandResult; +import org.apache.doris.common.util.Util; import org.apache.doris.load.EtlStatus; +import org.apache.doris.load.loadv2.SparkLoadAppHandle.State; import org.apache.doris.load.loadv2.dpp.DppResult; import org.apache.doris.load.loadv2.etl.EtlJobConfig; import org.apache.doris.load.loadv2.etl.SparkEtlJob; @@ -37,23 +40,13 @@ import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.spark.launcher.SparkAppHandle; -import org.apache.spark.launcher.SparkAppHandle.Listener; -import org.apache.spark.launcher.SparkAppHandle.State; import
org.apache.spark.launcher.SparkLauncher; -import java.io.File; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.List; @@ -74,20 +67,12 @@ public class SparkEtlJobHandler { private static final String JOB_CONFIG_DIR = "configs"; private static final String ETL_JOB_NAME = "doris__%s"; // 5min - private static final int GET_APPID_MAX_RETRY_TIMES = 300; - private static final int GET_APPID_SLEEP_MS = 1000; - - class SparkAppListener implements Listener { - @Override - public void stateChanged(SparkAppHandle sparkAppHandle) { - LOG.info("get spark state changed: {}, app id: {}", sparkAppHandle.getState(), sparkAppHandle.getAppId()); - } - - @Override - public void infoChanged(SparkAppHandle sparkAppHandle) { - LOG.info("get spark info changed: {}, app id: {}", sparkAppHandle.getState(), sparkAppHandle.getAppId()); - } - } + private static final long GET_APPID_TIMEOUT_MS = 300000L; + // 30s + private static final long EXEC_CMD_TIMEOUT_MS = 30000L; + // yarn command + private static final String YARN_STATUS_CMD = "%s --config %s application -status %s"; + private static final String YARN_KILL_CMD = "%s --config %s application -kill %s"; public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobConfig, SparkResource resource, BrokerDesc brokerDesc, SparkPendingTaskAttachment attachment) throws LoadException { @@ -140,8 +125,7 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo .setAppName(String.format(ETL_JOB_NAME, loadLabel)) .setSparkHome(sparkHome) .addAppArgs(jobConfigHdfsPath) - .redirectError() - .redirectOutput(new File(Config.sys_log_dir + "/spark-submitter.log")); + .redirectError(); // spark configs for (Map.Entry entry : resource.getSparkConfigs().entrySet()) { @@ -149,44 +133,38 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo } // start app - SparkAppHandle handle = null; + SparkLoadAppHandle handle = null; State state = null; String appId = null; - int retry = 0; String errMsg = "start spark app failed. error: "; try { - handle = launcher.startApplication(new SparkAppListener()); + Process process = launcher.launch(); + handle = new SparkLoadAppHandle(process); + if (!FeConstants.runningUnitTest) { + SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle); + logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS); + logMonitor.start(); + try { + logMonitor.join(); + } catch (InterruptedException e) { + logMonitor.interrupt(); + throw new LoadException(errMsg + e.getMessage()); + } + } + appId = handle.getAppId(); + state = handle.getState(); } catch (IOException e) { LOG.warn(errMsg, e); throw new LoadException(errMsg + e.getMessage()); } - while (retry++ < GET_APPID_MAX_RETRY_TIMES) { - appId = handle.getAppId(); - if (appId != null) { - break; - } - - // check state and retry - state = handle.getState(); - if (fromSparkState(state) == TEtlState.CANCELLED) { - throw new LoadException(errMsg + "spark app state: " + state.toString()); - } - if (retry >= GET_APPID_MAX_RETRY_TIMES) { - throw new LoadException(errMsg + "wait too much time for getting appid. spark app state: " - + state.toString()); - } + if (fromSparkState(state) == TEtlState.CANCELLED) { + throw new LoadException(errMsg + "spark app state: " + state.toString() + ", loadJobId:" + loadJobId); + } - // log - if (retry % 10 == 0) { - LOG.info("spark appid that handle get is null. 
load job id: {}, state: {}, retry times: {}", - loadJobId, state.toString(), retry); - } - try { - Thread.sleep(GET_APPID_SLEEP_MS); - } catch (InterruptedException e) { - LOG.warn(e.getMessage()); - } + if (appId == null) { + throw new LoadException(errMsg + "Timed out waiting to get appId from handle. spark app state: " + + state.toString() + ", loadJobId:" + loadJobId); } // success @@ -194,39 +172,50 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo attachment.setHandle(handle); } - public EtlStatus getEtlJobStatus(SparkAppHandle handle, String appId, long loadJobId, String etlOutputPath, - SparkResource resource, BrokerDesc brokerDesc) { + public EtlStatus getEtlJobStatus(SparkLoadAppHandle handle, String appId, long loadJobId, String etlOutputPath, + SparkResource resource, BrokerDesc brokerDesc) throws LoadException { EtlStatus status = new EtlStatus(); + Preconditions.checkState(appId != null && !appId.isEmpty()); if (resource.isYarnMaster()) { - // state from yarn - Preconditions.checkState(appId != null && !appId.isEmpty()); - YarnClient client = startYarnClient(resource); - try { - ApplicationReport report = client.getApplicationReport(ConverterUtils.toApplicationId(appId)); - LOG.info("yarn application -status {}. load job id: {}, result: {}", appId, loadJobId, report); - - YarnApplicationState state = report.getYarnApplicationState(); - FinalApplicationStatus faStatus = report.getFinalApplicationStatus(); - status.setState(fromYarnState(state, faStatus)); - if (status.getState() == TEtlState.CANCELLED) { - if (state == YarnApplicationState.FINISHED) { - status.setFailMsg("spark app state: " + faStatus.toString()); - } else { - status.setFailMsg("yarn app state: " + state.toString()); + // prepare yarn config + String configDir = resource.prepareYarnConfig(); + // yarn client path + String yarnClient = resource.getYarnClientPath(); + // command: yarn --config configDir application -status appId + String yarnStatusCmd = String.format(YARN_STATUS_CMD, yarnClient, configDir, appId); + LOG.info(yarnStatusCmd); + String[] envp = { "LC_ALL=" + Config.locale }; + CommandResult result = Util.executeCommand(yarnStatusCmd, envp, EXEC_CMD_TIMEOUT_MS); + if (result.getReturnCode() != 0) { + String stderr = result.getStderr(); + if (stderr != null) { + // case: the application does not exist + if (stderr.contains("doesn't exist in RM")) { + LOG.warn("spark app not found. spark app id: {}, load job id: {}", appId, loadJobId); + status.setState(TEtlState.CANCELLED); } } - status.setTrackingUrl(report.getTrackingUrl()); - status.setProgress((int) (report.getProgress() * 100)); - } catch (ApplicationNotFoundException e) { - LOG.warn("spark app not found. spark app id: {}, load job id: {}", appId, loadJobId, e); + LOG.warn("yarn application status failed. spark app id: {}, load job id: {}, timeout: {}, msg: {}", + appId, loadJobId, EXEC_CMD_TIMEOUT_MS, stderr); status.setState(TEtlState.CANCELLED); - status.setFailMsg(e.getMessage()); - } catch (YarnException | IOException e) { - LOG.warn("yarn application status failed. spark app id: {}, load job id: {}", appId, loadJobId, e); - } finally { - stopYarnClient(client); + return status; } + ApplicationReport report = new YarnApplicationReport(result.getStdout()).getReport(); + LOG.info("yarn application -status {}.
load job id: {}, output: {}, report: {}", + appId, loadJobId, result.getStdout(), report); + YarnApplicationState state = report.getYarnApplicationState(); + FinalApplicationStatus faStatus = report.getFinalApplicationStatus(); + status.setState(fromYarnState(state, faStatus)); + if (status.getState() == TEtlState.CANCELLED) { + if (state == YarnApplicationState.FINISHED) { + status.setFailMsg("spark app state: " + faStatus.toString()); + } else { + status.setFailMsg("yarn app state: " + state.toString()); + } + } + status.setTrackingUrl(handle.getUrl() != null? handle.getUrl() : report.getTrackingUrl()); + status.setProgress((int) (report.getProgress() * 100)); } else { // state from handle if (handle == null) { @@ -262,19 +251,22 @@ public EtlStatus getEtlJobStatus(SparkAppHandle handle, String appId, long loadJ return status; } - public void killEtlJob(SparkAppHandle handle, String appId, long loadJobId, SparkResource resource) { + public void killEtlJob(SparkLoadAppHandle handle, String appId, long loadJobId, SparkResource resource) throws LoadException { + Preconditions.checkNotNull(appId); if (resource.isYarnMaster()) { - Preconditions.checkNotNull(appId); - YarnClient client = startYarnClient(resource); - try { - try { - client.killApplication(ConverterUtils.toApplicationId(appId)); - LOG.info("yarn application -kill {}", appId); - } catch (YarnException | IOException e) { - LOG.warn("yarn application kill failed. app id: {}, load job id: {}", appId, loadJobId, e); - } - } finally { - stopYarnClient(client); + // prepare yarn config + String configDir = resource.prepareYarnConfig(); + // yarn client path + String yarnClient = resource.getYarnClientPath(); + // command: yarn --config configDir application -kill appId + String yarnKillCmd = String.format(YARN_KILL_CMD, yarnClient, configDir, appId); + LOG.info(yarnKillCmd); + String[] envp = { "LC_ALL=" + Config.locale }; + CommandResult result = Util.executeCommand(yarnKillCmd, envp, EXEC_CMD_TIMEOUT_MS); + LOG.info("yarn application -kill {}, output: {}", appId, result.getStdout()); + if (result.getReturnCode() != 0) { + String stderr = result.getStderr(); + LOG.warn("yarn application kill failed. app id: {}, load job id: {}, msg: {}", appId, loadJobId, stderr); } } else { if (handle != null) { @@ -314,21 +306,6 @@ public void deleteEtlOutputPath(String outputPath, BrokerDesc brokerDesc) { } } - private YarnClient startYarnClient(SparkResource resource) { - YarnClient client = YarnClient.createYarnClient(); - Configuration conf = new YarnConfiguration(); - // set yarn.resourcemanager.address - Pair pair = resource.getYarnResourcemanagerAddressPair(); - conf.set(pair.first, pair.second); - client.init(conf); - client.start(); - return client; - } - - private void stopYarnClient(YarnClient client) { - client.stop(); - } - private TEtlState fromYarnState(YarnApplicationState state, FinalApplicationStatus faStatus) { switch (state) { case FINISHED: diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java new file mode 100644 index 00000000000000..628037d68895ce --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.base.Strings; + +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class SparkLauncherMonitor { + private static final Logger LOG = LogManager.getLogger(SparkLauncherMonitor.class); + + public static LogMonitor createLogMonitor(SparkLoadAppHandle handle) { + return new LogMonitor(handle); + } + + private static SparkLoadAppHandle.State fromYarnState(YarnApplicationState yarnState) { + switch (yarnState) { + case SUBMITTED: + case ACCEPTED: + return SparkLoadAppHandle.State.SUBMITTED; + case RUNNING: + return SparkLoadAppHandle.State.RUNNING; + case FINISHED: + return SparkLoadAppHandle.State.FINISHED; + case FAILED: + return SparkLoadAppHandle.State.FAILED; + case KILLED: + return SparkLoadAppHandle.State.KILLED; + default: + // NEW NEW_SAVING + return SparkLoadAppHandle.State.UNKNOWN; + } + } + + // This monitor is used for monitoring the spark launcher process. + // Users can use this monitor to get the real-time `appId`, `state` and `tracking-url` + // of the spark launcher by reading and analyzing the output of the process. + public static class LogMonitor extends Thread { + private final Process process; + private SparkLoadAppHandle handle; + private long submitTimeoutMs; + private boolean isStop; + + private static final String STATE = "state"; + private static final String QUEUE = "queue"; + private static final String START_TIME = "start time"; + private static final String FINAL_STATUS = "final status"; + private static final String URL = "tracking URL"; + private static final String USER = "user"; + + // 5min + private static final long DEFAULT_SUBMIT_TIMEOUT_MS = 300000L; + + public LogMonitor(SparkLoadAppHandle handle) { + this.handle = handle; + this.process = handle.getProcess(); + this.isStop = false; + setSubmitTimeoutMs(DEFAULT_SUBMIT_TIMEOUT_MS); + } + + public void setSubmitTimeoutMs(long submitTimeoutMs) { + this.submitTimeoutMs = submitTimeoutMs; + } + + // Normally, the log monitor stops automatically once the spark app state changes + // to RUNNING. + // But if the spark app state changes to FAILED/KILLED/LOST, the log monitor stops + // and kills the spark launcher process. + // There is a `submitTimeout` to prevent the spark app state from staying in + // UNKNOWN/SUBMITTED for a long time.
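To make the parsing below concrete, here is a small self-contained sketch (the class name and `main` driver are illustrative, not part of the patch) that applies the same two patterns used by `regexGetState` and `regexGetAppId` further down to a typical line of spark launcher output:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch (not part of the patch): the two extractions LogMonitor
// performs on each line read from the spark launcher process.
public class LauncherLineParseSketch {
    public static void main(String[] args) {
        String line = "Application report for application_1573630236805_6864759 (state: ACCEPTED)";

        // same pattern as regexGetState: the text between "(state: " and ")"
        Matcher state = Pattern.compile("(?<=\\(state: )(.+?)(?=\\))").matcher(line);
        // same pattern as regexGetAppId: "application_<clusterTimestamp>_<sequence>"
        Matcher appId = Pattern.compile("application_[0-9]+_[0-9]+").matcher(line);

        if (state.find()) {
            System.out.println("state = " + state.group()); // prints: state = ACCEPTED
        }
        if (appId.find()) {
            System.out.println("appId = " + appId.group()); // prints: appId = application_1573630236805_6864759
        }
    }
}
```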
+ @Override + public void run() { + BufferedReader outReader = null; + String line = null; + long startTime = System.currentTimeMillis(); + try { + outReader = new BufferedReader(new InputStreamReader(process.getInputStream())); + while (!isStop && (line = outReader.readLine()) != null) { + LOG.info("monitor log: " + line); + SparkLoadAppHandle.State oldState = handle.getState(); + SparkLoadAppHandle.State newState = oldState; + // parse state and appId + if (line.contains(STATE)) { + // 1. state + String state = regexGetState(line); + if (state != null) { + YarnApplicationState yarnState = YarnApplicationState.valueOf(state); + newState = fromYarnState(yarnState); + if (newState != oldState) { + handle.setState(newState); + } + } + // 2. appId + String appId = regexGetAppId(line); + if (appId != null) { + if (!appId.equals(handle.getAppId())) { + handle.setAppId(appId); + } + } + + LOG.debug("spark appId that handle get is {}, state: {}", handle.getAppId(), handle.getState().toString()); + switch (newState) { + case UNKNOWN: + case CONNECTED: + case SUBMITTED: + // If the app stays in the UNKNOWN/CONNECTED/SUBMITTED state for more than submitTimeoutMs + // stop monitoring and kill the process + if (System.currentTimeMillis() - startTime > submitTimeoutMs) { + isStop = true; + handle.kill(); + } + break; + case RUNNING: + case FINISHED: + // There's no need to parse all logs of handle process to get all the information. + // As soon as the state changes to RUNNING/FINISHED, + // stop monitoring but keep the process alive. + isStop = true; + break; + case KILLED: + case FAILED: + case LOST: + // If the state changes to KILLED/FAILED/LOST, + // stop monitoring and kill the process + isStop = true; + handle.kill(); + break; + default: + Preconditions.checkState(false, "wrong spark app state"); + } + } + // parse other values + else if (line.contains(QUEUE) || line.contains(START_TIME) || line.contains(FINAL_STATUS) || + line.contains(URL) || line.contains(USER)) { + String value = getValue(line); + if (!Strings.isNullOrEmpty(value)) { + try { + if (line.contains(QUEUE)) { + handle.setQueue(value); + } else if (line.contains(START_TIME)) { + handle.setStartTime(Long.parseLong(value)); + } else if (line.contains(FINAL_STATUS)) { + handle.setFinalStatus(FinalApplicationStatus.valueOf(value)); + } else if (line.contains(URL)) { + handle.setUrl(value); + } else if (line.contains(USER)) { + handle.setUser(value); + } + } catch (IllegalArgumentException e) { + LOG.warn("parse log encounter an error, line: {}, msg: {}", line, e.getMessage()); + } + } + } + } + } catch (Exception e) { + LOG.warn("Exception monitoring process.", e); + } finally { + try { + if (outReader != null) { + outReader.close(); + } + } catch (IOException e) { + LOG.warn("close buffered reader error", e); + } + } + } + + // e.g. + // input: "final status: SUCCEEDED" + // output: "SUCCEEDED" + private static String getValue(String line) { + String result = null; + List entry = Splitter.onPattern(":").trimResults().limit(2).splitToList(line); + if (entry.size() == 2) { + result = entry.get(1); + } + return result; + } + + // e.g. + // input: "Application report for application_1573630236805_6864759 (state: ACCEPTED)" + // output: "ACCEPTED" + private static String regexGetState(String line) { + String result = null; + Matcher stateMatcher = Pattern.compile("(?<=\\(state: )(.+?)(?=\\))").matcher(line); + if (stateMatcher.find()) { + result = stateMatcher.group(); + } + return result; + } + + // e.g. 
+ // input: "Application report for application_1573630236805_6864759 (state: ACCEPTED)" + // output: "application_1573630236805_6864759" + private static String regexGetAppId(String line) { + String result = null; + Matcher appIdMatcher = Pattern.compile("application_[0-9]+_[0-9]+").matcher(line); + if (appIdMatcher.find()) { + result = appIdMatcher.group(); + } + return result; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java new file mode 100644 index 00000000000000..5d75ae9392a6f1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import com.google.common.collect.Lists; + +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Iterator; +import java.util.List; + +public class SparkLoadAppHandle { + private static final Logger LOG = LogManager.getLogger(SparkLoadAppHandle.class); + // 5min + private static final long SUBMIT_APP_TIMEOUT_MS = 300 * 1000; + + private Process process; + + private String appId; + private State state; + private String queue; + private long startTime; + private FinalApplicationStatus finalStatus; + private String trackingUrl; + private String user; + + private List listeners; + + public interface Listener { + void stateChanged(SparkLoadAppHandle handle); + + void infoChanged(SparkLoadAppHandle handle); + } + + public static enum State { + UNKNOWN(false), + CONNECTED(false), + SUBMITTED(false), + RUNNING(false), + FINISHED(true), + FAILED(true), + KILLED(true), + LOST(true); + + private final boolean isFinal; + + private State(boolean isFinal) { + this.isFinal = isFinal; + } + + public boolean isFinal() { + return this.isFinal; + } + } + + public SparkLoadAppHandle(Process process) { + this.state = State.UNKNOWN; + this.process = process; + } + + public void addListener(Listener listener) { + if (this.listeners == null) { + this.listeners = Lists.newArrayList(); + } + + this.listeners.add(listener); + } + + public void stop() { + } + + public void kill() { + this.setState(State.KILLED); + if (this.process != null) { + if (this.process.isAlive()) { + this.process.destroyForcibly(); + } + this.process = null; + } + } + + public State getState() { return this.state; } + + public String getAppId() { return this.appId; } + + public String getQueue() { return this.queue; } + + public Process getProcess() { return this.process; } + + public long getStartTime() { return this.startTime; } + + public FinalApplicationStatus 
getFinalStatus() { return this.finalStatus; } + + public String getUrl() { return this.trackingUrl; } + + public String getUser() { return this.user; } + + public void setState(State state) { + this.state = state; + this.fireEvent(false); + } + + public void setAppId(String appId) { + this.appId = appId; + this.fireEvent(true); + } + + public void setQueue(String queue) { + this.queue = queue; + this.fireEvent(true); + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + this.fireEvent(true); + } + + public void setFinalStatus(FinalApplicationStatus status) { + this.finalStatus = status; + this.fireEvent(true); + } + + public void setUrl(String url) { + this.trackingUrl = url; + this.fireEvent(true); + } + + public void setUser(String user) { + this.user = user; + this.fireEvent(true); + } + + private void fireEvent(boolean isInfoChanged) { + if (this.listeners != null) { + Iterator iterator = this.listeners.iterator(); + + while (iterator.hasNext()) { + Listener l = (Listener)iterator.next(); + if (isInfoChanged) { + l.infoChanged(this); + } else { + l.stateChanged(this); + } + } + } + + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index f1dff69c78399a..6576be232f6b9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -20,15 +20,14 @@ import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.CastExpr; import org.apache.doris.analysis.DescriptorTable; -import org.apache.doris.analysis.ResourceDesc; import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.ResourceDesc; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.FsBroker; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; @@ -36,6 +35,7 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.SparkResource; import org.apache.doris.catalog.Tablet; @@ -82,11 +82,6 @@ import org.apache.doris.transaction.TransactionState.LoadJobSourceType; import org.apache.doris.transaction.TransactionState.TxnCoordinator; import org.apache.doris.transaction.TransactionState.TxnSourceType; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.spark.launcher.SparkAppHandle; - import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -94,6 +89,9 @@ import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -127,7 +125,7 @@ public class SparkLoadJob extends BulkLoadJob { // --- members below not persist --- private ResourceDesc resourceDesc; // for spark standalone - private SparkAppHandle sparkAppHandle; + 
private SparkLoadAppHandle sparkLoadAppHandle; // for straggler wait long time to commit transaction private long quorumFinishTimestamp = -1; // below for push task @@ -230,7 +228,7 @@ private void onPendingTaskFinished(SparkPendingTaskAttachment attachment) { // add task id into finishedTaskIds finishedTaskIds.add(attachment.getTaskId()); - sparkAppHandle = attachment.getHandle(); + sparkLoadAppHandle = attachment.getHandle(); appId = attachment.getAppId(); etlOutputPath = attachment.getOutputPath(); @@ -278,7 +276,7 @@ public void updateEtlStatus() throws Exception { // get etl status SparkEtlJobHandler handler = new SparkEtlJobHandler(); - EtlStatus status = handler.getEtlJobStatus(sparkAppHandle, appId, id, etlOutputPath, sparkResource, brokerDesc); + EtlStatus status = handler.getEtlJobStatus(sparkLoadAppHandle, appId, id, etlOutputPath, sparkResource, brokerDesc); writeLock(); try { switch (status.getState()) { @@ -630,9 +628,9 @@ private void clearJob() { LOG.debug("kill etl job and delete etl files. id: {}, state: {}", id, state); SparkEtlJobHandler handler = new SparkEtlJobHandler(); if (state == JobState.CANCELLED) { - if ((!Strings.isNullOrEmpty(appId) && sparkResource.isYarnMaster()) || sparkAppHandle != null) { + if ((!Strings.isNullOrEmpty(appId) && sparkResource.isYarnMaster()) || sparkLoadAppHandle != null) { try { - handler.killEtlJob(sparkAppHandle, appId, id, sparkResource); + handler.killEtlJob(sparkLoadAppHandle, appId, id, sparkResource); } catch (Exception e) { LOG.warn("kill etl job failed. id: {}, state: {}", id, state, e); } @@ -661,7 +659,7 @@ private void clearJob() { } } // clear job infos that not persist - sparkAppHandle = null; + sparkLoadAppHandle = null; resourceDesc = null; tableToLoadPartitions.clear(); indexToPushBrokerReaderParams.clear(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkPendingTaskAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkPendingTaskAttachment.java index 311ca3bab70380..4189defdb693a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkPendingTaskAttachment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkPendingTaskAttachment.java @@ -17,10 +17,8 @@ package org.apache.doris.load.loadv2; -import org.apache.spark.launcher.SparkAppHandle; - public class SparkPendingTaskAttachment extends TaskAttachment { - private SparkAppHandle handle; + private SparkLoadAppHandle handle; private String appId; private String outputPath; @@ -28,7 +26,7 @@ public SparkPendingTaskAttachment(long taskId) { super(taskId); } - public SparkAppHandle getHandle() { + public SparkLoadAppHandle getHandle() { return handle; } @@ -40,7 +38,7 @@ public void setAppId(String appId) { this.appId = appId; } - public void setHandle(SparkAppHandle handle) { + public void setHandle(SparkLoadAppHandle handle) { this.handle = handle; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java index 18f8504886bbf8..3fc8bce63a3233 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java @@ -98,7 +98,7 @@ public void prepare() throws LoadException { } private void initRepository() throws LoadException { - LOG.info("start to init remote repositoryi. local dpp: {}", this.localDppPath); + LOG.info("start to init remote repository. 
local dpp: {}", this.localDppPath); boolean needUpload = false; boolean needReplace = false; CHECK: { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkYarnConfigFiles.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkYarnConfigFiles.java new file mode 100644 index 00000000000000..c980d74b0c7d84 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkYarnConfigFiles.java @@ -0,0 +1,245 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.common.Config; +import org.apache.doris.common.LoadException; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; + +import java.io.File; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; + +public class SparkYarnConfigFiles { + private static final Logger LOG = LogManager.getLogger(SparkYarnConfigFiles.class); + + private static final String HADOOP_CONF_FILE = "core-site.xml"; + private static final String YARN_CONF_FILE = "yarn-site.xml"; + private static final String SPARK_HADOOP_PREFIX = "spark.hadoop."; + private static final String HADOOP_PREFIX = "hadoop."; + private static final String YARN_PREFIX = "yarn."; + + private String configDir; + private List configFiles; + + public String getConfigDir() { + return this.configDir; + } + + public SparkYarnConfigFiles(String resourceName, Map properties) { + this.configDir = Config.yarn_config_dir + "/" + resourceName; + this.configFiles = Lists.newArrayList(); + createConfigFiles(properties); + } + + // for unit test + public SparkYarnConfigFiles(String resourceName, String parentDir, Map properties) { + this.configDir = parentDir + "/" + resourceName; + this.configFiles = Lists.newArrayList(); + createConfigFiles(properties); + } + + private void createConfigFiles(Map properties) { + LOG.info("create config file, properties size: {}", properties.size()); + configFiles.add(new XMLConfigFile(configDir + "/" + HADOOP_CONF_FILE, + getPropertiesByPrefix(properties, HADOOP_PREFIX))); + configFiles.add(new XMLConfigFile(configDir + "/" + YARN_CONF_FILE, + getPropertiesByPrefix(properties, YARN_PREFIX))); + } + + public void 
prepare() throws LoadException { + initConfigFile(); + } + + private void initConfigFile() throws LoadException { + LOG.info("start to init config file. config dir: {}", this.configDir); + Preconditions.checkState(!Strings.isNullOrEmpty(configDir)); + + boolean needUpdate = false; + boolean needReplace = false; + CHECK: { + if (!checkConfigDirExists(this.configDir)) { + needUpdate = true; + break CHECK; + } + + for (ConfigFile configFile : configFiles) { + String filePath = configFile.getFilePath(); + if (!checkConfigFileExists(filePath)) { + needUpdate = true; + needReplace = true; + break CHECK; + } + } + } + + if (needUpdate) { + updateConfig(needReplace); + } + LOG.info("init spark yarn config success, config dir={}, config file size={}", + configDir, configFiles.size()); + } + + private boolean checkConfigDirExists(String dir) { + boolean result = true; + File configDir = new File(dir); + if (!configDir.exists() || !configDir.isDirectory()) { + result = false; + } + LOG.info("check yarn client config dir exists, result: {}", result); + return result; + } + + private boolean checkConfigFileExists(String filePath) { + boolean result = true; + File configFile = new File(filePath); + if (!configFile.exists() || !configFile.isFile()) { + result = false; + } + LOG.info("check yarn client config file path exists, result: {}, path: {}", result, filePath); + return result; + } + + private void updateConfig(boolean needReplace) throws LoadException { + if (needReplace) { + clearAndDelete(this.configDir); + } + mkdir(this.configDir); + for (ConfigFile configFile : configFiles) { + configFile.createFile(); + } + LOG.info("finished updating yarn client config dir, dir={}", configDir); + } + + private Map<String, String> getPropertiesByPrefix(Map<String, String> properties, String prefix) { + Map<String, String> result = Maps.newHashMap(); + Iterator<Map.Entry<String, String>> iterator = properties.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<String, String> property = iterator.next(); + String key = property.getKey(); + if (key.startsWith(SPARK_HADOOP_PREFIX)) { + String newKey = key.substring(SPARK_HADOOP_PREFIX.length()); + if (newKey.startsWith(prefix)) { + result.put(newKey, property.getValue()); + iterator.remove(); + } + } + } + return result; + } + + private void clearAndDelete(String deletePath) { + File file = new File(deletePath); + if (!file.exists()) { + return; + } + if (file.isFile()) { + file.delete(); + return; + } + File[] files = file.listFiles(); + for (File file1 : files) { + clearAndDelete(file1.getAbsolutePath()); + } + file.delete(); + } + + private void mkdir(String configDir) { + File file = new File(configDir); + file.mkdirs(); + } + + // xml config file + public static class XMLConfigFile implements ConfigFile { + private static final String CONFIGURATION = "configuration"; + private static final String PROPERTY = "property"; + private static final String NAME = "name"; + private static final String VALUE = "value"; + + private String filePath; + private Map<String, String> configProperties; + + public XMLConfigFile(String filePath, Map<String, String> configProperties) { + this.filePath = filePath; + this.configProperties = configProperties; + } + + @Override + public String getFilePath() { + return filePath; + } + + @Override + public void createFile() throws LoadException { + createXML(this.filePath, this.configProperties); + } + + private void createXML(String filePath, Map<String, String> properties) throws LoadException { + try { + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + DocumentBuilder db = factory.newDocumentBuilder(); + Document document =
db.newDocument(); + document.setXmlStandalone(true); + Element configuration = (Element) appendNode(document, CONFIGURATION, null); + for (Map.Entry<String, String> pair : properties.entrySet()) { + Element property = (Element) appendNode(configuration, PROPERTY, null); + appendNode(property, NAME, pair.getKey()); + appendNode(property, VALUE, pair.getValue()); + } + + TransformerFactory tff = TransformerFactory.newInstance(); + Transformer tf = tff.newTransformer(); + + tf.setOutputProperty(OutputKeys.INDENT, "yes"); + tf.transform(new DOMSource(document), new StreamResult(new File(filePath))); + } catch (Exception e) { + throw new LoadException(e.getMessage()); + } + } + + private Node appendNode(Node parent, String tag, String content) { + Element child = null; + if (parent instanceof Document) { + child = ((Document)parent).createElement(tag); + } else { + child = parent.getOwnerDocument().createElement(tag); + } + if (content != null && !content.equals("")) { + child.setTextContent(content); + } + return parent.appendChild(child); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/YarnApplicationReport.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/YarnApplicationReport.java new file mode 100644 index 00000000000000..cd540c280e5325 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/YarnApplicationReport.java @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.common.LoadException; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Maps; + +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl; +import org.apache.hadoop.yarn.util.ConverterUtils; + +import java.text.NumberFormat; +import java.text.ParseException; +import java.util.List; +import java.util.Map; + +/** + * Convert the output string of the command `yarn application -status` to an application report.
+ * Input sample: + * ------------------- + * Application Report : + * Application-Id : application_1573630236805_6763648 + * Application-Name : doris_label_test + * Application-Type : SPARK-2.4.1 + * User : test + * Queue : test-queue + * Start-Time : 1597654469958 + * Finish-Time : 1597654801939 + * Progress : 100% + * State : FINISHED + * Final-State : SUCCEEDED + * Tracking-URL : 127.0.0.1:8004/history/application_1573630236805_6763648/1 + * RPC Port : 40236 + * AM Host : host-name + * ------------------ + * + * Output: + * ApplicationReport + */ +public class YarnApplicationReport { + private static final String APPLICATION_ID = "Application-Id"; + private static final String APPLICATION_TYPE = "Application-Type"; + private static final String APPLICATION_NAME = "Application-Name"; + private static final String USER = "User"; + private static final String QUEUE = "Queue"; + private static final String START_TIME = "Start-Time"; + private static final String FINISH_TIME = "Finish-Time"; + private static final String PROGRESS = "Progress"; + private static final String STATE = "State"; + private static final String FINAL_STATE = "Final-State"; + private static final String TRACKING_URL = "Tracking-URL"; + private static final String RPC_PORT = "RPC Port"; + private static final String AM_HOST = "AM Host"; + private static final String DIAGNOSTICS = "Diagnostics"; + + private ApplicationReport report; + + public YarnApplicationReport(String output) throws LoadException { + this.report = new ApplicationReportPBImpl(); + parseFromOutput(output); + } + + public ApplicationReport getReport() { + return report; + } + + private void parseFromOutput(String output) throws LoadException { + Map<String, String> reportMap = Maps.newHashMap(); + List<String> lines = Splitter.onPattern("\n").trimResults().splitToList(output); + // Application-Id : application_1573630236805_6763648 ==> (Application-Id, application_1573630236805_6763648) + for (String line : lines) { + List<String> entry = Splitter.onPattern(":").limit(2).trimResults().splitToList(line); + Preconditions.checkState(entry.size() <= 2, line); + if (entry.size() > 1) { + reportMap.put(entry.get(0), entry.get(1)); + } else { + reportMap.put(entry.get(0), ""); + } + } + + try { + report.setApplicationId(ConverterUtils.toApplicationId(reportMap.get(APPLICATION_ID))); + report.setName(reportMap.get(APPLICATION_NAME)); + report.setApplicationType(reportMap.get(APPLICATION_TYPE)); + report.setUser(reportMap.get(USER)); + report.setQueue(reportMap.get(QUEUE)); + report.setStartTime(Long.parseLong(reportMap.get(START_TIME))); + report.setFinishTime(Long.parseLong(reportMap.get(FINISH_TIME))); + report.setProgress(NumberFormat.getPercentInstance().parse(reportMap.get(PROGRESS)).floatValue()); + report.setYarnApplicationState(YarnApplicationState.valueOf(reportMap.get(STATE))); + report.setFinalApplicationStatus(FinalApplicationStatus.valueOf(reportMap.get(FINAL_STATE))); + report.setTrackingUrl(reportMap.get(TRACKING_URL)); + report.setRpcPort(Integer.parseInt(reportMap.get(RPC_PORT))); + report.setHost(reportMap.get(AM_HOST)); + report.setDiagnostics(reportMap.get(DIAGNOSTICS)); + } catch (NumberFormatException | ParseException e) { + throw new LoadException(e.getMessage()); + } catch (Exception e) { + throw new LoadException(e.getMessage()); + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java index e011062fbbc0e0..904a8b1718ba00 100644 ---
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java @@ -29,10 +29,13 @@ import org.apache.doris.catalog.FsBroker; import org.apache.doris.catalog.SparkResource; import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.GenericPool; import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.common.util.CommandResult; +import org.apache.doris.common.util.Util; import org.apache.doris.load.EtlStatus; import org.apache.doris.load.loadv2.etl.EtlJobConfig; import org.apache.doris.thrift.TBrokerFileStatus; @@ -46,14 +49,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.spark.launcher.SparkAppHandle; -import org.apache.spark.launcher.SparkAppHandle.State; import org.apache.spark.launcher.SparkLauncher; import org.junit.Assert; import org.junit.Before; @@ -76,8 +71,55 @@ public class SparkEtlJobHandlerTest { private String remoteArchivePath; private SparkRepository.SparkArchive archive; + private final String runningReport = "Application Report :\n" + + "Application-Id : application_15888888888_0088\n" + + "Application-Name : label0\n" + + "Application-Type : SPARK-2.4.1\n" + + "User : test\n" + + "Queue : test-queue\n" + + "Start-Time : 1597654469958\n" + + "Finish-Time : 0\n" + + "Progress : 50%\n" + + "State : RUNNING\n" + + "Final-State : UNDEFINED\n" + + "Tracking-URL : http://127.0.0.1:8080/proxy/application_1586619723848_0088/\n" + + "RPC Port : 40236\n" + + "AM Host : host-name"; + + private final String failedReport = "Application Report :\n" + + "Application-Id : application_15888888888_0088\n" + + "Application-Name : label0\n" + + "Application-Type : SPARK-2.4.1\n" + + "User : test\n" + + "Queue : test-queue\n" + + "Start-Time : 1597654469958\n" + + "Finish-Time : 1597654801939\n" + + "Progress : 100%\n" + + "State : FINISHED\n" + + "Final-State : FAILED\n" + + "Tracking-URL : http://127.0.0.1:8080/proxy/application_1586619723848_0088/\n" + + "RPC Port : 40236\n" + + "AM Host : host-name"; + + private final String finishReport = "Application Report :\n" + + "Application-Id : application_15888888888_0088\n" + + "Application-Name : label0\n" + + "Application-Type : SPARK-2.4.1\n" + + "User : test\n" + + "Queue : test-queue\n" + + "Start-Time : 1597654469958\n" + + "Finish-Time : 1597654801939\n" + + "Progress : 100%\n" + + "State : FINISHED\n" + + "Final-State : SUCCEEDED\n" + + "Tracking-URL : http://127.0.0.1:8080/proxy/application_1586619723848_0088/\n" + + "RPC Port : 40236\n" + + "AM Host : host-name"; + + @Before public void setUp() { + FeConstants.runningUnitTest = true; loadJobId = 0L; label = "label0"; resourceName = "spark0"; @@ -96,16 +138,16 @@ public void setUp() { } @Test - public void testSubmitEtlJob(@Mocked BrokerUtil brokerUtil, @Mocked SparkLauncher launcher, - @Injectable SparkAppHandle handle) throws IOException, LoadException { + public void testSubmitEtlJob(@Mocked BrokerUtil 
brokerUtil, @Mocked SparkLauncher launcher, @Injectable Process process, + @Mocked SparkLoadAppHandle handle ) throws IOException, LoadException { new Expectations() { { - launcher.startApplication((SparkAppHandle.Listener) any); - result = handle; + launcher.launch(); + result = process; handle.getAppId(); - returns(null, null, appId); + result = appId; handle.getState(); - returns(State.CONNECTED, State.SUBMITTED, State.RUNNING); + result = SparkLoadAppHandle.State.RUNNING; } }; @@ -129,20 +171,19 @@ public void testSubmitEtlJob(@Mocked BrokerUtil brokerUtil, @Mocked SparkLaunche // check submit etl job success Assert.assertEquals(appId, attachment.getAppId()); - Assert.assertEquals(handle, attachment.getHandle()); } @Test(expected = LoadException.class) - public void testSubmitEtlJobFailed(@Mocked BrokerUtil brokerUtil, @Mocked SparkLauncher launcher, - @Injectable SparkAppHandle handle) throws IOException, LoadException { + public void testSubmitEtlJobFailed(@Mocked BrokerUtil brokerUtil, @Mocked SparkLauncher launcher, @Injectable Process process, + @Mocked SparkLoadAppHandle handle) throws IOException, LoadException { new Expectations() { { - launcher.startApplication((SparkAppHandle.Listener) any); - result = handle; + launcher.launch(); + result = process; handle.getAppId(); - result = null; + result = appId; handle.getState(); - returns(State.CONNECTED, State.SUBMITTED, State.FAILED); + result = SparkLoadAppHandle.State.FAILED; } }; @@ -166,23 +207,32 @@ public void testSubmitEtlJobFailed(@Mocked BrokerUtil brokerUtil, @Mocked SparkL } @Test - public void testGetEtlJobStatus(@Mocked BrokerUtil brokerUtil, @Mocked YarnClient client, - @Injectable ApplicationReport report) - throws IOException, YarnException, UserException { + public void testGetEtlJobStatus(@Mocked BrokerUtil brokerUtil, @Mocked Util util, @Mocked CommandResult commandResult, + @Mocked SparkYarnConfigFiles sparkYarnConfigFiles, @Mocked SparkLoadAppHandle handle) + throws IOException, UserException { + new Expectations() { { - YarnClient.createYarnClient(); - result = client; - client.getApplicationReport((ApplicationId) any); - result = report; - report.getYarnApplicationState(); - returns(YarnApplicationState.RUNNING, YarnApplicationState.FINISHED, YarnApplicationState.FINISHED); - report.getFinalApplicationStatus(); - returns(FinalApplicationStatus.UNDEFINED, FinalApplicationStatus.FAILED, FinalApplicationStatus.SUCCEEDED); - report.getTrackingUrl(); + sparkYarnConfigFiles.prepare(); + sparkYarnConfigFiles.getConfigDir(); + result = "./yarn_config"; + + commandResult.getReturnCode(); + result = 0; + commandResult.getStdout(); + returns(runningReport, runningReport, failedReport, failedReport, finishReport, finishReport); + + handle.getUrl(); result = trackingUrl; - report.getProgress(); - returns(0.5f, 1f, 1f); + } + }; + + new Expectations() { + { + Util.executeCommand(anyString, (String[]) any); + minTimes = 0; + result = commandResult; + BrokerUtil.readFile(anyString, (BrokerDesc) any); result = "{'normal_rows': 10, 'abnormal_rows': 0, 'failed_reason': 'etl job failed'}"; } @@ -193,22 +243,30 @@ public void testGetEtlJobStatus(@Mocked BrokerUtil brokerUtil, @Mocked YarnClien sparkConfigs.put("spark.master", "yarn"); sparkConfigs.put("spark.submit.deployMode", "cluster"); sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999"); + new Expectations(resource) { + { + resource.getYarnClientPath(); + result = Config.yarn_client_path; + } + }; + BrokerDesc brokerDesc = new BrokerDesc(broker, 
@@ -217,13 +275,26 @@ public void testGetEtlJobStatus(@Mocked BrokerUtil brokerUtil, @Mocked YarnClien } @Test - public void testKillEtlJob(@Mocked YarnClient client) throws IOException, YarnException { + public void testGetEtlJobStatusFailed(@Mocked Util util, @Mocked CommandResult commandResult, + @Mocked SparkYarnConfigFiles sparkYarnConfigFiles, @Mocked SparkLoadAppHandle handle) + throws IOException, UserException { + new Expectations() { { - YarnClient.createYarnClient(); - result = client; - client.killApplication((ApplicationId) any); - times = 1; + sparkYarnConfigFiles.prepare(); + sparkYarnConfigFiles.getConfigDir(); + result = "./yarn_config"; + + commandResult.getReturnCode(); + result = -1; + } + }; + + new Expectations() { + { + Util.executeCommand(anyString, (String[]) any); + minTimes = 0; + result = commandResult; } };
@@ -232,6 +303,59 @@ public void testKillEtlJob(@Mocked YarnClient client) throws IOException, YarnEx sparkConfigs.put("spark.master", "yarn"); sparkConfigs.put("spark.submit.deployMode", "cluster"); sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999"); + new Expectations(resource) { + { + resource.getYarnClientPath(); + result = Config.yarn_client_path; + } + }; + + BrokerDesc brokerDesc = new BrokerDesc(broker, Maps.newHashMap()); + SparkEtlJobHandler handler = new SparkEtlJobHandler(); + + // yarn finished and spark failed + EtlStatus status = handler.getEtlJobStatus(null, appId, loadJobId, etlOutputPath, resource, brokerDesc); + Assert.assertEquals(TEtlState.CANCELLED, status.getState()); + } + + @Test + public void testKillEtlJob(@Mocked Util util, @Mocked CommandResult commandResult, + @Mocked SparkYarnConfigFiles sparkYarnConfigFiles) throws IOException, UserException { + new Expectations() { + { + sparkYarnConfigFiles.prepare(); + sparkYarnConfigFiles.getConfigDir(); + result = "./yarn_config"; + } + }; + + new Expectations() { + { + commandResult.getReturnCode(); + result = 0; + } + }; + + new Expectations() { + { + Util.executeCommand(anyString, (String[]) any); + minTimes = 0; + result = commandResult; + } + }; + + SparkResource resource = new SparkResource(resourceName); + Map sparkConfigs = resource.getSparkConfigs(); + sparkConfigs.put("spark.master", "yarn"); +
sparkConfigs.put("spark.submit.deployMode", "cluster"); + sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999"); + new Expectations(resource) { + { + resource.getYarnClientPath(); + result = Config.yarn_client_path; + } + }; + SparkEtlJobHandler handler = new SparkEtlJobHandler(); try { handler.killEtlJob(null, appId, loadJobId, resource); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java new file mode 100644 index 00000000000000..8e55d4138db6bd --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.URL; + +public class SparkLauncherMonitorTest { + private String appId; + private SparkLoadAppHandle.State state; + private String queue; + private long startTime; + private FinalApplicationStatus finalApplicationStatus; + private String trackingUrl; + private String user; + + @Before + public void setUp() { + appId = "application_1573630236805_6864759"; + state = SparkLoadAppHandle.State.RUNNING; + queue = "spark-queue"; + startTime = 1597916263384L; + finalApplicationStatus = FinalApplicationStatus.UNDEFINED; + trackingUrl = "http://myhost:8388/proxy/application_1573630236805_6864759/"; + user = "testugi"; + } + + @Test + public void testLogMonitorNormal() { + URL log = getClass().getClassLoader().getResource("spark_launcher_monitor.log"); + String cmd = "cat " + log.getPath(); + SparkLoadAppHandle handle = null; + try { + Process process = Runtime.getRuntime().exec(cmd); + handle = new SparkLoadAppHandle(process); + SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle); + logMonitor.start(); + try { + logMonitor.join(); + } catch (InterruptedException e) { + } + } catch (IOException e) { + Assert.fail(); + } + + Assert.assertEquals(appId, handle.getAppId()); + Assert.assertEquals(state, handle.getState()); + Assert.assertEquals(queue, handle.getQueue()); + Assert.assertEquals(startTime, handle.getStartTime()); + Assert.assertEquals(finalApplicationStatus, handle.getFinalStatus()); + Assert.assertEquals(trackingUrl, handle.getUrl()); + Assert.assertEquals(user, handle.getUser()); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java index 671de5a73516d8..e86cad0bdb9b9d 100644 --- 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java @@ -17,15 +17,18 @@ package org.apache.doris.load.loadv2; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mocked; + import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.DataDescription; -import org.apache.doris.analysis.ResourceDesc; import org.apache.doris.analysis.LabelName; import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.analysis.ResourceDesc; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.ResourceMgr; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition;
@@ -33,6 +36,7 @@ import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.RangePartitionInfo; import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.ResourceMgr; import org.apache.doris.catalog.SparkResource; import org.apache.doris.catalog.Tablet; import org.apache.doris.common.AnalysisException;
@@ -57,13 +61,9 @@ import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; - import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import mockit.Expectations; -import mockit.Injectable; -import mockit.Mocked; -import org.apache.spark.launcher.SparkAppHandle; + import org.junit.Assert; import org.junit.Before; import org.junit.Test;
@@ -269,7 +269,7 @@ public void testUpdateEtlStatusRunning(@Mocked Catalog catalog, @Injectable Stri new Expectations() { { - handler.getEtlJobStatus((SparkAppHandle) any, appId, anyLong, etlOutputPath, + handler.getEtlJobStatus((SparkLoadAppHandle) any, appId, anyLong, etlOutputPath, (SparkResource) any, (BrokerDesc) any); result = status; }
@@ -292,7 +292,7 @@ public void testUpdateEtlStatusCancelled(@Mocked Catalog catalog, @Injectable St new Expectations() { { - handler.getEtlJobStatus((SparkAppHandle) any, appId, anyLong, etlOutputPath, + handler.getEtlJobStatus((SparkLoadAppHandle) any, appId, anyLong, etlOutputPath, (SparkResource) any, (BrokerDesc) any); result = status; }
@@ -312,7 +312,7 @@ public void testUpdateEtlStatusFinishedQualityFailed(@Mocked Catalog catalog, @I new Expectations() { { - handler.getEtlJobStatus((SparkAppHandle) any, appId, anyLong, etlOutputPath, + handler.getEtlJobStatus((SparkLoadAppHandle) any, appId, anyLong, etlOutputPath, (SparkResource) any, (BrokerDesc) any); result = status; }
@@ -343,7 +343,7 @@ public void testUpdateEtlStatusFinishedAndCommitTransaction( new Expectations() { { - handler.getEtlJobStatus((SparkAppHandle) any, appId, anyLong, etlOutputPath, + handler.getEtlJobStatus((SparkLoadAppHandle) any, appId, anyLong, etlOutputPath, (SparkResource) any, (BrokerDesc) any); result = status; handler.getEtlFilePaths(etlOutputPath, (BrokerDesc) any);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkYarnConfigFilesTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkYarnConfigFilesTest.java new file mode 100644 index 00000000000000..09e4f62403971b --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkYarnConfigFilesTest.java @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import mockit.Mocked; + 
+import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.LoadException; +import com.google.common.collect.Maps; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.Map; + +public class SparkYarnConfigFilesTest { + private static final String RESOURCE_NAME = "spark0"; + private static final String SPARK_HADOOP_PREFIX = "spark.hadoop."; + private static final String YARN_CONFIG_DIR = "./yarn_config"; + + private Map properties; + + @Mocked + Catalog catalog; + + @Before + public void setUp() { + properties = Maps.newHashMap(); + properties.put(SPARK_HADOOP_PREFIX + "hadoop.job.ugi", "test,test"); + properties.put(SPARK_HADOOP_PREFIX + "hadoop.security.authentication", "simple"); + properties.put(SPARK_HADOOP_PREFIX + "yarn.resourcemanager.address", "host:port"); + properties.put(SPARK_HADOOP_PREFIX + "yarn.resourcemanager.scheduler.address", "host:port"); + } + + @Test + public void testNormal() { + SparkYarnConfigFiles sparkYarnConfigFiles = new SparkYarnConfigFiles(RESOURCE_NAME, YARN_CONFIG_DIR, properties); + try { + // prepare config files + sparkYarnConfigFiles.prepare(); + // get config files' parent directory + String configDir = sparkYarnConfigFiles.getConfigDir(); + File dir = new File(configDir); + File[] configFiles = dir.listFiles(); + Assert.assertEquals(2, configFiles.length); + } catch (LoadException e) { + Assert.fail(); + } + } + + @After + public void clear() { + delete(YARN_CONFIG_DIR); + } + + private void delete(String deletePath) { + File file = new File(deletePath); + if (!file.exists()) { + return; + } + if (file.isFile()) { + file.delete(); + return; + } + File[] files = file.listFiles(); + for (File file1 : files) { + delete(file1.getAbsolutePath()); + } + file.delete(); + } +}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/YarnApplicationReportTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/YarnApplicationReportTest.java new file mode 100644 index 00000000000000..2815d5f306ae78 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/YarnApplicationReportTest.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.common.LoadException; + +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.junit.Assert; +import org.junit.Test; + +public class YarnApplicationReportTest { + private final String runningReport = "Application Report :\n" + + "Application-Id : application_15888888888_0088\n" + + "Application-Name : label0\n" + + "Application-Type : SPARK-2.4.1\n" + + "User : test\n" + + "Queue : test-queue\n" + + "Start-Time : 1597654469958\n" + + "Finish-Time : 0\n" + + "Progress : 50%\n" + + "State : RUNNING\n" + + "Final-State : UNDEFINED\n" + + "Tracking-URL : http://127.0.0.1:8080/proxy/application_1586619723848_0088/\n" + + "RPC Port : 40236\n" + + "AM Host : host-name"; + + @Test + public void testParseToReport() { + try { + YarnApplicationReport yarnReport = new YarnApplicationReport(runningReport); + ApplicationReport report = yarnReport.getReport(); + Assert.assertEquals("application_15888888888_0088", report.getApplicationId().toString()); + Assert.assertEquals("label0", report.getName()); + Assert.assertEquals("test", report.getUser()); + Assert.assertEquals("test-queue", report.getQueue()); + Assert.assertEquals(1597654469958L, report.getStartTime()); + Assert.assertEquals(0L, report.getFinishTime()); + Assert.assertEquals(0.5f, report.getProgress(), 1e-6f); + Assert.assertEquals(YarnApplicationState.RUNNING, report.getYarnApplicationState()); + Assert.assertEquals(FinalApplicationStatus.UNDEFINED, report.getFinalApplicationStatus()); + Assert.assertEquals("http://127.0.0.1:8080/proxy/application_1586619723848_0088/", report.getTrackingUrl()); + Assert.assertEquals(40236, report.getRpcPort()); + Assert.assertEquals("host-name", report.getHost()); + + } catch (LoadException e) { + e.printStackTrace(); + Assert.fail(); + } + } +}
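
For readers unfamiliar with the yarn CLI output: the report parsed above is a banner line ("Application Report :") followed by "Key : Value" lines. One plausible way to tokenize it, equivalent in spirit to what YarnApplicationReport must do internally (the actual implementation may differ); the limit-2 split keeps values that contain further colons, such as the tracking URL, intact:

    // Sketch: tokenize a "yarn application -status" report into key/value pairs.
    // Maps is com.google.common.collect.Maps, as imported in the test above.
    Map<String, String> fields = Maps.newHashMap();
    for (String line : runningReport.split("\n")) {
        String[] kv = line.split(" : ", 2);    // limit 2: the value may contain ':'
        if (kv.length == 2) {
            fields.put(kv[0].trim(), kv[1].trim());
        }
    }
    // fields.get("State")    -> "RUNNING"
    // fields.get("Progress") -> "50%"
    // fields.get("Tracking-URL") -> "http://127.0.0.1:8080/proxy/application_1586619723848_0088/"
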
diff --git a/fe/fe-core/src/test/resources/spark_launcher_monitor.log b/fe/fe-core/src/test/resources/spark_launcher_monitor.log new file mode 100644 index 00000000000000..4b9e5df1ce16b8 --- /dev/null +++ b/fe/fe-core/src/test/resources/spark_launcher_monitor.log @@ -0,0 +1,264 @@ +WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable +WARN DependencyUtils: Skip remote jar hdfs://myhost:54310/user/etl/cluster_id/__spark_repository__test8/__archive_1.0.0/__lib_bea2ecd751343b3e1651556e85e98f89_spark-dpp-1.0.0-jar-with-dependencies.jar. +INFO RMProxy: Connecting to ResourceManager at myhost/127.0.0.1:8350 +INFO Client: Requesting a new application from cluster with 0 NodeManagers +INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (204800 MB per container) +INFO Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead +INFO Client: Setting up container launch context for our AM +INFO Client: Setting up the launch environment for our AM container +INFO Client: Preparing resources for our AM container +* 0 [521082][9458][src/wrapper/hdfs_filesystem_wrapper.cpp:204] connect to cluster: hdfs://myhost:54310 +* 0 [522398][9458][src/afs/afs.cpp:171] connect to afs cluster: hdfs://myhost:54310 +INFO Client: Source and destination file systems are the same. Not copying hdfs://myhost:54310/user/etl/cluster_id/__spark_repository__test8/__archive_1.0.0/__lib_4d29048a6e58c5dd21d9deb9589573be_spark-2x.zip +INFO Client: Source and destination file systems are the same. 
Not copying hdfs://myhost:54310/user/etl/cluster_id/__spark_repository__test8/__archive_1.0.0/__lib_bea2ecd751343b3e1651556e85e98f89_spark-dpp-1.0.0-jar-with-dependencies.jar +* 0 [714727][9458][src/afs/dfs_client_impl.cpp:1759] fail to get path info, path:/user/etl/.sparkStaging/application_1573630236805_6864759/__dce_sessions__, ret:-10011 +INFO Client: Uploading resource file:/tmp/spark-7448982a-b4c2-4224-9f14-32b145deb920/__dce__2334649440686670150.zip -> hdfs://myhost:54310/user/etl/.sparkStaging/application_1573630236805_6864759/__dce__.zip +* 0 [517842][9458][src/afs/dfs_client_impl.cpp:1759] fail to get path info, path:/user/etl/.sparkStagig/application_1573630236805_6864759/__dce__.zip, ret:-10011 +* 0 [522307][9458][src/afs/dfs_client_impl.cpp:477] Avg write speed: 170393 +INFO Client: Uploading resource file:/tmp/spark-7448982a-b4c2-4224-9f14-32b145deb920/__spark_conf__1476443680705568032.zip -> hdfs://myhost:54310/user/etl/.sparkStaging/application_1573630236805_6864759/__spark_conf__.zip +* 0 [388983][9458][src/afs/dfs_client_impl.cpp:1759] fail to get path info, path:/user/etl/.sparkStaging/application_1573630236805_6864759/__spark_conf__.zip, ret:-10011 +INFO SecurityManager: Changing view acls to: test,testugi +INFO SecurityManager: Changing modify acls to: test,testugi +INFO SecurityManager: Changing view acls groups to: +INFO SecurityManager: Changing modify acls groups to: +INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(test, testugi); groups with view permissions: Set(); users with modify permissions: Set(test, testugi); groups with modify permissions: Set() +WARN Client: priority = 30 +INFO Client: Submitting application application_1573630236805_6864759 to ResourceManager +INFO YarnClientImpl: Submitted application application_1573630236805_6864759 +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: +client token: N/A +diagnostics: N/A +applicationMaster host: N/A +applicationMaster RPC port: -1 +queue: spark-queue +start time: 1597916263384 +final status: UNDEFINED +tracking URL: http://myhost:8388/proxy/application_1573630236805_6864759/ +user: testugi +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 
(state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: ACCEPTED) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: +client token: N/A +diagnostics: N/A +applicationMaster host: applicationMasterHost +applicationMaster RPC port: 48297 +queue: spark-queue +start time: 1597916263384 +final status: UNDEFINED +tracking URL: http://myhost:8388/proxy/application_1573630236805_6864759/ +user: testugi +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for 
application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 
(state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: 
Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for 
application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: RUNNING) +INFO Client: Application report for application_1573630236805_6864759 (state: FINISHED) +INFO Client: +client token: N/A +diagnostics: N/A +applicationMaster host: applicationMasterHost +applicationMaster RPC port: 48297 +queue: spark-queue +start time: 1597916263384 +final status: SUCCEEDED +tracking URL: http://myhost:8388/proxy/application_1573630236805_6864759/A +user: testugi +* 0 [689781][9458][src/afs/dfs_client_impl.cpp:1473] fail to del, path:/user/etl/.sparkStaging/application_1573630236805_6864759, ret:-10011 +* 0 [689781][9458][src/afs/dfs_client_impl.cpp:1473] fail to del, path:/user/etl/.sparkStaging/application_1573630236805_6864759, ret:-10011 +WARN DFileSystem: Delete src not found! path: hdfs://myhost:54310/user/etl/.sparkStaging/application_1573630236805_6864759 +INFO ShutdownHookManager: Shutdown hook called +INFO ShutdownHookManager: Deleting directory /tmp/spark-7448982a-b4c2-4224-9f14-32b145deb920 +INFO ShutdownHookManager: Deleting directory /tmp/spark-5c1c8a31-3db5-4042-925a-405509cd21da