Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,10 @@ public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggrega
return null;
}
HllSketchAggregatorFactory that = (HllSketchAggregatorFactory) preAggregated;
if (lgK == that.lgK && tgtHllType == that.tgtHllType && stringEncoding == that.stringEncoding && Objects.equals(
fieldName,
that.fieldName
)) {
if (lgK <= that.lgK &&
stringEncoding == that.stringEncoding &&
Objects.equals(fieldName, that.fieldName)
) {
return getCombiningFactory();
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,23 @@ public byte[] getCacheKey()
return new CacheKeyBuilder(cacheTypeId).appendString(name).appendString(fieldName).appendInt(k).build();
}

@Nullable
@Override
public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
{
if (this == preAggregated) {
return getCombiningFactory();
}
if (getClass() != preAggregated.getClass()) {
return null;
}
KllSketchAggregatorFactory<?, ?> that = (KllSketchAggregatorFactory<?, ?>) preAggregated;
if (Objects.equals(fieldName, that.fieldName) && k == that.k && maxStreamLength <= that.maxStreamLength) {
return getCombiningFactory();
}
return null;
}

@Override
public boolean equals(final Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,25 @@ public byte[] getCacheKey()
return new CacheKeyBuilder(cacheTypeId).appendString(name).appendString(fieldName).appendInt(k).build();
}

@Nullable
@Override
public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
{
if (this == preAggregated) {
return getCombiningFactory();
}

if (getClass() != preAggregated.getClass()) {
return null;
}

DoublesSketchAggregatorFactory that = (DoublesSketchAggregatorFactory) preAggregated;
if (k <= that.k && maxStreamLength <= that.getMaxStreamLength() && Objects.equals(fieldName, that.fieldName)) {
return getCombiningFactory();
}
return null;
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

public abstract class SketchAggregatorFactory extends AggregatorFactory
{
Expand Down Expand Up @@ -266,6 +267,22 @@ public byte[] getCacheKey()
.array();
}

@Override
public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does SketchMergeAggregatorFactory need to override this with an impl that checks isInputThetaSketch?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, i suppose it wouldn't hurt, though i can't imagine the field would be the same name and isInputThetaSketch be true in query agg but not in projection agg, or the other way, can add just in case since i need to make some other changes

{
if (this == preAggregated) {
return getCombiningFactory();
}
if (getClass() != preAggregated.getClass()) {
return null;
}
SketchMergeAggregatorFactory that = (SketchMergeAggregatorFactory) preAggregated;
if (Objects.equals(fieldName, that.fieldName) && size <= that.size) {
return getCombiningFactory();
}
return null;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.segment.column.ColumnType;

import javax.annotation.Nullable;
import java.util.Objects;

public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
{
Expand Down Expand Up @@ -165,6 +166,25 @@ public AggregatorFactory withName(String newName)
);
}

@Override
public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
{
if (this == preAggregated) {
return getCombiningFactory();
}
if (getClass() != preAggregated.getClass()) {
return null;
}
SketchMergeAggregatorFactory that = (SketchMergeAggregatorFactory) preAggregated;
if (Objects.equals(fieldName, that.fieldName) &&
size <= that.size &&
isInputThetaSketch == that.isInputThetaSketch
) {
return getCombiningFactory();
}
return null;
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,29 @@ public ColumnType getResultType()
return ColumnType.DOUBLE;
}

@Nullable
@Override
public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
{
if (this == preAggregated) {
return getCombiningFactory();
}

if (getClass() != preAggregated.getClass()) {
return null;
}

ArrayOfDoublesSketchAggregatorFactory that = (ArrayOfDoublesSketchAggregatorFactory) preAggregated;
if (nominalEntries <= that.nominalEntries &&
numberOfValues == that.numberOfValues &&
Objects.equals(fieldName, that.fieldName) &&
Objects.equals(metricColumns, that.metricColumns)
) {
return getCombiningFactory();
}
return null;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,22 @@ public void testResultArraySignature()
new TimeseriesQueryQueryToolChest().resultArraySignature(query)
);
}

@Test
public void testCanSubstitute()
{
AggregatorFactory sketch = new KllDoublesSketchAggregatorFactory("sketch", "x", 200, null);
AggregatorFactory sketch2 = new KllDoublesSketchAggregatorFactory("other", "x", 200, null);
AggregatorFactory sketch3 = new KllDoublesSketchAggregatorFactory("sketch", "x", 200, 1_000L);
AggregatorFactory sketch4 = new KllDoublesSketchAggregatorFactory("sketch", "y", 200, null);
AggregatorFactory sketch5 = new KllDoublesSketchAggregatorFactory("sketch", "x", 300, null);

Assert.assertNotNull(sketch.substituteCombiningFactory(sketch2));
Assert.assertNotNull(sketch3.substituteCombiningFactory(sketch2));
Assert.assertNotNull(sketch3.substituteCombiningFactory(sketch));
Assert.assertNotNull(sketch2.substituteCombiningFactory(sketch));
Assert.assertNull(sketch.substituteCombiningFactory(sketch3));
Assert.assertNull(sketch.substituteCombiningFactory(sketch4));
Assert.assertNull(sketch.substituteCombiningFactory(sketch5));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,19 @@ public void testNullSketches()
ac.fold(new TestDoublesSketchColumnValueSelector());
Assert.assertNotNull(ac.getObject());
}

@Test
public void testCanSubstitute()
{
final DoublesSketchAggregatorFactory sketch = new DoublesSketchAggregatorFactory("sketch", "x", 1024, 1000L, null);
final DoublesSketchAggregatorFactory sketch2 = new DoublesSketchAggregatorFactory("other", "x", 1024, 2000L, null);
final DoublesSketchAggregatorFactory sketch3 = new DoublesSketchAggregatorFactory("another", "x", 2048, 1000L, null);
final DoublesSketchAggregatorFactory incompatible = new DoublesSketchAggregatorFactory("incompatible", "y", 1024, 1000L, null);

Assert.assertNotNull(sketch.substituteCombiningFactory(sketch2));
Assert.assertNotNull(sketch.substituteCombiningFactory(sketch3));
Assert.assertNull(sketch2.substituteCombiningFactory(sketch3));
Assert.assertNull(sketch.substituteCombiningFactory(incompatible));
Assert.assertNull(sketch3.substituteCombiningFactory(sketch));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.AggregatorAndSize;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchBuildAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchMergeAggregatorFactory;
Expand Down Expand Up @@ -213,4 +214,18 @@ public void testFactorizeVectorOnUnsupportedComplexColumn()
Throwable exception = Assert.assertThrows(DruidException.class, () -> AGGREGATOR_16384.factorizeVector(vectorFactory));
Assert.assertEquals("Unsupported input [x] of type [COMPLEX<json>] for aggregator [COMPLEX<thetaSketchBuild>].", exception.getMessage());
}

@Test
public void testCanSubstitute()
{
AggregatorFactory sketch1 = new SketchMergeAggregatorFactory("sketch", "x", 16, true, false, 2);
AggregatorFactory sketch2 = new SketchMergeAggregatorFactory("other", "x", null, false, false, null);
AggregatorFactory sketch3 = new SketchMergeAggregatorFactory("sketch", "x", null, false, false, 3);
AggregatorFactory sketch4 = new SketchMergeAggregatorFactory("sketch", "y", null, false, false, null);

Assert.assertNotNull(sketch1.substituteCombiningFactory(sketch2));
Assert.assertNotNull(sketch1.substituteCombiningFactory(sketch3));
Assert.assertNull(sketch1.substituteCombiningFactory(sketch4));
Assert.assertNull(sketch2.substituteCombiningFactory(sketch1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,21 @@ public void testWithName()
Assert.assertEquals(factory, factory.withName("name"));
Assert.assertEquals("newTest", factory.withName("newTest").getName());
}

@Test
public void testCanSubstitute()
{
AggregatorFactory sketch = new ArrayOfDoublesSketchAggregatorFactory("sketch", "x", null, null, null);
AggregatorFactory sketch2 = new ArrayOfDoublesSketchAggregatorFactory("sketch2", "x", null, null, null);
AggregatorFactory other = new ArrayOfDoublesSketchAggregatorFactory("other", "x", 8192, null, null);
AggregatorFactory incompatible = new ArrayOfDoublesSketchAggregatorFactory("incompatible", "x", 2048, null, null);
AggregatorFactory incompatible2 = new ArrayOfDoublesSketchAggregatorFactory("sketch", "y", null, null, null);
Assert.assertNotNull(sketch.substituteCombiningFactory(other));
Assert.assertNotNull(sketch.substituteCombiningFactory(sketch2));
Assert.assertNull(sketch.substituteCombiningFactory(incompatible));
Assert.assertNotNull(sketch.substituteCombiningFactory(sketch));
Assert.assertNull(other.substituteCombiningFactory(sketch));
Assert.assertNull(sketch.substituteCombiningFactory(incompatible2));
Assert.assertNull(other.substituteCombiningFactory(incompatible2));
}
}
Loading