diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java index 01ebdd0a9310..4558769a4e5f 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.collect.Lists; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.indexer.HadoopTuningConfig; @@ -56,6 +57,10 @@ public class MaterializedViewSupervisorSpecTest { + static { + NullHandling.initializeForTests(); + } + @Rule public ExpectedException expectedException = ExpectedException.none(); diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java index 8dc6e53a4e2b..c60e9a0823ed 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.indexer.HadoopIOConfig; @@ -70,6 +71,10 @@ public class MaterializedViewSupervisorTest { + static { + NullHandling.initializeForTests(); + } + @Rule public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); diff --git a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/MaterializedViewQueryQueryToolChestTest.java b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/MaterializedViewQueryQueryToolChestTest.java index b96ff17f670b..cdee091893ae 100644 --- a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/MaterializedViewQueryQueryToolChestTest.java +++ b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/MaterializedViewQueryQueryToolChestTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.MapBasedRow; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; @@ -57,6 +58,10 @@ public class MaterializedViewQueryQueryToolChestTest { private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); + static { + NullHandling.initializeForTests(); + } + @Test public void testMakePostComputeManipulatorFn() { diff --git a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/MaterializedViewQueryTest.java b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/MaterializedViewQueryTest.java index 1432f519ef73..4bd80313b0d6 100644 --- a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/MaterializedViewQueryTest.java +++ b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/MaterializedViewQueryTest.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunnerTestHelper; @@ -44,6 +45,11 @@ public class MaterializedViewQueryTest { private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper(); + + static { + NullHandling.initializeForTests(); + } + private DataSourceOptimizer optimizer; @Before diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java index d01b4c536e03..8528c4c1d450 100644 --- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java +++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/BloomDimFilterTest.java @@ -59,39 +59,45 @@ public class BloomDimFilterTest extends BaseFilterTest { private static final String TIMESTAMP_COLUMN = "timestamp"; - private static final InputRowParser> PARSER = new MapInputRowParser( - new TimeAndDimsParseSpec( - new TimestampSpec(TIMESTAMP_COLUMN, "iso", DateTimes.of("2000")), - new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim0", "dim1", "dim2", "dim3", "dim6")), - null, - null - ) - ) - ); - - private static final List ROWS = ImmutableList.of( - PARSER.parseBatch(ImmutableMap.of( - "dim0", - "0", - "dim1", - "", - "dim2", - ImmutableList.of("a", "b"), - "dim6", - "2017-07-25" - )).get(0), - PARSER.parseBatch(ImmutableMap.of("dim0", "1", "dim1", "10", "dim2", ImmutableList.of(), "dim6", "2017-07-25")) - .get(0), - PARSER.parseBatch(ImmutableMap.of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""), "dim6", "2017-05-25")) - .get(0), - PARSER.parseBatch(ImmutableMap.of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))).get(0), - PARSER.parseBatch(ImmutableMap.of("dim0", "4", "dim1", "def", "dim2", ImmutableList.of("c"))).get(0), - PARSER.parseBatch(ImmutableMap.of("dim0", "5", "dim1", "abc")).get(0) - ); + private static final InputRowParser> PARSER; + + private static final List ROWS; private static DefaultObjectMapper mapper = new DefaultObjectMapper(); + static { + NullHandling.initializeForTests(); + PARSER = new MapInputRowParser( + new TimeAndDimsParseSpec( + new TimestampSpec(TIMESTAMP_COLUMN, "iso", DateTimes.of("2000")), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim0", "dim1", "dim2", "dim3", "dim6")), + null, + null + ) + ) + ); + ROWS = ImmutableList.of( + PARSER.parseBatch(ImmutableMap.of( + "dim0", + "0", + "dim1", + "", + "dim2", + ImmutableList.of("a", "b"), + "dim6", + "2017-07-25" + )).get(0), + PARSER.parseBatch(ImmutableMap.of("dim0", "1", "dim1", "10", "dim2", ImmutableList.of(), "dim6", "2017-07-25")) + .get(0), + PARSER.parseBatch(ImmutableMap.of("dim0", "2", "dim1", "2", "dim2", ImmutableList.of(""), "dim6", "2017-05-25")) + .get(0), + PARSER.parseBatch(ImmutableMap.of("dim0", "3", "dim1", "1", "dim2", ImmutableList.of("a"))).get(0), + PARSER.parseBatch(ImmutableMap.of("dim0", "4", "dim1", "def", "dim2", ImmutableList.of("c"))).get(0), + PARSER.parseBatch(ImmutableMap.of("dim0", "5", "dim1", "abc")).get(0) + ); + } + public BloomDimFilterTest( String testName, IndexBuilder indexBuilder, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 34ef75cc87a3..f8e86c0dc87d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -40,6 +40,7 @@ import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.client.cache.MapCache; import org.apache.druid.client.indexing.NoopIndexingServiceClient; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputFormat; @@ -211,6 +212,7 @@ public byte[] value() ); static { + NullHandling.initializeForTests(); Stream.concat( new KafkaIndexTaskModule().getJacksonModules().stream(), Stream.of(TEST_MODULE) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java index e17ce1e2f31c..d5cbd0d8554b 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java @@ -62,30 +62,31 @@ public class KinesisSamplerSpecTest extends EasyMockSupport { private static final String STREAM = "sampling"; private static final String SHARD_ID = "1"; - private static final DataSchema DATA_SCHEMA = new DataSchema( - "test_ds", - new TimestampSpec("timestamp", "iso", null), - new DimensionsSpec( - Arrays.asList( - new StringDimensionSchema("dim1"), - new StringDimensionSchema("dim1t"), - new StringDimensionSchema("dim2"), - new LongDimensionSchema("dimLong"), - new FloatDimensionSchema("dimFloat") - ), - null, - null - ), - new AggregatorFactory[]{ - new DoubleSumAggregatorFactory("met1sum", "met1"), - new CountAggregatorFactory("rows") - }, - new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null), - null - ); + private static final DataSchema DATA_SCHEMA; static { NullHandling.initializeForTests(); + DATA_SCHEMA = new DataSchema( + "test_ds", + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim1t"), + new StringDimensionSchema("dim2"), + new LongDimensionSchema("dimLong"), + new FloatDimensionSchema("dimFloat") + ), + null, + null + ), + new AggregatorFactory[]{ + new DoubleSumAggregatorFactory("met1sum", "met1"), + new CountAggregatorFactory("rows") + }, + new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null), + null + ); } private final KinesisRecordSupplier recordSupplier = mock(KinesisRecordSupplier.class); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NonnullNumericAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/NonnullNumericAggregator.java new file mode 100644 index 000000000000..a071751be878 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NonnullNumericAggregator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnSelectorFactory; + +/** + * Null-aware numeric {@link Aggregator} whose input is not nullable, but which should be null valued if no + * values are aggregated at all. + * + * The result of this aggregator will be null only if no values are aggregated at all, otherwise the result will + * be the aggregated value of the delegate aggregator. This class is only used when SQL compatible null handling + * is enabled. + * + * @see NullableNumericAggregatorFactory#factorize(ColumnSelectorFactory) + * @see NonnullNumericBufferAggregator for the non-vectorized buffer version. + * @see NonnullNumericVectorAggregator the vectorized version. + */ +public final class NonnullNumericAggregator extends NullAwareNumericAggregator +{ + public NonnullNumericAggregator(Aggregator delegate, BaseNullableColumnValueSelector selector) + { + super(delegate, selector); + } + + @Override + public void aggregate() + { + doAggregate(); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NonnullNumericBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/NonnullNumericBufferAggregator.java new file mode 100644 index 000000000000..a964e213906b --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NonnullNumericBufferAggregator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnSelectorFactory; + +import java.nio.ByteBuffer; + +/** + * Null-aware numeric {@link BufferAggregator} whose input is not nullable, but which should be null valued if no + * values are aggregated at all. + * + * The result of this aggregator will only be null if no values are aggregated at all, otherwise the result will + * be the aggregated value of the delegate aggregator. This class is only used when SQL compatible null handling + * is enabled. + * + * @see NullableNumericAggregatorFactory#factorizeBuffered(ColumnSelectorFactory) + * @see NullableNumericAggregator for the non-vectorized heap version. + * @see NullableNumericVectorAggregator the vectorized version. + */ +public final class NonnullNumericBufferAggregator extends NullAwareNumericBufferAggregator +{ + public NonnullNumericBufferAggregator( + BufferAggregator delegate, + BaseNullableColumnValueSelector nullSelector + ) + { + super(delegate, nullSelector); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + doAggregate(buf, position); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NonnullNumericVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/NonnullNumericVectorAggregator.java new file mode 100644 index 000000000000..166cc1ede95a --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NonnullNumericVectorAggregator.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * A wrapper around a non-null-aware {@link VectorAggregator} whose input is not nullable, but which + * should be null valued if no values are aggregated at all. + * + * The result of this aggregator will be null only if no values are aggregated at all, otherwise the result + * would be the aggregated value of the delegate aggregator. This class is only used when SQL compatible null + * handling is enabled. + * + * @see NullableNumericAggregatorFactory#factorizeVector(VectorColumnSelectorFactory) + * @see NonnullNumericAggregator for the non-vectorized heap version. + * @see NonnullNumericBufferAggregator for the non-vectorized version. + */ +public final class NonnullNumericVectorAggregator extends NullAwareNumericVectorAggregator +{ + NonnullNumericVectorAggregator( + VectorAggregator delegate, + VectorValueSelector selector + ) + { + super(delegate, selector); + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + doAggregate(buf, position, startRow, endRow); + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + doAggregate(buf, numRows, positions, rows, positionOffset); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullAwareNumericAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullAwareNumericAggregator.java new file mode 100644 index 000000000000..5fdb387ed630 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullAwareNumericAggregator.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.guice.annotations.PublicApi; +import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnSelectorFactory; + +import javax.annotation.Nullable; + +/** + * Base class for null-aware numeric {@link Aggregator}s, which will be initialized to a null value. + * + * When wrapped by this class, the underlying aggregator's required storage space is increased by one byte. The extra + * byte is a boolean that stores whether or not any non-null values have been seen. The extra byte is placed before + * the underlying aggregator's normal state. (Buffer layout = [nullability byte] [delegate storage bytes]) + * + * Used by {@link NullableNumericAggregatorFactory#factorize(ColumnSelectorFactory)} to wrap non-null aware + * aggregators. This class is only used when SQL compatible null handling is enabled. + * + * @see NullableNumericAggregatorFactory#factorize(ColumnSelectorFactory) + * @see NullAwareNumericBufferAggregator for the non-vectorized buffer version. + * @see NullAwareNumericVectorAggregator the vectorized version. + */ +@PublicApi +public abstract class NullAwareNumericAggregator implements Aggregator +{ + protected final Aggregator delegate; + protected final BaseNullableColumnValueSelector selector; + protected boolean isNullResult = true; + + public NullAwareNumericAggregator(Aggregator delegate, BaseNullableColumnValueSelector selector) + { + this.delegate = delegate; + this.selector = selector; + } + + @Override + @Nullable + public final Object get() + { + if (isNullResult) { + return null; + } + return delegate.get(); + } + + @Override + public final float getFloat() + { + if (isNullResult) { + throw new IllegalStateException("Cannot return float for Null Value"); + } + return delegate.getFloat(); + } + + @Override + public final long getLong() + { + if (isNullResult) { + throw new IllegalStateException("Cannot return long for Null Value"); + } + return delegate.getLong(); + } + + @Override + public final double getDouble() + { + if (isNullResult) { + throw new IllegalStateException("Cannot return double for Null Value"); + } + return delegate.getDouble(); + } + + @Override + public final boolean isNull() + { + return isNullResult || delegate.isNull(); + } + + @Override + public final void close() + { + delegate.close(); + } + + protected final void doAggregate() + { + if (isNullResult) { + isNullResult = false; + } + delegate.aggregate(); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullAwareNumericBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullAwareNumericBufferAggregator.java new file mode 100644 index 000000000000..0ae36bd34384 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullAwareNumericBufferAggregator.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.guice.annotations.PublicApi; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnSelectorFactory; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * Base class for null-aware numeric {@link BufferAggregator}s, which will be initialized to a null value. + * + * When wrapped by this class, the underlying aggregator's required storage space is increased by one byte. The extra + * byte is a boolean that stores whether or not any non-null values have been seen. The extra byte is placed before + * the underlying aggregator's normal state. (Buffer layout = [nullability byte] [delegate storage bytes]) + * + * Used by {@link NullableNumericAggregatorFactory#factorizeBuffered(ColumnSelectorFactory)} to wrap non-null aware + * aggregators. This class is only used when SQL compatible null handling is enabled. + * + * @see NullableNumericAggregatorFactory#factorizeBuffered(ColumnSelectorFactory) + * @see NullAwareNumericAggregator for the non-vectorized heap version. + * @see NullAwareNumericVectorAggregator the vectorized version. + */ +@PublicApi +public abstract class NullAwareNumericBufferAggregator implements BufferAggregator +{ + protected final BufferAggregator delegate; + protected final BaseNullableColumnValueSelector nullSelector; + + public NullAwareNumericBufferAggregator(BufferAggregator delegate, BaseNullableColumnValueSelector nullSelector) + { + this.delegate = delegate; + this.nullSelector = nullSelector; + } + + @Override + public final void init(ByteBuffer buf, int position) + { + buf.put(position, NullHandling.IS_NULL_BYTE); + delegate.init(buf, position + Byte.BYTES); + } + + @Override + @Nullable + public final Object get(ByteBuffer buf, int position) + { + if (buf.get(position) == NullHandling.IS_NULL_BYTE) { + return null; + } + return delegate.get(buf, position + Byte.BYTES); + } + + @Override + public final float getFloat(ByteBuffer buf, int position) + { + if (buf.get(position) == NullHandling.IS_NULL_BYTE) { + throw new IllegalStateException("Cannot return float for Null Value"); + } + return delegate.getFloat(buf, position + Byte.BYTES); + } + + @Override + public final long getLong(ByteBuffer buf, int position) + { + if (buf.get(position) == NullHandling.IS_NULL_BYTE) { + throw new IllegalStateException("Cannot return long for Null Value"); + } + return delegate.getLong(buf, position + Byte.BYTES); + } + + @Override + public final double getDouble(ByteBuffer buf, int position) + { + if (buf.get(position) == NullHandling.IS_NULL_BYTE) { + throw new IllegalStateException("Cannot return double for Null Value"); + } + return delegate.getDouble(buf, position + Byte.BYTES); + } + + @Override + public final boolean isNull(ByteBuffer buf, int position) + { + return buf.get(position) == NullHandling.IS_NULL_BYTE || delegate.isNull(buf, position + Byte.BYTES); + } + + @Override + public final void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + delegate.relocate(oldPosition + Byte.BYTES, newPosition + Byte.BYTES, oldBuffer, newBuffer); + } + + @Override + public final void close() + { + delegate.close(); + } + + @Override + public final void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("delegate", delegate); + inspector.visit("nullSelector", nullSelector); + } + + protected final void doAggregate(ByteBuffer buf, int position) + { + if (buf.get(position) == NullHandling.IS_NULL_BYTE) { + buf.put(position, NullHandling.IS_NOT_NULL_BYTE); + } + delegate.aggregate(buf, position + Byte.BYTES); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullAwareNumericVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullAwareNumericVectorAggregator.java new file mode 100644 index 000000000000..0ba685ab986d --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullAwareNumericVectorAggregator.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * A wrapper around a non-null-aware VectorAggregator that makes it null-aware. This removes the need for each + * aggregator class to handle nulls on its own. This class only makes sense as a wrapper for "primitive" aggregators, + * i.e., ones that take {@link VectorValueSelector} as input. + * + * When wrapped by this class, the underlying aggregator's required storage space is increased by one byte. The extra + * byte is a boolean that stores whether or not any non-null values have been seen. The extra byte is placed before + * the underlying aggregator's normal state. (Buffer layout = [nullability byte] [delegate storage bytes]) + * + * @see NullableNumericAggregatorFactory#factorizeVector(VectorColumnSelectorFactory) + * @see NullAwareNumericAggregator for the non-vectorized heap version. + * @see NullAwareNumericBufferAggregator for the non-vectorized version. + */ +public abstract class NullAwareNumericVectorAggregator implements VectorAggregator +{ + protected final VectorAggregator delegate; + protected final VectorValueSelector selector; + + @Nullable + protected int[] vAggregationPositions = null; + + @Nullable + protected int[] vAggregationRows = null; + + NullAwareNumericVectorAggregator(VectorAggregator delegate, VectorValueSelector selector) + { + this.delegate = delegate; + this.selector = selector; + } + + @Override + public final void init(ByteBuffer buf, int position) + { + buf.put(position, NullHandling.IS_NULL_BYTE); + delegate.init(buf, position + Byte.BYTES); + } + + @Override + @Nullable + public final Object get(ByteBuffer buf, int position) + { + switch (buf.get(position)) { + case NullHandling.IS_NULL_BYTE: + return null; + case NullHandling.IS_NOT_NULL_BYTE: + return delegate.get(buf, position + Byte.BYTES); + default: + // Corrupted byte? + throw new ISE("Bad null-marker byte, delegate class[%s]", delegate.getClass().getName()); + } + } + + @Override + public final void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) + { + delegate.relocate(oldPosition + Byte.BYTES, newPosition + Byte.BYTES, oldBuffer, newBuffer); + } + + @Override + public final void close() + { + delegate.close(); + } + + protected final void doAggregate(ByteBuffer buf, int position, int start, int end) + { + buf.put(position, NullHandling.IS_NOT_NULL_BYTE); + delegate.aggregate(buf, position + Byte.BYTES, start, end); + } + + protected final void doAggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + for (int i = 0; i < numRows; i++) { + buf.put(positions[i] + positionOffset, NullHandling.IS_NOT_NULL_BYTE); + } + + delegate.aggregate(buf, numRows, positions, rows, positionOffset + Byte.BYTES); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregator.java index 33c0c2438fd7..85156841de5d 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregator.java @@ -23,35 +23,23 @@ import org.apache.druid.segment.BaseNullableColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; -import javax.annotation.Nullable; - /** - * Null-aware numeric {@link Aggregator}. + * Null-aware numeric {@link Aggregator} which can handle null valued inputs. * * The result of this aggregator will be null if all the values to be aggregated are null values or no values are * aggregated at all. If any of the values are non-null, the result will be the aggregated value of the delegate - * aggregator. - * - * When wrapped by this class, the underlying aggregator's required storage space is increased by one byte. The extra - * byte is a boolean that stores whether or not any non-null values have been seen. The extra byte is placed before - * the underlying aggregator's normal state. (Buffer layout = [nullability byte] [delegate storage bytes]) - * - * Used by {@link NullableNumericAggregatorFactory#factorize(ColumnSelectorFactory)} to wrap non-null aware - * aggregators. This class is only used when SQL compatible null handling is enabled. + * aggregator. This class is only used when SQL compatible null handling is enabled. * * @see NullableNumericAggregatorFactory#factorize(ColumnSelectorFactory) + * @see NullableNumericBufferAggregator for the non-vectorized buffer version. + * @see NullableNumericVectorAggregator the vectorized version. */ @PublicApi -public final class NullableNumericAggregator implements Aggregator +public final class NullableNumericAggregator extends NullAwareNumericAggregator { - private final Aggregator delegate; - private final BaseNullableColumnValueSelector selector; - private boolean isNullResult = true; - public NullableNumericAggregator(Aggregator delegate, BaseNullableColumnValueSelector selector) { - this.delegate = delegate; - this.selector = selector; + super(delegate, selector); } @Override @@ -59,59 +47,7 @@ public void aggregate() { boolean isNotNull = !selector.isNull(); if (isNotNull) { - if (isNullResult) { - isNullResult = false; - } - delegate.aggregate(); - } - } - - @Override - @Nullable - public Object get() - { - if (isNullResult) { - return null; - } - return delegate.get(); - } - - @Override - public float getFloat() - { - if (isNullResult) { - throw new IllegalStateException("Cannot return float for Null Value"); - } - return delegate.getFloat(); - } - - @Override - public long getLong() - { - if (isNullResult) { - throw new IllegalStateException("Cannot return long for Null Value"); - } - return delegate.getLong(); - } - - @Override - public double getDouble() - { - if (isNullResult) { - throw new IllegalStateException("Cannot return double for Null Value"); + doAggregate(); } - return delegate.getDouble(); - } - - @Override - public boolean isNull() - { - return isNullResult || delegate.isNull(); - } - - @Override - public void close() - { - delegate.close(); } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregatorFactory.java index 551717d2e257..3524756fa7f0 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregatorFactory.java @@ -24,6 +24,7 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.guice.annotations.ExtensionPoint; import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.column.ValueType; @@ -47,12 +48,19 @@ public abstract class NullableNumericAggregatorFactory extends AggregatorFactory { + private final boolean sqlCompatible = NullHandling.sqlCompatible(); + @Override public final Aggregator factorize(ColumnSelectorFactory columnSelectorFactory) { T selector = selector(columnSelectorFactory); Aggregator aggregator = factorize(columnSelectorFactory, selector); - return NullHandling.replaceWithDefault() ? aggregator : new NullableNumericAggregator(aggregator, selector); + if (!sqlCompatible) { + return aggregator; + } + return hasNulls(columnSelectorFactory) + ? new NullableNumericAggregator(aggregator, selector) + : new NonnullNumericAggregator(aggregator, selector); } @Override @@ -60,7 +68,12 @@ public final BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSele { T selector = selector(columnSelectorFactory); BufferAggregator aggregator = factorizeBuffered(columnSelectorFactory, selector); - return NullHandling.replaceWithDefault() ? aggregator : new NullableNumericBufferAggregator(aggregator, selector); + if (!sqlCompatible) { + return aggregator; + } + return hasNulls(columnSelectorFactory) + ? new NullableNumericBufferAggregator(aggregator, selector) + : new NonnullNumericBufferAggregator(aggregator, selector); } @Override @@ -69,23 +82,41 @@ public final VectorAggregator factorizeVector(VectorColumnSelectorFactory column Preconditions.checkState(canVectorize(columnSelectorFactory), "Cannot vectorize"); VectorValueSelector selector = vectorSelector(columnSelectorFactory); VectorAggregator aggregator = factorizeVector(columnSelectorFactory, selector); - return NullHandling.replaceWithDefault() ? aggregator : new NullableNumericVectorAggregator(aggregator, selector); + if (!sqlCompatible) { + return aggregator; + } + return hasNulls(columnSelectorFactory) + ? new NullableNumericVectorAggregator(aggregator, selector) + : new NonnullNumericVectorAggregator(aggregator, selector); } @Override public final AggregateCombiner makeNullableAggregateCombiner() { AggregateCombiner combiner = makeAggregateCombiner(); - return NullHandling.replaceWithDefault() ? combiner : new NullableNumericAggregateCombiner(combiner); + return sqlCompatible ? new NullableNumericAggregateCombiner(combiner) : combiner; } @Override public final int getMaxIntermediateSizeWithNulls() { - return getMaxIntermediateSize() + (NullHandling.replaceWithDefault() ? 0 : Byte.BYTES); + return getMaxIntermediateSize() + (sqlCompatible ? Byte.BYTES : 0); + } + + @Override + public ValueType getFinalizedType() + { + return getType(); } - // ---- ABSTRACT METHODS BELOW ------ + /** + * Returns true if the aggregator will actually produce null values given its input selectors, e.g. if + * the inputs to the aggregator have any nulls. + */ + protected boolean hasNulls(ColumnInspector inspector) + { + return sqlCompatible; + } /** * Creates a {@link ColumnValueSelector} for the aggregated column. @@ -122,10 +153,7 @@ protected VectorValueSelector vectorSelector(VectorColumnSelectorFactory columnS * * @see BufferAggregator */ - protected abstract BufferAggregator factorizeBuffered( - ColumnSelectorFactory columnSelectorFactory, - T selector - ); + protected abstract BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory, T selector); /** * Creates a {@link VectorAggregator} to aggregate values from several rows into a ByteBuffer. @@ -137,8 +165,7 @@ protected abstract BufferAggregator factorizeBuffered( */ protected VectorAggregator factorizeVector( VectorColumnSelectorFactory columnSelectorFactory, - VectorValueSelector selector - ) + VectorValueSelector selector) { if (!canVectorize(columnSelectorFactory)) { throw new UnsupportedOperationException("Cannot vectorize"); @@ -146,10 +173,4 @@ protected VectorAggregator factorizeVector( throw new UnsupportedOperationException("canVectorize returned true but 'factorizeVector' is not implemented"); } } - - @Override - public ValueType getFinalizedType() - { - return getType(); - } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericBufferAggregator.java index 4a8ce6f792a4..d840ab805674 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericBufferAggregator.java @@ -19,49 +19,29 @@ package org.apache.druid.query.aggregation; -import org.apache.druid.common.config.NullHandling; import org.apache.druid.guice.annotations.PublicApi; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseNullableColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; -import javax.annotation.Nullable; import java.nio.ByteBuffer; /** - * Null-aware numeric {@link BufferAggregator}. + * Null-aware numeric {@link BufferAggregator} which can handle null valued inputs. * * The result of this aggregator will be null if all the values to be aggregated are null values or no values are * aggregated at all. If any of the values are non-null, the result will be the aggregated value of the delegate - * aggregator. - * - * When wrapped by this class, the underlying aggregator's required storage space is increased by one byte. The extra - * byte is a boolean that stores whether or not any non-null values have been seen. The extra byte is placed before - * the underlying aggregator's normal state. (Buffer layout = [nullability byte] [delegate storage bytes]) - * - * Used by {@link NullableNumericAggregatorFactory#factorizeBuffered(ColumnSelectorFactory)} to wrap non-null aware - * aggregators. This class is only used when SQL compatible null handling is enabled. + * aggregator. This class is only used when SQL compatible null handling is enabled. * * @see NullableNumericAggregatorFactory#factorizeBuffered(ColumnSelectorFactory) + * @see NullableNumericAggregator for the non-vectorized heap version. * @see NullableNumericVectorAggregator the vectorized version. */ @PublicApi -public final class NullableNumericBufferAggregator implements BufferAggregator +public final class NullableNumericBufferAggregator extends NullAwareNumericBufferAggregator { - private final BufferAggregator delegate; - private final BaseNullableColumnValueSelector nullSelector; - public NullableNumericBufferAggregator(BufferAggregator delegate, BaseNullableColumnValueSelector nullSelector) { - this.delegate = delegate; - this.nullSelector = nullSelector; - } - - @Override - public void init(ByteBuffer buf, int position) - { - buf.put(position, NullHandling.IS_NULL_BYTE); - delegate.init(buf, position + Byte.BYTES); + super(delegate, nullSelector); } @Override @@ -69,72 +49,7 @@ public void aggregate(ByteBuffer buf, int position) { boolean isNotNull = !nullSelector.isNull(); if (isNotNull) { - if (buf.get(position) == NullHandling.IS_NULL_BYTE) { - buf.put(position, NullHandling.IS_NOT_NULL_BYTE); - } - delegate.aggregate(buf, position + Byte.BYTES); - } - } - - @Override - @Nullable - public Object get(ByteBuffer buf, int position) - { - if (buf.get(position) == NullHandling.IS_NULL_BYTE) { - return null; - } - return delegate.get(buf, position + Byte.BYTES); - } - - @Override - public float getFloat(ByteBuffer buf, int position) - { - if (buf.get(position) == NullHandling.IS_NULL_BYTE) { - throw new IllegalStateException("Cannot return float for Null Value"); - } - return delegate.getFloat(buf, position + Byte.BYTES); - } - - @Override - public long getLong(ByteBuffer buf, int position) - { - if (buf.get(position) == NullHandling.IS_NULL_BYTE) { - throw new IllegalStateException("Cannot return long for Null Value"); + doAggregate(buf, position); } - return delegate.getLong(buf, position + Byte.BYTES); - } - - @Override - public double getDouble(ByteBuffer buf, int position) - { - if (buf.get(position) == NullHandling.IS_NULL_BYTE) { - throw new IllegalStateException("Cannot return double for Null Value"); - } - return delegate.getDouble(buf, position + Byte.BYTES); - } - - @Override - public boolean isNull(ByteBuffer buf, int position) - { - return buf.get(position) == NullHandling.IS_NULL_BYTE || delegate.isNull(buf, position + Byte.BYTES); - } - - @Override - public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) - { - delegate.relocate(oldPosition + Byte.BYTES, newPosition + Byte.BYTES, oldBuffer, newBuffer); - } - - @Override - public void close() - { - delegate.close(); - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("delegate", delegate); - inspector.visit("nullSelector", nullSelector); } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericVectorAggregator.java index cdc4499f013e..7bbde760b17b 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericVectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericVectorAggregator.java @@ -19,8 +19,7 @@ package org.apache.druid.query.aggregation; -import org.apache.druid.common.config.NullHandling; -import org.apache.druid.java.util.common.ISE; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorValueSelector; import javax.annotation.Nullable; @@ -28,48 +27,25 @@ import java.util.Arrays; /** - * A wrapper around a non-null-aware VectorAggregator that makes it null-aware. This removes the need for each - * aggregator class to handle nulls on its own. This class only makes sense as a wrapper for "primitive" aggregators, + * A wrapper around a non-null-aware {@link VectorAggregator} which removes the need for each aggregator class to + * handle null inputs on its own. This class only makes sense as a wrapper for "primitive" aggregators, * i.e., ones that take {@link VectorValueSelector} as input. * - * The result of this aggregator will be null if all the values to be aggregated are null values or no values are - * aggregated at all. If any of the values are non-null, the result will be the aggregated value of the delegate - * aggregator. - * - * When wrapped by this class, the underlying aggregator's required storage space is increased by one byte. The extra - * byte is a boolean that stores whether or not any non-null values have been seen. The extra byte is placed before - * the underlying aggregator's normal state. (Buffer layout = [nullability byte] [delegate storage bytes]) - * * The result of a NullableAggregator will be null if all the values to be aggregated are null values * or no values are aggregated at all. If any of the value is non-null, the result would be the aggregated * value of the delegate aggregator. Note that the delegate aggregator is not required to perform check for * {@link VectorValueSelector#getNullVector()} on the selector as only non-null values will be passed * to the delegate aggregator. This class is only used when SQL compatible null handling is enabled. * - * @see NullableNumericBufferAggregator , the vectorized version. + * @see NullableNumericAggregatorFactory#factorizeVector(VectorColumnSelectorFactory) + * @see NullableNumericAggregator for the non-vectorized heap version. + * @see NullableNumericBufferAggregator for the non-vectorized version. */ -public class NullableNumericVectorAggregator implements VectorAggregator +public final class NullableNumericVectorAggregator extends NullAwareNumericVectorAggregator { - private final VectorAggregator delegate; - private final VectorValueSelector selector; - - @Nullable - private int[] vAggregationPositions = null; - - @Nullable - private int[] vAggregationRows = null; - NullableNumericVectorAggregator(VectorAggregator delegate, VectorValueSelector selector) { - this.delegate = delegate; - this.selector = selector; - } - - @Override - public void init(ByteBuffer buf, int position) - { - buf.put(position, NullHandling.IS_NULL_BYTE); - delegate.init(buf, position + Byte.BYTES); + super(delegate, selector); } @Override @@ -126,46 +102,4 @@ public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable in doAggregate(buf, numRows, positions, rows, positionOffset); } } - - @Override - @Nullable - public Object get(ByteBuffer buf, int position) - { - switch (buf.get(position)) { - case NullHandling.IS_NULL_BYTE: - return null; - case NullHandling.IS_NOT_NULL_BYTE: - return delegate.get(buf, position + Byte.BYTES); - default: - // Corrupted byte? - throw new ISE("Bad null-marker byte, delegate class[%s]", delegate.getClass().getName()); - } - } - - @Override - public void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer) - { - delegate.relocate(oldPosition + Byte.BYTES, newPosition + Byte.BYTES, oldBuffer, newBuffer); - } - - @Override - public void close() - { - delegate.close(); - } - - private void doAggregate(ByteBuffer buf, int position, int start, int end) - { - buf.put(position, NullHandling.IS_NOT_NULL_BYTE); - delegate.aggregate(buf, position + Byte.BYTES, start, end); - } - - private void doAggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) - { - for (int i = 0; i < numRows; i++) { - buf.put(positions[i] + positionOffset, NullHandling.IS_NOT_NULL_BYTE); - } - - delegate.aggregate(buf, numRows, positions, rows, positionOffset + Byte.BYTES); - } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java index c540018dc6c4..c32ebd69742b 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java @@ -20,27 +20,16 @@ package org.apache.druid.query.aggregation; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import org.apache.druid.math.expr.Expr; import org.apache.druid.math.expr.ExprMacroTable; -import org.apache.druid.math.expr.Parser; import org.apache.druid.segment.BaseDoubleColumnValueSelector; -import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; -import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ValueType; -import org.apache.druid.segment.vector.VectorColumnSelectorFactory; -import org.apache.druid.segment.vector.VectorValueSelector; import javax.annotation.Nullable; -import java.util.Collections; import java.util.Comparator; -import java.util.List; -import java.util.Objects; /** * This is an abstract class inherited by various {@link AggregatorFactory} implementations that consume double input @@ -48,16 +37,9 @@ * It extends "NullableAggregatorFactory" instead of "NullableAggregatorFactory" * to additionally support aggregation on single/multi value string column types. */ -public abstract class SimpleDoubleAggregatorFactory extends NullableNumericAggregatorFactory +public abstract class SimpleDoubleAggregatorFactory extends SimpleNumericAggregatorFactory { - protected final String name; - @Nullable - protected final String fieldName; - @Nullable - protected final String expression; - protected final ExprMacroTable macroTable; protected final boolean storeDoubleAsFloat; - protected final Supplier fieldExpression; public SimpleDoubleAggregatorFactory( ExprMacroTable macroTable, @@ -66,49 +48,11 @@ public SimpleDoubleAggregatorFactory( @Nullable String expression ) { - this.macroTable = macroTable; - this.name = name; - this.fieldName = fieldName; - this.expression = expression; + super(macroTable, name, fieldName, expression); this.storeDoubleAsFloat = ColumnHolder.storeDoubleAsFloat(); - this.fieldExpression = Parser.lazyParse(expression, macroTable); - Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); - Preconditions.checkArgument( - fieldName == null ^ expression == null, - "Must have a valid, non-null fieldName or expression" - ); } - @Override - protected Aggregator factorize(ColumnSelectorFactory metricFactory, ColumnValueSelector selector) - { - if (shouldUseStringColumnAggregatorWrapper(metricFactory)) { - return new StringColumnDoubleAggregatorWrapper( - selector, - SimpleDoubleAggregatorFactory.this::buildAggregator, - nullValue() - ); - } else { - return buildAggregator(selector); - } - } - - @Override - protected BufferAggregator factorizeBuffered( - ColumnSelectorFactory metricFactory, - ColumnValueSelector selector - ) - { - if (shouldUseStringColumnAggregatorWrapper(metricFactory)) { - return new StringColumnDoubleBufferAggregatorWrapper( - selector, - SimpleDoubleAggregatorFactory.this::buildBufferAggregator, - nullValue() - ); - } else { - return buildBufferAggregator(selector); - } - } + protected abstract double nullValue(); @Override protected ColumnValueSelector selector(ColumnSelectorFactory metricFactory) @@ -122,18 +66,23 @@ protected ColumnValueSelector selector(ColumnSelectorFactory metricFactory) } @Override - protected VectorValueSelector vectorSelector(VectorColumnSelectorFactory columnSelectorFactory) + protected Aggregator buildStringColumnAggregatorWrapper(BaseObjectColumnValueSelector selector) { - return AggregatorUtil.makeVectorValueSelector(columnSelectorFactory, fieldName, expression, fieldExpression); + return new StringColumnDoubleAggregatorWrapper( + selector, + SimpleDoubleAggregatorFactory.this::buildAggregator, + nullValue() + ); } - private boolean shouldUseStringColumnAggregatorWrapper(ColumnSelectorFactory columnSelectorFactory) + @Override + protected BufferAggregator buildStringColumnBufferAggregatorWrapper(BaseObjectColumnValueSelector selector) { - if (fieldName != null) { - ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); - return capabilities != null && capabilities.getType() == ValueType.STRING; - } - return false; + return new StringColumnDoubleBufferAggregatorWrapper( + selector, + SimpleDoubleAggregatorFactory.this::buildBufferAggregator, + nullValue() + ); } @Override @@ -166,92 +115,4 @@ public Comparator getComparator() { return DoubleSumAggregator.COMPARATOR; } - - @Override - @Nullable - public Object finalizeComputation(@Nullable Object object) - { - return object; - } - - @Override - public List requiredFields() - { - return fieldName != null - ? Collections.singletonList(fieldName) - : fieldExpression.get().analyzeInputs().getRequiredBindingsList(); - } - - @Override - public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException - { - if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { - return getCombiningFactory(); - } else { - throw new AggregatorFactoryNotMergeableException(this, other); - } - } - - @Override - public int hashCode() - { - return Objects.hash(fieldName, expression, name); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - SimpleDoubleAggregatorFactory that = (SimpleDoubleAggregatorFactory) o; - - if (!Objects.equals(fieldName, that.fieldName)) { - return false; - } - if (!Objects.equals(expression, that.expression)) { - return false; - } - if (!Objects.equals(name, that.name)) { - return false; - } - return true; - } - - @Override - @JsonProperty - public String getName() - { - return name; - } - - @Nullable - @JsonProperty - public String getFieldName() - { - return fieldName; - } - - @Nullable - @JsonProperty - public String getExpression() - { - return expression; - } - - @Override - public boolean canVectorize(ColumnInspector columnInspector) - { - return AggregatorUtil.canVectorize(columnInspector, fieldName, expression, fieldExpression); - } - - protected abstract double nullValue(); - - protected abstract Aggregator buildAggregator(BaseDoubleColumnValueSelector selector); - - protected abstract BufferAggregator buildBufferAggregator(BaseDoubleColumnValueSelector selector); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java index 03b9f923da6c..8b7243fd9df2 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java @@ -20,37 +20,18 @@ package org.apache.druid.query.aggregation; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import org.apache.druid.math.expr.Expr; import org.apache.druid.math.expr.ExprMacroTable; -import org.apache.druid.math.expr.Parser; import org.apache.druid.segment.BaseFloatColumnValueSelector; -import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; -import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ValueType; -import org.apache.druid.segment.vector.VectorColumnSelectorFactory; -import org.apache.druid.segment.vector.VectorValueSelector; import javax.annotation.Nullable; -import java.util.Collections; import java.util.Comparator; -import java.util.List; -import java.util.Objects; -public abstract class SimpleFloatAggregatorFactory extends NullableNumericAggregatorFactory +public abstract class SimpleFloatAggregatorFactory extends SimpleNumericAggregatorFactory { - protected final String name; - @Nullable - protected final String fieldName; - @Nullable - protected final String expression; - protected final ExprMacroTable macroTable; - protected final Supplier fieldExpression; - public SimpleFloatAggregatorFactory( ExprMacroTable macroTable, String name, @@ -58,48 +39,10 @@ public SimpleFloatAggregatorFactory( @Nullable String expression ) { - this.macroTable = macroTable; - this.name = name; - this.fieldName = fieldName; - this.expression = expression; - this.fieldExpression = Parser.lazyParse(expression, macroTable); - Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); - Preconditions.checkArgument( - fieldName == null ^ expression == null, - "Must have a valid, non-null fieldName or expression" - ); + super(macroTable, name, fieldName, expression); } - @Override - protected Aggregator factorize(ColumnSelectorFactory metricFactory, ColumnValueSelector selector) - { - if (shouldUseStringColumnAggregatorWrapper(metricFactory)) { - return new StringColumnFloatAggregatorWrapper( - selector, - SimpleFloatAggregatorFactory.this::buildAggregator, - nullValue() - ); - } else { - return buildAggregator(selector); - } - } - - @Override - protected BufferAggregator factorizeBuffered( - ColumnSelectorFactory metricFactory, - ColumnValueSelector selector - ) - { - if (shouldUseStringColumnAggregatorWrapper(metricFactory)) { - return new StringColumnFloatBufferAggregatorWrapper( - selector, - SimpleFloatAggregatorFactory.this::buildBufferAggregator, - nullValue() - ); - } else { - return buildBufferAggregator(selector); - } - } + protected abstract float nullValue(); @Override protected ColumnValueSelector selector(ColumnSelectorFactory metricFactory) @@ -113,9 +56,23 @@ protected ColumnValueSelector selector(ColumnSelectorFactory metricFactory) } @Override - protected VectorValueSelector vectorSelector(VectorColumnSelectorFactory columnSelectorFactory) + protected Aggregator buildStringColumnAggregatorWrapper(BaseObjectColumnValueSelector selector) + { + return new StringColumnFloatAggregatorWrapper( + selector, + SimpleFloatAggregatorFactory.this::buildAggregator, + nullValue() + ); + } + + @Override + protected BufferAggregator buildStringColumnBufferAggregatorWrapper(BaseObjectColumnValueSelector selector) { - return AggregatorUtil.makeVectorValueSelector(columnSelectorFactory, fieldName, expression, fieldExpression); + return new StringColumnFloatBufferAggregatorWrapper( + selector, + SimpleFloatAggregatorFactory.this::buildBufferAggregator, + nullValue() + ); } @Override @@ -145,101 +102,4 @@ public Comparator getComparator() { return FloatSumAggregator.COMPARATOR; } - - @Override - @Nullable - public Object finalizeComputation(@Nullable Object object) - { - return object; - } - - @Override - public List requiredFields() - { - return fieldName != null - ? Collections.singletonList(fieldName) - : fieldExpression.get().analyzeInputs().getRequiredBindingsList(); - } - - @Override - public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException - { - if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { - return getCombiningFactory(); - } else { - throw new AggregatorFactoryNotMergeableException(this, other); - } - } - - @Override - public int hashCode() - { - return Objects.hash(fieldName, expression, name); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - SimpleFloatAggregatorFactory that = (SimpleFloatAggregatorFactory) o; - - if (!Objects.equals(fieldName, that.fieldName)) { - return false; - } - if (!Objects.equals(expression, that.expression)) { - return false; - } - if (!Objects.equals(name, that.name)) { - return false; - } - return true; - } - - @Override - @JsonProperty - public String getName() - { - return name; - } - - @Nullable - @JsonProperty - public String getFieldName() - { - return fieldName; - } - - @Nullable - @JsonProperty - public String getExpression() - { - return expression; - } - - @Override - public boolean canVectorize(ColumnInspector columnInspector) - { - return AggregatorUtil.canVectorize(columnInspector, fieldName, expression, fieldExpression); - } - - private boolean shouldUseStringColumnAggregatorWrapper(ColumnSelectorFactory columnSelectorFactory) - { - if (fieldName != null) { - ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); - return capabilities != null && capabilities.getType() == ValueType.STRING; - } - return false; - } - - protected abstract float nullValue(); - - protected abstract Aggregator buildAggregator(BaseFloatColumnValueSelector selector); - - protected abstract BufferAggregator buildBufferAggregator(BaseFloatColumnValueSelector selector); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java index bf297e1d4e93..958e58aabe22 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java @@ -20,26 +20,15 @@ package org.apache.druid.query.aggregation; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import org.apache.druid.math.expr.Expr; import org.apache.druid.math.expr.ExprMacroTable; -import org.apache.druid.math.expr.Parser; import org.apache.druid.segment.BaseLongColumnValueSelector; -import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; -import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ValueType; -import org.apache.druid.segment.vector.VectorColumnSelectorFactory; -import org.apache.druid.segment.vector.VectorValueSelector; import javax.annotation.Nullable; -import java.util.Collections; import java.util.Comparator; -import java.util.List; -import java.util.Objects; /** * This is an abstract class inherited by various {@link AggregatorFactory} implementations that consume long input @@ -47,16 +36,8 @@ * It extends "NullableAggregatorFactory" instead of "NullableAggregatorFactory" * to additionally support aggregation on single/multi value string column types. */ -public abstract class SimpleLongAggregatorFactory extends NullableNumericAggregatorFactory +public abstract class SimpleLongAggregatorFactory extends SimpleNumericAggregatorFactory { - protected final String name; - @Nullable - protected final String fieldName; - @Nullable - protected final String expression; - protected final ExprMacroTable macroTable; - protected final Supplier fieldExpression; - public SimpleLongAggregatorFactory( ExprMacroTable macroTable, String name, @@ -64,47 +45,7 @@ public SimpleLongAggregatorFactory( @Nullable String expression ) { - this.macroTable = macroTable; - this.name = name; - this.fieldName = fieldName; - this.expression = expression; - this.fieldExpression = Parser.lazyParse(expression, macroTable); - Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); - Preconditions.checkArgument( - fieldName == null ^ expression == null, - "Must have a valid, non-null fieldName or expression" - ); - } - - @Override - protected Aggregator factorize(ColumnSelectorFactory metricFactory, ColumnValueSelector selector) - { - if (shouldUseStringColumnAggregatorWrapper(metricFactory)) { - return new StringColumnLongAggregatorWrapper( - selector, - SimpleLongAggregatorFactory.this::buildAggregator, - nullValue() - ); - } else { - return buildAggregator(selector); - } - } - - @Override - protected BufferAggregator factorizeBuffered( - ColumnSelectorFactory metricFactory, - ColumnValueSelector selector - ) - { - if (shouldUseStringColumnAggregatorWrapper(metricFactory)) { - return new StringColumnLongBufferAggregatorWrapper( - selector, - SimpleLongAggregatorFactory.this::buildBufferAggregator, - nullValue() - ); - } else { - return buildBufferAggregator(selector); - } + super(macroTable, name, fieldName, expression); } @Override @@ -119,18 +60,25 @@ protected ColumnValueSelector selector(ColumnSelectorFactory metricFactory) } @Override - protected VectorValueSelector vectorSelector(VectorColumnSelectorFactory columnSelectorFactory) + protected Aggregator buildStringColumnAggregatorWrapper(BaseObjectColumnValueSelector selector) { - return AggregatorUtil.makeVectorValueSelector(columnSelectorFactory, fieldName, expression, fieldExpression); + return new StringColumnLongAggregatorWrapper( + selector, + SimpleLongAggregatorFactory.this::buildAggregator, + nullValue() + ); } @Override - public Object deserialize(Object object) + protected BufferAggregator buildStringColumnBufferAggregatorWrapper(BaseObjectColumnValueSelector selector) { - return object; + return new StringColumnLongBufferAggregatorWrapper( + selector, + SimpleLongAggregatorFactory.this::buildBufferAggregator, + nullValue() + ); } - @Override public ValueType getType() { @@ -149,100 +97,5 @@ public Comparator getComparator() return LongSumAggregator.COMPARATOR; } - @Override - @Nullable - public Object finalizeComputation(@Nullable Object object) - { - return object; - } - - @Override - public List requiredFields() - { - return fieldName != null - ? Collections.singletonList(fieldName) - : fieldExpression.get().analyzeInputs().getRequiredBindingsList(); - } - - @Override - public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException - { - if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { - return getCombiningFactory(); - } else { - throw new AggregatorFactoryNotMergeableException(this, other); - } - } - - @Override - public int hashCode() - { - return Objects.hash(fieldName, expression, name); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - SimpleLongAggregatorFactory that = (SimpleLongAggregatorFactory) o; - - if (!Objects.equals(fieldName, that.fieldName)) { - return false; - } - if (!Objects.equals(expression, that.expression)) { - return false; - } - if (!Objects.equals(name, that.name)) { - return false; - } - return true; - } - - @Override - @JsonProperty - public String getName() - { - return name; - } - - @Nullable - @JsonProperty - public String getFieldName() - { - return fieldName; - } - - @Nullable - @JsonProperty - public String getExpression() - { - return expression; - } - - @Override - public boolean canVectorize(ColumnInspector columnInspector) - { - return AggregatorUtil.canVectorize(columnInspector, fieldName, expression, fieldExpression); - } - - private boolean shouldUseStringColumnAggregatorWrapper(ColumnSelectorFactory columnSelectorFactory) - { - if (fieldName != null) { - ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); - return capabilities != null && capabilities.getType() == ValueType.STRING; - } - return false; - } - protected abstract long nullValue(); - - protected abstract Aggregator buildAggregator(BaseLongColumnValueSelector selector); - - protected abstract BufferAggregator buildBufferAggregator(BaseLongColumnValueSelector selector); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleNumericAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleNumericAggregatorFactory.java new file mode 100644 index 000000000000..25146a73fcdb --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleNumericAggregatorFactory.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.math.expr.Expr; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.math.expr.Parser; +import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Base {@link AggregatorFactory} for all {@link NullableNumericAggregatorFactory} implementations which produce a + * primitive numeric output and are based on either a single column or single expression (which might have inputs + * from multiple column) + */ +public abstract class SimpleNumericAggregatorFactory + extends NullableNumericAggregatorFactory +{ + protected final String name; + @Nullable + protected final String fieldName; + @Nullable + protected final String expression; + protected final ExprMacroTable macroTable; + protected final Supplier fieldExpression; + + public SimpleNumericAggregatorFactory( + ExprMacroTable macroTable, + String name, + @Nullable final String fieldName, + @Nullable String expression + ) + { + this.macroTable = macroTable; + this.name = name; + this.fieldName = fieldName; + this.expression = expression; + this.fieldExpression = Parser.lazyParse(expression, macroTable); + Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); + Preconditions.checkArgument( + fieldName == null ^ expression == null, + "Must have a valid, non-null fieldName or expression" + ); + } + + protected abstract Aggregator buildAggregator(TValueSelector selector); + + protected abstract Aggregator buildStringColumnAggregatorWrapper(BaseObjectColumnValueSelector selector); + + protected abstract BufferAggregator buildBufferAggregator(TValueSelector selector); + + protected abstract BufferAggregator buildStringColumnBufferAggregatorWrapper(BaseObjectColumnValueSelector selector); + + @Override + protected Aggregator factorize(ColumnSelectorFactory metricFactory, ColumnValueSelector selector) + { + if (shouldUseStringColumnAggregatorWrapper(metricFactory)) { + return buildStringColumnAggregatorWrapper(selector); + } else { + return buildAggregator((TValueSelector) selector); + } + } + + @Override + protected BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory, ColumnValueSelector selector) + { + if (shouldUseStringColumnAggregatorWrapper(metricFactory)) { + return buildStringColumnBufferAggregatorWrapper(selector); + } else { + return buildBufferAggregator((TValueSelector) selector); + } + } + + @Override + protected VectorValueSelector vectorSelector(VectorColumnSelectorFactory columnSelectorFactory) + { + return AggregatorUtil.makeVectorValueSelector(columnSelectorFactory, fieldName, expression, fieldExpression); + } + + @Override + public Object deserialize(Object object) + { + return object; + } + + @Override + @Nullable + public Object finalizeComputation(@Nullable Object object) + { + return object; + } + + @Override + public List requiredFields() + { + return fieldName != null + ? Collections.singletonList(fieldName) + : fieldExpression.get().analyzeInputs().getRequiredBindingsList(); + } + + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + return AggregatorUtil.canVectorize(columnInspector, fieldName, expression, fieldExpression); + } + + protected boolean shouldUseStringColumnAggregatorWrapper(ColumnSelectorFactory columnSelectorFactory) + { + if (fieldName != null) { + ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName); + return capabilities != null && capabilities.getType() == ValueType.STRING; + } + return false; + } + + @Override + protected boolean hasNulls(ColumnInspector inspector) + { + if (fieldName != null) { + ColumnCapabilities capabilities = inspector.getColumnCapabilities(fieldName); + if (capabilities != null) { + return NullHandling.sqlCompatible() && capabilities.hasNulls().isMaybeTrue(); + } + } + // expressions are a bit more complicated, even if none of their inputs are null, the expression + // might still produce a null, so we will need more elaborate handling of this in the future. + // missing capabilities also falls through to here, and either means 'unknown', which might have nulls, + // or definitely non-existent, in which case it is all nulls (or default values, depending on the mode) + return NullHandling.sqlCompatible(); + } + + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { + return getCombiningFactory(); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @Nullable + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @Nullable + @JsonProperty + public String getExpression() + { + return expression; + } + + @Override + public int hashCode() + { + return Objects.hash(fieldName, expression, name); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SimpleNumericAggregatorFactory that = (SimpleNumericAggregatorFactory) o; + + if (!Objects.equals(fieldName, that.fieldName)) { + return false; + } + if (!Objects.equals(expression, that.expression)) { + return false; + } + if (!Objects.equals(name, that.name)) { + return false; + } + return true; + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMaxAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMaxAggregationTest.java index 3945c4feb20b..c46f8b173193 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMaxAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMaxAggregationTest.java @@ -63,7 +63,7 @@ public void setup() selector = new TestDoubleColumnSelectorImpl(values); colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(selector); - EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null).anyTimes(); EasyMock.replay(colSelectorFactory); VectorValueSelector vectorValueSelector = EasyMock.createMock(VectorValueSelector.class); @@ -73,7 +73,7 @@ public void setup() vectorColumnSelectorFactory = EasyMock.createMock(VectorColumnSelectorFactory.class); EasyMock.expect(vectorColumnSelectorFactory.getColumnCapabilities("dblFld")) - .andReturn(new ColumnCapabilitiesImpl().setType(ValueType.DOUBLE).setDictionaryEncoded(true)).anyTimes(); + .andReturn(ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.DOUBLE)).anyTimes(); EasyMock.expect(vectorColumnSelectorFactory.makeValueSelector("dblFld")).andReturn(vectorValueSelector).anyTimes(); EasyMock.replay(vectorColumnSelectorFactory); } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMinAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMinAggregationTest.java index 14702d1bf1f5..bbcd94aa2d81 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMinAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/DoubleMinAggregationTest.java @@ -63,7 +63,7 @@ public void setup() selector = new TestDoubleColumnSelectorImpl(values); colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(selector); - EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null).anyTimes(); EasyMock.replay(colSelectorFactory); @@ -74,7 +74,7 @@ public void setup() vectorColumnSelectorFactory = EasyMock.createMock(VectorColumnSelectorFactory.class); EasyMock.expect(vectorColumnSelectorFactory.getColumnCapabilities("dblFld")) - .andReturn(new ColumnCapabilitiesImpl().setType(ValueType.DOUBLE).setDictionaryEncoded(true)).anyTimes(); + .andReturn(ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.DOUBLE)).anyTimes(); EasyMock.expect(vectorColumnSelectorFactory.makeValueSelector("dblFld")).andReturn(vectorValueSelector).anyTimes(); EasyMock.replay(vectorColumnSelectorFactory); } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/FloatMaxAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/FloatMaxAggregationTest.java index dc1dc15c20a6..f90c3b890b73 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/FloatMaxAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/FloatMaxAggregationTest.java @@ -59,7 +59,7 @@ public void setup() vectorColumnSelectorFactory = EasyMock.createMock(VectorColumnSelectorFactory.class); EasyMock.expect(vectorColumnSelectorFactory.getColumnCapabilities("fltFld")) - .andReturn(new ColumnCapabilitiesImpl().setType(ValueType.FLOAT).setDictionaryEncoded(true)).anyTimes(); + .andReturn(ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.FLOAT)).anyTimes(); EasyMock.expect(vectorColumnSelectorFactory.makeValueSelector("fltFld")).andReturn(vectorValueSelector).anyTimes(); EasyMock.replay(vectorColumnSelectorFactory); } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/FloatMinAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/FloatMinAggregationTest.java index a451d1c4cc59..f00444d97c96 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/FloatMinAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/FloatMinAggregationTest.java @@ -59,7 +59,7 @@ public void setup() vectorColumnSelectorFactory = EasyMock.createMock(VectorColumnSelectorFactory.class); EasyMock.expect(vectorColumnSelectorFactory.getColumnCapabilities("fltFld")) - .andReturn(new ColumnCapabilitiesImpl().setType(ValueType.FLOAT).setDictionaryEncoded(true)).anyTimes(); + .andReturn(ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.FLOAT)).anyTimes(); EasyMock.expect(vectorColumnSelectorFactory.makeValueSelector("fltFld")).andReturn(vectorValueSelector).anyTimes(); EasyMock.replay(vectorColumnSelectorFactory); } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/LongMaxAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/LongMaxAggregationTest.java index 838d3969cd68..46a89f6c0a3b 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/LongMaxAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/LongMaxAggregationTest.java @@ -62,7 +62,7 @@ public void setup() selector = new TestLongColumnSelector(values); colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(selector); - EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null).anyTimes(); EasyMock.replay(colSelectorFactory); @@ -73,7 +73,7 @@ public void setup() vectorColumnSelectorFactory = EasyMock.createMock(VectorColumnSelectorFactory.class); EasyMock.expect(vectorColumnSelectorFactory.getColumnCapabilities("lngFld")) - .andReturn(new ColumnCapabilitiesImpl().setType(ValueType.LONG).setDictionaryEncoded(true)).anyTimes(); + .andReturn(ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.LONG)).anyTimes(); EasyMock.expect(vectorColumnSelectorFactory.makeValueSelector("lngFld")).andReturn(vectorValueSelector).anyTimes(); EasyMock.replay(vectorColumnSelectorFactory); } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/LongMinAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/LongMinAggregationTest.java index 00c9ab4f8c8f..fb4e0e8c587b 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/LongMinAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/LongMinAggregationTest.java @@ -62,7 +62,7 @@ public void setup() selector = new TestLongColumnSelector(values); colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(selector); - EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(null).anyTimes(); EasyMock.replay(colSelectorFactory); VectorValueSelector vectorValueSelector = EasyMock.createMock(VectorValueSelector.class); @@ -72,7 +72,7 @@ public void setup() vectorColumnSelectorFactory = EasyMock.createMock(VectorColumnSelectorFactory.class); EasyMock.expect(vectorColumnSelectorFactory.getColumnCapabilities("lngFld")) - .andReturn(new ColumnCapabilitiesImpl().setType(ValueType.LONG).setDictionaryEncoded(true)).anyTimes(); + .andReturn(ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.LONG)).anyTimes(); EasyMock.expect(vectorColumnSelectorFactory.makeValueSelector("lngFld")).andReturn(vectorValueSelector).anyTimes(); EasyMock.replay(vectorColumnSelectorFactory); } diff --git a/server/src/test/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategyTest.java b/server/src/test/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategyTest.java index fa8e931f0cdb..0f51311489be 100644 --- a/server/src/test/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategyTest.java @@ -30,6 +30,7 @@ import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.topn.TopNQueryBuilder; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Assume; @@ -41,7 +42,7 @@ import javax.script.ScriptEngineManager; import java.util.LinkedHashMap; -public class JavaScriptTieredBrokerSelectorStrategyTest +public class JavaScriptTieredBrokerSelectorStrategyTest extends InitializedNullHandlingTest { @Rule public ExpectedException expectedException = ExpectedException.none();