From fff2a33f9c41a12fe83ba5bd877a9a33f8f0c788 Mon Sep 17 00:00:00 2001 From: DongLiang-0 <1747644936@qq.com> Date: Mon, 6 Nov 2023 19:07:32 +0800 Subject: [PATCH 1/4] [feature](tvf)(jni-avro)jni-avro scanner add complex data types --- .../vec/exec/format/avro/avro_jni_reader.cpp | 32 ++-- .../avro/avro_all_types/all_type.avro | Bin 0 -> 1155 bytes .../apache/doris/avro/AvroColumnValue.java | 34 +++- .../org/apache/doris/avro/AvroJNIScanner.java | 56 +------ .../org/apache/doris/avro/AvroTypeUtils.java | 122 ++++++++++++++ .../apache/doris/avro/AvroTypeUtilsTest.java | 105 ++++++++++++ .../doris/common/jni/vec/TableSchema.java | 10 +- .../external_table_p0/tvf/test_tvf_avro.out | 73 +++++++++ .../tvf/test_tvf_avro.groovy | 154 ++++++++++++++++++ 9 files changed, 511 insertions(+), 75 deletions(-) create mode 100644 docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/avro/avro_all_types/all_type.avro create mode 100644 fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java create mode 100644 fe/be-java-extensions/avro-scanner/src/test/java/org/apache/doris/avro/AvroTypeUtilsTest.java create mode 100644 regression-test/data/external_table_p0/tvf/test_tvf_avro.out create mode 100644 regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp b/be/src/vec/exec/format/avro/avro_jni_reader.cpp index e682ff9886d644..a49cd12061186f 100644 --- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp +++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp @@ -144,8 +144,7 @@ Status AvroJNIReader::get_parsed_schema(std::vector* col_names, } TypeDescriptor AvroJNIReader::convert_to_doris_type(const rapidjson::Value& column_schema) { - ::doris::TPrimitiveType::type schema_type = - static_cast< ::doris::TPrimitiveType::type>(column_schema["type"].GetInt()); + auto schema_type = static_cast< ::doris::TPrimitiveType::type>(column_schema["type"].GetInt()); switch (schema_type) { case TPrimitiveType::INT: case TPrimitiveType::STRING: @@ -153,30 +152,35 @@ TypeDescriptor AvroJNIReader::convert_to_doris_type(const rapidjson::Value& colu case TPrimitiveType::BOOLEAN: case TPrimitiveType::DOUBLE: case TPrimitiveType::FLOAT: - return TypeDescriptor(thrift_to_type(schema_type)); + case TPrimitiveType::BINARY: + return {thrift_to_type(schema_type)}; case TPrimitiveType::ARRAY: { TypeDescriptor list_type(PrimitiveType::TYPE_ARRAY); - list_type.add_sub_type(convert_complex_type(column_schema["childColumn"].GetObject())); + const rapidjson::Value& childColumns = column_schema["childColumns"]; + list_type.add_sub_type(convert_to_doris_type(childColumns[0])); return list_type; } case TPrimitiveType::MAP: { TypeDescriptor map_type(PrimitiveType::TYPE_MAP); - + const rapidjson::Value& childColumns = column_schema["childColumns"]; // The default type of AVRO MAP structure key is STRING map_type.add_sub_type(PrimitiveType::TYPE_STRING); - map_type.add_sub_type(convert_complex_type(column_schema["childColumn"].GetObject())); + map_type.add_sub_type(convert_to_doris_type(childColumns[1])); return map_type; } + case TPrimitiveType::STRUCT: { + TypeDescriptor struct_type(PrimitiveType::TYPE_STRUCT); + const rapidjson::Value& childColumns = column_schema["childColumns"]; + for (auto i = 0; i < childColumns.Size(); i++) { + const rapidjson::Value& child = childColumns[i]; + struct_type.add_sub_type(convert_to_doris_type(childColumns[i]), + std::string(child["name"].GetString())); + } + return struct_type; + } default: - return TypeDescriptor(PrimitiveType::INVALID_TYPE); + return {PrimitiveType::INVALID_TYPE}; } } -TypeDescriptor AvroJNIReader::convert_complex_type( - const rapidjson::Document::ConstObject child_schema) { - ::doris::TPrimitiveType::type child_schema_type = - static_cast< ::doris::TPrimitiveType::type>(child_schema["type"].GetInt()); - return TypeDescriptor(thrift_to_type(child_schema_type)); -} - } // namespace doris::vectorized diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/avro/avro_all_types/all_type.avro b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/avro/avro_all_types/all_type.avro new file mode 100644 index 0000000000000000000000000000000000000000..c66017bb6243ee860863a938a7c08965f121c205 GIT binary patch literal 1155 zcma)5O^ee&7@i43?3P_YkRp4~I3jq6LV|)6glt`x6%|=qPfHJzcIsv`NlHEz!jhwU z6;HCr{sxbNM`7^~c=Q)|^5DsfGm|u#v@3XM>GOWg^FGhp=k6iYdgLgM?j(}~8jv>| z$2iUB6!&pVCs91bCJsp;Fky^?nMc_)!TtT?IzEUZMoGw%Ri|483bQGC9Hvcy$AOxB z8ilW#EE7mg@34p{-7^W*g6$}C8Eqy^ORSa*)7VoVNlL7igj;b;@_I@w1gXMaN&~(* zmh9EWXJo$G70BQbVHrKQDSQ-W!D<^Vqm|}CE~-Yt5ECbP;6_YnkGD7+@Xtd$E>tEj z7M_(()GI7|nRvmR(YpE^Q)_1Bx{Z@=lUbE{(j*rGw+X%sy{NgAeH>=;s;c6#iKlc% zGM3`L$SA9EIi)?VQ>piBi(QA8ohvV^rI_%R`?q|<{~A%p+b`n!)bVLP0CX?i z-X8tAzUJH=O1q5GFJmZA*VqyNM1YWQ>1Xg0Jb?iDHoANZ0Hj@!oAbS%@PPodunhqK z-{9#-_@H~i`M!35*?sKvN+Ob!-d(>a2Ww&?(Eg8z?pv0A0(%3X`}TrJR}ioupTU3F JHh+0po4=>nXx{(; literal 0 HcmV?d00001 diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java index dd72c9aad5010d..77c6fba37dc679 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java @@ -19,17 +19,25 @@ import org.apache.doris.common.jni.vec.ColumnValue; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Fixed; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import java.math.BigDecimal; import java.math.BigInteger; +import java.nio.ByteBuffer; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.List; import java.util.Map.Entry; +import java.util.Objects; public class AvroColumnValue implements ColumnValue { @@ -42,6 +50,12 @@ public AvroColumnValue(ObjectInspector fieldInspector, Object fieldData) { } private Object inspectObject() { + if (fieldData instanceof ByteBuffer) { + return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(((ByteBuffer) fieldData).array()); + } else if (fieldData instanceof Fixed) { + return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject( + ((GenericFixed) fieldData).bytes()); + } return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData); } @@ -162,6 +176,24 @@ public void unpackMap(List keys, List values) { @Override public void unpackStruct(List structFieldIndex, List values) { - + StructObjectInspector inspector = (StructObjectInspector) fieldInspector; + List fields = inspector.getAllStructFieldRefs(); + for (Integer idx : structFieldIndex) { + AvroColumnValue cv = null; + if (idx != null) { + StructField sf = fields.get(idx); + Object o; + if (fieldData instanceof GenericData.Record) { + GenericRecord record = (GenericRecord) inspector.getStructFieldData(fieldData, sf); + o = record.get(idx); + } else { + o = inspector.getStructFieldData(fieldData, sf); + } + if (Objects.nonNull(o)) { + cv = new AvroColumnValue(sf.getFieldObjectInspector(), o); + } + } + values.add(cv); + } } } diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java index 11bce610d70e7a..75cbc721e3142b 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java @@ -21,12 +21,9 @@ import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.ScanPredicate; import org.apache.doris.common.jni.vec.TableSchema; -import org.apache.doris.common.jni.vec.TableSchema.SchemaColumn; import org.apache.doris.thrift.TFileType; -import org.apache.doris.thrift.TPrimitiveType; import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.JavaUtils; @@ -40,10 +37,7 @@ import org.apache.log4j.Logger; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Properties; @@ -193,54 +187,6 @@ protected int getNext() throws IOException { @Override protected TableSchema parseTableSchema() throws UnsupportedOperationException { Schema schema = avroReader.getSchema(); - List schemaFields = schema.getFields(); - List schemaColumns = new ArrayList<>(); - for (Field schemaField : schemaFields) { - Schema avroSchema = schemaField.schema(); - String columnName = schemaField.name().toLowerCase(Locale.ROOT); - - SchemaColumn schemaColumn = new SchemaColumn(); - TPrimitiveType tPrimitiveType = serializeSchemaType(avroSchema, schemaColumn); - schemaColumn.setName(columnName); - schemaColumn.setType(tPrimitiveType); - schemaColumns.add(schemaColumn); - } - return new TableSchema(schemaColumns); - } - - private TPrimitiveType serializeSchemaType(Schema avroSchema, SchemaColumn schemaColumn) - throws UnsupportedOperationException { - Schema.Type type = avroSchema.getType(); - switch (type) { - case NULL: - return TPrimitiveType.NULL_TYPE; - case STRING: - return TPrimitiveType.STRING; - case INT: - return TPrimitiveType.INT; - case BOOLEAN: - return TPrimitiveType.BOOLEAN; - case LONG: - return TPrimitiveType.BIGINT; - case FLOAT: - return TPrimitiveType.FLOAT; - case BYTES: - return TPrimitiveType.BINARY; - case DOUBLE: - return TPrimitiveType.DOUBLE; - case ARRAY: - SchemaColumn arrayChildColumn = new SchemaColumn(); - schemaColumn.addChildColumn(arrayChildColumn); - arrayChildColumn.setType(serializeSchemaType(avroSchema.getElementType(), arrayChildColumn)); - return TPrimitiveType.ARRAY; - case MAP: - SchemaColumn mapChildColumn = new SchemaColumn(); - schemaColumn.addChildColumn(mapChildColumn); - mapChildColumn.setType(serializeSchemaType(avroSchema.getValueType(), mapChildColumn)); - return TPrimitiveType.MAP; - default: - throw new UnsupportedOperationException("avro format: " + type.getName() + " is not supported."); - } + return AvroTypeUtils.parseTableSchema(schema); } - } diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java new file mode 100644 index 00000000000000..cd597fa4cfc314 --- /dev/null +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.avro; + +import org.apache.doris.common.jni.vec.TableSchema; +import org.apache.doris.common.jni.vec.TableSchema.SchemaColumn; +import org.apache.doris.thrift.TPrimitiveType; + +import com.google.common.base.Preconditions; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.commons.compress.utils.Lists; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class AvroTypeUtils { + + protected static TableSchema parseTableSchema(Schema schema) throws UnsupportedOperationException { + List schemaFields = schema.getFields(); + List schemaColumns = new ArrayList<>(); + for (Field schemaField : schemaFields) { + Schema avroSchema = schemaField.schema(); + String columnName = schemaField.name(); + + SchemaColumn schemaColumn = new SchemaColumn(); + TPrimitiveType tPrimitiveType = typeFromAvro(avroSchema, schemaColumn); + schemaColumn.setName(columnName); + schemaColumn.setType(tPrimitiveType); + schemaColumns.add(schemaColumn); + } + return new TableSchema(schemaColumns); + } + + private static TPrimitiveType typeFromAvro(Schema avroSchema, SchemaColumn schemaColumn) + throws UnsupportedOperationException { + Schema.Type type = avroSchema.getType(); + switch (type) { + case ENUM: + case STRING: + return TPrimitiveType.STRING; + case INT: + return TPrimitiveType.INT; + case BOOLEAN: + return TPrimitiveType.BOOLEAN; + case LONG: + return TPrimitiveType.BIGINT; + case FLOAT: + return TPrimitiveType.FLOAT; + case FIXED: + case BYTES: + return TPrimitiveType.BINARY; + case DOUBLE: + return TPrimitiveType.DOUBLE; + case ARRAY: + SchemaColumn arrayChildColumn = new SchemaColumn(); + schemaColumn.addChildColumns(Collections.singletonList(arrayChildColumn)); + arrayChildColumn.setType(typeFromAvro(avroSchema.getElementType(), arrayChildColumn)); + return TPrimitiveType.ARRAY; + case MAP: + // The default type of AVRO MAP structure key is STRING + SchemaColumn keyChildColumn = new SchemaColumn(); + keyChildColumn.setType(TPrimitiveType.STRING); + SchemaColumn valueChildColumn = new SchemaColumn(); + valueChildColumn.setType(typeFromAvro(avroSchema.getValueType(), valueChildColumn)); + + schemaColumn.addChildColumns(Arrays.asList(keyChildColumn, valueChildColumn)); + return TPrimitiveType.MAP; + case RECORD: + List fields = avroSchema.getFields(); + List childSchemaColumn = Lists.newArrayList(); + for (Field field : fields) { + SchemaColumn structChildColumn = new SchemaColumn(); + structChildColumn.setName(field.name()); + structChildColumn.setType(typeFromAvro(field.schema(), structChildColumn)); + childSchemaColumn.add(structChildColumn); + } + schemaColumn.addChildColumns(childSchemaColumn); + return TPrimitiveType.STRUCT; + case UNION: + List nonNullableMembers = filterNullableUnion(avroSchema); + Preconditions.checkArgument(!nonNullableMembers.isEmpty(), + avroSchema.getName() + "Union child type not all nullAble type"); + List childSchemaColumns = Lists.newArrayList(); + for (Schema nullableMember : nonNullableMembers) { + SchemaColumn childColumn = new SchemaColumn(); + childColumn.setName(nullableMember.getName()); + childColumn.setType(typeFromAvro(nullableMember, childColumn)); + childSchemaColumns.add(childColumn); + } + schemaColumn.addChildColumns(childSchemaColumns); + return TPrimitiveType.STRUCT; + default: + throw new UnsupportedOperationException( + "avro format: " + avroSchema.getName() + type.getName() + " is not supported."); + } + } + + private static List filterNullableUnion(Schema schema) { + Preconditions.checkArgument(schema.isUnion(), "Schema must be union"); + return schema.getTypes().stream().filter(s -> !s.isNullable()).collect(Collectors.toList()); + } + +} diff --git a/fe/be-java-extensions/avro-scanner/src/test/java/org/apache/doris/avro/AvroTypeUtilsTest.java b/fe/be-java-extensions/avro-scanner/src/test/java/org/apache/doris/avro/AvroTypeUtilsTest.java new file mode 100644 index 00000000000000..04f5fd217bf47d --- /dev/null +++ b/fe/be-java-extensions/avro-scanner/src/test/java/org/apache/doris/avro/AvroTypeUtilsTest.java @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.avro; + +import org.apache.doris.common.jni.vec.TableSchema; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +public class AvroTypeUtilsTest { + private Schema allTypesRecordSchema; + private final ObjectMapper objectMapper = new ObjectMapper(); + private String result; + + @Before + public void setUp() { + result = "[{\"name\":\"aBoolean\",\"type\":2,\"childColumns\":null},{\"name\":\"aInt\",\"type\":5," + + "\"childColumns\":null},{\"name\":\"aLong\",\"type\":6,\"childColumns\":null},{\"name\":\"" + + "aFloat\",\"type\":7,\"childColumns\":null},{\"name\":\"aDouble\",\"type\":8,\"childColumns\"" + + ":null},{\"name\":\"aString\",\"type\":23,\"childColumns\":null},{\"name\":\"aBytes\",\"type\"" + + ":11,\"childColumns\":null},{\"name\":\"aFixed\",\"type\":11,\"childColumns\":null},{\"name\"" + + ":\"anArray\",\"type\":20,\"childColumns\":[{\"name\":null,\"type\":5,\"childColumns\":null}]}" + + ",{\"name\":\"aMap\",\"type\":21,\"childColumns\":[{\"name\":null,\"type\":23,\"childColumns\"" + + ":null},{\"name\":null,\"type\":5,\"childColumns\":null}]},{\"name\":\"anEnum\",\"type\":23" + + ",\"childColumns\":null},{\"name\":\"aRecord\",\"type\":22,\"childColumns\":[{\"name\":\"a\"," + + "\"type\":5,\"childColumns\":null},{\"name\":\"b\",\"type\":8,\"childColumns\":null},{\"name\":" + + "\"c\",\"type\":23,\"childColumns\":null}]},{\"name\":\"aUnion\",\"type\":22,\"childColumns\":" + + "[{\"name\":\"string\",\"type\":23,\"childColumns\":null}]}]\n"; + + Schema simpleEnumSchema = SchemaBuilder.enumeration("myEnumType").symbols("A", "B", "C"); + Schema simpleRecordSchema = SchemaBuilder.record("simpleRecord") + .fields() + .name("a") + .type().intType().noDefault() + .name("b") + .type().doubleType().noDefault() + .name("c") + .type().stringType().noDefault() + .endRecord(); + + allTypesRecordSchema = SchemaBuilder.builder() + .record("all") + .fields() + .name("aBoolean") + .type().booleanType().noDefault() + .name("aInt") + .type().intType().noDefault() + .name("aLong") + .type().longType().noDefault() + .name("aFloat") + .type().floatType().noDefault() + .name("aDouble") + .type().doubleType().noDefault() + .name("aString") + .type().stringType().noDefault() + .name("aBytes") + .type().bytesType().noDefault() + .name("aFixed") + .type().fixed("myFixedType").size(16).noDefault() + .name("anArray") + .type().array().items().intType().noDefault() + .name("aMap") + .type().map().values().intType().noDefault() + .name("anEnum") + .type(simpleEnumSchema).noDefault() + .name("aRecord") + .type(simpleRecordSchema).noDefault() + .name("aUnion") + .type().optional().stringType() + .endRecord(); + } + + @Test + public void testParseTableSchema() throws IOException { + TableSchema tableSchema = AvroTypeUtils.parseTableSchema(allTypesRecordSchema); + String tableSchemaTableSchema = tableSchema.getTableSchema(); + JsonNode tableSchemaTree = objectMapper.readTree(tableSchemaTableSchema); + + JsonNode resultSchemaTree = objectMapper.readTree(result); + Assert.assertEquals(resultSchemaTree, tableSchemaTree); + } + +} diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/TableSchema.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/TableSchema.java index 421feb55a3fdd0..9e223d0435f9e7 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/TableSchema.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/TableSchema.java @@ -49,7 +49,7 @@ public String getTableSchema() throws IOException { public static class SchemaColumn { private String name; private int type; - private SchemaColumn childColumn; + private List childColumns; public SchemaColumn() { @@ -59,8 +59,8 @@ public String getName() { return name; } - public SchemaColumn getChildColumn() { - return childColumn; + public List getChildColumns() { + return childColumns; } public int getType() { @@ -75,8 +75,8 @@ public void setType(TPrimitiveType type) { this.type = type.getValue(); } - public void addChildColumn(SchemaColumn childColumn) { - this.childColumn = childColumn; + public void addChildColumns(List childColumns) { + this.childColumns = childColumns; } } diff --git a/regression-test/data/external_table_p0/tvf/test_tvf_avro.out b/regression-test/data/external_table_p0/tvf/test_tvf_avro.out new file mode 100644 index 00000000000000..32612a5390398c --- /dev/null +++ b/regression-test/data/external_table_p0/tvf/test_tvf_avro.out @@ -0,0 +1,73 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !1 -- +aBoolean BOOLEAN Yes false \N NONE +aInt INT Yes false \N NONE +aLong BIGINT Yes false \N NONE +aFloat FLOAT Yes false \N NONE +aDouble DOUBLE Yes false \N NONE +aString TEXT Yes false \N NONE +anArray ARRAY Yes false \N NONE +aMap MAP Yes false \N NONE +anEnum TEXT Yes false \N NONE +aRecord STRUCT Yes false \N NONE +aUnion STRUCT Yes false \N NONE +mapArrayLong MAP> Yes false \N NONE +arrayMapBoolean ARRAY> Yes false \N NONE + +-- !2 -- +2 + +-- !3 -- +true 42 3400 3.14 9.81 a test string [1, 2, 3, 4] {"key1":1, "key2":2} A {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k1":[99, 88, 77], "k2":[10, 20]} [{"arrayMapKey1":0}, {"arrayMapKey2":1}] +false 100 9999 2.11 9.1102 string test [5, 6, 7] {"k1":1, "k2":2} B {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k11":[77, 11, 33], "k22":[10, 20]} [{"Key11":1}, {"Key22":0}] + +-- !4 -- +[1, 2, 3, 4] +[5, 6, 7] + +-- !5 -- +{"key1":1, "key2":2} +{"k1":1, "k2":2} + +-- !6 -- +A +B + +-- !7 -- +{"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} +{"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} + +-- !8 -- +\N +\N + +-- !9 -- +{"k1":[99, 88, 77], "k2":[10, 20]} +{"k11":[77, 11, 33], "k22":[10, 20]} + +-- !10 -- +[{"arrayMapKey1":0}, {"arrayMapKey2":1}] +[{"Key11":1}, {"Key22":0}] + +-- !hdfs_1 -- +aBoolean BOOLEAN Yes false \N NONE +aInt INT Yes false \N NONE +aLong BIGINT Yes false \N NONE +aFloat FLOAT Yes false \N NONE +aDouble DOUBLE Yes false \N NONE +aString TEXT Yes false \N NONE +anArray ARRAY Yes false \N NONE +aMap MAP Yes false \N NONE +anEnum TEXT Yes false \N NONE +aRecord STRUCT Yes false \N NONE +aUnion STRUCT Yes false \N NONE +mapArrayLong MAP> Yes false \N NONE +arrayMapBoolean ARRAY> Yes false \N NONE + +-- !hdfs_2 -- +true 42 3400 3.14 9.81 a test string [1, 2, 3, 4] {"key1":1, "key2":2} A {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k1":[99, 88, 77], "k2":[10, 20]} [{"arrayMapKey1":0}, {"arrayMapKey2":1}] +false 100 9999 2.11 9.1102 string test [5, 6, 7] {"k1":1, "k2":2} B {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k11":[77, 11, 33], "k22":[10, 20]} [{"Key11":1}, {"Key22":0}] + +-- !hdfs_3 -- +2 + diff --git a/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy b/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy new file mode 100644 index 00000000000000..60b569484ea272 --- /dev/null +++ b/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_tvf_avro", "external,hive,tvf,avro,external_docker") { + + def all_type_file = "all_type.avro"; + def format = "avro" + + // s3 config + String ak = getS3AK() + String sk = getS3SK() + String s3_endpoint = getS3Endpoint() + String region = getS3Region() + String bucket = context.config.otherConfigs.get("s3BucketName"); + def s3Uri = "https://${bucket}.${s3_endpoint}/regression/datalake/pipeline_data/tvf/${all_type_file}"; + + // hdfs config + String hdfs_port = context.config.otherConfigs.get("hdfs_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + def hdfsUserName = "doris" + def defaultFS = "hdfs://${externalEnvIp}:${hdfs_port}" + def hdfsUri = "${defaultFS}" + "/user/doris/preinstalled_data/avro/avro_all_types/${all_type_file}" + + // TVF s3() + qt_1 """ + desc function s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + qt_2 """ + select count(*) from s3( + "uri" ="${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + qt_3 """ + select * from s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}") order by aInt, aLong, aFloat; + """ + + qt_4 """ + select anArray from s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + qt_5 """ + select aMap from s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + qt_6 """ + select anEnum from s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + qt_7 """ + select aRecord from s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + qt_8 """ + select aUnion from s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + qt_9 """ + select mapArrayLong from s3( + "uri" ="${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + qt_10 """ + select arrayMapBoolean from s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + // TVF hdfs() + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + try { + qt_hdfs_1 """ + desc function HDFS( + "uri" = "${hdfsUri}", + "fs.defaultFS" = "${defaultFS}", + "hadoop.username" = "${hdfsUserName}", + "FORMAT" = "${format}"); """ + + qt_hdfs_2 """ select * from HDFS( + "uri" = "${hdfsUri}", + "fs.defaultFS" = "${defaultFS}", + "hadoop.username" = "${hdfsUserName}", + "format" = "${format}") order by aInt, aLong, aFloat; """ + + qt_hdfs_3 """ select count(*) from HDFS( + "uri" = "${hdfsUri}", + "fs.defaultFS" = "${defaultFS}", + "hadoop.username" = "${hdfsUserName}", + "format" = "${format}"); """ + } finally { + } + } +} \ No newline at end of file From 23c54cd69b853fdbde0def63e540015eaf60e7da Mon Sep 17 00:00:00 2001 From: DongLiang-0 <1747644936@qq.com> Date: Tue, 7 Nov 2023 15:31:15 +0800 Subject: [PATCH 2/4] fix --- be/src/vec/exec/format/avro/avro_jni_reader.cpp | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp b/be/src/vec/exec/format/avro/avro_jni_reader.cpp index a49cd12061186f..f3cff19c04d9be 100644 --- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp +++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp @@ -86,18 +86,10 @@ Status AvroJNIReader::init_fetch_table_reader( {"file_type", std::to_string(type)}, {"is_get_table_schema", "false"}, {"hive.serde", "org.apache.hadoop.hive.serde2.avro.AvroSerDe"}}; - switch (type) { - case TFileType::FILE_HDFS: - required_param.insert(std::make_pair("uri", _params.hdfs_params.hdfs_conf.data()->value)); - break; - case TFileType::FILE_S3: - required_param.insert(std::make_pair("uri", _range.path)); + if (type == TFileType::FILE_S3) { required_param.insert(_params.properties.begin(), _params.properties.end()); - break; - default: - return Status::InternalError("unsupported file reader type: {}", std::to_string(type)); } - required_param.insert(_params.properties.begin(), _params.properties.end()); + required_param.insert(std::make_pair("uri", _range.path)); _jni_connector = std::make_unique("org/apache/doris/avro/AvroJNIScanner", required_param, column_names); RETURN_IF_ERROR(_jni_connector->init(_colname_to_value_range)); From fa4a0ec3ddfb6bbee23e0e7b50dc9a65d50e0d3e Mon Sep 17 00:00:00 2001 From: DongLiang-0 <1747644936@qq.com> Date: Tue, 7 Nov 2023 18:05:14 +0800 Subject: [PATCH 3/4] fix --- .../external_table_p0/tvf/test_tvf_avro.out | 30 +++++++------- .../tvf/test_tvf_avro.groovy | 40 +++++++++---------- 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/regression-test/data/external_table_p0/tvf/test_tvf_avro.out b/regression-test/data/external_table_p0/tvf/test_tvf_avro.out index 32612a5390398c..4595771a6780b4 100644 --- a/regression-test/data/external_table_p0/tvf/test_tvf_avro.out +++ b/regression-test/data/external_table_p0/tvf/test_tvf_avro.out @@ -17,39 +17,39 @@ arrayMapBoolean ARRAY> Yes false \N NONE -- !2 -- 2 --- !3 -- -true 42 3400 3.14 9.81 a test string [1, 2, 3, 4] {"key1":1, "key2":2} A {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k1":[99, 88, 77], "k2":[10, 20]} [{"arrayMapKey1":0}, {"arrayMapKey2":1}] +-- !1 -- false 100 9999 2.11 9.1102 string test [5, 6, 7] {"k1":1, "k2":2} B {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k11":[77, 11, 33], "k22":[10, 20]} [{"Key11":1}, {"Key22":0}] +true 42 3400 3.14 9.81 a test string [1, 2, 3, 4] {"key1":1, "key2":2} A {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k1":[99, 88, 77], "k2":[10, 20]} [{"arrayMapKey1":0}, {"arrayMapKey2":1}] --- !4 -- +-- !2 -- [1, 2, 3, 4] [5, 6, 7] --- !5 -- -{"key1":1, "key2":2} +-- !3 -- {"k1":1, "k2":2} +{"key1":1, "key2":2} --- !6 -- +-- !4 -- A B --- !7 -- +-- !5 -- {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} --- !8 -- +-- !6 -- \N \N --- !9 -- +-- !7 -- {"k1":[99, 88, 77], "k2":[10, 20]} {"k11":[77, 11, 33], "k22":[10, 20]} --- !10 -- -[{"arrayMapKey1":0}, {"arrayMapKey2":1}] +-- !8 -- [{"Key11":1}, {"Key22":0}] +[{"arrayMapKey1":0}, {"arrayMapKey2":1}] --- !hdfs_1 -- +-- !3 -- aBoolean BOOLEAN Yes false \N NONE aInt INT Yes false \N NONE aLong BIGINT Yes false \N NONE @@ -64,10 +64,10 @@ aUnion STRUCT Yes false \N NONE mapArrayLong MAP> Yes false \N NONE arrayMapBoolean ARRAY> Yes false \N NONE --- !hdfs_2 -- -true 42 3400 3.14 9.81 a test string [1, 2, 3, 4] {"key1":1, "key2":2} A {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k1":[99, 88, 77], "k2":[10, 20]} [{"arrayMapKey1":0}, {"arrayMapKey2":1}] +-- !8 -- false 100 9999 2.11 9.1102 string test [5, 6, 7] {"k1":1, "k2":2} B {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k11":[77, 11, 33], "k22":[10, 20]} [{"Key11":1}, {"Key22":0}] +true 42 3400 3.14 9.81 a test string [1, 2, 3, 4] {"key1":1, "key2":2} A {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k1":[99, 88, 77], "k2":[10, 20]} [{"arrayMapKey1":0}, {"arrayMapKey2":1}] --- !hdfs_3 -- +-- !4 -- 2 diff --git a/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy b/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy index 60b569484ea272..00bec952e00710 100644 --- a/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy +++ b/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy @@ -42,7 +42,7 @@ suite("test_tvf_avro", "external,hive,tvf,avro,external_docker") { "ACCESS_KEY" = "${ak}", "SECRET_KEY" = "${sk}", "REGION" = "${region}", - "FORMAT" = "${format}"); + "FORMAT" = "${format}"); """ qt_2 """ @@ -51,19 +51,19 @@ suite("test_tvf_avro", "external,hive,tvf,avro,external_docker") { "ACCESS_KEY" = "${ak}", "SECRET_KEY" = "${sk}", "REGION" = "${region}", - "FORMAT" = "${format}"); + "FORMAT" = "${format}"); """ - qt_3 """ + order_qt_1 """ select * from s3( "uri" = "${s3Uri}", "ACCESS_KEY" = "${ak}", "SECRET_KEY" = "${sk}", "REGION" = "${region}", - "FORMAT" = "${format}") order by aInt, aLong, aFloat; + "FORMAT" = "${format}"); """ - qt_4 """ + order_qt_2 """ select anArray from s3( "uri" = "${s3Uri}", "ACCESS_KEY" = "${ak}", @@ -72,16 +72,16 @@ suite("test_tvf_avro", "external,hive,tvf,avro,external_docker") { "FORMAT" = "${format}"); """ - qt_5 """ + order_qt_3 """ select aMap from s3( "uri" = "${s3Uri}", "ACCESS_KEY" = "${ak}", "SECRET_KEY" = "${sk}", "REGION" = "${region}", - "FORMAT" = "${format}"); + "FORMAT" = "${format}"); """ - qt_6 """ + order_qt_4 """ select anEnum from s3( "uri" = "${s3Uri}", "ACCESS_KEY" = "${ak}", @@ -90,60 +90,60 @@ suite("test_tvf_avro", "external,hive,tvf,avro,external_docker") { "FORMAT" = "${format}"); """ - qt_7 """ + order_qt_5 """ select aRecord from s3( "uri" = "${s3Uri}", "ACCESS_KEY" = "${ak}", "SECRET_KEY" = "${sk}", "REGION" = "${region}", - "FORMAT" = "${format}"); + "FORMAT" = "${format}"); """ - qt_8 """ + order_qt_6 """ select aUnion from s3( "uri" = "${s3Uri}", "ACCESS_KEY" = "${ak}", "SECRET_KEY" = "${sk}", "REGION" = "${region}", - "FORMAT" = "${format}"); + "FORMAT" = "${format}"); """ - qt_9 """ + order_qt_7 """ select mapArrayLong from s3( "uri" ="${s3Uri}", "ACCESS_KEY" = "${ak}", "SECRET_KEY" = "${sk}", "REGION" = "${region}", - "FORMAT" = "${format}"); + "FORMAT" = "${format}"); """ - qt_10 """ + order_qt_7 """ select arrayMapBoolean from s3( "uri" = "${s3Uri}", "ACCESS_KEY" = "${ak}", "SECRET_KEY" = "${sk}", "REGION" = "${region}", - "FORMAT" = "${format}"); + "FORMAT" = "${format}"); """ // TVF hdfs() String enabled = context.config.otherConfigs.get("enableHiveTest") if (enabled != null && enabled.equalsIgnoreCase("true")) { try { - qt_hdfs_1 """ + qt_3 """ desc function HDFS( "uri" = "${hdfsUri}", "fs.defaultFS" = "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "FORMAT" = "${format}"); """ - qt_hdfs_2 """ select * from HDFS( + order_qt_8 """ select * from HDFS( "uri" = "${hdfsUri}", "fs.defaultFS" = "${defaultFS}", "hadoop.username" = "${hdfsUserName}", - "format" = "${format}") order by aInt, aLong, aFloat; """ + "format" = "${format}")""" - qt_hdfs_3 """ select count(*) from HDFS( + qt_4 """ select count(*) from HDFS( "uri" = "${hdfsUri}", "fs.defaultFS" = "${defaultFS}", "hadoop.username" = "${hdfsUserName}", From 24de8b1643d334c9f8be7fb46f6b504e233eb60d Mon Sep 17 00:00:00 2001 From: DongLiang-0 <1747644936@qq.com> Date: Tue, 7 Nov 2023 18:10:15 +0800 Subject: [PATCH 4/4] fix --- regression-test/data/external_table_p0/tvf/test_tvf_avro.out | 2 +- .../suites/external_table_p0/tvf/test_tvf_avro.groovy | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/regression-test/data/external_table_p0/tvf/test_tvf_avro.out b/regression-test/data/external_table_p0/tvf/test_tvf_avro.out index 4595771a6780b4..8f39bd410c9e67 100644 --- a/regression-test/data/external_table_p0/tvf/test_tvf_avro.out +++ b/regression-test/data/external_table_p0/tvf/test_tvf_avro.out @@ -64,7 +64,7 @@ aUnion STRUCT Yes false \N NONE mapArrayLong MAP> Yes false \N NONE arrayMapBoolean ARRAY> Yes false \N NONE --- !8 -- +-- !9 -- false 100 9999 2.11 9.1102 string test [5, 6, 7] {"k1":1, "k2":2} B {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k11":[77, 11, 33], "k22":[10, 20]} [{"Key11":1}, {"Key22":0}] true 42 3400 3.14 9.81 a test string [1, 2, 3, 4] {"key1":1, "key2":2} A {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k1":[99, 88, 77], "k2":[10, 20]} [{"arrayMapKey1":0}, {"arrayMapKey2":1}] diff --git a/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy b/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy index 00bec952e00710..6f9b4f98b49c42 100644 --- a/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy +++ b/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy @@ -117,7 +117,7 @@ suite("test_tvf_avro", "external,hive,tvf,avro,external_docker") { "FORMAT" = "${format}"); """ - order_qt_7 """ + order_qt_8 """ select arrayMapBoolean from s3( "uri" = "${s3Uri}", "ACCESS_KEY" = "${ak}", @@ -137,7 +137,7 @@ suite("test_tvf_avro", "external,hive,tvf,avro,external_docker") { "hadoop.username" = "${hdfsUserName}", "FORMAT" = "${format}"); """ - order_qt_8 """ select * from HDFS( + order_qt_9 """ select * from HDFS( "uri" = "${hdfsUri}", "fs.defaultFS" = "${defaultFS}", "hadoop.username" = "${hdfsUserName}",