FeMetaVersion.java
@@ -192,6 +192,8 @@ public final class FeMetaVersion {
     public static final int VERSION_89 = 89;
     // for global variable persist
     public static final int VERSION_90 = 90;
+    // sparkLoadAppHandle
+    public static final int VERSION_91 = 91;
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_90;
+    public static final int VERSION_CURRENT = VERSION_91;
 }
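
The bump to VERSION_91 follows the usual FE rolling-upgrade pattern: writers always emit the newest layout, while readers consume a new field only when the image or journal being replayed was written at or above the version that introduced it. The sketch below is a hypothetical, self-contained harness illustrating that gate; only the VERSION_90/VERSION_91 names and the gating rule come from this PR.

import java.io.*;

// Minimal illustration of the meta-version gate this PR relies on.
public class VersionGateDemo {
    static final int VERSION_90 = 90;
    static final int VERSION_91 = 91;

    // Writers always emit the newest layout.
    static void write(DataOutput out, String appId, String handleJson) throws IOException {
        out.writeUTF(appId);
        out.writeUTF(handleJson); // field new in VERSION_91
    }

    // Readers consume the new field only when the stream is new enough;
    // an older stream simply does not contain it.
    static String read(DataInput in, int journalVersion) throws IOException {
        String appId = in.readUTF();
        String handleJson = null;
        if (journalVersion >= VERSION_91) {
            handleJson = in.readUTF();
        }
        return appId + " / " + handleJson;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        write(new DataOutputStream(bos), "application_0088", "{\"state\":\"UNKNOWN\"}");
        DataInput in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        System.out.println(read(in, VERSION_91)); // sees both fields
    }
}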
SparkLoadAppHandle.java
@@ -17,29 +17,44 @@
 
 package org.apache.doris.load.loadv2;
 
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
 import com.google.common.collect.Lists;
+import com.google.gson.annotations.SerializedName;
 
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
-public class SparkLoadAppHandle {
+public class SparkLoadAppHandle implements Writable {
     private static final Logger LOG = LogManager.getLogger(SparkLoadAppHandle.class);
     // 5min
     private static final long SUBMIT_APP_TIMEOUT_MS = 300 * 1000;
 
     private Process process;
 
+    @SerializedName("appId")
     private String appId;
+    @SerializedName("state")
     private State state;
+    @SerializedName("queue")
     private String queue;
+    @SerializedName("startTime")
     private long startTime;
+    @SerializedName("finalStatus")
     private FinalApplicationStatus finalStatus;
+    @SerializedName("trackingUrl")
     private String trackingUrl;
+    @SerializedName("user")
     private String user;
+    @SerializedName("logPath")
     private String logPath;
 
     private List<Listener> listeners;
@@ -76,6 +91,10 @@ public SparkLoadAppHandle(Process process) {
         this.process = process;
     }
 
+    public SparkLoadAppHandle() {
+        this.state = State.UNKNOWN;
+    }
+
     public void addListener(Listener listener) {
         if (this.listeners == null) {
             this.listeners = Lists.newArrayList();
@@ -168,6 +187,16 @@ private void fireEvent(boolean isInfoChanged) {
                 }
             }
         }
     }
 
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static SparkLoadAppHandle read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, SparkLoadAppHandle.class);
+    }
 }
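
The new write/read pair persists the whole handle as a single JSON string rather than field by field, so adding a future field only requires another @SerializedName annotation. Below is a stand-alone sketch of the same round-trip idea using plain Gson; writeUTF/readUTF stand in for Doris's Text.writeString/readString helpers (note writeUTF carries a 64 KB cap that Text does not), and the Handle class is a hypothetical stand-in for SparkLoadAppHandle.

import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;

import java.io.*;

// Stand-alone sketch of the JSON-as-one-string persistence pattern.
public class JsonWritableDemo {
    static final Gson GSON = new Gson();

    static class Handle {
        @SerializedName("appId")
        String appId;
        @SerializedName("queue")
        String queue;
    }

    static void write(DataOutput out, Handle h) throws IOException {
        out.writeUTF(GSON.toJson(h)); // whole object serialized as one JSON string
    }

    static Handle read(DataInput in) throws IOException {
        return GSON.fromJson(in.readUTF(), Handle.class);
    }

    public static void main(String[] args) throws IOException {
        Handle h = new Handle();
        h.appId = "application_0088";
        h.queue = "default";
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        write(new DataOutputStream(bos), h);
        Handle back = read(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(back.appId + " @ " + back.queue); // application_0088 @ default
    }
}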
SparkLoadJob.java
@@ -45,6 +45,7 @@
 import org.apache.doris.common.DataQualityException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.DuplicatedRequestException;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -103,8 +104,8 @@
 /**
  * There are 4 steps in SparkLoadJob:
  * Step1: SparkLoadPendingTask will be created by unprotectedExecuteJob method and submit spark etl job.
- * Step2: LoadEtlChecker will check spark etl job status periodly and send push tasks to be when spark etl job is finished.
- * Step3: LoadLoadingChecker will check loading status periodly and commit transaction when push tasks are finished.
+ * Step2: LoadEtlChecker will check spark etl job status periodically and send push tasks to be when spark etl job is finished.
+ * Step3: LoadLoadingChecker will check loading status periodically and commit transaction when push tasks are finished.
  * Step4: PublishVersionDaemon will send publish version tasks to be and finish transaction.
  */
 public class SparkLoadJob extends BulkLoadJob {
@@ -126,7 +127,7 @@ public class SparkLoadJob extends BulkLoadJob {
     // --- members below not persist ---
     private ResourceDesc resourceDesc;
     // for spark standalone
-    private SparkLoadAppHandle sparkLoadAppHandle;
+    private SparkLoadAppHandle sparkLoadAppHandle = new SparkLoadAppHandle();
     // for straggler wait long time to commit transaction
     private long quorumFinishTimestamp = -1;
     // below for push task
@@ -715,13 +716,11 @@ protected long getEtlStartTimestamp() {
     }
 
     public void clearSparkLauncherLog() {
-        if (sparkLoadAppHandle != null) {
-            String logPath = sparkLoadAppHandle.getLogPath();
-            if (!Strings.isNullOrEmpty(logPath)) {
-                File file = new File(logPath);
-                if (file.exists()) {
-                    file.delete();
-                }
+        String logPath = sparkLoadAppHandle.getLogPath();
+        if (!Strings.isNullOrEmpty(logPath)) {
+            File file = new File(logPath);
+            if (file.exists()) {
+                file.delete();
             }
         }
     }
@@ -730,6 +729,7 @@ public void clearSparkLauncherLog() {
     public void write(DataOutput out) throws IOException {
         super.write(out);
         sparkResource.write(out);
+        sparkLoadAppHandle.write(out);
         out.writeLong(etlStartTimestamp);
         Text.writeString(out, appId);
         Text.writeString(out, etlOutputPath);
@@ -744,6 +744,9 @@ public void write(DataOutput out) throws IOException {
     public void readFields(DataInput in) throws IOException {
         super.readFields(in);
         sparkResource = (SparkResource) Resource.read(in);
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_91) {
+            sparkLoadAppHandle = SparkLoadAppHandle.read(in);
+        }
         etlStartTimestamp = in.readLong();
         appId = Text.readString(in);
         etlOutputPath = Text.readString(in);
@@ -760,7 +763,7 @@ public void readFields(DataInput in) throws IOException {
      */
     private void unprotectedLogUpdateStateInfo() {
         SparkLoadJobStateUpdateInfo info = new SparkLoadJobStateUpdateInfo(
-                id, state, transactionId, etlStartTimestamp, appId, etlOutputPath,
+                id, state, transactionId, sparkLoadAppHandle, etlStartTimestamp, appId, etlOutputPath,
                 loadStartTimestamp, tabletMetaToFileInfo);
         Catalog.getCurrentCatalog().getEditLog().logUpdateLoadJob(info);
     }
@@ -769,6 +772,7 @@ private void unprotectedLogUpdateStateInfo() {
     public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) {
         super.replayUpdateStateInfo(info);
         SparkLoadJobStateUpdateInfo sparkJobStateInfo = (SparkLoadJobStateUpdateInfo) info;
+        sparkLoadAppHandle = sparkJobStateInfo.getSparkLoadAppHandle();
         etlStartTimestamp = sparkJobStateInfo.getEtlStartTimestamp();
         appId = sparkJobStateInfo.getAppId();
         etlOutputPath = sparkJobStateInfo.getEtlOutputPath();
@@ -792,6 +796,8 @@ public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) {
      * Used for spark load job journal log when job state changed to ETL or LOADING
      */
     public static class SparkLoadJobStateUpdateInfo extends LoadJobStateUpdateInfo {
+        @SerializedName(value = "sparkLoadAppHandle")
+        private SparkLoadAppHandle sparkLoadAppHandle;
         @SerializedName(value = "etlStartTimestamp")
         private long etlStartTimestamp;
         @SerializedName(value = "appId")
@@ -801,16 +807,21 @@ public static class SparkLoadJobStateUpdateInfo extends LoadJobStateUpdateInfo {
         @SerializedName(value = "tabletMetaToFileInfo")
         private Map<String, Pair<String, Long>> tabletMetaToFileInfo;
 
-        public SparkLoadJobStateUpdateInfo(long jobId, JobState state, long transactionId, long etlStartTimestamp,
-                                           String appId, String etlOutputPath, long loadStartTimestamp,
+        public SparkLoadJobStateUpdateInfo(long jobId, JobState state, long transactionId, SparkLoadAppHandle sparkLoadAppHandle,
+                                           long etlStartTimestamp, String appId, String etlOutputPath, long loadStartTimestamp,
                                            Map<String, Pair<String, Long>> tabletMetaToFileInfo) {
             super(jobId, state, transactionId, loadStartTimestamp);
+            this.sparkLoadAppHandle = sparkLoadAppHandle;
             this.etlStartTimestamp = etlStartTimestamp;
             this.appId = appId;
             this.etlOutputPath = etlOutputPath;
             this.tabletMetaToFileInfo = tabletMetaToFileInfo;
         }
 
+        public SparkLoadAppHandle getSparkLoadAppHandle() {
+            return sparkLoadAppHandle;
+        }
+
         public long getEtlStartTimestamp() {
             return etlStartTimestamp;
         }
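
Two smaller changes in this file work together: sparkLoadAppHandle is now initialized at its declaration, which is what lets write() call sparkLoadAppHandle.write(out) unconditionally and lets clearSparkLauncherLog() drop its old null check. A hypothetical reduction of that invariant (class names invented for illustration):

// Eagerly initializing the handle makes null checks unnecessary downstream.
class Handle {
    String logPath; // stays null until the Spark launcher reports a log file
}

class Job {
    Handle handle = new Handle(); // never null, even if Spark was never submitted

    void clearLog() {
        String path = handle.logPath; // safe: handle itself cannot be null
        if (path != null && !path.isEmpty()) {
            new java.io.File(path).delete();
        }
    }
}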
SparkLoadJobTest.java
@@ -87,6 +87,7 @@ public class SparkLoadJobTest {
     private String broker;
     private long transactionId;
     private long pendingTaskId;
+    private SparkLoadAppHandle sparkLoadAppHandle;
     private String appId;
     private String etlOutputPath;
     private long tableId;
@@ -107,6 +108,7 @@ public void setUp() {
         broker = "broker0";
         transactionId = 2L;
         pendingTaskId = 3L;
+        sparkLoadAppHandle = new SparkLoadAppHandle();
         appId = "application_15888888888_0088";
         etlOutputPath = "hdfs://127.0.0.1:10000/tmp/doris/100/label/101";
         tableId = 10L;
@@ -448,7 +450,7 @@ public void testStateUpdateInfoPersist() throws IOException {
         file.createNewFile();
         DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
         SparkLoadJobStateUpdateInfo info = new SparkLoadJobStateUpdateInfo(
-                id, state, transactionId, etlStartTimestamp, appId, etlOutputPath,
+                id, state, transactionId, sparkLoadAppHandle, etlStartTimestamp, appId, etlOutputPath,
                 loadStartTimestamp, tabletMetaToFileInfo);
         info.write(out);
         out.flush();
}
file.createNewFile();
out = new DataOutputStream(new FileOutputStream(file));
info = new SparkLoadJobStateUpdateInfo(id, state, transactionId, etlStartTimestamp, appId, etlOutputPath,
loadStartTimestamp, tabletMetaToFileInfo);
info = new SparkLoadJobStateUpdateInfo(id, state, transactionId, sparkLoadAppHandle, etlStartTimestamp,
appId, etlOutputPath, loadStartTimestamp, tabletMetaToFileInfo);
info.write(out);
out.flush();
out.close();
Expand Down