From bc2d29e91117365f718e7a518272af2c62656c29 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Fri, 13 Aug 2021 19:13:12 -0700 Subject: [PATCH 01/16] [PARQUET-1968] FilterApi support In predicate --- .../parquet/filter2/predicate/FilterApi.java | 61 +++++- .../filter2/predicate/FilterPredicate.java | 10 +- .../predicate/LogicalInverseRewriter.java | 18 +- .../filter2/predicate/LogicalInverter.java | 18 +- .../parquet/filter2/predicate/Operators.java | 89 ++++++++- .../SchemaCompatibilityValidator.java | 25 ++- .../IncrementallyUpdatedFilterPredicate.java | 47 ++++- ...allyUpdatedFilterPredicateBuilderBase.java | 34 +++- .../columnindex/ColumnIndexBuilder.java | 25 +++ .../columnindex/ColumnIndexFilter.java | 13 ++ .../columnindex/TestColumnIndexBuilder.java | 182 ++++++++++++++++++ .../columnindex/TestColumnIndexFilter.java | 55 ++++++ parquet-hadoop/pom.xml | 2 +- .../bloomfilterlevel/BloomFilterImpl.java | 41 ++++ .../dictionarylevel/DictionaryFilter.java | 104 +++++++++- .../statisticslevel/StatisticsFilter.java | 75 +++++++- .../TestFiltersWithMissingColumns.java | 10 + .../filter2/compat/TestRowGroupFilter.java | 34 +++- .../dictionarylevel/DictionaryFilterTest.java | 116 +++++++++++ .../statisticslevel/TestStatisticsFilter.java | 97 +++++++++- .../parquet/hadoop/TestBloomFiltering.java | 26 +++ pom.xml | 1 + 22 files changed, 1043 insertions(+), 40 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java index b209fc7b6f..edfba7c55d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,6 +19,7 @@ package org.apache.parquet.filter2.predicate; import java.io.Serializable; +import java.util.Set; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.filter2.predicate.Operators.And; @@ -30,12 +31,14 @@ import org.apache.parquet.filter2.predicate.Operators.FloatColumn; import org.apache.parquet.filter2.predicate.Operators.Gt; import org.apache.parquet.filter2.predicate.Operators.GtEq; +import org.apache.parquet.filter2.predicate.Operators.In; import org.apache.parquet.filter2.predicate.Operators.IntColumn; import org.apache.parquet.filter2.predicate.Operators.LongColumn; import org.apache.parquet.filter2.predicate.Operators.Lt; import org.apache.parquet.filter2.predicate.Operators.LtEq; import org.apache.parquet.filter2.predicate.Operators.Not; import org.apache.parquet.filter2.predicate.Operators.NotEq; +import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; import org.apache.parquet.filter2.predicate.Operators.SupportsEqNotEq; import org.apache.parquet.filter2.predicate.Operators.SupportsLtGt; @@ -204,6 +207,56 @@ public static , C extends Column & SupportsLtGt> GtEq return new GtEq<>(column, value); } + /** + * Keeps records if their value is in the provided values. + * The provided values set could not be null, but could contains a null value. + *

+ * For example: + *

+   *   {@code
+   *   Set set = new HashSet<>();
+   *   set.add(9);
+   *   set.add(null);
+   *   set.add(50);
+   *   in(column, set);}
+   * 
+ * will keep all records whose values are 9, null, or 50. + * + * @param column a column reference created by FilterApi + * @param values a set of values that match the column's type + * @param the Java type of values in the column + * @param the column type that corresponds to values of type T + * @return an in predicate for the given column and value + */ + public static , C extends Column & SupportsEqNotEq> In in(C column, Set values) { + return new In<>(column, values); + } + + /** + * Keeps records if their value is not in the provided values. + * The provided values set could not be null, but could contains a null value. + *

+ * For example: + *

+   *   {@code
+   *   Set set = new HashSet<>();
+   *   set.add(9);
+   *   set.add(null);
+   *   set.add(50);
+   *   notIn(column, set);}
+   * 
+ * will keep all records whose values are not 9, null, and 50. + * + * @param column a column reference created by FilterApi + * @param values a set of values that match the column's type + * @param the Java type of values in the column + * @param the column type that corresponds to values of type T + * @return an notIn predicate for the given column and value + */ + public static , C extends Column & SupportsEqNotEq> NotIn notIn(C column, Set values) { + return new NotIn<>(column, values); + } + /** * Keeps records that pass the provided {@link UserDefinedPredicate} *

@@ -220,7 +273,7 @@ public static , C extends Column & SupportsLtGt> GtEq UserDefined userDefined(Column column, Class clazz) { return new UserDefinedByClass<>(column, clazz); } - + /** * Keeps records that pass the provided {@link UserDefinedPredicate} *

diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java index 211c71e6d7..cefb43cfe6 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,11 +22,13 @@ import org.apache.parquet.filter2.predicate.Operators.Eq; import org.apache.parquet.filter2.predicate.Operators.Gt; import org.apache.parquet.filter2.predicate.Operators.GtEq; +import org.apache.parquet.filter2.predicate.Operators.In; import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined; import org.apache.parquet.filter2.predicate.Operators.Lt; import org.apache.parquet.filter2.predicate.Operators.LtEq; import org.apache.parquet.filter2.predicate.Operators.Not; import org.apache.parquet.filter2.predicate.Operators.NotEq; +import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; import org.apache.parquet.filter2.predicate.Operators.UserDefined; @@ -66,6 +68,8 @@ public static interface Visitor { > R visit(LtEq ltEq); > R visit(Gt gt); > R visit(GtEq gtEq); + > R visit(In in); + > R visit(NotIn notIn); R visit(And and); R visit(Or or); R visit(Not not); diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java index 88cb836e2c..5d6e3d0478 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,11 +23,13 @@ import org.apache.parquet.filter2.predicate.Operators.Eq; import org.apache.parquet.filter2.predicate.Operators.Gt; import org.apache.parquet.filter2.predicate.Operators.GtEq; +import org.apache.parquet.filter2.predicate.Operators.In; import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined; import org.apache.parquet.filter2.predicate.Operators.Lt; import org.apache.parquet.filter2.predicate.Operators.LtEq; import org.apache.parquet.filter2.predicate.Operators.Not; import org.apache.parquet.filter2.predicate.Operators.NotEq; +import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; import org.apache.parquet.filter2.predicate.Operators.UserDefined; @@ -87,6 +89,16 @@ public > FilterPredicate visit(GtEq gtEq) { return gtEq; } + @Override + public > FilterPredicate visit(In in) { + return in; + } + + @Override + public > FilterPredicate visit(NotIn notIn) { + return notIn; + } + @Override public FilterPredicate visit(And and) { return and(and.getLeft().accept(this), and.getRight().accept(this)); diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java index cc0186b8b7..819fa96abf 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,11 +23,13 @@ import org.apache.parquet.filter2.predicate.Operators.Eq; import org.apache.parquet.filter2.predicate.Operators.Gt; import org.apache.parquet.filter2.predicate.Operators.GtEq; +import org.apache.parquet.filter2.predicate.Operators.In; import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined; import org.apache.parquet.filter2.predicate.Operators.Lt; import org.apache.parquet.filter2.predicate.Operators.LtEq; import org.apache.parquet.filter2.predicate.Operators.Not; import org.apache.parquet.filter2.predicate.Operators.NotEq; +import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; import org.apache.parquet.filter2.predicate.Operators.UserDefined; @@ -81,6 +83,16 @@ public > FilterPredicate visit(GtEq gtEq) { return new Lt<>(gtEq.getColumn(), gtEq.getValue()); } + @Override + public > FilterPredicate visit(In in) { + return new NotIn<>(in.getColumn(), in.getValues()); + } + + @Override + public > FilterPredicate visit(NotIn notIn) { + return new In<>(notIn.getColumn(), notIn.getValues()); + } + @Override public FilterPredicate visit(And and) { return new Or(and.getLeft().accept(this), and.getRight().accept(this)); diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java index 9a1696c411..489d5a31c7 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,10 +21,13 @@ import java.io.Serializable; import java.util.Locale; import java.util.Objects; +import java.util.Set; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.io.api.Binary; +import static org.apache.parquet.Preconditions.checkArgument; + /** * These are the operators in a filter predicate expression tree. * They are constructed by using the methods in {@link FilterApi} @@ -169,7 +172,7 @@ public int hashCode() { public static final class Eq> extends ColumnFilterPredicate { // value can be null - Eq(Column column, T value) { + public Eq(Column column, T value) { super(column, value); } @@ -247,6 +250,80 @@ public R accept(Visitor visitor) { } } + // base class for In and NotIn + public static abstract class SetColumnFilterPredicate> implements FilterPredicate, Serializable { + private final Column column; + private final Set values; + private final String toString; + + protected SetColumnFilterPredicate(Column column, Set values) { + this.column = Objects.requireNonNull(column, "column cannot be null"); + this.values = Objects.requireNonNull(values, "values cannot be null"); + checkArgument(!values.isEmpty(), "values in SetColumnFilterPredicate shouldn't be empty!"); + + String name = getClass().getSimpleName().toLowerCase(Locale.ENGLISH); + StringBuilder str = new StringBuilder(); + int iter = 0; + for (T value : values) { + if (iter >= 100) break; + str.append(value).append(", "); + iter++; + } + String valueStr = values.size() <= 100 ? str.substring(0, str.length() - 2) : str + "..."; + this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + valueStr + ")"; + } + + public Column getColumn() { + return column; + } + + public Set getValues() { + return values; + } + + @Override + public String toString() { + return toString; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SetColumnFilterPredicate that = (SetColumnFilterPredicate) o; + return column.equals(that.column) && values.equals(that.values) && Objects.equals(toString, that.toString); + } + + @Override + public int hashCode() { + return Objects.hash(column, values, toString); + } + } + + public static final class In> extends SetColumnFilterPredicate { + + public In(Column column, Set values) { + super(column, values); + } + + @Override + public R accept(Visitor visitor) { + return visitor.visit(this); + } + } + + public static final class NotIn> extends SetColumnFilterPredicate { + + NotIn(Column column, Set values) { + super(column, values); + } + + @Override + public R accept(Visitor visitor) { + return visitor.visit(this); + } + } + // base class for And, Or private static abstract class BinaryLogicalFilterPredicate implements FilterPredicate, Serializable { private final FilterPredicate left; @@ -374,7 +451,7 @@ public R accept(Visitor visitor) { return visitor.visit(this); } } - + public static final class UserDefinedByClass, U extends UserDefinedPredicate> extends UserDefined { private final Class udpClass; private final String toString; @@ -430,7 +507,7 @@ public int hashCode() { return result; } } - + public static final class UserDefinedByInstance, U extends UserDefinedPredicate & Serializable> extends UserDefined { private final String toString; private final U udpInstance; diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java index c75036bbdc..92fb51210b 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -29,12 +29,15 @@ import org.apache.parquet.filter2.predicate.Operators.Eq; import org.apache.parquet.filter2.predicate.Operators.Gt; import org.apache.parquet.filter2.predicate.Operators.GtEq; +import org.apache.parquet.filter2.predicate.Operators.In; import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined; import org.apache.parquet.filter2.predicate.Operators.Lt; import org.apache.parquet.filter2.predicate.Operators.LtEq; import org.apache.parquet.filter2.predicate.Operators.Not; import org.apache.parquet.filter2.predicate.Operators.NotEq; +import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.SetColumnFilterPredicate; import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.schema.MessageType; @@ -114,6 +117,18 @@ public > Void visit(GtEq pred) { return null; } + @Override + public > Void visit(In pred) { + validateColumnFilterPredicate(pred); + return null; + } + + @Override + public > Void visit(NotIn pred) { + validateColumnFilterPredicate(pred); + return null; + } + @Override public Void visit(And and) { and.getLeft().accept(this); @@ -149,6 +164,10 @@ private > void validateColumnFilterPredicate(ColumnFilte validateColumn(pred.getColumn()); } + private > void validateColumnFilterPredicate(SetColumnFilterPredicate pred) { + validateColumn(pred.getColumn()); + } + private > void validateColumn(Column column) { ColumnPath path = column.getColumnPath(); diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java index a280e77c52..5c81729418 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,6 +21,7 @@ import org.apache.parquet.io.api.Binary; import java.util.Objects; +import java.util.Set; /** * A rewritten version of a {@link org.apache.parquet.filter2.predicate.FilterPredicate} which receives @@ -123,6 +124,46 @@ public boolean accept(Visitor visitor) { } } + abstract class SetInspector implements IncrementallyUpdatedFilterPredicate { + private final Set predicates; + + SetInspector(Set predicates) { + this.predicates = predicates; + } + + public final Set getPredicates() { + return predicates; + } + } + + final class In extends SetInspector { + In(Set predicates) { + super(predicates); + } + + @Override + public boolean accept(Visitor visitor) { + for (ValueInspector vi : getPredicates()) { + if (vi.accept(visitor)) return true; + } + return false; + } + } + + final class NotIn extends SetInspector { + NotIn(Set predicates) { + super(predicates); + } + + @Override + public boolean accept(Visitor visitor) { + for (ValueInspector vi : getPredicates()) { + if (vi.accept(visitor)) return false; + } + return true; + } + } + // base class for and / or static abstract class BinaryLogical implements IncrementallyUpdatedFilterPredicate { private final IncrementallyUpdatedFilterPredicate left; diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java index 481ddef09f..4e127d1768 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,16 +20,22 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor; import org.apache.parquet.filter2.predicate.Operators.And; +import org.apache.parquet.filter2.predicate.Operators.Eq; +import org.apache.parquet.filter2.predicate.Operators.In; import org.apache.parquet.filter2.predicate.Operators.Not; +import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.SetColumnFilterPredicate; import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector; import org.apache.parquet.io.PrimitiveColumnIO; import org.apache.parquet.schema.PrimitiveComparator; @@ -97,6 +103,28 @@ protected final PrimitiveComparator getComparator(ColumnPath path) { return (PrimitiveComparator) comparatorsByColumn.get(path); } + @Override + public > IncrementallyUpdatedFilterPredicate visit(In in) { + Set predicates = constructPredicates(in); + return new IncrementallyUpdatedFilterPredicate.In(predicates); + } + + @Override + public > IncrementallyUpdatedFilterPredicate visit(NotIn notIn) { + Set predicates = constructPredicates(notIn); + return new IncrementallyUpdatedFilterPredicate.NotIn(predicates); + } + + private > Set constructPredicates(SetColumnFilterPredicate in) { + Set predicates = new HashSet<>(); + for (T value : in.getValues()) { + Eq eq = new Eq<>(in.getColumn(), value); + IncrementallyUpdatedFilterPredicate incremental = eq.accept(this); + predicates.add((ValueInspector) incremental); + } + return predicates; + } + @Override public final IncrementallyUpdatedFilterPredicate visit(And and) { return new IncrementallyUpdatedFilterPredicate.And(and.getLeft().accept(this), and.getRight().accept(this)); diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java index 15be50e55d..1fb59a7973 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java @@ -25,6 +25,7 @@ import java.util.Formatter; import java.util.List; import java.util.PrimitiveIterator; +import java.util.function.IntConsumer; import java.util.function.IntPredicate; import org.apache.parquet.column.statistics.Statistics; @@ -32,12 +33,15 @@ import org.apache.parquet.filter2.predicate.Operators.Eq; import org.apache.parquet.filter2.predicate.Operators.Gt; import org.apache.parquet.filter2.predicate.Operators.GtEq; +import org.apache.parquet.filter2.predicate.Operators.In; import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined; import org.apache.parquet.filter2.predicate.Operators.Lt; import org.apache.parquet.filter2.predicate.Operators.LtEq; import org.apache.parquet.filter2.predicate.Operators.Not; import org.apache.parquet.filter2.predicate.Operators.NotEq; +import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.SetColumnFilterPredicate; import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.filter2.predicate.UserDefinedPredicate; import org.apache.parquet.io.api.Binary; @@ -287,6 +291,27 @@ public > PrimitiveIterator.OfInt visit(NotEq notEq) { pageIndex -> nullCounts[pageIndex] > 0 || matchingIndexes.contains(pageIndex)); } + @Override + public > PrimitiveIterator.OfInt visit(In in) { + IntSet indexes = getMatchingIndexes(in); + return IndexIterator.filter(getPageCount(), indexes::contains); + } + + @Override + public > PrimitiveIterator.OfInt visit(NotIn notIn) { + IntSet indexes = getMatchingIndexes(notIn); + return IndexIterator.filter(getPageCount(), pageIndex -> !indexes.contains(pageIndex)); + } + + private > IntSet getMatchingIndexes(SetColumnFilterPredicate in) { + IntSet matchingIndexes = new IntOpenHashSet(); + for (T value : in.getValues()) { + Eq eq = new Eq<>(in.getColumn(), value); + visit(eq).forEachRemaining((IntConsumer) matchingIndexes::add); + } + return matchingIndexes; + } + @Override public , U extends UserDefinedPredicate> PrimitiveIterator.OfInt visit( UserDefined udp) { diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java index 6dec7741dd..6c27f98097 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java @@ -27,6 +27,7 @@ import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter; import org.apache.parquet.filter2.compat.FilterCompat.UnboundRecordFilterCompat; import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor; +import org.apache.parquet.filter2.predicate.Operators; import org.apache.parquet.filter2.predicate.Operators.And; import org.apache.parquet.filter2.predicate.Operators.Column; import org.apache.parquet.filter2.predicate.Operators.Eq; @@ -146,6 +147,18 @@ public > RowRanges visit(GtEq gtEq) { return applyPredicate(gtEq.getColumn(), ci -> ci.visit(gtEq), RowRanges.EMPTY); } + @Override + public > RowRanges visit(Operators.In in) { + boolean isNull = in.getValues().contains(null); + return applyPredicate(in.getColumn(), ci -> ci.visit(in), isNull ? allRows() : RowRanges.EMPTY); + } + + @Override + public > RowRanges visit(Operators.NotIn notIn) { + boolean isNull = notIn.getValues().contains(null); + return applyPredicate(notIn.getColumn(), ci -> ci.visit(notIn), isNull ? RowRanges.EMPTY : allRows()); + } + @Override public , U extends UserDefinedPredicate> RowRanges visit(UserDefined udp) { return applyPredicate(udp.getColumn(), ci -> ci.visit(udp), diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java index 5a3947c980..7c5ed0d1b6 100644 --- a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java +++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java @@ -26,11 +26,13 @@ import static org.apache.parquet.filter2.predicate.FilterApi.floatColumn; import static org.apache.parquet.filter2.predicate.FilterApi.gt; import static org.apache.parquet.filter2.predicate.FilterApi.gtEq; +import static org.apache.parquet.filter2.predicate.FilterApi.in; import static org.apache.parquet.filter2.predicate.FilterApi.intColumn; import static org.apache.parquet.filter2.predicate.FilterApi.longColumn; import static org.apache.parquet.filter2.predicate.FilterApi.lt; import static org.apache.parquet.filter2.predicate.FilterApi.ltEq; import static org.apache.parquet.filter2.predicate.FilterApi.notEq; +import static org.apache.parquet.filter2.predicate.FilterApi.notIn; import static org.apache.parquet.filter2.predicate.FilterApi.userDefined; import static org.apache.parquet.filter2.predicate.LogicalInverter.invert; import static org.apache.parquet.schema.OriginalType.DECIMAL; @@ -55,7 +57,9 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.statistics.Statistics; @@ -274,6 +278,13 @@ public void testBuildBinaryDecimal() { decimalBinary("87656273")); assertCorrectFiltering(columnIndex, eq(col, decimalBinary("0.0")), 1, 4); assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 5, 6); + Set set1 = new HashSet<>(); + set1.add(Binary.fromString("0.0")); + assertCorrectFiltering(columnIndex, in(col, set1), 1, 4); + assertCorrectFiltering(columnIndex, notIn(col, set1), 0, 2, 3, 5, 6, 7); + set1.add(null); + assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 2, 3, 4, 5, 6); + assertCorrectFiltering(columnIndex, notIn(col, set1), 7); assertCorrectFiltering(columnIndex, notEq(col, decimalBinary("87656273")), 0, 1, 2, 3, 4, 5, 6); assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 4, 7); assertCorrectFiltering(columnIndex, gt(col, decimalBinary("2348978.45")), 1); @@ -319,6 +330,13 @@ public void testBuildBinaryDecimal() { null); assertCorrectFiltering(columnIndex, eq(col, decimalBinary("87656273")), 2, 4); assertCorrectFiltering(columnIndex, eq(col, null), 0, 3, 5, 6, 7); + Set set2 = new HashSet<>(); + set2.add(decimalBinary("87656273")); + assertCorrectFiltering(columnIndex, in(col, set2), 2, 4); + assertCorrectFiltering(columnIndex, notIn(col, set2), 0, 1, 3, 5, 6, 7); + set2.add(null); + assertCorrectFiltering(columnIndex, in(col, set2), 0, 2, 3, 4, 5, 6, 7); + assertCorrectFiltering(columnIndex, notIn(col, set2), 1); assertCorrectFiltering(columnIndex, notEq(col, decimalBinary("87656273")), 0, 1, 2, 3, 5, 6, 7); assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 4, 6); assertCorrectFiltering(columnIndex, gt(col, decimalBinary("87656273")), 6); @@ -364,6 +382,13 @@ public void testBuildBinaryDecimal() { decimalBinary("-9999293.23")); assertCorrectFiltering(columnIndex, eq(col, decimalBinary("1234567890.12")), 2, 4); assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 6); + Set set3 = new HashSet<>(); + set3.add(decimalBinary("1234567890.12")); + assertCorrectFiltering(columnIndex, in(col, set3), 2, 4); + assertCorrectFiltering(columnIndex, notIn(col, set3), 0, 1, 3, 5, 6, 7); + set3.add(null); + assertCorrectFiltering(columnIndex, in(col, set3), 0, 1, 2, 3, 4, 6); + assertCorrectFiltering(columnIndex, notIn(col, set3), 5, 7); assertCorrectFiltering(columnIndex, notEq(col, decimalBinary("0.0")), 0, 1, 2, 3, 4, 5, 6, 7); assertCorrectFiltering(columnIndex, notEq(col, null), 2, 4, 5, 7); assertCorrectFiltering(columnIndex, gt(col, decimalBinary("1234567890.12"))); @@ -417,6 +442,13 @@ public void testBuildBinaryUtf8() { null); assertCorrectFiltering(columnIndex, eq(col, stringBinary("Marvin")), 1, 4, 5); assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 5, 7); + Set set1 = new HashSet<>(); + set1.add(stringBinary("Marvin")); + assertCorrectFiltering(columnIndex, in(col, set1), 1, 4, 5); + assertCorrectFiltering(columnIndex, notIn(col, set1), 0, 2, 3, 6, 7); + set1.add(null); + assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 2, 3, 4, 5, 7); + assertCorrectFiltering(columnIndex, notIn(col, set1), 6); assertCorrectFiltering(columnIndex, notEq(col, stringBinary("Beeblebrox")), 0, 1, 2, 3, 4, 5, 7); assertCorrectFiltering(columnIndex, notEq(col, null), 1, 4, 5, 6); assertCorrectFiltering(columnIndex, gt(col, stringBinary("Prefect")), 1, 5); @@ -462,6 +494,13 @@ public void testBuildBinaryUtf8() { null); assertCorrectFiltering(columnIndex, eq(col, stringBinary("Jeltz")), 3, 4); assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 4, 5, 7); + Set set2 = new HashSet<>(); + set2.add( stringBinary("Jeltz")); + assertCorrectFiltering(columnIndex, in(col, set2), 3, 4); + assertCorrectFiltering(columnIndex, notIn(col, set2), 0, 1, 2, 5, 6, 7); + set2.add(null); + assertCorrectFiltering(columnIndex, in(col, set2), 0, 1, 2, 3, 4, 5, 7); + assertCorrectFiltering(columnIndex, notIn(col, set2), 6); assertCorrectFiltering(columnIndex, notEq(col, stringBinary("Slartibartfast")), 0, 1, 2, 3, 4, 5, 7); assertCorrectFiltering(columnIndex, notEq(col, null), 0, 3, 4, 6); assertCorrectFiltering(columnIndex, gt(col, stringBinary("Marvin")), 4, 6); @@ -507,6 +546,13 @@ public void testBuildBinaryUtf8() { stringBinary("Beeblebrox")); assertCorrectFiltering(columnIndex, eq(col, stringBinary("Marvin")), 3); assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 5, 6, 7); + Set set3 = new HashSet<>(); + set3.add(stringBinary("Marvin")); + assertCorrectFiltering(columnIndex, in(col, set3), 3); + assertCorrectFiltering(columnIndex, notIn(col, set3), 0, 1, 2, 4, 5, 6, 7); + set3.add(null); + assertCorrectFiltering(columnIndex, in(col, set3), 0, 2, 3, 5, 6, 7); + assertCorrectFiltering(columnIndex, notIn(col, set3), 1, 4); assertCorrectFiltering(columnIndex, notEq(col, stringBinary("Dent")), 0, 1, 2, 3, 5, 6, 7); assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 4, 7); assertCorrectFiltering(columnIndex, gt(col, stringBinary("Prefect")), 1); @@ -615,6 +661,13 @@ public void testFilterWithoutNullCounts() { BinaryColumn col = binaryColumn("test_col"); assertCorrectFiltering(columnIndex, eq(col, stringBinary("Dent")), 2, 3); assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 5, 6, 7); + Set set = new HashSet<>(); + set.add(stringBinary("Dent")); + assertCorrectFiltering(columnIndex, in(col, set), 2, 3); + assertCorrectFiltering(columnIndex, notIn(col, set), 0, 1, 4, 5, 6, 7); + set.add(null); + assertCorrectFiltering(columnIndex, in(col, set), 0, 1, 2, 3, 4, 5, 6, 7); + assertCorrectFiltering(columnIndex, notIn(col, set), new int[0]); assertCorrectFiltering(columnIndex, notEq(col, stringBinary("Dent")), 0, 1, 2, 3, 4, 5, 6, 7); assertCorrectFiltering(columnIndex, notEq(col, null), 2, 3, 5, 7); assertCorrectFiltering(columnIndex, userDefined(col, BinaryDecimalIsNullOrZeroUdp.class), 0, 1, 2, 3, 4, 5, 6, 7); @@ -646,6 +699,13 @@ public void testBuildBoolean() { assertCorrectValues(columnIndex.getMinValues(), false, false, true, null, false); assertCorrectFiltering(columnIndex, eq(col, true), 0, 1, 2); assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3); + Set set1 = new HashSet<>(); + set1.add(true); + assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 2); + assertCorrectFiltering(columnIndex, notIn(col, set1), 3, 4); + set1.add(null); + assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 2, 3); + assertCorrectFiltering(columnIndex, notIn(col, set1), 4); assertCorrectFiltering(columnIndex, notEq(col, true), 0, 1, 2, 3, 4); assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4); assertCorrectFiltering(columnIndex, userDefined(col, BooleanIsTrueOrNull.class), 0, 1, 2, 3); @@ -670,6 +730,13 @@ public void testBuildBoolean() { assertCorrectValues(columnIndex.getMinValues(), null, false, null, null, false, false, null); assertCorrectFiltering(columnIndex, eq(col, true), 4, 5); assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 5, 6); + Set set2 = new HashSet<>(); + set2.add(true); + assertCorrectFiltering(columnIndex, in(col, set2), 4, 5); + assertCorrectFiltering(columnIndex, notIn(col, set2), 0, 1, 2, 3, 6); + set2.add(null); + assertCorrectFiltering(columnIndex, in(col, set2), 0, 2, 3, 4, 5, 6); + assertCorrectFiltering(columnIndex, notIn(col, set2), 1); assertCorrectFiltering(columnIndex, notEq(col, true), 0, 1, 2, 3, 4, 5, 6); assertCorrectFiltering(columnIndex, notEq(col, null), 1, 4, 5); assertCorrectFiltering(columnIndex, userDefined(col, BooleanIsTrueOrNull.class), 0, 2, 3, 4, 5, 6); @@ -694,6 +761,13 @@ public void testBuildBoolean() { assertCorrectValues(columnIndex.getMinValues(), null, true, null, null, false, false, null); assertCorrectFiltering(columnIndex, eq(col, true), 1, 4); assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 5, 6); + Set set3 = new HashSet<>(); + set3.add(true); + assertCorrectFiltering(columnIndex, in(col, set3), 1, 4); + assertCorrectFiltering(columnIndex, notIn(col, set3), 0, 2, 3, 5, 6); + set3.add(null); + assertCorrectFiltering(columnIndex, in(col, set3), 0, 1, 2, 3, 4, 5, 6); + assertCorrectFiltering(columnIndex, notIn(col, set3), new int[0]); assertCorrectFiltering(columnIndex, notEq(col, true), 0, 2, 3, 4, 5, 6); assertCorrectFiltering(columnIndex, notEq(col, null), 1, 4, 5); assertCorrectFiltering(columnIndex, userDefined(col, BooleanIsTrueOrNull.class), 0, 1, 2, 3, 4, 5, 6); @@ -741,6 +815,14 @@ public void testBuildDouble() { assertCorrectValues(columnIndex.getMinValues(), -4.2, -11.7, 2.2, null, 1.9, -21.0); assertCorrectFiltering(columnIndex, eq(col, 0.0), 1, 5); assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3); + Set set1 = new HashSet<>(); + set1.add(0.0); + set1.add(-4.2); + assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 5); + assertCorrectFiltering(columnIndex, notIn(col, set1), 2, 3, 4); + set1.add(null); + assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 2, 3, 5); + assertCorrectFiltering(columnIndex, notIn(col, set1), 4); assertCorrectFiltering(columnIndex, notEq(col, 2.2), 0, 1, 2, 3, 4, 5); assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5); assertCorrectFiltering(columnIndex, gt(col, 2.2), 1, 4, 5); @@ -771,6 +853,15 @@ public void testBuildDouble() { assertCorrectValues(columnIndex.getMinValues(), null, -532.3, -234.7, null, null, -234.6, null, 3.0, null); assertCorrectFiltering(columnIndex, eq(col, 0.0), 5); assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8); + Set set2 = new HashSet<>(); + set2.add(0.0); + set2.add(3.5); + set2.add(-346.0); + assertCorrectFiltering(columnIndex, in(col, set2), 1, 5, 7); + assertCorrectFiltering(columnIndex, notIn(col, set2), 0, 2, 3, 4, 6, 8); + set2.add(null); + assertCorrectFiltering(columnIndex, in(col, set2), 0, 1, 2, 3, 4, 5, 6, 7, 8); + assertCorrectFiltering(columnIndex, notIn(col, set2), new int[0]); assertCorrectFiltering(columnIndex, notEq(col, 0.0), 0, 1, 2, 3, 4, 5, 6, 7, 8); assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7); assertCorrectFiltering(columnIndex, gt(col, 2.99999), 7); @@ -801,6 +892,13 @@ public void testBuildDouble() { assertCorrectValues(columnIndex.getMinValues(), null, 345.2, null, 234.6, null, -2.99999, null, null, -42.83); assertCorrectFiltering(columnIndex, eq(col, 234.6), 3, 5); assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7); + Set set3 = new HashSet<>(); + set3.add(234.6); + assertCorrectFiltering(columnIndex, in(col, set3), 3, 5); + assertCorrectFiltering(columnIndex, notIn(col, set3), 0, 1, 2, 4, 6, 7, 8); + set3.add(null); + assertCorrectFiltering(columnIndex, in(col, set3), 0, 2, 3, 4, 5, 6, 7); + assertCorrectFiltering(columnIndex, notIn(col, set3), 1, 8); assertCorrectFiltering(columnIndex, notEq(col, 2.2), 0, 1, 2, 3, 4, 5, 6, 7, 8); assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8); assertCorrectFiltering(columnIndex, gt(col, 2.2), 1, 3, 5); @@ -871,6 +969,13 @@ public void testBuildFloat() { assertCorrectValues(columnIndex.getMinValues(), -4.2f, -11.7f, 2.2f, null, 1.9f, -21.0f); assertCorrectFiltering(columnIndex, eq(col, 0.0f), 1, 5); assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3); + Set set1 = new HashSet<>(); + set1.add(0.0f); + assertCorrectFiltering(columnIndex, in(col, set1), 1, 5); + assertCorrectFiltering(columnIndex, notIn(col, set1), 0, 2, 3, 4); + set1.add(null); + assertCorrectFiltering(columnIndex, in(col, set1), 1, 2, 3, 5); + assertCorrectFiltering(columnIndex, notIn(col, set1), 0, 4); assertCorrectFiltering(columnIndex, notEq(col, 2.2f), 0, 1, 2, 3, 4, 5); assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5); assertCorrectFiltering(columnIndex, gt(col, 2.2f), 1, 4, 5); @@ -901,6 +1006,13 @@ public void testBuildFloat() { assertCorrectValues(columnIndex.getMinValues(), null, -532.3f, -300.6f, null, null, -234.6f, null, 3.0f, null); assertCorrectFiltering(columnIndex, eq(col, 0.0f), 5); assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8); + Set set2 = new HashSet<>(); + set2.add(0.0f); + assertCorrectFiltering(columnIndex, in(col, set2), 5); + assertCorrectFiltering(columnIndex, notIn(col, set2), 0, 1, 2, 3, 4, 6, 7, 8); + set2.add(null); + assertCorrectFiltering(columnIndex, in(col, set2), 0, 1, 2, 3, 4, 5, 6, 8); + assertCorrectFiltering(columnIndex, notIn(col, set2), 7); assertCorrectFiltering(columnIndex, notEq(col, 2.2f), 0, 1, 2, 3, 4, 5, 6, 7, 8); assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7); assertCorrectFiltering(columnIndex, gt(col, 2.2f), 5, 7); @@ -931,6 +1043,13 @@ public void testBuildFloat() { assertCorrectValues(columnIndex.getMinValues(), null, 345.2f, null, 234.6f, null, -2.99999f, null, null, -42.83f); assertCorrectFiltering(columnIndex, eq(col, 234.65f), 3); assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7); + Set set3 = new HashSet<>(); + set3.add(234.65f); + assertCorrectFiltering(columnIndex, in(col, set3), 3); + assertCorrectFiltering(columnIndex, notIn(col, set3), 0, 1, 2, 4, 5, 6, 7, 8); + set3.add(null); + assertCorrectFiltering(columnIndex, in(col, set3), 0, 2, 3, 4, 6, 7); + assertCorrectFiltering(columnIndex, notIn(col, set3), 1, 5, 8); assertCorrectFiltering(columnIndex, notEq(col, 2.2f), 0, 1, 2, 3, 4, 5, 6, 7, 8); assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8); assertCorrectFiltering(columnIndex, gt(col, 2.2f), 1, 3, 5); @@ -1001,6 +1120,13 @@ public void testBuildInt32() { assertCorrectValues(columnIndex.getMinValues(), -4, -11, 2, null, 1, -21); assertCorrectFiltering(columnIndex, eq(col, 2), 0, 1, 2, 4, 5); assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3); + Set set1 = new HashSet<>(); + set1.add(2); + assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 2, 4, 5); + assertCorrectFiltering(columnIndex, notIn(col, set1), 3); + set1.add(null); + assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 2, 3, 4, 5); + assertCorrectFiltering(columnIndex, notIn(col, set1), new int[0]); assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5); assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5); assertCorrectFiltering(columnIndex, gt(col, 2), 0, 1, 5); @@ -1031,6 +1157,13 @@ public void testBuildInt32() { assertCorrectValues(columnIndex.getMinValues(), null, -532, -500, null, null, -42, null, 3, null); assertCorrectFiltering(columnIndex, eq(col, 2), 5); assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8); + Set set2 = new HashSet<>(); + set2.add(2); + assertCorrectFiltering(columnIndex, in(col, set2), 5); + assertCorrectFiltering(columnIndex, notIn(col, set2), 0, 1, 2, 3, 4, 6, 7, 8); + set2.add(null); + assertCorrectFiltering(columnIndex, in(col, set2), 0, 1, 2, 3, 4, 5, 6, 8); + assertCorrectFiltering(columnIndex, notIn(col, set2), 7); assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5, 6, 7, 8); assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7); assertCorrectFiltering(columnIndex, gt(col, 2), 7); @@ -1062,6 +1195,13 @@ public void testBuildInt32() { assertCorrectValues(columnIndex.getMinValues(), null, 345, null, 42, null, -2, null, null, -42); assertCorrectFiltering(columnIndex, eq(col, 2), 5); assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7); + Set set3 = new HashSet<>(); + set3.add(2); + assertCorrectFiltering(columnIndex, in(col, set3), 5); + assertCorrectFiltering(columnIndex, notIn(col, set3), 0, 1, 2, 3, 4, 6, 7, 8); + set3.add(null); + assertCorrectFiltering(columnIndex, in(col, set3), 0, 2, 3, 4, 5, 6, 7); + assertCorrectFiltering(columnIndex, notIn(col, set3), 1, 8); assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5, 6, 7, 8); assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8); assertCorrectFiltering(columnIndex, gt(col, 2), 1, 3, 5); @@ -1114,6 +1254,13 @@ public void testBuildUInt8() { assertCorrectValues(columnIndex.getMinValues(), 4, 11, 2, null, 1, 0xEF); assertCorrectFiltering(columnIndex, eq(col, 2), 2, 4); assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3); + Set set1 = new HashSet<>(); + set1.add(2); + assertCorrectFiltering(columnIndex, in(col, set1), 2, 4); + assertCorrectFiltering(columnIndex, notIn(col, set1), 0, 1, 3, 5); + set1.add(null); + assertCorrectFiltering(columnIndex, in(col, set1), 1, 2, 3, 4); + assertCorrectFiltering(columnIndex, notIn(col, set1), 0, 5); assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5); assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5); assertCorrectFiltering(columnIndex, gt(col, 2), 0, 1, 4, 5); @@ -1144,6 +1291,13 @@ public void testBuildUInt8() { assertCorrectValues(columnIndex.getMinValues(), null, 0, 0, null, null, 42, null, 0xEF, null); assertCorrectFiltering(columnIndex, eq(col, 2), 2); assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8); + Set set2 = new HashSet<>(); + set2.add(2); + assertCorrectFiltering(columnIndex, in(col, set2), 2); + assertCorrectFiltering(columnIndex, notIn(col, set2), 0, 1, 3, 4, 5, 6, 7, 8); + set2.add(null); + assertCorrectFiltering(columnIndex, in(col, set2), 0, 1, 2, 3, 4, 6, 8); + assertCorrectFiltering(columnIndex, notIn(col, set2), 5, 7); assertCorrectFiltering(columnIndex, notEq(col, 2), 0, 1, 2, 3, 4, 5, 6, 7, 8); assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7); assertCorrectFiltering(columnIndex, gt(col, 0xEE), 7); @@ -1175,6 +1329,13 @@ public void testBuildUInt8() { assertCorrectValues(columnIndex.getMinValues(), null, 0xFF, null, 0xEA, null, 42, null, null, 0); assertCorrectFiltering(columnIndex, eq(col, 0xAB), 5); assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7); + Set set3 = new HashSet<>(); + set3.add(0xAB); + assertCorrectFiltering(columnIndex, in(col, set3), 5); + assertCorrectFiltering(columnIndex, notIn(col, set3), 0, 1, 2, 3, 4, 6, 7, 8); + set3.add(null); + assertCorrectFiltering(columnIndex, in(col, set3), 0, 2, 3, 4, 5, 6, 7); + assertCorrectFiltering(columnIndex, notIn(col, set3), 1, 8); assertCorrectFiltering(columnIndex, notEq(col, 0xFF), 0, 2, 3, 4, 5, 6, 7, 8); assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8); assertCorrectFiltering(columnIndex, gt(col, 0xFF)); @@ -1211,6 +1372,13 @@ public void testBuildInt64() { assertCorrectValues(columnIndex.getMinValues(), -4l, -11l, 2l, null, 1l, -21l); assertCorrectFiltering(columnIndex, eq(col, 0l), 0, 1, 5); assertCorrectFiltering(columnIndex, eq(col, null), 1, 2, 3); + Set set1 = new HashSet<>(); + set1.add(0l); + assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 5); + assertCorrectFiltering(columnIndex, notIn(col, set1), 2, 3, 4); + set1.add(null); + assertCorrectFiltering(columnIndex, in(col, set1), 0, 1, 2, 3, 5); + assertCorrectFiltering(columnIndex, notIn(col, set1), 4); assertCorrectFiltering(columnIndex, notEq(col, 0l), 0, 1, 2, 3, 4, 5); assertCorrectFiltering(columnIndex, notEq(col, null), 0, 1, 2, 4, 5); assertCorrectFiltering(columnIndex, gt(col, 2l), 0, 1, 5); @@ -1241,6 +1409,13 @@ public void testBuildInt64() { assertCorrectValues(columnIndex.getMinValues(), null, -532l, -234l, null, null, -42l, null, -3l, null); assertCorrectFiltering(columnIndex, eq(col, -42l), 2, 5); assertCorrectFiltering(columnIndex, eq(col, null), 0, 1, 2, 3, 4, 6, 8); + Set set2 = new HashSet<>(); + set2.add(-42l); + assertCorrectFiltering(columnIndex, in(col, set2), 2, 5); + assertCorrectFiltering(columnIndex, notIn(col, set2), 0, 1, 3, 4, 6, 7, 8); + set2.add(null); + assertCorrectFiltering(columnIndex, in(col, set2), 0, 1, 2, 3, 4, 5, 6, 8); + assertCorrectFiltering(columnIndex, notIn(col, set2), 7); assertCorrectFiltering(columnIndex, notEq(col, -42l), 0, 1, 2, 3, 4, 5, 6, 7, 8); assertCorrectFiltering(columnIndex, notEq(col, null), 1, 2, 5, 7); assertCorrectFiltering(columnIndex, gt(col, 2l), 7); @@ -1272,6 +1447,13 @@ public void testBuildInt64() { assertCorrectValues(columnIndex.getMinValues(), null, 345l, null, 42l, null, -2l, null, null, -42l); assertCorrectFiltering(columnIndex, eq(col, 0l), 5); assertCorrectFiltering(columnIndex, eq(col, null), 0, 2, 3, 4, 6, 7); + Set set3 = new HashSet<>(); + set3.add(0l); + assertCorrectFiltering(columnIndex, in(col, set3), 5); + assertCorrectFiltering(columnIndex, notIn(col, set3), 0, 1, 2, 3, 4, 6, 7, 8); + set3.add(null); + assertCorrectFiltering(columnIndex, in(col, set3), 0, 2, 3, 4, 5, 6, 7); + assertCorrectFiltering(columnIndex, notIn(col, set3), 1, 8); assertCorrectFiltering(columnIndex, notEq(col, 0l), 0, 1, 2, 3, 4, 5, 6, 7, 8); assertCorrectFiltering(columnIndex, notEq(col, null), 1, 3, 5, 8); assertCorrectFiltering(columnIndex, gt(col, 2l), 1, 3, 5); diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java index f37a343b40..b6725789d0 100644 --- a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java +++ b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java @@ -26,11 +26,13 @@ import static org.apache.parquet.filter2.predicate.FilterApi.eq; import static org.apache.parquet.filter2.predicate.FilterApi.gt; import static org.apache.parquet.filter2.predicate.FilterApi.gtEq; +import static org.apache.parquet.filter2.predicate.FilterApi.in; import static org.apache.parquet.filter2.predicate.FilterApi.intColumn; import static org.apache.parquet.filter2.predicate.FilterApi.longColumn; import static org.apache.parquet.filter2.predicate.FilterApi.lt; import static org.apache.parquet.filter2.predicate.FilterApi.ltEq; import static org.apache.parquet.filter2.predicate.FilterApi.notEq; +import static org.apache.parquet.filter2.predicate.FilterApi.notIn; import static org.apache.parquet.filter2.predicate.FilterApi.or; import static org.apache.parquet.filter2.predicate.FilterApi.userDefined; import static org.apache.parquet.filter2.predicate.LogicalInverter.invert; @@ -68,6 +70,7 @@ import org.apache.parquet.internal.column.columnindex.TestColumnIndexBuilder.BinaryUtf8StartsWithB; import org.apache.parquet.internal.column.columnindex.TestColumnIndexBuilder.DoubleIsInteger; import org.apache.parquet.internal.column.columnindex.TestColumnIndexBuilder.IntegerIsDivisableWith3; +import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.PrimitiveType; import org.junit.Test; @@ -360,6 +363,58 @@ public void testFiltering() { calculateRowRanges(FilterCompat.get( userDefined(intColumn("column1"), AnyInt.class)), STORE, paths, TOTAL_ROW_COUNT), TOTAL_ROW_COUNT); + + Set set1 = new HashSet<>(); + set1.add(7); + assertRows(calculateRowRanges(FilterCompat.get(in(intColumn("column1"), set1)), STORE, paths, TOTAL_ROW_COUNT), + 7, 8, 9, 10, 11, 12, 13); + set1.add(1); + assertRows(calculateRowRanges(FilterCompat.get(in(intColumn("column1"), set1)), STORE, paths, TOTAL_ROW_COUNT), + 0, 7, 8, 9, 10, 11, 12, 13); + assertRows(calculateRowRanges(FilterCompat.get(notIn(intColumn("column1"), set1)), STORE, paths, TOTAL_ROW_COUNT), + 1, 2, 3, 4, 5, 6, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29); + + Set set2 = new HashSet<>(); + set2.add(fromString("Zulu")); + set2.add(fromString("Alfa")); + assertRows(calculateRowRanges(FilterCompat.get(in(binaryColumn("column2"), set2)), STORE, paths, TOTAL_ROW_COUNT), + 0, 29); + assertRows(calculateRowRanges(FilterCompat.get(notIn(binaryColumn("column2"), set2)), STORE, paths, TOTAL_ROW_COUNT), + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28); + + Set set3 = new HashSet<>(); + set3.add(2.03); + assertRows(calculateRowRanges(FilterCompat.get(in(doubleColumn("column3"), set3)), STORE, paths, TOTAL_ROW_COUNT), + 0, 1, 2, 3, 4, 5, 16, 17, 18, 19, 20, 21, 22); + assertRows(calculateRowRanges(FilterCompat.get(notIn(doubleColumn("column3"), set3)), STORE, paths, TOTAL_ROW_COUNT), + 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 23, 24, 25, 26, 27, 28, 29); + set3.add(9.98); + assertRows(calculateRowRanges(FilterCompat.get(in(doubleColumn("column3"), set3)), STORE, paths, TOTAL_ROW_COUNT), + 0, 1, 2, 3, 4, 5, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22); + assertRows(calculateRowRanges(FilterCompat.get(notIn(doubleColumn("column3"), set3)), STORE, paths, TOTAL_ROW_COUNT), + 6, 7, 8, 9, 23, 24, 25, 26, 27, 28, 29); + set3.add(null); + assertRows(calculateRowRanges(FilterCompat.get(in(doubleColumn("column3"), set3)), STORE, paths, TOTAL_ROW_COUNT), + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 26, 27, 28, 29); + assertRows(calculateRowRanges(FilterCompat.get(notIn(doubleColumn("column3"), set3)), STORE, paths, TOTAL_ROW_COUNT), + 23, 24, 25); + + Set set4 = new HashSet<>(); + set4.add(null); + assertRows(calculateRowRanges(FilterCompat.get(in(booleanColumn("column4"), set4)), STORE, paths, TOTAL_ROW_COUNT), + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29); + // no column index, can't filter this row + assertRows(calculateRowRanges(FilterCompat.get(notIn(booleanColumn("column4"), set4)), STORE, paths, TOTAL_ROW_COUNT), + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29); + + Set set5 = new HashSet<>(); + set5.add(7); + set5.add(20); + assertRows(calculateRowRanges(FilterCompat.get(in(intColumn("column5"), set5)), STORE, paths, TOTAL_ROW_COUNT), + new long[0]); + assertRows(calculateRowRanges(FilterCompat.get(notIn(intColumn("column5"), set5)), STORE, paths, TOTAL_ROW_COUNT), + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29); + assertRows(calculateRowRanges(FilterCompat.get( and( and( diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml index 8e42d42096..6c2f76cff4 100644 --- a/parquet-hadoop/pom.xml +++ b/parquet-hadoop/pom.xml @@ -1,5 +1,5 @@ org.apache.parquet.column.values.dictionary.DictionaryValuesWriter#dictionaryByteSize + org.apache.parquet.filter2.predicate.FilterPredicate From 493b34c6f2c8250d518ef9d88707bdc328aacc06 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Fri, 13 Aug 2021 19:37:04 -0700 Subject: [PATCH 02/16] fix white space --- .../apache/parquet/filter2/predicate/FilterApi.java | 8 ++++---- .../parquet/filter2/predicate/FilterPredicate.java | 6 +++--- .../filter2/predicate/LogicalInverseRewriter.java | 6 +++--- .../parquet/filter2/predicate/LogicalInverter.java | 6 +++--- .../apache/parquet/filter2/predicate/Operators.java | 10 +++++----- .../predicate/SchemaCompatibilityValidator.java | 6 +++--- .../IncrementallyUpdatedFilterPredicate.java | 6 +++--- ...IncrementallyUpdatedFilterPredicateBuilderBase.java | 6 +++--- .../filter2/dictionarylevel/DictionaryFilter.java | 2 +- .../filter2/statisticslevel/StatisticsFilter.java | 6 +++--- .../parquet/filter2/compat/TestRowGroupFilter.java | 6 +++--- .../filter2/statisticslevel/TestStatisticsFilter.java | 6 +++--- 12 files changed, 37 insertions(+), 37 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java index edfba7c55d..a0490d9ac9 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -273,7 +273,7 @@ public static , C extends Column & SupportsEqNotEq> N UserDefined userDefined(Column column, Class clazz) { return new UserDefinedByClass<>(column, clazz); } - + /** * Keeps records that pass the provided {@link UserDefinedPredicate} *

diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java index cefb43cfe6..a5f1aa3402 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java index 5d6e3d0478..49b862f602 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java index 819fa96abf..b95d473ef2 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java index 489d5a31c7..ec80c03a76 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -451,7 +451,7 @@ public R accept(Visitor visitor) { return visitor.visit(this); } } - + public static final class UserDefinedByClass, U extends UserDefinedPredicate> extends UserDefined { private final Class udpClass; private final String toString; @@ -507,7 +507,7 @@ public int hashCode() { return result; } } - + public static final class UserDefinedByInstance, U extends UserDefinedPredicate & Serializable> extends UserDefined { private final String toString; private final U udpInstance; diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java index 92fb51210b..49fd10cc81 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java index 5c81729418..ac8afe44a7 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java index 4e127d1768..35545f1c47 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java index c097e96a66..c21212ac14 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java @@ -113,7 +113,7 @@ private > Set expandDictionary(ColumnChunkMetaData me for (int i = 0; i <= dict.getMaxId(); i++) { dictSet.add((T) dictValueProvider.apply(i)); } - + return dictSet; } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java index 3bd41b343b..fd92c29fd5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java index 77fa8b3395..40527775bf 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/compat/TestRowGroupFilter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java index ce2e28867d..65de8fd8a2 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY From 4b2d5a4abe689e995f3c5f9b48dcba81cfeb68f4 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Fri, 13 Aug 2021 19:47:39 -0700 Subject: [PATCH 03/16] fix white space --- .../java/org/apache/parquet/filter2/predicate/Operators.java | 2 +- parquet-hadoop/pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java index ec80c03a76..4c702c56ae 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java @@ -451,7 +451,7 @@ public R accept(Visitor visitor) { return visitor.visit(this); } } - + public static final class UserDefinedByClass, U extends UserDefinedPredicate> extends UserDefined { private final Class udpClass; private final String toString; diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml index 6c2f76cff4..8e42d42096 100644 --- a/parquet-hadoop/pom.xml +++ b/parquet-hadoop/pom.xml @@ -1,5 +1,5 @@ org.apache.parquet.column.values.dictionary.DictionaryValuesWriter#dictionaryByteSize - org.apache.parquet.filter2.predicate.FilterPredicate From 507ed47e1f7f3a1371a45b05dab3f06500d510d0 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 31 Aug 2021 17:55:13 -0700 Subject: [PATCH 07/16] address comments --- .../parquet/filter2/predicate/Operators.java | 42 ++++++++-------- .../columnindex/ColumnIndexBuilder.java | 49 ++++++++++++++++--- .../columnindex/TestColumnIndexBuilder.java | 2 +- .../columnindex/TestColumnIndexFilter.java | 9 ++-- 4 files changed, 70 insertions(+), 32 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java index 1975276755..bed0ea326a 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -250,27 +250,16 @@ public R accept(Visitor visitor) { } } - // base class for In and NotIn + // base class for In and NotIn. In is used to filter data based on a list of values. NotIn is used to filter data that + // are not in the list of values. public static abstract class SetColumnFilterPredicate> implements FilterPredicate, Serializable { private final Column column; private final Set values; - private final String toString; protected SetColumnFilterPredicate(Column column, Set values) { this.column = Objects.requireNonNull(column, "column cannot be null"); this.values = Objects.requireNonNull(values, "values cannot be null"); checkArgument(!values.isEmpty(), "values in SetColumnFilterPredicate shouldn't be empty!"); - - String name = getClass().getSimpleName().toLowerCase(Locale.ENGLISH); - StringBuilder str = new StringBuilder(); - int iter = 0; - for (T value : values) { - if (iter >= 100) break; - str.append(value).append(", "); - iter++; - } - String valueStr = values.size() <= 100 ? str.substring(0, str.length() - 2) : str + "..."; - this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + valueStr + ")"; } public Column getColumn() { @@ -283,7 +272,18 @@ public Set getValues() { @Override public String toString() { - return toString; + String name = getClass().getSimpleName().toLowerCase(Locale.ENGLISH); + StringBuilder str = new StringBuilder(); + str.append(name).append("(").append(column.getColumnPath().toDotString()).append(", "); + int iter = 0; + for (T value : values) { + if (iter >= 100) break; + str.append(value).append(", "); + iter++; + } + int length = str.length(); + str = values.size() <= 100 ? str.delete(length - 2, length) : str.append("..."); + return str.append(")").toString(); } @Override @@ -291,12 +291,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; SetColumnFilterPredicate that = (SetColumnFilterPredicate) o; - return column.equals(that.column) && values.equals(that.values) && Objects.equals(toString, that.toString); + return column.equals(that.column) && values.equals(that.values); } @Override public int hashCode() { - return Objects.hash(column, values, toString); + return Objects.hash(column, values); } } @@ -451,7 +451,7 @@ public R accept(Visitor visitor) { return visitor.visit(this); } } - + public static final class UserDefinedByClass, U extends UserDefinedPredicate> extends UserDefined { private final Class udpClass; private final String toString; @@ -507,7 +507,7 @@ public int hashCode() { return result; } } - + public static final class UserDefinedByInstance, U extends UserDefinedPredicate & Serializable> extends UserDefined { private final String toString; private final U udpInstance; diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java index 1fb59a7973..cab1455b56 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java @@ -21,10 +21,7 @@ import static java.util.Objects.requireNonNull; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Formatter; -import java.util.List; -import java.util.PrimitiveIterator; +import java.util.*; import java.util.function.IntConsumer; import java.util.function.IntPredicate; @@ -293,8 +290,48 @@ public > PrimitiveIterator.OfInt visit(NotEq notEq) { @Override public > PrimitiveIterator.OfInt visit(In in) { - IntSet indexes = getMatchingIndexes(in); - return IndexIterator.filter(getPageCount(), indexes::contains); + Set values = in.getValues(); + TreeSet myTreeSet = new TreeSet<>(); + IntSet matchingIndexes1 = new IntOpenHashSet(); // for null + Iterator it = values.iterator(); + while(it.hasNext()) { + T value = it.next(); + if (value != null) { + myTreeSet.add(value); + } else { + if (nullCounts == null) { + // Searching for nulls so if we don't have null related statistics we have to return all pages + return IndexIterator.all(getPageCount()); + } else { + for (int i = 0; i < nullCounts.length; i++) { + if (nullCounts[i] > 0) { + matchingIndexes1.add(i); + } + } + } + } + } + + IntSet matchingIndexes2 = new IntOpenHashSet(); + IntSet matchingIndexes3 = new IntOpenHashSet(); + + T min = myTreeSet.first(); + T max = myTreeSet.last(); + + // We don't want to iterate through each of the values in the IN set to compare, + // because the size of the IN set might be very large. Instead, we want to only + // compare the max and min value of the IN set to see if the page might contain the + // values in the IN set. + // If the values in a page are <= the max value in the IN set, + // and >= the min value in the IN set, then the page might contain + // the values in the IN set. + getBoundaryOrder().ltEq(createValueComparator(max)) + .forEachRemaining((int index) -> matchingIndexes2.add(index)); + getBoundaryOrder().gtEq(createValueComparator(min)) + .forEachRemaining((int index) -> matchingIndexes3.add(index)); + matchingIndexes2.retainAll(matchingIndexes3); + matchingIndexes2.addAll(matchingIndexes1); // add the matching null pages + return IndexIterator.filter(getPageCount(), pageIndex -> matchingIndexes2.contains(pageIndex)); } @Override diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java index 7c5ed0d1b6..9c1d4dcedd 100644 --- a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java +++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java @@ -857,7 +857,7 @@ public void testBuildDouble() { set2.add(0.0); set2.add(3.5); set2.add(-346.0); - assertCorrectFiltering(columnIndex, in(col, set2), 1, 5, 7); + assertCorrectFiltering(columnIndex, in(col, set2), 1, 2, 5, 7); assertCorrectFiltering(columnIndex, notIn(col, set2), 0, 2, 3, 4, 6, 8); set2.add(null); assertCorrectFiltering(columnIndex, in(col, set2), 0, 1, 2, 3, 4, 5, 6, 7, 8); diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java index b6725789d0..aad9d0b0ec 100644 --- a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java +++ b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java @@ -370,7 +370,7 @@ public void testFiltering() { 7, 8, 9, 10, 11, 12, 13); set1.add(1); assertRows(calculateRowRanges(FilterCompat.get(in(intColumn("column1"), set1)), STORE, paths, TOTAL_ROW_COUNT), - 0, 7, 8, 9, 10, 11, 12, 13); + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13); assertRows(calculateRowRanges(FilterCompat.get(notIn(intColumn("column1"), set1)), STORE, paths, TOTAL_ROW_COUNT), 1, 2, 3, 4, 5, 6, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29); @@ -378,7 +378,7 @@ public void testFiltering() { set2.add(fromString("Zulu")); set2.add(fromString("Alfa")); assertRows(calculateRowRanges(FilterCompat.get(in(binaryColumn("column2"), set2)), STORE, paths, TOTAL_ROW_COUNT), - 0, 29); + 0, 1, 2, 3, 4, 5, 6, 7, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29); assertRows(calculateRowRanges(FilterCompat.get(notIn(binaryColumn("column2"), set2)), STORE, paths, TOTAL_ROW_COUNT), 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28); @@ -390,12 +390,12 @@ public void testFiltering() { 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 23, 24, 25, 26, 27, 28, 29); set3.add(9.98); assertRows(calculateRowRanges(FilterCompat.get(in(doubleColumn("column3"), set3)), STORE, paths, TOTAL_ROW_COUNT), - 0, 1, 2, 3, 4, 5, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22); + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25); assertRows(calculateRowRanges(FilterCompat.get(notIn(doubleColumn("column3"), set3)), STORE, paths, TOTAL_ROW_COUNT), 6, 7, 8, 9, 23, 24, 25, 26, 27, 28, 29); set3.add(null); assertRows(calculateRowRanges(FilterCompat.get(in(doubleColumn("column3"), set3)), STORE, paths, TOTAL_ROW_COUNT), - 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 26, 27, 28, 29); + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29); assertRows(calculateRowRanges(FilterCompat.get(notIn(doubleColumn("column3"), set3)), STORE, paths, TOTAL_ROW_COUNT), 23, 24, 25); @@ -410,6 +410,7 @@ public void testFiltering() { Set set5 = new HashSet<>(); set5.add(7); set5.add(20); + System.out.println(in(intColumn("column5"), set5).toString()); assertRows(calculateRowRanges(FilterCompat.get(in(intColumn("column5"), set5)), STORE, paths, TOTAL_ROW_COUNT), new long[0]); assertRows(calculateRowRanges(FilterCompat.get(notIn(intColumn("column5"), set5)), STORE, paths, TOTAL_ROW_COUNT), From 02af9c4021d3bd61f81ff910a307dd64783076c4 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 14 Sep 2021 22:28:54 -0700 Subject: [PATCH 08/16] address comments --- .../parquet/filter2/predicate/Operators.java | 6 ++++-- .../columnindex/TestColumnIndexFilter.java | 1 - ...ementallyUpdatedFilterPredicateGenerator.java | 2 +- .../bloomfilterlevel/BloomFilterImpl.java | 16 ++++++---------- 4 files changed, 11 insertions(+), 14 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java index bed0ea326a..7e11948564 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java @@ -250,8 +250,10 @@ public R accept(Visitor visitor) { } } - // base class for In and NotIn. In is used to filter data based on a list of values. NotIn is used to filter data that - // are not in the list of values. + /* + * Base class for {@link In} and {@link NotIn}. {@link In} is used to filter data based on a list of values. + * {@link NotIn} is used to filter data that are not in the list of values. + */ public static abstract class SetColumnFilterPredicate> implements FilterPredicate, Serializable { private final Column column; private final Set values; diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java index aad9d0b0ec..47ea5fc5c1 100644 --- a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java +++ b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java @@ -410,7 +410,6 @@ public void testFiltering() { Set set5 = new HashSet<>(); set5.add(7); set5.add(20); - System.out.println(in(intColumn("column5"), set5).toString()); assertRows(calculateRowRanges(FilterCompat.get(in(intColumn("column5"), set5)), STORE, paths, TOTAL_ROW_COUNT), new long[0]); assertRows(calculateRowRanges(FilterCompat.get(notIn(intColumn("column5"), set5)), STORE, paths, TOTAL_ROW_COUNT), diff --git a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java index 6cded6e475..a493968246 100644 --- a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java +++ b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java @@ -73,7 +73,7 @@ public void run() throws IOException { "import org.apache.parquet.filter2.predicate.Operators.Eq;\n" + "import org.apache.parquet.filter2.predicate.Operators.Gt;\n" + "import org.apache.parquet.filter2.predicate.Operators.GtEq;\n" + - "import org.apache.parquet.filter2.predicate.Operators.In;\n" + + "import org.apache.parquet.filter2.predicate.Operators.In;\n" + "import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;\n" + "import org.apache.parquet.filter2.predicate.Operators.Lt;\n" + "import org.apache.parquet.filter2.predicate.Operators.LtEq;\n" + diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java index 2303d34446..d069836337 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java @@ -138,18 +138,14 @@ public > Boolean visit(Operators.In in) { return BLOCK_CANNOT_MATCH; } - try { - BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(meta); - if (bloomFilter != null) { - for (T value : values) { - if (bloomFilter.findHash(bloomFilter.hash(value))) { - return BLOCK_MIGHT_MATCH; - } + BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(meta); + if (bloomFilter != null) { + for (T value : values) { + if (bloomFilter.findHash(bloomFilter.hash(value))) { + return BLOCK_MIGHT_MATCH; } - return BLOCK_CANNOT_MATCH; } - } catch (RuntimeException e) { - LOG.warn(e.getMessage()); + return BLOCK_CANNOT_MATCH; } return BLOCK_MIGHT_MATCH; } From 562888d1a3d5dcea121357315d48b266c06073d4 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Wed, 15 Sep 2021 15:15:31 -0700 Subject: [PATCH 09/16] /* -> /** for java doc --- .../java/org/apache/parquet/filter2/predicate/Operators.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java index 7e11948564..e97a1b9988 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java @@ -250,7 +250,7 @@ public R accept(Visitor visitor) { } } - /* + /** * Base class for {@link In} and {@link NotIn}. {@link In} is used to filter data based on a list of values. * {@link NotIn} is used to filter data that are not in the list of values. */ From 87d0791e2cf5d6787e35d7145c7183ec33519b7f Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Wed, 22 Sep 2021 15:40:34 -0700 Subject: [PATCH 10/16] address comments --- .../columnindex/ColumnIndexBuilder.java | 42 +++++++++++------ .../statisticslevel/StatisticsFilter.java | 46 +++++++++++-------- .../recordlevel/TestRecordLevelFilters.java | 42 ++++++++++++++--- .../statisticslevel/TestStatisticsFilter.java | 30 ++++++------ 4 files changed, 105 insertions(+), 55 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java index cab1455b56..900d251a3b 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java @@ -291,32 +291,29 @@ public > PrimitiveIterator.OfInt visit(NotEq notEq) { @Override public > PrimitiveIterator.OfInt visit(In in) { Set values = in.getValues(); - TreeSet myTreeSet = new TreeSet<>(); - IntSet matchingIndexes1 = new IntOpenHashSet(); // for null + IntSet matchingIndexesForNull = new IntOpenHashSet(); // for null Iterator it = values.iterator(); while(it.hasNext()) { T value = it.next(); - if (value != null) { - myTreeSet.add(value); - } else { + if (value == null) { if (nullCounts == null) { // Searching for nulls so if we don't have null related statistics we have to return all pages return IndexIterator.all(getPageCount()); } else { for (int i = 0; i < nullCounts.length; i++) { if (nullCounts[i] > 0) { - matchingIndexes1.add(i); + matchingIndexesForNull.add(i); } } } } } - IntSet matchingIndexes2 = new IntOpenHashSet(); - IntSet matchingIndexes3 = new IntOpenHashSet(); + IntSet matchingIndexesLessThanMax = new IntOpenHashSet(); + IntSet matchingIndexesLargerThanMin = new IntOpenHashSet(); - T min = myTreeSet.first(); - T max = myTreeSet.last(); + T min = getMaxOrMin(false, values); + T max = getMaxOrMin(true, values); // We don't want to iterate through each of the values in the IN set to compare, // because the size of the IN set might be very large. Instead, we want to only @@ -326,12 +323,27 @@ public > PrimitiveIterator.OfInt visit(In in) { // and >= the min value in the IN set, then the page might contain // the values in the IN set. getBoundaryOrder().ltEq(createValueComparator(max)) - .forEachRemaining((int index) -> matchingIndexes2.add(index)); + .forEachRemaining((int index) -> matchingIndexesLessThanMax.add(index)); getBoundaryOrder().gtEq(createValueComparator(min)) - .forEachRemaining((int index) -> matchingIndexes3.add(index)); - matchingIndexes2.retainAll(matchingIndexes3); - matchingIndexes2.addAll(matchingIndexes1); // add the matching null pages - return IndexIterator.filter(getPageCount(), pageIndex -> matchingIndexes2.contains(pageIndex)); + .forEachRemaining((int index) -> matchingIndexesLargerThanMin.add(index)); + matchingIndexesLessThanMax.retainAll(matchingIndexesLargerThanMin); + IntSet matchingIndex = matchingIndexesLessThanMax; + matchingIndex.addAll(matchingIndexesForNull); // add the matching null pages + return IndexIterator.filter(getPageCount(), pageIndex -> matchingIndex.contains(pageIndex)); + } + + private > T getMaxOrMin(boolean isMax, Set values) { + T res = null; + for (T element : values) { + if (res == null) { + res = element; + } else if (isMax && res != null && element != null && res.compareTo(element) < 0) { + res = element; + } else if (!isMax && res != null && element != null && res.compareTo(element) > 0) { + res = element; + } + } + return res; } @Override diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java index fd92c29fd5..e4ede18777 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -186,26 +186,36 @@ public > Boolean visit(In in) { return BLOCK_MIGHT_MATCH; } + if (stats.isNumNullsSet()) { + if (stats.getNumNulls() == 0) { + if (values.contains(null) && values.size() == 1) return BLOCK_CANNOT_MATCH; + } else { + if (values.contains(null)) return BLOCK_MIGHT_MATCH; + } + } + // drop if all the element in value < min || all the element in value > max - return allElementCanBeDropped(stats, values, meta); + if (stats.compareMinToValue(getMaxOrMin(true, values)) <= 0 && + stats.compareMaxToValue(getMaxOrMin(false, values)) >= 0) { + return BLOCK_MIGHT_MATCH; + } + else { + return BLOCK_CANNOT_MATCH; + } } - private > Boolean allElementCanBeDropped(Statistics stats, Set values, ColumnChunkMetaData meta) { - for (T value : values) { - if (value != null) { - if (stats.compareMinToValue(value) <= 0 && stats.compareMaxToValue(value) >= 0) - return false; - } else { - // numNulls is not set. We don't know anything about the nulls in this chunk - if (!stats.isNumNullsSet()) { - return false; - } - if (hasNulls(meta)) { - return false; - } + private > T getMaxOrMin(boolean isMax, Set values) { + T res = null; + for (T element : values) { + if (res == null) { + res = element; + } else if (isMax && res != null && element != null && res.compareTo(element) < 0) { + res = element; + } else if (!isMax && res != null && element != null && res.compareTo(element) > 0) { + res = element; } } - return true; + return res; } @Override diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java index 5a7d02f19f..df2543ba16 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -50,6 +50,7 @@ import static org.apache.parquet.filter2.predicate.FilterApi.longColumn; import static org.apache.parquet.filter2.predicate.FilterApi.eq; import static org.apache.parquet.filter2.predicate.FilterApi.gt; +import static org.apache.parquet.filter2.predicate.FilterApi.in; import static org.apache.parquet.filter2.predicate.FilterApi.not; import static org.apache.parquet.filter2.predicate.FilterApi.notEq; import static org.apache.parquet.filter2.predicate.FilterApi.or; @@ -146,6 +147,33 @@ public void testAllFilter() throws Exception { assertEquals(new ArrayList(), found); } + @Test + public void testInFilter() throws Exception { + BinaryColumn name = binaryColumn("name"); + + HashSet nameSet = new HashSet<>(); + nameSet.add(Binary.fromString("thing2")); + nameSet.add(Binary.fromString("thing1")); + for (int i = 100; i < 200; i++) { + nameSet.add(Binary.fromString("p" + i)); + } + FilterPredicate pred = in(name, nameSet); + List found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred)); + + List expectedNames = new ArrayList<>(); + expectedNames.add("thing1"); + expectedNames.add("thing2"); + for (int i = 100; i < 200; i++) { + expectedNames.add("p" + i); + } + + assertEquals(expectedNames.get(0), ((Group)(found.get(0))).getString("name", 0)); + assertEquals(expectedNames.get(1), ((Group)(found.get(1))).getString("name", 0)); + for (int i = 2; i < 102; i++) { + assertEquals(expectedNames.get(i), ((Group)(found.get(i))).getString("name", 0)); + } + } + @Test public void testNameNotNull() throws Exception { BinaryColumn name = binaryColumn("name"); @@ -182,7 +210,7 @@ public boolean inverseCanDrop(Statistics statistics) { return false; } } - + public static class SetInFilter extends UserDefinedPredicate implements Serializable { private HashSet hSet; @@ -226,16 +254,16 @@ public boolean keep(User u) { } }); } - + @Test public void testUserDefinedByInstance() throws Exception { LongColumn name = longColumn("id"); final HashSet h = new HashSet(); - h.add(20L); + h.add(20L); h.add(27L); h.add(28L); - + FilterPredicate pred = userDefined(name, new SetInFilter(h)); List found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred)); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java index 65de8fd8a2..305ef519f8 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -299,8 +299,8 @@ public void testInNotIn() { values2.add(5); values2.add(117); values2.add(101); - assertTrue((canDrop(in(intColumn, values2), columnMetas))); - assertFalse((canDrop(notIn(intColumn, values2), columnMetas))); + assertFalse(canDrop(in(intColumn, values2), columnMetas)); + assertFalse(canDrop(notIn(intColumn, values2), columnMetas)); Set values3 = new HashSet<>(); values3.add(1); @@ -314,8 +314,8 @@ public void testInNotIn() { Set values4 = new HashSet<>(); values4.add(50); values4.add(60); - assertFalse((canDrop(in(intColumn, values4), missingMinMaxColumnMetas))); - assertFalse((canDrop(notIn(intColumn, values4), missingMinMaxColumnMetas))); + assertFalse(canDrop(in(intColumn, values4), missingMinMaxColumnMetas)); + assertFalse(canDrop(notIn(intColumn, values4), missingMinMaxColumnMetas)); Set values5 = new HashSet<>(); values5.add(1.0); @@ -323,24 +323,24 @@ public void testInNotIn() { values5.add(95.0); values5.add(107.0); values5.add(99.0); - assertFalse((canDrop(in(doubleColumn, values5), columnMetas))); - assertFalse((canDrop(notIn(doubleColumn, values5), columnMetas))); + assertFalse(canDrop(in(doubleColumn, values5), columnMetas)); + assertFalse(canDrop(notIn(doubleColumn, values5), columnMetas)); Set values6 = new HashSet<>(); values6.add(Binary.fromString("test1")); values6.add(Binary.fromString("test2")); - assertTrue((canDrop(in(missingColumn, values6), columnMetas))); - assertFalse((canDrop(notIn(missingColumn, values6), columnMetas))); + assertTrue(canDrop(in(missingColumn, values6), columnMetas)); + assertFalse(canDrop(notIn(missingColumn, values6), columnMetas)); Set values7 = new HashSet<>(); values7.add(null); - assertFalse((canDrop(in(intColumn, values7), nullColumnMetas))); - assertFalse((canDrop(notIn(intColumn, values7), nullColumnMetas))); + assertFalse(canDrop(in(intColumn, values7), nullColumnMetas)); + assertFalse(canDrop(notIn(intColumn, values7), nullColumnMetas)); Set values8 = new HashSet<>(); values8.add(null); - assertFalse((canDrop(in(missingColumn, values8), columnMetas))); - assertFalse((canDrop(notIn(missingColumn, values8), columnMetas))); + assertFalse(canDrop(in(missingColumn, values8), columnMetas)); + assertFalse(canDrop(notIn(missingColumn, values8), columnMetas)); IntStatistics statsNoNulls = new IntStatistics(); statsNoNulls.setMinMax(10, 100); From 803d51da7a84d5fdebe6517f0db95ff01ac39ea0 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Wed, 22 Sep 2021 15:57:46 -0700 Subject: [PATCH 11/16] fix white space --- .../parquet/filter2/predicate/FilterPredicate.java | 6 +++--- .../apache/parquet/filter2/predicate/Operators.java | 10 +++++----- .../IncrementallyUpdatedFilterPredicate.java | 6 +++--- ...IncrementallyUpdatedFilterPredicateBuilderBase.java | 6 +++--- .../IncrementallyUpdatedFilterPredicateGenerator.java | 6 +++--- .../filter2/statisticslevel/StatisticsFilter.java | 6 +++--- .../filter2/statisticslevel/TestStatisticsFilter.java | 6 +++--- 7 files changed, 23 insertions(+), 23 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java index 3be3cf71f6..d9156c2544 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java index e97a1b9988..d52aa92495 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -453,7 +453,7 @@ public R accept(Visitor visitor) { return visitor.visit(this); } } - + public static final class UserDefinedByClass, U extends UserDefinedPredicate> extends UserDefined { private final Class udpClass; private final String toString; @@ -509,7 +509,7 @@ public int hashCode() { return result; } } - + public static final class UserDefinedByInstance, U extends UserDefinedPredicate & Serializable> extends UserDefined { private final String toString; private final U udpInstance; diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java index fc19b61b3d..a280e77c52 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java index 78b4da754b..481ddef09f 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java index a493968246..c360fa943c 100644 --- a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java +++ b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java index e4ede18777..aa228078e0 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java index 305ef519f8..e9682e6170 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY From ac3ea9ed3c6517bcbc3424fb43c5c58e3581edae Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Wed, 22 Sep 2021 16:01:53 -0700 Subject: [PATCH 12/16] fix white space --- .../filter2/recordlevel/TestRecordLevelFilters.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java index df2543ba16..0cc98c5f78 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -210,7 +210,7 @@ public boolean inverseCanDrop(Statistics statistics) { return false; } } - + public static class SetInFilter extends UserDefinedPredicate implements Serializable { private HashSet hSet; @@ -254,7 +254,7 @@ public boolean keep(User u) { } }); } - + @Test public void testUserDefinedByInstance() throws Exception { LongColumn name = longColumn("id"); @@ -263,7 +263,7 @@ public void testUserDefinedByInstance() throws Exception { h.add(20L); h.add(27L); h.add(28L); - + FilterPredicate pred = userDefined(name, new SetInFilter(h)); List found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred)); From 2e77e28da344a3b9444120808046c26d1ca38b04 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sun, 26 Sep 2021 16:12:20 -0700 Subject: [PATCH 13/16] address comments --- .../org/apache/parquet/column/MinMax.java | 89 +++++++++++++++++++ .../columnindex/ColumnIndexBuilder.java | 38 ++++---- ...ntallyUpdatedFilterPredicateGenerator.java | 8 +- .../statisticslevel/StatisticsFilter.java | 32 +++---- .../recordlevel/TestRecordLevelFilters.java | 18 ++-- .../hadoop/TestColumnIndexFiltering.java | 39 ++++++++ 6 files changed, 172 insertions(+), 52 deletions(-) create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/MinMax.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/MinMax.java b/parquet-column/src/main/java/org/apache/parquet/column/MinMax.java new file mode 100644 index 0000000000..2385a0e831 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/MinMax.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column; + +import java.util.Iterator; +import java.util.Set; + +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.PrimitiveComparator; + +/** + * This class calculates the max and min values of a Set. + */ +public final class MinMax { + private PrimitiveComparator comparator; + private Iterator iterator; + private T min = null; + private T max = null; + + public MinMax(PrimitiveComparator comparator, Iterator iterator) { + this.comparator = comparator; + this.iterator = iterator; + getMinAndMax(); + } + + public T getMin() { + return min; + } + + public T getMax() { + return max; + } + + private void getMinAndMax() { + while(iterator.hasNext()) { + T element = iterator.next(); + if (max == null) { + max = element; + } else if (max != null && element != null) { + if ((element instanceof Integer && + ((PrimitiveComparator)comparator).compare((Integer)max, (Integer)element) < 0) || + (element instanceof Binary && + ((PrimitiveComparator)comparator).compare((Binary)max, (Binary)element) < 0) || + (element instanceof Double && + ((PrimitiveComparator)comparator).compare((Double)max, (Double)element) < 0) || + (element instanceof Float && + ((PrimitiveComparator)comparator).compare((Float)max, (Float)element) < 0) || + (element instanceof Boolean && + ((PrimitiveComparator)comparator).compare((Boolean)max, (Boolean)element) < 0) || + (element instanceof Long && + ((PrimitiveComparator)comparator).compare((Long) max, (Long)element) < 0)) + max = element; + } + if (min == null) { + min = element; + } else if (min != null && element != null) { + if ((element instanceof Integer && + ((PrimitiveComparator)comparator).compare((Integer)min, (Integer)element) > 0) || + (element instanceof Binary && + ((PrimitiveComparator)comparator).compare((Binary)min, (Binary)element) > 0) || + (element instanceof Double && + ((PrimitiveComparator)comparator).compare((Double)min, (Double)element) > 0) || + (element instanceof Float && + ((PrimitiveComparator)comparator).compare((Float)min, (Float)element) > 0) || + (element instanceof Boolean && + ((PrimitiveComparator)comparator).compare((Boolean)min, (Boolean)element) > 0) || + (element instanceof Long && + ((PrimitiveComparator)comparator).compare((Long)min, (Long)element) > 0)) + min = element; + } + } + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java index 900d251a3b..609db106fb 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java @@ -21,10 +21,16 @@ import static java.util.Objects.requireNonNull; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Formatter; +import java.util.Iterator; +import java.util.List; +import java.util.PrimitiveIterator; +import java.util.Set; import java.util.function.IntConsumer; import java.util.function.IntPredicate; +import org.apache.parquet.column.MinMax; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.filter2.predicate.Operators.And; import org.apache.parquet.filter2.predicate.Operators.Eq; @@ -305,47 +311,37 @@ public > PrimitiveIterator.OfInt visit(In in) { matchingIndexesForNull.add(i); } } + if (values.size() == 1) { + return IndexIterator.filter(getPageCount(), pageIndex -> matchingIndexesForNull.contains(pageIndex)); + } } } } IntSet matchingIndexesLessThanMax = new IntOpenHashSet(); - IntSet matchingIndexesLargerThanMin = new IntOpenHashSet(); + IntSet matchingIndexesGreaterThanMin = new IntOpenHashSet(); - T min = getMaxOrMin(false, values); - T max = getMaxOrMin(true, values); + MinMax minMax = new MinMax(comparator, values.iterator()); + T min = minMax.getMin(); + T max = minMax.getMax(); // We don't want to iterate through each of the values in the IN set to compare, // because the size of the IN set might be very large. Instead, we want to only // compare the max and min value of the IN set to see if the page might contain the // values in the IN set. - // If the values in a page are <= the max value in the IN set, + // If there might be values in a page that are <= the max value in the IN set, // and >= the min value in the IN set, then the page might contain // the values in the IN set. getBoundaryOrder().ltEq(createValueComparator(max)) .forEachRemaining((int index) -> matchingIndexesLessThanMax.add(index)); getBoundaryOrder().gtEq(createValueComparator(min)) - .forEachRemaining((int index) -> matchingIndexesLargerThanMin.add(index)); - matchingIndexesLessThanMax.retainAll(matchingIndexesLargerThanMin); + .forEachRemaining((int index) -> matchingIndexesGreaterThanMin.add(index)); + matchingIndexesLessThanMax.retainAll(matchingIndexesGreaterThanMin); IntSet matchingIndex = matchingIndexesLessThanMax; matchingIndex.addAll(matchingIndexesForNull); // add the matching null pages return IndexIterator.filter(getPageCount(), pageIndex -> matchingIndex.contains(pageIndex)); } - private > T getMaxOrMin(boolean isMax, Set values) { - T res = null; - for (T element : values) { - if (res == null) { - res = element; - } else if (isMax && res != null && element != null && res.compareTo(element) < 0) { - res = element; - } else if (!isMax && res != null && element != null && res.compareTo(element) > 0) { - res = element; - } - } - return res; - } - @Override public > PrimitiveIterator.OfInt visit(NotIn notIn) { IntSet indexes = getMatchingIndexes(notIn); diff --git a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java index c360fa943c..910f55f61b 100644 --- a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java +++ b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -250,7 +250,7 @@ private void addInequalityCase(TypeInfo info, String op) throws IOException { private void addInNotInCase(TypeInfo info, boolean isEq) throws IOException { add(" if (clazz.equals(" + info.className + ".class)) {\n" + - " if (pred.getValues() == null) {\n" + + " if (pred.getValues().contains(null)) {\n" + " valueInspector = new ValueInspector() {\n" + " @Override\n" + " public void updateNull() {\n" + diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java index aa228078e0..49b5e017aa 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -43,6 +43,7 @@ import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.filter2.predicate.UserDefinedPredicate; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.column.MinMax; /** * Applies a {@link org.apache.parquet.filter2.predicate.FilterPredicate} to statistics about a group of @@ -194,30 +195,19 @@ public > Boolean visit(In in) { } } + MinMax minMax = new MinMax(meta.getPrimitiveType().comparator(), values.iterator()); + T min = minMax.getMin(); + T max = minMax.getMax(); + // drop if all the element in value < min || all the element in value > max - if (stats.compareMinToValue(getMaxOrMin(true, values)) <= 0 && - stats.compareMaxToValue(getMaxOrMin(false, values)) >= 0) { + if (stats.compareMinToValue(max) <= 0 && + stats.compareMaxToValue(min) >= 0) { return BLOCK_MIGHT_MATCH; - } - else { + } else { return BLOCK_CANNOT_MATCH; } } - private > T getMaxOrMin(boolean isMax, Set values) { - T res = null; - for (T element : values) { - if (res == null) { - res = element; - } else if (isMax && res != null && element != null && res.compareTo(element) < 0) { - res = element; - } else if (!isMax && res != null && element != null && res.compareTo(element) > 0) { - res = element; - } - } - return res; - } - @Override public > Boolean visit(NotIn notIn) { return BLOCK_MIGHT_MATCH; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java index 0cc98c5f78..952a3795f9 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -166,12 +166,18 @@ public void testInFilter() throws Exception { for (int i = 100; i < 200; i++) { expectedNames.add("p" + i); } + expectedNames.add("dummy1"); + expectedNames.add("dummy2"); + expectedNames.add("dummy3"); + // validate that all the values returned by the reader fulfills the filter and there are no values left out, + // i.e. "thing1", "thing2" and from "p100" to "p199" and nothing else. assertEquals(expectedNames.get(0), ((Group)(found.get(0))).getString("name", 0)); assertEquals(expectedNames.get(1), ((Group)(found.get(1))).getString("name", 0)); for (int i = 2; i < 102; i++) { assertEquals(expectedNames.get(i), ((Group)(found.get(i))).getString("name", 0)); } + assert(found.size() == 102); } @Test @@ -210,7 +216,7 @@ public boolean inverseCanDrop(Statistics statistics) { return false; } } - + public static class SetInFilter extends UserDefinedPredicate implements Serializable { private HashSet hSet; @@ -254,7 +260,7 @@ public boolean keep(User u) { } }); } - + @Test public void testUserDefinedByInstance() throws Exception { LongColumn name = longColumn("id"); @@ -263,7 +269,7 @@ public void testUserDefinedByInstance() throws Exception { h.add(20L); h.add(27L); h.add(28L); - + FilterPredicate pred = userDefined(name, new SetInFilter(h)); List found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred)); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java index a66533d0f9..5e181059f0 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java @@ -25,6 +25,7 @@ import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn; import static org.apache.parquet.filter2.predicate.FilterApi.eq; import static org.apache.parquet.filter2.predicate.FilterApi.gtEq; +import static org.apache.parquet.filter2.predicate.FilterApi.in; import static org.apache.parquet.filter2.predicate.FilterApi.longColumn; import static org.apache.parquet.filter2.predicate.FilterApi.lt; import static org.apache.parquet.filter2.predicate.FilterApi.ltEq; @@ -53,10 +54,12 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -377,12 +380,48 @@ public void testSimpleFiltering() throws IOException { assertCorrectFiltering( record -> record.getId() == 1234, eq(longColumn("id"), 1234l)); + + Set idSet = new HashSet<>(); + idSet.add(1234l); + idSet.add(5678l); + idSet.add(1357l); + idSet.add(111l); + idSet.add(6666l); + idSet.add(2l); + idSet.add(2468l); + + assertCorrectFiltering( + record -> (record.getId() == 1234 || record.getId() == 5678 || record.getId() == 1357 || + record.getId() == 111 || record.getId() == 6666 || record.getId() == 2 || record.getId() == 2468), + in(longColumn("id"), idSet) + ); + assertCorrectFiltering( record -> "miller".equals(record.getName()), eq(binaryColumn("name"), Binary.fromString("miller"))); + + Set nameSet = new HashSet<>(); + nameSet.add(Binary.fromString("anderson")); + nameSet.add(Binary.fromString("miller")); + nameSet.add(Binary.fromString("thomas")); + nameSet.add(Binary.fromString("williams")); + + assertCorrectFiltering( + record -> ("anderson".equals(record.getName()) || "miller".equals(record.getName()) || + "thomas".equals(record.getName()) || "williams".equals(record.getName())), + in(binaryColumn("name"), nameSet) + ); + assertCorrectFiltering( record -> record.getName() == null, eq(binaryColumn("name"), null)); + + Set nullSet = new HashSet<>(); + nullSet.add(null); + + assertCorrectFiltering( + record -> record.getName() == null, + in(binaryColumn("name"), nullSet)); } @Test From b0976aa1468dd04c68c0dd8e7c3af437e3ca0ef5 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sun, 26 Sep 2021 16:19:38 -0700 Subject: [PATCH 14/16] fix white space --- ...IncrementallyUpdatedFilterPredicateGenerator.java | 6 +++--- .../filter2/statisticslevel/StatisticsFilter.java | 6 +++--- .../filter2/recordlevel/TestRecordLevelFilters.java | 12 ++++++------ 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java index 910f55f61b..2827167cb2 100644 --- a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java +++ b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java index 49b5e017aa..40d5f9f068 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java index 952a3795f9..4c3538c3d5 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -216,7 +216,7 @@ public boolean inverseCanDrop(Statistics statistics) { return false; } } - + public static class SetInFilter extends UserDefinedPredicate implements Serializable { private HashSet hSet; @@ -260,7 +260,7 @@ public boolean keep(User u) { } }); } - + @Test public void testUserDefinedByInstance() throws Exception { LongColumn name = longColumn("id"); @@ -269,7 +269,7 @@ public void testUserDefinedByInstance() throws Exception { h.add(20L); h.add(27L); h.add(28L); - + FilterPredicate pred = userDefined(name, new SetInFilter(h)); List found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(pred)); From 3571d43280c0e7dd6af19b8ba0987c82f3a70c4f Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Mon, 27 Sep 2021 11:46:20 -0700 Subject: [PATCH 15/16] address comments --- .../org/apache/parquet/column/MinMax.java | 53 +++++-------------- .../columnindex/ColumnIndexBuilder.java | 2 +- .../statisticslevel/StatisticsFilter.java | 2 +- 3 files changed, 14 insertions(+), 43 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/MinMax.java b/parquet-column/src/main/java/org/apache/parquet/column/MinMax.java index 2385a0e831..4e98505ed9 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/MinMax.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/MinMax.java @@ -18,25 +18,17 @@ */ package org.apache.parquet.column; -import java.util.Iterator; -import java.util.Set; - -import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.PrimitiveComparator; /** - * This class calculates the max and min values of a Set. + * This class calculates the max and min values of an iterable collection. */ public final class MinMax { - private PrimitiveComparator comparator; - private Iterator iterator; private T min = null; private T max = null; - public MinMax(PrimitiveComparator comparator, Iterator iterator) { - this.comparator = comparator; - this.iterator = iterator; - getMinAndMax(); + public MinMax(PrimitiveComparator comparator, Iterable iterable) { + getMinAndMax(comparator, iterable); } public T getMin() { @@ -47,43 +39,22 @@ public T getMax() { return max; } - private void getMinAndMax() { - while(iterator.hasNext()) { - T element = iterator.next(); + private void getMinAndMax(PrimitiveComparator comparator, Iterable iterable) { + iterable.forEach(element -> { if (max == null) { max = element; - } else if (max != null && element != null) { - if ((element instanceof Integer && - ((PrimitiveComparator)comparator).compare((Integer)max, (Integer)element) < 0) || - (element instanceof Binary && - ((PrimitiveComparator)comparator).compare((Binary)max, (Binary)element) < 0) || - (element instanceof Double && - ((PrimitiveComparator)comparator).compare((Double)max, (Double)element) < 0) || - (element instanceof Float && - ((PrimitiveComparator)comparator).compare((Float)max, (Float)element) < 0) || - (element instanceof Boolean && - ((PrimitiveComparator)comparator).compare((Boolean)max, (Boolean)element) < 0) || - (element instanceof Long && - ((PrimitiveComparator)comparator).compare((Long) max, (Long)element) < 0)) + } else if (element != null) { + if (comparator.compare(max, element) < 0) { max = element; + } } if (min == null) { min = element; - } else if (min != null && element != null) { - if ((element instanceof Integer && - ((PrimitiveComparator)comparator).compare((Integer)min, (Integer)element) > 0) || - (element instanceof Binary && - ((PrimitiveComparator)comparator).compare((Binary)min, (Binary)element) > 0) || - (element instanceof Double && - ((PrimitiveComparator)comparator).compare((Double)min, (Double)element) > 0) || - (element instanceof Float && - ((PrimitiveComparator)comparator).compare((Float)min, (Float)element) > 0) || - (element instanceof Boolean && - ((PrimitiveComparator)comparator).compare((Boolean)min, (Boolean)element) > 0) || - (element instanceof Long && - ((PrimitiveComparator)comparator).compare((Long)min, (Long)element) > 0)) + } else if (element != null) { + if (comparator.compare(min, element) > 0) { min = element; + } } - } + }); } } diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java index 609db106fb..fc3859b9ca 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java @@ -321,7 +321,7 @@ public > PrimitiveIterator.OfInt visit(In in) { IntSet matchingIndexesLessThanMax = new IntOpenHashSet(); IntSet matchingIndexesGreaterThanMin = new IntOpenHashSet(); - MinMax minMax = new MinMax(comparator, values.iterator()); + MinMax minMax = new MinMax(comparator, values); T min = minMax.getMin(); T max = minMax.getMax(); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java index 40d5f9f068..23609a93d5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java @@ -195,7 +195,7 @@ public > Boolean visit(In in) { } } - MinMax minMax = new MinMax(meta.getPrimitiveType().comparator(), values.iterator()); + MinMax minMax = new MinMax(meta.getPrimitiveType().comparator(), values); T min = minMax.getMin(); T max = minMax.getMax(); From 65f2a35a2a6412022aea8845b31d0433a044973f Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 28 Sep 2021 08:25:34 -0700 Subject: [PATCH 16/16] address comments --- .../main/java/org/apache/parquet/column/MinMax.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/MinMax.java b/parquet-column/src/main/java/org/apache/parquet/column/MinMax.java index 4e98505ed9..c97b681b5a 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/MinMax.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/MinMax.java @@ -27,7 +27,7 @@ public final class MinMax { private T min = null; private T max = null; - public MinMax(PrimitiveComparator comparator, Iterable iterable) { + public MinMax(PrimitiveComparator comparator, Iterable iterable) { getMinAndMax(comparator, iterable); } @@ -39,21 +39,17 @@ public T getMax() { return max; } - private void getMinAndMax(PrimitiveComparator comparator, Iterable iterable) { + private void getMinAndMax(PrimitiveComparator comparator, Iterable iterable) { iterable.forEach(element -> { if (max == null) { max = element; - } else if (element != null) { - if (comparator.compare(max, element) < 0) { + } else if (element != null && comparator.compare(max, element) < 0) { max = element; - } } if (min == null) { min = element; - } else if (element != null) { - if (comparator.compare(min, element) > 0) { + } else if (element != null && comparator.compare(min, element) > 0) { min = element; - } } }); }