Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
101 commits
Select commit Hold shift + click to select a range
64479e0
add test-copy1
kgyrtkirk Aug 3, 2023
899badd
let the autoformat work
kgyrtkirk Aug 3, 2023
8be630c
add tryies
kgyrtkirk Aug 3, 2023
b63df49
some more test
kgyrtkirk Aug 3, 2023
5089f8b
build q
kgyrtkirk Aug 3, 2023
453a81d
build q
kgyrtkirk Aug 3, 2023
34eace2
update test
kgyrtkirk Aug 3, 2023
def97c0
remove tries
kgyrtkirk Aug 3, 2023
5595f45
add test for good behaiour
kgyrtkirk Aug 3, 2023
f8756c8
fix0
kgyrtkirk Aug 3, 2023
d91e134
cleanup
kgyrtkirk Aug 3, 2023
ba54ce7
possible fix
kgyrtkirk Aug 3, 2023
058f1b9
ignore test
kgyrtkirk Aug 3, 2023
00f5f8e
fix format
kgyrtkirk Aug 3, 2023
ae581a2
half-fix 1 test
kgyrtkirk Aug 7, 2023
6e75a68
test for #2
kgyrtkirk Aug 7, 2023
1ef0354
some changes
kgyrtkirk Aug 7, 2023
7e99f43
updates
kgyrtkirk Aug 7, 2023
f2e2fc5
fix a set of tests
kgyrtkirk Aug 7, 2023
5d2cdfe
fix more tests
kgyrtkirk Aug 7, 2023
bcb4b3c
unpatch
kgyrtkirk Aug 7, 2023
2231831
tries
kgyrtkirk Aug 7, 2023
6c66381
allow timeseries in ingestion
kgyrtkirk Aug 7, 2023
cb30e7c
fix more tests
kgyrtkirk Aug 7, 2023
8e4a3fc
fix a few more
kgyrtkirk Aug 7, 2023
3907672
fix-b
kgyrtkirk Aug 8, 2023
33a2293
remove some unrelated stuff
kgyrtkirk Aug 8, 2023
0f3b3a6
fix-B2
kgyrtkirk Aug 8, 2023
e88f3cf
remove test framework changes
kgyrtkirk Aug 8, 2023
794bc58
shorter; but not better
kgyrtkirk Aug 8, 2023
b4e9f24
Revert "shorter; but not better"
kgyrtkirk Aug 8, 2023
f06f650
replace returns with IAE
kgyrtkirk Aug 8, 2023
b890cd1
a slightly better patch
kgyrtkirk Aug 8, 2023
2d04efb
cleanup
kgyrtkirk Aug 8, 2023
7a71a81
remove ws
kgyrtkirk Aug 8, 2023
73c6804
move logic to iterator()
kgyrtkirk Aug 8, 2023
0aece50
add testcase for FILTER
kgyrtkirk Aug 9, 2023
1ff9a7d
correct typo
kgyrtkirk Aug 9, 2023
d50333e
use boolean to skip init
kgyrtkirk Aug 9, 2023
ecd023d
unrefactor
kgyrtkirk Aug 9, 2023
2ace00d
use isI
kgyrtkirk Aug 9, 2023
24dbdf0
grouping sets
kgyrtkirk Aug 9, 2023
cc53182
update test
kgyrtkirk Aug 10, 2023
e09d1c2
remove testcases
kgyrtkirk Aug 11, 2023
494d1cf
updates
kgyrtkirk Aug 11, 2023
a8615f2
fix again
kgyrtkirk Aug 11, 2023
30e79b8
add test at processing; fix issue with combiner
kgyrtkirk Aug 11, 2023
6df5e35
ugly-fix-joe
kgyrtkirk Aug 11, 2023
2cd309e
handle null-s in combine; not sure if this could output null or not
kgyrtkirk Aug 11, 2023
91ec3ce
Revert "handle null-s in combine; not sure if this could output null …
kgyrtkirk Aug 11, 2023
4e5daf3
viable - but doesnt work
kgyrtkirk Aug 11, 2023
7aa0f00
it works afterall
kgyrtkirk Aug 11, 2023
b30d37f
fix style
kgyrtkirk Aug 11, 2023
bc5a568
not helping change
kgyrtkirk Aug 11, 2023
89c6d18
fix aggregate 0 rows vectorization issue
kgyrtkirk Aug 16, 2023
41a7c25
fix aggregate 0 rows vectorization
kgyrtkirk Aug 16, 2023
599e217
cleanup
kgyrtkirk Aug 16, 2023
c7b8372
add missing Override
kgyrtkirk Aug 16, 2023
5f780f2
add cachekey method
kgyrtkirk Aug 16, 2023
fcb4869
fix cache id
kgyrtkirk Aug 16, 2023
a8c3f7a
add factory returning methods to CSAF
kgyrtkirk Aug 17, 2023
deb4f5c
Merge remote-tracking branch 'apache/master' into count-agg-missing0
kgyrtkirk Aug 17, 2023
726a185
fix aggregator expectations in msq
kgyrtkirk Aug 18, 2023
e94b865
Merge remote-tracking branch 'apache/master' into count-agg-missing0
kgyrtkirk Aug 25, 2023
6d24133
remove old
kgyrtkirk Aug 25, 2023
754034a
summaryrowx
kgyrtkirk Aug 25, 2023
304daed
there
kgyrtkirk Aug 25, 2023
6c09d92
cleanup
kgyrtkirk Aug 25, 2023
b0d7e44
most likely unrelated test changes
kgyrtkirk Aug 25, 2023
895c1ce
checkstyle/etc
kgyrtkirk Aug 25, 2023
0720ed4
fixup test
kgyrtkirk Aug 25, 2023
0cdffe2
lazy it-hasNext eval
kgyrtkirk Aug 26, 2023
ec98adc
added test
kgyrtkirk Aug 28, 2023
e689f48
all-gran rs
kgyrtkirk Aug 28, 2023
17cf4f0
vectorx
kgyrtkirk Aug 28, 2023
5b29f1c
try1
kgyrtkirk Aug 28, 2023
ea93ceb
Revert "try1"
kgyrtkirk Aug 28, 2023
7893c56
it does work; but..
kgyrtkirk Aug 28, 2023
0841d3e
works..but2
kgyrtkirk Aug 28, 2023
75c74e0
cleanup/etc
kgyrtkirk Aug 28, 2023
2931898
fix
kgyrtkirk Aug 28, 2023
7325f72
commented runnerfactory level
kgyrtkirk Aug 29, 2023
13a5097
updates
kgyrtkirk Aug 29, 2023
1dbb496
remove grouper approach; migrate to runnerfactory
kgyrtkirk Aug 29, 2023
3d4403a
cleanup/format/etc
kgyrtkirk Aug 29, 2023
d37bf3d
cleanup; add test for subq at processing
kgyrtkirk Aug 29, 2023
05d719c
ugly-subq handling
kgyrtkirk Aug 29, 2023
1e0f99b
updates
kgyrtkirk Aug 29, 2023
cf79d1c
clenaup
kgyrtkirk Aug 29, 2023
be18860
remove type args; add safevarags
kgyrtkirk Aug 29, 2023
cd6cc79
cleanup
kgyrtkirk Aug 29, 2023
39b0ada
move stuff to toolchest
kgyrtkirk Aug 29, 2023
2281a71
put back into factory
kgyrtkirk Aug 29, 2023
13eb306
cleanup
kgyrtkirk Aug 29, 2023
cf59ed3
move to mergeResults fn
kgyrtkirk Aug 30, 2023
faeec4e
fix NullHandling.replaceWithDefault in GroupByQueryRunnerTest
kgyrtkirk Aug 30, 2023
3fb27dc
having test+fix
kgyrtkirk Aug 30, 2023
7d4a7bf
move to GroupingEngine#applyPostProcessing
kgyrtkirk Aug 30, 2023
adcb001
processing-test
kgyrtkirk Aug 30, 2023
5332a95
make IS_FINER_THAN final
kgyrtkirk Aug 31, 2023
f85a732
fix asList
kgyrtkirk Aug 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

public abstract class Granularity implements Cacheable
{
public static Comparator<Granularity> IS_FINER_THAN = new Comparator<Granularity>()
public static final Comparator<Granularity> IS_FINER_THAN = new Comparator<Granularity>()
{
@Override
/**
Expand Down Expand Up @@ -215,6 +215,16 @@ final Integer[] getDateValues(String filePath, Formatter formatter)
return vals;
}

/**
 * Tells whether this granularity sorts before {@code g} according to {@link #IS_FINER_THAN}.
 *
 * @param g the granularity to compare this one against
 * @return true if {@link #IS_FINER_THAN} orders this {@link Granularity} strictly before {@code g},
 *         i.e. this granularity is finer than the passed one
 */
public boolean isFinerThan(Granularity g)
{
  final int order = IS_FINER_THAN.compare(this, g);
  return order < 0;
}

/**
* Return an iterable of granular buckets that overlap a particular interval.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public static <T> Sequence<T> empty()
return (Sequence<T>) EMPTY_SEQUENCE;
}

@SafeVarargs
public static <T> Sequence<T> concat(Sequence<T>... sequences)
{
return concat(Arrays.asList(sequences));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.collect.Utils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
Expand Down Expand Up @@ -66,16 +67,20 @@
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.join.filter.AllNullColumnSelectorFactory;
import org.apache.druid.utils.CloseableUtils;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -436,6 +441,8 @@ public Sequence<ResultRow> process(
*/
public Sequence<ResultRow> applyPostProcessing(Sequence<ResultRow> results, GroupByQuery query)
{
results = wrapSummaryRowIfNeeded(query, results);

// Don't apply limit here for inner results, that will be pushed down to the BufferHashGrouper
if (query.context().getBoolean(CTX_KEY_OUTERMOST, true)) {
return query.postProcess(results);
Expand Down Expand Up @@ -726,4 +733,57 @@ private Set<String> getAggregatorAndPostAggregatorNames(GroupByQuery query)

return aggsAndPostAggs;
}

/**
 * Decorates a result sequence so that a single summary row is emitted when the sequence turns out to be empty.
 *
 * If {@link #summaryRowPreconditions} rejects the query, the input sequence is returned untouched. Otherwise
 * the returned sequence yields every row of {@code process} and is followed by a lazily evaluated tail: the
 * tail is empty when at least one row was seen, and holds the summary row otherwise.
 *
 * @param query   the groupBy query the results belong to
 * @param process the result sequence to decorate
 * @return {@code process} itself, or {@code process} concatenated with the conditional summary-row tail
 */
public static Sequence<ResultRow> wrapSummaryRowIfNeeded(GroupByQuery query, Sequence<ResultRow> process)
{
  if (!summaryRowPreconditions(query)) {
    return process;
  }

  // Flipped as soon as the upstream sequence produces its first row.
  final AtomicBoolean sawAnyRow = new AtomicBoolean();

  final Sequence<ResultRow> trackedRows = Sequences.map(process, row -> {
    sawAnyRow.set(true);
    return row;
  });

  // The lambda is only invoked when the tail is iterated, i.e. after trackedRows has been consumed.
  final Sequence<ResultRow> summaryTail = Sequences.simple(
      () -> sawAnyRow.get() ? Collections.<ResultRow>emptyIterator() : summaryRowIterator(query)
  );

  return Sequences.concat(trackedRows, summaryTail);
}

/**
 * Checks whether the query is shaped such that a summary row may have to be produced for empty input.
 *
 * The answer is false when a {@link DefaultLimitSpec} has a limit of 0 or a positive offset, when the query
 * has any dimensions, or when its granularity is finer than {@link Granularities#ALL}.
 *
 * @param query the groupBy query to inspect
 * @return true only if none of the disqualifying conditions above hold
 */
private static boolean summaryRowPreconditions(GroupByQuery query)
{
  final LimitSpec limitSpec = query.getLimitSpec();
  if (limitSpec instanceof DefaultLimitSpec) {
    final DefaultLimitSpec defaultLimitSpec = (DefaultLimitSpec) limitSpec;
    final boolean limitOrOffsetDisqualifies = defaultLimitSpec.getLimit() == 0 || defaultLimitSpec.getOffset() > 0;
    if (limitOrOffsetDisqualifies) {
      return false;
    }
  }

  // Short-circuit keeps the original order: dimensions are checked before granularity.
  return query.getDimensions().isEmpty()
         && !query.getGranularity().isFinerThan(Granularities.ALL);
}

/**
 * Builds the single summary row for a query and returns it as a one-element iterator.
 *
 * Each aggregator spec of the query is factorized against an {@link AllNullColumnSelectorFactory} and its
 * value is read immediately, without aggregating anything; the values are placed in the row in the order
 * of the aggregator specs.
 *
 * NOTE(review): only aggregator values are written; nothing is computed for post-aggregators here - confirm
 * whether queries with post-aggregators need extra row slots.
 * NOTE(review): the aggregators created by factorize are not closed here - confirm whether that matters.
 *
 * @param q the groupBy query whose aggregator specs define the row contents
 * @return an iterator over exactly one {@link ResultRow}
 */
private static Iterator<ResultRow> summaryRowIterator(GroupByQuery q)
{
  final Object[] initialValues = q.getAggregatorSpecs()
                                  .stream()
                                  .map(factory -> factory.factorize(new AllNullColumnSelectorFactory()).get())
                                  .toArray();
  final ResultRow summaryRow = ResultRow.of(initialValues);
  return Collections.singleton(summaryRow).iterator();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12958,6 +12958,157 @@ public void testGroupByFloatMinExpressionVsVirtualColumnWithExplicitStringVirtua
TestHelper.assertExpectedObjects(expectedResults, results, "groupBy");
}

/**
 * Runs an ALL-granularity groupBy without dimensions whose filter is meant to select no rows (per the test
 * name) and expects one row back: a zero count, plus sums that are 0 in default-value mode and null otherwise.
 */
@Test
public void testSummaryrowForEmptyInput()
{
  final SelectorDimFilter filter = new SelectorDimFilter("placementish", "xxa", null);

  final GroupByQuery query = makeQueryBuilder()
      .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
      .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
      .setDimFilter(filter)
      .setAggregatorSpecs(
          QueryRunnerTestHelper.ROWS_COUNT,
          new LongSumAggregatorFactory("idx", "index"),
          new FloatSumAggregatorFactory("idxFloat", "indexFloat"),
          new DoubleSumAggregatorFactory("idxDouble", "index")
      )
      .setGranularity(QueryRunnerTestHelper.ALL_GRAN)
      .build();

  // Sum aggregators are expected to report 0 with default-value null handling, and null otherwise.
  final boolean defaultValueMode = NullHandling.replaceWithDefault();
  final ResultRow summaryRow = makeRow(
      query,
      "2011-04-01",
      "rows", 0L,
      "idx", defaultValueMode ? 0L : null,
      "idxFloat", defaultValueMode ? 0.0 : null,
      "idxDouble", defaultValueMode ? 0.0 : null
  );
  final List<ResultRow> expected = ImmutableList.of(summaryRow);

  final StubServiceEmitter emitter = new StubServiceEmitter("", "");
  final Iterable<ResultRow> actual = GroupByQueryRunnerTestHelper.runQueryWithEmitter(
      factory,
      originalRunner,
      query,
      emitter
  );

  emitter.verifyEmitted("query/wait/time", ImmutableMap.of("vectorized", vectorize), 1);
  TestHelper.assertExpectedObjects(expected, actual, "groupBy");
}

/**
 * Same query shape as the empty-input summary row case, but with a having spec requiring "rows" to be
 * greater than 99; the expected result is empty.
 */
@Test
public void testSummaryrowFilteredByHaving()
{
  final SelectorDimFilter filter = new SelectorDimFilter("placementish", "xxa", null);
  final GreaterThanHavingSpec having = new GreaterThanHavingSpec("rows", 99L);

  final GroupByQuery query = makeQueryBuilder()
      .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
      .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
      .setDimFilter(filter)
      .setHavingSpec(having)
      .setAggregatorSpecs(
          QueryRunnerTestHelper.ROWS_COUNT,
          new LongSumAggregatorFactory("idx", "index"),
          new FloatSumAggregatorFactory("idxFloat", "indexFloat"),
          new DoubleSumAggregatorFactory("idxDouble", "index")
      )
      .setGranularity(QueryRunnerTestHelper.ALL_GRAN)
      .build();

  // No row is expected in the output.
  final List<ResultRow> expected = ImmutableList.of();

  final StubServiceEmitter emitter = new StubServiceEmitter("", "");
  final Iterable<ResultRow> actual = GroupByQueryRunnerTestHelper.runQueryWithEmitter(
      factory,
      originalRunner,
      query,
      emitter
  );

  emitter.verifyEmitted("query/wait/time", ImmutableMap.of("vectorized", vectorize), 1);
  TestHelper.assertExpectedObjects(expected, actual, "groupBy");
}


/**
 * Uses a DAY-granularity groupBy subquery (filtered the same way as the other summary row tests) as the
 * data source of an ALL-granularity outer query, and expects the outer query to return one row: a zero
 * count, plus sums that are 0 in default-value mode and null otherwise.
 */
@Test
public void testSummaryrowForEmptySubqueryInput()
{
  final GroupByQuery innerQuery = makeQueryBuilder()
      .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
      .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
      .setDimFilter(new SelectorDimFilter("placementish", "xxa", null))
      .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
      .build();

  final GroupByQuery query = makeQueryBuilder()
      .setDataSource(innerQuery)
      .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
      .setAggregatorSpecs(
          QueryRunnerTestHelper.ROWS_COUNT,
          new LongSumAggregatorFactory("idx", "index"),
          new FloatSumAggregatorFactory("idxFloat", "indexFloat"),
          new DoubleSumAggregatorFactory("idxDouble", "index")
      )
      .setGranularity(QueryRunnerTestHelper.ALL_GRAN)
      .build();

  // Sum aggregators are expected to report 0 with default-value null handling, and null otherwise.
  final boolean defaultValueMode = NullHandling.replaceWithDefault();
  final ResultRow summaryRow = makeRow(
      query,
      "2011-04-01",
      "rows", 0L,
      "idx", defaultValueMode ? 0L : null,
      "idxFloat", defaultValueMode ? 0.0 : null,
      "idxDouble", defaultValueMode ? 0.0 : null
  );
  final List<ResultRow> expected = ImmutableList.of(summaryRow);

  final StubServiceEmitter emitter = new StubServiceEmitter("", "");
  final Iterable<ResultRow> actual = GroupByQueryRunnerTestHelper.runQueryWithEmitter(
      factory,
      originalRunner,
      query,
      emitter
  );

  emitter.verifyEmitted("query/wait/time", ImmutableMap.of("vectorized", vectorize), 1);
  TestHelper.assertExpectedObjects(expected, actual, "groupBy");
}


/**
 * Same filter and aggregators as the empty-input summary row case, but with DAY granularity instead of
 * ALL; the expected result is empty.
 */
@Test
public void testSummaryrowForEmptyInputByDay()
{
  final SelectorDimFilter filter = new SelectorDimFilter("placementish", "xxa", null);

  final GroupByQuery query = makeQueryBuilder()
      .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
      .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
      .setDimFilter(filter)
      .setAggregatorSpecs(
          QueryRunnerTestHelper.ROWS_COUNT,
          new LongSumAggregatorFactory("idx", "index"),
          new FloatSumAggregatorFactory("idxFloat", "indexFloat"),
          new DoubleSumAggregatorFactory("idxDouble", "index")
      )
      .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
      .build();

  // No row is expected in the output.
  final List<ResultRow> expected = ImmutableList.of();

  final StubServiceEmitter emitter = new StubServiceEmitter("", "");
  final Iterable<ResultRow> actual = GroupByQueryRunnerTestHelper.runQueryWithEmitter(
      factory,
      originalRunner,
      query,
      emitter
  );

  emitter.verifyEmitted("query/wait/time", ImmutableMap.of("vectorized", vectorize), 1);
  TestHelper.assertExpectedObjects(expected, actual, "groupBy");
}

private static ResultRow makeRow(final GroupByQuery query, final String timestamp, final Object... vals)
{
return GroupByQueryRunnerTestHelper.createExpectedRow(query, timestamp, vals);
Expand Down
Loading