From 63129954cf1612b50f6fe26de3baa96881d9886b Mon Sep 17 00:00:00 2001 From: wudongliang <46414265+DongLiang-0@users.noreply.github.com> Date: Mon, 27 Nov 2023 10:33:27 +0800 Subject: [PATCH 1/3] [Feature](tvf)(avro-jni) avro-jni add projection push down (#26885) --- fe/be-java-extensions/avro-scanner/pom.xml | 38 +++++- .../org/apache/doris/avro/AvroFileCache.java | 118 ++++++++++++++++++ .../org/apache/doris/avro/AvroJNIScanner.java | 59 +++++++-- .../org/apache/doris/avro/AvroProperties.java | 4 + .../org/apache/doris/avro/AvroReader.java | 78 ++++++++++-- .../org/apache/doris/avro/AvroTypeUtils.java | 4 + .../org/apache/doris/avro/HDFSFileReader.java | 48 +++---- .../org/apache/doris/avro/S3FileReader.java | 75 +++++------ .../external_table_p0/tvf/test_tvf_avro.out | 24 ++++ .../tvf/test_tvf_avro.groovy | 45 +++++++ 10 files changed, 415 insertions(+), 78 deletions(-) create mode 100644 fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileCache.java diff --git a/fe/be-java-extensions/avro-scanner/pom.xml b/fe/be-java-extensions/avro-scanner/pom.xml index f95fa947051336..878e4a33e02d32 100644 --- a/fe/be-java-extensions/avro-scanner/pom.xml +++ b/fe/be-java-extensions/avro-scanner/pom.xml @@ -36,6 +36,11 @@ under the License. + + org.apache.avro + avro-mapred + ${avro.version} + org.apache.doris java-common @@ -53,12 +58,39 @@ under the License. org.apache.hadoop - hadoop-hdfs - provided + hadoop-aws + + + slf4j-log4j12 + org.slf4j + + + log4j + log4j + + + servlet-api + javax.servlet + + + aws-java-sdk-s3 + com.amazonaws + + + aws-java-sdk-bundle + com.amazonaws + + com.amazonaws - aws-java-sdk-s3 + aws-java-sdk-bundle + ${aws-java-sdk.version} + + + org.apache.hadoop + hadoop-hdfs + provided org.apache.doris diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileCache.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileCache.java new file mode 100644 index 00000000000000..0ffcb9bc99c089 --- /dev/null +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileCache.java @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.avro; + +import com.google.common.base.Objects; +import org.apache.hadoop.thirdparty.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; + +public class AvroFileCache { + private static final Logger LOG = LoggerFactory.getLogger(AvroFileCache.class); + private static final Map fileCache = Maps.newHashMap(); + + public static void addFileMeta(AvroFileCacheKey avroFileCacheKey, AvroFileMeta avroFileMeta) { + fileCache.put(avroFileCacheKey, avroFileMeta); + } + + public static AvroFileMeta getAvroFileMeta(AvroFileCacheKey key) { + return fileCache.get(key); + } + + public static void invalidateFileCache(AvroFileCacheKey key) { + fileCache.remove(key); + } + + public static class AvroFileMeta { + private final String schema; + private Set requiredFields; + // TODO split file + private String splitInfo; + + AvroFileMeta(String schema) { + this.schema = schema; + } + + AvroFileMeta(String schema, String splitInfo) { + this.schema = schema; + this.splitInfo = splitInfo; + } + + public String getSchema() { + return schema; + } + + public String getSplitInfo() { + return splitInfo; + } + + public void setRequiredFields(Set requiredFields) { + this.requiredFields = requiredFields; + } + + public Set getRequiredFields() { + return requiredFields; + } + } + + protected static class AvroFileCacheKey { + private final String fileType; + private final String uri; + + AvroFileCacheKey(String fileType, String uri) { + this.fileType = fileType; + this.uri = uri; + } + + protected String getUri() { + return uri; + } + + protected String getFileType() { + return fileType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AvroFileCacheKey that = (AvroFileCacheKey) o; + return Objects.equal(fileType, that.fileType) && Objects.equal(uri, that.uri); + } + + @Override + public int hashCode() { + return Objects.hashCode(fileType, uri); + } + + @Override + public String toString() { + return "AvroFileCacheKey{" + + "fileType='" + fileType + '\'' + + ", uri='" + uri + '\'' + + '}'; + } + } +} diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java index 75cbc721e3142b..9cfaa09262cc7b 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java @@ -17,6 +17,8 @@ package org.apache.doris.avro; +import org.apache.doris.avro.AvroFileCache.AvroFileCacheKey; +import org.apache.doris.avro.AvroFileCache.AvroFileMeta; import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.ScanPredicate; @@ -25,6 +27,8 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroWrapper; +import org.apache.avro.mapred.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.serde.serdeConstants; @@ -33,14 +37,17 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.NullWritable; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; import java.util.Arrays; +import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.Set; import java.util.stream.Collectors; public class AvroJNIScanner extends JniScanner { @@ -50,9 +57,11 @@ public class AvroJNIScanner extends JniScanner { private final String uri; private final Map requiredParams; private final Integer fetchSize; + private final ClassLoader classLoader; private int[] requiredColumnIds; private String[] columnTypes; private String[] requiredFields; + private Set requiredFieldSet; private ColumnType[] requiredTypes; private AvroReader avroReader; private final boolean isGetTableSchema; @@ -61,6 +70,10 @@ public class AvroJNIScanner extends JniScanner { private StructField[] structFields; private ObjectInspector[] fieldInspectors; private String serde; + private AvroFileCacheKey avroFileCacheKey; + private AvroFileMeta avroFileMeta; + private AvroWrapper> inputPair; + private NullWritable ignore; /** * Call by JNI for get table data or get table schema @@ -69,6 +82,7 @@ public class AvroJNIScanner extends JniScanner { * @param requiredParams required params */ public AvroJNIScanner(int fetchSize, Map requiredParams) { + this.classLoader = this.getClass().getClassLoader(); this.requiredParams = requiredParams; this.fetchSize = fetchSize; this.isGetTableSchema = Boolean.parseBoolean(requiredParams.get(AvroProperties.IS_GET_TABLE_SCHEMA)); @@ -79,14 +93,17 @@ public AvroJNIScanner(int fetchSize, Map requiredParams) { .split(AvroProperties.COLUMNS_TYPE_DELIMITER); this.requiredFields = requiredParams.get(AvroProperties.REQUIRED_FIELDS) .split(AvroProperties.FIELDS_DELIMITER); + this.requiredFieldSet = new HashSet<>(Arrays.asList(requiredFields)); this.requiredTypes = new ColumnType[requiredFields.length]; this.serde = requiredParams.get(AvroProperties.HIVE_SERDE); this.structFields = new StructField[requiredFields.length]; this.fieldInspectors = new ObjectInspector[requiredFields.length]; + this.inputPair = new AvroWrapper<>(null); + this.ignore = NullWritable.get(); } } - private void init() throws Exception { + private void initFieldInspector() throws Exception { requiredColumnIds = new int[requiredFields.length]; for (int i = 0; i < requiredFields.length; i++) { ColumnType columnType = ColumnType.parseType(requiredFields[i], columnTypes[i]); @@ -127,14 +144,7 @@ private Deserializer getDeserializer(Configuration configuration, Properties pro @Override public void open() throws IOException { - try { - if (!isGetTableSchema) { - init(); - } - } catch (Exception e) { - LOG.warn("Failed to init avro scanner. ", e); - throw new IOException(e); - } + Thread.currentThread().setContextClassLoader(classLoader); switch (fileType) { case FILE_HDFS: this.avroReader = new HDFSFileReader(uri); @@ -150,9 +160,24 @@ public void open() throws IOException { LOG.warn("Unsupported " + fileType.name() + " file type."); throw new IOException("Unsupported " + fileType.name() + " file type."); } - this.avroReader.open(new Configuration()); if (!isGetTableSchema) { + initDataReader(); + } + this.avroReader.open(avroFileMeta, isGetTableSchema); + } + + private void initDataReader() { + try { + avroFileCacheKey = new AvroFileCacheKey(fileType.name(), uri); + avroFileMeta = AvroFileCache.getAvroFileMeta(avroFileCacheKey); + avroFileMeta.setRequiredFields(requiredFieldSet); + initFieldInspector(); initTableInfo(requiredTypes, requiredFields, new ScanPredicate[0], fetchSize); + } catch (Exception e) { + LOG.warn("Failed to init avro scanner. ", e); + throw new RuntimeException(e); + } finally { + AvroFileCache.invalidateFileCache(avroFileCacheKey); } } @@ -167,7 +192,7 @@ public void close() throws IOException { protected int getNext() throws IOException { int numRows = 0; for (; numRows < getBatchSize(); numRows++) { - if (!avroReader.hasNext()) { + if (!avroReader.hasNext(inputPair, ignore)) { break; } GenericRecord rowRecord = (GenericRecord) avroReader.getNext(); @@ -187,6 +212,18 @@ protected int getNext() throws IOException { @Override protected TableSchema parseTableSchema() throws UnsupportedOperationException { Schema schema = avroReader.getSchema(); + addFileMeta2Cache(schema); return AvroTypeUtils.parseTableSchema(schema); } + + /** + * Cache avro file metadata in order to push down the projection of the actual read data. + * + * @param schema avro file schema + */ + private void addFileMeta2Cache(Schema schema) { + AvroFileMeta avroFileMeta = new AvroFileMeta(schema.toString()); + AvroFileCacheKey avroFileCacheKey = new AvroFileCacheKey(fileType.name(), uri); + AvroFileCache.addFileMeta(avroFileCacheKey, avroFileMeta); + } } diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java index 6619b6888c4a9e..7c6cbbcbb4e567 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java @@ -34,5 +34,9 @@ public class AvroProperties { protected static final String HIVE_SERDE = "hive.serde"; protected static final String COLUMNS = "columns"; protected static final String COLUMNS2TYPES = "columns.types"; + protected static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key"; + protected static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key"; + protected static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint"; + protected static final String FS_S3A_REGION = "fs.s3a.region"; } diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java index eb012e402be416..9180cf617f7e4a 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java @@ -17,21 +17,85 @@ package org.apache.doris.avro; +import org.apache.doris.avro.AvroFileCache.AvroFileMeta; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; import org.apache.avro.Schema; -import org.apache.hadoop.conf.Configuration; +import org.apache.avro.Schema.Parser; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroJob; +import org.apache.avro.mapred.AvroRecordReader; +import org.apache.avro.mapred.AvroWrapper; +import org.apache.avro.mapred.Pair; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import java.io.BufferedInputStream; import java.io.IOException; +import java.io.InputStream; +import java.util.Set; + +public abstract class AvroReader { + + private static final Logger LOG = LogManager.getLogger(AvroReader.class); + protected AvroRecordReader> dataReader; + protected DataFileStream schemaReader; + protected Path path; + protected FileSystem fileSystem; + + public abstract void open(AvroFileMeta avroFileMeta, boolean tableSchema) throws IOException; + + public abstract Schema getSchema(); -public interface AvroReader { + public abstract boolean hasNext(AvroWrapper> inputPair, NullWritable ignore) throws IOException; - void open(Configuration conf) throws IOException; + public abstract Object getNext(); - Schema getSchema(); + public abstract void close() throws IOException; - boolean hasNext(); + protected void openSchemaReader() throws IOException { + InputStream inputStream = new BufferedInputStream(fileSystem.open(path)); + schemaReader = new DataFileStream<>(inputStream, new GenericDatumReader<>()); + LOG.debug("success open avro schema reader."); + } - Object getNext() throws IOException; + protected void openDataReader(AvroFileMeta avroFileMeta) throws IOException { + JobConf job = new JobConf(); + projectionSchema(job, avroFileMeta); + FileStatus fileStatus = fileSystem.getFileStatus(path); + // TODO split file + FileSplit fileSplit = new FileSplit(path, 0, fileStatus.getLen(), job); + dataReader = new AvroRecordReader<>(job, fileSplit); + LOG.debug("success open avro data reader."); + } - void close() throws IOException; + protected void projectionSchema(JobConf job, AvroFileMeta avroFileMeta) { + Set filedNames = avroFileMeta.getRequiredFields(); + JsonObject schemaJson = new Gson().fromJson(avroFileMeta.getSchema(), JsonObject.class); + JsonObject copySchemaJson = schemaJson.deepCopy(); + JsonArray schemaFields = schemaJson.get("fields").getAsJsonArray(); + JsonArray copySchemaFields = copySchemaJson.get("fields").getAsJsonArray(); + for (int i = 0; i < schemaFields.size(); i++) { + JsonObject object = schemaFields.get(i).getAsJsonObject(); + String name = object.get("name").getAsString(); + if (filedNames.contains(name)) { + continue; + } + copySchemaFields.remove(schemaFields.get(i)); + } + Schema projectionSchema = new Parser().parse(copySchemaJson.toString()); + AvroJob.setInputSchema(job, projectionSchema); + LOG.debug("projection avro schema is:" + projectionSchema.toString()); + } } diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java index cd597fa4cfc314..32ead116dd176b 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java @@ -25,6 +25,8 @@ import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.commons.compress.utils.Lists; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.Arrays; @@ -34,6 +36,8 @@ public class AvroTypeUtils { + private static final Logger LOG = LogManager.getLogger(AvroTypeUtils.class); + protected static TableSchema parseTableSchema(Schema schema) throws UnsupportedOperationException { List schemaFields = schema.getFields(); List schemaColumns = new ArrayList<>(); diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java index 8c189704027522..7d6fc40eb13dee 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java @@ -17,62 +17,66 @@ package org.apache.doris.avro; +import org.apache.doris.avro.AvroFileCache.AvroFileMeta; + import org.apache.avro.Schema; -import org.apache.avro.file.DataFileStream; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroWrapper; +import org.apache.avro.mapred.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.io.BufferedInputStream; import java.io.IOException; import java.net.URI; import java.util.Objects; -public class HDFSFileReader implements AvroReader { +public class HDFSFileReader extends AvroReader { private static final Logger LOG = LogManager.getLogger(HDFSFileReader.class); - private final Path filePath; private final String url; - private DataFileStream reader; - private BufferedInputStream inputStream; + private AvroWrapper> inputPair; public HDFSFileReader(String url) { this.url = url; - this.filePath = new Path(url); + this.path = new Path(url); } @Override - public void open(Configuration conf) throws IOException { - FileSystem fs = FileSystem.get(URI.create(url), conf); - inputStream = new BufferedInputStream(fs.open(filePath)); - reader = new DataFileStream<>(inputStream, new GenericDatumReader<>()); + public void open(AvroFileMeta avroFileMeta, boolean tableSchema) throws IOException { + fileSystem = FileSystem.get(URI.create(url), new Configuration()); + if (tableSchema) { + openSchemaReader(); + } else { + openDataReader(avroFileMeta); + } } @Override public Schema getSchema() { - return reader.getSchema(); + return schemaReader.getSchema(); } @Override - public boolean hasNext() { - return reader.hasNext(); + public boolean hasNext(AvroWrapper> inputPair, NullWritable ignore) throws IOException { + this.inputPair = inputPair; + return dataReader.next(this.inputPair, ignore); } @Override - public Object getNext() throws IOException { - return reader.next(); + public Object getNext() { + return inputPair.datum(); } @Override public void close() throws IOException { - if (Objects.nonNull(inputStream)) { - inputStream.close(); + if (Objects.nonNull(schemaReader)) { + schemaReader.close(); } - if (Objects.nonNull(reader)) { - reader.close(); + if (Objects.nonNull(dataReader)) { + dataReader.close(); } + fileSystem.close(); } } diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java index 4b1b4a864ce86b..d3bbac9e991caa 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java @@ -17,81 +17,86 @@ package org.apache.doris.avro; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.S3Object; +import org.apache.doris.avro.AvroFileCache.AvroFileMeta; + import org.apache.avro.Schema; -import org.apache.avro.file.DataFileStream; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapred.AvroWrapper; +import org.apache.avro.mapred.Pair; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; -import java.io.InputStream; +import java.net.URI; import java.util.Objects; -public class S3FileReader implements AvroReader { +public class S3FileReader extends AvroReader { private static final Logger LOG = LogManager.getLogger(S3FileReader.class); private final String bucketName; private final String key; - private AmazonS3 s3Client; - private DataFileStream reader; - private InputStream s3ObjectInputStream; - private final AWSCredentials credentials; + private AvroWrapper> inputPair; private final String endpoint; private final String region; + private final String accessKey; + private final String secretKey; + private final String s3aUri; public S3FileReader(String accessKey, String secretKey, String endpoint, String region, String uri) throws IOException { this.endpoint = endpoint; this.region = region; - this.credentials = new BasicAWSCredentials(accessKey, secretKey); S3Utils.parseURI(uri); this.bucketName = S3Utils.getBucket(); this.key = S3Utils.getKey(); + this.accessKey = accessKey; + this.secretKey = secretKey; + this.s3aUri = "s3a://" + bucketName + "/" + key; } @Override - public void open(Configuration conf) throws IOException { - s3Client = AmazonS3ClientBuilder.standard() - .withCredentials(new AWSStaticCredentialsProvider(credentials)) - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, region)) - .build(); - S3Object object = s3Client.getObject(new GetObjectRequest(bucketName, key)); - s3ObjectInputStream = object.getObjectContent(); - reader = new DataFileStream<>(s3ObjectInputStream, new GenericDatumReader<>()); + public void open(AvroFileMeta avroFileMeta, boolean tableSchema) throws IOException { + Configuration conf = new Configuration(); + conf.set(AvroProperties.FS_S3A_ACCESS_KEY, accessKey); + conf.set(AvroProperties.FS_S3A_SECRET_KEY, secretKey); + conf.set(AvroProperties.FS_S3A_ENDPOINT, endpoint); + conf.set(AvroProperties.FS_S3A_REGION, region); + path = new Path(s3aUri); + fileSystem = FileSystem.get(URI.create(s3aUri), conf); + if (tableSchema) { + openSchemaReader(); + } else { + openDataReader(avroFileMeta); + } } @Override public Schema getSchema() { - return reader.getSchema(); + return schemaReader.getSchema(); } @Override - public boolean hasNext() { - return reader.hasNext(); + public boolean hasNext(AvroWrapper> inputPair, NullWritable ignore) throws IOException { + this.inputPair = inputPair; + return dataReader.next(this.inputPair, ignore); } @Override - public Object getNext() throws IOException { - return reader.next(); + public Object getNext() { + return inputPair.datum(); } @Override public void close() throws IOException { - if (Objects.nonNull(s3ObjectInputStream)) { - s3ObjectInputStream.close(); + if (Objects.nonNull(schemaReader)) { + schemaReader.close(); } - if (Objects.nonNull(reader)) { - reader.close(); + if (Objects.nonNull(dataReader)) { + dataReader.close(); } + fileSystem.close(); } } diff --git a/regression-test/data/external_table_p0/tvf/test_tvf_avro.out b/regression-test/data/external_table_p0/tvf/test_tvf_avro.out index 8f39bd410c9e67..d4205d988d0554 100644 --- a/regression-test/data/external_table_p0/tvf/test_tvf_avro.out +++ b/regression-test/data/external_table_p0/tvf/test_tvf_avro.out @@ -49,6 +49,18 @@ B [{"Key11":1}, {"Key22":0}] [{"arrayMapKey1":0}, {"arrayMapKey2":1}] +-- !10 -- +[{"Key11":1}, {"Key22":0}] false {"k1":1, "k2":2} 9999 \N +[{"arrayMapKey1":0}, {"arrayMapKey2":1}] true {"key1":1, "key2":2} 3400 \N + +-- !11 -- +a test string 9.81 A +string test 9.1102 B + +-- !12 -- +{"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} {"k1":1, "k2":2} 2.11 +{"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} {"key1":1, "key2":2} 3.14 + -- !3 -- aBoolean BOOLEAN Yes false \N NONE aInt INT Yes false \N NONE @@ -71,3 +83,15 @@ true 42 3400 3.14 9.81 a test string [1, 2, 3, 4] {"key1":1, "key2":2} A {"a": 5 -- !4 -- 2 +-- !13 -- +[{"Key11":1}, {"Key22":0}] false {"k1":1, "k2":2} 9999 \N +[{"arrayMapKey1":0}, {"arrayMapKey2":1}] true {"key1":1, "key2":2} 3400 \N + +-- !14 -- +a test string 9.81 A +string test 9.1102 B + +-- !15 -- +{"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} {"k1":1, "k2":2} 2.11 +{"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} {"key1":1, "key2":2} 3.14 + diff --git a/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy b/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy index 6f9b4f98b49c42..e6a54771582dde 100644 --- a/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy +++ b/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy @@ -126,6 +126,33 @@ suite("test_tvf_avro", "external,hive,tvf,avro,external_docker") { "FORMAT" = "${format}"); """ + order_qt_10 """ + select arrayMapBoolean,aBoolean,aMap,aLong,aUnion from s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + order_qt_11 """ + select aString,aDouble,anEnum from s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + + order_qt_12 """ + select aRecord,aMap,aFloat from s3( + "uri" = "${s3Uri}", + "ACCESS_KEY" = "${ak}", + "SECRET_KEY" = "${sk}", + "REGION" = "${region}", + "FORMAT" = "${format}"); + """ + // TVF hdfs() String enabled = context.config.otherConfigs.get("enableHiveTest") if (enabled != null && enabled.equalsIgnoreCase("true")) { @@ -148,6 +175,24 @@ suite("test_tvf_avro", "external,hive,tvf,avro,external_docker") { "fs.defaultFS" = "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}"); """ + + order_qt_13 """ select arrayMapBoolean,aBoolean,aMap,aLong,aUnion from HDFS( + "uri" = "${hdfsUri}", + "fs.defaultFS" = "${defaultFS}", + "hadoop.username" = "${hdfsUserName}", + "format" = "${format}")""" + + order_qt_14 """ select aString,aDouble,anEnum from HDFS( + "uri" = "${hdfsUri}", + "fs.defaultFS" = "${defaultFS}", + "hadoop.username" = "${hdfsUserName}", + "format" = "${format}")""" + + order_qt_15 """ select aRecord,aMap,aFloat from HDFS( + "uri" = "${hdfsUri}", + "fs.defaultFS" = "${defaultFS}", + "hadoop.username" = "${hdfsUserName}", + "format" = "${format}")""" } finally { } } From ef19de7cdee170f9e33733299bb03c9235b57912 Mon Sep 17 00:00:00 2001 From: wudongliang <46414265+DongLiang-0@users.noreply.github.com> Date: Tue, 19 Dec 2023 16:37:34 +0800 Subject: [PATCH 2/3] [Improve](tvf)jni-avro support split file (#27933) --- .../vec/exec/format/avro/avro_jni_reader.cpp | 4 +++ .../org/apache/doris/avro/AvroFileCache.java | 29 ++++++++++++------- .../org/apache/doris/avro/AvroJNIScanner.java | 8 +++++ .../org/apache/doris/avro/AvroProperties.java | 3 ++ .../org/apache/doris/avro/AvroReader.java | 5 +--- 5 files changed, 34 insertions(+), 15 deletions(-) diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp b/be/src/vec/exec/format/avro/avro_jni_reader.cpp index bdec8f9fbe3523..dc1ea2183cfb0e 100644 --- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp +++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp @@ -89,6 +89,10 @@ Status AvroJNIReader::init_fetch_table_reader( if (type == TFileType::FILE_S3) { required_param.insert(_params.properties.begin(), _params.properties.end()); } + required_param.insert( + std::make_pair("split_start_offset", std::to_string(_range.start_offset))); + required_param.insert(std::make_pair("split_size", std::to_string(_range.size))); + required_param.insert(std::make_pair("split_file_size", std::to_string(_range.file_size))); required_param.insert(std::make_pair("uri", _range.path)); _jni_connector = std::make_unique("org/apache/doris/avro/AvroJNIScanner", required_param, column_names); diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileCache.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileCache.java index 0ffcb9bc99c089..bcec73b87be2a9 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileCache.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileCache.java @@ -44,30 +44,37 @@ public static void invalidateFileCache(AvroFileCacheKey key) { public static class AvroFileMeta { private final String schema; private Set requiredFields; - // TODO split file - private String splitInfo; + private Long splitStartOffset; + private Long splitSize; AvroFileMeta(String schema) { this.schema = schema; } - AvroFileMeta(String schema, String splitInfo) { - this.schema = schema; - this.splitInfo = splitInfo; - } - public String getSchema() { return schema; } - public String getSplitInfo() { - return splitInfo; - } - public void setRequiredFields(Set requiredFields) { this.requiredFields = requiredFields; } + public void setSplitStartOffset(Long splitStartOffset) { + this.splitStartOffset = splitStartOffset; + } + + public void setSplitSize(Long splitSize) { + this.splitSize = splitSize; + } + + public Long getSplitStartOffset() { + return this.splitStartOffset; + } + + public Long getSplitSize() { + return this.splitSize; + } + public Set getRequiredFields() { return requiredFields; } diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java index 9cfaa09262cc7b..a1b573cf5be8ed 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java @@ -74,6 +74,9 @@ public class AvroJNIScanner extends JniScanner { private AvroFileMeta avroFileMeta; private AvroWrapper> inputPair; private NullWritable ignore; + private Long splitStartOffset; + private Long splitSize; + private Long splitFileSize; /** * Call by JNI for get table data or get table schema @@ -100,6 +103,9 @@ public AvroJNIScanner(int fetchSize, Map requiredParams) { this.fieldInspectors = new ObjectInspector[requiredFields.length]; this.inputPair = new AvroWrapper<>(null); this.ignore = NullWritable.get(); + this.splitStartOffset = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_START_OFFSET)); + this.splitSize = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_SIZE)); + this.splitFileSize = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_FILE_SIZE)); } } @@ -171,6 +177,8 @@ private void initDataReader() { avroFileCacheKey = new AvroFileCacheKey(fileType.name(), uri); avroFileMeta = AvroFileCache.getAvroFileMeta(avroFileCacheKey); avroFileMeta.setRequiredFields(requiredFieldSet); + avroFileMeta.setSplitStartOffset(splitStartOffset); + avroFileMeta.setSplitSize(splitSize); initFieldInspector(); initTableInfo(requiredTypes, requiredFields, new ScanPredicate[0], fetchSize); } catch (Exception e) { diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java index 7c6cbbcbb4e567..416b7d25897f93 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java @@ -38,5 +38,8 @@ public class AvroProperties { protected static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key"; protected static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint"; protected static final String FS_S3A_REGION = "fs.s3a.region"; + protected static final String SPLIT_START_OFFSET = "split_start_offset"; + protected static final String SPLIT_SIZE = "split_size"; + protected static final String SPLIT_FILE_SIZE = "split_file_size"; } diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java index 9180cf617f7e4a..52a78188eabbf9 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java @@ -31,7 +31,6 @@ import org.apache.avro.mapred.AvroRecordReader; import org.apache.avro.mapred.AvroWrapper; import org.apache.avro.mapred.Pair; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; @@ -72,9 +71,7 @@ protected void openSchemaReader() throws IOException { protected void openDataReader(AvroFileMeta avroFileMeta) throws IOException { JobConf job = new JobConf(); projectionSchema(job, avroFileMeta); - FileStatus fileStatus = fileSystem.getFileStatus(path); - // TODO split file - FileSplit fileSplit = new FileSplit(path, 0, fileStatus.getLen(), job); + FileSplit fileSplit = new FileSplit(path, avroFileMeta.getSplitStartOffset(), avroFileMeta.getSplitSize(), job); dataReader = new AvroRecordReader<>(job, fileSplit); LOG.debug("success open avro data reader."); } From 2c637a95659f62230718d8501677dd729e646d5d Mon Sep 17 00:00:00 2001 From: wudongliang <46414265+DongLiang-0@users.noreply.github.com> Date: Wed, 20 Dec 2023 19:39:26 +0800 Subject: [PATCH 3/3] [fix](tvf)Fixed the avro-scanner projection pushdown failing to query on multiple BEs (#28709) --- .../org/apache/doris/avro/AvroFileCache.java | 125 ------------------ .../apache/doris/avro/AvroFileContext.java | 61 +++++++++ .../org/apache/doris/avro/AvroJNIScanner.java | 33 ++--- .../org/apache/doris/avro/AvroReader.java | 45 ++++--- .../org/apache/doris/avro/HDFSFileReader.java | 12 +- .../org/apache/doris/avro/S3FileReader.java | 12 +- 6 files changed, 107 insertions(+), 181 deletions(-) delete mode 100644 fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileCache.java create mode 100644 fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileContext.java diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileCache.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileCache.java deleted file mode 100644 index bcec73b87be2a9..00000000000000 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileCache.java +++ /dev/null @@ -1,125 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.avro; - -import com.google.common.base.Objects; -import org.apache.hadoop.thirdparty.com.google.common.collect.Maps; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.Set; - -public class AvroFileCache { - private static final Logger LOG = LoggerFactory.getLogger(AvroFileCache.class); - private static final Map fileCache = Maps.newHashMap(); - - public static void addFileMeta(AvroFileCacheKey avroFileCacheKey, AvroFileMeta avroFileMeta) { - fileCache.put(avroFileCacheKey, avroFileMeta); - } - - public static AvroFileMeta getAvroFileMeta(AvroFileCacheKey key) { - return fileCache.get(key); - } - - public static void invalidateFileCache(AvroFileCacheKey key) { - fileCache.remove(key); - } - - public static class AvroFileMeta { - private final String schema; - private Set requiredFields; - private Long splitStartOffset; - private Long splitSize; - - AvroFileMeta(String schema) { - this.schema = schema; - } - - public String getSchema() { - return schema; - } - - public void setRequiredFields(Set requiredFields) { - this.requiredFields = requiredFields; - } - - public void setSplitStartOffset(Long splitStartOffset) { - this.splitStartOffset = splitStartOffset; - } - - public void setSplitSize(Long splitSize) { - this.splitSize = splitSize; - } - - public Long getSplitStartOffset() { - return this.splitStartOffset; - } - - public Long getSplitSize() { - return this.splitSize; - } - - public Set getRequiredFields() { - return requiredFields; - } - } - - protected static class AvroFileCacheKey { - private final String fileType; - private final String uri; - - AvroFileCacheKey(String fileType, String uri) { - this.fileType = fileType; - this.uri = uri; - } - - protected String getUri() { - return uri; - } - - protected String getFileType() { - return fileType; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - AvroFileCacheKey that = (AvroFileCacheKey) o; - return Objects.equal(fileType, that.fileType) && Objects.equal(uri, that.uri); - } - - @Override - public int hashCode() { - return Objects.hashCode(fileType, uri); - } - - @Override - public String toString() { - return "AvroFileCacheKey{" - + "fileType='" + fileType + '\'' - + ", uri='" + uri + '\'' - + '}'; - } - } -} diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileContext.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileContext.java new file mode 100644 index 00000000000000..3e3264e62919cd --- /dev/null +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileContext.java @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.avro; + +import org.apache.avro.Schema; + +import java.util.Set; + +public class AvroFileContext { + private Schema schema; + private Set requiredFields; + private Long splitStartOffset; + private Long splitSize; + + public void setSchema(Schema schema) { + this.schema = schema; + } + + public Schema getSchema() { + return schema; + } + + public void setRequiredFields(Set requiredFields) { + this.requiredFields = requiredFields; + } + + public void setSplitStartOffset(Long splitStartOffset) { + this.splitStartOffset = splitStartOffset; + } + + public void setSplitSize(Long splitSize) { + this.splitSize = splitSize; + } + + public Long getSplitStartOffset() { + return this.splitStartOffset; + } + + public Long getSplitSize() { + return this.splitSize; + } + + public Set getRequiredFields() { + return requiredFields; + } +} diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java index a1b573cf5be8ed..17a185d03ae138 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java @@ -17,8 +17,6 @@ package org.apache.doris.avro; -import org.apache.doris.avro.AvroFileCache.AvroFileCacheKey; -import org.apache.doris.avro.AvroFileCache.AvroFileMeta; import org.apache.doris.common.jni.JniScanner; import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.ScanPredicate; @@ -70,8 +68,7 @@ public class AvroJNIScanner extends JniScanner { private StructField[] structFields; private ObjectInspector[] fieldInspectors; private String serde; - private AvroFileCacheKey avroFileCacheKey; - private AvroFileMeta avroFileMeta; + private AvroFileContext avroFileContext; private AvroWrapper> inputPair; private NullWritable ignore; private Long splitStartOffset; @@ -169,26 +166,27 @@ public void open() throws IOException { if (!isGetTableSchema) { initDataReader(); } - this.avroReader.open(avroFileMeta, isGetTableSchema); + this.avroReader.open(avroFileContext, isGetTableSchema); } private void initDataReader() { try { - avroFileCacheKey = new AvroFileCacheKey(fileType.name(), uri); - avroFileMeta = AvroFileCache.getAvroFileMeta(avroFileCacheKey); - avroFileMeta.setRequiredFields(requiredFieldSet); - avroFileMeta.setSplitStartOffset(splitStartOffset); - avroFileMeta.setSplitSize(splitSize); + initAvroFileContext(); initFieldInspector(); initTableInfo(requiredTypes, requiredFields, new ScanPredicate[0], fetchSize); } catch (Exception e) { LOG.warn("Failed to init avro scanner. ", e); throw new RuntimeException(e); - } finally { - AvroFileCache.invalidateFileCache(avroFileCacheKey); } } + private void initAvroFileContext() { + avroFileContext = new AvroFileContext(); + avroFileContext.setRequiredFields(requiredFieldSet); + avroFileContext.setSplitStartOffset(splitStartOffset); + avroFileContext.setSplitSize(splitSize); + } + @Override public void close() throws IOException { if (Objects.nonNull(avroReader)) { @@ -220,18 +218,7 @@ protected int getNext() throws IOException { @Override protected TableSchema parseTableSchema() throws UnsupportedOperationException { Schema schema = avroReader.getSchema(); - addFileMeta2Cache(schema); return AvroTypeUtils.parseTableSchema(schema); } - /** - * Cache avro file metadata in order to push down the projection of the actual read data. - * - * @param schema avro file schema - */ - private void addFileMeta2Cache(Schema schema) { - AvroFileMeta avroFileMeta = new AvroFileMeta(schema.toString()); - AvroFileCacheKey avroFileCacheKey = new AvroFileCacheKey(fileType.name(), uri); - AvroFileCache.addFileMeta(avroFileCacheKey, avroFileMeta); - } } diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java index 52a78188eabbf9..50b647361bf9ff 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java @@ -17,8 +17,6 @@ package org.apache.doris.avro; -import org.apache.doris.avro.AvroFileCache.AvroFileMeta; - import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; @@ -52,7 +50,7 @@ public abstract class AvroReader { protected Path path; protected FileSystem fileSystem; - public abstract void open(AvroFileMeta avroFileMeta, boolean tableSchema) throws IOException; + public abstract void open(AvroFileContext avroFileContext, boolean tableSchema) throws IOException; public abstract Schema getSchema(); @@ -68,29 +66,38 @@ protected void openSchemaReader() throws IOException { LOG.debug("success open avro schema reader."); } - protected void openDataReader(AvroFileMeta avroFileMeta) throws IOException { + protected void openDataReader(AvroFileContext avroFileContext) throws IOException { JobConf job = new JobConf(); - projectionSchema(job, avroFileMeta); - FileSplit fileSplit = new FileSplit(path, avroFileMeta.getSplitStartOffset(), avroFileMeta.getSplitSize(), job); + projectionSchema(job, avroFileContext); + FileSplit fileSplit = + new FileSplit(path, avroFileContext.getSplitStartOffset(), avroFileContext.getSplitSize(), job); dataReader = new AvroRecordReader<>(job, fileSplit); LOG.debug("success open avro data reader."); } - protected void projectionSchema(JobConf job, AvroFileMeta avroFileMeta) { - Set filedNames = avroFileMeta.getRequiredFields(); - JsonObject schemaJson = new Gson().fromJson(avroFileMeta.getSchema(), JsonObject.class); - JsonObject copySchemaJson = schemaJson.deepCopy(); - JsonArray schemaFields = schemaJson.get("fields").getAsJsonArray(); - JsonArray copySchemaFields = copySchemaJson.get("fields").getAsJsonArray(); - for (int i = 0; i < schemaFields.size(); i++) { - JsonObject object = schemaFields.get(i).getAsJsonObject(); - String name = object.get("name").getAsString(); - if (filedNames.contains(name)) { - continue; + protected void projectionSchema(JobConf job, AvroFileContext avroFileContext) { + Schema projectionSchema; + Set filedNames = avroFileContext.getRequiredFields(); + Schema avroSchema = avroFileContext.getSchema(); + // The number of fields that need to be queried is the same as that of the avro file, + // so no projection is required. + if (filedNames.size() != avroSchema.getFields().size()) { + JsonObject schemaJson = new Gson().fromJson(avroSchema.toString(), JsonObject.class); + JsonArray schemaFields = schemaJson.get("fields").getAsJsonArray(); + JsonObject copySchemaJson = schemaJson.deepCopy(); + JsonArray copySchemaFields = copySchemaJson.get("fields").getAsJsonArray(); + for (int i = 0; i < schemaFields.size(); i++) { + JsonObject object = schemaFields.get(i).getAsJsonObject(); + String name = object.get("name").getAsString(); + if (filedNames.contains(name)) { + continue; + } + copySchemaFields.remove(schemaFields.get(i)); } - copySchemaFields.remove(schemaFields.get(i)); + projectionSchema = new Parser().parse(copySchemaJson.toString()); + } else { + projectionSchema = avroSchema; } - Schema projectionSchema = new Parser().parse(copySchemaJson.toString()); AvroJob.setInputSchema(job, projectionSchema); LOG.debug("projection avro schema is:" + projectionSchema.toString()); } diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java index 7d6fc40eb13dee..be9f355912a930 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java @@ -17,8 +17,6 @@ package org.apache.doris.avro; -import org.apache.doris.avro.AvroFileCache.AvroFileMeta; - import org.apache.avro.Schema; import org.apache.avro.mapred.AvroWrapper; import org.apache.avro.mapred.Pair; @@ -44,12 +42,12 @@ public HDFSFileReader(String url) { } @Override - public void open(AvroFileMeta avroFileMeta, boolean tableSchema) throws IOException { + public void open(AvroFileContext avroFileContext, boolean tableSchema) throws IOException { fileSystem = FileSystem.get(URI.create(url), new Configuration()); - if (tableSchema) { - openSchemaReader(); - } else { - openDataReader(avroFileMeta); + openSchemaReader(); + if (!tableSchema) { + avroFileContext.setSchema(schemaReader.getSchema()); + openDataReader(avroFileContext); } } diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java index d3bbac9e991caa..185fc5a31070aa 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java @@ -17,8 +17,6 @@ package org.apache.doris.avro; -import org.apache.doris.avro.AvroFileCache.AvroFileMeta; - import org.apache.avro.Schema; import org.apache.avro.mapred.AvroWrapper; import org.apache.avro.mapred.Pair; @@ -58,7 +56,7 @@ public S3FileReader(String accessKey, String secretKey, String endpoint, String } @Override - public void open(AvroFileMeta avroFileMeta, boolean tableSchema) throws IOException { + public void open(AvroFileContext avroFileContext, boolean tableSchema) throws IOException { Configuration conf = new Configuration(); conf.set(AvroProperties.FS_S3A_ACCESS_KEY, accessKey); conf.set(AvroProperties.FS_S3A_SECRET_KEY, secretKey); @@ -66,10 +64,10 @@ public void open(AvroFileMeta avroFileMeta, boolean tableSchema) throws IOExcept conf.set(AvroProperties.FS_S3A_REGION, region); path = new Path(s3aUri); fileSystem = FileSystem.get(URI.create(s3aUri), conf); - if (tableSchema) { - openSchemaReader(); - } else { - openDataReader(avroFileMeta); + openSchemaReader(); + if (!tableSchema) { + avroFileContext.setSchema(schemaReader.getSchema()); + openDataReader(avroFileContext); } }