From 279379ba43b1b3abab60719190511921eb6df12e Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 6 Jul 2023 16:57:24 +0530 Subject: [PATCH 01/13] initial commit --- .../druid/msq/sql/MSQTaskQueryMaker.java | 33 ++++++--- .../druid/msq/sql/MSQTaskSqlEngine.java | 73 ++++++++++--------- .../sql/resources/SqlStatementResource.java | 42 +++-------- .../msq/util/MultiStageQueryContext.java | 6 +- .../test/CalciteSelectJoinQueryMSQTest.java | 2 - .../org/apache/druid/error/Forbidden.java | 68 +++++++++++++++++ .../org/apache/druid/error/InvalidInput.java | 2 +- .../druid/error/QueryExceptionCompat.java | 3 +- .../external/ExternalOperatorConversion.java | 62 +++++++++++++--- 9 files changed, 200 insertions(+), 91 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/error/Forbidden.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index c0de08a809f8..8f5be5507435 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -26,8 +26,8 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.Pair; import org.apache.druid.common.guava.FutureUtils; -import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.ISE; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; @@ -136,15 +136,24 @@ public QueryResponse runQuery(final DruidQuery druidQuery) .orElse(jsonMapper.writeValueAsString(DEFAULT_SEGMENT_GRANULARITY)); } catch (JsonProcessingException e) { - throw new IAE("Unable to deserialize the insert granularity. Please retry the query with a valid " - + "segment graularity"); + // This isn't populated by the user in the query context. If there was an issue with the query granularity + // entered by the user, it should have been flagged earlier + throw DruidException.forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.DEFENSIVE) + .build( + e, + "Unable to deserialize the insert granularity. Please retry the query with a " + + "valid segment graularity" + ); } final int maxNumTasks = MultiStageQueryContext.getMaxNumTasks(sqlQueryContext); if (maxNumTasks < 2) { - throw new IAE(MultiStageQueryContext.CTX_MAX_NUM_TASKS - + " cannot be less than 2 since at least 1 controller and 1 worker is necessary."); + throw InvalidInput.exception( + "%s cannot be less than 2 since at least 1 controller and 1 worker is necessary.", + MultiStageQueryContext.CTX_MAX_NUM_TASKS + ); } // This parameter is used internally for the number of worker tasks only, so we subtract 1 @@ -201,7 +210,9 @@ public QueryResponse runQuery(final DruidQuery druidQuery) if (targetDataSource != null) { if (ctxDestination != null && !DESTINATION_DATASOURCE.equals(ctxDestination)) { - throw new IAE("Cannot INSERT with destination [%s]", ctxDestination); + throw DruidException.forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.DEFENSIVE) + .build("Cannot INSERT with destination [%s]", ctxDestination); } Granularity segmentGranularityObject; @@ -209,7 +220,9 @@ public QueryResponse runQuery(final DruidQuery druidQuery) segmentGranularityObject = jsonMapper.readValue((String) segmentGranularity, Granularity.class); } catch (Exception e) { - throw new ISE("Unable to convert %s to a segment granularity", segmentGranularity); + throw DruidException.forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.DEFENSIVE) + .build("Unable to convert %s to a segment granularity", segmentGranularity); } final List segmentSortOrder = MultiStageQueryContext.getSortOrder(sqlQueryContext); @@ -227,7 +240,9 @@ public QueryResponse runQuery(final DruidQuery druidQuery) ); } else { if (ctxDestination != null && !DESTINATION_REPORT.equals(ctxDestination)) { - throw new IAE("Cannot SELECT with destination [%s]", ctxDestination); + throw DruidException.forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.DEFENSIVE) + .build("Cannot SELECT with destination [%s]", ctxDestination); } destination = TaskReportMSQDestination.instance(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index c5fe182ea96f..cd9ca2642349 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -30,11 +30,10 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.tools.ValidationException; import org.apache.calcite.util.Pair; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; import org.apache.druid.error.InvalidSqlInput; -import org.apache.druid.java.util.common.IAE; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.msq.querykit.QueryKitUtils; @@ -62,6 +61,7 @@ public class MSQTaskSqlEngine implements SqlEngine .addAll(NativeSqlEngine.SYSTEM_CONTEXT_PARAMETERS) .add(MultiStageQueryContext.CTX_DESTINATION) .add(QueryKitUtils.CTX_TIME_COLUMN_NAME) + .add(MSQTaskQueryMaker.USER_KEY) .build(); public static final List TASK_STRUCT_FIELD_NAMES = ImmutableList.of("TASK"); @@ -125,7 +125,10 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon case SCAN_NEEDS_SIGNATURE: return true; default: - throw new IAE("Unrecognized feature: %s", feature); + throw DruidException + .forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.DEFENSIVE) + .build("Unrecognized feature: %s", feature); } } @@ -133,9 +136,9 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon public QueryMaker buildQueryMakerForSelect( final RelRoot relRoot, final PlannerContext plannerContext - ) throws ValidationException + ) { - validateSelect(relRoot.fields, plannerContext); + validateSelect(plannerContext); return new MSQTaskQueryMaker( null, @@ -156,7 +159,7 @@ public QueryMaker buildQueryMakerForInsert( final String targetDataSource, final RelRoot relRoot, final PlannerContext plannerContext - ) throws ValidationException + ) { validateInsert(relRoot.rel, relRoot.fields, plannerContext); @@ -169,15 +172,18 @@ public QueryMaker buildQueryMakerForInsert( ); } - private static void validateSelect( - final List> fieldMappings, - final PlannerContext plannerContext - ) throws ValidationException + /** + * Checks if the SELECT contains {@link DruidSqlInsert#SQL_INSERT_SEGMENT_GRANULARITY} in the context. This is a + * defensive cheeck because {@link org.apache.druid.sql.calcite.planner.DruidPlanner} should have called the + * {@link #validateContext} + */ + private static void validateSelect(final PlannerContext plannerContext) { if (plannerContext.queryContext().containsKey(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)) { - throw new ValidationException( - StringUtils.format("Cannot use \"%s\" without INSERT", DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY) - ); + throw DruidException + .forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.DEFENSIVE) + .build("Cannot use \"%s\" without INSERT", DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY); } } @@ -185,7 +191,7 @@ private static void validateInsert( final RelNode rootRel, final List> fieldMappings, final PlannerContext plannerContext - ) throws ValidationException + ) { validateNoDuplicateAliases(fieldMappings); @@ -199,12 +205,10 @@ private static void validateInsert( // Validate the __time field has the proper type. final SqlTypeName timeType = rootRel.getRowType().getFieldList().get(field.left).getType().getSqlTypeName(); if (timeType != SqlTypeName.TIMESTAMP) { - throw new ValidationException( - StringUtils.format( - "Field \"%s\" must be of type TIMESTAMP (was %s)", - ColumnHolder.TIME_COLUMN_NAME, - timeType - ) + throw InvalidSqlInput.exception( + "Field \"%s\" must be of type TIMESTAMP (was %s)", + ColumnHolder.TIME_COLUMN_NAME, + timeType ); } } @@ -220,13 +224,18 @@ private static void validateInsert( ); } catch (Exception e) { - throw new ValidationException( - StringUtils.format( - "Invalid segmentGranularity: %s", - plannerContext.queryContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY) - ), - e - ); + // This is a defensive check as the DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY in the query context is + // populated by Druid. If the user entered an incorrect granularity, that should have been flagged before reaching + // here + throw DruidException.forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.DEFENSIVE) + .build( + e, + "Invalid %s: %s", + DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, + plannerContext.queryContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY) + ); + } final boolean hasSegmentGranularity = !Granularities.ALL.equals(segmentGranularity); @@ -237,11 +246,9 @@ private static void validateInsert( validateLimitAndOffset(rootRel, !hasSegmentGranularity); if (hasSegmentGranularity && timeFieldIndex < 0) { - throw new ValidationException( - StringUtils.format( - "INSERT queries with segment granularity other than \"all\" must have a \"%s\" field.", - ColumnHolder.TIME_COLUMN_NAME - ) + throw InvalidInput.exception( + "INSERT queries with segment granularity other than \"all\" must have a \"%s\" field.", + ColumnHolder.TIME_COLUMN_NAME ); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java index ce30284e5936..325069178938 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java @@ -31,6 +31,8 @@ import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.error.DruidException; import org.apache.druid.error.ErrorResponse; +import org.apache.druid.error.Forbidden; +import org.apache.druid.error.InvalidInput; import org.apache.druid.error.QueryExceptionCompat; import org.apache.druid.guice.annotations.MSQ; import org.apache.druid.indexer.TaskStatusPlus; @@ -150,16 +152,12 @@ public Response doPost(final SqlQuery sqlQuery, @Context final HttpServletReques ); if (ExecutionMode.ASYNC != executionMode) { return buildNonOkResponse( - DruidException.forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.INVALID_INPUT) - .build( - StringUtils.format( - "The statement sql api only supports sync mode[%s]. Please set context parameter [%s=%s] in the context payload", - ExecutionMode.ASYNC, - QueryContexts.CTX_EXECUTION_MODE, - ExecutionMode.ASYNC - ) - ) + InvalidInput.exception( + "The statement sql api only supports sync mode[%s]. Please set context parameter [%s=%s] in the context payload", + ExecutionMode.ASYNC, + QueryContexts.CTX_EXECUTION_MODE, + ExecutionMode.ASYNC + ) ); } @@ -192,11 +190,7 @@ public Response doPost(final SqlQuery sqlQuery, @Context final HttpServletReques } catch (ForbiddenException e) { log.debug("Got forbidden request for reason [%s]", e.getErrorMessage()); - return buildNonOkResponse( - DruidException.forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.FORBIDDEN) - .build(Access.DEFAULT_ERROR_MESSAGE) - ); + return buildNonOkResponse(Forbidden.exception()); } // Calcite throws java.lang.AssertionError at various points in planning/validation. catch (AssertionError | Exception e) { @@ -251,11 +245,7 @@ public Response doGetStatus( } catch (ForbiddenException e) { log.debug("Got forbidden request for reason [%s]", e.getErrorMessage()); - return buildNonOkResponse( - DruidException.forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.FORBIDDEN) - .build(Access.DEFAULT_ERROR_MESSAGE) - ); + return buildNonOkResponse(Forbidden.exception()); } catch (Exception e) { log.noStackTrace().warn(e, "Failed to handle query: %s", queryId); @@ -381,11 +371,7 @@ public Response doGetResults( } catch (ForbiddenException e) { log.debug("Got forbidden request for reason [%s]", e.getErrorMessage()); - return buildNonOkResponse( - DruidException.forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.FORBIDDEN) - .build(Access.DEFAULT_ERROR_MESSAGE) - ); + return buildNonOkResponse(Forbidden.exception()); } catch (Exception e) { log.noStackTrace().warn(e, "Failed to handle query: %s", queryId); @@ -448,11 +434,7 @@ public Response deleteQuery(@PathParam("id") final String queryId, @Context fina } catch (ForbiddenException e) { log.debug("Got forbidden request for reason [%s]", e.getErrorMessage()); - return buildNonOkResponse( - DruidException.forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.FORBIDDEN) - .build(Access.DEFAULT_ERROR_MESSAGE) - ); + return buildNonOkResponse(Forbidden.exception()); } catch (Exception e) { log.noStackTrace().warn(e, "Failed to handle query: %s", queryId); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 3c951c7d0cc3..3b5e870cace8 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -103,7 +103,6 @@ public class MultiStageQueryContext public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = ClusterStatisticsMergeMode.SEQUENTIAL.toString(); public static final String CTX_DESTINATION = "destination"; - private static final String DEFAULT_DESTINATION = null; public static final String CTX_ROWS_PER_SEGMENT = "rowsPerSegment"; static final int DEFAULT_ROWS_PER_SEGMENT = 3000000; @@ -197,10 +196,7 @@ public static int getMaxNumTasks(final QueryContext queryContext) public static Object getDestination(final QueryContext queryContext) { - return queryContext.get( - CTX_DESTINATION, - DEFAULT_DESTINATION - ); + return queryContext.get(CTX_DESTINATION); } public static int getRowsPerSegment(final QueryContext queryContext) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java index df40b00a3193..ab7b1ed7d7cf 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java @@ -23,7 +23,6 @@ import com.google.inject.Injector; import com.google.inject.Module; import org.apache.calcite.rel.RelRoot; -import org.apache.calcite.tools.ValidationException; import org.apache.druid.guice.DruidInjectorBuilder; import org.apache.druid.msq.exec.WorkerMemoryParameters; import org.apache.druid.msq.sql.MSQTaskSqlEngine; @@ -142,7 +141,6 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon @Override public QueryMaker buildQueryMakerForSelect(RelRoot relRoot, PlannerContext plannerContext) - throws ValidationException { plannerContext.queryContextMap().put(PlannerContext.CTX_SQL_JOIN_ALGORITHM, joinAlgorithm.toString()); return super.buildQueryMakerForSelect(relRoot, plannerContext); diff --git a/processing/src/main/java/org/apache/druid/error/Forbidden.java b/processing/src/main/java/org/apache/druid/error/Forbidden.java new file mode 100644 index 000000000000..99226618b86a --- /dev/null +++ b/processing/src/main/java/org/apache/druid/error/Forbidden.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.error; + +public class Forbidden extends DruidException.Failure +{ + + public static DruidException exception() + { + return exception("Unauthorize"); + } + + public static DruidException exception(String msg, Object... args) + { + return exception(null, msg, args); + } + + public static DruidException exception(Throwable t, String msg, Object... args) + { + return DruidException.fromFailure(new Forbidden(t, msg, args)); + } + + private final Throwable t; + private final String msg; + private final Object[] args; + + private Forbidden( + Throwable t, + String msg, + Object... args + ) + { + super("forbidden"); + this.t = t; + this.msg = msg; + this.args = args; + } + + @Override + public DruidException makeException(DruidException.DruidExceptionBuilder bob) + { + bob = bob.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.FORBIDDEN); + + if (t == null) { + return bob.build(msg, args); + } else { + return bob.build(t, msg, args); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/error/InvalidInput.java b/processing/src/main/java/org/apache/druid/error/InvalidInput.java index ce50d4db3763..3d15618a1b20 100644 --- a/processing/src/main/java/org/apache/druid/error/InvalidInput.java +++ b/processing/src/main/java/org/apache/druid/error/InvalidInput.java @@ -35,7 +35,7 @@ public static DruidException exception(Throwable t, String msg, Object... args) private final String msg; private final Object[] args; - public InvalidInput( + protected InvalidInput( Throwable t, String msg, Object... args diff --git a/processing/src/main/java/org/apache/druid/error/QueryExceptionCompat.java b/processing/src/main/java/org/apache/druid/error/QueryExceptionCompat.java index 12e4905efae9..9894208e9f9c 100644 --- a/processing/src/main/java/org/apache/druid/error/QueryExceptionCompat.java +++ b/processing/src/main/java/org/apache/druid/error/QueryExceptionCompat.java @@ -66,12 +66,11 @@ private DruidException.Category convertFailType(QueryException.FailType failType return DruidException.Category.RUNTIME_FAILURE; case CANCELED: return DruidException.Category.CANCELED; - case UNKNOWN: - return DruidException.Category.UNCATEGORIZED; case UNSUPPORTED: return DruidException.Category.UNSUPPORTED; case TIMEOUT: return DruidException.Category.TIMEOUT; + case UNKNOWN: default: return DruidException.Category.UNCATEGORIZED; } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java index 26fdd514e8f6..6fcf5d94721e 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java @@ -29,7 +29,7 @@ import org.apache.druid.catalog.model.table.ExternalTableSpec; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputSource; -import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.IAE; import org.apache.druid.segment.column.RowSignature; @@ -106,28 +106,72 @@ public ExternalTableSpec apply( } final RowSignature rowSignature; if (columns != null) { - rowSignature = Columns.convertSignature(columns); + try { + rowSignature = Columns.convertSignature(columns); + } + catch (IAE e) { + throw new ArgumentAndException("columns", e); + } } else { - rowSignature = jsonMapper.readValue(sigValue, RowSignature.class); + try { + rowSignature = jsonMapper.readValue(sigValue, RowSignature.class); + } + catch (JsonProcessingException e) { + throw new ArgumentAndException("rowSignature", e); + } } String inputSrcStr = CatalogUtils.getString(args, INPUT_SOURCE_PARAM); - InputSource inputSource = jsonMapper.readValue(inputSrcStr, InputSource.class); + InputSource inputSource; + try { + inputSource = jsonMapper.readValue(inputSrcStr, InputSource.class); + } + catch (JsonProcessingException e) { + throw new ArgumentAndException("rowSignature", e); + } + InputFormat inputFormat; + try { + inputFormat = jsonMapper.readValue(CatalogUtils.getString(args, INPUT_FORMAT_PARAM), InputFormat.class); + } + catch (JsonProcessingException e) { + throw new ArgumentAndException("inputFormat", e); + } return new ExternalTableSpec( inputSource, - jsonMapper.readValue(CatalogUtils.getString(args, INPUT_FORMAT_PARAM), InputFormat.class), + inputFormat, rowSignature, inputSource::getTypes ); } - catch (JsonProcessingException e) { - throw DruidException.forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.INVALID_INPUT) - .build(e, e.getMessage()); + catch (ArgumentAndException e) { // We can triage out the error to one of the argument passed to the EXTERN function + throw InvalidInput.exception( + e, + "Invalid value for the field [%s]. Error message: [%s]", + e.component, + e.exception + ); + } + catch (IAE e) { + throw InvalidInput.exception( + "Invalid parameters supplied to the EXTERN function. Error message: [%s]", + e.getMessage() + ); } } } + private static class ArgumentAndException extends Throwable + { + private final String component; + private final Exception exception; + + public ArgumentAndException(String component, Exception exception) + { + this.component = component; + this.exception = exception; + } + } + @Inject public ExternalOperatorConversion(@Json final ObjectMapper jsonMapper) { From bdb3911455aa482f504db8df2966d61ab4a00989 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 10 Jul 2023 16:12:10 +0530 Subject: [PATCH 02/13] review --- .../druid/msq/sql/MSQTaskQueryMaker.java | 19 ++- .../druid/msq/sql/MSQTaskSqlEngine.java | 19 +-- .../org/apache/druid/error/InvalidInput.java | 2 +- .../external/ExternalOperatorConversion.java | 118 ++++++++---------- .../sql/calcite/run/NativeSqlEngine.java | 3 +- .../druid/sql/calcite/run/SqlEngines.java | 21 ++++ .../druid/sql/calcite/view/ViewSqlEngine.java | 4 +- .../sql/calcite/IngestionTestSqlEngine.java | 4 +- 8 files changed, 103 insertions(+), 87 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index 8f5be5507435..456842533ed3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -136,14 +136,15 @@ public QueryResponse runQuery(final DruidQuery druidQuery) .orElse(jsonMapper.writeValueAsString(DEFAULT_SEGMENT_GRANULARITY)); } catch (JsonProcessingException e) { - // This isn't populated by the user in the query context. If there was an issue with the query granularity - // entered by the user, it should have been flagged earlier + // This would only be thrown if we are unable to serialize the DEFAULT_SEGMENT_GRANULARITY, which we don't expect + // to happen throw DruidException.forPersona(DruidException.Persona.DEVELOPER) .ofCategory(DruidException.Category.DEFENSIVE) .build( e, - "Unable to deserialize the insert granularity. Please retry the query with a " - + "valid segment graularity" + "Unable to deserialize the DEFAULT_SEGMENT_GRANULARITY in MSQTaskQueryMaker. " + + "This shouldn't have happened since the DEFAULT_SEGMENT_GRANULARITY object is guaranteed to be " + + "serializable. Please raise an issue in case you are seeing this message while executing a query." ); } @@ -151,7 +152,7 @@ public QueryResponse runQuery(final DruidQuery druidQuery) if (maxNumTasks < 2) { throw InvalidInput.exception( - "%s cannot be less than 2 since at least 1 controller and 1 worker is necessary.", + "MSQ context maxNumTasks [%,d] cannot be less than 2, since at least 1 controller and 1 worker is necessary", MultiStageQueryContext.CTX_MAX_NUM_TASKS ); } @@ -222,7 +223,13 @@ public QueryResponse runQuery(final DruidQuery druidQuery) catch (Exception e) { throw DruidException.forPersona(DruidException.Persona.DEVELOPER) .ofCategory(DruidException.Category.DEFENSIVE) - .build("Unable to convert %s to a segment granularity", segmentGranularity); + .build( + e, + "Unable to deserialize the provided segmentGranularity [%s]. " + + "This is populated internally by Druid and therefore should not occur. " + + "Please contact the developers if you are seeing this error message.", + segmentGranularity + ); } final List segmentSortOrder = MultiStageQueryContext.getSortOrder(sqlQueryContext); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index cd9ca2642349..51282348ae63 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -125,10 +125,7 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon case SCAN_NEEDS_SIGNATURE: return true; default: - throw DruidException - .forPersona(DruidException.Persona.DEVELOPER) - .ofCategory(DruidException.Category.DEFENSIVE) - .build("Unrecognized feature: %s", feature); + throw SqlEngines.generateUnrecognizedFeatureException(MSQTaskSqlEngine.class.getSimpleName(), feature); } } @@ -183,7 +180,12 @@ private static void validateSelect(final PlannerContext plannerContext) throw DruidException .forPersona(DruidException.Persona.DEVELOPER) .ofCategory(DruidException.Category.DEFENSIVE) - .build("Cannot use \"%s\" without INSERT", DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY); + .build( + "The SELECT query's context contains invalid parameter [%s] which is supposed to be populated " + + "by Druid for INSERT queries. If the user is seeing this exception, that means there's a bug in Druid " + + "that is populating the query context with the segment's granularity.", + DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY + ); } } @@ -206,7 +208,7 @@ private static void validateInsert( final SqlTypeName timeType = rootRel.getRowType().getFieldList().get(field.left).getType().getSqlTypeName(); if (timeType != SqlTypeName.TIMESTAMP) { throw InvalidSqlInput.exception( - "Field \"%s\" must be of type TIMESTAMP (was %s)", + "Field [%s] was the wrong type [%s], expected TIMESTAMP", ColumnHolder.TIME_COLUMN_NAME, timeType ); @@ -247,8 +249,9 @@ private static void validateInsert( if (hasSegmentGranularity && timeFieldIndex < 0) { throw InvalidInput.exception( - "INSERT queries with segment granularity other than \"all\" must have a \"%s\" field.", - ColumnHolder.TIME_COLUMN_NAME + "The granularity [%s] specified in the PARTITIONED BY clause of the INSERT query is different from ALL." + + "Therefore, the query must specify a time column (named __time).", + segmentGranularity ); } } diff --git a/processing/src/main/java/org/apache/druid/error/InvalidInput.java b/processing/src/main/java/org/apache/druid/error/InvalidInput.java index 3d15618a1b20..ce50d4db3763 100644 --- a/processing/src/main/java/org/apache/druid/error/InvalidInput.java +++ b/processing/src/main/java/org/apache/druid/error/InvalidInput.java @@ -35,7 +35,7 @@ public static DruidException exception(Throwable t, String msg, Object... args) private final String msg; private final Object[] args; - protected InvalidInput( + public InvalidInput( Throwable t, String msg, Object... args diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java index 6fcf5d94721e..96832d2ed4b1 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java @@ -29,6 +29,7 @@ import org.apache.druid.catalog.model.table.ExternalTableSpec; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputSource; +import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidInput; import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.IAE; @@ -90,86 +91,71 @@ public ExternalTableSpec apply( final ObjectMapper jsonMapper ) { - try { - final String sigValue = CatalogUtils.getString(args, SIGNATURE_PARAM); - if (sigValue == null && columns == null) { - throw new IAE( - "EXTERN requires either a %s value or an EXTEND clause", - SIGNATURE_PARAM - ); - } - if (sigValue != null && columns != null) { - throw new IAE( - "EXTERN requires either a %s value or an EXTEND clause, but not both", - SIGNATURE_PARAM - ); - } - final RowSignature rowSignature; - if (columns != null) { - try { - rowSignature = Columns.convertSignature(columns); - } - catch (IAE e) { - throw new ArgumentAndException("columns", e); - } - } else { - try { - rowSignature = jsonMapper.readValue(sigValue, RowSignature.class); - } - catch (JsonProcessingException e) { - throw new ArgumentAndException("rowSignature", e); - } - } - - String inputSrcStr = CatalogUtils.getString(args, INPUT_SOURCE_PARAM); - InputSource inputSource; + final String sigValue = CatalogUtils.getString(args, SIGNATURE_PARAM); + if (sigValue == null && columns == null) { + throw InvalidInput.exception( + "EXTERN requires either a %s value or an EXTEND clause", + SIGNATURE_PARAM + ); + } + if (sigValue != null && columns != null) { + throw InvalidInput.exception( + "EXTERN requires either a %s value or an EXTEND clause, but not both", + SIGNATURE_PARAM + ); + } + final RowSignature rowSignature; + if (columns != null) { try { - inputSource = jsonMapper.readValue(inputSrcStr, InputSource.class); + rowSignature = Columns.convertSignature(columns); } - catch (JsonProcessingException e) { - throw new ArgumentAndException("rowSignature", e); + catch (IAE e) { + throw badArgumentException(e, "columns"); } - InputFormat inputFormat; + } else { try { - inputFormat = jsonMapper.readValue(CatalogUtils.getString(args, INPUT_FORMAT_PARAM), InputFormat.class); + rowSignature = jsonMapper.readValue(sigValue, RowSignature.class); } catch (JsonProcessingException e) { - throw new ArgumentAndException("inputFormat", e); + throw badArgumentException(e, "rowSignature"); } - return new ExternalTableSpec( - inputSource, - inputFormat, - rowSignature, - inputSource::getTypes - ); } - catch (ArgumentAndException e) { // We can triage out the error to one of the argument passed to the EXTERN function - throw InvalidInput.exception( - e, - "Invalid value for the field [%s]. Error message: [%s]", - e.component, - e.exception - ); + + String inputSrcStr = CatalogUtils.getString(args, INPUT_SOURCE_PARAM); + InputSource inputSource; + try { + inputSource = jsonMapper.readValue(inputSrcStr, InputSource.class); } - catch (IAE e) { - throw InvalidInput.exception( - "Invalid parameters supplied to the EXTERN function. Error message: [%s]", - e.getMessage() - ); + catch (JsonProcessingException e) { + throw badArgumentException(e, "inputSource"); } + InputFormat inputFormat; + try { + inputFormat = jsonMapper.readValue(CatalogUtils.getString(args, INPUT_FORMAT_PARAM), InputFormat.class); + } + catch (JsonProcessingException e) { + throw badArgumentException(e, "inputFormat"); + } + return new ExternalTableSpec( + inputSource, + inputFormat, + rowSignature, + inputSource::getTypes + ); } } - private static class ArgumentAndException extends Throwable + private static DruidException badArgumentException( + Throwable cause, + String fieldName + ) { - private final String component; - private final Exception exception; - - public ArgumentAndException(String component, Exception exception) - { - this.component = component; - this.exception = exception; - } + return InvalidInput.exception( + cause, + "Invalid value for the field [%s]. Reason: [%s]", + fieldName, + cause.getMessage() + ); } @Inject diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java index f5d9056246b3..d7fc7d043b6f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java @@ -27,7 +27,6 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.druid.error.InvalidSqlInput; import org.apache.druid.guice.LazySingleton; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.timeboundary.TimeBoundaryQuery; import org.apache.druid.server.QueryLifecycleFactory; @@ -116,7 +115,7 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon case SCAN_NEEDS_SIGNATURE: return false; default: - throw new IAE("Unrecognized feature: %s", feature); + throw SqlEngines.generateUnrecognizedFeatureException(NativeSqlEngine.class.getSimpleName(), feature); } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngines.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngines.java index cc7bef80f712..e8375feb3a49 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngines.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngines.java @@ -20,6 +20,7 @@ package org.apache.druid.sql.calcite.run; import org.apache.calcite.tools.ValidationException; +import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidInput; import java.util.Map; @@ -46,4 +47,24 @@ public static void validateNoSpecialContextKeys( } } } + + /** + * This is a helper function that provides a developer-friendly exception when an engine cannot recognize the feature. + */ + public static DruidException generateUnrecognizedFeatureException( + final String engineName, + final EngineFeature unrecognizedFeature + ) + { + return DruidException + .forPersona(DruidException.Persona.DEVELOPER) + .ofCategory(DruidException.Category.DEFENSIVE) + .build( + "Engine [%s] is unable to recognize the feature [%s] for availability. This might happen when a " + + "newer feature is added without updating all the implementations of SqlEngine(s) to either allow or disallow " + + "its availability. Please raise an issue if you encounter this exception while using Druid.", + engineName, + unrecognizedFeature + ); + } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java index 740cac15ee46..47dae5c4d97f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java @@ -22,11 +22,11 @@ import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.run.EngineFeature; import org.apache.druid.sql.calcite.run.QueryMaker; import org.apache.druid.sql.calcite.run.SqlEngine; +import org.apache.druid.sql.calcite.run.SqlEngines; import java.util.Map; @@ -78,7 +78,7 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon return false; default: - throw new IAE("Unrecognized feature: %s", feature); + throw SqlEngines.generateUnrecognizedFeatureException(ViewSqlEngine.class.getSimpleName(), feature); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java index c1773ef62a10..272fddbd8a42 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java @@ -24,12 +24,12 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.run.EngineFeature; import org.apache.druid.sql.calcite.run.QueryMaker; import org.apache.druid.sql.calcite.run.SqlEngine; +import org.apache.druid.sql.calcite.run.SqlEngines; import org.apache.druid.sql.calcite.table.RowSignatures; import java.util.Map; @@ -91,7 +91,7 @@ public boolean featureAvailable(final EngineFeature feature, final PlannerContex case ALLOW_BROADCAST_RIGHTY_JOIN: return true; default: - throw new IAE("Unrecognized feature: %s", feature); + throw SqlEngines.generateUnrecognizedFeatureException(IngestionTestSqlEngine.class.getSimpleName(), feature); } } From e8abc83c5d9fcd600c7a6022402b6b227a2f682d Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 10 Jul 2023 21:10:44 +0530 Subject: [PATCH 03/13] changes --- .../java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index 822f8b4d7539..dce75a7546f8 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -140,8 +140,7 @@ public QueryResponse runQuery(final DruidQuery druidQuery) catch (JsonProcessingException e) { // This would only be thrown if we are unable to serialize the DEFAULT_SEGMENT_GRANULARITY, which we don't expect // to happen - throw DruidException.forPersona(DruidException.Persona.DEVELOPER) - .ofCategory(DruidException.Category.DEFENSIVE) + throw DruidException.defensive() .build( e, "Unable to deserialize the DEFAULT_SEGMENT_GRANULARITY in MSQTaskQueryMaker. " @@ -223,8 +222,7 @@ public QueryResponse runQuery(final DruidQuery druidQuery) segmentGranularityObject = jsonMapper.readValue((String) segmentGranularity, Granularity.class); } catch (Exception e) { - throw DruidException.forPersona(DruidException.Persona.DEVELOPER) - .ofCategory(DruidException.Category.DEFENSIVE) + throw DruidException.defensive() .build( e, "Unable to deserialize the provided segmentGranularity [%s]. " From b6b9130d44b3901c6de0d2dac1d3fb6cbabbbda7 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 11 Jul 2023 10:05:10 +0530 Subject: [PATCH 04/13] cleanup --- .../druid/msq/sql/MSQTaskQueryMaker.java | 25 ++++------------- .../druid/msq/sql/MSQTaskSqlEngine.java | 2 -- .../sql/resources/SqlStatementResource.java | 27 +++++-------------- .../msq/util/MultiStageQueryContext.java | 7 ----- .../msq/util/MultiStageQueryContextTest.java | 14 ---------- 5 files changed, 11 insertions(+), 64 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index dce75a7546f8..cf6fc43e0ca8 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -48,7 +48,6 @@ import org.apache.druid.query.QueryContexts; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.rpc.indexing.OverlordClient; -import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.server.QueryResponse; @@ -76,10 +75,6 @@ public class MSQTaskQueryMaker implements QueryMaker { - - private static final String DESTINATION_DATASOURCE = "dataSource"; - private static final String DESTINATION_REPORT = "taskReport"; - public static final String USER_KEY = "__user"; private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.ALL; @@ -128,9 +123,6 @@ public QueryResponse runQuery(final DruidQuery druidQuery) MSQMode.populateDefaultQueryContext(msqMode, nativeQueryContext); } - final String ctxDestination = - DimensionHandlerUtils.convertObjectToString(MultiStageQueryContext.getDestination(sqlQueryContext)); - Object segmentGranularity; try { segmentGranularity = Optional.ofNullable(plannerContext.queryContext() @@ -211,12 +203,6 @@ public QueryResponse runQuery(final DruidQuery druidQuery) final MSQDestination destination; if (targetDataSource != null) { - if (ctxDestination != null && !DESTINATION_DATASOURCE.equals(ctxDestination)) { - throw DruidException.forPersona(DruidException.Persona.DEVELOPER) - .ofCategory(DruidException.Category.DEFENSIVE) - .build("Cannot INSERT with destination [%s]", ctxDestination); - } - Granularity segmentGranularityObject; try { segmentGranularityObject = jsonMapper.readValue((String) segmentGranularity, Granularity.class); @@ -246,18 +232,17 @@ public QueryResponse runQuery(final DruidQuery druidQuery) replaceTimeChunks ); } else { - if (ctxDestination != null && !DESTINATION_REPORT.equals(ctxDestination)) { - throw DruidException.forPersona(DruidException.Persona.DEVELOPER) - .ofCategory(DruidException.Category.DEFENSIVE) - .build("Cannot SELECT with destination [%s]", ctxDestination); - } final MSQSelectDestination msqSelectDestination = MultiStageQueryContext.getSelectDestination(sqlQueryContext); if (msqSelectDestination.equals(MSQSelectDestination.TASK_REPORT)) { destination = TaskReportMSQDestination.instance(); } else if (msqSelectDestination.equals(MSQSelectDestination.DURABLE_STORAGE)) { destination = DurableStorageMSQDestination.instance(); } else { - throw new IAE("Cannot SELECT with destination [%s]", msqSelectDestination.name()); + throw InvalidInput.exception( + "Unsupported select destination [%s] provided in the query context. MSQ can currently write the select results to " + + "\"taskReport\" and \"durableStorage\"", + msqSelectDestination.name() + ); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index 51282348ae63..72d1bbbaaec4 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -37,7 +37,6 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.msq.querykit.QueryKitUtils; -import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; @@ -59,7 +58,6 @@ public class MSQTaskSqlEngine implements SqlEngine public static final Set SYSTEM_CONTEXT_PARAMETERS = ImmutableSet.builder() .addAll(NativeSqlEngine.SYSTEM_CONTEXT_PARAMETERS) - .add(MultiStageQueryContext.CTX_DESTINATION) .add(QueryKitUtils.CTX_TIME_COLUMN_NAME) .add(MSQTaskQueryMaker.USER_KEY) .build(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java index 4c9e776c0b2a..31a192e6967f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java @@ -665,9 +665,9 @@ private Optional> getResultYielder( if (msqControllerTask.getQuerySpec().getDestination() instanceof TaskReportMSQDestination) { // Results from task report are only present as one page. if (page != null && page > 0) { - throw DruidException.forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.INVALID_INPUT) - .build("Page number is out of range of the results."); + throw InvalidInput.exception( + "Page number [%d] is out of the range of results", page + ); } MSQTaskReportPayload msqTaskReportPayload = jsonMapper.convertValue(SqlStatementResourceHelper.getPayload( @@ -755,9 +755,7 @@ private PageInformation getPageInformationForPageId(List pages, return pageInfo; } } - throw DruidException.forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.INVALID_INPUT) - .build("Invalid page id [%d] passed.", pageId); + throw InvalidInput.exception("Invalid page id [%d] passed.", pageId); } private void resultPusher( @@ -828,22 +826,9 @@ private static void throwIfQueryIsNotSuccessful(String queryId, TaskStatusPlus s private void contextChecks(QueryContext queryContext) { - ExecutionMode executionMode = queryContext.getEnum( - QueryContexts.CTX_EXECUTION_MODE, - ExecutionMode.class, - null - ); + ExecutionMode executionMode = queryContext.getEnum(QueryContexts.CTX_EXECUTION_MODE, ExecutionMode.class, null); if (ExecutionMode.ASYNC != executionMode) { - throw DruidException.forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.INVALID_INPUT) - .build( - StringUtils.format( - "The statement sql api only supports sync mode[%s]. Please set context parameter [%s=%s] in the context payload", - ExecutionMode.ASYNC, - QueryContexts.CTX_EXECUTION_MODE, - ExecutionMode.ASYNC - ) - ); + throw DruidException.defensive("[%s] is not supported. It should not be set", QueryContexts.CTX_EXECUTION_MODE); } MSQSelectDestination selectDestination = MultiStageQueryContext.getSelectDestination(queryContext); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 06f86cd2221f..29189bca4459 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -103,8 +103,6 @@ public class MultiStageQueryContext public static final String CTX_CLUSTER_STATISTICS_MERGE_MODE = "clusterStatisticsMergeMode"; public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = ClusterStatisticsMergeMode.SEQUENTIAL.toString(); - public static final String CTX_DESTINATION = "destination"; - public static final String CTX_ROWS_PER_SEGMENT = "rowsPerSegment"; static final int DEFAULT_ROWS_PER_SEGMENT = 3000000; @@ -195,11 +193,6 @@ public static int getMaxNumTasks(final QueryContext queryContext) ); } - public static Object getDestination(final QueryContext queryContext) - { - return queryContext.get(CTX_DESTINATION); - } - public static int getRowsPerSegment(final QueryContext queryContext) { return queryContext.getInt( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java index 16aad3d7b93a..6d9389267419 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java @@ -39,7 +39,6 @@ import java.util.List; import java.util.Map; -import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_DESTINATION; import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE; import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_FAULT_TOLERANCE; import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS; @@ -142,19 +141,6 @@ public void getMaxNumTasks_legacyParameterSetReturnsCorrectValue() Assert.assertEquals(101, MultiStageQueryContext.getMaxNumTasks(QueryContext.of(propertyMap))); } - @Test - public void getDestination_noParameterSetReturnsDefaultValue() - { - Assert.assertNull(MultiStageQueryContext.getDestination(QueryContext.empty())); - } - - @Test - public void getDestination_parameterSetReturnsCorrectValue() - { - Map propertyMap = ImmutableMap.of(CTX_DESTINATION, "dataSource"); - Assert.assertEquals("dataSource", MultiStageQueryContext.getDestination(QueryContext.of(propertyMap))); - } - @Test public void getRowsPerSegment_noParameterSetReturnsDefaultValue() { From c8cb821a8bf5f60f4d3425bd19269e0be2b74683 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 11 Jul 2023 12:16:45 +0530 Subject: [PATCH 05/13] fix test --- .../main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java | 2 +- .../java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index 72d1bbbaaec4..404d12873b7e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -247,7 +247,7 @@ private static void validateInsert( if (hasSegmentGranularity && timeFieldIndex < 0) { throw InvalidInput.exception( - "The granularity [%s] specified in the PARTITIONED BY clause of the INSERT query is different from ALL." + "The granularity [%s] specified in the PARTITIONED BY clause of the INSERT query is different from ALL. " + "Therefore, the query must specify a time column (named __time).", segmentGranularity ); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java index 1a19f9ddf70e..e7aa75c008cc 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java @@ -1635,7 +1635,7 @@ public void testErrorWithUnableToConstructColumnSignatureWithExtern() new DruidExceptionMatcher( DruidException.Persona.USER, DruidException.Category.INVALID_INPUT, - "general" + "invalidInput" ) .expectMessageContains( "Cannot construct instance of `org.apache.druid.segment.column.ColumnSignature`, problem: Column name must be provided and non-empty" From ae71725fee5d2deb7fab2be25ca71fe2acfa2423 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 11 Jul 2023 16:22:56 +0530 Subject: [PATCH 06/13] throw better exception when issue in the msq context --- .../msq/util/MultiStageQueryContext.java | 38 ++++++++----------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 29189bca4459..b3f012d2c667 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -39,7 +39,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -157,13 +156,10 @@ public static long getMaxInputBytesPerWorker(final QueryContext queryContext) public static ClusterStatisticsMergeMode getClusterStatisticsMergeMode(QueryContext queryContext) { - return ClusterStatisticsMergeMode.valueOf( - String.valueOf( - queryContext.getString( - CTX_CLUSTER_STATISTICS_MERGE_MODE, - DEFAULT_CLUSTER_STATISTICS_MERGE_MODE - ) - ) + return QueryContexts.getAsEnum( + CTX_CLUSTER_STATISTICS_MERGE_MODE, + queryContext.getString(CTX_CLUSTER_STATISTICS_MERGE_MODE, DEFAULT_CLUSTER_STATISTICS_MERGE_MODE), + ClusterStatisticsMergeMode.class ); } @@ -177,12 +173,11 @@ public static boolean isFinalizeAggregations(final QueryContext queryContext) public static WorkerAssignmentStrategy getAssignmentStrategy(final QueryContext queryContext) { - String assignmentStrategyString = queryContext.getString( + return QueryContexts.getAsEnum( CTX_TASK_ASSIGNMENT_STRATEGY, - DEFAULT_TASK_ASSIGNMENT_STRATEGY + queryContext.getString(CTX_TASK_ASSIGNMENT_STRATEGY, DEFAULT_TASK_ASSIGNMENT_STRATEGY), + WorkerAssignmentStrategy.class ); - - return WorkerAssignmentStrategy.fromString(assignmentStrategyString); } public static int getMaxNumTasks(final QueryContext queryContext) @@ -203,22 +198,21 @@ public static int getRowsPerSegment(final QueryContext queryContext) public static MSQSelectDestination getSelectDestination(final QueryContext queryContext) { - return MSQSelectDestination.valueOf( - queryContext.getString( - CTX_SELECT_DESTINATION, - DEFAULT_SELECT_DESTINATION - ).toUpperCase(Locale.ENGLISH) + return QueryContexts.getAsEnum( + CTX_SELECT_DESTINATION, + queryContext.getString(CTX_SELECT_DESTINATION, DEFAULT_SELECT_DESTINATION), + MSQSelectDestination.class ); } @Nullable public static MSQSelectDestination getSelectDestinationOrNull(final QueryContext queryContext) { - String selectDestination = queryContext.getString(CTX_SELECT_DESTINATION); - if (selectDestination == null) { - return null; - } - return MSQSelectDestination.valueOf(selectDestination.toUpperCase(Locale.ENGLISH)); + return QueryContexts.getAsEnum( + CTX_SELECT_DESTINATION, + queryContext.getString(CTX_SELECT_DESTINATION), + MSQSelectDestination.class + ); } public static int getRowsInMemory(final QueryContext queryContext) From 7ec32cfb9c4e134c27c3d733818af88cf1f1c332 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 12 Jul 2023 16:20:40 +0530 Subject: [PATCH 07/13] tests, changes --- .../druid/msq/sql/MSQTaskQueryMaker.java | 6 +- .../sql/resources/SqlStatementResource.java | 8 ++- .../org/apache/druid/error/Forbidden.java | 2 +- .../org/apache/druid/error/ForbiddenTest.java | 59 +++++++++++++++++++ .../sql/calcite/CalciteInsertDmlTest.java | 5 +- 5 files changed, 73 insertions(+), 7 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/error/ForbiddenTest.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index cf6fc43e0ca8..dcb0de2bcc8d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -240,8 +240,10 @@ public QueryResponse runQuery(final DruidQuery druidQuery) } else { throw InvalidInput.exception( "Unsupported select destination [%s] provided in the query context. MSQ can currently write the select results to " - + "\"taskReport\" and \"durableStorage\"", - msqSelectDestination.name() + + "\"%s\" and \"%s\"", + msqSelectDestination.name(), + MSQSelectDestination.TASK_REPORT.toString(), + MSQSelectDestination.DURABLE_STORAGE.toString() ); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java index 31a192e6967f..4d3224121d7a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java @@ -828,7 +828,13 @@ private void contextChecks(QueryContext queryContext) { ExecutionMode executionMode = queryContext.getEnum(QueryContexts.CTX_EXECUTION_MODE, ExecutionMode.class, null); if (ExecutionMode.ASYNC != executionMode) { - throw DruidException.defensive("[%s] is not supported. It should not be set", QueryContexts.CTX_EXECUTION_MODE); + throw InvalidInput.exception( + "The SQL statement API does not support the provided execution mode [%s]. Please set the context " + + "parameter [%s] to [%s] in the query context", + executionMode, + QueryContexts.CTX_EXECUTION_MODE, + ExecutionMode.ASYNC + ); } MSQSelectDestination selectDestination = MultiStageQueryContext.getSelectDestination(queryContext); diff --git a/processing/src/main/java/org/apache/druid/error/Forbidden.java b/processing/src/main/java/org/apache/druid/error/Forbidden.java index 99226618b86a..13470d241c4d 100644 --- a/processing/src/main/java/org/apache/druid/error/Forbidden.java +++ b/processing/src/main/java/org/apache/druid/error/Forbidden.java @@ -24,7 +24,7 @@ public class Forbidden extends DruidException.Failure public static DruidException exception() { - return exception("Unauthorize"); + return exception("Unauthorized"); } public static DruidException exception(String msg, Object... args) diff --git a/processing/src/test/java/org/apache/druid/error/ForbiddenTest.java b/processing/src/test/java/org/apache/druid/error/ForbiddenTest.java new file mode 100644 index 000000000000..90faeabe4285 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/error/ForbiddenTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.error; + +import org.apache.druid.matchers.DruidMatchers; +import org.hamcrest.MatcherAssert; +import org.junit.Test; + +import java.util.Map; + +public class ForbiddenTest +{ + + @Test + public void testAsErrorResponse() + { + ErrorResponse errorResponse = new ErrorResponse(Forbidden.exception()); + final Map asMap = errorResponse.getAsMap(); + + MatcherAssert.assertThat( + asMap, + DruidMatchers.mapMatcher( + "error", "druidException", + "errorCode", "forbidden", + "persona", "USER", + "category", "FORBIDDEN", + "errorMessage", "Unauthorized" + ) + ); + + ErrorResponse recomposed = ErrorResponse.fromMap(asMap); + + MatcherAssert.assertThat( + recomposed.getUnderlyingException(), + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.FORBIDDEN, + "forbidden" + ).expectMessageContains("Unauthorized") + ); + } +} diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java index e7aa75c008cc..cce88da53da2 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java @@ -1636,10 +1636,9 @@ public void testErrorWithUnableToConstructColumnSignatureWithExtern() DruidException.Persona.USER, DruidException.Category.INVALID_INPUT, "invalidInput" + ).expectMessageContains( + "Cannot construct instance of `org.apache.druid.segment.column.ColumnSignature`, problem: Column name must be provided and non-empty" ) - .expectMessageContains( - "Cannot construct instance of `org.apache.druid.segment.column.ColumnSignature`, problem: Column name must be provided and non-empty" - ) ) .verify(); } From 6a1e653c06d3de53deaeb8f62c1ec875bfe41e04 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 12 Jul 2023 18:01:50 +0530 Subject: [PATCH 08/13] static check, test fix --- .../org/apache/druid/msq/sql/MSQTaskSqlEngine.java | 1 - .../main/java/org/apache/druid/query/QueryContext.java | 10 ---------- 2 files changed, 11 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index 404d12873b7e..0d90c523de5c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -59,7 +59,6 @@ public class MSQTaskSqlEngine implements SqlEngine ImmutableSet.builder() .addAll(NativeSqlEngine.SYSTEM_CONTEXT_PARAMETERS) .add(QueryKitUtils.CTX_TIME_COLUMN_NAME) - .add(MSQTaskQueryMaker.USER_KEY) .build(); public static final List TASK_STRUCT_FIELD_NAMES = ImmutableList.of("TASK"); diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java b/processing/src/main/java/org/apache/druid/query/QueryContext.java index 147ebbec18eb..403cef1fa4ff 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContext.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java @@ -106,16 +106,6 @@ public Object get(String key) return context.get(key); } - /** - * Return a value as a generic {@code Object}, returning the default value if the - * context value is not set. - */ - public Object get(String key, Object defaultValue) - { - final Object val = get(key); - return val == null ? defaultValue : val; - } - /** * Return a value as an {@code String}, returning {@link null} if the * context value is not set. From af62a3644cf5e869c77755bed1d3cac28b44b253 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 12 Jul 2023 23:29:57 +0530 Subject: [PATCH 09/13] tests fix, build green now --- .../druid/msq/sql/MSQTaskQueryMaker.java | 2 +- .../sql/resources/SqlStatementResource.java | 12 ++++- .../apache/druid/msq/exec/MSQInsertTest.java | 25 ++++++---- .../sql/SqlMSQStatementResourcePostTest.java | 48 ++++++++++++------- 4 files changed, 56 insertions(+), 31 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index dcb0de2bcc8d..7ef74709a319 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -146,7 +146,7 @@ public QueryResponse runQuery(final DruidQuery druidQuery) if (maxNumTasks < 2) { throw InvalidInput.exception( "MSQ context maxNumTasks [%,d] cannot be less than 2, since at least 1 controller and 1 worker is necessary", - MultiStageQueryContext.CTX_MAX_NUM_TASKS + maxNumTasks ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java index 4d3224121d7a..773de9bb803f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java @@ -827,10 +827,18 @@ private static void throwIfQueryIsNotSuccessful(String queryId, TaskStatusPlus s private void contextChecks(QueryContext queryContext) { ExecutionMode executionMode = queryContext.getEnum(QueryContexts.CTX_EXECUTION_MODE, ExecutionMode.class, null); + + if (executionMode == null) { + throw InvalidInput.exception( + "Execution mode is not provided to the SQL statement API. Please set \"%s\" in the query context", + QueryContexts.CTX_EXECUTION_MODE + ); + } + if (ExecutionMode.ASYNC != executionMode) { throw InvalidInput.exception( - "The SQL statement API does not support the provided execution mode [%s]. Please set the context " - + "parameter [%s] to [%s] in the query context", + "The SQL statement API currently does not support the provided execution mode [%s]. " + + "Please set \"%s\" to [%s] in the query context", executionMode, QueryContexts.CTX_EXECUTION_MODE, ExecutionMode.ASYNC diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index 63170e91d4f9..dd0d2ab611f3 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -26,6 +26,7 @@ import com.google.common.hash.Hashing; import org.apache.druid.common.config.NullHandling; import org.apache.druid.error.DruidException; +import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.hll.HyperLogLogCollector; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -1024,11 +1025,13 @@ public void testInsertWrongTypeTimestamp() .setExpectedDataSource("foo1") .setExpectedRowSignature(rowSignature) .setQueryContext(context) - .setExpectedValidationErrorMatcher(CoreMatchers.allOf( - CoreMatchers.instanceOf(DruidException.class), - ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString( - "Field \"__time\" must be of type TIMESTAMP")) - )) + .setExpectedValidationErrorMatcher( + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.INVALID_INPUT, + "invalidInput" + ).expectMessageIs("Field [__time] was the wrong type [VARCHAR], expected TIMESTAMP") + ) .verifyPlanningErrors(); } @@ -1106,11 +1109,13 @@ public void testInsertQueryWithInvalidSubtaskCount() "insert into foo1 select __time, dim1 , count(*) as cnt from foo where dim1 is not null group by 1, 2 PARTITIONED by day clustered by dim1") .setQueryContext(localContext) .setExpectedExecutionErrorMatcher( - ThrowableMessageMatcher.hasMessage( - CoreMatchers.startsWith( - MultiStageQueryContext.CTX_MAX_NUM_TASKS - + " cannot be less than 2 since at least 1 controller and 1 worker is necessary." - ) + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.INVALID_INPUT, + "invalidInput" + ).expectMessageIs( + "MSQ context maxNumTasks [1] cannot be less than 2, since at least 1 controller " + + "and 1 worker is necessary" ) ) .verifyExecutionError(); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java index e5133ee3d21b..2f38d74e24ca 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java @@ -138,24 +138,36 @@ public void testMSQSelectQueryTest() throws IOException @Test public void nonSupportedModes() { - for (ImmutableMap context : ImmutableList.of(ImmutableMap.of( - QueryContexts.CTX_EXECUTION_MODE, - ExecutionMode.SYNC.name() - ), ImmutableMap.of())) { - SqlStatementResourceTest.assertExceptionMessage( - resource.doPost(new SqlQuery( - "select * from foo", - null, - false, - false, - false, - (Map) context, - null - ), SqlStatementResourceTest.makeOkRequest()), - "The statement sql api only supports sync mode[ASYNC]. Please set context parameter [executionMode=ASYNC] in the context payload", - Response.Status.BAD_REQUEST - ); - } + + SqlStatementResourceTest.assertExceptionMessage( + resource.doPost(new SqlQuery( + "select * from foo", + null, + false, + false, + false, + ImmutableMap.of(), + null + ), SqlStatementResourceTest.makeOkRequest()), + "Execution mode is not provided to the SQL statement API. " + + "Please set \"executionMode\" in the query context", + Response.Status.BAD_REQUEST + ); + + SqlStatementResourceTest.assertExceptionMessage( + resource.doPost(new SqlQuery( + "select * from foo", + null, + false, + false, + false, + ImmutableMap.of(QueryContexts.CTX_EXECUTION_MODE, ExecutionMode.SYNC.name()), + null + ), SqlStatementResourceTest.makeOkRequest()), + "The SQL statement API currently does not support the provided execution mode [SYNC]. " + + "Please set \"executionMode\" to [ASYNC] in the query context", + Response.Status.BAD_REQUEST + ); } From f86c1e1324107b94277cee9915933c65e55974e5 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 13 Jul 2023 13:08:14 +0530 Subject: [PATCH 10/13] review --- .../druid/msq/sql/MSQTaskQueryMaker.java | 2 +- .../druid/msq/sql/MSQTaskSqlEngine.java | 6 +-- .../sql/resources/SqlStatementResource.java | 37 ++++++++++--------- .../sql/SqlMSQStatementResourcePostTest.java | 15 +++----- .../external/ExternalOperatorConversion.java | 4 +- 5 files changed, 32 insertions(+), 32 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index 7ef74709a319..772af8524e24 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -240,7 +240,7 @@ public QueryResponse runQuery(final DruidQuery druidQuery) } else { throw InvalidInput.exception( "Unsupported select destination [%s] provided in the query context. MSQ can currently write the select results to " - + "\"%s\" and \"%s\"", + + "[%s] and [%s]", msqSelectDestination.name(), MSQSelectDestination.TASK_REPORT.toString(), MSQSelectDestination.DURABLE_STORAGE.toString() diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index 0d90c523de5c..37b8692cb4df 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -230,9 +230,9 @@ private static void validateInsert( .ofCategory(DruidException.Category.DEFENSIVE) .build( e, - "Invalid %s: %s", - DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY, - plannerContext.queryContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY) + "[%s] is not a valid value for [%s]", + plannerContext.queryContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY), + DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java index 773de9bb803f..7b92872ee6f9 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java @@ -206,9 +206,9 @@ public Response doPost(final SqlQuery sqlQuery, @Context final HttpServletReques catch (AssertionError | Exception e) { stmt.reporter().failed(e); if (isDebug) { - log.warn(e, "Failed to handle query: %s", sqlQueryId); + log.warn(e, "Failed to handle query [%s]", sqlQueryId); } else { - log.noStackTrace().warn(e, "Failed to handle query: %s", sqlQueryId); + log.noStackTrace().warn(e, "Failed to handle query [%s]", sqlQueryId); } return buildNonOkResponse( DruidException.forPersona(DruidException.Persona.DEVELOPER) @@ -261,10 +261,10 @@ public Response doGetStatus( return buildNonOkResponse(Forbidden.exception()); } catch (Exception e) { - log.warn(e, "Failed to handle query: %s", queryId); + log.warn(e, "Failed to handle query [%s]", queryId); return buildNonOkResponse(DruidException.forPersona(DruidException.Persona.DEVELOPER) .ofCategory(DruidException.Category.UNCATEGORIZED) - .build(e, "Failed to handle query: [%s]", queryId)); + .build(e, "Failed to handle query [%s]", queryId)); } } @@ -342,10 +342,10 @@ public Response doGetResults( return buildNonOkResponse(Forbidden.exception()); } catch (Exception e) { - log.warn(e, "Failed to handle query: %s", queryId); + log.warn(e, "Failed to handle query [%s]", queryId); return buildNonOkResponse(DruidException.forPersona(DruidException.Persona.DEVELOPER) .ofCategory(DruidException.Category.UNCATEGORIZED) - .build(e, "Failed to handle query: [%s]", queryId)); + .build(e, "Failed to handle query [%s]", queryId)); } } @@ -405,10 +405,10 @@ public Response deleteQuery(@PathParam("id") final String queryId, @Context fina return buildNonOkResponse(Forbidden.exception()); } catch (Exception e) { - log.warn(e, "Failed to handle query: %s", queryId); + log.warn(e, "Failed to handle query [%s]", queryId); return buildNonOkResponse(DruidException.forPersona(DruidException.Persona.DEVELOPER) .ofCategory(DruidException.Category.UNCATEGORIZED) - .build(e, "Failed to handle query: [%s]", queryId)); + .build(e, "Failed to handle query [%s]", queryId)); } } @@ -816,7 +816,7 @@ private static void throwIfQueryIsNotSuccessful(String queryId, TaskStatusPlus s throw DruidException.forPersona(DruidException.Persona.USER) .ofCategory(DruidException.Category.INVALID_INPUT) .build( - "Query[%s] failed. Hit status api for more details.", + "Query[%s] failed. Check the status api for more details.", queryId ); } else { @@ -830,15 +830,17 @@ private void contextChecks(QueryContext queryContext) if (executionMode == null) { throw InvalidInput.exception( - "Execution mode is not provided to the SQL statement API. Please set \"%s\" in the query context", - QueryContexts.CTX_EXECUTION_MODE + "Execution mode is not provided to the SQL statement API. " + + "Please set [%s] to [%s] in the query context", + QueryContexts.CTX_EXECUTION_MODE, + ExecutionMode.ASYNC ); } if (ExecutionMode.ASYNC != executionMode) { throw InvalidInput.exception( "The SQL statement API currently does not support the provided execution mode [%s]. " - + "Please set \"%s\" to [%s] in the query context", + + "Please set [%s] to [%s] in the query context", executionMode, QueryContexts.CTX_EXECUTION_MODE, ExecutionMode.ASYNC @@ -858,11 +860,12 @@ private void checkForDurableStorageConnectorImpl() .ofCategory(DruidException.Category.INVALID_INPUT) .build( StringUtils.format( - "The statement sql api cannot read from select destination [%s=%s] since its not configured. " - + "Its recommended to configure durable storage as it allows the user to fetch big results. " - + "Please contact your cluster admin to configure durable storage.", - MultiStageQueryContext.CTX_SELECT_DESTINATION, - MSQSelectDestination.DURABLE_STORAGE.name() + "The SQL Statement API cannot read from the select destination [%s] provided " + + "in the query context [%s] since it is not configured. It is recommended to configure the durable storage " + + "as it allows the user to fetch large result sets. Please contact your cluster admin to " + + "configure durable storage.", + MSQSelectDestination.DURABLE_STORAGE.name(), + MultiStageQueryContext.CTX_SELECT_DESTINATION ) ); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java index 2f38d74e24ca..70ac5386ba6c 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlMSQStatementResourcePostTest.java @@ -150,7 +150,7 @@ public void nonSupportedModes() null ), SqlStatementResourceTest.makeOkRequest()), "Execution mode is not provided to the SQL statement API. " - + "Please set \"executionMode\" in the query context", + + "Please set [executionMode] to [ASYNC] in the query context", Response.Status.BAD_REQUEST ); @@ -165,7 +165,7 @@ public void nonSupportedModes() null ), SqlStatementResourceTest.makeOkRequest()), "The SQL statement API currently does not support the provided execution mode [SYNC]. " - + "Please set \"executionMode\" to [ASYNC] in the query context", + + "Please set [executionMode] to [ASYNC] in the query context", Response.Status.BAD_REQUEST ); } @@ -272,13 +272,10 @@ public void durableStorageDisabledTest() NilStorageConnector.getInstance() ); - String errorMessage = StringUtils.format( - "The statement sql api cannot read from select destination [%s=%s] since its not configured. " - + "Its recommended to configure durable storage as it allows the user to fetch big results. " - + "Please contact your cluster admin to configure durable storage.", - MultiStageQueryContext.CTX_SELECT_DESTINATION, - MSQSelectDestination.DURABLE_STORAGE.name() - ); + String errorMessage = "The SQL Statement API cannot read from the select destination [DURABLE_STORAGE] provided in " + + "the query context [selectDestination] since it is not configured. It is recommended to " + + "configure the durable storage as it allows the user to fetch large result sets. " + + "Please contact your cluster admin to configure durable storage."; Map context = defaultAsyncContext(); context.put(MultiStageQueryContext.CTX_SELECT_DESTINATION, MSQSelectDestination.DURABLE_STORAGE.name()); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java index 96832d2ed4b1..945e452941f4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java @@ -94,13 +94,13 @@ public ExternalTableSpec apply( final String sigValue = CatalogUtils.getString(args, SIGNATURE_PARAM); if (sigValue == null && columns == null) { throw InvalidInput.exception( - "EXTERN requires either a %s value or an EXTEND clause", + "EXTERN requires either a [%s] value or an EXTEND clause", SIGNATURE_PARAM ); } if (sigValue != null && columns != null) { throw InvalidInput.exception( - "EXTERN requires either a %s value or an EXTEND clause, but not both", + "EXTERN requires either a [%s] value or an EXTEND clause, but not both", SIGNATURE_PARAM ); } From b9c1218d01476722af9b6fb13462eda501cd4855 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 13 Jul 2023 13:39:25 +0530 Subject: [PATCH 11/13] Trigger Build From 47d004e557c01fbcd8ec8c0a993c38420bc6b304 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 13 Jul 2023 15:22:46 +0530 Subject: [PATCH 12/13] build fix --- .../java/org/apache/druid/msq/sql/SqlStatementResourceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java index 51b64b7eedd0..049b57c340e3 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/SqlStatementResourceTest.java @@ -783,7 +783,7 @@ public void testFailedMSQQuery() assertExceptionMessage( resource.doGetResults(queryID, 0L, makeOkRequest()), StringUtils.format( - "Query[%s] failed. Hit status api for more details.", + "Query[%s] failed. Check the status api for more details.", queryID ), Response.Status.BAD_REQUEST From 842c961fb82f938309dd8e0c85fe38c3bce6e2ac Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Thu, 13 Jul 2023 16:55:44 +0530 Subject: [PATCH 13/13] codecov --- .../sql/calcite/CalciteInsertDmlTest.java | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java index cce88da53da2..433eb98ace50 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java @@ -1642,4 +1642,90 @@ public void testErrorWithUnableToConstructColumnSignatureWithExtern() ) .verify(); } + + @Test + public void testErrorWhenBothRowSignatureAndExtendsProvidedToExtern() + { + final String sqlString = "insert into dst \n" + + "select time_parse(\"time\") as __time, * \n" + + "from table( \n" + + "extern(\n" + + "'{\"type\": \"s3\", \"uris\": [\\\"s3://imply-eng-datasets/qa/IngestionTest/wikipedia/files/wikiticker-2015-09-12-sampled.mini.json.gz\\\"]}',\n" + + "'{\"type\": \"json\"}',\n" + + "'[{\"name\": \"time\", \"type\": \"string\"}, {\"name\": \"channel\", \"type\": \"string\"}]'\n" + + ")\n" + + ") EXTEND (\"time\" VARCHAR, \"channel\" VARCHAR)\n" + + "partitioned by DAY\n" + + "clustered by channel"; + HashMap context = new HashMap<>(DEFAULT_CONTEXT); + context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100); + testIngestionQuery().context(context).sql(sqlString) + .expectValidationError( + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.INVALID_INPUT, + "invalidInput" + ).expectMessageContains( + "EXTERN requires either a [signature] value or an EXTEND clause, but not both" + ) + ) + .verify(); + } + + @Test + public void testErrorWhenNoneOfRowSignatureAndExtendsProvidedToExtern() + { + final String sqlString = "insert into dst \n" + + "select time_parse(\"time\") as __time, * \n" + + "from table( \n" + + "extern(\n" + + "'{\"type\": \"s3\", \"uris\": [\\\"s3://imply-eng-datasets/qa/IngestionTest/wikipedia/files/wikiticker-2015-09-12-sampled.mini.json.gz\\\"]}',\n" + + "'{\"type\": \"json\"}'\n" + + ")\n" + + ")\n" + + "partitioned by DAY\n" + + "clustered by channel"; + HashMap context = new HashMap<>(DEFAULT_CONTEXT); + context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100); + testIngestionQuery().context(context).sql(sqlString) + .expectValidationError( + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.INVALID_INPUT, + "invalidInput" + ).expectMessageContains( + "EXTERN requires either a [signature] value or an EXTEND clause" + ) + ) + .verify(); + } + + @Test + public void testErrorWhenInputSourceInvalid() + { + final String sqlString = "insert into dst \n" + + "select time_parse(\"time\") as __time, * \n" + + "from table( \n" + + "extern(\n" + + "'{\"type\": \"local\"}',\n" + + "'{\"type\": \"json\"}',\n" + + "'[{\"name\": \"time\", \"type\": \"string\"}, {\"name\": \"channel\", \"type\": \"string\"}]'\n" + + ")\n" + + ")\n" + + "partitioned by DAY\n" + + "clustered by channel"; + HashMap context = new HashMap<>(DEFAULT_CONTEXT); + context.put(PlannerContext.CTX_SQL_OUTER_LIMIT, 100); + testIngestionQuery().context(context).sql(sqlString) + .expectValidationError( + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.INVALID_INPUT, + "invalidInput" + ).expectMessageContains( + "Invalid value for the field [inputSource]. Reason:" + ) + ) + .verify(); + } }