149 changes: 131 additions & 18 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -26,12 +26,16 @@
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TResultFileSinkOptions;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -139,7 +143,11 @@ public BrokerDesc getBrokerDesc() {
return brokerDesc;
}

public void analyze(Analyzer analyzer) throws AnalysisException {
public List<List<String>> getSchema() {
return schema;
}

private void analyze(Analyzer analyzer) throws AnalysisException {
analyzeFilePath();

if (Strings.isNullOrEmpty(filePath)) {
Expand Down Expand Up @@ -167,11 +175,111 @@ public void analyze(Analyzer analyzer) throws AnalysisException {

public void analyze(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
analyze(analyzer);
List<SelectListItem> items = stmt.getSelectList().getItems();
for (SelectListItem item:items) {
if (item.getExpr().getType() == Type.LARGEINT && isParquetFormat()) {
throw new AnalysisException("currently parquet do not support largeint type");

if (isParquetFormat()) {
analyzeForParquetFormat(stmt.getResultExprs());
}
}
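
Both entry paths into the Parquet handling above can be exercised straight from SQL. A minimal sketch, modeled on the tests further down (db1.tbl1 and k1 come from the test fixture; the output path is illustrative):

// Schema supplied explicitly: analyzeForParquetFormat() validates it against the select list.
String withSchema = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///tmp/out/\" "
        + "FORMAT AS PARQUET PROPERTIES (\"schema\" = \"required,byte_array,col0\");";
// Schema omitted: genParquetSchema() derives one column per result expression.
String withoutSchema = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///tmp/out/\" FORMAT AS PARQUET;";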

private void analyzeForParquetFormat(List<Expr> resultExprs) throws AnalysisException {
if (this.schema.isEmpty()) {
genParquetSchema(resultExprs);
}

// check schema number
if (resultExprs.size() != this.schema.size()) {
throw new AnalysisException("Parquet schema number does not equal to select item number");
}

// check type
for (int i = 0; i < this.schema.size(); ++i) {
String type = this.schema.get(i).get(1);
Type resultType = resultExprs.get(i).getType();
switch (resultType.getPrimitiveType()) {
case BOOLEAN:
if (!type.equals("boolean")) {
throw new AnalysisException("project field type is BOOLEAN, should use boolean, but the type of column "
+ i + " is " + type);
}
break;
case TINYINT:
case SMALLINT:
case INT:
if (!type.equals("int32")) {
throw new AnalysisException("project field type is TINYINT/SMALLINT/INT, should use int32, "
+ "but the definition type of column " + i + " is " + type);
}
break;
case BIGINT:
case DATE:
case DATETIME:
if (!type.equals("int64")) {
throw new AnalysisException("project field type is BIGINT/DATE/DATETIME, should use int64, " +
"but the definition type of column " + i + " is " + type);
}
break;
case FLOAT:
if (!type.equals("float")) {
throw new AnalysisException("project field type is FLOAT, should use float, but the definition type of column "
+ i + " is " + type);
}
break;
case DOUBLE:
if (!type.equals("double")) {
throw new AnalysisException("project field type is DOUBLE, should use double, but the definition type of column "
+ i + " is " + type);
}
break;
case CHAR:
case VARCHAR:
case DECIMALV2:
if (!type.equals("byte_array")) {
throw new AnalysisException("project field type is CHAR/VARCHAR/DECIMAL, should use byte_array, " +
"but the definition type of column " + i + " is " + type);
}
break;
default:
throw new AnalysisException("Parquet format does not support column type: " + resultType.getPrimitiveType());
}
}
}
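
The switch above pins each supported Doris result type to exactly one Parquet physical type. The same mapping as a lookup table, a reference sketch only (not part of this PR; ImmutableMap is Guava, which this file already depends on):

private static final com.google.common.collect.ImmutableMap<String, String> DORIS_TO_PARQUET =
        com.google.common.collect.ImmutableMap.<String, String>builder()
                .put("BOOLEAN", "boolean")
                .put("TINYINT", "int32").put("SMALLINT", "int32").put("INT", "int32")
                .put("BIGINT", "int64").put("DATE", "int64").put("DATETIME", "int64")
                .put("FLOAT", "float")
                .put("DOUBLE", "double")
                .put("CHAR", "byte_array").put("VARCHAR", "byte_array").put("DECIMALV2", "byte_array")
                .build();
// Any type absent from this table (e.g. LARGEINT) is rejected with an AnalysisException.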

private void genParquetSchema(List<Expr> resultExprs) throws AnalysisException {
Preconditions.checkState(this.schema.isEmpty());
for (int i = 0; i < resultExprs.size(); ++i) {
Expr expr = resultExprs.get(i);
List<String> column = new ArrayList<>();
column.add("required");
switch (expr.getType().getPrimitiveType()) {
case BOOLEAN:
column.add("boolean");
break;
case TINYINT:
case SMALLINT:
case INT:
column.add("int32");
break;
case BIGINT:
case DATE:
case DATETIME:
column.add("int64");
break;
case FLOAT:
column.add("float");
break;
case DOUBLE:
column.add("double");
break;
case CHAR:
case VARCHAR:
case DECIMALV2:
column.add("byte_array");
break;
default:
throw new AnalysisException("currently parquet do not support column type: " + expr.getType().getPrimitiveType());
}
column.add("col" + i);
this.schema.add(column);
}
}
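
A concrete walk-through of the generation path (column names and types are illustrative): for a select list typed (INT, VARCHAR, DOUBLE), genParquetSchema() produces the rows below. Note the generated names are always "col" + index, regardless of any alias on the select item.

// SELECT siteid, username, pv_ratio ...   -- types: INT, VARCHAR, DOUBLE
// genParquetSchema() yields, in select-list order:
//   ["required", "int32",      "col0"]
//   ["required", "byte_array", "col1"]
//   ["required", "double",     "col2"]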

@@ -238,7 +346,6 @@ private void analyzeProperties() throws AnalysisException {
throw new AnalysisException("Unknown properties: " + properties.keySet().stream()
.filter(k -> !processedPropKeys.contains(k)).collect(Collectors.toList()));
}

}

private void getBrokerProperties(Set<String> processedPropKeys) {
@@ -273,14 +380,28 @@ private void getBrokerProperties(Set<String> processedPropKeys) {
* currently only supports: compression, disable_dictionary, version
*/
private void getParquetProperties(Set<String> processedPropKeys) throws AnalysisException {
// save all properties with the "parquet." prefix
Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, String> entry = iter.next();
if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) {
processedPropKeys.add(entry.getKey());
fileProperties.put(entry.getKey().substring(PARQUET_PROP_PREFIX.length()), entry.getValue());
}
}

// check schema. If the schema is not set, Doris will generate it from the select items
String schema = properties.get(SCHEMA);
if (schema == null || schema.length() <= 0) {
throw new AnalysisException("schema is required for parquet file");
if (schema == null) {
return;
}
if (schema.isEmpty()) {
throw new AnalysisException("Parquet schema property should not be empty");
}
schema = schema.replace(" ","");
schema = schema.replace(" ", "");
schema = schema.toLowerCase();
String[] schemas = schema.split(";");
for (String item:schemas) {
for (String item : schemas) {
String[] properties = item.split(",");
if (properties.length != 3) {
throw new AnalysisException("must only contains repetition type/column type/column name");
@@ -299,14 +420,6 @@ private void getParquetProperties(Set<String> processedPropKeys) throws AnalysisException {
this.schema.add(column);
}
processedPropKeys.add(SCHEMA);
Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, String> entry = iter.next();
if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) {
processedPropKeys.add(entry.getKey());
fileProperties.put(entry.getKey().substring(PARQUET_PROP_PREFIX.length()), entry.getValue());
}
}
}
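
End to end, getParquetProperties() does two independent things: it strips the "parquet." prefix off writer options, and it parses the optional schema string. A minimal standalone sketch with assumed inputs (property values are illustrative):

// Input properties (illustrative):
//   "schema"              = "required,int32,siteid; required,byte_array,username"
//   "parquet.compression" = "snappy"
String raw = "required,int32,siteid; required,byte_array,username";
String normalized = raw.replace(" ", "").toLowerCase();
for (String item : normalized.split(";")) {
    String[] fields = item.split(",");
    // fields[0] = repetition ("required"), fields[1] = physical type,
    // fields[2] = column name; any other shape fails analysis.
}
// Separately, "parquet.compression" loses its prefix and lands in
// fileProperties as "compression" -> "snappy".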

private boolean isCsvFormat() {
fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
@@ -17,8 +17,6 @@

package org.apache.doris.analysis;

import mockit.Mock;
import mockit.MockUp;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.Util;
@@ -27,8 +25,10 @@
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.utframe.DorisAssert;
import org.apache.doris.utframe.UtFrameUtils;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -40,6 +40,9 @@
import java.util.Set;
import java.util.UUID;

import mockit.Mock;
import mockit.MockUp;

public class SelectStmtTest {
private static String runningDir = "fe/mocked/DemoTest/" + UUID.randomUUID().toString() + "/";
private static DorisAssert dorisAssert;
@@ -584,63 +587,109 @@ public void testGetTableRefs() throws Exception {
public void testOutfile() throws Exception {
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
Config.enable_outfile_to_local = true;
String sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,int32,siteid;required,byte_array,username;\");";
String sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,byte_array,col0\");";
dorisAssert.query(sql).explainQuery();
// must contain schema
// if schema is not set, gen schema
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET;";
try {
SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
Assert.assertEquals(1, stmt.getOutFileClause().getSchema().size());
Assert.assertEquals(Lists.newArrayList("required", "byte_array", "col0"),
stmt.getOutFileClause().getSchema().get(0));
} catch (Exception e) {
Assert.fail(e.getMessage());
}

// schema cannot be empty
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"\");";
try {
SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("schema is required for parquet file"));
Assert.assertTrue(e.getMessage().contains("Parquet schema property should not be empty"));
}

// schema must contain 3 fields
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"int32,siteid;\");";
try {
SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("must only contains repetition type/column type/column name"));
}

// unknown repetition type
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"repeat, int32,siteid;\");";
try {
SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("unknown repetition type"));
}

// only the required repetition type is supported
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"repeated,int32,siteid;\");";
try {
SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("currently only support required type"));
}

// unknown data type
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,int128,siteid;\");";
try {
SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("data type is not supported"));
}

// contains parquet properties
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,int32,siteid;\", 'parquet.compression'='snappy');";
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"file:///root/doris/\" FORMAT AS PARQUET PROPERTIES (\"schema\"=\"required,byte_array,siteid;\", 'parquet.compression'='snappy');";
dorisAssert.query(sql).explainQuery();
// parquet is also supported for broker export
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", \"broker.hadoop.security.authentication\" = \"kerberos\", \"broker.kerberos_principal\" = \"test\", \"broker.kerberos_keytab_content\" = \"test\" , \"schema\"=\"required,int32,siteid;\");";
sql = "SELECT k1 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET " +
"PROPERTIES ( \"broker.name\" = \"hdfs_broker\", " +
"\"broker.hadoop.security.authentication\" = \"kerberos\", " +
"\"broker.kerberos_principal\" = \"test\", " +
"\"broker.kerberos_keytab_content\" = \"test\" , " +
"\"schema\"=\"required,byte_array,siteid;\");";
dorisAssert.query(sql).explainQuery();

// largeint type is not supported
try {
sql = "SELECT k5 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", \"broker.hadoop.security.authentication\" = \"kerberos\", \"broker.kerberos_principal\" = \"test\", \"broker.kerberos_keytab_content\" = \"test\" , \"schema\"=\"required,int32,siteid;\");";
sql = "SELECT k5 FROM db1.tbl1 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET " +
"PROPERTIES ( \"broker.name\" = \"hdfs_broker\", " +
"\"broker.hadoop.security.authentication\" = \"kerberos\", " +
"\"broker.kerberos_principal\" = \"test\", " +
"\"broker.kerberos_keytab_content\" = \"test\" ," +
" \"schema\"=\"required,int32,siteid;\");";
SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("currently parquet do not support largeint type"));
e.printStackTrace();
Assert.assertTrue(e.getMessage().contains("Parquet format does not support column type: LARGEINT"));
}

// largeint type is not supported, even as the result of a function
try {
sql = "SELECT sum(k5) FROM db1.tbl1 group by k5 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", \"broker.hadoop.security.authentication\" = \"kerberos\", \"broker.kerberos_principal\" = \"test\", \"broker.kerberos_keytab_content\" = \"test\" , \"schema\"=\"required,int32,siteid;\");";
sql = "SELECT sum(k5) FROM db1.tbl1 group by k5 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" " +
"FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", " +
"\"broker.hadoop.security.authentication\" = \"kerberos\", " +
"\"broker.kerberos_principal\" = \"test\", " +
"\"broker.kerberos_keytab_content\" = \"test\" , " +
"\"schema\"=\"required,int32,siteid;\");";
SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Parquet format does not support column type: LARGEINT"));
}

// casting to a supported type works
try {
sql = "SELECT cast(sum(k5) as bigint) FROM db1.tbl1 group by k5 INTO OUTFILE \"hdfs://test/test_sql_prc_2019_02_19/\" " +
"FORMAT AS PARQUET PROPERTIES ( \"broker.name\" = \"hdfs_broker\", " +
"\"broker.hadoop.security.authentication\" = \"kerberos\", " +
"\"broker.kerberos_principal\" = \"test\", " +
"\"broker.kerberos_keytab_content\" = \"test\" , " +
"\"schema\"=\"required,int64,siteid;\");";
SelectStmt stmt = (SelectStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("currently parquet do not support largeint type"));
Assert.fail(e.getMessage());
}
}
}