diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index 428e88c3cae693..739258a224ccf1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -250,9 +250,11 @@ public EtlStatus getEtlJobStatus(SparkAppHandle handle, String appId, long loadJ
                 byte[] data = BrokerUtil.readFile(dppResultFilePath, brokerDesc);
                 String dppResultStr = new String(data, "UTF-8");
                 DppResult dppResult = new Gson().fromJson(dppResultStr, DppResult.class);
-                status.setDppResult(dppResult);
-                if (status.getState() == TEtlState.CANCELLED && !Strings.isNullOrEmpty(dppResult.failedReason)) {
-                    status.setFailMsg(dppResult.failedReason);
+                if (dppResult != null) {
+                    status.setDppResult(dppResult);
+                    if (status.getState() == TEtlState.CANCELLED && !Strings.isNullOrEmpty(dppResult.failedReason)) {
+                        status.setFailMsg(dppResult.failedReason);
+                    }
                 }
             } catch (UserException | JsonSyntaxException | UnsupportedEncodingException e) {
                 LOG.warn("read broker file failed. path: {}", dppResultFilePath, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index f1dff69c78399a..3d8977c4a5fcd8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -414,6 +414,14 @@ private Set<Long> submitPushTasks() throws UserException {
         try {
             writeLock();
             try {
+                // check state is still loading. If state is cancelled or finished, return.
+                // if state is cancelled or finished and not return, this would throw all partitions have no load data exception,
+                // because tableToLoadPartitions was already cleaned up,
+                if (state != JobState.LOADING) {
+                    LOG.warn("job state is not loading. job id: {}, state: {}", id, state);
+                    return totalTablets;
+                }
+
                 for (Map.Entry<Long, Set<Long>> entry : tableToLoadPartitions.entrySet()) {
                     long tableId = entry.getKey();
                     OlapTable table = (OlapTable) db.getTable(tableId);
@@ -567,6 +575,10 @@ public void updateLoadingStatus() throws UserException {

         // submit push tasks
         Set<Long> totalTablets = submitPushTasks();
+        if (totalTablets.isEmpty()) {
+            LOG.warn("total tablets set is empty. job id: {}, state: {}", id, state);
+            return;
+        }

         // update status
         boolean canCommitJob = false;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index 671de5a73516d8..70786a55ab0284 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -414,6 +414,22 @@ public void testUpdateEtlStatusFinishedAndCommitTransaction(
         Assert.assertTrue(fullTablets.contains(tabletId));
     }

+    @Test
+    public void testSubmitTasksWhenStateFinished(@Mocked Catalog catalog, @Injectable String originStmt,
+                                                 @Injectable Database db) throws Exception {
+        new Expectations() {
+            {
+                catalog.getDb(dbId);
+                result = db;
+            }
+        };
+
+        SparkLoadJob job = getEtlStateJob(originStmt);
+        job.state = JobState.FINISHED;
+        Set<Long> totalTablets = Deencapsulation.invoke(job, "submitPushTasks");
+        Assert.assertTrue(totalTablets.isEmpty());
+    }
+
     @Test
     public void testStateUpdateInfoPersist() throws IOException {
         String fileName = "./testStateUpdateInfoPersistFile";
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
index 0b91cb003555f3..d5e0ceef827027 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
@@ -138,7 +138,8 @@ class BooleanParser extends ColumnParser {
     @Override
     public boolean parse(String value) {
         if (value.equalsIgnoreCase("true")
-                || value.equalsIgnoreCase("false")) {
+                || value.equalsIgnoreCase("false")
+                || value.equals("0") || value.equals("1")) {
             return true;
         }
         return false;
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 53c911141d8782..bc0a6cc0eda0a1 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -105,6 +105,7 @@ public final class SparkDpp implements java.io.Serializable {
     // because hadoop configuration is not serializable,
     // we need to wrap it so that we can use it in executor.
     private SerializableConfiguration serializableHadoopConf;
+    private DppResult dppResult = new DppResult();

     public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig) {
@@ -485,7 +486,8 @@ private Dataset<Row> convertSrcDataframeToDstDataframe(EtlJobConfig.EtlIndex bas
                 dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast("date"));
             } else if (column.columnType.equalsIgnoreCase("BOOLEAN")) {
                 dataframe = dataframe.withColumn(dstField.name(),
-                        functions.when(dataframe.col(dstField.name()).equalTo("true"), "1")
+                        functions.when(functions.lower(dataframe.col(dstField.name())).equalTo("true"), "1")
+                                .when(dataframe.col(dstField.name()).equalTo("1"), "1")
                                 .otherwise("0"));
             } else if (!column.columnType.equalsIgnoreCase(BITMAP_TYPE) && !dstField.dataType().equals(DataTypes.StringType)) {
                 dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast(dstField.dataType()));
@@ -535,9 +537,10 @@ private Dataset<Row> loadDataFromPath(SparkSession spark,
                 dataSrcColumns.add(column.columnName);
             }
         }
-        List<String> dstTableNames = new ArrayList<>();
-        for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
-            dstTableNames.add(column.columnName);
+        // for getting schema to check source data
+        Map<String, Integer> dstColumnNameToIndex = new HashMap<String, Integer>();
+        for (int i = 0; i < baseIndex.columns.size(); i++) {
+            dstColumnNameToIndex.put(baseIndex.columns.get(i).columnName, i);
         }
         List<String> srcColumnsWithColumnsFromPath = new ArrayList<>();
         srcColumnsWithColumnsFromPath.addAll(dataSrcColumns);
@@ -566,25 +569,28 @@ record -> {
                                 validRow = false;
                             } else {
                                 for (int i = 0; i < attributes.length; ++i) {
-                                    if (attributes[i].equals(NULL_FLAG)) {
-                                        if (baseIndex.columns.get(i).isAllowNull) {
+                                    StructField field = srcSchema.apply(i);
+                                    String srcColumnName = field.name();
+                                    if (attributes[i].equals(NULL_FLAG) && dstColumnNameToIndex.containsKey(srcColumnName)) {
+                                        if (baseIndex.columns.get(dstColumnNameToIndex.get(srcColumnName)).isAllowNull) {
                                             attributes[i] = null;
                                         } else {
-                                            LOG.warn("colunm:" + i + " can not be null. row:" + record);
+                                            LOG.warn("column name:" + srcColumnName + ", attribute: " + i
+                                                    + " can not be null. row:" + record);
                                             validRow = false;
                                             break;
                                         }
                                     }
-                                    boolean isStrictMode = (boolean) etlJobConfig.properties.strictMode;
+                                    boolean isStrictMode = etlJobConfig.properties.strictMode;
                                     if (isStrictMode) {
-                                        StructField field = srcSchema.apply(i);
-                                        if (dstTableNames.contains(field.name())) {
-                                            String type = columns.get(i).columnType;
+                                        if (dstColumnNameToIndex.containsKey(srcColumnName)) {
+                                            int index = dstColumnNameToIndex.get(srcColumnName);
+                                            String type = columns.get(index).columnType;
                                             if (type.equalsIgnoreCase("CHAR")
                                                     || type.equalsIgnoreCase("VARCHAR")) {
                                                 continue;
                                             }
-                                            ColumnParser parser = parsers.get(i);
+                                            ColumnParser parser = parsers.get(index);
                                             boolean valid = parser.parse(attributes[i]);
                                             if (!valid) {
                                                 validRow = false;
@@ -726,12 +732,20 @@ private Dataset<Row> loadDataFromFilePaths(SparkSession spark,
             throws SparkDppException, IOException, URISyntaxException {
         Dataset<Row> fileGroupDataframe = null;
         for (String filePath : filePaths) {
-            fileNumberAcc.add(1);
             try {
                 URI uri = new URI(filePath);
                 FileSystem fs = FileSystem.get(uri, serializableHadoopConf.value());
-                FileStatus fileStatus = fs.getFileStatus(new Path(filePath));
-                fileSizeAcc.add(fileStatus.getLen());
+                FileStatus[] fileStatuses = fs.globStatus(new Path(filePath));
+                if (fileStatuses == null) {
+                    throw new SparkDppException("fs list status failed: " + filePath);
+                }
+                for (FileStatus fileStatus : fileStatuses) {
+                    if (fileStatus.isDirectory()) {
+                        continue;
+                    }
+                    fileNumberAcc.add(1);
+                    fileSizeAcc.add(fileStatus.getLen());
+                }
             } catch (Exception e) {
                 LOG.warn("parse path failed:" + filePath);
                 throw e;
@@ -770,8 +784,7 @@ private Dataset<Row> loadDataFromHiveTable(SparkSession spark,
         return dataframe;
     }

-    private DppResult process() throws Exception {
-        DppResult dppResult = new DppResult();
+    private void process() throws Exception {
         try {
             for (Map.Entry<Long, EtlJobConfig.EtlTable> entry : etlJobConfig.tables.entrySet()) {
                 Long tableId = entry.getKey();
@@ -852,34 +865,26 @@ private DppResult process() throws Exception {
                 }
                 processRollupTree(rootNode, tablePairRDD, tableId, baseIndex);
             }
-            spark.stop();
+            LOG.info("invalid rows contents:" + invalidRows.value());
+            dppResult.isSuccess = true;
+            dppResult.failedReason = "";
         } catch (Exception exception) {
             LOG.warn("spark dpp failed for exception:" + exception);
             dppResult.isSuccess = false;
             dppResult.failedReason = exception.getMessage();
+            throw exception;
+        } finally {
+            spark.stop();
             dppResult.normalRows = scannedRowsAcc.value() - abnormalRowAcc.value();
             dppResult.scannedRows = scannedRowsAcc.value();
             dppResult.fileNumber = fileNumberAcc.value();
             dppResult.fileSize = fileSizeAcc.value();
             dppResult.abnormalRows = abnormalRowAcc.value();
             dppResult.partialAbnormalRows = invalidRows.value();
-            throw exception;
         }
-        LOG.info("invalid rows contents:" + invalidRows.value());
-        dppResult.isSuccess = true;
-        dppResult.failedReason = "";
-        dppResult.normalRows = scannedRowsAcc.value() - abnormalRowAcc.value();
-        dppResult.scannedRows = scannedRowsAcc.value();
-        dppResult.fileNumber = fileNumberAcc.value();
-        dppResult.fileSize = fileSizeAcc.value();
-        dppResult.abnormalRows = abnormalRowAcc.value();
-        dppResult.partialAbnormalRows = invalidRows.value();
-        return dppResult;
     }

-    public void doDpp() throws Exception {
-        // write dpp result to output
-        DppResult dppResult = process();
+    private void writeDppResult(DppResult dppResult) throws Exception {
         String outputPath = etlJobConfig.getOutputPath();
         String resultFilePath = outputPath + "/" + DPP_RESULT_FILE;
         URI uri = new URI(outputPath);
@@ -891,5 +896,16 @@ public void doDpp() throws Exception {
         outputStream.write('\n');
         outputStream.close();
     }
+
+    public void doDpp() throws Exception {
+        try {
+            process();
+        } catch (Exception e) {
+            throw e;
+        } finally {
+            // write dpp result to file in outputPath
+            writeDppResult(dppResult);
+        }
+    }
 }
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
index 9dec697939dba5..fe81aeb1d26bd4 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
@@ -111,7 +111,10 @@
             },
             "where": "k2 > 10",
             "isNegative": false,
-            "hiveTableName": "hive_db.table"
+            "hiveDbTableName": "hive_db.table",
+            "hiveTableProperties": {
+                "hive.metastore.uris": "thrift://host:port"
+            }
         }]
     }
 },