Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ public void tearDown() throws Exception
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void querySql(Blackhole blackhole) throws Exception
public void querySql(Blackhole blackhole)
{
final Map<String, Object> context = ImmutableMap.of(
QueryContexts.VECTORIZE_KEY, vectorize,
Expand All @@ -561,7 +561,7 @@ public void querySql(Blackhole blackhole) throws Exception
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void planSql(Blackhole blackhole) throws Exception
public void planSql(Blackhole blackhole)
{
final Map<String, Object> context = ImmutableMap.of(
QueryContexts.VECTORIZE_KEY, vectorize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import org.openjdk.jmh.infra.Blackhole;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -349,7 +348,7 @@ public void tearDown() throws Exception
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void querySql(Blackhole blackhole) throws Exception
public void querySql(Blackhole blackhole)
{
final Map<String, Object> context = ImmutableMap.of(
QueryContexts.VECTORIZE_KEY, vectorize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ public void tearDown() throws Exception
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void querySql(Blackhole blackhole) throws Exception
public void querySql(Blackhole blackhole)
{
final Map<String, Object> context = ImmutableMap.of(
QueryContexts.VECTORIZE_KEY, vectorize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void queryNative(Blackhole blackhole)
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void queryPlanner(Blackhole blackhole) throws Exception
public void queryPlanner(Blackhole blackhole)
{
try (final DruidPlanner planner = plannerFactory.createPlannerForTesting(engine, sqlQuery, Collections.emptyMap())) {
final PlannerResult plannerResult = planner.plan();
Expand Down
5 changes: 5 additions & 0 deletions extensions-core/datasketches/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1035,7 +1035,7 @@ public void testThetaSketchIntersectOnScalarExpression()
{
assertQueryIsUnplannable(
"SELECT THETA_SKETCH_INTERSECT(NULL, NULL) FROM foo",
"Possible error: THETA_SKETCH_INTERSECT can only be used on aggregates. " +
"THETA_SKETCH_INTERSECT can only be used on aggregates. " +
"It cannot be used directly on a column or on a scalar expression."
);
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
Expand Down Expand Up @@ -86,7 +87,7 @@ public String name()
}

@Override
public void validateContext(Map<String, Object> queryContext) throws ValidationException
public void validateContext(Map<String, Object> queryContext)
{
SqlEngines.validateNoSpecialContextKeys(queryContext, SYSTEM_CONTEXT_PARAMETERS);
}
Expand Down Expand Up @@ -250,18 +251,17 @@ private static void validateInsert(
* queries, because we use these output names to generate columns in segments. They must be unique.
*/
private static void validateNoDuplicateAliases(final List<Pair<Integer, String>> fieldMappings)
throws ValidationException
{
final Set<String> aliasesSeen = new HashSet<>();

for (final Pair<Integer, String> field : fieldMappings) {
if (!aliasesSeen.add(field.right)) {
throw new ValidationException("Duplicate field in SELECT: [" + field.right + "]");
throw InvalidSqlInput.exception("Duplicate field in SELECT: [%s]", field.right);
}
}
}

private static void validateLimitAndOffset(final RelNode topRel, final boolean limitOk) throws ValidationException
private static void validateLimitAndOffset(final RelNode topRel, final boolean limitOk)
{
Sort sort = null;

Expand All @@ -283,13 +283,13 @@ private static void validateLimitAndOffset(final RelNode topRel, final boolean l
// The segment generator relies on shuffle statistics to determine segment intervals when PARTITIONED BY is not ALL,
// and LIMIT/OFFSET prevent shuffle statistics from being generated. This is because they always send everything
// to a single partition, so there are no shuffle statistics.
throw new ValidationException(
throw InvalidSqlInput.exception(
"INSERT and REPLACE queries cannot have a LIMIT unless PARTITIONED BY is \"ALL\"."
);
}
if (sort != null && sort.offset != null) {
// Found an outer OFFSET that is not allowed.
throw new ValidationException("INSERT and REPLACE queries cannot have an OFFSET.");
throw InvalidSqlInput.exception("INSERT and REPLACE queries cannot have an OFFSET.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CountingOutputStream;
import com.google.inject.Inject;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.druid.common.exception.SanitizableException;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.ErrorResponse;
import org.apache.druid.guice.annotations.MSQ;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.java.util.common.guava.Sequence;
Expand Down Expand Up @@ -69,15 +70,15 @@

/**
* Endpoint for SQL execution using MSQ tasks.
*
* <p>
* Unlike the SQL endpoint in {@link SqlResource}, this endpoint returns task IDs instead of inline results. Queries
* are executed asynchronously using MSQ tasks via the indexing service (Overlord + MM or Indexer). This endpoint
* does not provide a way for users to get the status or results of a query. That must be done using Overlord APIs
* for status and reports.
*
* <p>
* One exception: EXPLAIN query results are returned inline by this endpoint, in the same way as {@link SqlResource}
* would return them.
*
* <p>
* This endpoint does not support system tables or INFORMATION_SCHEMA. Queries on those tables result in errors.
*/
@Path("/druid/v2/sql/task/")
Expand Down Expand Up @@ -129,7 +130,7 @@ public Response doGetEnabled(@Context final HttpServletRequest request)

/**
* Post a query task.
*
* <p>
* Execution uses {@link MSQTaskSqlEngine} to ship the query off to the Overlord as an indexing task using
* {@link org.apache.druid.msq.indexing.MSQControllerTask}. The task ID is returned immediately to the caller,
* and execution proceeds asynchronously.
Expand Down Expand Up @@ -159,6 +160,13 @@ public Response doPost(
return buildStandardResponse(sequence, sqlQuery, sqlQueryId, rowTransformer);
}
}
catch (DruidException e) {
stmt.reporter().failed(e);
return Response.status(e.getStatusCode())
.type(MediaType.APPLICATION_JSON_TYPE)
.entity(new ErrorResponse(e))
.build();
}
// Kitchen-sinking the errors since they are all unchecked.
// Just copied from SqlResource.
catch (QueryCapacityExceededException cap) {
Expand All @@ -182,14 +190,6 @@ public Response doPost(
throw (ForbiddenException) serverConfig.getErrorResponseTransformStrategy()
.transformIfNeeded(e); // let ForbiddenExceptionMapper handle this
}
catch (RelOptPlanner.CannotPlanException e) {
stmt.reporter().failed(e);
SqlPlanningException spe = new SqlPlanningException(
SqlPlanningException.PlanningError.UNSUPPORTED_SQL_ERROR,
e.getMessage()
);
return buildNonOkResponse(BadQueryException.STATUS_CODE, spe, sqlQueryId);
}
// Calcite throws a java.lang.AssertionError which is type Error not Exception. Using Throwable catches both.
catch (Throwable e) {
stmt.reporter().failed(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
Expand All @@ -43,7 +44,6 @@
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.SqlPlanningException;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CompressionUtils;
import org.hamcrest.CoreMatchers;
Expand Down Expand Up @@ -92,7 +92,6 @@ public static Collection<Object[]> data()
@Parameterized.Parameter(1)
public Map<String, Object> context;


@Test
public void testInsertOnFoo1()
{
Expand Down Expand Up @@ -541,11 +540,9 @@ public void testInsertOnFoo1WithMultiValueMeasureGroupBy()
"INSERT INTO foo1 SELECT count(dim3) FROM foo WHERE dim3 IS NOT NULL GROUP BY 1 PARTITIONED BY ALL TIME")
.setExpectedDataSource("foo1")
.setQueryContext(context)
.setExpectedValidationErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Aggregate expression is illegal in GROUP BY clause"))
))
.setExpectedValidationErrorMatcher(
invalidSqlContains("Aggregate expression is illegal in GROUP BY clause")
)
.verifyPlanningErrors();
}

Expand Down Expand Up @@ -747,11 +744,9 @@ public void testInsertWithClusteredByDescendingThrowsException()
+ "PARTITIONED BY DAY "
+ "CLUSTERED BY dim1 DESC"
)
.setExpectedValidationErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
"[`dim1` DESC] is invalid. CLUSTERED BY columns cannot be sorted in descending order."))
))
.setExpectedValidationErrorMatcher(
invalidSqlIs("Invalid CLUSTERED BY clause [`dim1` DESC]: cannot sort in descending order.")
)
.verifyPlanningErrors();
}

Expand Down Expand Up @@ -967,7 +962,7 @@ public void testInsertWrongTypeTimestamp()
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setExpectedValidationErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
CoreMatchers.instanceOf(DruidException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Field \"__time\" must be of type TIMESTAMP"))
))
Expand All @@ -977,14 +972,14 @@ public void testInsertWrongTypeTimestamp()
@Test
public void testIncorrectInsertQuery()
{
testIngestQuery().setSql(
"insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null group by 1, 2 clustered by dim1")
.setExpectedValidationErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
"CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause"))
))
.verifyPlanningErrors();
testIngestQuery()
.setSql(
"insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null group by 1, 2 clustered by dim1"
)
.setExpectedValidationErrorMatcher(invalidSqlContains(
"CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
))
.verifyPlanningErrors();
}


Expand Down Expand Up @@ -1032,11 +1027,9 @@ public void testInsertDuplicateColumnNames()
+ " )\n"
+ ") PARTITIONED by day")
.setQueryContext(context)
.setExpectedValidationErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Duplicate field in SELECT: [namespace]"))
))
.setExpectedValidationErrorMatcher(
invalidSqlIs("Duplicate field in SELECT: [namespace]")
)
.verifyPlanningErrors();
}

Expand Down Expand Up @@ -1097,11 +1090,11 @@ public void testInsertLimitWithPeriodGranularityThrowsException()
+ "FROM foo "
+ "LIMIT 50 "
+ "PARTITIONED BY MONTH")
.setExpectedValidationErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"INSERT and REPLACE queries cannot have a LIMIT unless PARTITIONED BY is \"ALL\""))
))
.setExpectedValidationErrorMatcher(
invalidSqlContains(
"INSERT and REPLACE queries cannot have a LIMIT unless PARTITIONED BY is \"ALL\""
)
)
.setQueryContext(context)
.verifyPlanningErrors();
}
Expand All @@ -1115,11 +1108,9 @@ public void testInsertOffsetThrowsException()
+ "LIMIT 50 "
+ "OFFSET 10"
+ "PARTITIONED BY ALL TIME")
.setExpectedValidationErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"INSERT and REPLACE queries cannot have an OFFSET"))
))
.setExpectedValidationErrorMatcher(
invalidSqlContains("INSERT and REPLACE queries cannot have an OFFSET")
)
.setQueryContext(context)
.verifyPlanningErrors();
}
Expand Down
Loading