From 8a3106134aa5aecc5c1fc92ad2a06f59afffd6be Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Fri, 31 May 2019 22:25:09 +0200 Subject: [PATCH 1/6] Remove Apache Pig from the tests --- extensions-core/avro-extensions/pom.xml | 46 -------- .../data/input/AvroHadoopInputRowParser.java | 15 +-- .../data/input/AvroStreamInputRowParser.java | 2 +- .../data/input/avro/AvroFlattenerMaker.java | 12 +- .../druid/data/input/avro/AvroParsers.java | 5 +- .../SchemaRegistryBasedAvroBytesDecoder.java | 3 +- .../avro/SchemaRepoBasedAvroBytesDecoder.java | 9 +- .../Avro1124RESTRepositoryClientWrapper.java | 4 +- .../Avro1124SubjectAndIdConverter.java | 3 +- .../input/AvroHadoopInputRowParserTest.java | 105 +++++------------- .../input/AvroStreamInputRowParserTest.java | 90 +++++++-------- .../avro/ParquetAvroHadoopInputRowParser.java | 4 +- 12 files changed, 97 insertions(+), 201 deletions(-) diff --git a/extensions-core/avro-extensions/pom.xml b/extensions-core/avro-extensions/pom.xml index aa1801834419..e8393995c37e 100644 --- a/extensions-core/avro-extensions/pom.xml +++ b/extensions-core/avro-extensions/pom.xml @@ -38,7 +38,6 @@ 0.1.3 3.0.1 1.8.2 - 0.15.0 @@ -160,51 +159,6 @@ 2.2.10 test - - org.apache.pig - pig - ${pig.version} - h2 - test - - - org.apache.avro - avro - - - commons-net - commons-net - - - org.codehaus.jackson - jackson-core-asl - - - org.codehaus.jackson - jackson-mapper-asl - - - - - org.apache.pig - piggybank - ${pig.version} - test - - - log4j - log4j - - - org.apache.pig - pig - - - org.apache.avro - avro - - - org.apache.druid druid-processing diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java index 39ce48ec2b5e..7d7c8eb8bc3c 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java @@ -33,19 +33,16 @@ public class AvroHadoopInputRowParser implements InputRowParser { private final ParseSpec parseSpec; - private final boolean fromPigAvroStorage; private final ObjectFlattener avroFlattener; private final MapInputRowParser mapParser; @JsonCreator public AvroHadoopInputRowParser( - @JsonProperty("parseSpec") ParseSpec parseSpec, - @JsonProperty("fromPigAvroStorage") Boolean fromPigAvroStorage + @JsonProperty("parseSpec") ParseSpec parseSpec ) { this.parseSpec = parseSpec; - this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage; - this.avroFlattener = AvroParsers.makeFlattener(parseSpec, this.fromPigAvroStorage, false); + this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false); this.mapParser = new MapInputRowParser(parseSpec); } @@ -62,15 +59,9 @@ public ParseSpec getParseSpec() return parseSpec; } - @JsonProperty - public boolean isFromPigAvroStorage() - { - return fromPigAvroStorage; - } - @Override public InputRowParser withParseSpec(ParseSpec parseSpec) { - return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage); + return new AvroHadoopInputRowParser(parseSpec); } } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java index f375c63b8b23..7dddf2387489 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java @@ -48,7 +48,7 @@ public AvroStreamInputRowParser( { this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec"); this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder"); - this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false); + this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false); this.mapParser = new MapInputRowParser(parseSpec); } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java index 0b55d762f009..19326ed24e5c 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java @@ -41,7 +41,7 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker { - static final Configuration JSONPATH_CONFIGURATION = + private static final Configuration JSONPATH_CONFIGURATION = Configuration.builder() .jsonProvider(new GenericAvroJsonProvider()) .mappingProvider(new NotImplementedMappingProvider()) @@ -87,12 +87,10 @@ static boolean isFieldPrimitive(Schema.Field field) } - private final boolean fromPigAvroStorage; private final boolean binaryAsString; - public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binaryAsString) + public AvroFlattenerMaker(final boolean binaryAsString) { - this.fromPigAvroStorage = fromPigAvroStorage; this.binaryAsString = binaryAsString; } @@ -128,8 +126,8 @@ public Function makeJsonQueryExtractor(final String expr) private Object transformValue(final Object field) { - if (fromPigAvroStorage && field instanceof GenericData.Array) { - return Lists.transform((List) field, item -> String.valueOf(((GenericRecord) item).get(0))); + if (field instanceof GenericData.Array) { + return Lists.transform((List) field, String::valueOf); } if (field instanceof ByteBuffer) { if (binaryAsString) { @@ -142,7 +140,7 @@ private Object transformValue(final Object field) return field.toString(); } if (field instanceof List) { - return ((List) field).stream().filter(Objects::nonNull).collect(Collectors.toList()); + return ((List) field).stream().filter(Objects::nonNull).collect(Collectors.toList()); } return field; } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java index 92ea3ae1bdaa..a8baa428d753 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java @@ -38,18 +38,17 @@ private AvroParsers() public static ObjectFlattener makeFlattener( final ParseSpec parseSpec, - final boolean fromPigAvroStorage, final boolean binaryAsString ) { final JSONPathSpec flattenSpec; - if (parseSpec != null && (parseSpec instanceof AvroParseSpec)) { + if (parseSpec instanceof AvroParseSpec) { flattenSpec = ((AvroParseSpec) parseSpec).getFlattenSpec(); } else { flattenSpec = JSONPathSpec.DEFAULT; } - return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(fromPigAvroStorage, binaryAsString)); + return ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(binaryAsString)); } public static List parseGenericRecord( diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 5b9d38695561..6ff97c41857a 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.parsers.ParseException; import java.nio.ByteBuffer; +import java.util.Objects; public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder { @@ -83,7 +84,7 @@ public boolean equals(Object o) SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o; - return registry != null ? registry.equals(that.registry) : that.registry == null; + return Objects.equals(registry, that.registry); } @Override diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java index e7de115c7b44..326b58a29dab 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.Objects; public class SchemaRepoBasedAvroBytesDecoder implements AvroBytesDecoder { @@ -107,14 +108,10 @@ public boolean equals(Object o) SchemaRepoBasedAvroBytesDecoder that = (SchemaRepoBasedAvroBytesDecoder) o; - if (subjectAndIdConverter != null - ? !subjectAndIdConverter.equals(that.subjectAndIdConverter) - : that.subjectAndIdConverter != null) { + if (!Objects.equals(subjectAndIdConverter, that.subjectAndIdConverter)) { return false; } - return !(schemaRepository != null - ? !schemaRepository.equals(that.schemaRepository) - : that.schemaRepository != null); + return Objects.equals(schemaRepository, that.schemaRepository); } @Override diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124RESTRepositoryClientWrapper.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124RESTRepositoryClientWrapper.java index 0ba557932d61..e5c2541ef386 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124RESTRepositoryClientWrapper.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124RESTRepositoryClientWrapper.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.schemarepo.client.Avro1124RESTRepositoryClient; +import java.util.Objects; + public class Avro1124RESTRepositoryClientWrapper extends Avro1124RESTRepositoryClient { private final String url; @@ -60,7 +62,7 @@ public boolean equals(Object o) Avro1124RESTRepositoryClientWrapper that = (Avro1124RESTRepositoryClientWrapper) o; - return !(url != null ? !url.equals(that.url) : that.url != null); + return Objects.equals(url, that.url); } @Override diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java index 82f89f6a1664..d2685f53900c 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java @@ -27,6 +27,7 @@ import org.schemarepo.api.converter.IntegerConverter; import java.nio.ByteBuffer; +import java.util.Objects; /** * This implementation using injected Kafka topic name as subject name, and an integer as schema id. Before sending avro @@ -88,7 +89,7 @@ public boolean equals(Object o) Avro1124SubjectAndIdConverter converter = (Avro1124SubjectAndIdConverter) o; - return !(topic != null ? !topic.equals(converter.topic) : converter.topic != null); + return Objects.equals(topic, converter.topic); } diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java index dc25e8a2930b..ed9dfa2994ec 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.io.Closeables; import com.google.common.io.Files; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileWriter; @@ -29,12 +28,7 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.commons.io.FileUtils; import org.apache.druid.data.input.avro.AvroExtensionsModule; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.pig.ExecType; -import org.apache.pig.PigServer; -import org.apache.pig.backend.executionengine.ExecJob; import org.junit.Before; import org.junit.Test; @@ -59,102 +53,59 @@ public void setUp() } @Test - public void testParseNotFromPigAvroStorage() throws IOException + public void testParseNotFromSpark() throws IOException { - testParse(buildSomeAvroDatum(), false); + testParse(buildSomeAvroDatum()); } @Test - public void testParseFromPiggyBankAvroStorage() throws IOException + public void testParseFromSpark() throws IOException { - testParse(buildPiggyBankAvro(), false); + testParse(buildAvroFromFile()); } - @Test - public void testParseFromPigAvroStorage() throws IOException + private void testParse(GenericRecord record) throws IOException { - testParse(buildPigAvro(), true); - } - - private void testParse(GenericRecord record, boolean fromPigAvroStorage) throws IOException - { - AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(PARSE_SPEC, fromPigAvroStorage); + AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(PARSE_SPEC); AvroHadoopInputRowParser parser2 = jsonMapper.readValue( jsonMapper.writeValueAsBytes(parser), AvroHadoopInputRowParser.class ); InputRow inputRow = parser2.parseBatch(record).get(0); - assertInputRowCorrect(inputRow, DIMENSIONS, fromPigAvroStorage); - } - - - public static GenericRecord buildPigAvro() throws IOException - { - return buildPigAvro(buildSomeAvroDatum(), "AvroStorage", "AvroStorage"); + assertInputRowCorrect(inputRow, DIMENSIONS); } - public static GenericRecord buildPiggyBankAvro() throws IOException + public static GenericRecord buildAvroFromFile() throws IOException { - return buildPigAvro( - buildSomeAvroDatum(), - "org.apache.pig.piggybank.storage.avro.AvroStorage", - "org.apache.pig.piggybank.storage.avro.AvroStorage('field7','{\"type\":\"map\",\"values\":\"int\"}','field8','{\"type\":\"map\",\"values\":\"string\"}')" + return buildAvroFromFile( + buildSomeAvroDatum() ); } - private static GenericRecord buildPigAvro(GenericRecord datum, String inputStorage, String outputStorage) + private static GenericRecord buildAvroFromFile(GenericRecord datum) throws IOException { final File tmpDir = Files.createTempDir(); - FileReader reader = null; - PigServer pigServer = null; - try { - // 0. write avro object into temp file. - File someAvroDatumFile = new File(tmpDir, "someAvroDatum.avro"); - DataFileWriter dataFileWriter = new DataFileWriter<>( - new SpecificDatumWriter<>() - ); + + // 0. write avro object into temp file. + File someAvroDatumFile = new File(tmpDir, "someAvroDatum.avro"); + try (DataFileWriter dataFileWriter = new DataFileWriter<>( + new SpecificDatumWriter<>() + )) { dataFileWriter.create(SomeAvroDatum.getClassSchema(), someAvroDatumFile); dataFileWriter.append(datum); - dataFileWriter.close(); - - // 1. read avro files into Pig - pigServer = new PigServer(ExecType.LOCAL); - pigServer.registerQuery( - StringUtils.format( - "A = LOAD '%s' USING %s;", - someAvroDatumFile, - inputStorage - ) - ); - - // 2. write new avro file using AvroStorage - File outputDir = new File(tmpDir, "output"); - ExecJob job = pigServer.store("A", String.valueOf(outputDir), outputStorage); - - while (!job.hasCompleted()) { - Thread.sleep(100); - } - - assert (job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); - - // 3. read avro object from AvroStorage - reader = DataFileReader.openReader( - new File(outputDir, "part-m-00000.avro"), - new GenericDatumReader() - ); - - return reader.next(); } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - finally { - if (pigServer != null) { - pigServer.shutdown(); - } - Closeables.close(reader, true); - FileUtils.deleteDirectory(tmpDir); + + final GenericRecord record; + // 3. read avro object from AvroStorage + try (FileReader reader = DataFileReader.openReader( + someAvroDatumFile, + new GenericDatumReader<>() + )) { + record = reader.next(); } + + return record; } + } diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java index 184c8b247023..cadf7a96dd50 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java @@ -55,6 +55,7 @@ import org.schemarepo.api.converter.IdentityConverter; import org.schemarepo.api.converter.IntegerConverter; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -71,20 +72,20 @@ public class AvroStreamInputRowParserTest { - public static final String EVENT_TYPE = "eventType"; - public static final String ID = "id"; - public static final String SOME_OTHER_ID = "someOtherId"; - public static final String IS_VALID = "isValid"; - public static final String TOPIC = "aTopic"; - public static final String EVENT_TYPE_VALUE = "type-a"; - public static final long ID_VALUE = 1976491L; - public static final long SOME_OTHER_ID_VALUE = 6568719896L; - public static final float SOME_FLOAT_VALUE = 0.23555f; - public static final int SOME_INT_VALUE = 1; - public static final long SOME_LONG_VALUE = 679865987569912369L; - public static final DateTime DATE_TIME = new DateTime(2015, 10, 25, 19, 30, ISOChronology.getInstanceUTC()); - public static final List DIMENSIONS = Arrays.asList(EVENT_TYPE, ID, SOME_OTHER_ID, IS_VALID); - public static final List DIMENSIONS_SCHEMALESS = Arrays.asList( + private static final String EVENT_TYPE = "eventType"; + private static final String ID = "id"; + private static final String SOME_OTHER_ID = "someOtherId"; + private static final String IS_VALID = "isValid"; + private static final String TOPIC = "aTopic"; + private static final String EVENT_TYPE_VALUE = "type-a"; + private static final long ID_VALUE = 1976491L; + private static final long SOME_OTHER_ID_VALUE = 6568719896L; + private static final float SOME_FLOAT_VALUE = 0.23555f; + private static final int SOME_INT_VALUE = 1; + private static final long SOME_LONG_VALUE = 679865987569912369L; + private static final DateTime DATE_TIME = new DateTime(2015, 10, 25, 19, 30, ISOChronology.getInstanceUTC()); + static final List DIMENSIONS = Arrays.asList(EVENT_TYPE, ID, SOME_OTHER_ID, IS_VALID); + private static final List DIMENSIONS_SCHEMALESS = Arrays.asList( "nested", SOME_OTHER_ID, "someStringArray", @@ -98,7 +99,7 @@ public class AvroStreamInputRowParserTest "someInt", "timestamp" ); - public static final AvroParseSpec PARSE_SPEC = new AvroParseSpec( + static final AvroParseSpec PARSE_SPEC = new AvroParseSpec( new TimestampSpec("nested", "millis", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(DIMENSIONS), Collections.emptyList(), null), new JSONPathSpec( @@ -108,7 +109,7 @@ public class AvroStreamInputRowParserTest ) ) ); - public static final AvroParseSpec PARSE_SPEC_SCHEMALESS = new AvroParseSpec( + private static final AvroParseSpec PARSE_SPEC_SCHEMALESS = new AvroParseSpec( new TimestampSpec("nested", "millis", null), new DimensionsSpec(null, null, null), new JSONPathSpec( @@ -118,19 +119,19 @@ public class AvroStreamInputRowParserTest ) ) ); - public static final MyFixed SOME_FIXED_VALUE = new MyFixed(ByteBuffer.allocate(16).array()); + private static final MyFixed SOME_FIXED_VALUE = new MyFixed(ByteBuffer.allocate(16).array()); private static final long SUB_LONG_VALUE = 1543698L; private static final int SUB_INT_VALUE = 4892; - public static final MySubRecord SOME_RECORD_VALUE = MySubRecord.newBuilder() + private static final MySubRecord SOME_RECORD_VALUE = MySubRecord.newBuilder() .setSubInt(SUB_INT_VALUE) .setSubLong(SUB_LONG_VALUE) .build(); - public static final List SOME_STRING_ARRAY_VALUE = Arrays.asList((CharSequence) "8", "4", "2", "1"); - public static final List SOME_INT_ARRAY_VALUE = Arrays.asList(1, 2, 4, 8); - public static final Map SOME_INT_VALUE_MAP_VALUE = Maps.asMap( - new HashSet(Arrays.asList("8", "2", "4", "1")), new Function() + private static final List SOME_STRING_ARRAY_VALUE = Arrays.asList("8", "4", "2", "1"); + private static final List SOME_INT_ARRAY_VALUE = Arrays.asList(1, 2, 4, 8); + private static final Map SOME_INT_VALUE_MAP_VALUE = Maps.asMap( + new HashSet<>(Arrays.asList("8", "2", "4", "1")), new Function() { - @Nullable + @Nonnull @Override public Integer apply(@Nullable CharSequence input) { @@ -138,10 +139,10 @@ public Integer apply(@Nullable CharSequence input) } } ); - public static final Map SOME_STRING_VALUE_MAP_VALUE = Maps.asMap( - new HashSet(Arrays.asList("8", "2", "4", "1")), new Function() + private static final Map SOME_STRING_VALUE_MAP_VALUE = Maps.asMap( + new HashSet<>(Arrays.asList("8", "2", "4", "1")), new Function() { - @Nullable + @Nonnull @Override public CharSequence apply(@Nullable CharSequence input) { @@ -149,8 +150,8 @@ public CharSequence apply(@Nullable CharSequence input) } } ); - public static final String SOME_UNION_VALUE = "string as union"; - public static final ByteBuffer SOME_BYTES_VALUE = ByteBuffer.allocate(8); + private static final String SOME_UNION_VALUE = "string as union"; + private static final ByteBuffer SOME_BYTES_VALUE = ByteBuffer.allocate(8); private static final Pattern BRACES_AND_SPACE = Pattern.compile("[{} ]"); @@ -173,7 +174,7 @@ public void testSerde() throws IOException Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io"); AvroStreamInputRowParser parser = new AvroStreamInputRowParser( PARSE_SPEC, - new SchemaRepoBasedAvroBytesDecoder(new Avro1124SubjectAndIdConverter(TOPIC), repository) + new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository) ); ByteBufferInputRowParser parser2 = jsonMapper.readValue( jsonMapper.writeValueAsString(parser), @@ -190,7 +191,7 @@ public void testParse() throws SchemaValidationException, IOException Repository repository = new InMemoryRepository(null); AvroStreamInputRowParser parser = new AvroStreamInputRowParser( PARSE_SPEC, - new SchemaRepoBasedAvroBytesDecoder(new Avro1124SubjectAndIdConverter(TOPIC), repository) + new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository) ); ByteBufferInputRowParser parser2 = jsonMapper.readValue( jsonMapper.writeValueAsString(parser), @@ -221,7 +222,7 @@ public void testParse() throws SchemaValidationException, IOException InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); - assertInputRowCorrect(inputRow, DIMENSIONS, false); + assertInputRowCorrect(inputRow, DIMENSIONS); } @Test @@ -231,7 +232,7 @@ public void testParseSchemaless() throws SchemaValidationException, IOException Repository repository = new InMemoryRepository(null); AvroStreamInputRowParser parser = new AvroStreamInputRowParser( PARSE_SPEC_SCHEMALESS, - new SchemaRepoBasedAvroBytesDecoder(new Avro1124SubjectAndIdConverter(TOPIC), repository) + new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository) ); ByteBufferInputRowParser parser2 = jsonMapper.readValue( jsonMapper.writeValueAsString(parser), @@ -244,7 +245,7 @@ public void testParseSchemaless() throws SchemaValidationException, IOException // encode schema id Avro1124SubjectAndIdConverter converter = new Avro1124SubjectAndIdConverter(TOPIC); - TypedSchemaRepository repositoryClient = new TypedSchemaRepository( + TypedSchemaRepository repositoryClient = new TypedSchemaRepository<>( repository, new IntegerConverter(), new AvroSchemaConverter(), @@ -253,19 +254,20 @@ public void testParseSchemaless() throws SchemaValidationException, IOException Integer id = repositoryClient.registerSchema(TOPIC, SomeAvroDatum.getClassSchema()); ByteBuffer byteBuffer = ByteBuffer.allocate(4); converter.putSubjectAndId(id, byteBuffer); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - out.write(byteBuffer.array()); - // encode data - DatumWriter writer = new SpecificDatumWriter<>(someAvroDatum.getSchema()); - // write avro datum to bytes - writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + out.write(byteBuffer.array()); + // encode data + DatumWriter writer = new SpecificDatumWriter<>(someAvroDatum.getSchema()); + // write avro datum to bytes + writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); - InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); + InputRow inputRow = parser2.parseBatch(ByteBuffer.wrap(out.toByteArray())).get(0); - assertInputRowCorrect(inputRow, DIMENSIONS_SCHEMALESS, false); + assertInputRowCorrect(inputRow, DIMENSIONS_SCHEMALESS); + } } - public static void assertInputRowCorrect(InputRow inputRow, List expectedDimensions, boolean isFromPigAvro) + static void assertInputRowCorrect(InputRow inputRow, List expectedDimensions) { assertEquals(expectedDimensions, inputRow.getDimensions()); assertEquals(1543698L, inputRow.getTimestampFromEpoch()); @@ -316,9 +318,7 @@ public Integer apply(@Nullable String input) ); assertEquals(Collections.singletonList(SOME_UNION_VALUE), inputRow.getDimension("someUnion")); assertEquals(Collections.emptyList(), inputRow.getDimension("someNull")); - if (isFromPigAvro) { - assertEquals(String.valueOf(SOME_FIXED_VALUE), Arrays.toString((byte[]) inputRow.getRaw("someFixed"))); - } + assertEquals(SOME_FIXED_VALUE, inputRow.getRaw("someFixed")); assertEquals( Arrays.toString(SOME_BYTES_VALUE.array()), Arrays.toString((byte[]) (inputRow.getRaw("someBytes"))) diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java index 1658d1717582..3c63eb26ae54 100755 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java @@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.parsers.ObjectFlatteners; import org.joda.time.DateTime; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.List; import java.util.Map; @@ -74,7 +75,7 @@ public ParquetAvroHadoopInputRowParser( this.recordFlattener = ObjectFlatteners.create( flattenSpec, - new AvroFlattenerMaker(false, this.binaryAsString) + new AvroFlattenerMaker(this.binaryAsString) ); } @@ -92,6 +93,7 @@ private LogicalType determineTimestampSpecLogicalType(Schema schema, String time /** * imitate avro extension {@link org.apache.druid.data.input.avro.AvroParsers#parseGenericRecord} */ + @Nonnull @Override public List parseBatch(GenericRecord record) { From 4fc74e482871bbc5db9a60634b2798f9838c7c2e Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Sun, 2 Jun 2019 10:10:54 +0200 Subject: [PATCH 2/6] Remove the Pig specific part --- .../apache/druid/data/input/avro/AvroFlattenerMaker.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java index 19326ed24e5c..b927645075e1 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java @@ -126,9 +126,9 @@ public Function makeJsonQueryExtractor(final String expr) private Object transformValue(final Object field) { - if (field instanceof GenericData.Array) { - return Lists.transform((List) field, String::valueOf); - } +// if (field instanceof GenericData.Array) { +// return Lists.transform((List) field, String::valueOf); +// } if (field instanceof ByteBuffer) { if (binaryAsString) { return StringUtils.fromUtf8(((ByteBuffer) field).array()); From 0fdd7bea661b388e0b83d732b7de3a3c182f0d74 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 3 Jun 2019 09:07:11 +0200 Subject: [PATCH 3/6] Fix the Checkstyle issues --- .../org/apache/druid/data/input/avro/AvroFlattenerMaker.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java index b927645075e1..9f3aae2fd342 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java @@ -19,12 +19,10 @@ package org.apache.druid.data.input.avro; -import com.google.common.collect.Lists; import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.Option; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; import org.apache.druid.java.util.common.StringUtils; @@ -126,9 +124,6 @@ public Function makeJsonQueryExtractor(final String expr) private Object transformValue(final Object field) { -// if (field instanceof GenericData.Array) { -// return Lists.transform((List) field, String::valueOf); -// } if (field instanceof ByteBuffer) { if (binaryAsString) { return StringUtils.fromUtf8(((ByteBuffer) field).array()); From 58dc004281b6b9d39c8d89bd77c6586c0e0f418f Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 3 Jun 2019 15:33:57 +0200 Subject: [PATCH 4/6] Cleanup a bit --- .../util/common/parsers/ObjectFlatteners.java | 7 +------ .../data/input/avro/AvroFlattenerMaker.java | 17 +++++++++-------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java index 2cdb2339f06d..a26dd7e5aba8 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java @@ -32,13 +32,8 @@ import java.util.function.Function; import java.util.stream.Collectors; -public class ObjectFlatteners +public abstract class ObjectFlatteners { - private ObjectFlatteners() - { - // No instantiation. - } - public static ObjectFlattener create( final JSONPathSpec flattenSpec, final FlattenerMaker flattenerMaker diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java index 9f3aae2fd342..a142dcab9ee8 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroFlattenerMaker.java @@ -55,17 +55,17 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker) field).stream().filter(Objects::nonNull).collect(Collectors.toList()); } return field; From 9462f4ee38d819cea6f9e8c0f15de8cf6518deb3 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 4 Jun 2019 19:29:43 +0200 Subject: [PATCH 5/6] Add an additional test --- .../input/avro/GenericAvroJsonProvider.java | 3 + .../input/AvroHadoopInputRowParserTest.java | 2 +- .../input/avro/AvroFlattenerMakerTest.java | 203 ++++++++++++++++++ ...hemaRegistryBasedAvroBytesDecoderTest.java | 5 +- 4 files changed, 209 insertions(+), 4 deletions(-) create mode 100644 extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java index bfb6e2224a98..42195ca1b3fa 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java @@ -25,6 +25,8 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; +import javax.annotation.Nullable; + import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; @@ -139,6 +141,7 @@ public void setArrayIndex(final Object o, final int i, final Object o1) } } + @Nullable @Override public Object getMapValue(final Object o, final String s) { diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java index ed9dfa2994ec..082f5e7cd622 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java @@ -75,7 +75,7 @@ private void testParse(GenericRecord record) throws IOException assertInputRowCorrect(inputRow, DIMENSIONS); } - public static GenericRecord buildAvroFromFile() throws IOException + private static GenericRecord buildAvroFromFile() throws IOException { return buildAvroFromFile( buildSomeAvroDatum() diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java new file mode 100644 index 000000000000..675a9ceb02da --- /dev/null +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroFlattenerMakerTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.avro; + +import org.apache.druid.data.input.AvroStreamInputRowParserTest; +import org.apache.druid.data.input.SomeAvroDatum; +import org.junit.Assert; +import org.junit.Test; + +public class AvroFlattenerMakerTest +{ + + @Test + public void getRootField() + { + final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum(); + final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false); + + Assert.assertEquals( + record.timestamp, + flattener.getRootField(record, "timestamp") + ); + Assert.assertEquals( + record.eventType, + flattener.getRootField(record, "eventType") + ); + Assert.assertEquals( + record.id, + flattener.getRootField(record, "id") + ); + Assert.assertEquals( + record.someOtherId, + flattener.getRootField(record, "someOtherId") + ); + Assert.assertEquals( + record.isValid, + flattener.getRootField(record, "isValid") + ); + Assert.assertEquals( + record.someIntArray, + flattener.getRootField(record, "someIntArray") + ); + Assert.assertEquals( + record.someStringArray, + flattener.getRootField(record, "someStringArray") + ); + Assert.assertEquals( + record.someIntValueMap, + flattener.getRootField(record, "someIntValueMap") + ); + Assert.assertEquals( + record.someStringValueMap, + flattener.getRootField(record, "someStringValueMap") + ); + Assert.assertEquals( + record.someUnion, + flattener.getRootField(record, "someUnion") + ); + Assert.assertEquals( + record.someNull, + flattener.getRootField(record, "someNull") + ); + Assert.assertEquals( + record.someFixed, + flattener.getRootField(record, "someFixed") + ); + Assert.assertEquals( + // Casted to an array by transformValue + record.someBytes.array(), + flattener.getRootField(record, "someBytes") + ); + Assert.assertEquals( + record.someEnum, + flattener.getRootField(record, "someEnum") + ); + Assert.assertEquals( + record.someRecord, + flattener.getRootField(record, "someRecord") + ); + Assert.assertEquals( + record.someLong, + flattener.getRootField(record, "someLong") + ); + Assert.assertEquals( + record.someInt, + flattener.getRootField(record, "someInt") + ); + Assert.assertEquals( + record.someFloat, + flattener.getRootField(record, "someFloat") + ); + } + + @Test + public void makeJsonPathExtractor() + { + final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum(); + final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false); + + Assert.assertEquals( + record.timestamp, + flattener.makeJsonPathExtractor("$.timestamp").apply(record) + ); + Assert.assertEquals( + record.eventType, + flattener.makeJsonPathExtractor("$.eventType").apply(record) + ); + Assert.assertEquals( + record.id, + flattener.makeJsonPathExtractor("$.id").apply(record) + ); + Assert.assertEquals( + record.someOtherId, + flattener.makeJsonPathExtractor("$.someOtherId").apply(record) + ); + Assert.assertEquals( + record.isValid, + flattener.makeJsonPathExtractor("$.isValid").apply(record) + ); + Assert.assertEquals( + record.someIntArray, + flattener.makeJsonPathExtractor("$.someIntArray").apply(record) + ); + Assert.assertEquals( + record.someStringArray, + flattener.makeJsonPathExtractor("$.someStringArray").apply(record) + ); + Assert.assertEquals( + record.someIntValueMap, + flattener.makeJsonPathExtractor("$.someIntValueMap").apply(record) + ); + Assert.assertEquals( + record.someStringValueMap, + flattener.makeJsonPathExtractor("$.someStringValueMap").apply(record) + ); + Assert.assertEquals( + record.someUnion, + flattener.makeJsonPathExtractor("$.someUnion").apply(record) + ); + Assert.assertEquals( + record.someNull, + flattener.makeJsonPathExtractor("$.someNull").apply(record) + ); + Assert.assertEquals( + record.someFixed, + flattener.makeJsonPathExtractor("$.someFixed").apply(record) + ); + Assert.assertEquals( + // Casted to an array by transformValue + record.someBytes.array(), + flattener.makeJsonPathExtractor("$.someBytes").apply(record) + ); + Assert.assertEquals( + record.someEnum, + flattener.makeJsonPathExtractor("$.someEnum").apply(record) + ); + Assert.assertEquals( + record.someRecord, + flattener.makeJsonPathExtractor("$.someRecord").apply(record) + ); + Assert.assertEquals( + record.someLong, + flattener.makeJsonPathExtractor("$.someLong").apply(record) + ); + Assert.assertEquals( + record.someInt, + flattener.makeJsonPathExtractor("$.someInt").apply(record) + ); + Assert.assertEquals( + record.someFloat, + flattener.makeJsonPathExtractor("$.someFloat").apply(record) + ); + } + + @Test(expected = UnsupportedOperationException.class) + public void makeJsonQueryExtractor() + { + final SomeAvroDatum record = AvroStreamInputRowParserTest.buildSomeAvroDatum(); + final AvroFlattenerMaker flattener = new AvroFlattenerMaker(false); + + Assert.assertEquals( + record.timestamp, + flattener.makeJsonQueryExtractor("$.timestamp").apply(record) + ); + } +} diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java index 911819d63f3e..5481c0eb9ccc 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoderTest.java @@ -45,8 +45,7 @@ */ public class SchemaRegistryBasedAvroBytesDecoderTest { - - SchemaRegistryClient registry; + private SchemaRegistryClient registry; @Before public void setUp() @@ -96,7 +95,7 @@ public void testParseWrongId() throws Exception new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); } - byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws IOException + private byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); DatumWriter writer = new SpecificDatumWriter<>(schema); From d1a1ce75eef2470cdc53c94bae4b6f7e7d27584a Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 10 Jun 2019 16:18:24 +0200 Subject: [PATCH 6/6] Revert the abstract class --- .../druid/java/util/common/parsers/ObjectFlatteners.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java index a26dd7e5aba8..2cdb2339f06d 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java @@ -32,8 +32,13 @@ import java.util.function.Function; import java.util.stream.Collectors; -public abstract class ObjectFlatteners +public class ObjectFlatteners { + private ObjectFlatteners() + { + // No instantiation. + } + public static ObjectFlattener create( final JSONPathSpec flattenSpec, final FlattenerMaker flattenerMaker