Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
c819f0a
HIVE-26243: Add vectorized implementation of the 'ds_kll_sketch' UDAF
asolimando May 24, 2022
0cbb404
datasketches dependency added to metastore-server pom (excluding slf4j-simple)
asolimando Mar 4, 2022
92a3e4e
introduced KllUtils
asolimando May 20, 2022
3021877
introduced KllHistogramEstimator and related classes
asolimando May 24, 2022
e6a1ed4
added compute_kll_sketch.q to compare hex values with vectorized implementation
asolimando May 24, 2022
8cf1599
updated qtest out files for vectorized kll sketch
asolimando May 19, 2022
a59a0b9
review comment: unify code generation method for KLL and bitvector
asolimando Oct 26, 2022
c8cceb3
review comment: org.apache.datasketches in top-level pom with exclusion
asolimando Oct 26, 2022
4f51c66
review comment: enum for vectorizable UDAF
asolimando Oct 26, 2022
21f05a0
supporting k param (data-sketch size) in KLL UDAF for vectorization
asolimando May 24, 2022
0162da9
updated compute_kll_sketch qtest for showing k param effect
asolimando May 24, 2022
36c643a
datasketches exclusion for standalone-metastore/pom.xml not inheriting from top-level pom
asolimando Oct 27, 2022
686b44d
remove unused factory methods from KllHistogramEstimatorFactory
asolimando Nov 7, 2022
3f7a22b
rename VectorUDAFComputeKLL to VectorUDAFComputeDsKllSketch
asolimando Nov 7, 2022
d994a13
use overridden toString for enums
asolimando Nov 7, 2022
503bcef
moving histogram classes to another package
asolimando Nov 17, 2022
874328c
removed datasketches dependency from metastore
asolimando Nov 17, 2022
96125de
removed some dead code and moved package
asolimando Nov 18, 2022
e2f02ea
fixed package name
asolimando Nov 18, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,17 @@
<artifactId>calcite-druid</artifactId>
<version>${calcite.version}</version>
</dependency>
<dependency>
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-hive</artifactId>
<version>${datasketches.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates;

import org.apache.hadoop.hive.ql.udf.datasketches.kll.KllHistogramEstimator;
import org.apache.hadoop.hive.ql.udf.datasketches.kll.KllHistogramEstimatorFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
#IF COMPLETE
import org.apache.hadoop.hive.ql.exec.vector.<InputColumnVectorType>;
#ENDIF COMPLETE
import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationDesc;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.util.JavaDataModel;

/**
 * Generated from template VectorUDAFComputeDsKllSketch.txt.
 *
 * <p>Vectorized implementation of the {@code ds_kll_sketch} UDAF. Two concrete classes
 * are produced from this template by the code generator via the
 * {@code #IF COMPLETE} / {@code #IF MERGING} preprocessor markers:
 * the COMPLETE variant reads raw column values and feeds them into a KLL floats
 * sketch, while the MERGING variant reads serialized sketches (binary blobs
 * produced by a partial aggregation) and merges them into a single sketch.</p>
 */
@Description(name = "ds_kll_sketch", value = "_FUNC_(x) "
    + "Returns a KllFloatsSketch in a serialized form as a binary blob."
    + " Values must be of type float.")
public class <ClassName> extends VectorAggregateExpression {

  // KLL sketch size parameter 'k' (accuracy/size trade-off of the sketch).
  private transient int k;

  public <ClassName>() {
    super();
  }

  // Convenience constructor: defaults the sketch parameter k to 200.
  public <ClassName>(VectorAggregationDesc vecAggrDesc) {
    this(vecAggrDesc, 200);
  }

  public <ClassName>(VectorAggregationDesc vecAggrDesc, int k) {
    super(vecAggrDesc);
    this.k = k;
  }

  @Override
  public AggregationBuffer getNewAggregationBuffer() throws HiveException {
    return new Aggregation();
  }

  /**
   * Accumulates an entire batch into a single aggregation buffer (the no-keys,
   * global-aggregation path). The branch structure enumerates the standard
   * vectorization cases: noNulls/nullable x repeating/non-repeating x
   * selection-vector-in-use/not.
   *
   * @param agg   the buffer to accumulate into; lazily prepared on first use
   * @param batch the input batch; empty batches are a no-op
   */
  @Override
  public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException {
    inputExpression.evaluate(batch);

#IF COMPLETE
    <InputColumnVectorType> inputColumn = (<InputColumnVectorType>) batch.cols[this.inputExpression.getOutputColumnNum()];
#ENDIF COMPLETE
#IF MERGING
    // MERGING consumes serialized sketches, which always arrive as bytes.
    BytesColumnVector inputColumn = (BytesColumnVector) batch.cols[this.inputExpression.getOutputColumnNum()];
#ENDIF MERGING

    int batchSize = batch.size;

    if (batchSize == 0) {
      return;
    }

    Aggregation myagg = (Aggregation) agg;

#IF COMPLETE
    // COMPLETE mode knows k up front, so the estimator can be created eagerly.
    myagg.prepare(k);
    if (inputColumn.noNulls) {
      if (inputColumn.isRepeating) {
        // Repeating value: add it once per row so the sketch sees the correct count.
        for (int i = 0; i < batchSize; i++) {
          myagg.estimator.addToEstimator(inputColumn.vector[0]);
        }
      } else {
        if (batch.selectedInUse) {
          for (int s = 0; s < batchSize; s++) {
            int i = batch.selected[s];
            myagg.estimator.addToEstimator(inputColumn.vector[i]);
          }
        } else {
          for (int i = 0; i < batchSize; i++) {
            myagg.estimator.addToEstimator(inputColumn.vector[i]);
          }
        }
      }
    } else {
      if (inputColumn.isRepeating) {
        // Repeating null contributes nothing; repeating non-null is added per row.
        if (!inputColumn.isNull[0]) {
          for (int i = 0; i < batchSize; i++) {
            myagg.estimator.addToEstimator(inputColumn.vector[0]);
          }
        }
      } else {
        if (batch.selectedInUse) {
          for (int j = 0; j < batchSize; ++j) {
            int i = batch.selected[j];
            if (!inputColumn.isNull[i]) {
              myagg.estimator.addToEstimator(inputColumn.vector[i]);
            }
          }
        } else {
          for (int i = 0; i < batchSize; i++) {
            if (!inputColumn.isNull[i]) {
              myagg.estimator.addToEstimator(inputColumn.vector[i]);
            }
          }
        }
      }
    }
#ENDIF COMPLETE
#IF MERGING
    if (inputColumn.isRepeating) {
      // A repeating serialized sketch only needs to be merged once (merging is idempotent
      // with respect to repeats of the same partial, unlike raw-value accumulation above).
      if (!inputColumn.isNull[0] && inputColumn.length[0] > 0) {
        KllHistogramEstimator mergingKLL = KllHistogramEstimatorFactory.getKllHistogramEstimator(
            inputColumn.vector[0], inputColumn.start[0], inputColumn.length[0]);
        // Adopt k from the incoming sketch so the target matches the partials' parameter.
        myagg.prepare(mergingKLL.getK());
        myagg.estimator.mergeEstimators(mergingKLL);
      }
    } else {
      for (int i = 0; i < batchSize; i++) {
        int s = i;
        if (batch.selectedInUse) {
          s = batch.selected[i];
        }
        // Skip nulls and zero-length blobs (no sketch payload to deserialize).
        if (!inputColumn.isNull[s] && inputColumn.length[s] > 0) {
          KllHistogramEstimator mergingKLL = KllHistogramEstimatorFactory.getKllHistogramEstimator(
              inputColumn.vector[s], inputColumn.start[s], inputColumn.length[s]);
          myagg.prepare(mergingKLL.getK());
          myagg.estimator.mergeEstimators(mergingKLL);
        }
      }
    }
#ENDIF MERGING
  }

  /**
   * Fetches (and lazily prepares with this.k) the per-group aggregation buffer for
   * the given batch row.
   *
   * @param sets        one buffer row per position in the batch
   * @param rowid       position within the batch (NOT the physical row when a
   *                    selection vector is in use)
   * @param bufferIndex index of this aggregate within the buffer row
   */
  private Aggregation getAggregation(VectorAggregationBufferRow[] sets, int rowid, int bufferIndex) {
    VectorAggregationBufferRow bufferRow = sets[rowid];
    Aggregation myagg = (Aggregation) bufferRow.getAggregationBuffer(bufferIndex);
    myagg.prepare(k);
    return myagg;
  }

  /**
   * Accumulates a batch when each row may belong to a different group (GROUP BY path):
   * every row is routed to its own buffer via {@link #getAggregation}.
   *
   * NOTE(review): in the MERGING variant this path prepares buffers with this.k
   * (via getAggregation) rather than with the incoming sketch's k as aggregateInput
   * does — presumably mergeEstimators tolerates differing k; confirm.
   */
  @Override
  public void aggregateInputSelection(VectorAggregationBufferRow[] aggregationBufferSets, int aggregateIndex,
      VectorizedRowBatch batch) throws HiveException {
    inputExpression.evaluate(batch);

#IF COMPLETE
    <InputColumnVectorType> inputColumn = (<InputColumnVectorType>) batch.cols[this.inputExpression.getOutputColumnNum()];
#ENDIF COMPLETE
#IF MERGING
    BytesColumnVector inputColumn = (BytesColumnVector) batch.cols[this.inputExpression.getOutputColumnNum()];
#ENDIF MERGING

    int batchSize = batch.size;

    if (batchSize == 0) {
      return;
    }

#IF COMPLETE
    if (inputColumn.noNulls) {
      if (inputColumn.isRepeating) {
        // Same repeated value, but each row still targets its own group's buffer.
        for (int i = 0; i < batchSize; i++) {
          Aggregation myagg = getAggregation(aggregationBufferSets, i, aggregateIndex);
          myagg.estimator.addToEstimator(inputColumn.vector[0]);
        }
      } else {
        if (batch.selectedInUse) {
          // Buffers are indexed by batch position 's'; column data by selected row 'i'.
          for (int s = 0; s < batchSize; s++) {
            int i = batch.selected[s];
            Aggregation myagg = getAggregation(aggregationBufferSets, s, aggregateIndex);
            myagg.estimator.addToEstimator(inputColumn.vector[i]);
          }
        } else {
          for (int i = 0; i < batchSize; i++) {
            Aggregation myagg = getAggregation(aggregationBufferSets, i, aggregateIndex);
            myagg.estimator.addToEstimator(inputColumn.vector[i]);
          }
        }
      }
    } else {
      if (inputColumn.isRepeating) {
        if (!inputColumn.isNull[0]) {
          for (int i = 0; i < batchSize; i++) {
            Aggregation myagg = getAggregation(aggregationBufferSets, i, aggregateIndex);
            myagg.estimator.addToEstimator(inputColumn.vector[0]);
          }
        }
      } else {
        if (batch.selectedInUse) {
          for (int s = 0; s < batchSize; s++) {
            int i = batch.selected[s];
            if (!inputColumn.isNull[i]) {
              Aggregation myagg = getAggregation(aggregationBufferSets, s, aggregateIndex);
              myagg.estimator.addToEstimator(inputColumn.vector[i]);
            }
          }
        } else {
          for (int i = 0; i < batchSize; i++) {
            if (!inputColumn.isNull[i]) {
              Aggregation myagg = getAggregation(aggregationBufferSets, i, aggregateIndex);
              myagg.estimator.addToEstimator(inputColumn.vector[i]);
            }
          }
        }
      }
    }
#ENDIF COMPLETE
#IF MERGING
    if (inputColumn.isRepeating) {
      if (!inputColumn.isNull[0] && inputColumn.length[0] > 0) {
        // The same serialized sketch is merged into every row's group buffer.
        for (int i = 0; i < batchSize; i++) {
          Aggregation myagg = getAggregation(aggregationBufferSets, i, aggregateIndex);
          KllHistogramEstimator mergingKLL = KllHistogramEstimatorFactory.getKllHistogramEstimator(
              inputColumn.vector[0], inputColumn.start[0], inputColumn.length[0]);
          myagg.estimator.mergeEstimators(mergingKLL);
        }
      }
    } else {
      for (int i = 0; i < batchSize; i++) {
        int s = i;
        if (batch.selectedInUse) {
          s = batch.selected[i];
        }
        if (!inputColumn.isNull[s] && inputColumn.length[s] > 0) {
          // Buffer row keyed by batch position 'i'; sketch bytes read from physical row 's'.
          Aggregation myagg = getAggregation(aggregationBufferSets, i, aggregateIndex);
          KllHistogramEstimator mergingKLL = KllHistogramEstimatorFactory.getKllHistogramEstimator(
              inputColumn.vector[s], inputColumn.start[s], inputColumn.length[s]);
          myagg.estimator.mergeEstimators(mergingKLL);
        }
      }
    }
#ENDIF MERGING
  }

  @Override
  public void reset(AggregationBuffer agg) throws HiveException {
    agg.reset();
  }

  // The buffer's size is entirely variable (the sketch grows with data),
  // so no fixed component is reported.
  @Override
  public long getAggregationBufferFixedSize() {
    return 0;
  }

  /**
   * Declares which (name, input type, output type, evaluator mode) combinations this
   * generated class vectorizes: MERGING handles BYTES input in PARTIAL2/FINAL modes,
   * COMPLETE handles the template's column type in PARTIAL1/COMPLETE modes. Output
   * is always a serialized sketch (BYTES).
   */
  @Override
  public boolean matches(String name, ColumnVector.Type inputColVectorType, ColumnVector.Type outputColVectorType,
      GenericUDAFEvaluator.Mode mode) {
    return name.equals("ds_kll_sketch") &&
        outputColVectorType == ColumnVector.Type.BYTES &&
#IF MERGING
        inputColVectorType == ColumnVector.Type.BYTES &&
        (mode == GenericUDAFEvaluator.Mode.PARTIAL2 || mode == GenericUDAFEvaluator.Mode.FINAL);
#ENDIF MERGING
#IF COMPLETE
        inputColVectorType == ColumnVector.Type.<UpperCaseColumnVectorType> &&
        (mode == GenericUDAFEvaluator.Mode.PARTIAL1 || mode == GenericUDAFEvaluator.Mode.COMPLETE);
#ENDIF COMPLETE
  }

  /**
   * Writes the aggregation result for one row into the output column: the serialized
   * sketch bytes, or SQL NULL when the buffer never received any input
   * (estimator still null).
   */
  @Override
  public void assignRowColumn(
      VectorizedRowBatch batch, int batchIndex, int columnNum, AggregationBuffer agg) throws HiveException {
    Aggregation myagg = (Aggregation) agg;
    BytesColumnVector outputCol = (BytesColumnVector) batch.cols[columnNum];
    if (myagg.estimator == null) {
      outputCol.isNull[batchIndex] = true;
      outputCol.noNulls = false;
    } else {
      outputCol.isNull[batchIndex] = false;
      outputCol.isRepeating = false;
      byte[] outputbuf = myagg.estimator.serialize();
      // setRef avoids copying: the output vector references the serialized buffer directly.
      outputCol.setRef(batchIndex, outputbuf, 0, outputbuf.length);
    }
  }

  /** Per-group aggregation state: a lazily created KLL histogram estimator. */
  static class Aggregation implements AggregationBuffer {

    // Null until prepare() is called; null also encodes "no input seen" for NULL output.
    KllHistogramEstimator estimator;

    // NOTE(review): throws NPE if called before prepare() / after reset() —
    // presumably only invoked on a prepared buffer; confirm against callers.
    @Override
    public int getVariableSize() {
      return estimator.lengthFor(JavaDataModel.get());
    }

    @Override
    public void reset() {
      estimator = null;
    }

    // Idempotent: creates the estimator with parameter k only on first call.
    public void prepare(int k) {
      if (estimator == null) {
        estimator = KllHistogramEstimatorFactory.getEmptyHistogramEstimator(k);
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ public static boolean isFullOuterMapEnabled(HiveConf hiveConf, JoinOperator join
HiveConf.getVar(hiveConf,
HiveConf.ConfVars.HIVE_TEST_MAPJOINFULLOUTER_OVERRIDE);
EnabledOverride mapJoinFullOuterOverride =
EnabledOverride.nameMap.get(testMapJoinFullOuterOverrideString);
EnabledOverride.NAME_MAP.get(testMapJoinFullOuterOverrideString);

final boolean isEnabled =
HiveConf.getBoolVar(
Expand Down
Loading