diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java index 6dcf51ed64b19f..eb827cbb92d8b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java @@ -17,7 +17,6 @@ package org.apache.doris.catalog; -import com.google.common.primitives.Longs; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.common.Pair; @@ -28,9 +27,9 @@ import org.apache.doris.thrift.TTypeDesc; import org.apache.doris.thrift.TTypeNode; import org.apache.doris.thrift.TTypeNodeType; - import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.primitives.Longs; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 2f53fc47e8887a..b88bc063559f58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -543,6 +543,12 @@ public class Config extends ConfigBase { @ConfField public static String spark_resource_path = ""; + /** + * The specified spark launcher log dir + */ + @ConfField + public static String spark_launcher_log_dir = sys_log_dir + "/spark_launcher_log"; + /** * Default yarn client path */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 9b0741b835f1ac..5f6d6c31a1b966 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -45,7 +45,6 @@ import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; - import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -379,6 +378,9 @@ public void removeOldLoadJob() { && ((currentTimeMs - job.getFinishTimestamp()) / 1000 > Config.label_keep_max_second)) { iter.remove(); dbIdToLabelToLoadJobs.get(job.getDbId()).get(job.getLabel()).remove(job); + if (job instanceof SparkLoadJob) { + ((SparkLoadJob) job).clearSparkLauncherLog(); + } } } } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java index 96b7064453a5bf..f3e2f91150697b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java @@ -47,6 +47,7 @@ import org.apache.logging.log4j.Logger; import org.apache.spark.launcher.SparkLauncher; +import java.io.File; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.List; @@ -66,6 +67,7 @@ public class SparkEtlJobHandler { private static final String CONFIG_FILE_NAME = "jobconfig.json"; private static final String JOB_CONFIG_DIR = "configs"; private static final String ETL_JOB_NAME = "doris__%s"; + private static final String LAUNCHER_LOG = "spark_launcher_%s_%s.log"; // 5min private static final long GET_APPID_TIMEOUT_MS = 300000L; // 30s @@ -79,6 +81,11 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo // delete outputPath deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc); + // init local dir + if (!FeConstants.runningUnitTest) { + initLocalDir(); + } + // prepare dpp archive SparkRepository.SparkArchive archive = resource.prepareArchive(); SparkRepository.SparkLibrary dppLibrary = archive.getDppLibrary(); @@ -96,6 +103,8 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo String jobArchiveHdfsPath = spark2xLibrary.remotePath; // spark yarn stage dir String jobStageHdfsPath = resource.getWorkingDir(); + // spark launcher log path + String logFilePath = Config.spark_launcher_log_dir + "/" + String.format(LAUNCHER_LOG, loadJobId, loadLabel); // update archive and stage configs here Map sparkConfigs = resource.getSparkConfigs(); @@ -143,6 +152,7 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo if (!FeConstants.runningUnitTest) { SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle); logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS); + logMonitor.setRedirectLogPath(logFilePath); logMonitor.start(); try { logMonitor.join(); @@ -299,6 +309,14 @@ public Map getEtlFilePaths(String outputPath, BrokerDesc brokerDes return filePathToSize; } + public static synchronized void initLocalDir() { + String logDir = Config.spark_launcher_log_dir; + File file = new File(logDir); + if (!file.exists()) { + file.mkdirs(); + } + } + public void deleteEtlOutputPath(String outputPath, BrokerDesc brokerDesc) { try { BrokerUtil.deletePath(outputPath, brokerDesc); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java index 628037d68895ce..9a664482ba7d5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java @@ -27,8 +27,11 @@ import org.apache.logging.log4j.Logger; import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; +import java.io.OutputStream; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -67,6 +70,7 @@ public static class LogMonitor extends Thread { private SparkLoadAppHandle handle; private long submitTimeoutMs; private boolean isStop; + private OutputStream outputStream; private static final String STATE = "state"; private static final String QUEUE = "queue"; @@ -89,6 +93,11 @@ public void setSubmitTimeoutMs(long submitTimeoutMs) { this.submitTimeoutMs = submitTimeoutMs; } + public void setRedirectLogPath(String redirectLogPath) throws IOException { + this.outputStream = new FileOutputStream(new File(redirectLogPath), false); + this.handle.setLogPath(redirectLogPath); + } + // Normally, log monitor will automatically stop if the spark app state changes // to RUNNING. // But if the spark app state changes to FAILED/KILLED/LOST, log monitor will stop @@ -103,7 +112,9 @@ public void run() { try { outReader = new BufferedReader(new InputStreamReader(process.getInputStream())); while (!isStop && (line = outReader.readLine()) != null) { - LOG.info("monitor log: " + line); + if (outputStream != null) { + outputStream.write((line + "\n").getBytes()); + } SparkLoadAppHandle.State oldState = handle.getState(); SparkLoadAppHandle.State newState = oldState; // parse state and appId @@ -186,6 +197,9 @@ else if (line.contains(QUEUE) || line.contains(START_TIME) || line.contains(FINA if (outReader != null) { outReader.close(); } + if (outputStream != null) { + outputStream.close(); + } } catch (IOException e) { LOG.warn("close buffered reader error", e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java index 5d75ae9392a6f1..bfe335ad3d8e64 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java @@ -40,6 +40,7 @@ public class SparkLoadAppHandle { private FinalApplicationStatus finalStatus; private String trackingUrl; private String user; + private String logPath; private List listeners; @@ -112,6 +113,8 @@ public void kill() { public String getUser() { return this.user; } + public String getLogPath() { return this.logPath; } + public void setState(State state) { this.state = state; this.fireEvent(false); @@ -147,6 +150,11 @@ public void setUser(String user) { this.fireEvent(true); } + public void setLogPath(String logPath) { + this.logPath = logPath; + this.fireEvent(true); + } + private void fireEvent(boolean isInfoChanged) { if (this.listeners != null) { Iterator iterator = this.listeners.iterator(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index d18416d5bf34d9..7927dc07bfeab5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -94,6 +94,7 @@ import java.io.DataInput; import java.io.DataOutput; +import java.io.File; import java.io.IOException; import java.util.List; import java.util.Map; @@ -713,6 +714,18 @@ protected long getEtlStartTimestamp() { return etlStartTimestamp; } + public void clearSparkLauncherLog() { + if (sparkLoadAppHandle != null) { + String logPath = sparkLoadAppHandle.getLogPath(); + if (!Strings.isNullOrEmpty(logPath)) { + File file = new File(logPath); + if (file.exists()) { + file.delete(); + } + } + } + } + @Override public void write(DataOutput out) throws IOException { super.write(out); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java index 8e55d4138db6bd..8f9761896a3ef0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java @@ -18,10 +18,12 @@ package org.apache.doris.load.loadv2; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.File; import java.io.IOException; import java.net.URL; @@ -33,6 +35,7 @@ public class SparkLauncherMonitorTest { private FinalApplicationStatus finalApplicationStatus; private String trackingUrl; private String user; + private String logPath; @Before public void setUp() { @@ -43,6 +46,7 @@ public void setUp() { finalApplicationStatus = FinalApplicationStatus.UNDEFINED; trackingUrl = "http://myhost:8388/proxy/application_1573630236805_6864759/"; user = "testugi"; + logPath = "./spark-launcher.log"; } @Test @@ -54,6 +58,7 @@ public void testLogMonitorNormal() { Process process = Runtime.getRuntime().exec(cmd); handle = new SparkLoadAppHandle(process); SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle); + logMonitor.setRedirectLogPath(logPath); logMonitor.start(); try { logMonitor.join(); @@ -63,6 +68,7 @@ public void testLogMonitorNormal() { Assert.fail(); } + // check values Assert.assertEquals(appId, handle.getAppId()); Assert.assertEquals(state, handle.getState()); Assert.assertEquals(queue, handle.getQueue()); @@ -70,5 +76,17 @@ public void testLogMonitorNormal() { Assert.assertEquals(finalApplicationStatus, handle.getFinalStatus()); Assert.assertEquals(trackingUrl, handle.getUrl()); Assert.assertEquals(user, handle.getUser()); + + // check log + File file = new File(logPath); + Assert.assertTrue(file.exists()); + } + + @After + public void clear() { + File file = new File(logPath); + if (file.exists()) { + file.delete(); + } } }