Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
Expand Down Expand Up @@ -704,6 +706,71 @@ public void testReplaceOnFooWithWhere(String contextName, Map<String, Object> co
.verifyResults();
}

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceWithDynamicParameters(String contextName, Map<String, Object> context)

Check notice

Code scanning / CodeQL

Useless parameter

The parameter 'contextName' is never used.
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("m1", ColumnType.FLOAT)
.build();

testIngestQuery().setSql(
" REPLACE INTO foo OVERWRITE WHERE __time >= ? AND __time < ? "
+ "SELECT __time, m1 "
+ "FROM foo "
+ "WHERE __time >= ? AND __time < ? "
+ "PARTITIONED by DAY ")
.setDynamicParameters(ImmutableList.of(
TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, DateTimes.of("2000-01-02").getMillis()),
TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, DateTimes.of("2000-01-03").getMillis()),
TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, DateTimes.of("2000-01-02").getMillis()),
TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, DateTimes.of("2000-01-03").getMillis())
))
.setExpectedDataSource("foo")
.setExpectedDestinationIntervals(ImmutableList.of(Intervals.of(
"2000-01-02T00:00:00.000Z/2000-01-03T00:00:00.000Z")))
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setExpectedSegments(ImmutableSet.of(SegmentId.of(
"foo",
Intervals.of("2000-01-02T/P1D"),
"test",
0
)))
.setExpectedResultRows(ImmutableList.of(new Object[]{946771200000L, 2.0f}))
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().totalFiles(1),
0, 0, "input0"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
0, 0, "shuffle"
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotMatcher
.with().rows(1).frames(1),
1, 0, "input0"
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher
.with().segmentRowsProcessed(1),
1, 0
)
.setExpectedLastCompactionState(
expectedCompactionState(
context,
Collections.emptyList(),
Collections.singletonList(new FloatDimensionSchema("m1")),
GranularityType.DAY,
Intervals.of("2000-01-02T/P1D")
)
)
.verifyResults();
}

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceOnFoo1WithAllExtern(String contextName, Map<String, Object> context) throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.InlineInputSource;
Expand Down Expand Up @@ -409,6 +411,53 @@ public void testSelectOnFooWhereMatchesNoSegments(String contextName, Map<String
.verifyResults();
}

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testSelectWithDynamicParameters(String contextName, Map<String, Object> context)
{
  final RowSignature resultSignature =
      RowSignature.builder()
                  .add("cnt", ColumnType.LONG)
                  .add("dim1", ColumnType.STRING)
                  .build();

  // The single '?' is bound to timestamp '3000-01-01 00:00:00', so the resulting filter
  // [__time >= timestamp '3000-01-01 00:00:00'] matches no segments at all.
  final MSQSpec expectedSpec =
      MSQSpec.builder()
             .query(
                 newScanQueryBuilder()
                     .dataSource(CalciteTests.DATASOURCE1)
                     .intervals(
                         querySegmentSpec(
                             Intervals.utc(
                                 DateTimes.of("3000").getMillis(),
                                 Intervals.ETERNITY.getEndMillis()
                             )
                         )
                     )
                     .columns("cnt", "dim1")
                     .context(defaultScanQueryContext(context, resultSignature))
                     .build()
             )
             .columnMappings(ColumnMappings.identity(resultSignature))
             .tuningConfig(MSQTuningConfig.defaultConfig())
             .destination(
                 isDurableStorageDestination(contextName, context)
                 ? DurableStorageMSQDestination.INSTANCE
                 : TaskReportMSQDestination.INSTANCE
             )
             .build();

  testSelectQuery()
      .setSql("select cnt,dim1 from foo where __time >= ?")
      .setExpectedMSQSpec(expectedSpec)
      .setDynamicParameters(
          ImmutableList.of(
              TypedValue.ofLocal(ColumnMetaData.Rep.JAVA_SQL_TIMESTAMP, DateTimes.of("3000-01-01").getMillis())
          )
      )
      .setQueryContext(context)
      .setExpectedRowSignature(resultSignature)
      .setExpectedResultRows(ImmutableList.of())
      .verifyResults();
}

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testSelectOnFooWhereMatchesNoData(String contextName, Map<String, Object> context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.inject.TypeLiteral;
import com.google.inject.util.Modules;
import com.google.inject.util.Providers;
import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
Expand Down Expand Up @@ -218,7 +219,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -786,13 +786,13 @@ public static WorkerMemoryParameters makeTestWorkerMemoryParameters()
);
}

private String runMultiStageQuery(String query, Map<String, Object> context)
private String runMultiStageQuery(String query, Map<String, Object> context, List<TypedValue> parameters)
{
final DirectStatement stmt = sqlStatementFactory.directStatement(
new SqlQueryPlus(
query,
context,
Collections.emptyList(),
parameters,
CalciteTests.REGULAR_USER_AUTH_RESULT
)
);
Expand Down Expand Up @@ -888,6 +888,7 @@ public abstract class MSQTester<Builder extends MSQTester<Builder>>
protected String sql = null;
protected MSQControllerTask taskSpec = null;
protected Map<String, Object> queryContext = DEFAULT_MSQ_CONTEXT;
protected List<TypedValue> dynamicParameters = new ArrayList<>();
protected List<MSQResultsReport.ColumnAndType> expectedRowSignature = null;
protected MSQSpec expectedMSQSpec = null;
protected MSQTuningConfig expectedTuningConfig = null;
Expand Down Expand Up @@ -926,6 +927,12 @@ public Builder setQueryContext(Map<String, Object> queryContext)
return asBuilder();
}

/**
 * Sets the dynamic (positional) SQL parameter values to bind to the '?' placeholders
 * when the query under test is run. Defaults to an empty list when never called.
 *
 * @param dynamicParameters parameter values, in placeholder order
 * @return this builder, for chaining
 */
public Builder setDynamicParameters(List<TypedValue> dynamicParameters)
{
this.dynamicParameters = dynamicParameters;
return asBuilder();
}

public Builder setExpectedRowSignature(List<MSQResultsReport.ColumnAndType> expectedRowSignature)
{
Preconditions.checkArgument(!expectedRowSignature.isEmpty(), "Row signature cannot be empty");
Expand Down Expand Up @@ -1059,7 +1066,7 @@ public void verifyPlanningErrors()

final Throwable e = Assert.assertThrows(
Throwable.class,
() -> runMultiStageQuery(sql, queryContext)
() -> runMultiStageQuery(sql, queryContext, dynamicParameters)
);

assertThat(e, expectedValidationErrorMatcher);
Expand Down Expand Up @@ -1211,7 +1218,7 @@ public void verifyResults()
String controllerId;
if (sql != null) {
// Run the sql command.
controllerId = runMultiStageQuery(sql, queryContext);
controllerId = runMultiStageQuery(sql, queryContext, dynamicParameters);
} else {
// Run the task spec directly instead.
controllerId = TEST_CONTROLLER_TASK_ID;
Expand Down Expand Up @@ -1428,7 +1435,7 @@ public void verifyExecutionError()
try {
String controllerId;
if (sql != null) {
controllerId = runMultiStageQuery(sql, queryContext);
controllerId = runMultiStageQuery(sql, queryContext, dynamicParameters);
} else {
// Run the task spec directly instead.
controllerId = TEST_CONTROLLER_TASK_ID;
Expand Down Expand Up @@ -1470,7 +1477,7 @@ public Pair<MSQSpec, Pair<List<MSQResultsReport.ColumnAndType>, List<Object[]>>>
Preconditions.checkArgument(sql == null || queryContext != null, "queryContext cannot be null");

try {
String controllerId = runMultiStageQuery(sql, queryContext);
String controllerId = runMultiStageQuery(sql, queryContext, dynamicParameters);

if (expectedMSQFault != null || expectedMSQFaultClass != null) {
MSQErrorReport msqErrorReport = getErrorReportOrThrow(controllerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
*/
public class DruidPlanner implements Closeable
{

public static final Joiner SPACE_JOINER = Joiner.on(" ");
public static final Joiner COMMA_JOINER = Joiner.on(", ");

Expand Down Expand Up @@ -148,6 +147,7 @@ public void validate()
catch (SqlParseException e1) {
throw translateException(e1);
}
root = rewriteParameters(root);
hook.captureSqlNode(root);
handler = createHandler(root);
handler.validate();
Expand All @@ -158,6 +158,7 @@ public void validate()
private SqlStatementHandler createHandler(final SqlNode node)
{
SqlNode query = node;

SqlExplain explain = null;
if (query.getKind() == SqlKind.EXPLAIN) {
explain = (SqlExplain) query;
Expand All @@ -179,6 +180,27 @@ private SqlStatementHandler createHandler(final SqlNode node)
throw InvalidSqlInput.exception("Unsupported SQL statement [%s]", node.getKind());
}

/**
 * Applies {@link SqlParameterizerShuttle} to the parsed statement, swapping each
 * {@link org.apache.calcite.sql.SqlDynamicParam} for its
 * {@link org.apache.calcite.sql.SqlLiteral} replacement.
 *
 * <p>Substitution happens only when the client actually supplied parameter values.
 * A PREPARE-only request carries no values even if the statement has parameters,
 * and a PLAN with missing values is caught later in planning — in both cases the
 * node is returned untouched.
 *
 * @param original the parsed statement, possibly containing dynamic parameters
 * @return the statement with dynamic parameters substituted, or {@code original}
 *         unchanged when no parameter values were provided
 */
private SqlNode rewriteParameters(final SqlNode original)
{
  if (plannerContext.getParameters().isEmpty()) {
    return original;
  }
  return original.accept(new SqlParameterizerShuttle(plannerContext));
}

/**
* Prepare a SQL query for execution, including some initial parsing and
* validation and any dynamic parameter type resolution, to support prepared
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,6 @@ public ReplaceHandler(
protected static DruidSqlReplace convertQuery(DruidSqlReplace sqlNode)
{
SqlNode query = convertSourceQuery(sqlNode);

return DruidSqlReplace.create(
new SqlInsert(
sqlNode.getParserPosition(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ protected SqlNode validate(SqlNode root)
CalcitePlanner planner = handlerContext.planner();
SqlNode validatedQueryNode;
try {
validatedQueryNode = planner.validate(rewriteParameters(root));
validatedQueryNode = planner.validate(root);
}
catch (ValidationException e) {
throw DruidPlanner.translateException(e);
Expand All @@ -129,24 +129,6 @@ protected SqlNode validate(SqlNode root)
return validatedQueryNode;
}

private SqlNode rewriteParameters(SqlNode original)
{
// Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap out any
// {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link SqlLiteral}
// replacement.
//
// Parameter replacement is done only if the client provides parameter values.
// If this is a PREPARE-only, then there will be no values even if the statement contains
// parameters. If this is a PLAN, then we'll catch later the case that the statement
// contains parameters, but no values were provided.
PlannerContext plannerContext = handlerContext.plannerContext();
if (plannerContext.getParameters().isEmpty()) {
return original;
} else {
return original.accept(new SqlParameterizerShuttle(plannerContext));
}
}

@Override
public void prepare()
{
Expand Down