From 1bf7e7a97f7a7b6d84322e350dce1d6bc7d682b1 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 16 Nov 2022 04:40:10 -0800 Subject: [PATCH 1/2] nested column support for ORC --- .../orc-extensions/example/test_list_map.orc | Bin 0 -> 832 bytes .../example/test_nested_array.orc | Bin 0 -> 907 bytes .../data/input/orc/OrcStructConverter.java | 71 ++++- .../input/orc/OrcStructFlattenerMaker.java | 15 +- .../data/input/orc/OrcStructJsonProvider.java | 5 + .../druid/data/input/orc/OrcReaderTest.java | 259 +++++++++++++++++- 6 files changed, 340 insertions(+), 10 deletions(-) create mode 100644 extensions-core/orc-extensions/example/test_list_map.orc create mode 100644 extensions-core/orc-extensions/example/test_nested_array.orc diff --git a/extensions-core/orc-extensions/example/test_list_map.orc b/extensions-core/orc-extensions/example/test_list_map.orc new file mode 100644 index 0000000000000000000000000000000000000000..08580220e80ff9cad2d76e8de762409cbc0562fb GIT binary patch literal 832 zcmeYda+YOa;Nsz8VE_Ul77oS$1}_GN$D5u%N@-wJROWM2mDNjBW>!{?Wp-BJGBnu0 zq^y|JB$UO-!yp4x_vm?2dIN)^7~e4lHU=}Gz$A!9MLsoEIlX6RM3@y75;YDAA7Egx z0LlhE0UIj_l(o~F_Gpq4v!XzzhN1va8pAXcfo15%iy;}WD2Q&n7?SZi3qi)SGBEsa z02AyC41Ns43JeU!Mg~TgObu8T2{1Ej8!$nX0A-yRzzPq5n2aBI85lBC3^G#;L7G8g zZj3I13=G93MFv0&7HvS+v;d^`1CU^37GYpG?PGYt$M6ixWUvijVHPJ525&dxzQt?oYNqX2 z%dQ%iur%A|p_uiJ8*|^x;XmWO@3I49_NB>__ACzn))lwx($w!JVkODuW!>CW$@zE6 zcU`vryX1^q_#57DlPB%VE&F4B{@3}R7v4Yr%X|bFEl~$f96fYU;=qB_BL`P3IjowK zl8~|_JAuPRk&P`ZL8Zkvz=?MS`=a2EX3x&{ZcF2%4jdj5ULP1zT^f>VVswtGy0SSb zTncbvSU0b=ne&9$qXfr`s*Kl;HZ3|G;iTBW$WX<|@Hj#2$9bKjhY#v}=m|_xTa;8bKFW}(W zgQ`bPw5^#lmw9^;vzs!{?Wp-BJGBnu0 zq^y|JB$UO-!yp4x_vm?2dIN)^7~e4lHU#$xQ3RHuTP22MmC{12R$&-`Vz(IBZa&V5K$BS+ z82&ea33dhsKL%k11_omz1EWi(1}uvNn3=T=m>^1kvQ7+Og$IC4H%1ph28QC2B10fH zf@w!lpn^rziE#l)^aGG!WRYWF(ALv5(g9+oq+|s%Mr;JElhVDup9xt247W@zU(D}B(#rpRjH>uSnkq7mvjBZ_?`5}k@dsF!Co!s{l zmYWx}UjCiSh0*+1B4%uYUv>hf&8)96fYU;=qB_BL`P3IjowKl8~|_ zJAuPRk&P`ZL8Zkvz=?MS`=a2MX3x(4?xj1HoCx4>k@Dm?@U+5WM#~M087vVhHy$ME z$oM#H<6!6R(i7%nH4(Lu(Mi}VBcudOX_FWka(fK94mpUpe9t%gsO}bdP2_Z#Vr#^T z95Gjq)J#>eIE6s&?uHD#2b^D2W|Y?-b2Pb3a=LH z=PWo=I5qms=esLCJ~uaoI4( convertList(TypeDescription fieldDescription, OrcLis return new ArrayList(orcList); } - private static Map convertMap( + private Map convertMap( TypeDescription fieldDescription, OrcMap map, boolean binaryAsString @@ -75,11 +75,7 @@ private static Map convertMap( TypeDescription valueDescription = fieldDescription.getChildren().get(1); for (WritableComparable key : map.navigableKeySet()) { Object newKey = convertPrimitive(keyDescription, key, binaryAsString); - if (valueDescription.getCategory().isPrimitive()) { - converted.put(newKey, convertPrimitive(valueDescription, map.get(key), binaryAsString)); - } else { - converted.put(newKey, map.get(key)); - } + converted.put(newKey, convertField(valueDescription, map.get(key))); } return converted; } @@ -148,6 +144,63 @@ private static Object convertPrimitive( } } + @Nullable + public Object tryConvertPrimitive(@Nullable WritableComparable field) + { + if (field == null) { + return null; + } + /* + ORC TYPE WRITABLE TYPE + binary org.apache.hadoop.io.BytesWritable + bigint org.apache.hadoop.io.LongWritable + boolean org.apache.hadoop.io.BooleanWritable + char org.apache.hadoop.io.Text + date org.apache.hadoop.hive.serde2.io.DateWritable + decimal org.apache.hadoop.hive.serde2.io.HiveDecimalWritable + double org.apache.hadoop.io.DoubleWritable + float org.apache.hadoop.io.FloatWritable + int org.apache.hadoop.io.IntWritable + smallint org.apache.hadoop.io.ShortWritable + string org.apache.hadoop.io.Text + timestamp org.apache.orc.mapred.OrcTimestamp + tinyint org.apache.hadoop.io.ByteWritable + varchar org.apache.hadoop.io.Text + */ + if (field instanceof Text) { + return ((Text) field).toString(); + } else if (field instanceof BooleanWritable) { + return ((BooleanWritable) field).get(); + } else if (field instanceof ByteWritable) { + return ((ByteWritable) field).get(); + } else if (field instanceof ShortWritable) { + return ((ShortWritable) field).get(); + } else if (field instanceof IntWritable) { + return ((IntWritable) field).get(); + } else if (field instanceof LongWritable) { + return ((LongWritable) field).get(); + } else if (field instanceof FloatWritable) { + return ((FloatWritable) field).get(); + } else if (field instanceof DoubleWritable) { + return ((DoubleWritable) field).get(); + } else if (field instanceof HiveDecimalWritable) { + return ((HiveDecimalWritable) field).getHiveDecimal().doubleValue(); + } else if (field instanceof OrcTimestamp) { + return ((OrcTimestamp) field).getTime(); + } else if (field instanceof DateWritable) { + return DateTimes.utc(((DateWritable) field).get().getTime()); + } else if (field instanceof BytesWritable) { + byte[] bytes = ((BytesWritable) field).getBytes(); + if (binaryAsString) { + return StringUtils.fromUtf8(bytes); + } else { + return bytes; + } + } + // unknown conversion, return null + return null; + } + private final boolean binaryAsString; private Object2IntMap fieldIndexCache; @@ -204,6 +257,12 @@ Object convertField(OrcStruct struct, int fieldIndex) return null; } + return convertField(fieldDescription, fieldValue); + } + + @Nullable + Object convertField(TypeDescription fieldDescription, WritableComparable fieldValue) + { if (fieldDescription.getCategory().isPrimitive()) { return convertPrimitive(fieldDescription, fieldValue, binaryAsString); } else { diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java index abaea0349dd3..ee770d3e8e20 100644 --- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java +++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructFlattenerMaker.java @@ -25,6 +25,7 @@ import com.jayway.jsonpath.spi.json.JsonProvider; import org.apache.druid.java.util.common.parsers.NotImplementedMappingProvider; import org.apache.druid.java.util.common.parsers.ObjectFlatteners; +import org.apache.hadoop.io.WritableComparable; import org.apache.orc.TypeDescription; import org.apache.orc.mapred.OrcList; import org.apache.orc.mapred.OrcMap; @@ -71,7 +72,7 @@ public Iterable discoverRootFields(OrcStruct obj) @Override public Object getRootField(OrcStruct obj, String key) { - return finalizeConversion(converter.convertRootField(obj, key)); + return toPlainJavaType(converter.convertRootField(obj, key)); } @Override @@ -107,11 +108,19 @@ public JsonProvider getJsonProvider() return orcJsonProvider; } + @Override + public Object finalizeConversionForMap(Object o) + { + return finalizeConversion(o); + } + private Object finalizeConversion(Object o) { - // replace any remaining complex types with null + // recursively convert any complex types if (o instanceof OrcStruct || o instanceof OrcMap || o instanceof OrcList) { - return null; + return toPlainJavaType(o); + } else if (o instanceof WritableComparable) { + return converter.tryConvertPrimitive((WritableComparable) o); } return o; } diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java index 780de1930ca3..15f81b6f2039 100644 --- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java +++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java @@ -21,6 +21,8 @@ import com.jayway.jsonpath.InvalidJsonException; import com.jayway.jsonpath.spi.json.JsonProvider; +import org.apache.hadoop.io.Text; +import org.apache.orc.mapred.OrcMap; import org.apache.orc.mapred.OrcStruct; import java.io.InputStream; @@ -103,6 +105,9 @@ public Object getMapValue(final Object o, final String s) { if (o == null) { return null; + } else if (o instanceof OrcMap) { + OrcMap map = (OrcMap) o; + return map.get(new Text(s)); } else if (o instanceof Map) { return ((Map) o).get(s); } else if (o instanceof OrcStruct) { diff --git a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java index 33ed34ce4191..d0f012245b03 100644 --- a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java +++ b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java @@ -20,6 +20,7 @@ package org.apache.druid.data.input.orc; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputEntityReader; @@ -34,6 +35,13 @@ import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; import org.apache.druid.java.util.common.parsers.JSONPathFieldType; import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.math.expr.ExpressionProcessing; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.segment.NestedDataDimensionSchema; +import org.apache.druid.segment.transform.ExpressionTransform; +import org.apache.druid.segment.transform.TransformSpec; +import org.apache.druid.segment.transform.TransformingInputEntityReader; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Rule; @@ -44,7 +52,7 @@ import java.io.IOException; import java.util.Collections; -public class OrcReaderTest +public class OrcReaderTest extends InitializedNullHandlingTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -312,6 +320,255 @@ public void testJsonPathFunctions() throws IOException } } + @Test + public void testNestedColumn() throws IOException + { + final OrcInputFormat inputFormat = new OrcInputFormat( + new JSONPathSpec(true, ImmutableList.of()), + null, + new Configuration() + ); + final InputEntityReader reader = createReader( + new TimestampSpec("ts", "millis", null), + new DimensionsSpec( + ImmutableList.of( + new NestedDataDimensionSchema("middle"), + new NestedDataDimensionSchema("list"), + new NestedDataDimensionSchema("map") + ) + ), + inputFormat, + "example/orc-file-11-format.orc" + ); + TransformSpec transformSpec = new TransformSpec( + null, + ImmutableList.of( + new ExpressionTransform("struct_list_struct_int", "json_value(middle, '$.list[1].int1')", TestExprMacroTable.INSTANCE), + new ExpressionTransform("map_struct_int", "json_value(map, '$.chani.int1')", TestExprMacroTable.INSTANCE) + ) + ); + TransformingInputEntityReader transformingReader = new TransformingInputEntityReader( + reader, + transformSpec.toTransformer() + ); + try (CloseableIterator iterator = transformingReader.read()) { + int actualRowCount = 0; + + // Check the first row + Assert.assertTrue(iterator.hasNext()); + InputRow row = iterator.next(); + actualRowCount++; + Assert.assertEquals( + ImmutableMap.of( + "list", + ImmutableList.of( + ImmutableMap.of("int1", 1, "string1", "bye"), + ImmutableMap.of("int1", 2, "string1", "sigh") + ) + ), + row.getRaw("middle") + ); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("int1", 3, "string1", "good"), + ImmutableMap.of("int1", 4, "string1", "bad") + ), + row.getRaw("list") + ); + Assert.assertEquals( + ImmutableMap.of(), + row.getRaw("map") + ); + Assert.assertEquals(2L, row.getRaw("struct_list_struct_int")); + Assert.assertEquals(DateTimes.of("2000-03-12T15:00:00.0Z"), row.getTimestamp()); + + while (iterator.hasNext()) { + actualRowCount++; + row = iterator.next(); + } + + // Check the last row + Assert.assertEquals( + ImmutableMap.of( + "list", + ImmutableList.of( + ImmutableMap.of("int1", 1, "string1", "bye"), + ImmutableMap.of("int1", 2, "string1", "sigh") + ) + ), + row.getRaw("middle") + ); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("int1", 100000000, "string1", "cat"), + ImmutableMap.of("int1", -100000, "string1", "in"), + ImmutableMap.of("int1", 1234, "string1", "hat") + ), + row.getRaw("list") + ); + Assert.assertEquals( + ImmutableMap.of( + "chani", ImmutableMap.of("int1", 5, "string1", "chani"), + "mauddib", ImmutableMap.of("int1", 1, "string1", "mauddib") + ), + row.getRaw("map") + ); + Assert.assertEquals("2", Iterables.getOnlyElement(row.getDimension("struct_list_struct_int"))); + Assert.assertEquals("5", Iterables.getOnlyElement(row.getDimension("map_struct_int"))); + + Assert.assertEquals(7500, actualRowCount); + } + } + + @Test + public void testListMap() throws IOException + { + final InputFormat inputFormat = new OrcInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "a_id0", "$.a['id0']"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "b_raw_str", "$.b"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "b0_id0", "$.b[0]['id0']") + ) + ), + null, + new Configuration() + ); + final InputEntityReader reader = createReader( + new TimestampSpec("timestamp", "auto", null), + new DimensionsSpec( + ImmutableList.of( + new NestedDataDimensionSchema("a"), + new NestedDataDimensionSchema("b") + ) + ), + inputFormat, + "example/test_list_map.orc" + ); + final TransformSpec transformSpec = new TransformSpec( + null, + ImmutableList.of( + new ExpressionTransform("t_a_id0", "json_value(a, '$.id0')", TestExprMacroTable.INSTANCE), + new ExpressionTransform("t_b0_id0", "json_value(b, '$[0].id0')", TestExprMacroTable.INSTANCE) + ) + ); + final InputEntityReader transformingReader = new TransformingInputEntityReader( + reader, + transformSpec.toTransformer() + ); + try (CloseableIterator iterator = transformingReader.read()) { + Assert.assertTrue(iterator.hasNext()); + final InputRow row = iterator.next(); + /* + { + "timestamp": "2022-01-01T00:00:00", + "a": {"id0": "str0", "id1": "str1"}, + "b": [{"id0": "str0", "id1": "str1"}, {"id0": "str2", "id1": "str3"}] + } + */ + Assert.assertEquals(DateTimes.of("2022-01-01T00:00:00.000Z"), row.getTimestamp()); + Assert.assertEquals("str0", Iterables.getOnlyElement(row.getDimension("a_id0"))); + Assert.assertEquals("str0", Iterables.getOnlyElement(row.getDimension("t_a_id0"))); + Assert.assertEquals("str0", Iterables.getOnlyElement(row.getDimension("b0_id0"))); + Assert.assertEquals("str0", Iterables.getOnlyElement(row.getDimension("t_b0_id0"))); + Assert.assertEquals(ImmutableList.of("{id0=str0, id1=str1}", "{id0=str2, id1=str3}"), row.getDimension("b_raw_str")); + Assert.assertEquals(ImmutableMap.of("id0", "str0", "id1", "str1"), row.getRaw("a")); + Assert.assertEquals( + ImmutableList.of( + ImmutableMap.of("id0", "str0", "id1", "str1"), + ImmutableMap.of("id0", "str2", "id1", "str3") + ), + row.getRaw("b") + ); + Assert.assertFalse(iterator.hasNext()); + } + } + + @Test + public void testNestedArray() throws IOException + { + ExpressionProcessing.initializeForTests(true); + final InputFormat inputFormat = new OrcInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "a_0", "$.a[0]"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "b_0", "$.b[0]"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "c_0_0", "$.c[0][0]"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "d_0_1", "$.d[0][1]") + ) + ), + null, + new Configuration() + ); + final InputEntityReader reader = createReader( + new TimestampSpec("timestamp", "auto", null), + new DimensionsSpec( + ImmutableList.of( + new NestedDataDimensionSchema("a"), + new NestedDataDimensionSchema("b"), + new NestedDataDimensionSchema("c"), + new NestedDataDimensionSchema("d"), + new NestedDataDimensionSchema("t_d_0") + ) + ), + inputFormat, + "example/test_nested_array.orc" + ); + final TransformSpec transformSpec = new TransformSpec( + null, + ImmutableList.of( + new ExpressionTransform("t_a_0", "json_value(a, '$[0]')", TestExprMacroTable.INSTANCE), + new ExpressionTransform("t_b_0", "json_value(b, '$[0]')", TestExprMacroTable.INSTANCE), + new ExpressionTransform("t_c_0_0", "json_value(c, '$[0][0]')", TestExprMacroTable.INSTANCE), + new ExpressionTransform("t_d_0_1", "json_value(d, '$[0][1]')", TestExprMacroTable.INSTANCE), + new ExpressionTransform("t_d_0", "json_query(d, '$[0]')", TestExprMacroTable.INSTANCE) + ) + ); + final InputEntityReader transformingReader = new TransformingInputEntityReader( + reader, + transformSpec.toTransformer() + ); + try (CloseableIterator iterator = transformingReader.read()) { + Assert.assertTrue(iterator.hasNext()); + final InputRow row = iterator.next(); + /* + { + "timestamp": "2022-01-01T00:00:00", + "a": ["str1", "str2"], + "b": [1, 2], + "c": [["str1", "str2"], ["str3", "str4"]], + "d": [[1, 2], [3, 4]] + } + */ + Assert.assertEquals(DateTimes.of("2022-01-01T00:00:00.000Z"), row.getTimestamp()); + Assert.assertEquals("str1", Iterables.getOnlyElement(row.getDimension("a_0"))); + Assert.assertEquals("str1", Iterables.getOnlyElement(row.getDimension("t_a_0"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("b_0"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("t_b_0"))); + Assert.assertEquals("str1", Iterables.getOnlyElement(row.getDimension("c_0_0"))); + Assert.assertEquals("str1", Iterables.getOnlyElement(row.getDimension("t_c_0_0"))); + Assert.assertEquals("2", Iterables.getOnlyElement(row.getDimension("d_0_1"))); + Assert.assertEquals("2", Iterables.getOnlyElement(row.getDimension("t_d_0_1"))); + Assert.assertEquals(ImmutableList.of("str1", "str2"), row.getRaw("a")); + Assert.assertEquals(ImmutableList.of(1, 2), row.getRaw("b")); + Assert.assertEquals( + ImmutableList.of(ImmutableList.of("str1", "str2"), ImmutableList.of("str3", "str4")), + row.getRaw("c") + ); + Assert.assertEquals( + ImmutableList.of(ImmutableList.of(1, 2), ImmutableList.of(3, 4)), + row.getRaw("d") + ); + Assert.assertArrayEquals(new Object[]{1L, 2L}, (Object[]) row.getRaw("t_d_0")); + Assert.assertFalse(iterator.hasNext()); + } + finally { + ExpressionProcessing.initializeForTests(null); + } + } + private InputEntityReader createReader( TimestampSpec timestampSpec, DimensionsSpec dimensionsSpec, From 363d81cf3fb1af53e3be49174db9416c7ef1f2db Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 16 Nov 2022 12:33:18 -0800 Subject: [PATCH 2/2] more test --- extensions-core/orc-extensions/pom.xml | 6 ++++++ .../druid/data/input/orc/OrcStructConverterTest.java | 9 +++++++++ 2 files changed, 15 insertions(+) diff --git a/extensions-core/orc-extensions/pom.xml b/extensions-core/orc-extensions/pom.xml index 3cddfbee6c83..57812e840d0d 100644 --- a/extensions-core/orc-extensions/pom.xml +++ b/extensions-core/orc-extensions/pom.xml @@ -40,6 +40,12 @@ ${project.parent.version} provided + + org.apache.druid + druid-processing + ${project.parent.version} + provided + org.apache.druid druid-indexing-hadoop diff --git a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcStructConverterTest.java b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcStructConverterTest.java index fe076235bd55..b1734ec052c8 100644 --- a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcStructConverterTest.java +++ b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcStructConverterTest.java @@ -456,6 +456,15 @@ private static void assertFieldValue( final Object field = converter.convertRootField(orcStruct, fieldName); Assert.assertNotNull(field); Assert.assertEquals(expectedValue, field); + + + final int fieldIndex = orcStruct.getSchema().getFieldNames().indexOf(fieldName); + TypeDescription fieldDescription = orcStruct.getSchema().getChildren().get(fieldIndex); + if (fieldDescription.getCategory().isPrimitive()) { + final Object simple = converter.tryConvertPrimitive(orcStruct.getFieldValue(fieldIndex)); + Assert.assertNotNull(simple); + Assert.assertEquals(expectedValue, simple); + } } private static void assertNullValue(OrcStructConverter converter, OrcStruct orcStruct, String fieldName)