diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 6e0f34583c69..f6ddebcb0b24 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1892,6 +1892,7 @@ The Druid SQL server is configured through the following properties on the Broke |`druid.sql.planner.useNativeQueryExplain`|If true, `EXPLAIN PLAN FOR` will return the explain plan as a JSON representation of equivalent native query(s), else it will return the original version of explain plan generated by Calcite. It can be overridden per query with `useNativeQueryExplain` context key.|true| |`druid.sql.planner.maxNumericInFilters`|Max limit for the amount of numeric values that can be compared for a string type dimension when the entire SQL WHERE clause of a query translates to an [OR](../querying/filters.md#or) of [Bound filter](../querying/filters.md#bound-filter). By default, Druid does not restrict the amount of numeric Bound Filters on String columns, although this situation may block other queries from running. Set this property to a smaller value to prevent Druid from running queries that have prohibitively long segment processing times. The optimal limit requires some trial and error; we recommend starting with 100. Users who submit a query that exceeds the limit of `maxNumericInFilters` should instead rewrite their queries to use strings in the `WHERE` clause instead of numbers. For example, `WHERE someString IN (‘123’, ‘456’)`. If this value is disabled, `maxNumericInFilters` set through query context is ignored.|`-1` (disabled)| |`druid.sql.approxCountDistinct.function`|Implementation to use for the [`APPROX_COUNT_DISTINCT` function](../querying/sql-aggregations.md). Without extensions loaded, the only valid value is `APPROX_COUNT_DISTINCT_BUILTIN` (a HyperLogLog, or HLL, based implementation). If the [DataSketches extension](../development/extensions-core/datasketches-extension.md) is loaded, this can also be `APPROX_COUNT_DISTINCT_DS_HLL` (alternative HLL implementation) or `APPROX_COUNT_DISTINCT_DS_THETA`.

Theta sketches use significantly more memory than HLL sketches, so you should prefer one of the two HLL implementations.|`APPROX_COUNT_DISTINCT_BUILTIN`| +|`druid.sql.planner.enableSysQueriesTable`|**Experimental.** Whether to enable the [`sys.queries` table](../querying/sql-metadata-tables.md#queries-table), which provides information about currently running and recently completed SQL queries. Currently only queries from the Dart (MSQ) engine are shown.|false| :::info Previous versions of Druid had properties named `druid.sql.planner.maxQueryCount` and `druid.sql.planner.maxSemiJoinRowsInMemory`. diff --git a/docs/querying/sql-metadata-tables.md b/docs/querying/sql-metadata-tables.md index 1346faebb71c..802d88c567c9 100644 --- a/docs/querying/sql-metadata-tables.md +++ b/docs/querying/sql-metadata-tables.md @@ -335,4 +335,37 @@ For example, to retrieve properties for a specific server, use the query ```sql SELECT * FROM sys.server_properties WHERE server='192.168.1.1:8081' -``` \ No newline at end of file +``` + +### QUERIES table + +:::info + The `sys.queries` table is an experimental feature. You must enable it by setting the runtime property + `druid.sql.planner.enableSysQueriesTable=true` on Broker processes. The main reason this table is experimental + is that it only shows queries from the [Dart](dart.md) engine, which is also experimental. +::: + +The queries table provides information about currently running and recently completed SQL queries. + +|Column|Type|Notes| +|------|-----|-----| +|id|VARCHAR|Execution ID for the query. For Dart queries, this is the `dartQueryId`.| +|engine|VARCHAR|SQL engine that executed the query, e.g., `msq-dart`| +|state|VARCHAR|Query status: `ACCEPTED`, `RUNNING`, `SUCCESS`, `FAILED`, or `CANCELED`| +|info|VARCHAR|JSON-serialized query information including `sqlQueryId`, `sql`, `identity`, `startTime`, and other engine-specific details| + +For example, to retrieve all recently completed Dart queries: + +```sql +SELECT * +FROM sys.queries +WHERE + engine = 'msq-dart' + AND state IN ('SUCCESS', 'FAILED', 'CANCELED') +``` + +:::info + The retention of completed query information is controlled by Dart controller configuration. + See `druid.msq.dart.controller.maxRetainedReportCount` and `druid.msq.dart.controller.maxRetainedReportDuration` + for details on how long completed queries are retained. +::: diff --git a/embedded-tests/pom.xml b/embedded-tests/pom.xml index e37426263aa9..c69db2094a7e 100644 --- a/embedded-tests/pom.xml +++ b/embedded-tests/pom.xml @@ -209,7 +209,10 @@ - + + com.opencsv + opencsv + diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/AbstractAuthConfigurationTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/AbstractAuthConfigurationTest.java index 9cc75989ac0e..851b8caebfdf 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/AbstractAuthConfigurationTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/AbstractAuthConfigurationTest.java @@ -1006,6 +1006,11 @@ protected void setExpectedSystemSchemaObjects(String dataSource, String taskId) ); } + protected EmbeddedCoordinator getCoordinator() + { + return coordinator; + } + protected String getCoordinatorUrl() { return getServerUrl(coordinator); diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/BasicAuthConfigurationTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/BasicAuthConfigurationTest.java index ce356d185f89..d80844612c9f 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/BasicAuthConfigurationTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/BasicAuthConfigurationTest.java @@ -196,18 +196,14 @@ private void createUserAndRoleWithPermissions( List permissions ) { - // Setup authentication by creating user and password - postAsAdmin(null, "/authentication/db/basic/users/%s", user); - - final BasicAuthenticatorCredentialUpdate credentials - = new BasicAuthenticatorCredentialUpdate(password, 5000); - postAsAdmin(credentials, "/authentication/db/basic/users/%s/credentials", user); - - // Setup authorization by assigning a role to the user - postAsAdmin(null, "/authorization/db/basic/users/%s", user); - postAsAdmin(null, "/authorization/db/basic/roles/%s", role); - postAsAdmin(null, "/authorization/db/basic/users/%s/roles/%s", user, role); - postAsAdmin(permissions, "/authorization/db/basic/roles/%s/permissions", role); + EmbeddedBasicAuthResource.createUserWithPermissions( + getHttpClient(User.ADMIN), + getCoordinator(), + user, + password, + role, + permissions + ); } private void postAsAdmin( diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/EmbeddedBasicAuthResource.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/EmbeddedBasicAuthResource.java index 3a54954287c3..798e3834c7eb 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/EmbeddedBasicAuthResource.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/EmbeddedBasicAuthResource.java @@ -20,15 +20,25 @@ package org.apache.druid.testing.embedded.auth; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.security.basic.BasicSecurityDruidModule; +import org.apache.druid.security.basic.authentication.entity.BasicAuthenticatorCredentialUpdate; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.security.ResourceAction; import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedDruidServer; import org.apache.druid.testing.embedded.EmbeddedResource; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.util.List; /** * Resource to enable the basic auth extension in embedded tests. */ public class EmbeddedBasicAuthResource implements EmbeddedResource { + public static final String ADMIN_USER = "admin"; public static final String ADMIN_PASSWORD = "priest"; public static final String SYSTEM_PASSWORD = "warlock"; public static final String SYSTEM_USER = "druid_system"; @@ -65,19 +75,100 @@ public void stop() { // Do nothing } - + private String authenticatorProp(String name) { return StringUtils.format("druid.auth.authenticator.%s.%s", AUTHENTICATOR_NAME, name); } - + private String authorizerProp(String name) { return StringUtils.format("druid.auth.authorizer.%s.%s", AUTHORIZER_NAME, name); } - + private String escalatorProp(String name) { return StringUtils.format("druid.escalator.%s", name); } + + /** + * Creates a user with specified permissions using the basic auth security API. + * + * @param adminClient HTTP client authenticated as admin + * @param coordinator the coordinator server to make API calls against + * @param username the username to create + * @param password the password for the user + * @param roleName the role name to create and assign + * @param permissions the permissions to grant to the role + */ + public static void createUserWithPermissions( + HttpClient adminClient, + EmbeddedDruidServer coordinator, + String username, + String password, + String roleName, + List permissions + ) + { + final DruidNode coordinatorDruidNode = coordinator.bindings().selfNode(); + final String baseUrl = StringUtils.format( + "%s://%s/druid-ext/basic-security", + coordinatorDruidNode.getServiceScheme(), + coordinatorDruidNode.getHostAndPortToUse() + ); + + // Create user in authentication DB + HttpUtil.makeRequest( + adminClient, + HttpMethod.POST, + StringUtils.format("%s/authentication/db/basic/users/%s", baseUrl, username), + null, + HttpResponseStatus.OK + ); + + // Set password + HttpUtil.makeRequest( + adminClient, + HttpMethod.POST, + StringUtils.format("%s/authentication/db/basic/users/%s/credentials", baseUrl, username), + new BasicAuthenticatorCredentialUpdate(password, 5000), + HttpResponseStatus.OK + ); + + // Create user in authorization DB + HttpUtil.makeRequest( + adminClient, + HttpMethod.POST, + StringUtils.format("%s/authorization/db/basic/users/%s", baseUrl, username), + null, + HttpResponseStatus.OK + ); + + // Create role + HttpUtil.makeRequest( + adminClient, + HttpMethod.POST, + StringUtils.format("%s/authorization/db/basic/roles/%s", baseUrl, roleName), + null, + HttpResponseStatus.OK + ); + + // Assign role to user + HttpUtil.makeRequest( + adminClient, + HttpMethod.POST, + StringUtils.format("%s/authorization/db/basic/users/%s/roles/%s", baseUrl, username, roleName), + null, + HttpResponseStatus.OK + ); + + // Grant permissions + HttpUtil.makeRequest( + adminClient, + HttpMethod.POST, + StringUtils.format("%s/authorization/db/basic/roles/%s/permissions", baseUrl, roleName), + permissions, + HttpResponseStatus.OK + ); + } } diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDartReportApiTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDartReportApiTest.java index 891cba4e3b75..c0787c2e21c9 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDartReportApiTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDartReportApiTest.java @@ -19,15 +19,23 @@ package org.apache.druid.testing.embedded.msq; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.error.DruidException; import org.apache.druid.guice.SleepModule; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.report.TaskReport; import org.apache.druid.indexing.common.task.IndexTask; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.http.client.CredentialedHttpClient; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.auth.BasicCredentials; +import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.msq.dart.controller.http.DartQueryInfo; import org.apache.druid.msq.dart.controller.sql.DartSqlClients; import org.apache.druid.msq.indexing.report.MSQTaskReport; @@ -35,7 +43,14 @@ import org.apache.druid.query.QueryContexts; import org.apache.druid.query.http.ClientSqlQuery; import org.apache.druid.server.metrics.LatchableEmitter; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; +import org.apache.druid.sql.http.GetQueriesResponse; import org.apache.druid.sql.http.GetQueryReportResponse; +import org.apache.druid.sql.http.QueryInfo; +import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.testing.embedded.EmbeddedBroker; import org.apache.druid.testing.embedded.EmbeddedClusterApis; import org.apache.druid.testing.embedded.EmbeddedCoordinator; @@ -43,16 +58,27 @@ import org.apache.druid.testing.embedded.EmbeddedHistorical; import org.apache.druid.testing.embedded.EmbeddedIndexer; import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.auth.EmbeddedBasicAuthResource; +import org.apache.druid.testing.embedded.auth.HttpUtil; import org.apache.druid.testing.embedded.indexing.MoreResources; import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.junit.internal.matchers.ThrowableMessageMatcher; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; /** * Embedded test for the Dart report API at {@code /druid/v2/sql/queries/{id}/reports}. @@ -62,6 +88,10 @@ public class EmbeddedDartReportApiTest extends EmbeddedClusterTestBase { private static final int MAX_RETAINED_REPORT_COUNT = 10; + // Authentication constants - use shared constants from EmbeddedBasicAuthResource where available + private static final String REGULAR_USER = "regularUser"; + private static final String REGULAR_PASSWORD = "helloworld"; + private final EmbeddedBroker broker1 = new EmbeddedBroker(); private final EmbeddedBroker broker2 = new EmbeddedBroker(); private final EmbeddedIndexer indexer = new EmbeddedIndexer(); @@ -71,12 +101,15 @@ public class EmbeddedDartReportApiTest extends EmbeddedClusterTestBase private EmbeddedMSQApis msqApis; private String ingestedDataSource; + private HttpClient adminClient; + private HttpClient regularUserClient; private void configureBroker(EmbeddedBroker broker, int port) { broker.addProperty("druid.msq.dart.controller.heapFraction", "0.5") .addProperty("druid.msq.dart.controller.maxRetainedReportCount", String.valueOf(MAX_RETAINED_REPORT_COUNT)) .addProperty("druid.query.default.context.maxConcurrentStages", "1") + .addProperty("druid.sql.planner.enableSysQueriesTable", "true") .addProperty("druid.plaintextPort", String.valueOf(port)); } @@ -100,6 +133,7 @@ protected EmbeddedDruidCluster createCluster() return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper() .addCommonProperty("druid.msq.dart.enabled", "true") + .addResource(new EmbeddedBasicAuthResource()) .useLatchableEmitter() .addServer(coordinator) .addServer(overlord) @@ -115,6 +149,10 @@ protected void setupData() { msqApis = new EmbeddedMSQApis(cluster, overlord); + // Set up HTTP clients for admin and regular user + setupAdminClient(); + setupRegularUserAndClient(); + // Ingest test data once, using batch ingestion. ingestedDataSource = EmbeddedClusterApis.createTestDatasourceName(); final String taskId = IdUtils.getRandomId(); @@ -127,6 +165,41 @@ protected void setupData() cluster.callApi().waitForAllSegmentsToBeAvailable(ingestedDataSource, coordinator, broker2); } + private void setupAdminClient() + { + adminClient = new CredentialedHttpClient( + new BasicCredentials(EmbeddedBasicAuthResource.ADMIN_USER, EmbeddedBasicAuthResource.ADMIN_PASSWORD), + broker1.bindings().globalHttpClient() + ); + } + + /** + * Creates a regular user with only datasource read permission (no STATE READ). + * Username is {@link #REGULAR_USER}, password is {@link #REGULAR_PASSWORD}. + */ + private void setupRegularUserAndClient() + { + // Grant permissions: datasource read access to all datasources, sys.queries table access, but no STATE READ + final List permissions = ImmutableList.of( + new ResourceAction(new Resource(".*", ResourceType.DATASOURCE), Action.READ), + new ResourceAction(new Resource("queries", ResourceType.SYSTEM_TABLE), Action.READ) + ); + + EmbeddedBasicAuthResource.createUserWithPermissions( + adminClient, + coordinator, + REGULAR_USER, + REGULAR_PASSWORD, + "regularRole", + permissions + ); + + regularUserClient = new CredentialedHttpClient( + new BasicCredentials(REGULAR_USER, REGULAR_PASSWORD), + broker1.bindings().globalHttpClient() + ); + } + @Test @Timeout(60) public void test_getQueryReport_forCompletedDartQuery() @@ -174,6 +247,46 @@ public void test_getQueryReport_forCompletedDartQuery() Assertions.assertInstanceOf(MSQTaskReport.class, reportResponse.getReportMap().get(MSQTaskReport.REPORT_KEY)); } + @Test + @Timeout(60) + public void test_sysQueries_returnsRecentlyFinishedQuery() throws IOException + { + final String sqlQueryId = UUID.randomUUID().toString(); + + // Run a Dart query with a specific SQL query ID + final String result = cluster.callApi().runSql( + "SET engine = 'msq-dart';\n" + + "SET sqlQueryId = '%s';\n" + + "SELECT COUNT(*) FROM \"%s\"", + sqlQueryId, + ingestedDataSource + ); + + // Verify the query returned results. + Assertions.assertEquals("10", result); + + // Query sys.queries to find the recently-finished query. + final String sysQueriesText = cluster.callApi().runSql( + "SELECT engine, state, info FROM sys.queries\n" + + "WHERE engine = 'msq-dart'\n" + + "AND info LIKE '%%%s%%'", + sqlQueryId + ).trim(); + + // Verify the query appears in sys.queries with SUCCESS state. + final String[] sysQueriesResult = CsvInputFormat.createOpenCsvParser().parseLine(sysQueriesText); + + Assertions.assertEquals("msq-dart", sysQueriesResult[0]); + Assertions.assertEquals("SUCCESS", sysQueriesResult[1]); + + final DartQueryInfo sysQueriesQueryInfo = (DartQueryInfo) broker1.bindings().jsonMapper().readValue( + sysQueriesResult[2], + QueryInfo.class + ); + + Assertions.assertEquals(sqlQueryId, sysQueriesQueryInfo.getSqlQueryId()); + } + @Test @Timeout(60) public void test_getQueryReport_notFound() @@ -262,7 +375,7 @@ public void test_getQueryReport_forRunningAndCanceledQuery() Assertions.assertEquals(sql, runningQueryInfo.getSql()); Assertions.assertEquals(sqlQueryId, runningQueryInfo.getSqlQueryId()); - // Verify the report is an MSQTaskReport with RUNNING status + // Verify the report is an MSQTaskReport with RUNNING state final MSQTaskReport runningMsqReport = (MSQTaskReport) runningReport.getReportMap().get(MSQTaskReport.REPORT_KEY); Assertions.assertNotNull(runningMsqReport, "MSQ report should not be null"); @@ -317,7 +430,159 @@ public void test_getQueryReport_forRunningAndCanceledQuery() } /** - * Polls the report API until a report is available. + * Test that admin (with STATE READ permission) can see queries from all users, + * while regular user (without STATE READ) can only see their own queries. + */ + @Test + @Timeout(60) + public void test_getRunningQueries_authorization() + { + final String adminQueryId = "admin-query-" + UUID.randomUUID(); + final String regularUserQueryId = "regular-query-" + UUID.randomUUID(); + + // Run a query as admin + runSqlWithClient( + StringUtils.format( + "SET engine = 'msq-dart';\n" + + "SET sqlQueryId = '%s';\n" + + "SELECT COUNT(*) FROM \"%s\"", + adminQueryId, + ingestedDataSource + ), + adminClient + ); + + // Run a query as regular user + runSqlWithClient( + StringUtils.format( + "SET engine = 'msq-dart';\n" + + "SET sqlQueryId = '%s';\n" + + "SELECT COUNT(*) FROM \"%s\"", + regularUserQueryId, + ingestedDataSource + ), + regularUserClient + ); + + // Admin should see both queries + final GetQueriesResponse adminResponse = getRunningQueriesWithClient(adminClient); + Assertions.assertNotNull(adminResponse); + + final List adminVisibleSqlQueryIds = getSqlQueryIds(adminResponse); + Assertions.assertTrue(adminVisibleSqlQueryIds.contains(adminQueryId)); + Assertions.assertTrue(adminVisibleSqlQueryIds.contains(regularUserQueryId)); + + // Admin can get either query report + Assertions.assertNotNull(getReportWithClient(adminQueryId, adminClient)); + Assertions.assertNotNull(getReportWithClient(regularUserQueryId, adminClient)); + + // Regular user should only see their own query + final GetQueriesResponse regularUserResponse = getRunningQueriesWithClient(regularUserClient); + Assertions.assertNotNull(regularUserResponse); + + final List regularUserVisibleSqlQueryIds = getSqlQueryIds(regularUserResponse); + Assertions.assertFalse(regularUserVisibleSqlQueryIds.contains(adminQueryId)); + Assertions.assertTrue(regularUserVisibleSqlQueryIds.contains(regularUserQueryId)); + + // Regular user can get only their own query report + final RuntimeException e = Assertions.assertThrows( + RuntimeException.class, + () -> getReportWithClient(adminQueryId, regularUserClient) + ); + MatcherAssert.assertThat(e, ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("404 Not Found"))); + Assertions.assertNotNull(getReportWithClient(regularUserQueryId, regularUserClient)); + } + + /** + * Test that admin (with STATE READ permission) can see queries from all users in sys.queries, + * while regular user (without STATE READ) can only see their own queries. + */ + @Test + @Timeout(60) + public void test_sysQueries_authorization() throws IOException + { + final String adminQueryId = "admin-sys-query-" + UUID.randomUUID(); + final String regularUserQueryId = "regular-sys-query-" + UUID.randomUUID(); + + // Run a query as admin + runSqlWithClient( + StringUtils.format( + "SET engine = 'msq-dart';\n" + + "SET sqlQueryId = '%s';\n" + + "SELECT COUNT(*) FROM \"%s\"", + adminQueryId, + ingestedDataSource + ), + adminClient + ); + + // Run a query as regular user + runSqlWithClient( + StringUtils.format( + "SET engine = 'msq-dart';\n" + + "SET sqlQueryId = '%s';\n" + + "SELECT COUNT(*) FROM \"%s\"", + regularUserQueryId, + ingestedDataSource + ), + regularUserClient + ); + + // Admin queries sys.queries and should see both queries + final List adminVisibleSqlQueryIds = getSqlQueryIdsFromSysQueries(adminClient); + Assertions.assertTrue(adminVisibleSqlQueryIds.contains(adminQueryId)); + Assertions.assertTrue(adminVisibleSqlQueryIds.contains(regularUserQueryId)); + + // Regular user queries sys.queries and should only see their own query + final List regularUserVisibleSqlQueryIds = getSqlQueryIdsFromSysQueries(regularUserClient); + Assertions.assertFalse(regularUserVisibleSqlQueryIds.contains(adminQueryId)); + Assertions.assertTrue(regularUserVisibleSqlQueryIds.contains(regularUserQueryId)); + } + + /** + * Extracts SQL query IDs from a {@link GetQueriesResponse} for Dart queries only. + */ + private static List getSqlQueryIds(GetQueriesResponse response) + { + return response.getQueries().stream() + .filter(q -> q instanceof DartQueryInfo) + .map(q -> ((DartQueryInfo) q).getSqlQueryId()) + .collect(Collectors.toList()); + } + + /** + * Queries sys.queries and extracts SQL query IDs for Dart queries. + * The info column contains JSON with the sqlQueryId field. + */ + private List getSqlQueryIdsFromSysQueries(HttpClient httpClient) throws IOException + { + final String sysQueriesResult = runSqlWithClient( + "SELECT info FROM sys.queries WHERE engine = 'msq-dart'", + httpClient + ).trim(); + + if (sysQueriesResult.isEmpty()) { + return List.of(); + } + + final List sqlQueryIds = new ArrayList<>(); + for (String line : sysQueriesResult.split("\n")) { + // Each line is a CSV row with the info JSON field; parse it to handle proper CSV escaping + final String[] csvFields = CsvInputFormat.createOpenCsvParser().parseLine(line); + final DartQueryInfo info = (DartQueryInfo) broker1.bindings().jsonMapper().readValue( + csvFields[0], + QueryInfo.class + ); + if (info.getSqlQueryId() == null) { + throw DruidException.defensive("Missing sqlQueryId in info[%s]", info); + } + sqlQueryIds.add(info.getSqlQueryId()); + } + return sqlQueryIds; + } + + /** + * Polls the report API on {@link #broker1} until a report is available. */ private GetQueryReportResponse waitForReport(String sqlQueryId) { @@ -337,4 +602,90 @@ private GetQueryReportResponse waitForReport(String sqlQueryId) } throw new ISE("Timed out after[%,d] ms waiting for query to be in RUNNING state", timeout); } + + /** + * Gets running queries from {@link #broker1} using the provided HTTP client for authentication. + */ + private GetQueryReportResponse getReportWithClient(String queryId, HttpClient httpClient) + { + final String brokerUrl = getBrokerUrl(broker1); + final String url = StringUtils.format( + "%s/druid/v2/sql/queries/%s/reports", + brokerUrl, + StringUtils.urlEncode(queryId) + ); + + final StatusResponseHolder response = HttpUtil.makeRequest( + httpClient, + HttpMethod.GET, + url, + null, + HttpResponseStatus.OK + ); + + try { + return broker1.bindings().jsonMapper().readValue(response.getContent(), GetQueryReportResponse.class); + } + catch (JsonProcessingException e) { + throw DruidException.defensive(e, "Failed to parse GetQueryReportResponse"); + } + } + + /** + * Gets running queries from {@link #broker1} using the provided HTTP client for authentication. + */ + private GetQueriesResponse getRunningQueriesWithClient(HttpClient httpClient) + { + final String brokerUrl = getBrokerUrl(broker1); + final String url = brokerUrl + "/druid/v2/sql/queries?includeComplete"; + + final StatusResponseHolder response = HttpUtil.makeRequest( + httpClient, + HttpMethod.GET, + url, + null, + HttpResponseStatus.OK + ); + + try { + return broker1.bindings().jsonMapper().readValue(response.getContent(), GetQueriesResponse.class); + } + catch (JsonProcessingException e) { + throw DruidException.defensive(e, "Failed to parse GetQueriesResponse"); + } + } + + /** + * Submits a SQL query to {@link #broker1} using the provided HTTP client for authentication. + */ + public String runSqlWithClient( + String sql, + HttpClient httpClient + ) + { + final ClientSqlQuery query = new ClientSqlQuery( + sql, + ResultFormat.CSV.name(), + false, + false, + false, + Map.of(), + null + ); + + final String brokerUrl = getBrokerUrl(broker1); + final StatusResponseHolder response = HttpUtil.makeRequest( + httpClient, + HttpMethod.POST, + brokerUrl + "/druid/v2/sql", + query, + HttpResponseStatus.OK + ); + return response.getContent(); + } + + private String getBrokerUrl(EmbeddedBroker broker) + { + return broker.bindings().selfNode().getUriToUse().toString(); + } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java index 6bbffdd6fa4e..a0b9f7c34f91 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java @@ -21,13 +21,16 @@ import com.google.common.base.Preconditions; import org.apache.druid.msq.dart.worker.WorkerId; +import org.apache.druid.msq.exec.CaptureReportQueryListener; import org.apache.druid.msq.exec.Controller; import org.apache.druid.msq.exec.ControllerContext; import org.apache.druid.msq.exec.QueryListener; import org.apache.druid.msq.indexing.error.CancellationReason; import org.apache.druid.msq.indexing.error.MSQErrorReport; import org.apache.druid.msq.indexing.error.WorkerFailedFault; +import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.sql.http.StandardQueryState; import org.joda.time.DateTime; import java.util.concurrent.atomic.AtomicReference; @@ -42,17 +45,39 @@ public enum State /** * Query has been accepted, but not yet {@link Controller#run(QueryListener)}. */ - ACCEPTED, + ACCEPTED(StandardQueryState.ACCEPTED), /** * Query has had {@link Controller#run(QueryListener)} called. */ - RUNNING, + RUNNING(StandardQueryState.RUNNING), /** * Query has been canceled. */ - CANCELED + CANCELED(StandardQueryState.CANCELED), + + /** + * Query has exited successfully. + */ + SUCCESS(StandardQueryState.SUCCESS), + + /** + * Query has failed. + */ + FAILED(StandardQueryState.FAILED); + + private final String statusString; + + State(String statusString) + { + this.statusString = statusString; + } + + public String getStatusString() + { + return statusString; + } } private final Controller controller; @@ -152,7 +177,12 @@ public void workerOffline(final WorkerId workerId) */ public void cancel(CancellationReason reason) { - if (state.getAndSet(State.CANCELED) == State.RUNNING) { + if (state.compareAndSet(State.ACCEPTED, State.CANCELED)) { + // No need to call stop() since run() wasn't called. + return; + } + + if (state.compareAndSet(State.RUNNING, State.CANCELED)) { controller.stop(reason); } } @@ -166,10 +196,25 @@ public void cancel(CancellationReason reason) public boolean run(final QueryListener listener) throws Exception { if (state.compareAndSet(State.ACCEPTED, State.RUNNING)) { - controller.run(listener); + final CaptureReportQueryListener reportListener = new CaptureReportQueryListener(listener); + controller.run(reportListener); + updateStateOnQueryComplete(reportListener.getReport()); return true; } else { return false; } } + + private void updateStateOnQueryComplete(final MSQTaskReportPayload report) + { + switch (report.getStatus().getStatus()) { + case SUCCESS: + state.compareAndSet(State.RUNNING, State.SUCCESS); + break; + + case FAILED: + state.compareAndSet(State.RUNNING, State.FAILED); + break; + } + } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java index b80a6b6daf5d..9beab028d1cb 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java @@ -35,10 +35,14 @@ import org.joda.time.Period; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -68,7 +72,7 @@ public class DartControllerRegistry private final LinkedHashMap completeReports = new LinkedHashMap<>(); /** - * Map of SQL query ID -> Dart query ID. Used by {@link #getQueryInfoAndReportBySqlQueryId(String)}. Contains an + * Map of SQL query ID -> Dart query ID. Used by {@link #getQueryDetailsBySqlQueryId(String)}. Contains an * entry for every query in either {@link #controllerMap} or {@link #completeReports}. * * It is possible for the same SQL query ID to map to multiple Dart query IDs, because SQL query IDs can be set @@ -195,21 +199,12 @@ public Collection getAllControllers() * Gets execution details and report for a query. */ @Nullable - public QueryInfoAndReport getQueryInfoAndReport(final String queryId) + public QueryInfoAndReport getQueryDetails(final String queryId) { final ControllerHolder runningController = getController(queryId); if (runningController != null) { - final TaskReport.ReportMap liveReportMap = runningController.getController().liveReports(); - if (liveReportMap != null) { - return new QueryInfoAndReport( - DartQueryInfo.fromControllerHolder(runningController), - liveReportMap, - DateTimes.nowUtc() - ); - } else { - return null; - } + return getQueryDetails(runningController); } else { synchronized (completeReports) { return completeReports.get(queryId); @@ -221,13 +216,40 @@ public QueryInfoAndReport getQueryInfoAndReport(final String queryId) * Gets execution details and report for a query by SQL query ID. */ @Nullable - public QueryInfoAndReport getQueryInfoAndReportBySqlQueryId(final String sqlQueryId) + public QueryInfoAndReport getQueryDetailsBySqlQueryId(final String sqlQueryId) { final String dartQueryId = sqlQueryIdToDartQueryId.get(sqlQueryId); if (dartQueryId == null) { return null; } - return getQueryInfoAndReport(dartQueryId); + return getQueryDetails(dartQueryId); + } + + /** + * Gets execution details and reports for all completed queries. + */ + public List getAllQueryDetails(final boolean includeComplete) + { + final Set queryIds = new HashSet<>(); + final List retVal = new ArrayList<>(); + + for (final ControllerHolder controllerHolder : getAllControllers()) { + if (queryIds.add(controllerHolder.getController().queryId())) { + retVal.add(getQueryDetails(controllerHolder)); + } + } + + if (includeComplete) { + synchronized (completeReports) { + for (Map.Entry entry : completeReports.entrySet()) { + if (queryIds.add(entry.getKey())) { + retVal.add(entry.getValue()); + } + } + } + } + + return retVal; } /** @@ -252,4 +274,13 @@ private void cleanupExpiredReports() } } } + + private static QueryInfoAndReport getQueryDetails(final ControllerHolder controllerHolder) + { + return new QueryInfoAndReport( + DartQueryInfo.fromControllerHolder(controllerHolder), + controllerHolder.getController().liveReports(), + DateTimes.nowUtc() + ); + } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/QueryInfoAndReport.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/QueryInfoAndReport.java index de163ec6b0aa..ffb144658ec2 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/QueryInfoAndReport.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/QueryInfoAndReport.java @@ -27,7 +27,7 @@ import java.util.Objects; /** - * Object returned by {@link DartControllerRegistry#getQueryInfoAndReport(String)}. + * Object returned by {@link DartControllerRegistry#getQueryDetails(String)}. */ public class QueryInfoAndReport { diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java index 88610ee8f338..ca6f3861d95f 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.msq.dart.controller.ControllerHolder; +import org.apache.druid.msq.dart.controller.sql.DartSqlEngine; import org.apache.druid.msq.util.MSQTaskQueryMakerUtils; import org.apache.druid.query.QueryContexts; import org.apache.druid.server.DruidNode; @@ -79,7 +80,7 @@ public static DartQueryInfo fromControllerHolder(final ControllerHolder holder) holder.getAuthenticationResult().getAuthenticatedBy(), holder.getAuthenticationResult().getIdentity(), holder.getStartTime(), - holder.getState().toString() + holder.getState().getStatusString() ); } @@ -151,12 +152,25 @@ public DateTime getStartTime() return startTime; } + @Override @JsonProperty - public String getState() + public String state() { return state; } + @Override + public String engine() + { + return DartSqlEngine.NAME; + } + + @Override + public String executionId() + { + return dartQueryId; + } + /** * Returns a copy of this instance with {@link #getAuthenticator()} and {@link #getIdentity()} nulled. */ diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java index e94aac0cb4a6..e64fffa2b3a6 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java @@ -34,12 +34,13 @@ public interface DartSqlClient /** * Get information about all currently-running queries on this server. * - * @param selfOnly true if only queries from this server should be returned; false if queries from all servers - * should be returned + * @param selfOnly true if only queries from this server should be returned; false if queries from all servers + * should be returned + * @param includeComplete true if completed queries should be included in the response * - * @see SqlResource#doGetRunningQueries(String, HttpServletRequest) the server side + * @see SqlResource#doGetRunningQueries(String, String, HttpServletRequest) the server side */ - ListenableFuture getRunningQueries(boolean selfOnly); + ListenableFuture getRunningQueries(boolean selfOnly, boolean includeComplete); /** * Get query report for a particular SQL query ID on this server. diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java index d2c0e8f1c269..b22fcef13121 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java @@ -49,13 +49,16 @@ public DartSqlClientImpl(final ServiceClient client, final ObjectMapper jsonMapp } @Override - public ListenableFuture getRunningQueries(final boolean selfOnly) + public ListenableFuture getRunningQueries(final boolean selfOnly, final boolean includeComplete) { try { URIBuilder builder = new URIBuilder("/queries"); if (selfOnly) { builder.addParameter("selfOnly", null); } + if (includeComplete) { + builder.addParameter("includeComplete", null); + } return FutureUtils.transform( client.asyncRequest( diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java index 0dbc8307e328..954c44d23059 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java @@ -66,6 +66,7 @@ import org.apache.druid.sql.http.GetQueryReportResponse; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -275,21 +276,27 @@ public SqlStatementFactory getSqlStatementFactory() @Override public GetQueriesResponse getRunningQueries( boolean selfOnly, + boolean includeComplete, AuthenticationResult authenticationResult, AuthorizationResult stateReadAuthorization ) { - final List queries = - controllerRegistry.getAllControllers() - .stream() - .map(DartQueryInfo::fromControllerHolder) - .collect(Collectors.toList()); + final List queryDetails = controllerRegistry.getAllQueryDetails(includeComplete); + final List queries = new ArrayList<>(queryDetails.size()); + + for (final QueryInfoAndReport queryDetail : queryDetails) { + queries.add(queryDetail.getQueryInfo()); + } // Add queries from all other servers, if "selfOnly" is false. if (!selfOnly) { final List otherQueries = FutureUtils.getUnchecked( Futures.successfulAsList( - Iterables.transform(sqlClients.getAllClients(), client -> client.getRunningQueries(true))), + Iterables.transform( + sqlClients.getAllClients(), + client -> client.getRunningQueries(true, includeComplete) + ) + ), true ); @@ -329,7 +336,7 @@ public GetQueryReportResponse getQueryReport( final AuthorizationResult stateReadAuthorization ) { - QueryInfoAndReport infoAndReport = controllerRegistry.getQueryInfoAndReportBySqlQueryId(sqlQueryId); + QueryInfoAndReport infoAndReport = controllerRegistry.getQueryDetailsBySqlQueryId(sqlQueryId); if (infoAndReport == null && !selfOnly) { final List otherReports = FutureUtils.getUnchecked( diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/CaptureReportQueryListener.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/CaptureReportQueryListener.java new file mode 100644 index 000000000000..597ffc826923 --- /dev/null +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/CaptureReportQueryListener.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.exec; + +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.error.DruidException; +import org.apache.druid.msq.indexing.report.MSQResultsReport; +import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; + +import javax.annotation.Nullable; +import java.util.List; + +/** + * A {@link QueryListener} wrapper that captures the report from {@link #onQueryComplete(MSQTaskReportPayload)}. + */ +public class CaptureReportQueryListener implements QueryListener +{ + private final QueryListener delegate; + + @Nullable + private volatile MSQTaskReportPayload report; + + public CaptureReportQueryListener(final QueryListener delegate) + { + this.delegate = delegate; + } + + /** + * Retrieves the report. Can only be called once the query is complete. + */ + public MSQTaskReportPayload getReport() + { + if (report == null) { + throw DruidException.defensive("Query not complete, cannot call getReport()"); + } + + return report; + } + + @Override + public boolean readResults() + { + return delegate.readResults(); + } + + @Override + public void onResultsStart( + final List signature, + @Nullable final List sqlTypeNames + ) + { + delegate.onResultsStart(signature, sqlTypeNames); + } + + @Override + public boolean onResultRow(final Object[] row) + { + return delegate.onResultRow(row); + } + + @Override + public void onResultsComplete() + { + delegate.onResultsComplete(); + } + + @Override + public void onQueryComplete(final MSQTaskReportPayload report) + { + this.report = report; + delegate.onQueryComplete(report); + } +} diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java index 3a19d796641a..94d5be39150a 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java @@ -97,8 +97,8 @@ public void test_deregister_noReport_removesFromRegistry() Assertions.assertEquals(0, registry.getAllControllers().size()); Assertions.assertNull(registry.getController("dart1")); - Assertions.assertNull(registry.getQueryInfoAndReport("dart1")); - Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("sql1")); + Assertions.assertNull(registry.getQueryDetails("dart1")); + Assertions.assertNull(registry.getQueryDetailsBySqlQueryId("sql1")); } @Test @@ -118,13 +118,13 @@ public void test_deregister_withReport_retainsReport() Assertions.assertNull(registry.getController("dart1")); // But report is retained - final QueryInfoAndReport infoAndReport = registry.getQueryInfoAndReport("dart1"); + final QueryInfoAndReport infoAndReport = registry.getQueryDetails("dart1"); Assertions.assertNotNull(infoAndReport); Assertions.assertEquals("dart1", infoAndReport.getQueryInfo().getDartQueryId()); Assertions.assertSame(report, infoAndReport.getReportMap().get(MSQTaskReport.REPORT_KEY)); // And can be looked up by SQL query ID - final QueryInfoAndReport infoAndReportBySql = registry.getQueryInfoAndReportBySqlQueryId("sql1"); + final QueryInfoAndReport infoAndReportBySql = registry.getQueryDetailsBySqlQueryId("sql1"); Assertions.assertNotNull(infoAndReportBySql); Assertions.assertEquals("dart1", infoAndReportBySql.getQueryInfo().getDartQueryId()); Assertions.assertEquals("sql1", infoAndReportBySql.getQueryInfo().getSqlQueryId()); @@ -142,8 +142,8 @@ public void test_deregister_withReport_zeroRetainedCount_doesNotRetainReport() registry.register(holder); registry.deregister(holder, reportMap); - Assertions.assertNull(registry.getQueryInfoAndReport("dart1")); - Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("sql1")); + Assertions.assertNull(registry.getQueryDetails("dart1")); + Assertions.assertNull(registry.getQueryDetailsBySqlQueryId("sql1")); } @Test @@ -171,12 +171,12 @@ public void test_getQueryInfoAndReport_runningQuery() registry.register(holder); - final QueryInfoAndReport infoAndReport = registry.getQueryInfoAndReport("dart1"); + final QueryInfoAndReport infoAndReport = registry.getQueryDetails("dart1"); Assertions.assertNotNull(infoAndReport); Assertions.assertEquals("dart1", infoAndReport.getQueryInfo().getDartQueryId()); // Also works by SQL query ID - final QueryInfoAndReport infoAndReportBySql = registry.getQueryInfoAndReportBySqlQueryId("sql1"); + final QueryInfoAndReport infoAndReportBySql = registry.getQueryDetailsBySqlQueryId("sql1"); Assertions.assertNotNull(infoAndReportBySql); Assertions.assertEquals("dart1", infoAndReportBySql.getQueryInfo().getDartQueryId()); @@ -204,12 +204,10 @@ public void test_getQueryInfoAndReport_runningQuery_noLiveReports() registry.register(holder); - // Returns null when no live reports are available - Assertions.assertNull(registry.getQueryInfoAndReport("dart1")); - - // But the sqlQueryId mapping should still work after deregister with report - registry.deregister(holder, null); - Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("sql1")); + // Returns null report when no live reports are available + final QueryInfoAndReport infoAndReport = registry.getQueryDetails("dart1"); + Assertions.assertEquals("dart1", infoAndReport.getQueryInfo().getDartQueryId()); + Assertions.assertNull(infoAndReport.getReportMap()); } @Test @@ -227,22 +225,22 @@ public void test_reportEviction_byCount() } // Only the last 2 reports should be retained - Assertions.assertNull(registry.getQueryInfoAndReport("dart1")); - Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("sql1")); + Assertions.assertNull(registry.getQueryDetails("dart1")); + Assertions.assertNull(registry.getQueryDetailsBySqlQueryId("sql1")); - Assertions.assertNotNull(registry.getQueryInfoAndReport("dart2")); - Assertions.assertNotNull(registry.getQueryInfoAndReportBySqlQueryId("sql2")); + Assertions.assertNotNull(registry.getQueryDetails("dart2")); + Assertions.assertNotNull(registry.getQueryDetailsBySqlQueryId("sql2")); - Assertions.assertNotNull(registry.getQueryInfoAndReport("dart3")); - Assertions.assertNotNull(registry.getQueryInfoAndReportBySqlQueryId("sql3")); + Assertions.assertNotNull(registry.getQueryDetails("dart3")); + Assertions.assertNotNull(registry.getQueryDetailsBySqlQueryId("sql3")); } @Test - public void test_getQueryInfoAndReportBySqlQueryId_notFound() + public void test_getQueryDetailsBySqlQueryId_notFound() { final DartControllerRegistry registry = new DartControllerRegistry(makeConfig(10, Period.hours(1))); - Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("nonexistent")); + Assertions.assertNull(registry.getQueryDetailsBySqlQueryId("nonexistent")); } @Test diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java index eb06ab03097b..7c09f0a170e5 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java @@ -23,9 +23,9 @@ import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.msq.dart.controller.ControllerHolder; import org.apache.druid.msq.dart.controller.sql.DartSqlEngine; import org.apache.druid.msq.dart.guice.DartWorkerModule; +import org.apache.druid.sql.http.StandardQueryState; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -44,7 +44,7 @@ void test_serde() throws Exception "", "", DateTimes.of("2000"), - ControllerHolder.State.RUNNING.toString() + StandardQueryState.RUNNING ); ObjectMapper jsonMapper = new DefaultObjectMapper().registerModules(new DartWorkerModule().getJacksonModules()); byte[] bytes = jsonMapper.writeValueAsBytes(dartQueryInfo); diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java index 92f7a735052f..4cf483961426 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java @@ -91,6 +91,7 @@ import org.apache.druid.sql.http.SqlQuery; import org.apache.druid.sql.http.SqlResource; import org.apache.druid.sql.http.SqlResourceQueryResultPusherFactory; +import org.apache.druid.sql.http.StandardQueryState; import org.apache.druid.sql.http.SupportedEnginesResponse; import org.hamcrest.CoreMatchers; import org.junit.jupiter.api.AfterEach; @@ -322,7 +323,7 @@ public void test_getRunningQueries_selfOnly_superUser() Assertions.assertEquals( new GetQueriesResponse(Collections.singletonList(DartQueryInfo.fromControllerHolder(holder))), - sqlResource.doGetRunningQueries("", httpServletRequest).getEntity() + sqlResource.doGetRunningQueries("", null, httpServletRequest).getEntity() ); controllerRegistry.deregister(holder, null); @@ -346,7 +347,7 @@ public void test_getRunningQueries_selfOnly_regularUser() Assertions.assertEquals( new GetQueriesResponse( Collections.singletonList(DartQueryInfo.fromControllerHolder(holder).withoutAuthenticationResult())), - sqlResource.doGetRunningQueries("", httpServletRequest).getEntity() + sqlResource.doGetRunningQueries("", null, httpServletRequest).getEntity() ); controllerRegistry.deregister(holder, null); @@ -374,9 +375,9 @@ public void test_getRunningQueries_global_superUser() AUTHENTICATOR_NAME, DIFFERENT_REGULAR_USER_NAME, DateTimes.of("2001"), - ControllerHolder.State.RUNNING.toString() + StandardQueryState.RUNNING ); - Mockito.when(dartSqlClient.getRunningQueries(true)) + Mockito.when(dartSqlClient.getRunningQueries(true, false)) .thenReturn(Futures.immediateFuture(new GetQueriesResponse(Collections.singletonList(remoteQueryInfo)))); // With selfOnly = null, the endpoint returns both queries. @@ -387,7 +388,7 @@ public void test_getRunningQueries_global_superUser() remoteQueryInfo ) ), - sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity() + sqlResource.doGetRunningQueries(null, null, httpServletRequest).getEntity() ); controllerRegistry.deregister(localHolder, null); @@ -407,14 +408,14 @@ public void test_getRunningQueries_global_remoteError_superUser() final ControllerHolder localHolder = setUpMockRunningQuery(REGULAR_USER_NAME); // Remote call fails. - Mockito.when(dartSqlClient.getRunningQueries(true)) + Mockito.when(dartSqlClient.getRunningQueries(true, false)) .thenReturn(Futures.immediateFailedFuture(new IOException("something went wrong"))); // We only see local queries, because the remote call failed. (The entire call doesn't fail; we see what we // were able to fetch.) Assertions.assertEquals( new GetQueriesResponse(ImmutableList.of(DartQueryInfo.fromControllerHolder(localHolder))), - sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity() + sqlResource.doGetRunningQueries(null, null, httpServletRequest).getEntity() ); controllerRegistry.deregister(localHolder, null); @@ -442,16 +443,16 @@ public void test_getRunningQueries_global_regularUser() AUTHENTICATOR_NAME, DIFFERENT_REGULAR_USER_NAME, DateTimes.of("2000"), - ControllerHolder.State.RUNNING.toString() + StandardQueryState.RUNNING ); - Mockito.when(dartSqlClient.getRunningQueries(true)) + Mockito.when(dartSqlClient.getRunningQueries(true, false)) .thenReturn(Futures.immediateFuture(new GetQueriesResponse(Collections.singletonList(remoteQueryInfo)))); // The endpoint returns only the query issued by REGULAR_USER_NAME. Assertions.assertEquals( new GetQueriesResponse( ImmutableList.of(DartQueryInfo.fromControllerHolder(localHolder).withoutAuthenticationResult())), - sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity() + sqlResource.doGetRunningQueries(null, null, httpServletRequest).getEntity() ); controllerRegistry.deregister(localHolder, null); @@ -479,15 +480,15 @@ public void test_getRunningQueries_global_differentRegularUser() AUTHENTICATOR_NAME, DIFFERENT_REGULAR_USER_NAME, DateTimes.of("2000"), - ControllerHolder.State.RUNNING.toString() + StandardQueryState.RUNNING ); - Mockito.when(dartSqlClient.getRunningQueries(true)) + Mockito.when(dartSqlClient.getRunningQueries(true, false)) .thenReturn(Futures.immediateFuture(new GetQueriesResponse(Collections.singletonList(remoteQueryInfo)))); // The endpoint returns only the query issued by DIFFERENT_REGULAR_USER_NAME. Assertions.assertEquals( new GetQueriesResponse(ImmutableList.of(remoteQueryInfo.withoutAuthenticationResult())), - sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity() + sqlResource.doGetRunningQueries(null, null, httpServletRequest).getEntity() ); controllerRegistry.deregister(holder, null); diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java index 34be09e5bb00..cdd1276c481f 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java @@ -86,7 +86,7 @@ public void test_getMessages_all() throws Exception jsonMapper.writeValueAsBytes(getQueriesResponse) ); - final ListenableFuture result = dartSqlClient.getRunningQueries(false); + final ListenableFuture result = dartSqlClient.getRunningQueries(false, false); Assertions.assertEquals(getQueriesResponse, result.get()); } @@ -115,7 +115,36 @@ public void test_getMessages_selfOnly() throws Exception jsonMapper.writeValueAsBytes(getQueriesResponse) ); - final ListenableFuture result = dartSqlClient.getRunningQueries(true); + final ListenableFuture result = dartSqlClient.getRunningQueries(true, false); + Assertions.assertEquals(getQueriesResponse, result.get()); + } + + @Test + public void test_getMessages_includeComplete() throws Exception + { + final GetQueriesResponse getQueriesResponse = new GetQueriesResponse( + ImmutableList.of( + new DartQueryInfo( + "sid", + "did", + "SELECT 1", + "localhost:1001", + "", + "", + DateTimes.of("2000"), + ControllerHolder.State.RUNNING.toString() + ) + ) + ); + + serviceClient.expectAndRespond( + new RequestBuilder(HttpMethod.GET, "/queries?includeComplete"), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + jsonMapper.writeValueAsBytes(getQueriesResponse) + ); + + final ListenableFuture result = dartSqlClient.getRunningQueries(false, true); Assertions.assertEquals(getQueriesResponse, result.get()); } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java index 353501b78c47..d8c77c695894 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java @@ -80,6 +80,9 @@ public class PlannerConfig @JsonProperty private String nativeQuerySqlPlanningMode = QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_COUPLED; // can be COUPLED or DECOUPLED + @JsonProperty + private boolean enableSysQueriesTable = false; + public int getMaxNumericInFilters() { return maxNumericInFilters; @@ -149,6 +152,14 @@ public String getNativeQuerySqlPlanningMode() return nativeQuerySqlPlanningMode; } + /** + * Returns whether the sys.queries table is enabled. + */ + public boolean isEnableSysQueriesTable() + { + return enableSysQueriesTable; + } + public PlannerConfig withOverrides(final Map queryContext) { if (queryContext.isEmpty()) { @@ -177,6 +188,7 @@ public boolean equals(Object o) && useNativeQueryExplain == that.useNativeQueryExplain && forceExpressionVirtualColumns == that.forceExpressionVirtualColumns && maxNumericInFilters == that.maxNumericInFilters + && enableSysQueriesTable == that.enableSysQueriesTable && Objects.equals(sqlTimeZone, that.sqlTimeZone) && Objects.equals(nativeQuerySqlPlanningMode, that.nativeQuerySqlPlanningMode); } @@ -197,7 +209,8 @@ public int hashCode() useNativeQueryExplain, forceExpressionVirtualColumns, maxNumericInFilters, - nativeQuerySqlPlanningMode + nativeQuerySqlPlanningMode, + enableSysQueriesTable ); } @@ -213,6 +226,7 @@ public String toString() ", sqlTimeZone=" + sqlTimeZone + ", useNativeQueryExplain=" + useNativeQueryExplain + ", nativeQuerySqlPlanningMode=" + nativeQuerySqlPlanningMode + + ", enableSysQueriesTable=" + enableSysQueriesTable + '}'; } @@ -247,6 +261,7 @@ public static class Builder private boolean forceExpressionVirtualColumns; private int maxNumericInFilters; private String nativeQuerySqlPlanningMode; + private boolean enableSysQueriesTable; public Builder(PlannerConfig base) { @@ -266,6 +281,7 @@ public Builder(PlannerConfig base) forceExpressionVirtualColumns = base.isForceExpressionVirtualColumns(); maxNumericInFilters = base.getMaxNumericInFilters(); nativeQuerySqlPlanningMode = base.getNativeQuerySqlPlanningMode(); + enableSysQueriesTable = base.isEnableSysQueriesTable(); } public Builder requireTimeCondition(boolean option) @@ -340,6 +356,12 @@ public Builder nativeQuerySqlPlanningMode(String mode) return this; } + public Builder enableSysQueriesTable(boolean option) + { + this.enableSysQueriesTable = option; + return this; + } + public Builder withOverrides(final Map queryContext) { useApproximateCountDistinct = QueryContexts.parseBoolean( @@ -436,6 +458,7 @@ public PlannerConfig build() config.maxNumericInFilters = maxNumericInFilters; config.forceExpressionVirtualColumns = forceExpressionVirtualColumns; config.nativeQuerySqlPlanningMode = nativeQuerySqlPlanningMode; + config.enableSysQueriesTable = enableSysQueriesTable; return config; } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java index 7b2902c11b89..f47328a21aaa 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java @@ -138,12 +138,15 @@ default void initContextMap(Map contextMap) * * @param selfOnly whether to only include queries running on this server. If false, this server should * contact all other servers to build a full list of all running queries. + * @param includeComplete whether to include completed queries in the response. The number of completed queries + * returned is determined by engine-specific retention settings. * @param authenticationResult implementations should use this for filtering the list of visible queries * @param stateReadAuthorization authorization for the STATE READ resource. If this is authorized, implementations * should allow all queries to be visible */ default GetQueriesResponse getRunningQueries( boolean selfOnly, + boolean includeComplete, AuthenticationResult authenticationResult, AuthorizationResult stateReadAuthorization ) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 8b15f30bab1c..4440582d9a00 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -28,6 +28,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.inject.Inject; +import com.google.inject.Provider; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import it.unimi.dsi.fastutil.ints.IntSet; import org.apache.calcite.DataContext; @@ -75,8 +76,13 @@ import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.run.SqlEngine; import org.apache.druid.sql.calcite.table.RowSignatures; +import org.apache.druid.sql.http.GetQueriesResponse; +import org.apache.druid.sql.http.QueryInfo; +import org.apache.druid.sql.http.SqlEngineRegistry; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentStatusInCluster; @@ -102,6 +108,7 @@ public class SystemSchema extends AbstractSchema private static final String SERVER_SEGMENTS_TABLE = "server_segments"; private static final String TASKS_TABLE = "tasks"; private static final String SUPERVISOR_TABLE = "supervisors"; + private static final String QUERIES_TABLE = "queries"; private static final Function> SEGMENT_STATUS_IN_CLUSTER_RA_GENERATOR = segment -> @@ -226,6 +233,24 @@ public class SystemSchema extends AbstractSchema .add("spec", ColumnType.STRING) .build(); + static final RowSignature QUERIES_SIGNATURE = RowSignature + .builder() + .add("id", ColumnType.STRING) + .add("engine", ColumnType.STRING) + .add("state", ColumnType.STRING) + .add("info", ColumnType.STRING) + .build(); + + /** + * Index of the "info" column in {@link #QUERIES_SIGNATURE}. Used for projection pushdown. + */ + private static final int QUERIES_INFO_INDEX = QUERIES_SIGNATURE.indexOf("info"); + + /** + * List of [0..n) where n is the size of {@link #QUERIES_SIGNATURE}. + */ + private static final int[] QUERIES_PROJECT_ALL = IntStream.range(0, QUERIES_SIGNATURE.size()).toArray(); + private final Map tableMap; @Inject @@ -239,13 +264,16 @@ public SystemSchema( final OverlordClient overlordClient, final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, final ObjectMapper jsonMapper, - @EscalatedClient final HttpClient httpClient + @EscalatedClient final HttpClient httpClient, + final Provider sqlEngineRegistryProvider, + final PlannerConfig plannerConfig ) { Preconditions.checkNotNull(serverView, "serverView"); - this.tableMap = ImmutableMap.of( - SEGMENTS_TABLE, - new SegmentsTable(druidSchema, metadataView, jsonMapper, authorizerMapper), + + final ImmutableMap.Builder builder = ImmutableMap.builder(); + builder.put(SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView, jsonMapper, authorizerMapper)); + builder.put( SERVERS_TABLE, new ServersTable( druidNodeDiscoveryProvider, @@ -254,16 +282,21 @@ public SystemSchema( overlordClient, coordinatorClient, jsonMapper - ), - SERVER_SEGMENTS_TABLE, - new ServerSegmentsTable(serverView, authorizerMapper), - TASKS_TABLE, - new TasksTable(overlordClient, authorizerMapper), - SUPERVISOR_TABLE, - new SupervisorsTable(overlordClient, authorizerMapper), + ) + ); + builder.put(SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper)); + builder.put(TASKS_TABLE, new TasksTable(overlordClient, authorizerMapper)); + builder.put(SUPERVISOR_TABLE, new SupervisorsTable(overlordClient, authorizerMapper)); + builder.put( SystemServerPropertiesTable.TABLE_NAME, new SystemServerPropertiesTable(druidNodeDiscoveryProvider, authorizerMapper, httpClient, jsonMapper) ); + + if (plannerConfig.isEnableSysQueriesTable()) { + builder.put(QUERIES_TABLE, new QueriesTable(sqlEngineRegistryProvider, jsonMapper, authorizerMapper)); + } + + this.tableMap = builder.build(); } @Override @@ -1184,4 +1217,136 @@ private static Object[] projectSegmentsRow( } return projectedRow; } + + /** + * This table contains currently running and recently completed queries from all SQL engines. + * Enabled based on {@link PlannerConfig#isEnableSysQueriesTable()}. + */ + static class QueriesTable extends AbstractTable implements ProjectableFilterableTable + { + private final Provider sqlEngineRegistryProvider; + private final ObjectMapper jsonMapper; + private final AuthorizerMapper authorizerMapper; + + public QueriesTable( + final Provider sqlEngineRegistryProvider, + final ObjectMapper jsonMapper, + final AuthorizerMapper authorizerMapper + ) + { + this.sqlEngineRegistryProvider = sqlEngineRegistryProvider; + this.jsonMapper = jsonMapper; + this.authorizerMapper = authorizerMapper; + } + + @Override + public RelDataType getRowType(final RelDataTypeFactory typeFactory) + { + return RowSignatures.toRelDataType(QUERIES_SIGNATURE, typeFactory); + } + + @Override + public TableType getJdbcTableType() + { + return TableType.SYSTEM_TABLE; + } + + @Override + public Enumerable scan( + final DataContext root, + final List filters, + @Nullable final int[] projects + ) + { + final AuthenticationResult authenticationResult = (AuthenticationResult) Preconditions.checkNotNull( + root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT), + "authenticationResult in dataContext" + ); + + // Check STATE READ authorization + final AuthorizationResult stateReadAuthorization = AuthorizationUtils.authorizeAllResourceActions( + authenticationResult, + Collections.singletonList(new ResourceAction(Resource.STATE_RESOURCE, Action.READ)), + authorizerMapper + ); + + // Get queries from all engines + final List allQueries = new ArrayList<>(); + for (final SqlEngine sqlEngine : sqlEngineRegistryProvider.get().getAllEngines()) { + final GetQueriesResponse response = sqlEngine.getRunningQueries( + false, // selfOnly false to get queries from all servers + true, // includeComplete true to include all queries + authenticationResult, + stateReadAuthorization + ); + allQueries.addAll(response.getQueries()); + } + + // Determine if we need to serialize the info field (based on projection pushdown) + final int[] nonNullProjects = projects == null ? QUERIES_PROJECT_ALL : projects; + final boolean includeInfo = containsIndex(nonNullProjects, QUERIES_INFO_INDEX); + + // Build rows + final FluentIterable results = FluentIterable + .from(allQueries) + .transform(queryInfo -> buildQueryRow(queryInfo, includeInfo, jsonMapper)) + .transform(row -> projectQueriesRow(row, nonNullProjects)); + + return Linq4j.asEnumerable(results); + } + + /** + * Build a full row for a query. + */ + private static Object[] buildQueryRow( + final QueryInfo queryInfo, + final boolean includeInfo, + final ObjectMapper jsonMapper + ) + { + final Object[] row = new Object[QUERIES_SIGNATURE.size()]; + row[0] = queryInfo.executionId(); + row[1] = queryInfo.engine(); + row[2] = queryInfo.state(); + + // Only serialize info if it's in the projection + if (includeInfo) { + try { + row[3] = jsonMapper.writeValueAsString(queryInfo); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } else { + row[3] = null; + } + + return row; + } + + /** + * Project a row to include only the columns in the projection. + */ + private static Object[] projectQueriesRow(final Object[] row, final int[] projects) + { + final Object[] projectedRow = new Object[projects.length]; + for (int i = 0; i < projects.length; i++) { + projectedRow[i] = row[projects[i]]; + } + return projectedRow; + } + + /** + * Check if an array contains a specific index. + */ + private static boolean containsIndex(final int[] array, final int index) + { + for (final int i : array) { + if (i == index) { + return true; + } + } + return false; + } + } } diff --git a/sql/src/main/java/org/apache/druid/sql/http/QueryInfo.java b/sql/src/main/java/org/apache/druid/sql/http/QueryInfo.java index d3ec14cde503..fa8da407bba9 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/QueryInfo.java +++ b/sql/src/main/java/org/apache/druid/sql/http/QueryInfo.java @@ -21,7 +21,26 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; +/** + * Information about a SQL query. Implementations are engine-specific. + */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "engine") public interface QueryInfo { + /** + * Returns the engine name for this query, matching the JSON "engine" property. + */ + String engine(); + + /** + * Returns the state of this query, which may be an engine-specific string. Standard strings + * are in {@link StandardQueryState}, although engines can use additional strings if they like. + */ + String state(); + + /** + * Returns the execution ID for this query. This is the system-generated ID used internally, + * such as the dartQueryId for Dart queries. + */ + String executionId(); } diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java index e5f1a513d52b..d5b6876129bb 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java +++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java @@ -140,14 +140,18 @@ public Response doPost( /** * API to list all running queries, for all engines that supports such listings. * - * @param selfOnly if true, return queries running on this server. If false, return queries running on all servers. - * @param request http request. + * @param selfOnly if present, return queries running on this server only. If absent, return queries + * running on all servers. + * @param includeComplete if present, include completed queries in the response. The number of completed queries + * returned is determined by engine-specific retention settings. + * @param request http request. */ @GET @Path("/queries") @Produces(MediaType.APPLICATION_JSON) public Response doGetRunningQueries( @QueryParam("selfOnly") final String selfOnly, + @QueryParam("includeComplete") final String includeComplete, @Context final HttpServletRequest request ) { @@ -163,7 +167,14 @@ public Response doGetRunningQueries( // Get running queries from all engines that support it. for (SqlEngine sqlEngine : engines) { - queries.addAll(sqlEngine.getRunningQueries(selfOnly != null, authenticationResult, stateReadAccess).getQueries()); + queries.addAll( + sqlEngine.getRunningQueries( + selfOnly != null, + includeComplete != null, + authenticationResult, + stateReadAccess + ).getQueries() + ); } AuthorizationUtils.setRequestAuthorizationAttributeIfNeeded(request); diff --git a/sql/src/main/java/org/apache/druid/sql/http/StandardQueryState.java b/sql/src/main/java/org/apache/druid/sql/http/StandardQueryState.java new file mode 100644 index 000000000000..67645d4cbe78 --- /dev/null +++ b/sql/src/main/java/org/apache/druid/sql/http/StandardQueryState.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.http; + +/** + * Standard strings returned by {@link QueryInfo#state()}. Engines can add their own if they like. + */ +public class StandardQueryState +{ + public static final String ACCEPTED = "ACCEPTED"; + public static final String RUNNING = "RUNNING"; + public static final String SUCCESS = "SUCCESS"; + public static final String FAILED = "FAILED"; + public static final String CANCELED = "CANCELED"; + + private StandardQueryState() + { + // No instantiation. + } +} diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java index 55e72a7a682a..e308a0331ef8 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java @@ -55,6 +55,7 @@ import org.apache.druid.sql.calcite.planner.CatalogResolver; import org.apache.druid.sql.calcite.planner.DruidOperatorTable; import org.apache.druid.sql.calcite.planner.PlannerConfig; +import org.apache.druid.sql.calcite.run.SqlEngine; import org.apache.druid.sql.calcite.util.CalciteTestBase; import org.apache.druid.sql.calcite.view.ViewManager; import org.easymock.EasyMock; @@ -106,6 +107,7 @@ public class DruidCalciteSchemaModuleTest extends CalciteTestBase @BeforeEach public void setUp() { + EasyMock.expect(plannerConfig.isEnableSysQueriesTable()).andReturn(false).anyTimes(); EasyMock.replay(plannerConfig); target = new DruidCalciteSchemaModule(); injector = Guice.createInjector( @@ -134,6 +136,7 @@ public void setUp() .toInstance(CentralizedDatasourceSchemaConfig.create()); binder.bind(HttpClient.class).toInstance(httpClient); binder.bind(HttpClient.class).annotatedWith(EscalatedClient.class).toInstance(httpClient); + binder.bind(new TypeLiteral>() {}).toInstance(ImmutableSet.of()); }, new LifecycleModule(), target); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index ad000977383b..10cada8563e0 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -102,12 +102,18 @@ import org.apache.druid.server.security.NoopEscalator; import org.apache.druid.server.security.ResourceType; import org.apache.druid.sql.calcite.planner.CatalogResolver; +import org.apache.druid.sql.calcite.planner.PlannerConfig; +import org.apache.druid.sql.calcite.run.SqlEngine; +import org.apache.druid.sql.calcite.schema.SystemSchema.QueriesTable; import org.apache.druid.sql.calcite.schema.SystemSchema.SegmentsTable; import org.apache.druid.sql.calcite.table.RowSignatures; import org.apache.druid.sql.calcite.util.CalciteTestBase; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.calcite.util.TestDataBuilder; import org.apache.druid.sql.calcite.util.TestTimelineServerView; +import org.apache.druid.sql.http.GetQueriesResponse; +import org.apache.druid.sql.http.QueryInfo; +import org.apache.druid.sql.http.SqlEngineRegistry; import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; @@ -280,7 +286,9 @@ public void setUp(@TempDir File tmpDir) throws Exception overlordClient, druidNodeDiscoveryProvider, MAPPER, - httpClient + httpClient, + () -> new SqlEngineRegistry(Collections.emptySet()), + new PlannerConfig() ); } @@ -1587,6 +1595,76 @@ public void testPropertiesTable() } + @Test + public void testQueriesTable() + { + // Create mock SqlEngine that returns test queries + final SqlEngine mockEngine = EasyMock.createMock(SqlEngine.class); + EasyMock.expect(mockEngine.name()).andReturn("native").anyTimes(); + EasyMock.expect(mockEngine.getRunningQueries( + EasyMock.eq(false), + EasyMock.eq(true), + EasyMock.anyObject(), + EasyMock.anyObject() + )).andReturn(new GetQueriesResponse(ImmutableList.of( + createTestQueryInfo("query-1", "native", "RUNNING"), + createTestQueryInfo("query-2", "native", "COMPLETED") + ))).once(); + EasyMock.replay(mockEngine); + + final SqlEngineRegistry registry = new SqlEngineRegistry(ImmutableSet.of(mockEngine)); + final QueriesTable queriesTable = new QueriesTable(() -> registry, MAPPER, authMapper); + + final DataContext dataContext = createDataContext(Users.SUPER); + final List rows = queriesTable.scan(dataContext, Collections.emptyList(), null).toList(); + + Assert.assertEquals(2, rows.size()); + + // Verify first row + Assert.assertEquals("query-1", rows.get(0)[0]); + Assert.assertEquals("native", rows.get(0)[1]); + Assert.assertEquals("RUNNING", rows.get(0)[2]); + Assert.assertNotNull(rows.get(0)[3]); // info should be serialized JSON + + // Verify second row + Assert.assertEquals("query-2", rows.get(1)[0]); + Assert.assertEquals("native", rows.get(1)[1]); + Assert.assertEquals("COMPLETED", rows.get(1)[2]); + Assert.assertNotNull(rows.get(1)[3]); // info should be serialized JSON + + // Verify value types + verifyTypes(rows, SystemSchema.QUERIES_SIGNATURE); + + EasyMock.verify(mockEngine); + } + + /** + * Creates a test QueryInfo implementation for testing purposes. + */ + private QueryInfo createTestQueryInfo(final String executionId, final String engine, final String state) + { + return new QueryInfo() + { + @Override + public String engine() + { + return engine; + } + + @Override + public String state() + { + return state; + } + + @Override + public String executionId() + { + return executionId; + } + }; + } + private String getStatusPropertiesUrl(DiscoveryDruidNode discoveryDruidNode) { return discoveryDruidNode.getDruidNode().getUriToUse().resolve("/status/properties").toString(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 59151d352e2c..069b4ead356d 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -89,6 +89,7 @@ import org.apache.druid.sql.calcite.schema.MetadataSegmentView; import org.apache.druid.sql.calcite.schema.SystemSchema; import org.apache.druid.sql.calcite.util.testoperator.CalciteTestOperatorModule; +import org.apache.druid.sql.http.SqlEngineRegistry; import org.apache.druid.timeline.DataSegment; import org.joda.time.Duration; @@ -98,6 +99,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -419,7 +421,9 @@ private TaskStatusPlus createTaskStatus(String id, String datasource, Long durat overlordClient, provider, getJsonMapper(), - new FakeHttpClient() + new FakeHttpClient(), + () -> new SqlEngineRegistry(Collections.emptySet()), + new PlannerConfig() ); } diff --git a/sql/src/test/java/org/apache/druid/sql/http/GetQueriesResponseTest.java b/sql/src/test/java/org/apache/druid/sql/http/GetQueriesResponseTest.java index cc650ae27253..196cf1eada9a 100644 --- a/sql/src/test/java/org/apache/druid/sql/http/GetQueriesResponseTest.java +++ b/sql/src/test/java/org/apache/druid/sql/http/GetQueriesResponseTest.java @@ -104,6 +104,24 @@ public String getAuthenticator() return authenticator; } + @Override + public String engine() + { + return "test"; + } + + @Override + public String state() + { + return "RUNNING"; + } + + @Override + public String executionId() + { + return "test-execution-id"; + } + @Override public boolean equals(Object o) {