Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,26 @@ public class VarianceGroupByQueryTest
private final QueryRunner<Row> runner;
private final GroupByQueryRunnerFactory factory;
private final String testName;
private final boolean compatibilityMode;

@Parameterized.Parameters(name="{0}")
public static Collection<?> constructorFeeder() throws IOException
{
return GroupByQueryRunnerTest.constructorFeeder();
}

public VarianceGroupByQueryTest(String testName, GroupByQueryConfig config, GroupByQueryRunnerFactory factory, QueryRunner runner)
public VarianceGroupByQueryTest(
String testName,
GroupByQueryConfig config,
GroupByQueryRunnerFactory factory,
QueryRunner runner,
boolean compatibilityMode)
{
this.testName = testName;
this.config = config;
this.factory = factory;
this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.<QueryRunner<Row>>of(runner));
this.compatibilityMode = compatibilityMode;
}

@Test
Expand Down
24 changes: 19 additions & 5 deletions processing/src/main/java/io/druid/query/groupby/GroupByQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public boolean getContextSortByDimsFirst()
@Override
public Ordering getResultOrdering()
{
final Ordering<Row> rowOrdering = getRowOrdering(false);
final Ordering<Row> rowOrdering = getRowOrdering(false, false);

return Ordering.from(
new Comparator<Object>()
Expand All @@ -300,11 +300,11 @@ public int compare(Object lhs, Object rhs)
);
}

public Ordering<Row> getRowOrdering(final boolean granular)
public Ordering<Row> getRowOrdering(final boolean granular, final boolean compatibilityMode)
{
final boolean sortByDimsFirst = getContextSortByDimsFirst();

final Comparator<Row> timeComparator = getTimeComparator(granular);
final Comparator<Row> timeComparator = getTimeComparator(granular, compatibilityMode);

if (timeComparator == null) {
return Ordering.from(
Expand Down Expand Up @@ -353,10 +353,24 @@ public int compare(Row lhs, Row rhs)
}
}

private Comparator<Row> getTimeComparator(boolean granular)
private Comparator<Row> getTimeComparator(boolean granular, boolean compatibilityMode)
{
if (Granularities.ALL.equals(granularity)) {
return null;
if (compatibilityMode) {
return new Comparator<Row>()
{
@Override
public int compare(Row lhs, Row rhs)
{
return Longs.compare(
granularity.bucketStart(lhs.getTimestamp()).getMillis(),
granularity.bucketStart(rhs.getTimestamp()).getMillis()
);
}
};
} else {
return null;
}
} else if (granular) {
return new Comparator<Row>()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ public interface GroupByStrategy
*/
boolean isCacheable(boolean willMergeRunners);

/**
* Indicates this strategy is compatible with GroupByStrategyV1 or not.
*
* @param compatibilityMode indicates whether to operate in compatibility mode or not
* @return true if this strategy is compatible with GroupByStrategyV1, otherwise false.
*/
boolean isInCompatibilityMode(boolean compatibilityMode);

/**
* Decorate a runner with an interval chunking decorator.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ public boolean isCacheable(boolean willMergeRunners)
return true;
}

@Override
public boolean isInCompatibilityMode(boolean compatibilityMode)
{
return false;
}

@Override
public QueryRunner<Row> createIntervalChunkingRunner(
final IntervalChunkingQueryRunnerDecorator decorator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ public boolean isCacheable(boolean willMergeRunners)
return willMergeRunners;
}

@Override
public boolean isInCompatibilityMode(boolean compatibilityMode)
{
return compatibilityMode;
}

@Override
public QueryRunner<Row> createIntervalChunkingRunner(
final IntervalChunkingQueryRunnerDecorator decorator,
Expand Down Expand Up @@ -209,7 +215,8 @@ public Sequence<Row> mergeResults(
@Override
protected Ordering<Row> makeOrdering(Query<Row> queryParam)
{
return ((GroupByQuery) queryParam).getRowOrdering(true);
return ((GroupByQuery) queryParam).getRowOrdering(true,
isInCompatibilityMode(query.getContextValue(GroupByQueryConfig.CTX_KEY_STRATEGY) == null ? true : false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
public class GroupByQueryRunnerTest
{
public static final ObjectMapper DEFAULT_MAPPER = new DefaultObjectMapper(new SmileFactory());
public static final List<Boolean> COMPATIBILITY_MODES = ImmutableList.of(true, false);
public static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new DruidProcessingConfig()
{
@Override
Expand Down Expand Up @@ -183,6 +184,7 @@ public int getNumThreads()
private GroupByQueryRunnerFactory factory;
private GroupByQueryConfig config;
private final String testName;
private final boolean compatibilityMode;

@Rule
public ExpectedException expectedException = ExpectedException.none();
Expand Down Expand Up @@ -380,26 +382,34 @@ public static Collection<?> constructorFeeder() throws IOException
for (GroupByQueryConfig config : testConfigs()) {
final GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(config);
for (QueryRunner<Row> runner : QueryRunnerTestHelper.makeQueryRunners(factory)) {
final String testName = String.format(
"config=%s, runner=%s",
config.toString(),
runner.toString()
);
constructors.add(new Object[]{testName, config, factory, runner});
for (boolean compatibilityMode : COMPATIBILITY_MODES) {
final String testName = String.format(
"config=%s, runner=%s, compatibilityMode=%s",
config.toString(),
runner.toString(),
compatibilityMode
);
constructors.add(new Object[] { testName, config, factory, runner, compatibilityMode });
}
}
}

return constructors;
}

public GroupByQueryRunnerTest(
String testName, GroupByQueryConfig config, GroupByQueryRunnerFactory factory, QueryRunner runner
String testName,
GroupByQueryConfig config,
GroupByQueryRunnerFactory factory,
QueryRunner runner,
boolean compatibilityMode
)
{
this.testName = testName;
this.config = config;
this.factory = factory;
this.runner = factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.<QueryRunner<Row>>of(runner));
this.compatibilityMode = compatibilityMode;
}

@Test
Expand Down