Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 0 additions & 46 deletions extensions-core/avro-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
<schemarepo.version>0.1.3</schemarepo.version>
<confluent.version>3.0.1</confluent.version>
<avro.version>1.8.2</avro.version>
<pig.version>0.15.0</pig.version>
</properties>

<repositories>
Expand Down Expand Up @@ -160,51 +159,6 @@
<version>2.2.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>${pig.version}</version>
<classifier>h2</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>piggybank</artifactId>
<version>${pig.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,16 @@
public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
{
private final ParseSpec parseSpec;
private final boolean fromPigAvroStorage;
private final ObjectFlattener<GenericRecord> avroFlattener;
private final MapInputRowParser mapParser;

@JsonCreator
public AvroHadoopInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec,
@JsonProperty("fromPigAvroStorage") Boolean fromPigAvroStorage
@JsonProperty("parseSpec") ParseSpec parseSpec
)
{
this.parseSpec = parseSpec;
this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage;
this.avroFlattener = AvroParsers.makeFlattener(parseSpec, this.fromPigAvroStorage, false);
this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false);
this.mapParser = new MapInputRowParser(parseSpec);
}

Expand All @@ -62,15 +59,9 @@ public ParseSpec getParseSpec()
return parseSpec;
}

@JsonProperty
public boolean isFromPigAvroStorage()
{
return fromPigAvroStorage;
}

@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage);
return new AvroHadoopInputRowParser(parseSpec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public AvroStreamInputRowParser(
{
this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec");
this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder");
this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false);
this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false);
this.mapParser = new MapInputRowParser(parseSpec);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@

package org.apache.druid.data.input.avro;

import com.google.common.collect.Lists;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.druid.java.util.common.StringUtils;
Expand All @@ -41,7 +39,7 @@

public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<GenericRecord>
{
static final Configuration JSONPATH_CONFIGURATION =
private static final Configuration JSONPATH_CONFIGURATION =
Configuration.builder()
.jsonProvider(new GenericAvroJsonProvider())
.mappingProvider(new NotImplementedMappingProvider())
Expand All @@ -57,17 +55,17 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
Schema.Type.DOUBLE
);

public static boolean isPrimitive(Schema schema)
private static boolean isPrimitive(Schema schema)
{
return ROOT_TYPES.contains(schema.getType());
}

public static boolean isPrimitiveArray(Schema schema)
private static boolean isPrimitiveArray(Schema schema)
{
return schema.getType().equals(Schema.Type.ARRAY) && isPrimitive(schema.getElementType());
}

public static boolean isOptionalPrimitive(Schema schema)
private static boolean isOptionalPrimitive(Schema schema)
{
return schema.getType().equals(Schema.Type.UNION) &&
schema.getTypes().size() == 2 &&
Expand All @@ -79,20 +77,21 @@ public static boolean isOptionalPrimitive(Schema schema)
);
}

static boolean isFieldPrimitive(Schema.Field field)
private static boolean isFieldPrimitive(Schema.Field field)
{
return isPrimitive(field.schema()) ||
isPrimitiveArray(field.schema()) ||
isOptionalPrimitive(field.schema());
}


private final boolean fromPigAvroStorage;
private final boolean binaryAsString;

public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binaryAsString)
/**
* @param binaryAsString boolean to encode the byte[] as a string.
*/
public AvroFlattenerMaker(final boolean binaryAsString)
{
this.fromPigAvroStorage = fromPigAvroStorage;
this.binaryAsString = binaryAsString;
}

Expand Down Expand Up @@ -128,21 +127,16 @@ public Function<GenericRecord, Object> makeJsonQueryExtractor(final String expr)

private Object transformValue(final Object field)
{
if (fromPigAvroStorage && field instanceof GenericData.Array) {
return Lists.transform((List) field, item -> String.valueOf(((GenericRecord) item).get(0)));
}
if (field instanceof ByteBuffer) {
if (binaryAsString) {
return StringUtils.fromUtf8(((ByteBuffer) field).array());
} else {
return ((ByteBuffer) field).array();
}
}
if (field instanceof Utf8) {
} else if (field instanceof Utf8) {
return field.toString();
}
if (field instanceof List) {
return ((List) field).stream().filter(Objects::nonNull).collect(Collectors.toList());
} else if (field instanceof List) {
return ((List<?>) field).stream().filter(Objects::nonNull).collect(Collectors.toList());
}
return field;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,17 @@ private AvroParsers()

public static ObjectFlattener<GenericRecord> makeFlattener(
final ParseSpec parseSpec,
final boolean fromPigAvroStorage,
final boolean binaryAsString
)
{
final JSONPathSpec flattenSpec;
if (parseSpec != null && (parseSpec instanceof AvroParseSpec)) {
if (parseSpec instanceof AvroParseSpec) {
flattenSpec = ((AvroParseSpec) parseSpec).getFlattenSpec();
} else {
flattenSpec = JSONPathSpec.DEFAULT;
}

return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(fromPigAvroStorage, binaryAsString));
return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(binaryAsString));
}

public static List<InputRow> parseGenericRecord(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;

import javax.annotation.Nullable;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -139,6 +141,7 @@ public void setArrayIndex(final Object o, final int i, final Object o1)
}
}

@Nullable
@Override
public Object getMapValue(final Object o, final String s)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.java.util.common.parsers.ParseException;

import java.nio.ByteBuffer;
import java.util.Objects;

public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
{
Expand Down Expand Up @@ -83,7 +84,7 @@ public boolean equals(Object o)

SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o;

return registry != null ? registry.equals(that.registry) : that.registry == null;
return Objects.equals(registry, that.registry);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Objects;

public class SchemaRepoBasedAvroBytesDecoder<SUBJECT, ID> implements AvroBytesDecoder
{
Expand Down Expand Up @@ -107,14 +108,10 @@ public boolean equals(Object o)

SchemaRepoBasedAvroBytesDecoder<?, ?> that = (SchemaRepoBasedAvroBytesDecoder<?, ?>) o;

if (subjectAndIdConverter != null
? !subjectAndIdConverter.equals(that.subjectAndIdConverter)
: that.subjectAndIdConverter != null) {
if (!Objects.equals(subjectAndIdConverter, that.subjectAndIdConverter)) {
return false;
}
return !(schemaRepository != null
? !schemaRepository.equals(that.schemaRepository)
: that.schemaRepository != null);
return Objects.equals(schemaRepository, that.schemaRepository);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.schemarepo.client.Avro1124RESTRepositoryClient;

import java.util.Objects;

public class Avro1124RESTRepositoryClientWrapper extends Avro1124RESTRepositoryClient
{
private final String url;
Expand Down Expand Up @@ -60,7 +62,7 @@ public boolean equals(Object o)

Avro1124RESTRepositoryClientWrapper that = (Avro1124RESTRepositoryClientWrapper) o;

return !(url != null ? !url.equals(that.url) : that.url != null);
return Objects.equals(url, that.url);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.schemarepo.api.converter.IntegerConverter;

import java.nio.ByteBuffer;
import java.util.Objects;

/**
* This implementation using injected Kafka topic name as subject name, and an integer as schema id. Before sending avro
Expand Down Expand Up @@ -88,7 +89,7 @@ public boolean equals(Object o)

Avro1124SubjectAndIdConverter converter = (Avro1124SubjectAndIdConverter) o;

return !(topic != null ? !topic.equals(converter.topic) : converter.topic != null);
return Objects.equals(topic, converter.topic);

}

Expand Down
Loading