From fd235fff7a0b5330c9ae454c21442c3b0ed3dc90 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 8 Aug 2024 23:36:08 -0700 Subject: [PATCH] Fix Parquet Reader when ingestion needs to read columns in filter --- .../druid/segment/indexing/ReaderUtils.java | 9 +++++- .../segment/indexing/ReaderUtilsTest.java | 30 +++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java b/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java index 5298f2605d88..cd49757ee105 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.parsers.JSONPathFieldType; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.filter.DimFilter; import org.apache.druid.segment.transform.Transform; import org.apache.druid.segment.transform.TransformSpec; @@ -130,12 +131,18 @@ public static Set<String> getColumnsRequiredForIngestion( } } - // Determine any fields we need to read from input file that is used in the transformSpec + // Determine any fields we need to read from input file that are used in the transforms of the transformSpec List<Transform> transforms = transformSpec.getTransforms(); for (Transform transform : transforms) { fieldsRequired.addAll(transform.getRequiredColumns()); } + // Determine any fields we need to read from input file that are used in the filter of the transformSpec + DimFilter filter = transformSpec.getFilter(); + if (filter != null) { + fieldsRequired.addAll(filter.getRequiredColumns()); + } + // Determine any fields we need to read from input file that is used in the dimensionsSpec List<DimensionSchema> dimensionSchema = dimensionsSpec.getDimensions(); for (DimensionSchema dim : dimensionSchema) { diff --git 
a/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java b/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java index 6fddd3b4d650..b5bdc3458891 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java @@ -34,6 +34,8 @@ import org.apache.druid.query.aggregation.FloatMinAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.query.filter.AndDimFilter; +import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.segment.transform.ExpressionTransform; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.testing.InitializedNullHandlingTest; @@ -364,4 +366,32 @@ public void testGetColumnsRequiredForSchemalessIngestionWithFlattenSpecAndNotUse Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec); Assert.assertEquals(ImmutableSet.of("A", "C"), actual); } + + @Test + public void testGetColumnsRequiredForIngestionWithFilterInTransformSpec() + { + TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null); + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("B"), + new StringDimensionSchema("C"), + new LongDimensionSchema("D"), + new FloatDimensionSchema("E"), + new LongDimensionSchema("F") + ) + ); + + TransformSpec transformSpec = new TransformSpec( + new AndDimFilter( + ImmutableList.of( + new SelectorDimFilter("G", "foo", null), + new SelectorDimFilter("H", "foobar", null) + ) + ), + ImmutableList.of() + ); + + Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, transformSpec, new AggregatorFactory[]{}, null); + 
Assert.assertEquals(ImmutableSet.of("A", "B", "C", "D", "E", "F", "G", "H"), actual); + } }