Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ public class DoublesSketchBuildAggregator implements Aggregator
{

private final ColumnValueSelector<Double> valueSelector;
private final int size;

private UpdateDoublesSketch sketch;

public DoublesSketchBuildAggregator(final ColumnValueSelector<Double> valueSelector, final int size)
{
this.valueSelector = valueSelector;
this.size = size;
sketch = DoublesSketch.builder().setK(size).build();
}

Expand Down Expand Up @@ -68,5 +66,4 @@ public synchronized void close()
{
sketch = null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
public class DoublesSketchMergeAggregator implements Aggregator
{

private final ColumnValueSelector<DoublesSketch> selector;
private final ColumnValueSelector selector;
private DoublesUnion union;

public DoublesSketchMergeAggregator(final ColumnValueSelector<DoublesSketch> selector, final int k)
public DoublesSketchMergeAggregator(final ColumnValueSelector selector, final int k)
{
this.selector = selector;
union = DoublesUnion.builder().setMaxK(k).build();
Expand All @@ -39,13 +39,10 @@ public DoublesSketchMergeAggregator(final ColumnValueSelector<DoublesSketch> sel
@Override
public synchronized void aggregate()
{
final DoublesSketch sketch = selector.getObject();
if (sketch == null) {
return;
}
union.update(sketch);
updateUnion(selector, union);
}


@Override
public synchronized Object get()
{
Expand All @@ -70,4 +67,16 @@ public synchronized void close()
union = null;
}

static void updateUnion(ColumnValueSelector selector, DoublesUnion union)
{
final Object object = selector.getObject();
if (object == null) {
return;
}
if (object instanceof DoublesSketch) {
union.update((DoublesSketch) object);
} else {
union.update(selector.getDouble());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.query.aggregation.datasketches.quantiles;

import com.yahoo.memory.WritableMemory;
import com.yahoo.sketches.quantiles.DoublesSketch;
import com.yahoo.sketches.quantiles.DoublesUnion;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
Expand All @@ -34,14 +33,14 @@
public class DoublesSketchMergeBufferAggregator implements BufferAggregator
{

private final ColumnValueSelector<DoublesSketch> selector;
private final ColumnValueSelector selector;
private final int k;
private final int maxIntermediateSize;
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<DoublesUnion>> unions = new IdentityHashMap<>();

public DoublesSketchMergeBufferAggregator(
final ColumnValueSelector<DoublesSketch> selector,
final ColumnValueSelector selector,
final int k,
final int maxIntermediateSize)
{
Expand All @@ -62,12 +61,8 @@ public synchronized void init(final ByteBuffer buffer, final int position)
@Override
public synchronized void aggregate(final ByteBuffer buffer, final int position)
{
final DoublesSketch sketch = selector.getObject();
if (sketch == null) {
return;
}
final DoublesUnion union = unions.get(buffer).get(position);
union.update(sketch);
DoublesSketchMergeAggregator.updateUnion(selector, union);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,25 +125,26 @@ public void setUp() throws Exception
CalciteTests.getJsonMapper().registerModule(mod);
}

final QueryableIndex index = IndexBuilder.create()
.tmpDir(temporaryFolder.newFolder())
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("m1", "m1"),
new DoublesSketchAggregatorFactory(
"qsketch_m1",
"m1",
128
)
)
.withRollup(false)
.build()
)
.rows(CalciteTests.ROWS1)
.buildMMappedIndex();
final QueryableIndex index =
IndexBuilder.create()
.tmpDir(temporaryFolder.newFolder())
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("m1", "m1"),
new DoublesSketchAggregatorFactory(
"qsketch_m1",
"m1",
128
)
)
.withRollup(false)
.build()
)
.rows(CalciteTests.ROWS1)
.buildMMappedIndex();

walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
DataSegment.builder()
Expand Down Expand Up @@ -401,6 +402,73 @@ public void testQuantileOnInnerQuery() throws Exception
);
}

@Test
public void testQuantileOnInnerQuantileQuery() throws Exception
{
SqlLifecycle sqlLifecycle = sqlLifecycleFactory.factorize();
final String sql = "SELECT dim1, APPROX_QUANTILE_DS(x, 0.5)\n"
+ "FROM (SELECT dim1, dim2, APPROX_QUANTILE_DS(m1, 0.5) AS x FROM foo GROUP BY dim1, dim2) GROUP BY dim1";


final List<Object[]> results = sqlLifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList();

ImmutableList.Builder<Object[]> builder = ImmutableList.builder();
builder.add(new Object[]{"", 1.0});
builder.add(new Object[]{"1", 4.0});
builder.add(new Object[]{"10.1", 2.0});
builder.add(new Object[]{"2", 3.0});
builder.add(new Object[]{"abc", 6.0});
builder.add(new Object[]{"def", 5.0});
final List<Object[]> expectedResults = builder.build();
Assert.assertEquals(expectedResults.size(), results.size());
for (int i = 0; i < expectedResults.size(); i++) {
Assert.assertArrayEquals(expectedResults.get(i), results.get(i));
}

// Verify query
Assert.assertEquals(
GroupByQuery.builder()
.setDataSource(
new QueryDataSource(
GroupByQuery.builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.setGranularity(Granularities.ALL)
.setDimensions(
new DefaultDimensionSpec("dim1", "d0"),
new DefaultDimensionSpec("dim2", "d1")
)
.setAggregatorSpecs(
ImmutableList.of(
new DoublesSketchAggregatorFactory("a0:agg", "m1", 128)
)
)
.setPostAggregatorSpecs(
ImmutableList.of(
new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.5f)
)
)
.setContext(ImmutableMap.of(PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
.build()
)
)
.setInterval(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
.setGranularity(Granularities.ALL)
.setDimensions(new DefaultDimensionSpec("d0", "_d0", ValueType.STRING))
.setAggregatorSpecs(
new DoublesSketchAggregatorFactory("_a0:agg", "a0", 128)
)
.setPostAggregatorSpecs(
ImmutableList.of(
new DoublesSketchToQuantilePostAggregator("_a0", makeFieldAccessPostAgg("_a0:agg"), 0.5f)
)
)
.setContext(ImmutableMap.of(PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
.build(),
Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
);
}

private static PostAggregator makeFieldAccessPostAgg(String name)
{
return new FieldAccessPostAggregator(name, name);
Expand Down