4 changes: 4 additions & 0 deletions be/src/vec/exec/format/avro/avro_jni_reader.cpp
@@ -89,6 +89,10 @@ Status AvroJNIReader::init_fetch_table_reader(
    if (type == TFileType::FILE_S3) {
        required_param.insert(_params.properties.begin(), _params.properties.end());
    }
    required_param.insert(
            std::make_pair("split_start_offset", std::to_string(_range.start_offset)));
    required_param.insert(std::make_pair("split_size", std::to_string(_range.size)));
    required_param.insert(std::make_pair("split_file_size", std::to_string(_range.file_size)));
    required_param.insert(std::make_pair("uri", _range.path));
    _jni_connector = std::make_unique<JniConnector>("org/apache/doris/avro/AvroJNIScanner",
                                                    required_param, column_names);
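For orientation, here is a minimal sketch of the parameter map the BE side above now assembles and hands to AvroJNIScanner through JniConnector. Every value is invented for illustration; the real values come from _range and _params.properties.

import java.util.HashMap;
import java.util.Map;

public class SplitParamsSketch {
    public static void main(String[] args) {
        // Hypothetical values mirroring the required_param inserts above.
        Map<String, String> requiredParams = new HashMap<>();
        requiredParams.put("uri", "hdfs://ns1/warehouse/users.avro"); // _range.path
        requiredParams.put("split_start_offset", "0");                // _range.start_offset
        requiredParams.put("split_size", "134217728");                // _range.size
        requiredParams.put("split_file_size", "268435456");           // _range.file_size
    }
}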
38 changes: 35 additions & 3 deletions fe/be-java-extensions/avro-scanner/pom.xml
@@ -36,6 +36,11 @@ under the License.
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-mapred</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>java-common</artifactId>
@@ -53,12 +58,39 @@ under the License.
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
-           <artifactId>hadoop-hdfs</artifactId>
-           <scope>provided</scope>
+           <artifactId>hadoop-aws</artifactId>
+           <exclusions>
+               <exclusion>
+                   <artifactId>slf4j-log4j12</artifactId>
+                   <groupId>org.slf4j</groupId>
+               </exclusion>
+               <exclusion>
+                   <artifactId>log4j</artifactId>
+                   <groupId>log4j</groupId>
+               </exclusion>
+               <exclusion>
+                   <artifactId>servlet-api</artifactId>
+                   <groupId>javax.servlet</groupId>
+               </exclusion>
+               <exclusion>
+                   <artifactId>aws-java-sdk-s3</artifactId>
+                   <groupId>com.amazonaws</groupId>
+               </exclusion>
+               <exclusion>
+                   <artifactId>aws-java-sdk-bundle</artifactId>
+                   <groupId>com.amazonaws</groupId>
+               </exclusion>
+           </exclusions>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
-           <artifactId>aws-java-sdk-s3</artifactId>
+           <artifactId>aws-java-sdk-bundle</artifactId>
            <version>${aws-java-sdk.version}</version>
        </dependency>
+       <dependency>
+           <groupId>org.apache.hadoop</groupId>
+           <artifactId>hadoop-hdfs</artifactId>
+           <scope>provided</scope>
+       </dependency>
        <dependency>
            <groupId>org.apache.doris</groupId>
61 changes: 61 additions & 0 deletions fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileContext.java
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.avro;

import org.apache.avro.Schema;

import java.util.Set;

public class AvroFileContext {
    private Schema schema;
    private Set<String> requiredFields;
    private Long splitStartOffset;
    private Long splitSize;

    public void setSchema(Schema schema) {
        this.schema = schema;
    }

    public Schema getSchema() {
        return schema;
    }

    public void setRequiredFields(Set<String> requiredFields) {
        this.requiredFields = requiredFields;
    }

    public void setSplitStartOffset(Long splitStartOffset) {
        this.splitStartOffset = splitStartOffset;
    }

    public void setSplitSize(Long splitSize) {
        this.splitSize = splitSize;
    }

    public Long getSplitStartOffset() {
        return this.splitStartOffset;
    }

    public Long getSplitSize() {
        return this.splitSize;
    }

    public Set<String> getRequiredFields() {
        return requiredFields;
    }
}
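As a usage sketch, this is how a caller populates the holder before opening a reader; the field values here are hypothetical, and the real call site is AvroJNIScanner.initAvroFileContext further down in this diff.

import java.util.Arrays;
import java.util.HashSet;

public class AvroFileContextUsage {
    public static void main(String[] args) {
        AvroFileContext ctx = new AvroFileContext();
        ctx.setRequiredFields(new HashSet<>(Arrays.asList("id", "age"))); // projected columns
        ctx.setSplitStartOffset(0L);   // byte offset where this split begins
        ctx.setSplitSize(134217728L);  // split length in bytes
        System.out.println(ctx.getRequiredFields()); // e.g. [id, age]
    }
}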
fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
@@ -25,6 +25,8 @@

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.serde.serdeConstants;
@@ -33,14 +35,17 @@
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.NullWritable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

public class AvroJNIScanner extends JniScanner {
@@ -50,9 +55,11 @@ public class AvroJNIScanner extends JniScanner {
    private final String uri;
    private final Map<String, String> requiredParams;
    private final Integer fetchSize;
    private final ClassLoader classLoader;
    private int[] requiredColumnIds;
    private String[] columnTypes;
    private String[] requiredFields;
    private Set<String> requiredFieldSet;
    private ColumnType[] requiredTypes;
    private AvroReader avroReader;
    private final boolean isGetTableSchema;
@@ -61,6 +68,12 @@ public class AvroJNIScanner extends JniScanner {
    private StructField[] structFields;
    private ObjectInspector[] fieldInspectors;
    private String serde;
    private AvroFileContext avroFileContext;
    private AvroWrapper<Pair<Integer, Long>> inputPair;
    private NullWritable ignore;
    private Long splitStartOffset;
    private Long splitSize;
    private Long splitFileSize;

    /**
     * Called by JNI to fetch table data or the table schema.
@@ -69,6 +82,7 @@
     * @param requiredParams required params
     */
    public AvroJNIScanner(int fetchSize, Map<String, String> requiredParams) {
        this.classLoader = this.getClass().getClassLoader();
        this.requiredParams = requiredParams;
        this.fetchSize = fetchSize;
        this.isGetTableSchema = Boolean.parseBoolean(requiredParams.get(AvroProperties.IS_GET_TABLE_SCHEMA));
@@ -79,14 +93,20 @@ public AvroJNIScanner(int fetchSize, Map<String, String> requiredParams) {
                    .split(AvroProperties.COLUMNS_TYPE_DELIMITER);
            this.requiredFields = requiredParams.get(AvroProperties.REQUIRED_FIELDS)
                    .split(AvroProperties.FIELDS_DELIMITER);
            this.requiredFieldSet = new HashSet<>(Arrays.asList(requiredFields));
            this.requiredTypes = new ColumnType[requiredFields.length];
            this.serde = requiredParams.get(AvroProperties.HIVE_SERDE);
            this.structFields = new StructField[requiredFields.length];
            this.fieldInspectors = new ObjectInspector[requiredFields.length];
            this.inputPair = new AvroWrapper<>(null);
            this.ignore = NullWritable.get();
            this.splitStartOffset = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_START_OFFSET));
            this.splitSize = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_SIZE));
            this.splitFileSize = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_FILE_SIZE));
        }
    }

-   private void init() throws Exception {
+   private void initFieldInspector() throws Exception {
        requiredColumnIds = new int[requiredFields.length];
        for (int i = 0; i < requiredFields.length; i++) {
            ColumnType columnType = ColumnType.parseType(requiredFields[i], columnTypes[i]);
@@ -127,14 +147,7 @@ private Deserializer getDeserializer(Configuration configuration, Properties pro

    @Override
    public void open() throws IOException {
-       try {
-           if (!isGetTableSchema) {
-               init();
-           }
-       } catch (Exception e) {
-           LOG.warn("Failed to init avro scanner. ", e);
-           throw new IOException(e);
-       }
+       Thread.currentThread().setContextClassLoader(classLoader);
        switch (fileType) {
            case FILE_HDFS:
                this.avroReader = new HDFSFileReader(uri);
@@ -150,12 +163,30 @@ public void open() throws IOException {
LOG.warn("Unsupported " + fileType.name() + " file type.");
throw new IOException("Unsupported " + fileType.name() + " file type.");
}
this.avroReader.open(new Configuration());
if (!isGetTableSchema) {
initDataReader();
}
this.avroReader.open(avroFileContext, isGetTableSchema);
}

private void initDataReader() {
try {
initAvroFileContext();
initFieldInspector();
initTableInfo(requiredTypes, requiredFields, new ScanPredicate[0], fetchSize);
} catch (Exception e) {
LOG.warn("Failed to init avro scanner. ", e);
throw new RuntimeException(e);
}
}

private void initAvroFileContext() {
avroFileContext = new AvroFileContext();
avroFileContext.setRequiredFields(requiredFieldSet);
avroFileContext.setSplitStartOffset(splitStartOffset);
avroFileContext.setSplitSize(splitSize);
}

    @Override
    public void close() throws IOException {
        if (Objects.nonNull(avroReader)) {
@@ -167,7 +198,7 @@ public void close() throws IOException {
    protected int getNext() throws IOException {
        int numRows = 0;
        for (; numRows < getBatchSize(); numRows++) {
-           if (!avroReader.hasNext()) {
+           if (!avroReader.hasNext(inputPair, ignore)) {
                break;
            }
            GenericRecord rowRecord = (GenericRecord) avroReader.getNext();
@@ -189,4 +220,5 @@ protected TableSchema parseTableSchema() throws UnsupportedOperationException {
        Schema schema = avroReader.getSchema();
        return AvroTypeUtils.parseTableSchema(schema);
    }

}
fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java
@@ -34,5 +34,12 @@ public class AvroProperties {
    protected static final String HIVE_SERDE = "hive.serde";
    protected static final String COLUMNS = "columns";
    protected static final String COLUMNS2TYPES = "columns.types";
    protected static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key";
    protected static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
    protected static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
    protected static final String FS_S3A_REGION = "fs.s3a.region";
    protected static final String SPLIT_START_OFFSET = "split_start_offset";
    protected static final String SPLIT_SIZE = "split_size";
    protected static final String SPLIT_FILE_SIZE = "split_file_size";

}
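The new fs.s3a.* keys suggest how an S3-backed reader would configure Hadoop's s3a filesystem before opening the file. A hypothetical sketch, not part of this PR; the credential parameter names (AWS_ACCESS_KEY and friends) are assumptions, since the S3 reader implementation sits outside this diff:

package org.apache.doris.avro;

import org.apache.hadoop.conf.Configuration;

import java.util.Map;

public class S3ConfSketch {
    // Builds a Configuration suitable for FileSystem.get() against an s3a:// URI.
    static Configuration buildS3Conf(Map<String, String> params) {
        Configuration conf = new Configuration();
        conf.set(AvroProperties.FS_S3A_ACCESS_KEY, params.get("AWS_ACCESS_KEY")); // param name assumed
        conf.set(AvroProperties.FS_S3A_SECRET_KEY, params.get("AWS_SECRET_KEY")); // param name assumed
        conf.set(AvroProperties.FS_S3A_ENDPOINT, params.get("AWS_ENDPOINT"));     // param name assumed
        conf.set(AvroProperties.FS_S3A_REGION, params.get("AWS_REGION"));         // param name assumed
        return conf;
    }
}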
fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java
@@ -17,21 +17,89 @@

package org.apache.doris.avro;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
import org.apache.avro.Schema.Parser;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroRecordReader;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Set;

-public interface AvroReader {
+public abstract class AvroReader {

-   void open(Configuration conf) throws IOException;
+   private static final Logger LOG = LogManager.getLogger(AvroReader.class);
+   protected AvroRecordReader<Pair<Integer, Long>> dataReader;
+   protected DataFileStream<GenericRecord> schemaReader;
+   protected Path path;
+   protected FileSystem fileSystem;

-   Schema getSchema();
+   public abstract void open(AvroFileContext avroFileContext, boolean tableSchema) throws IOException;

-   boolean hasNext();
+   public abstract Schema getSchema();

-   Object getNext() throws IOException;
+   public abstract boolean hasNext(AvroWrapper<Pair<Integer, Long>> inputPair, NullWritable ignore) throws IOException;

-   void close() throws IOException;
+   public abstract Object getNext();

+   public abstract void close() throws IOException;

    protected void openSchemaReader() throws IOException {
        InputStream inputStream = new BufferedInputStream(fileSystem.open(path));
        schemaReader = new DataFileStream<>(inputStream, new GenericDatumReader<>());
        LOG.debug("Successfully opened the avro schema reader.");
    }

    protected void openDataReader(AvroFileContext avroFileContext) throws IOException {
        JobConf job = new JobConf();
        projectionSchema(job, avroFileContext);
        FileSplit fileSplit =
                new FileSplit(path, avroFileContext.getSplitStartOffset(), avroFileContext.getSplitSize(), job);
        dataReader = new AvroRecordReader<>(job, fileSplit);
        LOG.debug("Successfully opened the avro data reader.");
    }

    protected void projectionSchema(JobConf job, AvroFileContext avroFileContext) {
        Schema projectionSchema;
        Set<String> fieldNames = avroFileContext.getRequiredFields();
        Schema avroSchema = avroFileContext.getSchema();
        // If the query requires every field of the avro file, no projection is needed.
        if (fieldNames.size() != avroSchema.getFields().size()) {
            JsonObject schemaJson = new Gson().fromJson(avroSchema.toString(), JsonObject.class);
            JsonArray schemaFields = schemaJson.get("fields").getAsJsonArray();
            JsonObject copySchemaJson = schemaJson.deepCopy();
            JsonArray copySchemaFields = copySchemaJson.get("fields").getAsJsonArray();
            for (int i = 0; i < schemaFields.size(); i++) {
                JsonObject object = schemaFields.get(i).getAsJsonObject();
                String name = object.get("name").getAsString();
                if (fieldNames.contains(name)) {
                    continue;
                }
                copySchemaFields.remove(schemaFields.get(i));
            }
            projectionSchema = new Parser().parse(copySchemaJson.toString());
        } else {
            projectionSchema = avroSchema;
        }
        AvroJob.setInputSchema(job, projectionSchema);
        LOG.debug("The projected avro schema is: " + projectionSchema.toString());
    }

}
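To make the Gson-based pruning in projectionSchema concrete, here is a self-contained sketch (schema and field names invented) that applies the same copy-and-remove steps to a three-field record and keeps only two fields:

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.avro.Schema;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ProjectionSchemaDemo {
    public static void main(String[] args) {
        String json = "{\"type\":\"record\",\"name\":\"user\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"long\"},"
                + "{\"name\":\"name\",\"type\":\"string\"},"
                + "{\"name\":\"age\",\"type\":\"int\"}]}";
        Schema avroSchema = new Schema.Parser().parse(json);
        Set<String> fieldNames = new HashSet<>(Arrays.asList("id", "age"));

        // Copy the schema JSON, then drop every field the query does not need.
        JsonObject schemaJson = new Gson().fromJson(avroSchema.toString(), JsonObject.class);
        JsonArray schemaFields = schemaJson.get("fields").getAsJsonArray();
        JsonObject copySchemaJson = schemaJson.deepCopy();
        JsonArray copySchemaFields = copySchemaJson.get("fields").getAsJsonArray();
        for (int i = 0; i < schemaFields.size(); i++) {
            JsonObject field = schemaFields.get(i).getAsJsonObject();
            if (!fieldNames.contains(field.get("name").getAsString())) {
                copySchemaFields.remove(schemaFields.get(i)); // value-based removal from the copy
            }
        }
        Schema projected = new Schema.Parser().parse(copySchemaJson.toString());
        System.out.println(projected); // record schema with only "id" and "age"
    }
}

Re-parsing the pruned JSON through Schema.Parser also validates that the projected schema is still a legal Avro record.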