Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
Expand All @@ -48,7 +48,6 @@
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.QueryResponse;
Expand Down Expand Up @@ -76,10 +75,6 @@

public class MSQTaskQueryMaker implements QueryMaker
{

private static final String DESTINATION_DATASOURCE = "dataSource";
private static final String DESTINATION_REPORT = "taskReport";

public static final String USER_KEY = "__user";

private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.ALL;
Expand Down Expand Up @@ -128,25 +123,31 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
MSQMode.populateDefaultQueryContext(msqMode, nativeQueryContext);
}

final String ctxDestination =
DimensionHandlerUtils.convertObjectToString(MultiStageQueryContext.getDestination(sqlQueryContext));

Object segmentGranularity;
try {
segmentGranularity = Optional.ofNullable(plannerContext.queryContext()
.get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY))
.orElse(jsonMapper.writeValueAsString(DEFAULT_SEGMENT_GRANULARITY));
}
catch (JsonProcessingException e) {
throw new IAE("Unable to deserialize the insert granularity. Please retry the query with a valid "
+ "segment graularity");
// This would only be thrown if we are unable to serialize the DEFAULT_SEGMENT_GRANULARITY, which we don't expect
// to happen
throw DruidException.defensive()
.build(
e,
"Unable to deserialize the DEFAULT_SEGMENT_GRANULARITY in MSQTaskQueryMaker. "
+ "This shouldn't have happened since the DEFAULT_SEGMENT_GRANULARITY object is guaranteed to be "
+ "serializable. Please raise an issue in case you are seeing this message while executing a query."
);
}

final int maxNumTasks = MultiStageQueryContext.getMaxNumTasks(sqlQueryContext);

if (maxNumTasks < 2) {
throw new IAE(MultiStageQueryContext.CTX_MAX_NUM_TASKS
+ " cannot be less than 2 since at least 1 controller and 1 worker is necessary.");
throw InvalidInput.exception(
"MSQ context maxNumTasks [%,d] cannot be less than 2, since at least 1 controller and 1 worker is necessary",
maxNumTasks
);
}

// This parameter is used internally for the number of worker tasks only, so we subtract 1
Expand Down Expand Up @@ -202,16 +203,19 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
final MSQDestination destination;

if (targetDataSource != null) {
if (ctxDestination != null && !DESTINATION_DATASOURCE.equals(ctxDestination)) {
throw new IAE("Cannot INSERT with destination [%s]", ctxDestination);
}

Granularity segmentGranularityObject;
try {
segmentGranularityObject = jsonMapper.readValue((String) segmentGranularity, Granularity.class);
}
catch (Exception e) {
throw new ISE("Unable to convert %s to a segment granularity", segmentGranularity);
throw DruidException.defensive()
.build(
e,
"Unable to deserialize the provided segmentGranularity [%s]. "
+ "This is populated internally by Druid and therefore should not occur. "
+ "Please contact the developers if you are seeing this error message.",
segmentGranularity
);
}

final List<String> segmentSortOrder = MultiStageQueryContext.getSortOrder(sqlQueryContext);
Expand All @@ -228,16 +232,19 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
replaceTimeChunks
);
} else {
if (ctxDestination != null && !DESTINATION_REPORT.equals(ctxDestination)) {
throw new IAE("Cannot SELECT with destination [%s]", ctxDestination);
}
final MSQSelectDestination msqSelectDestination = MultiStageQueryContext.getSelectDestination(sqlQueryContext);
if (msqSelectDestination.equals(MSQSelectDestination.TASK_REPORT)) {
destination = TaskReportMSQDestination.instance();
} else if (msqSelectDestination.equals(MSQSelectDestination.DURABLE_STORAGE)) {
destination = DurableStorageMSQDestination.instance();
} else {
throw new IAE("Cannot SELECT with destination [%s]", msqSelectDestination.name());
throw InvalidInput.exception(
"Unsupported select destination [%s] provided in the query context. MSQ can currently write the select results to "
+ "[%s] and [%s]",
msqSelectDestination.name(),
MSQSelectDestination.TASK_REPORT.toString(),
MSQSelectDestination.DURABLE_STORAGE.toString()
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,13 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
Expand All @@ -60,7 +58,6 @@ public class MSQTaskSqlEngine implements SqlEngine
public static final Set<String> SYSTEM_CONTEXT_PARAMETERS =
ImmutableSet.<String>builder()
.addAll(NativeSqlEngine.SYSTEM_CONTEXT_PARAMETERS)
.add(MultiStageQueryContext.CTX_DESTINATION)
.add(QueryKitUtils.CTX_TIME_COLUMN_NAME)
.build();

Expand Down Expand Up @@ -125,17 +122,17 @@ public boolean featureAvailable(EngineFeature feature, PlannerContext plannerCon
case SCAN_NEEDS_SIGNATURE:
return true;
default:
throw new IAE("Unrecognized feature: %s", feature);
throw SqlEngines.generateUnrecognizedFeatureException(MSQTaskSqlEngine.class.getSimpleName(), feature);
}
}

@Override
public QueryMaker buildQueryMakerForSelect(
final RelRoot relRoot,
final PlannerContext plannerContext
) throws ValidationException
)
{
validateSelect(relRoot.fields, plannerContext);
validateSelect(plannerContext);

return new MSQTaskQueryMaker(
null,
Expand All @@ -156,7 +153,7 @@ public QueryMaker buildQueryMakerForInsert(
final String targetDataSource,
final RelRoot relRoot,
final PlannerContext plannerContext
) throws ValidationException
)
{
validateInsert(relRoot.rel, relRoot.fields, plannerContext);

Expand All @@ -169,23 +166,31 @@ public QueryMaker buildQueryMakerForInsert(
);
}

private static void validateSelect(
final List<Pair<Integer, String>> fieldMappings,
final PlannerContext plannerContext
) throws ValidationException
/**
* Checks if the SELECT contains {@link DruidSqlInsert#SQL_INSERT_SEGMENT_GRANULARITY} in the context. This is a
* defensive cheeck because {@link org.apache.druid.sql.calcite.planner.DruidPlanner} should have called the
* {@link #validateContext}
*/
private static void validateSelect(final PlannerContext plannerContext)
{
if (plannerContext.queryContext().containsKey(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)) {
throw new ValidationException(
StringUtils.format("Cannot use \"%s\" without INSERT", DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)
);
throw DruidException
.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.DEFENSIVE)
.build(
"The SELECT query's context contains invalid parameter [%s] which is supposed to be populated "
+ "by Druid for INSERT queries. If the user is seeing this exception, that means there's a bug in Druid "
+ "that is populating the query context with the segment's granularity.",
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY
);
}
}

private static void validateInsert(
final RelNode rootRel,
final List<Pair<Integer, String>> fieldMappings,
final PlannerContext plannerContext
) throws ValidationException
)
{
validateNoDuplicateAliases(fieldMappings);

Expand All @@ -199,12 +204,10 @@ private static void validateInsert(
// Validate the __time field has the proper type.
final SqlTypeName timeType = rootRel.getRowType().getFieldList().get(field.left).getType().getSqlTypeName();
if (timeType != SqlTypeName.TIMESTAMP) {
throw new ValidationException(
StringUtils.format(
"Field \"%s\" must be of type TIMESTAMP (was %s)",
ColumnHolder.TIME_COLUMN_NAME,
timeType
)
throw InvalidSqlInput.exception(
"Field [%s] was the wrong type [%s], expected TIMESTAMP",
ColumnHolder.TIME_COLUMN_NAME,
timeType
);
}
}
Expand All @@ -220,13 +223,18 @@ private static void validateInsert(
);
}
catch (Exception e) {
throw new ValidationException(
StringUtils.format(
"Invalid segmentGranularity: %s",
plannerContext.queryContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)
),
e
);
// This is a defensive check as the DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY in the query context is
// populated by Druid. If the user entered an incorrect granularity, that should have been flagged before reaching
// here
Comment thread
LakshSingla marked this conversation as resolved.
throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.DEFENSIVE)
.build(
e,
"[%s] is not a valid value for [%s]",
plannerContext.queryContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY),
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY
);

}

final boolean hasSegmentGranularity = !Granularities.ALL.equals(segmentGranularity);
Expand All @@ -237,11 +245,10 @@ private static void validateInsert(
validateLimitAndOffset(rootRel, !hasSegmentGranularity);

if (hasSegmentGranularity && timeFieldIndex < 0) {
throw new ValidationException(
StringUtils.format(
"INSERT queries with segment granularity other than \"all\" must have a \"%s\" field.",
ColumnHolder.TIME_COLUMN_NAME
)
throw InvalidInput.exception(
"The granularity [%s] specified in the PARTITIONED BY clause of the INSERT query is different from ALL. "
+ "Therefore, the query must specify a time column (named __time).",
segmentGranularity
);
}
}
Expand Down
Loading