@@ -250,9 +250,11 @@ public EtlStatus getEtlJobStatus(SparkAppHandle handle, String appId, long loadJ
byte[] data = BrokerUtil.readFile(dppResultFilePath, brokerDesc);
String dppResultStr = new String(data, "UTF-8");
DppResult dppResult = new Gson().fromJson(dppResultStr, DppResult.class);
status.setDppResult(dppResult);
if (status.getState() == TEtlState.CANCELLED && !Strings.isNullOrEmpty(dppResult.failedReason)) {
status.setFailMsg(dppResult.failedReason);
if (dppResult != null) {
status.setDppResult(dppResult);
if (status.getState() == TEtlState.CANCELLED && !Strings.isNullOrEmpty(dppResult.failedReason)) {
status.setFailMsg(dppResult.failedReason);
}
}
} catch (UserException | JsonSyntaxException | UnsupportedEncodingException e) {
LOG.warn("read broker file failed. path: {}", dppResultFilePath, e);
@@ -414,6 +414,14 @@ private Set<Long> submitPushTasks() throws UserException {
try {
writeLock();
try {
// check that the job state is still LOADING. If the state is CANCELLED or FINISHED, return directly.
// Otherwise an "all partitions have no load data" exception would be thrown,
// because tableToLoadPartitions has already been cleaned up.
if (state != JobState.LOADING) {
LOG.warn("job state is not loading. job id: {}, state: {}", id, state);
return totalTablets;
}

for (Map.Entry<Long, Set<Long>> entry : tableToLoadPartitions.entrySet()) {
long tableId = entry.getKey();
OlapTable table = (OlapTable) db.getTable(tableId);
@@ -567,6 +575,10 @@ public void updateLoadingStatus() throws UserException {

// submit push tasks
Set<Long> totalTablets = submitPushTasks();
if (totalTablets.isEmpty()) {
LOG.warn("total tablets set is empty. job id: {}, state: {}", id, state);
return;
}

// update status
boolean canCommitJob = false;
@@ -414,6 +414,22 @@ public void testUpdateEtlStatusFinishedAndCommitTransaction(
Assert.assertTrue(fullTablets.contains(tabletId));
}

@Test
public void testSubmitTasksWhenStateFinished(@Mocked Catalog catalog, @Injectable String originStmt,
@Injectable Database db) throws Exception {
new Expectations() {
{
catalog.getDb(dbId);
result = db;
}
};

SparkLoadJob job = getEtlStateJob(originStmt);
job.state = JobState.FINISHED;
Set<Long> totalTablets = Deencapsulation.invoke(job, "submitPushTasks");
Assert.assertTrue(totalTablets.isEmpty());
}

@Test
public void testStateUpdateInfoPersist() throws IOException {
String fileName = "./testStateUpdateInfoPersistFile";
@@ -138,7 +138,8 @@ class BooleanParser extends ColumnParser {
@Override
public boolean parse(String value) {
if (value.equalsIgnoreCase("true")
|| value.equalsIgnoreCase("false")) {
|| value.equalsIgnoreCase("false")
|| value.equals("0") || value.equals("1")) {
return true;
}
return false;
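With this change the parser also accepts the literals "0" and "1", matching the boolean column conversion in SparkDpp below. A standalone sketch of the accepted set; it mirrors the check in isolation instead of invoking the real class, and all names in it are illustrative:

public class BooleanLiteralSketch {
    // Mirrors the check in BooleanParser.parse after this change.
    static boolean accepted(String value) {
        return value.equalsIgnoreCase("true")
                || value.equalsIgnoreCase("false")
                || value.equals("0") || value.equals("1");
    }

    public static void main(String[] args) {
        System.out.println(accepted("TRUE"));  // true
        System.out.println(accepted("false")); // true
        System.out.println(accepted("1"));     // true, newly accepted
        System.out.println(accepted("0"));     // true, newly accepted
        System.out.println(accepted("yes"));   // false, treated as an invalid row in strict mode
    }
}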
@@ -105,6 +105,7 @@ public final class SparkDpp implements java.io.Serializable {
// because hadoop configuration is not serializable,
// we need to wrap it so that we can use it in executor.
private SerializableConfiguration serializableHadoopConf;
private DppResult dppResult = new DppResult();


public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig) {
@@ -485,7 +486,8 @@ private Dataset<Row> convertSrcDataframeToDstDataframe(EtlJobConfig.EtlIndex bas
dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast("date"));
} else if (column.columnType.equalsIgnoreCase("BOOLEAN")) {
dataframe = dataframe.withColumn(dstField.name(),
functions.when(dataframe.col(dstField.name()).equalTo("true"), "1")
functions.when(functions.lower(dataframe.col(dstField.name())).equalTo("true"), "1")
.when(dataframe.col(dstField.name()).equalTo("1"), "1")
.otherwise("0"));
} else if (!column.columnType.equalsIgnoreCase(BITMAP_TYPE) && !dstField.dataType().equals(DataTypes.StringType)) {
dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast(dstField.dataType()));
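The conversion now lower-cases the value before comparing against "true" and additionally maps the literal "1" to "1". A self-contained sketch of the resulting mapping, assuming a local SparkSession and a made-up column name "b":

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class BooleanCastSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[1]").appName("boolean-cast-sketch").getOrCreate();
        Dataset<Row> df = spark.createDataset(
                Arrays.asList("True", "false", "1", "0", "yes"), Encoders.STRING()).toDF("b");
        // Same expression as above: case-insensitive "true" and the literal "1" become "1",
        // everything else becomes "0", so the rows come out as 1, 0, 1, 0, 0.
        df.withColumn("b", functions.when(functions.lower(df.col("b")).equalTo("true"), "1")
                .when(df.col("b").equalTo("1"), "1")
                .otherwise("0"))
                .show();
        spark.stop();
    }
}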
@@ -535,9 +537,10 @@ private Dataset<Row> loadDataFromPath(SparkSession spark,
dataSrcColumns.add(column.columnName);
}
}
List<String> dstTableNames = new ArrayList<>();
for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
dstTableNames.add(column.columnName);
// map each destination column name to its index, used to check source data against the destination schema
Map<String, Integer> dstColumnNameToIndex = new HashMap<String, Integer>();
for (int i = 0; i < baseIndex.columns.size(); i++) {
dstColumnNameToIndex.put(baseIndex.columns.get(i).columnName, i);
}
List<String> srcColumnsWithColumnsFromPath = new ArrayList<>();
srcColumnsWithColumnsFromPath.addAll(dataSrcColumns);
@@ -566,25 +569,28 @@ record -> {
validRow = false;
} else {
for (int i = 0; i < attributes.length; ++i) {
if (attributes[i].equals(NULL_FLAG)) {
if (baseIndex.columns.get(i).isAllowNull) {
StructField field = srcSchema.apply(i);
String srcColumnName = field.name();
if (attributes[i].equals(NULL_FLAG) && dstColumnNameToIndex.containsKey(srcColumnName)) {
if (baseIndex.columns.get(dstColumnNameToIndex.get(srcColumnName)).isAllowNull) {
attributes[i] = null;
} else {
LOG.warn("colunm:" + i + " can not be null. row:" + record);
LOG.warn("column name:" + srcColumnName + ", attribute: " + i
+ " can not be null. row:" + record);
validRow = false;
break;
}
}
boolean isStrictMode = (boolean) etlJobConfig.properties.strictMode;
boolean isStrictMode = etlJobConfig.properties.strictMode;
if (isStrictMode) {
StructField field = srcSchema.apply(i);
if (dstTableNames.contains(field.name())) {
String type = columns.get(i).columnType;
if (dstColumnNameToIndex.containsKey(srcColumnName)) {
int index = dstColumnNameToIndex.get(srcColumnName);
String type = columns.get(index).columnType;
if (type.equalsIgnoreCase("CHAR")
|| type.equalsIgnoreCase("VARCHAR")) {
continue;
}
ColumnParser parser = parsers.get(i);
ColumnParser parser = parsers.get(index);
boolean valid = parser.parse(attributes[i]);
if (!valid) {
validRow = false;
@@ -726,12 +732,20 @@ private Dataset<Row> loadDataFromFilePaths(SparkSession spark,
throws SparkDppException, IOException, URISyntaxException {
Dataset<Row> fileGroupDataframe = null;
for (String filePath : filePaths) {
fileNumberAcc.add(1);
try {
URI uri = new URI(filePath);
FileSystem fs = FileSystem.get(uri, serializableHadoopConf.value());
FileStatus fileStatus = fs.getFileStatus(new Path(filePath));
fileSizeAcc.add(fileStatus.getLen());
FileStatus[] fileStatuses = fs.globStatus(new Path(filePath));
if (fileStatuses == null) {
throw new SparkDppException("fs list status failed: " + filePath);
}
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
continue;
}
fileNumberAcc.add(1);
fileSizeAcc.add(fileStatus.getLen());
}
} catch (Exception e) {
LOG.warn("parse path failed:" + filePath);
throw e;
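Switching from getFileStatus to globStatus lets a file group path contain wildcards that resolve to several files, with directories skipped and a non-matching pattern surfaced as an error. A standalone sketch of the same pattern; the HDFS URI is hypothetical:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GlobStatusSketch {
    public static void main(String[] args) throws Exception {
        String filePath = "hdfs://namenode:9000/user/palo/data/part-*"; // hypothetical glob path
        FileSystem fs = FileSystem.get(new URI(filePath), new Configuration());

        // getFileStatus would treat the '*' literally; globStatus expands it and may return null
        // when the pattern resolves to nothing, which the caller must treat as a failure.
        FileStatus[] statuses = fs.globStatus(new Path(filePath));
        if (statuses == null) {
            throw new Exception("fs list status failed: " + filePath);
        }
        long fileNumber = 0;
        long fileSize = 0;
        for (FileStatus status : statuses) {
            if (status.isDirectory()) {
                continue; // only regular files contribute to the accumulators
            }
            fileNumber++;
            fileSize += status.getLen();
        }
        System.out.println(fileNumber + " files, " + fileSize + " bytes");
    }
}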
@@ -770,8 +784,7 @@ private Dataset<Row> loadDataFromHiveTable(SparkSession spark,
return dataframe;
}

private DppResult process() throws Exception {
DppResult dppResult = new DppResult();
private void process() throws Exception {
try {
for (Map.Entry<Long, EtlJobConfig.EtlTable> entry : etlJobConfig.tables.entrySet()) {
Long tableId = entry.getKey();
@@ -852,34 +865,26 @@ private DppResult process() throws Exception {
}
processRollupTree(rootNode, tablePairRDD, tableId, baseIndex);
}
spark.stop();
LOG.info("invalid rows contents:" + invalidRows.value());
dppResult.isSuccess = true;
dppResult.failedReason = "";
} catch (Exception exception) {
LOG.warn("spark dpp failed for exception:" + exception);
dppResult.isSuccess = false;
dppResult.failedReason = exception.getMessage();
throw exception;
} finally {
spark.stop();
dppResult.normalRows = scannedRowsAcc.value() - abnormalRowAcc.value();
dppResult.scannedRows = scannedRowsAcc.value();
dppResult.fileNumber = fileNumberAcc.value();
dppResult.fileSize = fileSizeAcc.value();
dppResult.abnormalRows = abnormalRowAcc.value();
dppResult.partialAbnormalRows = invalidRows.value();
throw exception;
}
LOG.info("invalid rows contents:" + invalidRows.value());
dppResult.isSuccess = true;
dppResult.failedReason = "";
dppResult.normalRows = scannedRowsAcc.value() - abnormalRowAcc.value();
dppResult.scannedRows = scannedRowsAcc.value();
dppResult.fileNumber = fileNumberAcc.value();
dppResult.fileSize = fileSizeAcc.value();
dppResult.abnormalRows = abnormalRowAcc.value();
dppResult.partialAbnormalRows = invalidRows.value();
return dppResult;
}

public void doDpp() throws Exception {
// write dpp result to output
DppResult dppResult = process();
private void writeDppResult(DppResult dppResult) throws Exception {
String outputPath = etlJobConfig.getOutputPath();
String resultFilePath = outputPath + "/" + DPP_RESULT_FILE;
URI uri = new URI(outputPath);
@@ -891,5 +896,16 @@ public void doDpp() throws Exception {
outputStream.write('\n');
outputStream.close();
}

public void doDpp() throws Exception {
try {
process();
} catch (Exception e) {
throw e;
} finally {
// write dpp result to file in outputPath
writeDppResult(dppResult);
}
}
}

@@ -111,7 +111,10 @@
},
"where": "k2 > 10",
"isNegative": false,
"hiveTableName": "hive_db.table"
"hiveDbTableName": "hive_db.table",
"hiveTableProperties": {
"hive.metastore.uris": "thrift://host:port"
}
}]
}
},