diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp b/be/src/vec/exec/format/avro/avro_jni_reader.cpp
index bdec8f9fbe3523..dc1ea2183cfb0e 100644
--- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp
+++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp
@@ -89,6 +89,10 @@ Status AvroJNIReader::init_fetch_table_reader(
     if (type == TFileType::FILE_S3) {
         required_param.insert(_params.properties.begin(), _params.properties.end());
     }
+    required_param.insert(
+            std::make_pair("split_start_offset", std::to_string(_range.start_offset)));
+    required_param.insert(std::make_pair("split_size", std::to_string(_range.size)));
+    required_param.insert(std::make_pair("split_file_size", std::to_string(_range.file_size)));
     required_param.insert(std::make_pair("uri", _range.path));
     _jni_connector = std::make_unique<JniConnector>("org/apache/doris/avro/AvroJNIScanner",
                                                     required_param, column_names);
diff --git a/fe/be-java-extensions/avro-scanner/pom.xml b/fe/be-java-extensions/avro-scanner/pom.xml
index f95fa947051336..878e4a33e02d32 100644
--- a/fe/be-java-extensions/avro-scanner/pom.xml
+++ b/fe/be-java-extensions/avro-scanner/pom.xml
@@ -36,6 +36,11 @@ under the License.
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro-mapred</artifactId>
+            <version>${avro.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.doris</groupId>
             <artifactId>java-common</artifactId>
@@ -53,12 +58,39 @@ under the License.
         <dependency>
             <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-            <scope>provided</scope>
+            <artifactId>hadoop-aws</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j</artifactId>
+                    <groupId>log4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>aws-java-sdk-s3</artifactId>
+                    <groupId>com.amazonaws</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>aws-java-sdk-bundle</artifactId>
+                    <groupId>com.amazonaws</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.amazonaws</groupId>
-            <artifactId>aws-java-sdk-s3</artifactId>
+            <artifactId>aws-java-sdk-bundle</artifactId>
+            <version>${aws-java-sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.doris</groupId>
diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileContext.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileContext.java
new file mode 100644
index 00000000000000..3e3264e62919cd
--- /dev/null
+++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileContext.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.avro;
+
+import org.apache.avro.Schema;
+
+import java.util.Set;
+
+public class AvroFileContext {
+    private Schema schema;
+    private Set<String> requiredFields;
+    private Long splitStartOffset;
+    private Long splitSize;
+
+    public void setSchema(Schema schema) {
+        this.schema = schema;
+    }
+
+    public Schema getSchema() {
+        return schema;
+    }
+
+    public void setRequiredFields(Set<String> requiredFields) {
+        this.requiredFields = requiredFields;
+    }
+
+    public void setSplitStartOffset(Long splitStartOffset) {
+        this.splitStartOffset = splitStartOffset;
+    }
+
+    public void setSplitSize(Long splitSize) {
+        this.splitSize = splitSize;
+    }
+
+    public Long getSplitStartOffset() {
+        return this.splitStartOffset;
+    }
+
+    public Long getSplitSize() {
+        return this.splitSize;
+    }
+
+    public Set<String> getRequiredFields() {
+        return requiredFields;
+    }
+}
diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
index 75cbc721e3142b..17a185d03ae138 100644
--- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
+++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
@@ -25,6 +25,8 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -33,14 +35,17 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class AvroJNIScanner extends JniScanner {
@@ -50,9 +55,11 @@ public class AvroJNIScanner extends JniScanner {
     private final String uri;
     private final Map<String, String> requiredParams;
     private final Integer fetchSize;
+    private final ClassLoader classLoader;
     private int[] requiredColumnIds;
     private String[] columnTypes;
     private String[] requiredFields;
+    private Set<String> requiredFieldSet;
     private ColumnType[] requiredTypes;
     private AvroReader avroReader;
     private final boolean isGetTableSchema;
@@ -61,6 +68,12 @@ public class AvroJNIScanner extends JniScanner {
     private StructField[] structFields;
     private ObjectInspector[] fieldInspectors;
     private String serde;
+    private AvroFileContext avroFileContext;
+    private AvroWrapper<Pair<Integer, Long>> inputPair;
+    private NullWritable ignore;
+    private Long splitStartOffset;
+    private Long splitSize;
+    private Long splitFileSize;
 
     /**
      * Call by JNI for get table data or get table schema
     *
     * @param requiredParams required params
     */
     public AvroJNIScanner(int fetchSize, Map<String, String> requiredParams) {
+        this.classLoader = this.getClass().getClassLoader();
         this.requiredParams = requiredParams;
         this.fetchSize = fetchSize;
         this.isGetTableSchema = Boolean.parseBoolean(requiredParams.get(AvroProperties.IS_GET_TABLE_SCHEMA));
@@ -79,14 +93,20 @@ public AvroJNIScanner(int fetchSize, Map<String, String> requiredParams) {
                     .split(AvroProperties.COLUMNS_TYPE_DELIMITER);
             this.requiredFields = requiredParams.get(AvroProperties.REQUIRED_FIELDS)
                     .split(AvroProperties.FIELDS_DELIMITER);
+            this.requiredFieldSet = new HashSet<>(Arrays.asList(requiredFields));
             this.requiredTypes = new ColumnType[requiredFields.length];
             this.serde = requiredParams.get(AvroProperties.HIVE_SERDE);
             this.structFields = new StructField[requiredFields.length];
             this.fieldInspectors = new ObjectInspector[requiredFields.length];
+            this.inputPair = new AvroWrapper<>(null);
+            this.ignore = NullWritable.get();
+            this.splitStartOffset = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_START_OFFSET));
+            this.splitSize = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_SIZE));
+            this.splitFileSize = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_FILE_SIZE));
         }
     }
 
-    private void init() throws Exception {
+    private void initFieldInspector() throws Exception {
         requiredColumnIds = new int[requiredFields.length];
         for (int i = 0; i < requiredFields.length; i++) {
             ColumnType columnType = ColumnType.parseType(requiredFields[i], columnTypes[i]);
@@ -127,14 +147,7 @@ private Deserializer getDeserializer(Configuration configuration, Properties pro
 
     @Override
     public void open() throws IOException {
-        try {
-            if (!isGetTableSchema) {
-                init();
-            }
-        } catch (Exception e) {
-            LOG.warn("Failed to init avro scanner. ", e);
-            throw new IOException(e);
-        }
+        Thread.currentThread().setContextClassLoader(classLoader);
         switch (fileType) {
             case FILE_HDFS:
                 this.avroReader = new HDFSFileReader(uri);
@@ -150,12 +163,30 @@ public void open() throws IOException {
                 LOG.warn("Unsupported " + fileType.name() + " file type.");
                 throw new IOException("Unsupported " + fileType.name() + " file type.");
         }
-        this.avroReader.open(new Configuration());
         if (!isGetTableSchema) {
+            initDataReader();
+        }
+        this.avroReader.open(avroFileContext, isGetTableSchema);
+    }
+
+    private void initDataReader() {
+        try {
+            initAvroFileContext();
+            initFieldInspector();
             initTableInfo(requiredTypes, requiredFields, new ScanPredicate[0], fetchSize);
+        } catch (Exception e) {
+            LOG.warn("Failed to init avro scanner. ", e);
", e); + throw new RuntimeException(e); } } + private void initAvroFileContext() { + avroFileContext = new AvroFileContext(); + avroFileContext.setRequiredFields(requiredFieldSet); + avroFileContext.setSplitStartOffset(splitStartOffset); + avroFileContext.setSplitSize(splitSize); + } + @Override public void close() throws IOException { if (Objects.nonNull(avroReader)) { @@ -167,7 +198,7 @@ public void close() throws IOException { protected int getNext() throws IOException { int numRows = 0; for (; numRows < getBatchSize(); numRows++) { - if (!avroReader.hasNext()) { + if (!avroReader.hasNext(inputPair, ignore)) { break; } GenericRecord rowRecord = (GenericRecord) avroReader.getNext(); @@ -189,4 +220,5 @@ protected TableSchema parseTableSchema() throws UnsupportedOperationException { Schema schema = avroReader.getSchema(); return AvroTypeUtils.parseTableSchema(schema); } + } diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java index 6619b6888c4a9e..416b7d25897f93 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java @@ -34,5 +34,12 @@ public class AvroProperties { protected static final String HIVE_SERDE = "hive.serde"; protected static final String COLUMNS = "columns"; protected static final String COLUMNS2TYPES = "columns.types"; + protected static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key"; + protected static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key"; + protected static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint"; + protected static final String FS_S3A_REGION = "fs.s3a.region"; + protected static final String SPLIT_START_OFFSET = "split_start_offset"; + protected static final String SPLIT_SIZE = "split_size"; + protected static final String SPLIT_FILE_SIZE = "split_file_size"; } diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java index eb012e402be416..50b647361bf9ff 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java @@ -17,21 +17,89 @@ package org.apache.doris.avro; +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; import org.apache.avro.Schema; -import org.apache.hadoop.conf.Configuration; +import org.apache.avro.Schema.Parser; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroJob; +import org.apache.avro.mapred.AvroRecordReader; +import org.apache.avro.mapred.AvroWrapper; +import org.apache.avro.mapred.Pair; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import java.io.BufferedInputStream; import java.io.IOException; +import java.io.InputStream; +import java.util.Set; -public interface AvroReader { +public abstract class AvroReader { - void open(Configuration conf) throws IOException; + private 
+    protected AvroRecordReader<Pair<Integer, Long>> dataReader;
+    protected DataFileStream<GenericRecord> schemaReader;
+    protected Path path;
+    protected FileSystem fileSystem;
 
-    Schema getSchema();
+    public abstract void open(AvroFileContext avroFileContext, boolean tableSchema) throws IOException;
 
-    boolean hasNext();
+    public abstract Schema getSchema();
 
-    Object getNext() throws IOException;
+    public abstract boolean hasNext(AvroWrapper<Pair<Integer, Long>> inputPair, NullWritable ignore) throws IOException;
 
-    void close() throws IOException;
+    public abstract Object getNext();
+
+    public abstract void close() throws IOException;
+
+    protected void openSchemaReader() throws IOException {
+        InputStream inputStream = new BufferedInputStream(fileSystem.open(path));
+        schemaReader = new DataFileStream<>(inputStream, new GenericDatumReader<>());
+        LOG.debug("success open avro schema reader.");
+    }
+
+    protected void openDataReader(AvroFileContext avroFileContext) throws IOException {
+        JobConf job = new JobConf();
+        projectionSchema(job, avroFileContext);
+        FileSplit fileSplit =
+                new FileSplit(path, avroFileContext.getSplitStartOffset(), avroFileContext.getSplitSize(), job);
+        dataReader = new AvroRecordReader<>(job, fileSplit);
+        LOG.debug("success open avro data reader.");
+    }
+
+    protected void projectionSchema(JobConf job, AvroFileContext avroFileContext) {
+        Schema projectionSchema;
+        Set<String> filedNames = avroFileContext.getRequiredFields();
+        Schema avroSchema = avroFileContext.getSchema();
+        // The number of fields that need to be queried is the same as that of the avro file,
+        // so no projection is required.
+        if (filedNames.size() != avroSchema.getFields().size()) {
+            JsonObject schemaJson = new Gson().fromJson(avroSchema.toString(), JsonObject.class);
+            JsonArray schemaFields = schemaJson.get("fields").getAsJsonArray();
+            JsonObject copySchemaJson = schemaJson.deepCopy();
+            JsonArray copySchemaFields = copySchemaJson.get("fields").getAsJsonArray();
+            for (int i = 0; i < schemaFields.size(); i++) {
+                JsonObject object = schemaFields.get(i).getAsJsonObject();
+                String name = object.get("name").getAsString();
+                if (filedNames.contains(name)) {
+                    continue;
+                }
+                copySchemaFields.remove(schemaFields.get(i));
+            }
+            projectionSchema = new Parser().parse(copySchemaJson.toString());
+        } else {
+            projectionSchema = avroSchema;
+        }
+        AvroJob.setInputSchema(job, projectionSchema);
+        LOG.debug("projection avro schema is:" + projectionSchema.toString());
+    }
 }
diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java
index cd597fa4cfc314..32ead116dd176b 100644
--- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java
+++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java
@@ -25,6 +25,8 @@
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.commons.compress.utils.Lists;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -34,6 +36,8 @@ public class AvroTypeUtils {
 
+    private static final Logger LOG = LogManager.getLogger(AvroTypeUtils.class);
+
     protected static TableSchema parseTableSchema(Schema schema) throws UnsupportedOperationException {
         List<Field> schemaFields = schema.getFields();
         List<SchemaColumn> schemaColumns = new ArrayList<>();
diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java
index 8c189704027522..be9f355912a930 100644
--- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java
+++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java
@@ -18,61 +18,63 @@
 package org.apache.doris.avro;
 
 import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Objects;
 
-public class HDFSFileReader implements AvroReader {
+public class HDFSFileReader extends AvroReader {
     private static final Logger LOG = LogManager.getLogger(HDFSFileReader.class);
-    private final Path filePath;
     private final String url;
-    private DataFileStream<GenericRecord> reader;
-    private BufferedInputStream inputStream;
+    private AvroWrapper<Pair<Integer, Long>> inputPair;
 
     public HDFSFileReader(String url) {
         this.url = url;
-        this.filePath = new Path(url);
+        this.path = new Path(url);
     }
 
     @Override
-    public void open(Configuration conf) throws IOException {
-        FileSystem fs = FileSystem.get(URI.create(url), conf);
-        inputStream = new BufferedInputStream(fs.open(filePath));
-        reader = new DataFileStream<>(inputStream, new GenericDatumReader<>());
+    public void open(AvroFileContext avroFileContext, boolean tableSchema) throws IOException {
+        fileSystem = FileSystem.get(URI.create(url), new Configuration());
+        openSchemaReader();
+        if (!tableSchema) {
+            avroFileContext.setSchema(schemaReader.getSchema());
+            openDataReader(avroFileContext);
+        }
     }
 
     @Override
     public Schema getSchema() {
-        return reader.getSchema();
+        return schemaReader.getSchema();
     }
 
     @Override
-    public boolean hasNext() {
-        return reader.hasNext();
+    public boolean hasNext(AvroWrapper<Pair<Integer, Long>> inputPair, NullWritable ignore) throws IOException {
+        this.inputPair = inputPair;
+        return dataReader.next(this.inputPair, ignore);
     }
 
     @Override
-    public Object getNext() throws IOException {
-        return reader.next();
+    public Object getNext() {
+        return inputPair.datum();
     }
 
     @Override
     public void close() throws IOException {
-        if (Objects.nonNull(inputStream)) {
-            inputStream.close();
+        if (Objects.nonNull(schemaReader)) {
+            schemaReader.close();
         }
-        if (Objects.nonNull(reader)) {
-            reader.close();
+        if (Objects.nonNull(dataReader)) {
+            dataReader.close();
         }
+        fileSystem.close();
     }
 }
diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java
index 4b1b4a864ce86b..185fc5a31070aa 100644
--- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java
+++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java
@@ -17,81 +17,84 @@
 
 package org.apache.doris.avro;
 
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.client.builder.AwsClientBuilder;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
 import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.Pair;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.io.InputStream;
+import java.net.URI;
 import java.util.Objects;
 
-public class S3FileReader implements AvroReader {
+public class S3FileReader extends AvroReader {
 
     private static final Logger LOG = LogManager.getLogger(S3FileReader.class);
     private final String bucketName;
     private final String key;
-    private AmazonS3 s3Client;
-    private DataFileStream<GenericRecord> reader;
-    private InputStream s3ObjectInputStream;
-    private final AWSCredentials credentials;
+    private AvroWrapper<Pair<Integer, Long>> inputPair;
    private final String endpoint;
    private final String region;
+    private final String accessKey;
+    private final String secretKey;
+    private final String s3aUri;
 
     public S3FileReader(String accessKey, String secretKey, String endpoint, String region, String uri)
             throws IOException {
         this.endpoint = endpoint;
         this.region = region;
-        this.credentials = new BasicAWSCredentials(accessKey, secretKey);
         S3Utils.parseURI(uri);
         this.bucketName = S3Utils.getBucket();
         this.key = S3Utils.getKey();
+        this.accessKey = accessKey;
+        this.secretKey = secretKey;
+        this.s3aUri = "s3a://" + bucketName + "/" + key;
     }
 
     @Override
-    public void open(Configuration conf) throws IOException {
-        s3Client = AmazonS3ClientBuilder.standard()
-                .withCredentials(new AWSStaticCredentialsProvider(credentials))
-                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region))
-                .build();
-        S3Object object = s3Client.getObject(new GetObjectRequest(bucketName, key));
-        s3ObjectInputStream = object.getObjectContent();
-        reader = new DataFileStream<>(s3ObjectInputStream, new GenericDatumReader<>());
+    public void open(AvroFileContext avroFileContext, boolean tableSchema) throws IOException {
+        Configuration conf = new Configuration();
+        conf.set(AvroProperties.FS_S3A_ACCESS_KEY, accessKey);
+        conf.set(AvroProperties.FS_S3A_SECRET_KEY, secretKey);
+        conf.set(AvroProperties.FS_S3A_ENDPOINT, endpoint);
+        conf.set(AvroProperties.FS_S3A_REGION, region);
+        path = new Path(s3aUri);
+        fileSystem = FileSystem.get(URI.create(s3aUri), conf);
+        openSchemaReader();
+        if (!tableSchema) {
+            avroFileContext.setSchema(schemaReader.getSchema());
+            openDataReader(avroFileContext);
+        }
     }
 
     @Override
     public Schema getSchema() {
-        return reader.getSchema();
+        return schemaReader.getSchema();
     }
 
     @Override
-    public boolean hasNext() {
-        return reader.hasNext();
+    public boolean hasNext(AvroWrapper<Pair<Integer, Long>> inputPair, NullWritable ignore) throws IOException {
+        this.inputPair = inputPair;
+        return dataReader.next(this.inputPair, ignore);
     }
 
     @Override
-    public Object getNext() throws IOException {
-        return reader.next();
+    public Object getNext() {
+        return inputPair.datum();
     }
 
     @Override
     public void close() throws IOException {
-        if (Objects.nonNull(s3ObjectInputStream)) {
-            s3ObjectInputStream.close();
+        if (Objects.nonNull(schemaReader)) {
+            schemaReader.close();
         }
-        if (Objects.nonNull(reader)) {
-            reader.close();
+        if (Objects.nonNull(dataReader)) {
+            dataReader.close();
         }
+        fileSystem.close();
     }
 }
diff --git a/regression-test/data/external_table_p0/tvf/test_tvf_avro.out b/regression-test/data/external_table_p0/tvf/test_tvf_avro.out
index 8f39bd410c9e67..d4205d988d0554 100644
--- a/regression-test/data/external_table_p0/tvf/test_tvf_avro.out
+++ b/regression-test/data/external_table_p0/tvf/test_tvf_avro.out
@@ -49,6 +49,18 @@ B
 [{"Key11":1}, {"Key22":0}]
 [{"arrayMapKey1":0}, {"arrayMapKey2":1}]
 
+-- !10 --
+[{"Key11":1}, {"Key22":0}]	false	{"k1":1, "k2":2}	9999	\N
+[{"arrayMapKey1":0}, {"arrayMapKey2":1}]	true	{"key1":1, "key2":2}	3400	\N
+
+-- !11 --
+a test string	9.81	A
+string test	9.1102	B
+
+-- !12 --
+{"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"}	{"k1":1, "k2":2}	2.11
+{"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"}	{"key1":1, "key2":2}	3.14
+
 -- !3 --
 aBoolean	BOOLEAN	Yes	false	\N	NONE
 aInt	INT	Yes	false	\N	NONE
@@ -71,3 +83,15 @@ true	42	3400	3.14	9.81	a test string	[1, 2, 3, 4]	{"key1":1, "key2":2}	A	{"a": 5
 
 -- !4 --
 2
 
+-- !13 --
+[{"Key11":1}, {"Key22":0}]	false	{"k1":1, "k2":2}	9999	\N
+[{"arrayMapKey1":0}, {"arrayMapKey2":1}]	true	{"key1":1, "key2":2}	3400	\N
+
+-- !14 --
+a test string	9.81	A
+string test	9.1102	B
+
+-- !15 --
+{"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"}	{"k1":1, "k2":2}	2.11
+{"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"}	{"key1":1, "key2":2}	3.14
+
diff --git a/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy b/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy
index 6f9b4f98b49c42..e6a54771582dde 100644
--- a/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy
+++ b/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy
@@ -126,6 +126,33 @@ suite("test_tvf_avro", "external,hive,tvf,avro,external_docker") {
            "FORMAT" = "${format}");
    """
 
+    order_qt_10 """
+    select arrayMapBoolean,aBoolean,aMap,aLong,aUnion from s3(
+        "uri" = "${s3Uri}",
+        "ACCESS_KEY" = "${ak}",
+        "SECRET_KEY" = "${sk}",
+        "REGION" = "${region}",
+        "FORMAT" = "${format}");
+    """
+
+    order_qt_11 """
+    select aString,aDouble,anEnum from s3(
+        "uri" = "${s3Uri}",
+        "ACCESS_KEY" = "${ak}",
+        "SECRET_KEY" = "${sk}",
+        "REGION" = "${region}",
+        "FORMAT" = "${format}");
+    """
+
+    order_qt_12 """
+    select aRecord,aMap,aFloat from s3(
+        "uri" = "${s3Uri}",
+        "ACCESS_KEY" = "${ak}",
+        "SECRET_KEY" = "${sk}",
+        "REGION" = "${region}",
+        "FORMAT" = "${format}");
+    """
+
     // TVF hdfs()
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
@@ -148,6 +175,24 @@ suite("test_tvf_avro", "external,hive,tvf,avro,external_docker") {
                "fs.defaultFS" = "${defaultFS}",
                "hadoop.username" = "${hdfsUserName}",
                "format" = "${format}"); """
+
+            order_qt_13 """ select arrayMapBoolean,aBoolean,aMap,aLong,aUnion from HDFS(
+                "uri" = "${hdfsUri}",
+                "fs.defaultFS" = "${defaultFS}",
+                "hadoop.username" = "${hdfsUserName}",
+                "format" = "${format}")"""
+
+            order_qt_14 """ select aString,aDouble,anEnum from HDFS(
+                "uri" = "${hdfsUri}",
+                "fs.defaultFS" = "${defaultFS}",
+                "hadoop.username" = "${hdfsUserName}",
+                "format" = "${format}")"""
+
+            order_qt_15 """ select aRecord,aMap,aFloat from HDFS(
+                "uri" = "${hdfsUri}",
+                "fs.defaultFS" = "${defaultFS}",
+                "hadoop.username" = "${hdfsUserName}",
+                "format" = "${format}")"""
"${format}")""" } finally { } }