From e9799e3619fe9e484f20bb685ebe0b8936682ace Mon Sep 17 00:00:00 2001
From: xy720
Date: Mon, 31 Aug 2020 15:14:54 +0800
Subject: [PATCH 1/4] save

---
 .../apache/doris/analysis/ColumnSeparatorTest.java | 14 ++++++++++++--
 .../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 10 +++++++++-
 .../apache/doris/load/loadv2/etl/EtlJobConfig.java |  4 ++--
 3 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnSeparatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnSeparatorTest.java
index 76e5feddf52957..8426992319617e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnSeparatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnSeparatorTest.java
@@ -17,11 +17,11 @@
 
 package org.apache.doris.analysis;
 
+import org.apache.doris.common.AnalysisException;
+
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.doris.common.AnalysisException;
-
 public class ColumnSeparatorTest {
     @Test
     public void testNormal() throws AnalysisException {
@@ -42,6 +42,16 @@ public void testNormal() throws AnalysisException {
         separator.analyze();
         Assert.assertEquals("'\\x0001'", separator.toSql());
         Assert.assertEquals("\0\1", separator.getColumnSeparator());
+
+        separator = new ColumnSeparator("|");
+        separator.analyze();
+        Assert.assertEquals("'|'", separator.toSql());
+        Assert.assertEquals("|", separator.getColumnSeparator());
+
+        separator = new ColumnSeparator("\\|");
+        separator.analyze();
+        Assert.assertEquals("'\\|'", separator.toSql());
+        Assert.assertEquals("\\|", separator.getColumnSeparator());
     }
 
     @Test(expected = AnalysisException.class)
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index bc0a6cc0eda0a1..65e557718e0323 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -559,7 +559,15 @@ private Dataset<Row> loadDataFromPath(SparkSession spark,
         JavaRDD<Row> rowRDD = sourceDataRdd.flatMap(
                 record -> {
                     scannedRowsAcc.add(1);
-                    String[] attributes = record.split(fileGroup.columnSeparator);
+                    char sep = fileGroup.columnSeparator;
+                    List<String> list = new ArrayList<>();
+                    int off = 0, next;
+                    while ((next = record.indexOf(sep, off)) != -1) {
+                        list.add(record.substring(off, next));
+                        off = next + 1;
+                    }
+                    list.add(record.substring(off));
+                    String[] attributes = list.toArray(new String[0]);
                     List<Row> result = new ArrayList<>();
                     boolean validRow = true;
                     if (attributes.length != columnSize) {
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
index fe81aeb1d26bd4..7d595af664d479 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
@@ -471,7 +471,7 @@ public static class EtlFileGroup implements Serializable {
         @SerializedName(value = "columnsFromPath")
         public List<String> columnsFromPath;
         @SerializedName(value = "columnSeparator")
-        public String columnSeparator;
+        public char columnSeparator;
         @SerializedName(value = "lineDelimiter")
         public String lineDelimiter;
         @SerializedName(value = "isNegative")
@@ -502,7 +502,7 @@ public EtlFileGroup(SourceType sourceType, List<String> filePaths, List<String>
         this.filePaths = filePaths;
         this.fileFieldNames = fileFieldNames;
         this.columnsFromPath = columnsFromPath;
-        this.columnSeparator = columnSeparator;
+        this.columnSeparator = (char) columnSeparator.getBytes()[0];
         this.lineDelimiter = lineDelimiter;
         this.isNegative = isNegative;
         this.fileFormat = fileFormat;
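Note on patch 1: the manual splitter treats the separator as a literal character, so no regex machinery is involved. A standalone sketch of the same indexOf-based technique (class and method names here are illustrative, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;

    public class CharSplitSketch {
        // Split on a literal char; unlike String.split, this keeps
        // trailing empty fields.
        static String[] splitByChar(String record, char sep) {
            List<String> parts = new ArrayList<>();
            int off = 0;
            int next;
            while ((next = record.indexOf(sep, off)) != -1) {
                parts.add(record.substring(off, next));
                off = next + 1;
            }
            parts.add(record.substring(off)); // trailing segment
            return parts.toArray(new String[0]);
        }

        public static void main(String[] args) {
            // '|' is a regex metacharacter, but it is literal here: prints 3
            System.out.println(splitByChar("a|b|c", '|').length);
        }
    }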
From bb769d02dc8bd944e1a5490a6cf55a2e6cb0db66 Mon Sep 17 00:00:00 2001
From: xy720
Date: Mon, 31 Aug 2020 23:09:32 +0800
Subject: [PATCH 2/4] save code

---
 .../doris/load/loadv2/dpp/SparkDpp.java       | 26 ++++++++++----------------
 .../doris/load/loadv2/etl/EtlJobConfig.java   |  4 +++-
 2 files changed, 13 insertions(+), 17 deletions(-)

diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 65e557718e0323..1cc25f455b0a9e 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -17,14 +17,15 @@
 
 package org.apache.doris.load.loadv2.dpp;
 
-import org.apache.commons.lang3.tuple.Pair;
+import scala.Tuple2;
+
 import org.apache.doris.common.SparkDppException;
 import org.apache.doris.load.loadv2.etl.EtlJobConfig;
-
 import com.google.common.base.Strings;
 import com.google.gson.Gson;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -55,6 +56,7 @@
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.util.LongAccumulator;
+import org.apache.spark.util.SerializableConfiguration;
 
 import java.io.IOException;
 import java.math.BigInteger;
@@ -71,9 +73,6 @@
 import java.util.Queue;
 import java.util.Set;
 
-import org.apache.spark.util.SerializableConfiguration;
-import scala.Tuple2;
-
 // This class is a Spark-based data preprocessing program,
 // which will make use of the distributed compute framework of spark to
 // do ETL job/sort/preaggregate jobs in spark job
@@ -559,15 +558,14 @@ private Dataset<Row> loadDataFromPath(SparkSession spark,
         JavaRDD<Row> rowRDD = sourceDataRdd.flatMap(
                 record -> {
                     scannedRowsAcc.add(1);
                     char sep = fileGroup.columnSeparator;
-                    List<String> list = new ArrayList<>();
-                    int off = 0, next;
-                    while ((next = record.indexOf(sep, off)) != -1) {
-                        list.add(record.substring(off, next));
-                        off = next + 1;
+                    String regex = null;
+                    if (".$|()[]{}^?*+\\".indexOf(sep) != -1) {
+                        regex = new String(new char[]{'\\', sep});
+                    } else {
+                        regex = Character.toString(sep);
                     }
-                    list.add(record.substring(off));
-                    String[] attributes = list.toArray(new String[0]);
+                    String[] attributes = record.split(regex);
                     List<Row> result = new ArrayList<>();
                     boolean validRow = true;
                     if (attributes.length != columnSize) {
@@ -758,10 +756,6 @@ private Dataset<Row> loadDataFromFilePaths(SparkSession spark,
                 LOG.warn("parse path failed:" + filePath);
                 throw e;
             }
-            if (fileGroup.columnSeparator == null) {
-                LOG.warn("invalid null column separator!");
-                throw new SparkDppException("Reason: invalid null column separator!");
-            }
             Dataset<Row> dataframe = null;
            dataframe = loadDataFromPath(spark, fileGroup, filePath,
                     baseIndex, baseIndex.columns);
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
index 7d595af664d479..27f0a1e51d8853 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
@@ -25,6 +25,8 @@
 import com.google.gson.GsonBuilder;
 import com.google.gson.annotations.SerializedName;
 
+import org.apache.parquet.Strings;
+
 import java.io.Serializable;
 import java.util.Comparator;
 import java.util.List;
@@ -502,7 +504,7 @@ public EtlFileGroup(SourceType sourceType, List<String> filePaths, List<String>
         this.filePaths = filePaths;
         this.fileFieldNames = fileFieldNames;
         this.columnsFromPath = columnsFromPath;
-        this.columnSeparator = (char) columnSeparator.getBytes()[0];
+        this.columnSeparator = Strings.isNullOrEmpty(columnSeparator) ? '\t' : columnSeparator.charAt(0);
         this.lineDelimiter = lineDelimiter;
         this.isNegative = isNegative;
         this.fileFormat = fileFormat;
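Note on patch 2: the escape list ".$|()[]{}^?*+\\" covers the regex metacharacters that String.split would otherwise interpret. A minimal standalone demonstration of the failure mode the escaping prevents (not part of the patch):

    public class RegexEscapeDemo {
        public static void main(String[] args) {
            // Unescaped '|' is regex alternation and matches the empty
            // string, so the record splits between every character:
            // prints 5 on Java 8+ (["a", "|", "b", "|", "c"])
            System.out.println("a|b|c".split("|").length);
            // Escaped, '|' is a literal pipe: prints 3 (["a", "b", "c"])
            System.out.println("a|b|c".split("\\|").length);
        }
    }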
From 706364c95919a2850d5d04bccf03159fb09b10f1 Mon Sep 17 00:00:00 2001
From: xy720
Date: Tue, 1 Sep 2020 14:27:38 +0800
Subject: [PATCH 3/4] execute once

---
 .../apache/doris/load/loadv2/dpp/SparkDpp.java |  9 +--------
 .../doris/load/loadv2/etl/EtlJobConfig.java    | 16 +++++++++++-----
 2 files changed, 12 insertions(+), 13 deletions(-)

diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 1cc25f455b0a9e..46378b8fd51a6f 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -558,14 +558,7 @@ private Dataset<Row> loadDataFromPath(SparkSession spark,
         JavaRDD<Row> rowRDD = sourceDataRdd.flatMap(
                 record -> {
                     scannedRowsAcc.add(1);
-                    char sep = fileGroup.columnSeparator;
-                    String regex = null;
-                    if (".$|()[]{}^?*+\\".indexOf(sep) != -1) {
-                        regex = new String(new char[]{'\\', sep});
-                    } else {
-                        regex = Character.toString(sep);
-                    }
-                    String[] attributes = record.split(regex);
+                    String[] attributes = record.split(fileGroup.columnSeparator);
                     List<Row> result = new ArrayList<>();
                     boolean validRow = true;
                     if (attributes.length != columnSize) {
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
index 27f0a1e51d8853..713592ca9c95a1 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
@@ -17,16 +17,15 @@
 
 package org.apache.doris.load.loadv2.etl;
 
-import com.google.common.collect.Lists;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.gson.ExclusionStrategy;
 import com.google.gson.FieldAttributes;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.annotations.SerializedName;
 
-import org.apache.parquet.Strings;
-
 import java.io.Serializable;
 import java.util.Comparator;
 import java.util.List;
@@ -473,7 +472,7 @@ public static class EtlFileGroup implements Serializable {
         @SerializedName(value = "columnsFromPath")
         public List<String> columnsFromPath;
         @SerializedName(value = "columnSeparator")
-        public char columnSeparator;
+        public String columnSeparator;
         @SerializedName(value = "lineDelimiter")
         public String lineDelimiter;
         @SerializedName(value = "isNegative")
@@ -504,13 +503,20 @@ public EtlFileGroup(SourceType sourceType, List<String> filePaths, List<String>
         this.filePaths = filePaths;
         this.fileFieldNames = fileFieldNames;
         this.columnsFromPath = columnsFromPath;
-        this.columnSeparator = Strings.isNullOrEmpty(columnSeparator) ? '\t' : columnSeparator.charAt(0);
         this.lineDelimiter = lineDelimiter;
         this.isNegative = isNegative;
         this.fileFormat = fileFormat;
         this.columnMappings = columnMappings;
         this.where = where;
         this.partitions = partitions;
+
+        // Handle some special characters
+        char sep = Strings.isNullOrEmpty(columnSeparator) ? '\t' : columnSeparator.charAt(0);
+        if (".$|()[]{}^?*+\\".indexOf(sep) != -1) {
+            this.columnSeparator = new String(new char[]{'\\', sep});
+        } else {
+            this.columnSeparator = Character.toString(sep);
+        }
     }
 
     // for data from table
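Note on patch 3: computing the escaped separator once in the EtlFileGroup constructor removes the per-record branching from the Spark flatMap, which runs once per input line. If the single-character restriction were ever lifted, java.util.regex.Pattern.quote would be an alternative to the hand-maintained metacharacter list (a sketch of that alternative, not what the patch uses):

    import java.util.regex.Pattern;

    public class QuoteSketch {
        public static void main(String[] args) {
            // Pattern.quote wraps the separator in \Q...\E so every
            // character is taken literally, whatever its length.
            String[] fields = "a|b|c".split(Pattern.quote("|"));
            System.out.println(fields.length); // 3
        }
    }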
From 027fe9af21406dd3d2029ffb0b1552933efcf87a Mon Sep 17 00:00:00 2001
From: xy720
Date: Tue, 1 Sep 2020 14:42:52 +0800
Subject: [PATCH 4/4] save code

---
 .../main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java | 4 ++++
 .../java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java  | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 46378b8fd51a6f..6e5a714f1b5ce6 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -749,6 +749,10 @@ private Dataset<Row> loadDataFromFilePaths(SparkSession spark,
                 LOG.warn("parse path failed:" + filePath);
                 throw e;
             }
+            if (fileGroup.columnSeparator == null) {
+                LOG.warn("invalid null column separator!");
+                throw new SparkDppException("Reason: invalid null column separator!");
+            }
             Dataset<Row> dataframe = null;
            dataframe = loadDataFromPath(spark, fileGroup, filePath,
                     baseIndex, baseIndex.columns);
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
index 713592ca9c95a1..9ee4d83688b70b 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
@@ -510,7 +510,7 @@ public EtlFileGroup(SourceType sourceType, List<String> filePaths, List<String>
         this.where = where;
         this.partitions = partitions;
 
-        // Handle some special characters
+        // Convert some special characters in column separator
         char sep = Strings.isNullOrEmpty(columnSeparator) ? '\t' : columnSeparator.charAt(0);
         if (".$|()[]{}^?*+\\".indexOf(sep) != -1) {
             this.columnSeparator = new String(new char[]{'\\', sep});
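Taken together, the series leaves EtlFileGroup.columnSeparator holding a split-ready regex string. The conversion can be summarized in isolation (a sketch mirroring the committed logic; the class and helper names are illustrative):

    public class SeparatorConversionSketch {
        // Default to '\t' when unset, then backslash-escape regex
        // metacharacters, as the EtlFileGroup constructor does.
        static String toSplitRegex(String columnSeparator) {
            char sep = (columnSeparator == null || columnSeparator.isEmpty())
                    ? '\t' : columnSeparator.charAt(0);
            if (".$|()[]{}^?*+\\".indexOf(sep) != -1) {
                return new String(new char[]{'\\', sep});
            }
            return Character.toString(sep);
        }

        public static void main(String[] args) {
            System.out.println(toSplitRegex(","));                // ,
            System.out.println(toSplitRegex("|"));                // \|
            System.out.println(toSplitRegex(null).equals("\t"));  // true
        }
    }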