diff --git a/extensions-core/multi-stage-query/pom.xml b/extensions-core/multi-stage-query/pom.xml index bc26993c072b..77f18c78f62b 100644 --- a/extensions-core/multi-stage-query/pom.xml +++ b/extensions-core/multi-stage-query/pom.xml @@ -196,6 +196,11 @@ commons-io provided + + org.apache.httpcomponents + httpclient + provided + diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/DartResourcePermissionMapper.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/DartResourcePermissionMapper.java index 038d1b56c72b..a69174517ec6 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/DartResourcePermissionMapper.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/DartResourcePermissionMapper.java @@ -20,13 +20,13 @@ package org.apache.druid.msq.dart; import com.google.common.collect.ImmutableList; -import org.apache.druid.msq.dart.controller.http.DartSqlResource; import org.apache.druid.msq.dart.worker.http.DartWorkerResource; import org.apache.druid.msq.rpc.ResourcePermissionMapper; import org.apache.druid.msq.rpc.WorkerResource; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.sql.http.SqlResource; import java.util.List; @@ -34,7 +34,7 @@ public class DartResourcePermissionMapper implements ResourcePermissionMapper { /** * Permissions for admin APIs in {@link DartWorkerResource} and {@link WorkerResource}. Note that queries from - * end users go through {@link DartSqlResource}, which wouldn't use these mappings. + * end users go through {@link SqlResource}, which wouldn't use these mappings. */ @Override public List getAdminPermissions() @@ -47,7 +47,7 @@ public List getAdminPermissions() /** * Permissions for per-query APIs in {@link DartWorkerResource} and {@link WorkerResource}. Note that queries from - * end users go through {@link DartSqlResource}, which wouldn't use these mappings. + * end users go through {@link SqlResource}, which wouldn't use these mappings. */ @Override public List getQueryPermissions(String queryId) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java index 2bc5d08704d5..88610ee8f338 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java @@ -27,6 +27,8 @@ import org.apache.druid.msq.util.MSQTaskQueryMakerUtils; import org.apache.druid.query.QueryContexts; import org.apache.druid.server.DruidNode; +import org.apache.druid.sql.http.GetQueriesResponse; +import org.apache.druid.sql.http.QueryInfo; import org.joda.time.DateTime; import java.util.Objects; @@ -34,7 +36,7 @@ /** * Class included in {@link GetQueriesResponse}. */ -public class DartQueryInfo +public class DartQueryInfo implements QueryInfo { private final String sqlQueryId; private final String dartQueryId; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java deleted file mode 100644 index 5c9b3e5ff721..000000000000 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.msq.dart.controller.http; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.Futures; -import com.google.inject.Inject; -import com.sun.jersey.api.core.HttpContext; -import org.apache.druid.common.guava.FutureUtils; -import org.apache.druid.guice.annotations.Self; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.msq.dart.Dart; -import org.apache.druid.msq.dart.controller.ControllerHolder; -import org.apache.druid.msq.dart.controller.DartControllerRegistry; -import org.apache.druid.msq.dart.controller.sql.DartSqlClients; -import org.apache.druid.msq.indexing.error.CancellationReason; -import org.apache.druid.query.QueryContexts; -import org.apache.druid.server.DruidNode; -import org.apache.druid.server.ResponseContextConfig; -import org.apache.druid.server.initialization.ServerConfig; -import org.apache.druid.server.security.Action; -import org.apache.druid.server.security.AuthenticationResult; -import org.apache.druid.server.security.AuthorizationResult; -import org.apache.druid.server.security.AuthorizationUtils; -import org.apache.druid.server.security.AuthorizerMapper; -import org.apache.druid.server.security.Resource; -import org.apache.druid.server.security.ResourceAction; -import org.apache.druid.sql.HttpStatement; -import org.apache.druid.sql.SqlLifecycleManager; -import org.apache.druid.sql.SqlStatementFactory; -import org.apache.druid.sql.http.SqlQuery; -import org.apache.druid.sql.http.SqlResource; - -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.DELETE; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; - -/** - * Resource for Dart queries. API-compatible with {@link SqlResource}, so clients can be pointed from - * {@code /druid/v2/sql/} to {@code /druid/v2/sql/dart/} without code changes. - */ -@Path(DartSqlResource.PATH + '/') -public class DartSqlResource extends SqlResource -{ - public static final String PATH = "/druid/v2/sql/dart"; - - private static final Logger log = new Logger(DartSqlResource.class); - - private final DartControllerRegistry controllerRegistry; - private final SqlLifecycleManager sqlLifecycleManager; - private final DartSqlClients sqlClients; - private final AuthorizerMapper authorizerMapper; - - // make dartqueryId a prefix the {{queeryid}}-{{startupTime}}-{{queryIndex} - @Inject - public DartSqlResource( - final ObjectMapper jsonMapper, - final AuthorizerMapper authorizerMapper, - @Dart final SqlStatementFactory sqlStatementFactory, - final DartControllerRegistry controllerRegistry, - final SqlLifecycleManager sqlLifecycleManager, - final DartSqlClients sqlClients, - final ServerConfig serverConfig, - final ResponseContextConfig responseContextConfig, - @Self final DruidNode selfNode - ) - { - super( - jsonMapper, - authorizerMapper, - sqlStatementFactory, - sqlLifecycleManager, - serverConfig, - responseContextConfig, - selfNode - ); - this.controllerRegistry = controllerRegistry; - this.sqlLifecycleManager = sqlLifecycleManager; - this.sqlClients = sqlClients; - this.authorizerMapper = authorizerMapper; - } - - /** - * API that allows callers to check if this resource is installed without actually issuing a query. If installed, - * this call returns 200 OK. If not installed, callers get 404 Not Found. - */ - @GET - @Path("/enabled") - @Produces(MediaType.APPLICATION_JSON) - public Response doGetEnabled(@Context final HttpServletRequest request) - { - AuthorizationUtils.setRequestAuthorizationAttributeIfNeeded(request); - return Response.ok(ImmutableMap.of("enabled", true)).build(); - } - - /** - * API to list all running queries. - * - * @param selfOnly if true, return queries running on this server. If false, return queries running on all servers. - * @param req http request - */ - @GET - @Produces(MediaType.APPLICATION_JSON) - public GetQueriesResponse doGetRunningQueries( - @QueryParam("selfOnly") final String selfOnly, - @Context final HttpServletRequest req - ) - { - final AuthenticationResult authenticationResult = AuthorizationUtils.authenticationResultFromRequest(req); - final AuthorizationResult stateReadAccess = AuthorizationUtils.authorizeAllResourceActions( - authenticationResult, - Collections.singletonList(new ResourceAction(Resource.STATE_RESOURCE, Action.READ)), - authorizerMapper - ); - - final List queries = - controllerRegistry.getAllHolders() - .stream() - .map(DartQueryInfo::fromControllerHolder) - .collect(Collectors.toList()); - - // Add queries from all other servers, if "selfOnly" is not set. - if (selfOnly == null) { - final List otherQueries = FutureUtils.getUnchecked( - Futures.successfulAsList( - Iterables.transform(sqlClients.getAllClients(), client -> client.getRunningQueries(true))), - true - ); - - for (final GetQueriesResponse response : otherQueries) { - if (response != null) { - queries.addAll(response.getQueries()); - } - } - } - - // Sort queries by start time, breaking ties by query ID, so the list comes back in a consistent and nice order. - queries.sort(Comparator.comparing(DartQueryInfo::getStartTime).thenComparing(DartQueryInfo::getDartQueryId)); - - final GetQueriesResponse response; - if (stateReadAccess.allowAccessWithNoRestriction()) { - // User can READ STATE, so they can see all running queries, as well as authentication details. - response = new GetQueriesResponse(queries); - } else { - // User cannot READ STATE, so they can see only their own queries, without authentication details. - response = new GetQueriesResponse( - queries.stream() - .filter( - query -> - authenticationResult.getAuthenticatedBy() != null - && authenticationResult.getIdentity() != null - && Objects.equals(authenticationResult.getAuthenticatedBy(), query.getAuthenticator()) - && Objects.equals(authenticationResult.getIdentity(), query.getIdentity())) - .map(DartQueryInfo::withoutAuthenticationResult) - .collect(Collectors.toList()) - ); - } - - AuthorizationUtils.setRequestAuthorizationAttributeIfNeeded(req); - return response; - } - - /** - * API to issue a query. - */ - @POST - @Produces(MediaType.APPLICATION_JSON) - @Override - public Response doPost( - @Context final HttpServletRequest req, - @Context final HttpContext httpContext - ) - { - return this.doPost(SqlQuery.from(httpContext), req); - } - - @Override - public Response doPost( - final SqlQuery sqlQuery, - final HttpServletRequest req - ) - { - final Map context = new HashMap<>(sqlQuery.getContext()); - - return super.doPost(sqlQuery.withOverridenContext(context), req); - } - - /** - * API to cancel a query. - */ - @DELETE - @Path("{id}") - @Produces(MediaType.APPLICATION_JSON) - @Override - public Response cancelQuery( - @PathParam("id") String sqlQueryId, - @Context final HttpServletRequest req - ) - { - log.debug("Received cancel request for query[%s]", sqlQueryId); - - List cancelables = sqlLifecycleManager.getAll(sqlQueryId); - if (cancelables.isEmpty()) { - // Return ACCEPTED even if the query wasn't found. When the Router broadcasts cancellation requests to all - // Brokers, this ensures the user sees a successful request. - AuthorizationUtils.setRequestAuthorizationAttributeIfNeeded(req); - return Response.status(Response.Status.ACCEPTED).build(); - } - - final AuthorizationResult authResult = authorizeCancellation(req, cancelables); - - if (authResult.allowAccessWithNoRestriction()) { - sqlLifecycleManager.removeAll(sqlQueryId, cancelables); - - // Don't call cancel() on the cancelables. That just cancels native queries, which is useless here. Instead, - // get the controller and stop it. - for (SqlLifecycleManager.Cancelable cancelable : cancelables) { - final HttpStatement stmt = (HttpStatement) cancelable; - final Object dartQueryId = stmt.context().get(QueryContexts.CTX_DART_QUERY_ID); - if (dartQueryId instanceof String) { - final ControllerHolder holder = controllerRegistry.get((String) dartQueryId); - if (holder != null) { - holder.cancel(CancellationReason.USER_REQUEST); - } - } else { - log.warn( - "%s[%s] for query[%s] is not a string, cannot cancel.", - QueryContexts.CTX_DART_QUERY_ID, - dartQueryId, - sqlQueryId - ); - } - } - - // Return ACCEPTED even if the query wasn't found. When the Router broadcasts cancellation requests to all - // Brokers, this ensures the user sees a successful request. - return Response.status(Response.Status.ACCEPTED).build(); - } else { - return Response.status(Response.Status.FORBIDDEN).build(); - } - } -} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java index 447da229d05e..c8da492237a8 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java @@ -20,13 +20,13 @@ package org.apache.druid.msq.dart.controller.sql; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.druid.msq.dart.controller.http.DartSqlResource; -import org.apache.druid.msq.dart.controller.http.GetQueriesResponse; +import org.apache.druid.sql.http.GetQueriesResponse; +import org.apache.druid.sql.http.SqlResource; import javax.servlet.http.HttpServletRequest; /** - * Client for the {@link DartSqlResource} resource. + * Client for the {@link SqlResource} resource for Dart queries. */ public interface DartSqlClient { @@ -36,7 +36,7 @@ public interface DartSqlClient * @param selfOnly true if only queries from this server should be returned; false if queries from all servers * should be returned * - * @see DartSqlResource#doGetRunningQueries(String, HttpServletRequest) the server side + * @see SqlResource#doGetRunningQueries(String, HttpServletRequest) the server side */ ListenableFuture getRunningQueries(boolean selfOnly); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientFactoryImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientFactoryImpl.java index c2355a43e31a..3562f5c2ff88 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientFactoryImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientFactoryImpl.java @@ -24,13 +24,13 @@ import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.msq.dart.controller.http.DartSqlResource; import org.apache.druid.rpc.FixedServiceLocator; import org.apache.druid.rpc.ServiceClient; import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.rpc.ServiceLocation; import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.server.DruidNode; +import org.apache.druid.sql.http.SqlResource; /** * Production implementation of {@link DartSqlClientFactory}. @@ -55,7 +55,7 @@ public DartSqlClient makeClient(DruidNode node) { final ServiceClient client = clientFactory.makeClient( StringUtils.format("%s[dart-sql]", node.getHostAndPortToUse()), - new FixedServiceLocator(ServiceLocation.fromDruidNode(node).withBasePath(DartSqlResource.PATH)), + new FixedServiceLocator(ServiceLocation.fromDruidNode(node).withBasePath(SqlResource.PATH)), StandardRetryPolicy.noRetries() ); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java index aebf7e4b90fa..7b0542205214 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java @@ -24,11 +24,14 @@ import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; -import org.apache.druid.msq.dart.controller.http.GetQueriesResponse; import org.apache.druid.rpc.RequestBuilder; import org.apache.druid.rpc.ServiceClient; +import org.apache.druid.sql.http.GetQueriesResponse; +import org.apache.http.client.utils.URIBuilder; import org.jboss.netty.handler.codec.http.HttpMethod; +import java.net.URISyntaxException; + /** * Production implementation of {@link DartSqlClient}. */ @@ -46,12 +49,22 @@ public DartSqlClientImpl(final ServiceClient client, final ObjectMapper jsonMapp @Override public ListenableFuture getRunningQueries(final boolean selfOnly) { - return FutureUtils.transform( - client.asyncRequest( - new RequestBuilder(HttpMethod.GET, selfOnly ? "/?selfOnly" : "/"), - new BytesFullResponseHandler() - ), - holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), GetQueriesResponse.class) - ); + try { + URIBuilder builder = new URIBuilder("/queries"); + if (selfOnly) { + builder.addParameter("selfOnly", null); + } + + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.GET, builder.toString()), + new BytesFullResponseHandler() + ), + holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), GetQueriesResponse.class) + ); + } + catch (URISyntaxException e) { + throw new RuntimeException(e); + } } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClients.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClients.java index 733f69ee4bf9..f28d895cf60f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClients.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClients.java @@ -30,8 +30,8 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; -import org.apache.druid.msq.dart.controller.http.DartSqlResource; import org.apache.druid.server.DruidNode; +import org.apache.druid.sql.http.SqlResource; import javax.servlet.http.HttpServletRequest; import java.util.Collection; @@ -41,7 +41,7 @@ /** * Keeps {@link DartSqlClient} for all servers except ourselves. Currently the purpose of this is to power - * the "get all queries" API at {@link DartSqlResource#doGetRunningQueries(String, HttpServletRequest)}. + * the "get all queries" API at {@link SqlResource#doGetRunningQueries(String, String, HttpServletRequest)}. */ @ManageLifecycle public class DartSqlClients implements DruidNodeDiscovery.Listener diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java index a75415f7d057..2836a1e3cebc 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java @@ -20,26 +20,38 @@ package org.apache.druid.msq.dart.controller.sql; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.Futures; import com.google.inject.Inject; import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.error.DruidException; import org.apache.druid.guice.LazySingleton; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.dart.Dart; +import org.apache.druid.msq.dart.controller.ControllerHolder; import org.apache.druid.msq.dart.controller.DartControllerContextFactory; import org.apache.druid.msq.dart.controller.DartControllerRegistry; +import org.apache.druid.msq.dart.controller.http.DartQueryInfo; import org.apache.druid.msq.dart.guice.DartControllerConfig; import org.apache.druid.msq.exec.QueryKitSpecFactory; +import org.apache.druid.msq.indexing.error.CancellationReason; import org.apache.druid.msq.sql.DartQueryKitSpecFactory; import org.apache.druid.msq.sql.MSQTaskSqlEngine; import org.apache.druid.query.DefaultQueryConfig; import org.apache.druid.query.QueryContext; import org.apache.druid.query.QueryContexts; +import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.AuthorizationResult; +import org.apache.druid.sql.SqlStatementFactory; +import org.apache.druid.sql.SqlToolbox; import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.run.EngineFeature; @@ -47,15 +59,22 @@ import org.apache.druid.sql.calcite.run.SqlEngine; import org.apache.druid.sql.calcite.run.SqlEngines; import org.apache.druid.sql.destination.IngestDestination; +import org.apache.druid.sql.http.GetQueriesResponse; +import org.apache.druid.sql.http.QueryInfo; +import java.util.Comparator; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; @LazySingleton public class DartSqlEngine implements SqlEngine { - private static final String NAME = "msq-dart"; + public static final String NAME = "msq-dart"; + private static final Logger log = new Logger(DartSqlEngine.class); private final DartControllerContextFactory controllerContextFactory; private final DartControllerRegistry controllerRegistry; @@ -64,6 +83,8 @@ public class DartSqlEngine implements SqlEngine private final ServerConfig serverConfig; private final QueryKitSpecFactory queryKitSpecFactory; private final DefaultQueryConfig dartQueryConfig; + private final SqlToolbox toolbox; + private final DartSqlClients sqlClients; @Inject public DartSqlEngine( @@ -72,7 +93,9 @@ public DartSqlEngine( DartControllerConfig controllerConfig, DartQueryKitSpecFactory queryKitSpecFactory, ServerConfig serverConfig, - @Dart DefaultQueryConfig dartQueryConfig + @Dart DefaultQueryConfig dartQueryConfig, + SqlToolbox toolbox, + DartSqlClients sqlClients ) { this( @@ -82,7 +105,9 @@ public DartSqlEngine( Execs.multiThreaded(controllerConfig.getConcurrentQueries(), "dart-controller-%s"), queryKitSpecFactory, serverConfig, - dartQueryConfig + dartQueryConfig, + toolbox, + sqlClients ); } @@ -93,7 +118,9 @@ public DartSqlEngine( ExecutorService controllerExecutor, QueryKitSpecFactory queryKitSpecFactory, ServerConfig serverConfig, - DefaultQueryConfig dartQueryConfig + DefaultQueryConfig dartQueryConfig, + SqlToolbox toolbox, + DartSqlClients sqlClients ) { this.controllerContextFactory = controllerContextFactory; @@ -103,6 +130,8 @@ public DartSqlEngine( this.queryKitSpecFactory = queryKitSpecFactory; this.serverConfig = serverConfig; this.dartQueryConfig = dartQueryConfig; + this.toolbox = toolbox; + this.sqlClients = sqlClients; } @Override @@ -227,4 +256,85 @@ public void initContextMap(Map contextMap) final String dartQueryId = UUID.randomUUID().toString(); contextMap.put(QueryContexts.CTX_DART_QUERY_ID, dartQueryId); } + + @Override + public SqlStatementFactory getSqlStatementFactory() + { + return new SqlStatementFactory(toolbox.withEngine(this)); + } + + @Override + public List getRunningQueries( + boolean selfOnly, + AuthenticationResult authenticationResult, + AuthorizationResult authorizationResult + ) + { + final List queries = controllerRegistry.getAllHolders() + .stream() + .map(DartQueryInfo::fromControllerHolder) + .collect(Collectors.toList()); + + // Add queries from all other servers, if "selfOnly" is false. + if (!selfOnly) { + final List otherQueries = FutureUtils.getUnchecked( + Futures.successfulAsList( + Iterables.transform(sqlClients.getAllClients(), client -> client.getRunningQueries(true))), + true + ); + + for (final GetQueriesResponse response : otherQueries) { + if (response != null) { + response.getQueries().stream() + .filter(queryInfo -> (queryInfo instanceof DartQueryInfo)) + .map(queryInfo -> (DartQueryInfo) queryInfo) + .forEach(queries::add); + } + } + } + + // Sort queries by start time, breaking ties by query ID, so the list comes back in a consistent and nice order. + queries.sort(Comparator.comparing(DartQueryInfo::getStartTime).thenComparing(DartQueryInfo::getDartQueryId)); + + if (authorizationResult.allowAccessWithNoRestriction()) { + // User can READ STATE, so they can see all running queries, as well as authentication details. + return List.copyOf(queries); + } else { + // User cannot READ STATE, so they can see only their own queries, without authentication details. + return queries.stream() + .filter( + query -> + authenticationResult.getAuthenticatedBy() != null + && authenticationResult.getIdentity() != null + && Objects.equals( + authenticationResult.getAuthenticatedBy(), + query.getAuthenticator() + ) + && Objects.equals( + authenticationResult.getIdentity(), + query.getIdentity() + )) + .map(DartQueryInfo::withoutAuthenticationResult) + .collect(Collectors.toList()); + } + } + + @Override + public void cancelQuery(PlannerContext plannerContext, QueryScheduler queryScheduler) + { + final Object dartQueryId = plannerContext.queryContext().get(QueryContexts.CTX_DART_QUERY_ID); + if (dartQueryId instanceof String) { + final ControllerHolder holder = controllerRegistry.get((String) dartQueryId); + if (holder != null) { + holder.cancel(CancellationReason.USER_REQUEST); + } + } else { + log.warn( + "%s[%s] for query[%s] is not a string, cannot cancel.", + QueryContexts.CTX_DART_QUERY_ID, + dartQueryId, + plannerContext.getSqlQueryId() + ); + } + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java index 0dae32f7cf09..3ee647e65ba6 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java @@ -19,13 +19,15 @@ package org.apache.druid.msq.dart.guice; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; import com.google.inject.Inject; import com.google.inject.Module; import com.google.inject.Provides; +import com.google.inject.multibindings.Multibinder; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.NodeRole; -import org.apache.druid.guice.Jerseys; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; @@ -40,7 +42,7 @@ import org.apache.druid.msq.dart.controller.DartControllerRegistry; import org.apache.druid.msq.dart.controller.DartMessageRelayFactoryImpl; import org.apache.druid.msq.dart.controller.DartMessageRelays; -import org.apache.druid.msq.dart.controller.http.DartSqlResource; +import org.apache.druid.msq.dart.controller.http.DartQueryInfo; import org.apache.druid.msq.dart.controller.sql.DartSqlClientFactory; import org.apache.druid.msq.dart.controller.sql.DartSqlClientFactoryImpl; import org.apache.druid.msq.dart.controller.sql.DartSqlClients; @@ -49,7 +51,10 @@ import org.apache.druid.query.DefaultQueryConfig; import org.apache.druid.sql.SqlStatementFactory; import org.apache.druid.sql.SqlToolbox; +import org.apache.druid.sql.calcite.run.SqlEngine; +import java.util.Collections; +import java.util.List; import java.util.Properties; /** @@ -77,8 +82,6 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, DartModules.DART_PROPERTY_BASE + ".controller", DartControllerConfig.class); JsonConfigProvider.bind(binder, DartModules.DART_PROPERTY_BASE + ".query", DefaultQueryConfig.class, Dart.class); - Jerseys.addResource(binder, DartSqlResource.class); - LifecycleModule.register(binder, DartSqlClients.class); LifecycleModule.register(binder, DartMessageRelays.class); @@ -94,6 +97,10 @@ public void configure(Binder binder) binder.bind(ResourcePermissionMapper.class) .annotatedWith(Dart.class) .to(DartResourcePermissionMapper.class); + Multibinder.newSetBinder(binder, SqlEngine.class) + .addBinding() + .to(DartSqlEngine.class) + .in(LazySingleton.class); } @Provides @@ -114,4 +121,17 @@ public DartMessageRelays makeMessageRelays( return new DartMessageRelays(discoveryProvider, messageRelayFactory); } } + + @Override + public List getJacksonModules() + { + return Collections.singletonList( + new SimpleModule("DartModule").registerSubtypes( + new NamedType( + DartQueryInfo.class, + DartSqlEngine.NAME + ) + ) + ); + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java index e9bd59f53d8f..bd565078f9bf 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java @@ -19,6 +19,8 @@ package org.apache.druid.msq.dart.guice; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; import com.google.inject.Inject; import com.google.inject.Key; @@ -43,7 +45,9 @@ import org.apache.druid.messages.server.OutboxImpl; import org.apache.druid.msq.dart.Dart; import org.apache.druid.msq.dart.DartResourcePermissionMapper; +import org.apache.druid.msq.dart.controller.http.DartQueryInfo; import org.apache.druid.msq.dart.controller.messages.ControllerMessage; +import org.apache.druid.msq.dart.controller.sql.DartSqlEngine; import org.apache.druid.msq.dart.worker.DartDataSegmentProvider; import org.apache.druid.msq.dart.worker.DartWorkerFactory; import org.apache.druid.msq.dart.worker.DartWorkerFactoryImpl; @@ -57,6 +61,8 @@ import org.apache.druid.server.security.AuthorizerMapper; import java.io.File; +import java.util.Collections; +import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutorService; @@ -150,4 +156,17 @@ public Outbox createOutbox() return new OutboxImpl<>(); } } + + @Override + public List getJacksonModules() + { + return Collections.singletonList( + new SimpleModule("DartModule").registerSubtypes( + new NamedType( + DartQueryInfo.class, + DartSqlEngine.NAME + ) + ) + ); + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java index bf3b0197c29d..d2fc2bd5b433 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java @@ -22,7 +22,6 @@ import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.msq.counters.CounterSnapshots; import org.apache.druid.msq.counters.CounterSnapshotsTree; -import org.apache.druid.msq.dart.controller.http.DartSqlResource; import org.apache.druid.msq.indexing.MSQControllerTask; import org.apache.druid.msq.indexing.client.ControllerChatHandler; import org.apache.druid.msq.indexing.error.CancellationReason; @@ -30,9 +29,11 @@ import org.apache.druid.msq.kernel.StageId; import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation; import org.apache.druid.query.QueryContexts; +import org.apache.druid.sql.calcite.run.SqlEngine; import javax.annotation.Nullable; import java.util.List; +import java.util.Map; /** * Interface for the controller of a multi-stage query. Each Controller is specific to a particular query. @@ -45,7 +46,7 @@ public interface Controller * Unique task/query ID for the batch query run by this controller. * * Controller IDs must be globally unique. For tasks, this is the task ID from {@link MSQControllerTask#getId()}. - * For Dart, this is {@link QueryContexts#CTX_DART_QUERY_ID}, set by {@link DartSqlResource}. + * For Dart, this is {@link QueryContexts#CTX_DART_QUERY_ID}, set by {@link SqlEngine#initContextMap(Map)}. */ String queryId(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java index 52ee833a110d..1f1d0b1334fb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java @@ -54,6 +54,8 @@ import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.sql.SqlStatementFactory; +import org.apache.druid.sql.SqlToolbox; import org.apache.druid.sql.calcite.parser.DruidSqlIngest; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.apache.druid.sql.calcite.planner.Calcites; @@ -87,26 +89,29 @@ public class MSQTaskSqlEngine implements SqlEngine .build(); public static final List TASK_STRUCT_FIELD_NAMES = ImmutableList.of("TASK"); - private static final String NAME = "msq-task"; + public static final String NAME = "msq-task"; private final OverlordClient overlordClient; private final ObjectMapper jsonMapper; private final MSQTerminalStageSpecFactory terminalStageSpecFactory; private final QueryKitSpecFactory queryKitSpecFactory; + private final SqlToolbox sqlToolbox; @Inject public MSQTaskSqlEngine( final OverlordClient overlordClient, final ObjectMapper jsonMapper, final MSQTerminalStageSpecFactory terminalStageSpecFactory, - final MSQTaskQueryKitSpecFactory queryKitSpecFactory + final MSQTaskQueryKitSpecFactory queryKitSpecFactory, + final SqlToolbox sqlToolbox ) { this.overlordClient = overlordClient; this.jsonMapper = jsonMapper; this.terminalStageSpecFactory = terminalStageSpecFactory; this.queryKitSpecFactory = queryKitSpecFactory; + this.sqlToolbox = sqlToolbox; } @Override @@ -224,6 +229,12 @@ public QueryMaker buildQueryMakerForInsert( ); } + @Override + public SqlStatementFactory getSqlStatementFactory() + { + return new SqlStatementFactory(sqlToolbox.withEngine(this)); + } + /** * Checks if the SELECT contains {@link DruidSqlInsert#SQL_INSERT_SEGMENT_GRANULARITY} in the context. This is a * defensive cheeck because {@link org.apache.druid.sql.calcite.planner.DruidPlanner} should have called the diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java index 980038723532..eb06ab03097b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java @@ -19,11 +19,43 @@ package org.apache.druid.msq.dart.controller.http; +import com.fasterxml.jackson.databind.ObjectMapper; import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.msq.dart.controller.ControllerHolder; +import org.apache.druid.msq.dart.controller.sql.DartSqlEngine; +import org.apache.druid.msq.dart.guice.DartWorkerModule; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.Map; + public class DartQueryInfoTest { + @Test + void test_serde() throws Exception + { + DartQueryInfo dartQueryInfo = new DartQueryInfo( + "sid", + "did", + "SELECT 1", + "localhost:1001", + "", + "", + DateTimes.of("2000"), + ControllerHolder.State.RUNNING.toString() + ); + ObjectMapper jsonMapper = new DefaultObjectMapper().registerModules(new DartWorkerModule().getJacksonModules()); + byte[] bytes = jsonMapper.writeValueAsBytes(dartQueryInfo); + DartQueryInfo deserialized = jsonMapper.readValue(bytes, DartQueryInfo.class); + Assertions.assertEquals(dartQueryInfo, deserialized); + + // Assert that the engine is present. + Map map = jsonMapper.readValue(bytes, Map.class); + Assertions.assertEquals(DartSqlEngine.NAME, map.get("engine")); + } + @Test public void test_equals() { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java index 87efb900cb9a..389e7c8abc42 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java @@ -70,7 +70,6 @@ import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.sql.SqlLifecycleManager; -import org.apache.druid.sql.SqlStatementFactory; import org.apache.druid.sql.SqlToolbox; import org.apache.druid.sql.calcite.planner.CalciteRulesManager; import org.apache.druid.sql.calcite.planner.CatalogResolver; @@ -83,8 +82,13 @@ import org.apache.druid.sql.calcite.util.TestTimelineServerView; import org.apache.druid.sql.calcite.view.NoopViewManager; import org.apache.druid.sql.hook.DruidHookDispatcher; +import org.apache.druid.sql.http.EngineInfo; +import org.apache.druid.sql.http.GetQueriesResponse; import org.apache.druid.sql.http.ResultFormat; +import org.apache.druid.sql.http.SqlEngineRegistry; import org.apache.druid.sql.http.SqlQuery; +import org.apache.druid.sql.http.SqlResource; +import org.apache.druid.sql.http.SupportedEnginesResponse; import org.hamcrest.CoreMatchers; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -100,6 +104,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -109,7 +114,7 @@ import static org.hamcrest.MatcherAssert.assertThat; /** - * Functional test of {@link DartSqlResource}, {@link DartSqlEngine}, and {@link DartQueryMaker}. + * Functional test of {@link SqlResource}, {@link DartSqlEngine}, and {@link DartQueryMaker}. * Other classes are mocked when possible. */ public class DartSqlResourceTest extends MSQTestBase @@ -138,7 +143,7 @@ public class DartSqlResourceTest extends MSQTestBase // Objects created in setUp() below this line. - private DartSqlResource sqlResource; + private SqlResource sqlResource; private DartControllerRegistry controllerRegistry; private ExecutorService controllerExecutor; private AutoCloseable mockCloser; @@ -146,13 +151,13 @@ public class DartSqlResourceTest extends MSQTestBase // Mocks below this line. /** - * Mock for {@link DartSqlClients}, which is used in tests of {@link DartSqlResource#doGetRunningQueries}. + * Mock for {@link DartSqlClients}, which is used in tests of {@link SqlResource#doGetRunningQueries}. */ @Mock private DartSqlClients dartSqlClients; /** - * Mock for {@link DartSqlClient}, which is used in tests of {@link DartSqlResource#doGetRunningQueries}. + * Mock for {@link DartSqlClient}, which is used in tests of {@link SqlResource#doGetRunningQueries}. */ @Mock private DartSqlClient dartSqlClient; @@ -174,42 +179,6 @@ void setUp() { mockCloser = MockitoAnnotations.openMocks(this); - final DartSqlEngine engine = new DartSqlEngine( - new MSQTestControllerContext( - objectMapper, - injector, - null /* not used in this test */, - workerMemoryParameters, - loadedSegmentsMetadata, - TaskLockType.APPEND, - QueryContext.empty() - ) { - @Override - public ControllerQueryKernelConfig queryKernelConfig(String queryId, MSQSpec querySpec) - { - return super.queryKernelConfig(queryId, querySpec).toBuilder() - .workerIds(ImmutableList.of("some")).build(); - } - }, - controllerRegistry = new DartControllerRegistry() - { - @Override - public void register(ControllerHolder holder) - { - super.register(holder); - controllerRegistered.countDown(); - } - }, - objectMapper.convertValue(ImmutableMap.of(), DartControllerConfig.class), - controllerExecutor = Execs.multiThreaded( - MAX_CONTROLLERS, - StringUtils.encodeForFormat(getClass().getSimpleName() + "-controller-exec") - ), - new DartQueryKitSpecFactory(new TestTimelineServerView(Collections.emptyList())), - new ServerConfig(), - new DefaultQueryConfig(ImmutableMap.of("foo", "bar")) - ); - final DruidSchemaCatalog rootSchema = QueryFrameworkUtils.createMockRootSchema( CalciteTests.INJECTOR, queryFramework().conglomerate(), @@ -239,7 +208,7 @@ public void register(ControllerHolder holder) final SqlLifecycleManager lifecycleManager = new SqlLifecycleManager(); final SqlToolbox toolbox = new SqlToolbox( - engine, + null, plannerFactory, NoopServiceEmitter.instance(), NoopRequestLogger.instance(), @@ -248,14 +217,51 @@ public void register(ControllerHolder holder) lifecycleManager ); - sqlResource = new DartSqlResource( + + final DartSqlEngine engine = new DartSqlEngine( + new MSQTestControllerContext( + objectMapper, + injector, + null /* not used in this test */, + workerMemoryParameters, + loadedSegmentsMetadata, + TaskLockType.APPEND, + QueryContext.empty() + ) { + @Override + public ControllerQueryKernelConfig queryKernelConfig(String queryId, MSQSpec querySpec) + { + return super.queryKernelConfig(queryId, querySpec).toBuilder() + .workerIds(ImmutableList.of("some")).build(); + } + }, + controllerRegistry = new DartControllerRegistry() + { + @Override + public void register(ControllerHolder holder) + { + super.register(holder); + controllerRegistered.countDown(); + } + }, + objectMapper.convertValue(ImmutableMap.of(), DartControllerConfig.class), + controllerExecutor = Execs.multiThreaded( + MAX_CONTROLLERS, + StringUtils.encodeForFormat(getClass().getSimpleName() + "-controller-exec") + ), + new DartQueryKitSpecFactory(new TestTimelineServerView(Collections.emptyList())), + new ServerConfig(), + new DefaultQueryConfig(ImmutableMap.of("foo", "bar")), + toolbox, + dartSqlClients + ); + + sqlResource = new SqlResource( objectMapper, CalciteTests.TEST_AUTHORIZER_MAPPER, - new SqlStatementFactory(toolbox), - controllerRegistry, - lifecycleManager, - dartSqlClients, new ServerConfig() /* currently only used for error transform strategy */, + lifecycleManager, + new SqlEngineRegistry(Set.of(engine)), ResponseContextConfig.newConfig(false), SELF_NODE ); @@ -284,12 +290,13 @@ void tearDown() throws Exception @Test public void test_getEnabled() { - final Response response = sqlResource.doGetEnabled(httpServletRequest); - Assertions.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + Response response = sqlResource.getSupportedEngines(httpServletRequest); + Set supportedEngines = ((SupportedEnginesResponse) response.getEntity()).getEngines(); + Assertions.assertTrue(supportedEngines.contains(new EngineInfo(DartSqlEngine.NAME))); } /** - * Test where a superuser calls {@link DartSqlResource#doGetRunningQueries} with selfOnly enabled. + * Test where a superuser calls {@link SqlResource#doGetRunningQueries} with selfOnly enabled. */ @Test public void test_getRunningQueries_selfOnly_superUser() @@ -301,7 +308,7 @@ public void test_getRunningQueries_selfOnly_superUser() Assertions.assertEquals( new GetQueriesResponse(Collections.singletonList(DartQueryInfo.fromControllerHolder(holder))), - sqlResource.doGetRunningQueries("", httpServletRequest) + sqlResource.doGetRunningQueries("", httpServletRequest).getEntity() ); controllerRegistry.deregister(holder); @@ -309,7 +316,7 @@ public void test_getRunningQueries_selfOnly_superUser() /** * Test where {@link #REGULAR_USER_NAME} and {@link #DIFFERENT_REGULAR_USER_NAME} issue queries, and - * {@link #REGULAR_USER_NAME} calls {@link DartSqlResource#doGetRunningQueries} with selfOnly enabled. + * {@link #REGULAR_USER_NAME} calls {@link SqlResource#doGetRunningQueries} with selfOnly enabled. */ @Test public void test_getRunningQueries_selfOnly_regularUser() @@ -325,7 +332,7 @@ public void test_getRunningQueries_selfOnly_regularUser() Assertions.assertEquals( new GetQueriesResponse( Collections.singletonList(DartQueryInfo.fromControllerHolder(holder).withoutAuthenticationResult())), - sqlResource.doGetRunningQueries("", httpServletRequest) + sqlResource.doGetRunningQueries("", httpServletRequest).getEntity() ); controllerRegistry.deregister(holder); @@ -333,7 +340,7 @@ public void test_getRunningQueries_selfOnly_regularUser() } /** - * Test where a superuser calls {@link DartSqlResource#doGetRunningQueries} with selfOnly disabled. + * Test where a superuser calls {@link SqlResource#doGetRunningQueries} with selfOnly disabled. */ @Test public void test_getRunningQueries_global_superUser() @@ -366,14 +373,14 @@ public void test_getRunningQueries_global_superUser() remoteQueryInfo ) ), - sqlResource.doGetRunningQueries(null, httpServletRequest) + sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity() ); controllerRegistry.deregister(localHolder); } /** - * Test where a superuser calls {@link DartSqlResource#doGetRunningQueries} with selfOnly disabled, and where the + * Test where a superuser calls {@link SqlResource#doGetRunningQueries} with selfOnly disabled, and where the * remote server has a problem. */ @Test @@ -393,7 +400,7 @@ public void test_getRunningQueries_global_remoteError_superUser() // were able to fetch.) Assertions.assertEquals( new GetQueriesResponse(ImmutableList.of(DartQueryInfo.fromControllerHolder(localHolder))), - sqlResource.doGetRunningQueries(null, httpServletRequest) + sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity() ); controllerRegistry.deregister(localHolder); @@ -401,7 +408,7 @@ public void test_getRunningQueries_global_remoteError_superUser() /** * Test where {@link #REGULAR_USER_NAME} and {@link #DIFFERENT_REGULAR_USER_NAME} issue queries, and - * {@link #REGULAR_USER_NAME} calls {@link DartSqlResource#doGetRunningQueries} with selfOnly disabled. + * {@link #REGULAR_USER_NAME} calls {@link SqlResource#doGetRunningQueries} with selfOnly disabled. */ @Test public void test_getRunningQueries_global_regularUser() @@ -430,7 +437,7 @@ public void test_getRunningQueries_global_regularUser() Assertions.assertEquals( new GetQueriesResponse( ImmutableList.of(DartQueryInfo.fromControllerHolder(localHolder).withoutAuthenticationResult())), - sqlResource.doGetRunningQueries(null, httpServletRequest) + sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity() ); controllerRegistry.deregister(localHolder); @@ -438,7 +445,7 @@ public void test_getRunningQueries_global_regularUser() /** * Test where {@link #REGULAR_USER_NAME} and {@link #DIFFERENT_REGULAR_USER_NAME} issue queries, and - * {@link #DIFFERENT_REGULAR_USER_NAME} calls {@link DartSqlResource#doGetRunningQueries} with selfOnly disabled. + * {@link #DIFFERENT_REGULAR_USER_NAME} calls {@link SqlResource#doGetRunningQueries} with selfOnly disabled. */ @Test public void test_getRunningQueries_global_differentRegularUser() @@ -466,7 +473,7 @@ public void test_getRunningQueries_global_differentRegularUser() // The endpoint returns only the query issued by DIFFERENT_REGULAR_USER_NAME. Assertions.assertEquals( new GetQueriesResponse(ImmutableList.of(remoteQueryInfo.withoutAuthenticationResult())), - sqlResource.doGetRunningQueries(null, httpServletRequest) + sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity() ); controllerRegistry.deregister(holder); @@ -490,7 +497,7 @@ public void test_doPost_regularUser() false, false, false, - Collections.emptyMap(), + Map.of(QueryContexts.ENGINE, DartSqlEngine.NAME), Collections.emptyList() ); @@ -517,7 +524,7 @@ public void test_doPost_regularUser_forbidden() false, false, false, - Collections.emptyMap(), + Map.of(QueryContexts.ENGINE, DartSqlEngine.NAME), Collections.emptyList() ); @@ -545,7 +552,7 @@ public void test_doPost_regularUser_runtimeError() throws IOException false, false, false, - Collections.emptyMap(), + Map.of(QueryContexts.ENGINE, DartSqlEngine.NAME), Collections.emptyList() ); @@ -562,7 +569,6 @@ public void test_doPost_regularUser_runtimeError() throws IOException assertThat((String) e.get("errorMessage"), CoreMatchers.startsWith("InvalidNullByte: ")); } - @Test public void test_doPost_regularUser_fullReport() throws Exception { final MockAsyncContext asyncContext = new MockAsyncContext(); @@ -580,7 +586,7 @@ public void test_doPost_regularUser_fullReport() throws Exception false, false, false, - ImmutableMap.of(QueryContexts.CTX_FULL_REPORT, true), + ImmutableMap.of(QueryContexts.CTX_FULL_REPORT, true, QueryContexts.ENGINE, DartSqlEngine.NAME), Collections.emptyList() ); @@ -621,7 +627,11 @@ public void test_doPost_queryTimeout() throws Exception false, false, false, - ImmutableMap.of(QueryContexts.CTX_FULL_REPORT, true, QueryContexts.TIMEOUT_KEY, 1), + ImmutableMap.of( + QueryContexts.CTX_FULL_REPORT, true, + QueryContexts.TIMEOUT_KEY, 1, + QueryContexts.ENGINE, DartSqlEngine.NAME + ), Collections.emptyList() ); @@ -662,7 +672,7 @@ public void test_doPost_regularUser_runtimeError_fullReport() throws Exception false, false, false, - ImmutableMap.of(QueryContexts.CTX_FULL_REPORT, true), + ImmutableMap.of(QueryContexts.CTX_FULL_REPORT, true, QueryContexts.ENGINE, DartSqlEngine.NAME), Collections.emptyList() ); @@ -734,7 +744,11 @@ private void run_test_doPost_regularUser_fullReport_thenCancelQuery(final boolea false, false, false, - ImmutableMap.of(QueryContexts.CTX_SQL_QUERY_ID, sqlQueryId, QueryContexts.CTX_FULL_REPORT, fullReport), + ImmutableMap.of( + QueryContexts.CTX_SQL_QUERY_ID, sqlQueryId, + QueryContexts.CTX_FULL_REPORT, fullReport, + QueryContexts.ENGINE, DartSqlEngine.NAME + ), Collections.emptyList() ); @@ -790,12 +804,12 @@ public void test_cancelQuery_regularUser_unknownQuery() .thenReturn(makeAuthenticationResult(REGULAR_USER_NAME)); final Response cancellationResponse = sqlResource.cancelQuery("nonexistent", httpServletRequest); - Assertions.assertEquals(Response.Status.ACCEPTED.getStatusCode(), cancellationResponse.getStatus()); + Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), cancellationResponse.getStatus()); } /** * Add a mock {@link ControllerHolder} to {@link #controllerRegistry}, with a query run by the given user. - * Used by methods that test {@link DartSqlResource#doGetRunningQueries}. + * Used by methods that test {@link SqlResource#doGetRunningQueries}. * * @return the mock holder */ diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponseTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponseTest.java deleted file mode 100644 index bffaace57459..000000000000 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponseTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.msq.dart.controller.http; - -import com.fasterxml.jackson.databind.ObjectMapper; -import nl.jqno.equalsverifier.EqualsVerifier; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.msq.dart.controller.ControllerHolder; -import org.apache.druid.segment.TestHelper; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.util.Collections; - -public class GetQueriesResponseTest -{ - @Test - public void test_serde() throws Exception - { - final ObjectMapper jsonMapper = TestHelper.JSON_MAPPER; - final GetQueriesResponse response = new GetQueriesResponse( - Collections.singletonList( - new DartQueryInfo( - "xyz", - "abc", - "SELECT 1", - "localhost:1001", - "auth", - "anon", - DateTimes.of("2000"), - ControllerHolder.State.RUNNING.toString() - ) - ) - ); - final GetQueriesResponse response2 = - jsonMapper.readValue(jsonMapper.writeValueAsBytes(response), GetQueriesResponse.class); - Assertions.assertEquals(response, response2); - } - - @Test - public void test_equals() - { - EqualsVerifier.forClass(GetQueriesResponse.class).usingGetClass().verify(); - } -} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java index 114ea9c72070..34be09e5bb00 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java @@ -27,9 +27,10 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.msq.dart.controller.ControllerHolder; import org.apache.druid.msq.dart.controller.http.DartQueryInfo; -import org.apache.druid.msq.dart.controller.http.GetQueriesResponse; +import org.apache.druid.msq.dart.guice.DartWorkerModule; import org.apache.druid.rpc.MockServiceClient; import org.apache.druid.rpc.RequestBuilder; +import org.apache.druid.sql.http.GetQueriesResponse; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.junit.jupiter.api.AfterEach; @@ -49,7 +50,7 @@ public class DartSqlClientImplTest @BeforeEach public void setup() { - jsonMapper = new DefaultObjectMapper(); + jsonMapper = new DefaultObjectMapper().registerModules(new DartWorkerModule().getJacksonModules()); serviceClient = new MockServiceClient(); dartSqlClient = new DartSqlClientImpl(serviceClient, jsonMapper); } @@ -79,7 +80,7 @@ public void test_getMessages_all() throws Exception ); serviceClient.expectAndRespond( - new RequestBuilder(HttpMethod.GET, "/"), + new RequestBuilder(HttpMethod.GET, "/queries"), HttpResponseStatus.OK, ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), jsonMapper.writeValueAsBytes(getQueriesResponse) @@ -108,7 +109,7 @@ public void test_getMessages_selfOnly() throws Exception ); serviceClient.expectAndRespond( - new RequestBuilder(HttpMethod.GET, "/?selfOnly"), + new RequestBuilder(HttpMethod.GET, "/queries?selfOnly"), HttpResponseStatus.OK, ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), jsonMapper.writeValueAsBytes(getQueriesResponse) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java index 792287f7fcc4..0d72c6f2b503 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java @@ -56,7 +56,7 @@ public MSQTaskSqlEngine createEngine( MSQTestOverlordServiceClient indexingServiceClient, MSQTaskQueryKitSpecFactory queryKitSpecFactory) { - return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper, new SegmentGenerationTerminalStageSpecFactory(), queryKitSpecFactory); + return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper, new SegmentGenerationTerminalStageSpecFactory(), queryKitSpecFactory, null); } @Provides diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java index c45c540b1c50..641836f5a265 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java @@ -44,11 +44,13 @@ import org.apache.druid.sql.avatica.DruidMeta; import org.apache.druid.sql.calcite.TempDirProducer; import org.apache.druid.sql.calcite.run.SqlEngine; +import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.DruidModuleCollection; import org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier; import org.apache.druid.sql.calcite.util.datasets.TestDataSet; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -113,7 +115,7 @@ final ServiceClientFactory getServiceClientFactory(HttpClient ht) @Provides final DruidNodeDiscoveryProvider getDiscoveryProvider() { - return null; + return new CalciteTests.FakeDruidNodeDiscoveryProvider(Collections.emptyMap()); } @Override diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 9e5b407a66f0..2495251dec06 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -217,7 +217,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; - import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -581,7 +580,8 @@ public String getFormatString() indexingServiceClient, qf.queryJsonMapper().copy().registerModules(new MSQSqlModule().getJacksonModules()), new SegmentGenerationTerminalStageSpecFactory(), - injector.getInstance(MSQTaskQueryKitSpecFactory.class) + injector.getInstance(MSQTaskQueryKitSpecFactory.class), + null ); PlannerFactory plannerFactory = new PlannerFactory( diff --git a/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java b/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java index d3fea48f56e3..6f3b049896a7 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java @@ -36,6 +36,8 @@ import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.auth.BasicCredentials; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.msq.dart.controller.sql.DartSqlEngine; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.http.SqlTaskStatus; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; @@ -113,6 +115,10 @@ public abstract class AbstractAuthConfigurationTest new Resource("auth_test", ResourceType.DATASOURCE), Action.READ ), + new ResourceAction( + new Resource(QueryContexts.ENGINE, ResourceType.QUERY_CONTEXT), + Action.WRITE + ), new ResourceAction( new Resource("auth_test_ctx", ResourceType.QUERY_CONTEXT), Action.WRITE @@ -815,7 +821,9 @@ protected StatusResponseHolder makeDartQueryRequest( HttpResponseStatus expectedStatus ) throws Exception { - return makeSQLQueryRequest(httpClient, query, "/druid/v2/sql/dart", context, expectedStatus); + final Map dartContext = new HashMap<>(context); + dartContext.put(QueryContexts.ENGINE, DartSqlEngine.NAME); + return makeSQLQueryRequest(httpClient, query, "/druid/v2/sql", dartContext, expectedStatus); } protected StatusResponseHolder makeSQLQueryRequest( diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java b/processing/src/main/java/org/apache/druid/query/QueryContext.java index 7dfa55699465..d5368425ed07 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContext.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java @@ -471,6 +471,15 @@ public long getMaxScatterGatherBytes() return getLong(QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE); } + public String getEngine() + { + return QueryContexts.parseString( + context, + QueryContexts.ENGINE, + QueryContexts.DEFAULT_ENGINE + ); + } + public boolean hasTimeout() { return getTimeout() != QueryContexts.NO_TIMEOUT; diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index 6ecfaddaa78d..d288775977c4 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -29,7 +29,6 @@ import org.apache.druid.java.util.common.StringUtils; import javax.annotation.Nullable; - import java.math.BigDecimal; import java.util.Arrays; import java.util.HashMap; @@ -90,6 +89,7 @@ public class QueryContexts public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit"; public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold"; public static final String CATALOG_VALIDATION_ENABLED = "catalogValidationEnabled"; + public static final String ENGINE = "engine"; // this flag controls whether the topN engine can use the 'pooled' algorithm when query granularity is set to // anything other than 'ALL' and the cardinality + number of aggregators would require more size than is available // in the buffers and so must reset the cursor to use multiple passes. This is likely slower than the default @@ -158,6 +158,7 @@ public class QueryContexts public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE = true; public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS = false; public static final CloneQueryMode DEFAULT_CLONE_QUERY_MODE = CloneQueryMode.EXCLUDECLONES; + public static final String DEFAULT_ENGINE = "native"; public static final boolean DEFAULT_ENABLE_REWRITE_JOIN_TO_FILTER = true; public static final long DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE = 10000; public static final boolean DEFAULT_ENABLE_SQL_JOIN_LEFT_SCAN_DIRECT = false; diff --git a/server/src/main/java/org/apache/druid/server/security/AuthConfig.java b/server/src/main/java/org/apache/druid/server/security/AuthConfig.java index c099e766f7e2..3a664594d899 100644 --- a/server/src/main/java/org/apache/druid/server/security/AuthConfig.java +++ b/server/src/main/java/org/apache/druid/server/security/AuthConfig.java @@ -57,7 +57,7 @@ public class AuthConfig public static final Set ALLOWED_CONTEXT_KEYS = ImmutableSet.of( // Set in the Avatica server path QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, - // Set in DartSqlResource + // Set in DartSqlEngine QueryContexts.CTX_DART_QUERY_ID, // Set by the Router QueryContexts.CTX_SQL_QUERY_ID diff --git a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java index faf8d64375c9..f4f36c28b73f 100644 --- a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java +++ b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java @@ -25,7 +25,6 @@ import org.apache.druid.error.InvalidSqlInput; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.QueryException; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.server.QueryResponse; @@ -36,7 +35,6 @@ import org.apache.druid.sql.calcite.planner.PrepareResult; import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; /** * Lifecycle for direct SQL statement execution, which means that the query @@ -68,8 +66,6 @@ */ public class DirectStatement extends AbstractStatement implements Cancelable { - private static final Logger log = new Logger(DirectStatement.class); - /** * Represents the execution plan for a query with the ability to run * that plan (once). @@ -302,12 +298,9 @@ public void cancel() return; } state = State.CANCELLED; - final CopyOnWriteArrayList nativeQueryIds = plannerContext.getNativeQueryIds(); - for (String nativeQueryId : nativeQueryIds) { - log.debug("Canceling native query [%s]", nativeQueryId); - sqlToolbox.queryScheduler.cancelQuery(nativeQueryId); - } + // Give control to the engine to do engine specific things. + sqlToolbox.engine.cancelQuery(plannerContext, sqlToolbox.queryScheduler); } @Override diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java index 16228a19e127..1562da6d92cf 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java @@ -20,6 +20,7 @@ package org.apache.druid.sql.calcite.run; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; import org.apache.calcite.rel.RelRoot; @@ -27,10 +28,14 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.druid.error.InvalidSqlInput; import org.apache.druid.guice.LazySingleton; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.JoinAlgorithm; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.timeboundary.TimeBoundaryQuery; import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.server.QueryScheduler; +import org.apache.druid.sql.SqlStatementFactory; +import org.apache.druid.sql.SqlToolbox; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; import org.apache.druid.sql.calcite.parser.DruidSqlReplace; import org.apache.druid.sql.calcite.planner.PlannerContext; @@ -39,10 +44,13 @@ import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; @LazySingleton public class NativeSqlEngine implements SqlEngine { + private static final Logger LOG = new Logger(NativeSqlEngine.class); + public static final Set SYSTEM_CONTEXT_PARAMETERS = ImmutableSet.of( TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME, TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME, @@ -54,19 +62,34 @@ public class NativeSqlEngine implements SqlEngine DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS ); - private static final String NAME = "native"; + public static final String NAME = "native"; private final QueryLifecycleFactory queryLifecycleFactory; private final ObjectMapper jsonMapper; + private final SqlStatementFactory sqlStatementFactory; @Inject public NativeSqlEngine( final QueryLifecycleFactory queryLifecycleFactory, - final ObjectMapper jsonMapper + final ObjectMapper jsonMapper, + final SqlToolbox toolbox + ) + { + this.queryLifecycleFactory = queryLifecycleFactory; + this.jsonMapper = jsonMapper; + this.sqlStatementFactory = new SqlStatementFactory(toolbox.withEngine(this)); + } + + @VisibleForTesting + public NativeSqlEngine( + final QueryLifecycleFactory queryLifecycleFactory, + final ObjectMapper jsonMapper, + final SqlStatementFactory sqlStatementFactory ) { this.queryLifecycleFactory = queryLifecycleFactory; this.jsonMapper = jsonMapper; + this.sqlStatementFactory = sqlStatementFactory; } @Override @@ -164,4 +187,21 @@ private static void validateJoinAlgorithm(final Map queryContext throw InvalidSqlInput.exception("Join algorithm [%s] is not supported by engine [%s]", joinAlgorithm, NAME); } } + + @Override + public SqlStatementFactory getSqlStatementFactory() + { + return sqlStatementFactory; + } + + @Override + public void cancelQuery(PlannerContext plannerContext, QueryScheduler queryScheduler) + { + final CopyOnWriteArrayList nativeQueryIds = plannerContext.getNativeQueryIds(); + + for (String nativeQueryId : nativeQueryIds) { + LOG.debug("Canceling native query [%s]", nativeQueryId); + queryScheduler.cancelQuery(nativeQueryId); + } + } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java index 3f906b9ea402..9e2a3930ab52 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java @@ -23,9 +23,16 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.tools.ValidationException; +import org.apache.druid.error.DruidException; +import org.apache.druid.server.QueryScheduler; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.AuthorizationResult; +import org.apache.druid.sql.SqlStatementFactory; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.destination.IngestDestination; +import org.apache.druid.sql.http.QueryInfo; +import java.util.List; import java.util.Map; /** @@ -116,4 +123,32 @@ QueryMaker buildQueryMakerForInsert( default void initContextMap(Map contextMap) { } + + /** + * Returns a {@link SqlStatementFactory} which uses this engine to create statements. + */ + SqlStatementFactory getSqlStatementFactory(); + + /** + * Returns a list of {@link QueryInfo} containing the currently running queries using this engine. Returns an empty + * list if the operation is not supported. + */ + default List getRunningQueries( + boolean selfOnly, + AuthenticationResult authenticationResult, + AuthorizationResult authorizationResult + ) + { + return List.of(); + } + + /** + * Cancels a currently running query given the {@link PlannerContext} for the query. + */ + default void cancelQuery(PlannerContext plannerContext, QueryScheduler queryScheduler) + { + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.UNSUPPORTED) + .build("Engine[%s] does not support canceling queries", name()); + } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java index 7563b45d52bc..340649166555 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java @@ -22,6 +22,7 @@ import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.druid.sql.SqlStatementFactory; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.run.EngineFeature; import org.apache.druid.sql.calcite.run.QueryMaker; @@ -126,4 +127,11 @@ public QueryMaker buildQueryMakerForInsert(IngestDestination destination, RelRoo // Can't have views of INSERT or REPLACE statements. throw new UnsupportedOperationException(); } + + @Override + public SqlStatementFactory getSqlStatementFactory() + { + // View engine does not execute queries. + throw new UnsupportedOperationException(); + } } diff --git a/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java b/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java index 56d0d2d5d41f..06b3b8b8b198 100644 --- a/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java +++ b/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java @@ -26,6 +26,7 @@ import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.Provides; +import com.google.inject.multibindings.Multibinder; import org.apache.druid.catalog.model.TableDefnRegistry; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.PolyBind; @@ -44,6 +45,7 @@ import org.apache.druid.sql.calcite.planner.CatalogResolver; import org.apache.druid.sql.calcite.planner.PlannerFactory; import org.apache.druid.sql.calcite.run.NativeSqlEngine; +import org.apache.druid.sql.calcite.run.SqlEngine; import org.apache.druid.sql.calcite.schema.DruidCalciteSchemaModule; import org.apache.druid.sql.calcite.schema.DruidSchemaManager; import org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager; @@ -124,6 +126,12 @@ public void configure(Binder binder) // Default do-nothing catalog resolver binder.bind(CatalogResolver.class).toInstance(CatalogResolver.NULL_RESOLVER); + + // Bind the engine + Multibinder.newSetBinder(binder, SqlEngine.class) + .addBinding() + .to(NativeSqlEngine.class) + .in(LazySingleton.class); } private boolean isEnabled() diff --git a/sql/src/main/java/org/apache/druid/sql/http/EngineInfo.java b/sql/src/main/java/org/apache/druid/sql/http/EngineInfo.java new file mode 100644 index 000000000000..cd969a18a10f --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/http/EngineInfo.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class EngineInfo +{ + private final String name; + + @JsonCreator + public EngineInfo(@JsonProperty("name") String name) + { + this.name = name; + } + + @JsonProperty + public String getName() + { + return name; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + EngineInfo that = (EngineInfo) o; + return Objects.equals(name, that.name); + } + + @Override + public int hashCode() + { + return Objects.hashCode(name); + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponse.java b/sql/src/main/java/org/apache/druid/sql/http/GetQueriesResponse.java similarity index 83% rename from extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponse.java rename to sql/src/main/java/org/apache/druid/sql/http/GetQueriesResponse.java index 2d1f87f860c5..6f889be45723 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponse.java +++ b/sql/src/main/java/org/apache/druid/sql/http/GetQueriesResponse.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.msq.dart.controller.http; +package org.apache.druid.sql.http; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -26,20 +26,20 @@ import java.util.Objects; /** - * Class returned by {@link DartSqlResource#doGetRunningQueries}, the "list all queries" API. + * Class returned by {@link SqlResource#doGetRunningQueries}, the "list all queries" API. */ public class GetQueriesResponse { - private final List queries; + private final List queries; @JsonCreator - public GetQueriesResponse(@JsonProperty("queries") List queries) + public GetQueriesResponse(@JsonProperty("queries") List queries) { this.queries = queries; } @JsonProperty - public List getQueries() + public List getQueries() { return queries; } diff --git a/sql/src/main/java/org/apache/druid/sql/http/QueryInfo.java b/sql/src/main/java/org/apache/druid/sql/http/QueryInfo.java new file mode 100644 index 000000000000..d3ec14cde503 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/http/QueryInfo.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.http; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "engine") +public interface QueryInfo +{ +} diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlEngineRegistry.java b/sql/src/main/java/org/apache/druid/sql/http/SqlEngineRegistry.java new file mode 100644 index 000000000000..1cf5b23ef24e --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/http/SqlEngineRegistry.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.http; + +import com.google.inject.Inject; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.server.initialization.jetty.BadRequestException; +import org.apache.druid.sql.calcite.run.SqlEngine; + +import javax.validation.constraints.NotNull; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class SqlEngineRegistry +{ + + private final Map engines; + + @Inject + public SqlEngineRegistry(Set engineSet) + { + engines = engineSet.stream().collect(Collectors.toMap(SqlEngine::name, engine -> engine)); + } + + @NotNull + public SqlEngine getEngine(final String engineName) + { + SqlEngine engine = engines.getOrDefault(engineName == null ? QueryContexts.DEFAULT_ENGINE : engineName, null); + if (engine == null) { + throw new BadRequestException("Unsupported engine"); + } + return engine; + } + + public Set getSupportedEngines() + { + return engines.keySet(); + } + + public Collection getAllEngines() + { + return engines.values(); + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java b/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java index 1fc64ccded0d..2a12de5f66c8 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java +++ b/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java @@ -21,8 +21,10 @@ import com.google.inject.Binder; import com.google.inject.Module; +import com.google.inject.multibindings.Multibinder; import org.apache.druid.guice.Jerseys; import org.apache.druid.guice.LazySingleton; +import org.apache.druid.sql.calcite.run.SqlEngine; /** * The Module responsible for providing bindings to the SQL http endpoint @@ -33,6 +35,7 @@ public class SqlHttpModule implements Module public void configure(Binder binder) { binder.bind(SqlResource.class).in(LazySingleton.class); + Multibinder.newSetBinder(binder, SqlEngine.class); Jerseys.addResource(binder, SqlResource.class); } } diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java index 5199500bd494..2c4ed3c38a0e 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java +++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java @@ -20,11 +20,11 @@ package org.apache.druid.sql.http; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.inject.Inject; import com.sun.jersey.api.core.HttpContext; import org.apache.druid.common.exception.SanitizableException; -import org.apache.druid.guice.annotations.NativeQuery; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; @@ -35,71 +35,128 @@ import org.apache.druid.server.QueryResultPusher; import org.apache.druid.server.ResponseContextConfig; import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.AuthenticationResult; import org.apache.druid.server.security.AuthorizationResult; import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.sql.DirectStatement.ResultSet; import org.apache.druid.sql.HttpStatement; import org.apache.druid.sql.SqlLifecycleManager; import org.apache.druid.sql.SqlLifecycleManager.Cancelable; import org.apache.druid.sql.SqlRowTransformer; -import org.apache.druid.sql.SqlStatementFactory; +import org.apache.druid.sql.calcite.run.SqlEngine; import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.DELETE; +import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import java.io.IOException; import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -@Path("/druid/v2/sql/") +@Path(SqlResource.PATH) public class SqlResource { + public static final String PATH = "/druid/v2/sql/"; public static final String SQL_QUERY_ID_RESPONSE_HEADER = "X-Druid-SQL-Query-Id"; public static final String SQL_HEADER_RESPONSE_HEADER = "X-Druid-SQL-Header-Included"; public static final String SQL_HEADER_VALUE = "yes"; + private static final Logger log = new Logger(SqlResource.class); - public static final SqlResourceQueryMetricCounter QUERY_METRIC_COUNTER = new SqlResourceQueryMetricCounter(); + private static final SqlResourceQueryMetricCounter QUERY_METRIC_COUNTER = new SqlResourceQueryMetricCounter(); private final ObjectMapper jsonMapper; private final AuthorizerMapper authorizerMapper; - private final SqlStatementFactory sqlStatementFactory; - private final SqlLifecycleManager sqlLifecycleManager; private final ServerConfig serverConfig; private final ResponseContextConfig responseContextConfig; private final DruidNode selfNode; + private final SqlLifecycleManager sqlLifecycleManager; + private final SqlEngineRegistry sqlEngineRegistry; + @VisibleForTesting @Inject - protected SqlResource( + public SqlResource( final ObjectMapper jsonMapper, final AuthorizerMapper authorizerMapper, - final @NativeQuery SqlStatementFactory sqlStatementFactory, - final SqlLifecycleManager sqlLifecycleManager, final ServerConfig serverConfig, + final SqlLifecycleManager sqlLifecycleManager, + final SqlEngineRegistry sqlEngineRegistry, ResponseContextConfig responseContextConfig, @Self DruidNode selfNode ) { + this.sqlEngineRegistry = Preconditions.checkNotNull(sqlEngineRegistry, "sqlEngineRegistry"); this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper"); this.authorizerMapper = Preconditions.checkNotNull(authorizerMapper, "authorizerMapper"); - this.sqlStatementFactory = Preconditions.checkNotNull(sqlStatementFactory, "sqlStatementFactory"); - this.sqlLifecycleManager = Preconditions.checkNotNull(sqlLifecycleManager, "sqlLifecycleManager"); this.serverConfig = Preconditions.checkNotNull(serverConfig, "serverConfig"); this.responseContextConfig = responseContextConfig; this.selfNode = selfNode; + this.sqlLifecycleManager = Preconditions.checkNotNull(sqlLifecycleManager, "sqlLifecycleManager"); + } + + @GET + @Path("/engines") + @Produces(MediaType.APPLICATION_JSON) + public Response getSupportedEngines(@Context final HttpServletRequest request) + { + AuthorizationUtils.setRequestAuthorizationAttributeIfNeeded(request); + Set engines = sqlEngineRegistry.getSupportedEngines() + .stream() + .map(EngineInfo::new) + .collect(Collectors.toSet()); + return Response.ok(new SupportedEnginesResponse(engines)).build(); + } + + /** + * API to list all running queries, for all engines that supports such listings. + * + * @param selfOnly if true, return queries running on this server. If false, return queries running on all servers. + * @param request http request. + */ + @GET + @Path("/queries") + @Produces(MediaType.APPLICATION_JSON) + public Response doGetRunningQueries( + @QueryParam("selfOnly") final String selfOnly, + @Context final HttpServletRequest request + ) + { + final AuthenticationResult authenticationResult = AuthorizationUtils.authenticationResultFromRequest(request); + final AuthorizationResult stateReadAccess = AuthorizationUtils.authorizeAllResourceActions( + authenticationResult, + Collections.singletonList(new ResourceAction(Resource.STATE_RESOURCE, Action.READ)), + authorizerMapper + ); + + final Collection engines = sqlEngineRegistry.getAllEngines(); + final List queries = new ArrayList<>(); + + // Get running queries from all engines that support it. + for (SqlEngine sqlEngine : engines) { + queries.addAll(sqlEngine.getRunningQueries(selfOnly != null, authenticationResult, stateReadAccess)); + } + + AuthorizationUtils.setRequestAuthorizationAttributeIfNeeded(request); + return Response.ok().entity(new GetQueriesResponse(queries)).build(); } @POST @@ -114,14 +171,16 @@ public Response doPost( } /** - * This method is defined as public so that subclasses like Dart or test can access it + * This method is defined as public so that tests can access it */ public Response doPost( final SqlQuery sqlQuery, final HttpServletRequest req ) { - final HttpStatement stmt = sqlStatementFactory.httpStatement(sqlQuery, req); + final String engineName = sqlQuery.queryContext().getEngine(); + final SqlEngine engine = sqlEngineRegistry.getEngine(engineName); + final HttpStatement stmt = engine.getSqlStatementFactory().httpStatement(sqlQuery, req); final String sqlQueryId = stmt.sqlQueryId(); final String currThreadName = Thread.currentThread().getName(); diff --git a/sql/src/main/java/org/apache/druid/sql/http/SupportedEnginesResponse.java b/sql/src/main/java/org/apache/druid/sql/http/SupportedEnginesResponse.java new file mode 100644 index 000000000000..89108f4ab17d --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/http/SupportedEnginesResponse.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Set; + +/** + * Class returned by {@link SqlResource#getSupportedEngines}, the supported engines API. + */ +public class SupportedEnginesResponse +{ + private final Set engines; + + @JsonCreator + public SupportedEnginesResponse(@JsonProperty("engines") Set engines) + { + this.engines = engines; + } + + @JsonProperty + public Set getEngines() + { + return engines; + } +} diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteScanSignatureTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteScanSignatureTest.java index 02a2a168034d..2010c335480c 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteScanSignatureTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteScanSignatureTest.java @@ -27,6 +27,7 @@ import org.apache.calcite.tools.ValidationException; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.sql.SqlStatementFactory; import org.apache.druid.sql.calcite.CalciteScanSignatureTest.ScanSignatureComponentSupplier; import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.planner.PlannerContext; @@ -183,6 +184,12 @@ public QueryMaker buildQueryMakerForInsert(IngestDestination destination, RelRoo { throw new UnsupportedOperationException(); } + + @Override + public SqlStatementFactory getSqlStatementFactory() + { + throw new UnsupportedOperationException(); + } } } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java index 8cb2a974e6d1..f2b9a26239dc 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java @@ -26,6 +26,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.SqlStatementFactory; import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.run.EngineFeature; import org.apache.druid.sql.calcite.run.QueryMaker; @@ -126,4 +127,10 @@ public QueryMaker buildQueryMakerForInsert(IngestDestination destination, RelRoo return new TestInsertQueryMaker(destination, signature); } + + @Override + public SqlStatementFactory getSqlStatementFactory() + { + throw new UnsupportedOperationException(); + } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java index 52aadd54839c..eb803186d4c5 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java @@ -41,6 +41,7 @@ import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ResourceType; +import org.apache.druid.sql.SqlStatementFactory; import org.apache.druid.sql.calcite.aggregation.SqlAggregator; import org.apache.druid.sql.calcite.expression.SqlOperatorConversion; import org.apache.druid.sql.calcite.rule.ExtensionCalciteRuleProvider; @@ -186,7 +187,7 @@ public void testExtensionCalciteRule() PlannerContext context = PlannerContext.create( toolbox, "SELECT 1", - new NativeSqlEngine(queryLifecycleFactory, mapper), + new NativeSqlEngine(queryLifecycleFactory, mapper, (SqlStatementFactory) null), Collections.emptyMap(), null ); @@ -206,7 +207,7 @@ public void testConfigurableBloat() PlannerContext contextWithBloat = PlannerContext.create( toolbox, "SELECT 1", - new NativeSqlEngine(queryLifecycleFactory, mapper), + new NativeSqlEngine(queryLifecycleFactory, mapper, (SqlStatementFactory) null), Collections.singletonMap(BLOAT_PROPERTY, BLOAT), null ); @@ -214,7 +215,7 @@ public void testConfigurableBloat() PlannerContext contextWithoutBloat = PlannerContext.create( toolbox, "SELECT 1", - new NativeSqlEngine(queryLifecycleFactory, mapper), + new NativeSqlEngine(queryLifecycleFactory, mapper, (SqlStatementFactory) null), Collections.emptyMap(), null ); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index c314a4e075a3..5e59e8a1e03f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -79,6 +79,7 @@ import org.apache.druid.server.security.Escalator; import org.apache.druid.server.security.NoopEscalator; import org.apache.druid.server.security.ResourceType; +import org.apache.druid.sql.SqlStatementFactory; import org.apache.druid.sql.calcite.BaseCalciteQueryTest; import org.apache.druid.sql.calcite.aggregation.SqlAggregationModule; import org.apache.druid.sql.calcite.planner.DruidOperatorTable; @@ -280,7 +281,16 @@ public static NativeSqlEngine createMockSqlEngine( final QueryRunnerFactoryConglomerate conglomerate ) { - return new NativeSqlEngine(createMockQueryLifecycleFactory(walker, conglomerate), getJsonMapper()); + return createMockSqlEngine(walker, conglomerate, null); + } + + public static NativeSqlEngine createMockSqlEngine( + final QuerySegmentWalker walker, + final QueryRunnerFactoryConglomerate conglomerate, + final SqlStatementFactory sqlStatementFactory + ) + { + return new NativeSqlEngine(createMockQueryLifecycleFactory(walker, conglomerate), getJsonMapper(), sqlStatementFactory); } public static QueryLifecycleFactory createMockQueryLifecycleFactory( @@ -522,7 +532,7 @@ public ListenableFuture go( /** * A fake {@link DruidNodeDiscoveryProvider} for {@link #createMockSystemSchema}. */ - private static class FakeDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider + public static class FakeDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider { private final Map nodeDiscoveries; diff --git a/sql/src/test/java/org/apache/druid/sql/http/GetQueriesResponseTest.java b/sql/src/test/java/org/apache/druid/sql/http/GetQueriesResponseTest.java new file mode 100644 index 000000000000..cc650ae27253 --- /dev/null +++ b/sql/src/test/java/org/apache/druid/sql/http/GetQueriesResponseTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.segment.TestHelper; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class GetQueriesResponseTest +{ + private static ObjectMapper jsonMapper; + + @BeforeAll + static void setUp() + { + jsonMapper = TestHelper.makeJsonMapper().registerModules(getJacksonModules()); + } + + @Test + public void test_serde() throws Exception + { + final GetQueriesResponse response = new GetQueriesResponse( + Collections.singletonList( + new TestQueryInfo( + "query", + "xyz", + "abc" + ) + ) + ); + final GetQueriesResponse response2 = + jsonMapper.readValue(jsonMapper.writeValueAsBytes(response), GetQueriesResponse.class); + Assertions.assertEquals(response, response2); + } + + @Test + public void test_equals() + { + EqualsVerifier.forClass(GetQueriesResponse.class).usingGetClass().verify(); + } + + static class TestQueryInfo implements QueryInfo + { + private final String query; + private final String identity; + private final String authenticator; + + @JsonCreator + public TestQueryInfo( + @JsonProperty("query") String query, + @JsonProperty("identity") String identity, + @JsonProperty("authenticator") String authenticator + ) + { + this.query = query; + this.identity = identity; + this.authenticator = authenticator; + } + + @JsonProperty + public String getQuery() + { + return query; + } + + @JsonProperty + public String getIdentity() + { + return identity; + } + + @JsonProperty + public String getAuthenticator() + { + return authenticator; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestQueryInfo that = (TestQueryInfo) o; + return Objects.equals(query, that.query) + && Objects.equals(identity, that.identity) + && Objects.equals(authenticator, that.authenticator); + } + + @Override + public int hashCode() + { + return Objects.hash(query, identity, authenticator); + } + } + + private static List getJacksonModules() + { + return Collections.singletonList( + new SimpleModule("TestModule").registerSubtypes( + new NamedType( + TestQueryInfo.class, + "test" + ) + ) + ); + } +} diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java index ff9c41eab9a1..483ca1a09d12 100644 --- a/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java @@ -66,7 +66,7 @@ public void setUp() binder -> { binder.bind(ObjectMapper.class).annotatedWith(Json.class).toInstance(jsonMpper); binder.bind(AuthorizerMapper.class).toInstance(new AuthorizerMapper(Collections.emptyMap())); - binder.bind(NativeSqlEngine.class).toProvider(Providers.of(new NativeSqlEngine(null, null))); + binder.bind(NativeSqlEngine.class).toProvider(Providers.of(new NativeSqlEngine(null, null, (SqlStatementFactory) null))); binder.bind(DruidNode.class).annotatedWith(Self.class).toInstance(SqlResourceTest.DUMMY_DRUID_NODE); binder.bind(ResponseContextConfig.class).toInstance(SqlResourceTest.TEST_RESPONSE_CONTEXT_CONFIG); binder.bind(SqlStatementFactory.class) diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java index f18873668ea2..dce1953a9d82 100644 --- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java @@ -278,9 +278,8 @@ public void add(String sqlQueryId, Cancelable lifecycle) stubServiceEmitter = new StubServiceEmitter("test", "test"); final AuthConfig authConfig = new AuthConfig(); final DefaultQueryConfig defaultQueryConfig = new DefaultQueryConfig(ImmutableMap.of()); - engine = CalciteTests.createMockSqlEngine(walker, conglomerate); final SqlToolbox sqlToolbox = new SqlToolbox( - engine, + null, plannerFactory, stubServiceEmitter, testRequestLogger, @@ -297,7 +296,7 @@ public HttpStatement httpStatement( ) { TestHttpStatement stmt = new TestHttpStatement( - sqlToolbox, + sqlToolbox.withEngine(engine), sqlQuery, req, validateAndAuthorizeLatchSupplier, @@ -323,12 +322,13 @@ public PreparedStatement preparedStatement(SqlQueryPlus sqlRequest) throw new UnsupportedOperationException(); } }; + engine = CalciteTests.createMockSqlEngine(walker, conglomerate, sqlStatementFactory); resource = new SqlResource( JSON_MAPPER, CalciteTests.TEST_AUTHORIZER_MAPPER, - sqlStatementFactory, - lifecycleManager, new ServerConfig(), + lifecycleManager, + new SqlEngineRegistry(Set.of(engine)), TEST_RESPONSE_CONTEXT_CONFIG, DUMMY_DRUID_NODE ); @@ -402,6 +402,14 @@ public void testCountStar() throws Exception Assert.assertTrue(lifecycleManager.getAll("id").isEmpty()); } + @Test + public void test_getEnabled() + { + Response response = resource.getSupportedEngines(req); + Set supportedEngines = ((SupportedEnginesResponse) response.getEntity()).getEngines(); + Assert.assertTrue(supportedEngines.contains(new EngineInfo(NativeSqlEngine.NAME))); + } + @Test public void testCountStarWithMissingIntervalsContext() throws Exception { @@ -1632,8 +1640,6 @@ public void testUnsupportedQueryThrowsExceptionWithFilterResponse() throws Excep resource = new SqlResource( JSON_MAPPER, CalciteTests.TEST_AUTHORIZER_MAPPER, - sqlStatementFactory, - lifecycleManager, new ServerConfig() { @Override @@ -1648,6 +1654,8 @@ public ErrorResponseTransformStrategy getErrorResponseTransformStrategy() return new AllowedRegexErrorResponseTransformStrategy(ImmutableList.of()); } }, + lifecycleManager, + new SqlEngineRegistry(Set.of(engine)), TEST_RESPONSE_CONTEXT_CONFIG, DUMMY_DRUID_NODE );