Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class MSQTaskQueryMaker implements QueryMaker
}

@Override
public QueryResponse runQuery(final DruidQuery druidQuery)
public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
{
String taskId = MSQTasks.controllerTaskId(plannerContext.getSqlQueryId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ public Response doPost(
final String sqlQueryId = stmt.sqlQueryId();
try {
final DirectStatement.ResultSet plan = stmt.plan();
final QueryResponse response = plan.run();
final Sequence sequence = response.getResults();
final QueryResponse<Object[]> response = plan.run();
final Sequence<Object[]> sequence = response.getResults();
final SqlRowTransformer rowTransformer = plan.createRowTransformer();
final boolean isTaskStruct = MSQTaskSqlEngine.TASK_STRUCT_FIELD_NAMES.equals(rowTransformer.getFieldList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ public void testIncorrectInsertQuery()
.setExpectedValidationErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
"CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause"))
"CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause"))
))
.verifyPlanningErrors();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void testReplaceIncorrectSyntax()
CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL."))
"Missing time chunk information in OVERWRITE clause for REPLACE. Use OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table."))
)
)
.verifyPlanningErrors();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ public void testSelectOnInformationSchemaSource()
CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
"Cannot query table [INFORMATION_SCHEMA.SCHEMATA] with SQL engine 'msq-task'."))
"Cannot query table INFORMATION_SCHEMA.SCHEMATA with SQL engine 'msq-task'."))
)
)
.verifyPlanningErrors();
Expand All @@ -712,7 +712,7 @@ public void testSelectOnSysSource()
CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
"Cannot query table [sys.segments] with SQL engine 'msq-task'."))
"Cannot query table sys.segments with SQL engine 'msq-task'."))
)
)
.verifyPlanningErrors();
Expand All @@ -727,7 +727,7 @@ public void testSelectOnSysSourceWithJoin()
CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
"Cannot query table [sys.segments] with SQL engine 'msq-task'."))
"Cannot query table sys.segments with SQL engine 'msq-task'."))
)
)
.verifyPlanningErrors();
Expand All @@ -743,7 +743,7 @@ public void testSelectOnSysSourceContainingWith()
CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
"Cannot query table [sys.segments] with SQL engine 'msq-task'."))
"Cannot query table sys.segments with SQL engine 'msq-task'."))
)
)
.verifyPlanningErrors();
Expand Down
18 changes: 8 additions & 10 deletions server/src/main/java/org/apache/druid/server/QueryLifecycle.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ public QueryLifecycle(
this.startNs = startNs;
}


/**
* For callers who have already authorized their query, and where simplicity is desired over flexibility. This method
* does it all in one call. Logs and metrics are emitted when the Sequence is either fully iterated or throws an
Expand All @@ -140,8 +139,7 @@ public QueryLifecycle(
*
* @return results
*/
@SuppressWarnings("unchecked")
public <T> QueryResponse runSimple(
public <T> QueryResponse<T> runSimple(
final Query<T> query,
final AuthenticationResult authenticationResult,
final Access authorizationResult
Expand All @@ -151,7 +149,7 @@ public <T> QueryResponse runSimple(

final Sequence<T> results;

final QueryResponse queryResponse;
final QueryResponse<T> queryResponse;
try {
preAuthorized(authenticationResult, authorizationResult);
if (!authorizationResult.isAllowed()) {
Expand All @@ -172,7 +170,7 @@ public <T> QueryResponse runSimple(
* cannot be moved into execute(). We leave this as an exercise for the future, however as this oddity
* was discovered while just trying to expose HTTP response headers
*/
return new QueryResponse(
return new QueryResponse<T>(
Sequences.wrap(
results,
new SequenceWrapper()
Expand All @@ -193,8 +191,7 @@ public void after(final boolean isDone, final Throwable thrown)
*
* @param baseQuery the query
*/
@SuppressWarnings("unchecked")
public void initialize(final Query baseQuery)
public void initialize(final Query<?> baseQuery)
{
transition(State.NEW, State.INITIALIZED);

Expand Down Expand Up @@ -282,17 +279,18 @@ private Access doAuthorize(final AuthenticationResult authenticationResult, fina
*
* @return result sequence and response context
*/
public QueryResponse execute()
public <T> QueryResponse<T> execute()
{
transition(State.AUTHORIZED, State.EXECUTING);

final ResponseContext responseContext = DirectDruidClient.makeResponseContextForQuery();

final Sequence<?> res = QueryPlus.wrap(baseQuery)
@SuppressWarnings("unchecked")
final Sequence<T> res = QueryPlus.wrap((Query<T>) baseQuery)
.withIdentity(authenticationResult.getIdentity())
.run(texasRanger, responseContext);

return new QueryResponse(res == null ? Sequences.empty() : res, responseContext);
return new QueryResponse<T>(res == null ? Sequences.empty() : res, responseContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public Response doPost(
throw new ForbiddenException(authResult.toString());
}

final QueryResponse queryResponse = queryLifecycle.execute();
final QueryResponse<?> queryResponse = queryLifecycle.execute();
final Sequence<?> results = queryResponse.getResults();
final ResponseContext responseContext = queryResponse.getResponseContext();
final String prevEtag = getPreviousEtag(req);
Expand Down Expand Up @@ -477,8 +477,8 @@ String getResponseType()
}

ObjectWriter newOutputWriter(
@Nullable QueryToolChest toolChest,
@Nullable Query query,
@Nullable QueryToolChest<?, Query<?>> toolChest,
@Nullable Query<?> query,
boolean serializeDateTimeAsLong
)
{
Expand Down
18 changes: 9 additions & 9 deletions server/src/main/java/org/apache/druid/server/QueryResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,23 @@
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.context.ResponseContext;

public class QueryResponse
public class QueryResponse<T>
{
public static QueryResponse withEmptyContext(Sequence results)
{
return new QueryResponse(results, ResponseContext.createEmpty());
}

private final Sequence results;
private final Sequence<T> results;
private final ResponseContext responseContext;

public QueryResponse(final Sequence results, final ResponseContext responseContext)
public QueryResponse(final Sequence<T> results, final ResponseContext responseContext)
{
this.results = results;
this.responseContext = responseContext;
}

public Sequence getResults()
public static <T> QueryResponse<T> withEmptyContext(Sequence<T> results)
{
return new QueryResponse<T>(results, ResponseContext.createEmpty());
}

public Sequence<T> getResults()
{
return results;
}
Expand Down
2 changes: 1 addition & 1 deletion sql/src/main/codegen/includes/insert.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ SqlNode DruidSqlInsertEof() :
]
{
if (clusteredBy != null && partitionedBy.lhs == null) {
throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause");
throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause");
}
}
// EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
Expand Down
2 changes: 1 addition & 1 deletion sql/src/main/codegen/includes/replace.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ SqlNode DruidSqlReplaceEof() :
]
{
if (clusteredBy != null && partitionedBy.lhs == null) {
throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause");
throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause");
}
}
// EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
Expand Down
4 changes: 2 additions & 2 deletions sql/src/main/java/org/apache/druid/sql/DirectStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public boolean runnable()
* Do the actual execute step which allows subclasses to wrap the sequence,
* as is sometimes needed for testing.
*/
public QueryResponse run()
public QueryResponse<Object[]> run()
{
try {
// Check cancellation. Required for SqlResourceTest to work.
Expand Down Expand Up @@ -176,7 +176,7 @@ public DirectStatement(
*
* @return sequence which delivers query results
*/
public QueryResponse execute()
public QueryResponse<Object[]> execute()
{
return plan().run();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite.parser;

import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.druid.java.util.common.granularity.Granularity;

import javax.annotation.Nullable;

/**
* Common base class to the two Druid "ingest" statements: INSERT and REPLACE.
* Allows Planner code to work with these two statements generically where they
* share common clauses.
*/
public abstract class DruidSqlIngest extends SqlInsert
Comment thread
paul-rogers marked this conversation as resolved.
{
protected final Granularity partitionedBy;

// Used in the unparse function to generate the original query since we convert the string to an enum
protected final String partitionedByStringForUnparse;

@Nullable
protected final SqlNodeList clusteredBy;

public DruidSqlIngest(SqlParserPos pos,
SqlNodeList keywords,
SqlNode targetTable,
SqlNode source,
SqlNodeList columnList,
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
@Nullable SqlNodeList clusteredBy
)
{
super(pos, keywords, targetTable, source, columnList);

this.partitionedByStringForUnparse = partitionedByStringForUnparse;
this.partitionedBy = partitionedBy;
this.clusteredBy = clusteredBy;
}

public Granularity getPartitionedBy()
{
return partitionedBy;
}

@Nullable
public SqlNodeList getClusteredBy()
{
return clusteredBy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.sql.calcite.parser;

import com.google.common.base.Preconditions;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
Expand All @@ -32,24 +31,16 @@

/**
* Extends the 'insert' call to hold custom parameters specific to Druid i.e. PARTITIONED BY and CLUSTERED BY
* This class extends the {@link SqlInsert} so that this SqlNode can be used in
* This class extends the {@link DruidSqlIngest} so that this SqlNode can be used in
* {@link org.apache.calcite.sql2rel.SqlToRelConverter} for getting converted into RelNode, and further processing
*/
public class DruidSqlInsert extends SqlInsert
public class DruidSqlInsert extends DruidSqlIngest
{
public static final String SQL_INSERT_SEGMENT_GRANULARITY = "sqlInsertSegmentGranularity";

// This allows reusing super.unparse
public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;

private final Granularity partitionedBy;

// Used in the unparse function to generate the original query since we convert the string to an enum
private final String partitionedByStringForUnparse;

@Nullable
private final SqlNodeList clusteredBy;

/**
* While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is
* disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
Expand All @@ -61,35 +52,18 @@ public DruidSqlInsert(
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
@Nullable SqlNodeList clusteredBy
) throws ParseException
)
{
super(
insertNode.getParserPosition(),
(SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this
insertNode.getTargetTable(),
insertNode.getSource(),
insertNode.getTargetColumnList()
insertNode.getTargetColumnList(),
partitionedBy,
partitionedByStringForUnparse,
clusteredBy
);
if (partitionedBy == null) {
throw new ParseException("INSERT statements must specify PARTITIONED BY clause explicitly");
}
this.partitionedBy = partitionedBy;

Preconditions.checkNotNull(partitionedByStringForUnparse);
this.partitionedByStringForUnparse = partitionedByStringForUnparse;

this.clusteredBy = clusteredBy;
}

@Nullable
public SqlNodeList getClusteredBy()
{
return clusteredBy;
}

public Granularity getPartitionedBy()
{
return partitionedBy;
}

@Nonnull
Expand Down
Loading