From 55315a24fa4808bf7ecb63a06ab5f1297ecf9e60 Mon Sep 17 00:00:00 2001 From: Vivek Dhiman Date: Fri, 28 Jun 2024 12:42:25 -0700 Subject: [PATCH 1/8] Introduced `includeTrailerHeader` to enable `TrailerHeaders` in response. If enabled, a header `X-Error-Message` will be added to indicate partial results. --- docs/querying/query-context.md | 57 ++--- .../org/apache/druid/query/QueryContexts.java | 2 + .../apache/druid/server/QueryResource.java | 4 +- .../druid/server/QueryResultPusher.java | 30 ++- .../druid/server/QueryResourceTest.java | 203 +++++++++++++++++- .../druid/server/QueryResultPusherTest.java | 3 +- .../apache/druid/sql/http/SqlResource.java | 4 +- 7 files changed, 268 insertions(+), 35 deletions(-) diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md index d5ddea04f27d..51f1df26ae17 100644 --- a/docs/querying/query-context.md +++ b/docs/querying/query-context.md @@ -38,34 +38,35 @@ Note that setting query context will override both the default value and the run Unless otherwise noted, the following parameters apply to all query types, and to both native and SQL queries. See [SQL query context](sql-query-context.md) for other query context parameters that are specific to Druid SQL planning. -|Parameter |Default | Description | -|-------------------|----------------------------------------|----------------------| -|`timeout` | `druid.server.http.defaultQueryTimeout`| Query timeout in millis, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout` (up to the server-side maximum query timeout, `druid.server.http.maxQueryTimeout`). To set the default timeout and maximum timeout, see [Broker configuration](../configuration/index.md#broker) | -|`priority` | The default priority is one of the following: | Query priority. Queries with higher priority get precedence for computational resources.| -|`lane` | `null` | Query lane, used to control usage limits on classes of queries. See [Broker configuration](../configuration/index.md#broker) for more details.| -|`queryId` | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | -|`brokerService` | `null` | Broker service to which this query should be routed. This parameter is honored only by a broker selector strategy of type *manual*. See [Router strategies](../design/router.md#router-strategies) for more details.| -|`useCache` | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Apache Druid uses `druid.broker.cache.useCache` or `druid.historical.cache.useCache` to determine whether or not to read from the query cache | -|`populateCache` | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateCache` or `druid.historical.cache.populateCache` to determine whether or not to save the results of this query to the query cache | -|`useResultLevelCache`| `true` | Flag indicating whether to leverage the result level cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses `druid.broker.cache.useResultLevelCache` to determine whether or not to read from the result-level query cache | -|`populateResultLevelCache` | `true` | Flag indicating whether to save the results of the query to the result level cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateResultLevelCache` to determine whether or not to save the results of this query to the result-level query cache | -|`bySegment` | `false` | Native queries only. Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | -|`finalize` | `N/A` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator returns the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | -|`maxScatterGatherBytes`| `druid.server.http.maxScatterGatherBytes` | Maximum number of bytes gathered from data processes such as Historicals and realtime processes to execute a query. This parameter can be used to further reduce `maxScatterGatherBytes` limit at query time. See [Broker configuration](../configuration/index.md#broker) for more details.| -|`maxQueuedBytes` | `druid.broker.http.maxQueuedBytes` | Maximum number of bytes queued per query before exerting backpressure on the channel to the data server. Similar to `maxScatterGatherBytes`, except unlike that configuration, this one will trigger backpressure rather than query failure. Zero means disabled.| -|`maxSubqueryRows`| `druid.server.http.maxSubqueryRows` | Upper limit on the number of rows a subquery can generate. See [Broker configuration](../configuration/index.md#broker) and [subquery guardrails](../configuration/index.md#Guardrails for materialization of subqueries) for more details.| -|`maxSubqueryBytes`| `druid.server.http.maxSubqueryBytes` | Upper limit on the number of bytes a subquery can generate. See [Broker configuration](../configuration/index.md#broker) and [subquery guardrails](../configuration/index.md#Guardrails for materialization of subqueries) for more details.| -|`serializeDateTimeAsLong`| `false` | If true, DateTime is serialized as long in the result returned by Broker and the data transportation between Broker and compute process| -|`serializeDateTimeAsLongInner`| `false` | If true, DateTime is serialized as long in the data transportation between Broker and compute process| -|`enableParallelMerge`|`true`|Enable parallel result merging on the Broker. Note that `druid.processing.merge.useParallelMergePool` must be enabled for this setting to be set to `true`. See [Broker configuration](../configuration/index.md#broker) for more details.| -|`parallelMergeParallelism`|`druid.processing.merge.pool.parallelism`|Maximum number of parallel threads to use for parallel result merging on the Broker. See [Broker configuration](../configuration/index.md#broker) for more details.| -|`parallelMergeInitialYieldRows`|`druid.processing.merge.task.initialYieldNumRows`|Number of rows to yield per ForkJoinPool merge task for parallel result merging on the Broker, before forking off a new task to continue merging sequences. See [Broker configuration](../configuration/index.md#broker) for more details.| -|`parallelMergeSmallBatchRows`|`druid.processing.merge.task.smallBatchNumRows`|Size of result batches to operate on in ForkJoinPool merge tasks for parallel result merging on the Broker. See [Broker configuration](../configuration/index.md#broker) for more details.| -|`useFilterCNF`|`false`| If true, Druid will attempt to convert the query filter to Conjunctive Normal Form (CNF). During query processing, columns can be pre-filtered by intersecting the bitmap indexes of all values that match the eligible filters, often greatly reducing the raw number of rows which need to be scanned. But this effect only happens for the top level filter, or individual clauses of a top level 'and' filter. As such, filters in CNF potentially have a higher chance to utilize a large amount of bitmap indexes on string columns during pre-filtering. However, this setting should be used with great caution, as it can sometimes have a negative effect on performance, and in some cases, the act of computing CNF of a filter can be expensive. We recommend hand tuning your filters to produce an optimal form if possible, or at least verifying through experimentation that using this parameter actually improves your query performance with no ill-effects.| -|`secondaryPartitionPruning`|`true`|Enable secondary partition pruning on the Broker. The Broker will always prune unnecessary segments from the input scan based on a filter on time intervals, but if the data is further partitioned with hash or range partitioning, this option will enable additional pruning based on a filter on secondary partition dimensions.| -|`debug`| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following addition logs will be produced:
- Log the stack trace of the exception (if any) produced by the query | -|`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. | -|`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects.| +|Parameter |Default | Description | +|-------------------|----------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +|`timeout` | `druid.server.http.defaultQueryTimeout`| Query timeout in millis, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout` (up to the server-side maximum query timeout, `druid.server.http.maxQueryTimeout`). To set the default timeout and maximum timeout, see [Broker configuration](../configuration/index.md#broker) | +|`priority` | The default priority is one of the following: | Query priority. Queries with higher priority get precedence for computational resources. | +|`lane` | `null` | Query lane, used to control usage limits on classes of queries. See [Broker configuration](../configuration/index.md#broker) for more details. | +|`queryId` | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | +|`brokerService` | `null` | Broker service to which this query should be routed. This parameter is honored only by a broker selector strategy of type *manual*. See [Router strategies](../design/router.md#router-strategies) for more details. | +|`useCache` | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Apache Druid uses `druid.broker.cache.useCache` or `druid.historical.cache.useCache` to determine whether or not to read from the query cache | +|`populateCache` | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateCache` or `druid.historical.cache.populateCache` to determine whether or not to save the results of this query to the query cache | +|`useResultLevelCache`| `true` | Flag indicating whether to leverage the result level cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses `druid.broker.cache.useResultLevelCache` to determine whether or not to read from the result-level query cache | +|`populateResultLevelCache` | `true` | Flag indicating whether to save the results of the query to the result level cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateResultLevelCache` to determine whether or not to save the results of this query to the result-level query cache | +|`bySegment` | `false` | Native queries only. Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | +|`finalize` | `N/A` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator returns the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | +|`maxScatterGatherBytes`| `druid.server.http.maxScatterGatherBytes` | Maximum number of bytes gathered from data processes such as Historicals and realtime processes to execute a query. This parameter can be used to further reduce `maxScatterGatherBytes` limit at query time. See [Broker configuration](../configuration/index.md#broker) for more details. | +|`maxQueuedBytes` | `druid.broker.http.maxQueuedBytes` | Maximum number of bytes queued per query before exerting backpressure on the channel to the data server. Similar to `maxScatterGatherBytes`, except unlike that configuration, this one will trigger backpressure rather than query failure. Zero means disabled. | +|`maxSubqueryRows`| `druid.server.http.maxSubqueryRows` | Upper limit on the number of rows a subquery can generate. See [Broker configuration](../configuration/index.md#broker) and [subquery guardrails](../configuration/index.md#Guardrails for materialization of subqueries) for more details. | +|`maxSubqueryBytes`| `druid.server.http.maxSubqueryBytes` | Upper limit on the number of bytes a subquery can generate. See [Broker configuration](../configuration/index.md#broker) and [subquery guardrails](../configuration/index.md#Guardrails for materialization of subqueries) for more details. | +|`serializeDateTimeAsLong`| `false` | If true, DateTime is serialized as long in the result returned by Broker and the data transportation between Broker and compute process | +|`serializeDateTimeAsLongInner`| `false` | If true, DateTime is serialized as long in the data transportation between Broker and compute process | +|`enableParallelMerge`|`true`| Enable parallel result merging on the Broker. Note that `druid.processing.merge.useParallelMergePool` must be enabled for this setting to be set to `true`. See [Broker configuration](../configuration/index.md#broker) for more details. | +|`parallelMergeParallelism`|`druid.processing.merge.pool.parallelism`| Maximum number of parallel threads to use for parallel result merging on the Broker. See [Broker configuration](../configuration/index.md#broker) for more details. | +|`parallelMergeInitialYieldRows`|`druid.processing.merge.task.initialYieldNumRows`| Number of rows to yield per ForkJoinPool merge task for parallel result merging on the Broker, before forking off a new task to continue merging sequences. See [Broker configuration](../configuration/index.md#broker) for more details. | +|`parallelMergeSmallBatchRows`|`druid.processing.merge.task.smallBatchNumRows`| Size of result batches to operate on in ForkJoinPool merge tasks for parallel result merging on the Broker. See [Broker configuration](../configuration/index.md#broker) for more details. | +|`useFilterCNF`|`false`| If true, Druid will attempt to convert the query filter to Conjunctive Normal Form (CNF). During query processing, columns can be pre-filtered by intersecting the bitmap indexes of all values that match the eligible filters, often greatly reducing the raw number of rows which need to be scanned. But this effect only happens for the top level filter, or individual clauses of a top level 'and' filter. As such, filters in CNF potentially have a higher chance to utilize a large amount of bitmap indexes on string columns during pre-filtering. However, this setting should be used with great caution, as it can sometimes have a negative effect on performance, and in some cases, the act of computing CNF of a filter can be expensive. We recommend hand tuning your filters to produce an optimal form if possible, or at least verifying through experimentation that using this parameter actually improves your query performance with no ill-effects. | +|`secondaryPartitionPruning`|`true`| Enable secondary partition pruning on the Broker. The Broker will always prune unnecessary segments from the input scan based on a filter on time intervals, but if the data is further partitioned with hash or range partitioning, this option will enable additional pruning based on a filter on secondary partition dimensions. | +|`debug`| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following addition logs will be produced:
- Log the stack trace of the exception (if any) produced by the query | +|`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. | +|`sqlPlannerBloat`|`1000`| Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects. | +|`includeTrailerHeader`|`false`| Whether the response can include trailer headers, which are currently used to indicate any error messages in the case of a chunked response, if any.| ## Parameters by query type diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index 61520a04bc28..52b10edd6f30 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -93,6 +93,8 @@ public class QueryContexts // query's runtime public static final String QUERY_RESOURCE_ID = "queryResourceId"; + public static final String INCLUDE_TRAILER_HEADER = "includeTrailerHeader"; + // SQL query context keys public static final String CTX_SQL_QUERY_ID = BaseQuery.SQL_QUERY_ID; public static final String CTX_SQL_STRINGIFY_ARRAYS = "sqlStringifyArrays"; diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index 2db205ca0bed..ebd78e238865 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -92,6 +92,7 @@ public class QueryResource implements QueryCountStatsProvider public static final String HEADER_RESPONSE_CONTEXT = "X-Druid-Response-Context"; public static final String HEADER_IF_NONE_MATCH = "If-None-Match"; public static final String QUERY_ID_RESPONSE_HEADER = "X-Druid-Query-Id"; + public static final String ERROR_MESSAGE_TRAILER_HEADER = "X-Error-Message"; public static final String HEADER_ETAG = "ETag"; protected final QueryLifecycleFactory queryLifecycleFactory; @@ -492,7 +493,8 @@ public QueryResourceQueryResultPusher( QueryResource.this.counter, queryLifecycle.getQueryId(), MediaType.valueOf(io.getResponseWriter().getResponseType()), - ImmutableMap.of() + ImmutableMap.of(), + queryLifecycle.getQuery().context().getBoolean(QueryContexts.INCLUDE_TRAILER_HEADER, false) ); this.req = req; this.queryLifecycle = queryLifecycle; diff --git a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java index 074beb545b43..aaea085fb659 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java +++ b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java @@ -38,6 +38,8 @@ import org.apache.druid.query.context.ResponseContext; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.ForbiddenException; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpHeader; import javax.annotation.Nullable; import javax.servlet.AsyncContext; @@ -63,6 +65,8 @@ public abstract class QueryResultPusher private final QueryResource.QueryMetricCounter counter; private final MediaType contentType; private final Map extraHeaders; + private final boolean includeTrailerHeader; + private final HttpFields trailerFields; private StreamingHttpResponseAccumulator accumulator; private AsyncContext asyncContext; @@ -76,7 +80,8 @@ public QueryResultPusher( QueryResource.QueryMetricCounter counter, String queryId, MediaType contentType, - Map extraHeaders + Map extraHeaders, + boolean includeTrailerHeader ) { this.request = request; @@ -87,6 +92,8 @@ public QueryResultPusher( this.counter = counter; this.contentType = contentType; this.extraHeaders = extraHeaders; + this.includeTrailerHeader = includeTrailerHeader; + this.trailerFields = new HttpFields(); } /** @@ -121,6 +128,11 @@ public Response push() final Response.ResponseBuilder startResponse = resultsWriter.start(); if (startResponse != null) { startResponse.header(QueryResource.QUERY_ID_RESPONSE_HEADER, queryId); + + if (includeTrailerHeader) { + startResponse.header(HttpHeader.TRAILER.toString(), QueryResource.ERROR_MESSAGE_TRAILER_HEADER); + } + for (Map.Entry entry : extraHeaders.entrySet()) { startResponse.header(entry.getKey(), entry.getValue()); } @@ -143,6 +155,13 @@ public Response push() response.setHeader(entry.getKey(), entry.getValue()); } + if (includeTrailerHeader && response instanceof org.eclipse.jetty.server.Response) { + org.eclipse.jetty.server.Response jettyResponse = (org.eclipse.jetty.server.Response) response; + + jettyResponse.setHeader(HttpHeader.TRAILER.toString(), QueryResource.ERROR_MESSAGE_TRAILER_HEADER); + jettyResponse.setTrailers(() -> trailerFields); + } + accumulator = new StreamingHttpResponseAccumulator(queryResponse.getResponseContext(), resultsWriter); results.accumulate(null, accumulator); @@ -223,6 +242,10 @@ private Response handleDruidException(ResultsWriter resultsWriter, DruidExceptio // also throwing the exception body into the response to make it easier for the client to choke if it manages // to parse a meaningful object out, but that's potentially an API change so we leave that as an exercise for // the future. + + if (includeTrailerHeader) { + trailerFields.put(QueryResource.ERROR_MESSAGE_TRAILER_HEADER, e.getMessage()); + } return null; } } @@ -418,6 +441,11 @@ public void initialize() response.setHeader(QueryResource.HEADER_RESPONSE_CONTEXT, serializationResult.getResult()); response.setContentType(contentType.toString()); + if (includeTrailerHeader && response instanceof org.eclipse.jetty.server.Response) { + org.eclipse.jetty.server.Response jettyResponse = (org.eclipse.jetty.server.Response) response; + jettyResponse.setTrailers(() -> trailerFields); + } + try { out = new CountingOutputStream(response.getOutputStream()); writer = resultsWriter.makeWriter(out); diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java index 43bce13e2c89..071367539217 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java @@ -38,8 +38,13 @@ import org.apache.druid.guice.annotations.Smile; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.guava.Accumulator; import org.apache.druid.java.util.common.guava.LazySequence; +import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.java.util.common.guava.YieldingAccumulator; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.BadJsonQueryException; @@ -48,6 +53,7 @@ import org.apache.druid.query.MapQueryToolChestWarehouse; import org.apache.druid.query.Query; import org.apache.druid.query.QueryCapacityExceededException; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryException; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryRunner; @@ -64,6 +70,7 @@ import org.apache.druid.server.log.TestRequestLogger; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.mocks.ExceptionalInputStream; +import org.apache.druid.server.mocks.MockAsyncContext; import org.apache.druid.server.mocks.MockHttpServletRequest; import org.apache.druid.server.mocks.MockHttpServletResponse; import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy; @@ -80,6 +87,11 @@ import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.Resource; import org.apache.http.HttpStatus; +import org.easymock.EasyMock; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.server.HttpChannel; +import org.eclipse.jetty.server.HttpOutput; import org.hamcrest.MatcherAssert; import org.joda.time.Interval; import org.junit.Assert; @@ -254,9 +266,10 @@ public void testGoodQuery() throws IOException @Test public void testGoodQueryWithQueryConfigOverrideDefault() throws IOException { - String overrideConfigKey = "priority"; - String overrideConfigValue = "678"; - DefaultQueryConfig overrideConfig = new DefaultQueryConfig(ImmutableMap.of(overrideConfigKey, overrideConfigValue)); + final String overrideConfigKey = "priority"; + final String overrideConfigValue = "678"; + DefaultQueryConfig overrideConfig = new DefaultQueryConfig(ImmutableMap.of(overrideConfigKey, overrideConfigValue, + QueryContexts.INCLUDE_TRAILER_HEADER, true)); queryResource = new QueryResource( new QueryLifecycleFactory( WAREHOUSE, @@ -302,6 +315,10 @@ public void testGoodQueryWithQueryConfigOverrideDefault() throws IOException overrideConfigValue, testRequestLogger.getNativeQuerylogs().get(0).getQuery().getContext().get(overrideConfigKey) ); + Assert.assertEquals( + overrideConfigValue, + testRequestLogger.getNativeQuerylogs().get(0).getQuery().getContext().get(QueryContexts.INCLUDE_TRAILER_HEADER) + ); } @Test @@ -375,8 +392,163 @@ public QueryRunner getQueryRunnerForSegments( overrideConfigValue, testRequestLogger.getNativeQuerylogs().get(0).getQuery().getContext().get(overrideConfigKey) ); + Assert.assertFalse( + testRequestLogger.getNativeQuerylogs().get(0).getQuery().getContext().containsKey(QueryContexts.INCLUDE_TRAILER_HEADER)); } + @Test + public void testResponseWithIncludeTrailerHeader() throws IOException + { + DefaultQueryConfig overrideConfig = new DefaultQueryConfig(ImmutableMap.of(QueryContexts.INCLUDE_TRAILER_HEADER, true)); + + queryResource = new QueryResource( + new QueryLifecycleFactory( + WAREHOUSE, + new QuerySegmentWalker() + { + @Override + public QueryRunner getQueryRunnerForIntervals( + Query query, + Iterable intervals + ) + { + return (queryPlus, responseContext) -> new Sequence() { + @Override + public OutType accumulate(OutType initValue, Accumulator accumulator) + { + if (accumulator instanceof QueryResultPusher.StreamingHttpResponseAccumulator) { + try { + ((QueryResultPusher.StreamingHttpResponseAccumulator) accumulator).flush(); // initialized + } + catch (IOException ignore) { + } + } + + throw new QueryTimeoutException(); + } + + @Override + public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) + { + return Yielders.done(initValue, null); + } + }; + } + + @Override + public QueryRunner getQueryRunnerForSegments( + Query query, + Iterable specs + ) + { + throw new UnsupportedOperationException(); + } + }, + new DefaultGenericQueryMetricsFactory(), + new NoopServiceEmitter(), + testRequestLogger, + new AuthConfig(), + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + Suppliers.ofInstance(overrideConfig) + ), + jsonMapper, + smileMapper, + queryScheduler, + new AuthConfig(), + null, + ResponseContextConfig.newConfig(true), + DRUID_NODE + ); + + expectPermissiveHappyPathAuth(); + + org.eclipse.jetty.server.Response response = this.jettyResponseforRequest(testServletRequest); + Assert.assertNull(queryResource.doPost(new ByteArrayInputStream( + SIMPLE_TIMESERIES_QUERY.getBytes(StandardCharsets.UTF_8)), + null /*pretty*/, + testServletRequest)); + Assert.assertTrue(response.containsHeader(HttpHeader.TRAILER.toString())); + Assert.assertEquals(response.getHeader(HttpHeader.TRAILER.toString()), QueryResource.ERROR_MESSAGE_TRAILER_HEADER); + + final HttpFields fields = response.getTrailers().get(); + Assert.assertTrue(fields.containsKey(QueryResource.ERROR_MESSAGE_TRAILER_HEADER)); + Assert.assertEquals(fields.get(QueryResource.ERROR_MESSAGE_TRAILER_HEADER), + "Query did not complete within configured timeout period. You can increase query timeout or tune the performance of query."); + } + + @Test + public void testResponseWithoutIncludeTrailerHeader() throws IOException + { + queryResource = new QueryResource( + new QueryLifecycleFactory( + WAREHOUSE, + new QuerySegmentWalker() + { + @Override + public QueryRunner getQueryRunnerForIntervals( + Query query, + Iterable intervals + ) + { + return (queryPlus, responseContext) -> new Sequence() + { + @Override + public OutType accumulate(OutType initValue, Accumulator accumulator) + { + if (accumulator instanceof QueryResultPusher.StreamingHttpResponseAccumulator) { + try { + ((QueryResultPusher.StreamingHttpResponseAccumulator) accumulator).flush(); // initialized + } + catch (IOException ignore) { + } + } + + throw new QueryTimeoutException(); + } + + @Override + public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) + { + return Yielders.done(initValue, null); + } + }; + } + + @Override + public QueryRunner getQueryRunnerForSegments( + Query query, + Iterable specs + ) + { + throw new UnsupportedOperationException(); + } + }, + new DefaultGenericQueryMetricsFactory(), + new NoopServiceEmitter(), + testRequestLogger, + new AuthConfig(), + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())) + ), + jsonMapper, + smileMapper, + queryScheduler, + new AuthConfig(), + null, + ResponseContextConfig.newConfig(true), + DRUID_NODE + ); + + expectPermissiveHappyPathAuth(); + + org.eclipse.jetty.server.Response response = this.jettyResponseforRequest(testServletRequest); + Assert.assertNull(queryResource.doPost(new ByteArrayInputStream( + SIMPLE_TIMESERIES_QUERY.getBytes(StandardCharsets.UTF_8)), + null /*pretty*/, + testServletRequest)); + Assert.assertFalse(response.containsHeader(HttpHeader.TRAILER.toString())); + Assert.assertNull(response.getTrailers()); + } @Test public void testQueryThrowsRuntimeExceptionFromLifecycleExecute() throws IOException @@ -1413,4 +1585,29 @@ private Response expectSynchronousRequestFlow( { return queryResource.doPost(new ByteArrayInputStream(bytes), null, req); } + + public org.eclipse.jetty.server.Response jettyResponseforRequest(MockHttpServletRequest req) throws IOException + { + HttpChannel channelMock = EasyMock.mock(HttpChannel.class); + HttpOutput outputMock = EasyMock.mock(HttpOutput.class); + org.eclipse.jetty.server.Response response = new org.eclipse.jetty.server.Response(channelMock, outputMock); + + EasyMock.expect(channelMock.isSendError()).andReturn(false); + EasyMock.expect(channelMock.isCommitted()).andReturn(true); + + outputMock.close(); + EasyMock.expectLastCall().andVoid(); + + outputMock.write(EasyMock.anyObject(byte[].class), EasyMock.anyInt(), EasyMock.anyInt()); + EasyMock.expectLastCall().andVoid(); + + EasyMock.replay(outputMock, channelMock); + + req.newAsyncContext(() -> { + final MockAsyncContext retVal = new MockAsyncContext(); + retVal.response = response; + return retVal; + }); + return response; + } } diff --git a/server/src/test/java/org/apache/druid/server/QueryResultPusherTest.java b/server/src/test/java/org/apache/druid/server/QueryResultPusherTest.java index 5b123b7cc2ac..e8f8b8f8828c 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResultPusherTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResultPusherTest.java @@ -114,7 +114,8 @@ public QueryResponse getQueryResponse() counter, queryId, contentType, - extraHeaders) + extraHeaders, + false) { @Override diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java index 4adea5d8d84e..feaaf252469e 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java +++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java @@ -27,6 +27,7 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.QueryContexts; import org.apache.druid.server.DruidNode; import org.apache.druid.server.QueryResource; import org.apache.druid.server.QueryResponse; @@ -226,7 +227,8 @@ public SqlResourceQueryResultPusher( SqlResource.QUERY_METRIC_COUNTER, sqlQueryId, MediaType.APPLICATION_JSON_TYPE, - headers + headers, + sqlQuery.queryContext().getBoolean(QueryContexts.INCLUDE_TRAILER_HEADER, false) ); this.sqlQueryId = sqlQueryId; this.stmt = stmt; From 4dbaf62d92881fdc35c3d7f87feb0c7987fd6745 Mon Sep 17 00:00:00 2001 From: Vivek Dhiman Date: Fri, 28 Jun 2024 22:44:07 -0700 Subject: [PATCH 2/8] Fixed UT --- .../test/java/org/apache/druid/server/QueryResourceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java index 071367539217..f7f7ffe26724 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java @@ -316,7 +316,7 @@ public void testGoodQueryWithQueryConfigOverrideDefault() throws IOException testRequestLogger.getNativeQuerylogs().get(0).getQuery().getContext().get(overrideConfigKey) ); Assert.assertEquals( - overrideConfigValue, + true, testRequestLogger.getNativeQuerylogs().get(0).getQuery().getContext().get(QueryContexts.INCLUDE_TRAILER_HEADER) ); } From befcf618e905445a5a8ef126f7a0881fa37bc700 Mon Sep 17 00:00:00 2001 From: Vivek Dhiman Date: Sat, 3 Aug 2024 09:23:37 -0700 Subject: [PATCH 3/8] Reverted formatting changes in readme file. --- docs/querying/query-context.md | 58 +++++++++++++++++----------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md index 51f1df26ae17..2aba725dc7c0 100644 --- a/docs/querying/query-context.md +++ b/docs/querying/query-context.md @@ -38,35 +38,35 @@ Note that setting query context will override both the default value and the run Unless otherwise noted, the following parameters apply to all query types, and to both native and SQL queries. See [SQL query context](sql-query-context.md) for other query context parameters that are specific to Druid SQL planning. -|Parameter |Default | Description | -|-------------------|----------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -|`timeout` | `druid.server.http.defaultQueryTimeout`| Query timeout in millis, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout` (up to the server-side maximum query timeout, `druid.server.http.maxQueryTimeout`). To set the default timeout and maximum timeout, see [Broker configuration](../configuration/index.md#broker) | -|`priority` | The default priority is one of the following:
  • Value of `priority` in the query context, if set
  • The value of the runtime property `druid.query.default.context.priority`, if set and not null
  • `0` if the priority is not set in the query context or runtime properties
| Query priority. Queries with higher priority get precedence for computational resources. | -|`lane` | `null` | Query lane, used to control usage limits on classes of queries. See [Broker configuration](../configuration/index.md#broker) for more details. | -|`queryId` | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | -|`brokerService` | `null` | Broker service to which this query should be routed. This parameter is honored only by a broker selector strategy of type *manual*. See [Router strategies](../design/router.md#router-strategies) for more details. | -|`useCache` | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Apache Druid uses `druid.broker.cache.useCache` or `druid.historical.cache.useCache` to determine whether or not to read from the query cache | -|`populateCache` | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateCache` or `druid.historical.cache.populateCache` to determine whether or not to save the results of this query to the query cache | -|`useResultLevelCache`| `true` | Flag indicating whether to leverage the result level cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses `druid.broker.cache.useResultLevelCache` to determine whether or not to read from the result-level query cache | -|`populateResultLevelCache` | `true` | Flag indicating whether to save the results of the query to the result level cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateResultLevelCache` to determine whether or not to save the results of this query to the result-level query cache | -|`bySegment` | `false` | Native queries only. Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | -|`finalize` | `N/A` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator returns the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | -|`maxScatterGatherBytes`| `druid.server.http.maxScatterGatherBytes` | Maximum number of bytes gathered from data processes such as Historicals and realtime processes to execute a query. This parameter can be used to further reduce `maxScatterGatherBytes` limit at query time. See [Broker configuration](../configuration/index.md#broker) for more details. | -|`maxQueuedBytes` | `druid.broker.http.maxQueuedBytes` | Maximum number of bytes queued per query before exerting backpressure on the channel to the data server. Similar to `maxScatterGatherBytes`, except unlike that configuration, this one will trigger backpressure rather than query failure. Zero means disabled. | -|`maxSubqueryRows`| `druid.server.http.maxSubqueryRows` | Upper limit on the number of rows a subquery can generate. See [Broker configuration](../configuration/index.md#broker) and [subquery guardrails](../configuration/index.md#Guardrails for materialization of subqueries) for more details. | -|`maxSubqueryBytes`| `druid.server.http.maxSubqueryBytes` | Upper limit on the number of bytes a subquery can generate. See [Broker configuration](../configuration/index.md#broker) and [subquery guardrails](../configuration/index.md#Guardrails for materialization of subqueries) for more details. | -|`serializeDateTimeAsLong`| `false` | If true, DateTime is serialized as long in the result returned by Broker and the data transportation between Broker and compute process | -|`serializeDateTimeAsLongInner`| `false` | If true, DateTime is serialized as long in the data transportation between Broker and compute process | -|`enableParallelMerge`|`true`| Enable parallel result merging on the Broker. Note that `druid.processing.merge.useParallelMergePool` must be enabled for this setting to be set to `true`. See [Broker configuration](../configuration/index.md#broker) for more details. | -|`parallelMergeParallelism`|`druid.processing.merge.pool.parallelism`| Maximum number of parallel threads to use for parallel result merging on the Broker. See [Broker configuration](../configuration/index.md#broker) for more details. | -|`parallelMergeInitialYieldRows`|`druid.processing.merge.task.initialYieldNumRows`| Number of rows to yield per ForkJoinPool merge task for parallel result merging on the Broker, before forking off a new task to continue merging sequences. See [Broker configuration](../configuration/index.md#broker) for more details. | -|`parallelMergeSmallBatchRows`|`druid.processing.merge.task.smallBatchNumRows`| Size of result batches to operate on in ForkJoinPool merge tasks for parallel result merging on the Broker. See [Broker configuration](../configuration/index.md#broker) for more details. | -|`useFilterCNF`|`false`| If true, Druid will attempt to convert the query filter to Conjunctive Normal Form (CNF). During query processing, columns can be pre-filtered by intersecting the bitmap indexes of all values that match the eligible filters, often greatly reducing the raw number of rows which need to be scanned. But this effect only happens for the top level filter, or individual clauses of a top level 'and' filter. As such, filters in CNF potentially have a higher chance to utilize a large amount of bitmap indexes on string columns during pre-filtering. However, this setting should be used with great caution, as it can sometimes have a negative effect on performance, and in some cases, the act of computing CNF of a filter can be expensive. We recommend hand tuning your filters to produce an optimal form if possible, or at least verifying through experimentation that using this parameter actually improves your query performance with no ill-effects. | -|`secondaryPartitionPruning`|`true`| Enable secondary partition pruning on the Broker. The Broker will always prune unnecessary segments from the input scan based on a filter on time intervals, but if the data is further partitioned with hash or range partitioning, this option will enable additional pruning based on a filter on secondary partition dimensions. | -|`debug`| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following addition logs will be produced:
- Log the stack trace of the exception (if any) produced by the query | -|`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. | -|`sqlPlannerBloat`|`1000`| Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects. | -|`includeTrailerHeader`|`false`| Whether the response can include trailer headers, which are currently used to indicate any error messages in the case of a chunked response, if any.| +|Parameter |Default | Description | +|-------------------|----------------------------------------|----------------------| +|`timeout` | `druid.server.http.defaultQueryTimeout`| Query timeout in millis, beyond which unfinished queries will be cancelled. 0 timeout means `no timeout` (up to the server-side maximum query timeout, `druid.server.http.maxQueryTimeout`). To set the default timeout and maximum timeout, see [Broker configuration](../configuration/index.md#broker) | +|`priority` | The default priority is one of the following:
  • Value of `priority` in the query context, if set
  • The value of the runtime property `druid.query.default.context.priority`, if set and not null
  • `0` if the priority is not set in the query context or runtime properties
| Query priority. Queries with higher priority get precedence for computational resources.| +|`lane` | `null` | Query lane, used to control usage limits on classes of queries. See [Broker configuration](../configuration/index.md#broker) for more details.| +|`queryId` | auto-generated | Unique identifier given to this query. If a query ID is set or known, this can be used to cancel the query | +|`brokerService` | `null` | Broker service to which this query should be routed. This parameter is honored only by a broker selector strategy of type *manual*. See [Router strategies](../design/router.md#router-strategies) for more details.| +|`useCache` | `true` | Flag indicating whether to leverage the query cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Apache Druid uses `druid.broker.cache.useCache` or `druid.historical.cache.useCache` to determine whether or not to read from the query cache | +|`populateCache` | `true` | Flag indicating whether to save the results of the query to the query cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateCache` or `druid.historical.cache.populateCache` to determine whether or not to save the results of this query to the query cache | +|`useResultLevelCache`| `true` | Flag indicating whether to leverage the result level cache for this query. When set to false, it disables reading from the query cache for this query. When set to true, Druid uses `druid.broker.cache.useResultLevelCache` to determine whether or not to read from the result-level query cache | +|`populateResultLevelCache` | `true` | Flag indicating whether to save the results of the query to the result level cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateResultLevelCache` to determine whether or not to save the results of this query to the result-level query cache | +|`bySegment` | `false` | Native queries only. Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | +|`finalize` | `N/A` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator returns the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | +|`maxScatterGatherBytes`| `druid.server.http.maxScatterGatherBytes` | Maximum number of bytes gathered from data processes such as Historicals and realtime processes to execute a query. This parameter can be used to further reduce `maxScatterGatherBytes` limit at query time. See [Broker configuration](../configuration/index.md#broker) for more details.| +|`maxQueuedBytes` | `druid.broker.http.maxQueuedBytes` | Maximum number of bytes queued per query before exerting backpressure on the channel to the data server. Similar to `maxScatterGatherBytes`, except unlike that configuration, this one will trigger backpressure rather than query failure. Zero means disabled.| +|`maxSubqueryRows`| `druid.server.http.maxSubqueryRows` | Upper limit on the number of rows a subquery can generate. See [Broker configuration](../configuration/index.md#broker) and [subquery guardrails](../configuration/index.md#Guardrails for materialization of subqueries) for more details.| +|`maxSubqueryBytes`| `druid.server.http.maxSubqueryBytes` | Upper limit on the number of bytes a subquery can generate. See [Broker configuration](../configuration/index.md#broker) and [subquery guardrails](../configuration/index.md#Guardrails for materialization of subqueries) for more details.| +|`serializeDateTimeAsLong`| `false` | If true, DateTime is serialized as long in the result returned by Broker and the data transportation between Broker and compute process| +|`serializeDateTimeAsLongInner`| `false` | If true, DateTime is serialized as long in the data transportation between Broker and compute process| +|`enableParallelMerge`|`true`|Enable parallel result merging on the Broker. Note that `druid.processing.merge.useParallelMergePool` must be enabled for this setting to be set to `true`. See [Broker configuration](../configuration/index.md#broker) for more details.| +|`parallelMergeParallelism`|`druid.processing.merge.pool.parallelism`|Maximum number of parallel threads to use for parallel result merging on the Broker. See [Broker configuration](../configuration/index.md#broker) for more details.| +|`parallelMergeInitialYieldRows`|`druid.processing.merge.task.initialYieldNumRows`|Number of rows to yield per ForkJoinPool merge task for parallel result merging on the Broker, before forking off a new task to continue merging sequences. See [Broker configuration](../configuration/index.md#broker) for more details.| +|`parallelMergeSmallBatchRows`|`druid.processing.merge.task.smallBatchNumRows`|Size of result batches to operate on in ForkJoinPool merge tasks for parallel result merging on the Broker. See [Broker configuration](../configuration/index.md#broker) for more details.| +|`useFilterCNF`|`false`| If true, Druid will attempt to convert the query filter to Conjunctive Normal Form (CNF). During query processing, columns can be pre-filtered by intersecting the bitmap indexes of all values that match the eligible filters, often greatly reducing the raw number of rows which need to be scanned. But this effect only happens for the top level filter, or individual clauses of a top level 'and' filter. As such, filters in CNF potentially have a higher chance to utilize a large amount of bitmap indexes on string columns during pre-filtering. However, this setting should be used with great caution, as it can sometimes have a negative effect on performance, and in some cases, the act of computing CNF of a filter can be expensive. We recommend hand tuning your filters to produce an optimal form if possible, or at least verifying through experimentation that using this parameter actually improves your query performance with no ill-effects.| +|`secondaryPartitionPruning`|`true`|Enable secondary partition pruning on the Broker. The Broker will always prune unnecessary segments from the input scan based on a filter on time intervals, but if the data is further partitioned with hash or range partitioning, this option will enable additional pruning based on a filter on secondary partition dimensions.| +|`debug`| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following addition logs will be produced:
- Log the stack trace of the exception (if any) produced by the query | +|`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. | +|`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects.| +|`includeTrailerHeader`|`false`|Whether the response can include trailer headers, which are currently used to indicate any error messages in the case of a chunked response, if any.| ## Parameters by query type From 1562a9ea828e6a631c08fb617f57436a0a5350d3 Mon Sep 17 00:00:00 2001 From: Vivek Dhiman Date: Thu, 22 Aug 2024 23:03:52 -0700 Subject: [PATCH 4/8] Addressed review comments. Updated docs and unit tests. --- docs/querying/query-context.md | 1 - .../org/apache/druid/query/QueryContexts.java | 2 - .../apache/druid/server/QueryResource.java | 4 +- .../druid/server/QueryResultPusher.java | 29 ++-- .../druid/server/QueryResourceTest.java | 130 +++++++++--------- .../druid/server/QueryResultPusherTest.java | 3 +- .../apache/druid/sql/http/SqlResource.java | 4 +- 7 files changed, 81 insertions(+), 92 deletions(-) diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md index 2aba725dc7c0..d5ddea04f27d 100644 --- a/docs/querying/query-context.md +++ b/docs/querying/query-context.md @@ -66,7 +66,6 @@ See [SQL query context](sql-query-context.md) for other query context parameters |`debug`| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following addition logs will be produced:
- Log the stack trace of the exception (if any) produced by the query | |`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. | |`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects.| -|`includeTrailerHeader`|`false`|Whether the response can include trailer headers, which are currently used to indicate any error messages in the case of a chunked response, if any.| ## Parameters by query type diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index 52b10edd6f30..61520a04bc28 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -93,8 +93,6 @@ public class QueryContexts // query's runtime public static final String QUERY_RESOURCE_ID = "queryResourceId"; - public static final String INCLUDE_TRAILER_HEADER = "includeTrailerHeader"; - // SQL query context keys public static final String CTX_SQL_QUERY_ID = BaseQuery.SQL_QUERY_ID; public static final String CTX_SQL_STRINGIFY_ARRAYS = "sqlStringifyArrays"; diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index ebd78e238865..74348c4f7ca4 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -93,6 +93,7 @@ public class QueryResource implements QueryCountStatsProvider public static final String HEADER_IF_NONE_MATCH = "If-None-Match"; public static final String QUERY_ID_RESPONSE_HEADER = "X-Druid-Query-Id"; public static final String ERROR_MESSAGE_TRAILER_HEADER = "X-Error-Message"; + public static final String RESPONSE_COMPLETE_TRAILER_HEADER = "X-Druid-Response-Complete"; public static final String HEADER_ETAG = "ETag"; protected final QueryLifecycleFactory queryLifecycleFactory; @@ -493,8 +494,7 @@ public QueryResourceQueryResultPusher( QueryResource.this.counter, queryLifecycle.getQueryId(), MediaType.valueOf(io.getResponseWriter().getResponseType()), - ImmutableMap.of(), - queryLifecycle.getQuery().context().getBoolean(QueryContexts.INCLUDE_TRAILER_HEADER, false) + ImmutableMap.of() ); this.req = req; this.queryLifecycle = queryLifecycle; diff --git a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java index aaea085fb659..488385a4c522 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java +++ b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java @@ -56,6 +56,7 @@ public abstract class QueryResultPusher { private static final Logger log = new Logger(QueryResultPusher.class); + protected static final String RESULT_TRAILER_HEADERS = QueryResource.ERROR_MESSAGE_TRAILER_HEADER + "," + QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER; private final HttpServletRequest request; private final String queryId; @@ -65,7 +66,6 @@ public abstract class QueryResultPusher private final QueryResource.QueryMetricCounter counter; private final MediaType contentType; private final Map extraHeaders; - private final boolean includeTrailerHeader; private final HttpFields trailerFields; private StreamingHttpResponseAccumulator accumulator; @@ -80,8 +80,7 @@ public QueryResultPusher( QueryResource.QueryMetricCounter counter, String queryId, MediaType contentType, - Map extraHeaders, - boolean includeTrailerHeader + Map extraHeaders ) { this.request = request; @@ -92,7 +91,6 @@ public QueryResultPusher( this.counter = counter; this.contentType = contentType; this.extraHeaders = extraHeaders; - this.includeTrailerHeader = includeTrailerHeader; this.trailerFields = new HttpFields(); } @@ -127,11 +125,8 @@ public Response push() final Response.ResponseBuilder startResponse = resultsWriter.start(); if (startResponse != null) { - startResponse.header(QueryResource.QUERY_ID_RESPONSE_HEADER, queryId); - - if (includeTrailerHeader) { - startResponse.header(HttpHeader.TRAILER.toString(), QueryResource.ERROR_MESSAGE_TRAILER_HEADER); - } + startResponse.header(QueryResource.QUERY_ID_RESPONSE_HEADER, queryId) + .header(HttpHeader.TRAILER.toString(), RESULT_TRAILER_HEADERS); for (Map.Entry entry : extraHeaders.entrySet()) { startResponse.header(entry.getKey(), entry.getValue()); @@ -155,11 +150,15 @@ public Response push() response.setHeader(entry.getKey(), entry.getValue()); } - if (includeTrailerHeader && response instanceof org.eclipse.jetty.server.Response) { + if (response instanceof org.eclipse.jetty.server.Response) { org.eclipse.jetty.server.Response jettyResponse = (org.eclipse.jetty.server.Response) response; - jettyResponse.setHeader(HttpHeader.TRAILER.toString(), QueryResource.ERROR_MESSAGE_TRAILER_HEADER); + jettyResponse.setHeader(HttpHeader.TRAILER.toString(), RESULT_TRAILER_HEADERS); jettyResponse.setTrailers(() -> trailerFields); + + // Start with complete status + + trailerFields.put(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER, "true"); } accumulator = new StreamingHttpResponseAccumulator(queryResponse.getResponseContext(), resultsWriter); @@ -242,10 +241,8 @@ private Response handleDruidException(ResultsWriter resultsWriter, DruidExceptio // also throwing the exception body into the response to make it easier for the client to choke if it manages // to parse a meaningful object out, but that's potentially an API change so we leave that as an exercise for // the future. - - if (includeTrailerHeader) { - trailerFields.put(QueryResource.ERROR_MESSAGE_TRAILER_HEADER, e.getMessage()); - } + trailerFields.put(QueryResource.ERROR_MESSAGE_TRAILER_HEADER, e.getMessage()); + trailerFields.put(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER, "false"); return null; } } @@ -441,7 +438,7 @@ public void initialize() response.setHeader(QueryResource.HEADER_RESPONSE_CONTEXT, serializationResult.getResult()); response.setContentType(contentType.toString()); - if (includeTrailerHeader && response instanceof org.eclipse.jetty.server.Response) { + if (response instanceof org.eclipse.jetty.server.Response) { org.eclipse.jetty.server.Response jettyResponse = (org.eclipse.jetty.server.Response) response; jettyResponse.setTrailers(() -> trailerFields); } diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java index f7f7ffe26724..12304e5c61d4 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java @@ -53,7 +53,6 @@ import org.apache.druid.query.MapQueryToolChestWarehouse; import org.apache.druid.query.Query; import org.apache.druid.query.QueryCapacityExceededException; -import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryException; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryRunner; @@ -268,8 +267,7 @@ public void testGoodQueryWithQueryConfigOverrideDefault() throws IOException { final String overrideConfigKey = "priority"; final String overrideConfigValue = "678"; - DefaultQueryConfig overrideConfig = new DefaultQueryConfig(ImmutableMap.of(overrideConfigKey, overrideConfigValue, - QueryContexts.INCLUDE_TRAILER_HEADER, true)); + DefaultQueryConfig overrideConfig = new DefaultQueryConfig(ImmutableMap.of(overrideConfigKey, overrideConfigValue)); queryResource = new QueryResource( new QueryLifecycleFactory( WAREHOUSE, @@ -315,10 +313,6 @@ public void testGoodQueryWithQueryConfigOverrideDefault() throws IOException overrideConfigValue, testRequestLogger.getNativeQuerylogs().get(0).getQuery().getContext().get(overrideConfigKey) ); - Assert.assertEquals( - true, - testRequestLogger.getNativeQuerylogs().get(0).getQuery().getContext().get(QueryContexts.INCLUDE_TRAILER_HEADER) - ); } @Test @@ -392,15 +386,11 @@ public QueryRunner getQueryRunnerForSegments( overrideConfigValue, testRequestLogger.getNativeQuerylogs().get(0).getQuery().getContext().get(overrideConfigKey) ); - Assert.assertFalse( - testRequestLogger.getNativeQuerylogs().get(0).getQuery().getContext().containsKey(QueryContexts.INCLUDE_TRAILER_HEADER)); } @Test public void testResponseWithIncludeTrailerHeader() throws IOException { - DefaultQueryConfig overrideConfig = new DefaultQueryConfig(ImmutableMap.of(QueryContexts.INCLUDE_TRAILER_HEADER, true)); - queryResource = new QueryResource( new QueryLifecycleFactory( WAREHOUSE, @@ -449,7 +439,7 @@ public QueryRunner getQueryRunnerForSegments( testRequestLogger, new AuthConfig(), AuthTestUtils.TEST_AUTHORIZER_MAPPER, - Suppliers.ofInstance(overrideConfig) + Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())) ), jsonMapper, smileMapper, @@ -468,86 +458,94 @@ public QueryRunner getQueryRunnerForSegments( null /*pretty*/, testServletRequest)); Assert.assertTrue(response.containsHeader(HttpHeader.TRAILER.toString())); - Assert.assertEquals(response.getHeader(HttpHeader.TRAILER.toString()), QueryResource.ERROR_MESSAGE_TRAILER_HEADER); + Assert.assertEquals(response.getHeader(HttpHeader.TRAILER.toString()), QueryResultPusher.RESULT_TRAILER_HEADERS); final HttpFields fields = response.getTrailers().get(); Assert.assertTrue(fields.containsKey(QueryResource.ERROR_MESSAGE_TRAILER_HEADER)); Assert.assertEquals(fields.get(QueryResource.ERROR_MESSAGE_TRAILER_HEADER), "Query did not complete within configured timeout period. You can increase query timeout or tune the performance of query."); + + Assert.assertTrue(fields.containsKey(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER)); + Assert.assertEquals(fields.get(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER), "false"); } @Test - public void testResponseWithoutIncludeTrailerHeader() throws IOException + public void testSuccessResponseWithTrailerHeader() throws IOException { queryResource = new QueryResource( - new QueryLifecycleFactory( - WAREHOUSE, - new QuerySegmentWalker() - { - @Override - public QueryRunner getQueryRunnerForIntervals( - Query query, - Iterable intervals - ) - { - return (queryPlus, responseContext) -> new Sequence() + new QueryLifecycleFactory( + WAREHOUSE, + new QuerySegmentWalker() { @Override - public OutType accumulate(OutType initValue, Accumulator accumulator) + public QueryRunner getQueryRunnerForIntervals( + Query query, + Iterable intervals + ) { - if (accumulator instanceof QueryResultPusher.StreamingHttpResponseAccumulator) { - try { - ((QueryResultPusher.StreamingHttpResponseAccumulator) accumulator).flush(); // initialized - } - catch (IOException ignore) { + return (queryPlus, responseContext) -> new Sequence() + { + @Override + public OutType accumulate(OutType initValue, Accumulator accumulator) + { + if (accumulator instanceof QueryResultPusher.StreamingHttpResponseAccumulator) { + try { + ((QueryResultPusher.StreamingHttpResponseAccumulator) accumulator).flush(); // initialized + } + catch (IOException ignore) { + } + } + + return initValue; } - } - throw new QueryTimeoutException(); + @Override + public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) + { + return Yielders.done(initValue, null); + } + }; } @Override - public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) + public QueryRunner getQueryRunnerForSegments( + Query query, + Iterable specs + ) { - return Yielders.done(initValue, null); + throw new UnsupportedOperationException(); } - }; - } - - @Override - public QueryRunner getQueryRunnerForSegments( - Query query, - Iterable specs - ) - { - throw new UnsupportedOperationException(); - } - }, - new DefaultGenericQueryMetricsFactory(), - new NoopServiceEmitter(), - testRequestLogger, + }, + new DefaultGenericQueryMetricsFactory(), + new NoopServiceEmitter(), + testRequestLogger, + new AuthConfig(), + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())) + ), + jsonMapper, + smileMapper, + queryScheduler, new AuthConfig(), - AuthTestUtils.TEST_AUTHORIZER_MAPPER, - Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())) - ), - jsonMapper, - smileMapper, - queryScheduler, - new AuthConfig(), - null, - ResponseContextConfig.newConfig(true), - DRUID_NODE + null, + ResponseContextConfig.newConfig(true), + DRUID_NODE ); expectPermissiveHappyPathAuth(); org.eclipse.jetty.server.Response response = this.jettyResponseforRequest(testServletRequest); Assert.assertNull(queryResource.doPost(new ByteArrayInputStream( - SIMPLE_TIMESERIES_QUERY.getBytes(StandardCharsets.UTF_8)), - null /*pretty*/, - testServletRequest)); - Assert.assertFalse(response.containsHeader(HttpHeader.TRAILER.toString())); - Assert.assertNull(response.getTrailers()); + SIMPLE_TIMESERIES_QUERY.getBytes(StandardCharsets.UTF_8)), + null /*pretty*/, + testServletRequest)); + Assert.assertTrue(response.containsHeader(HttpHeader.TRAILER.toString())); + + final HttpFields fields = response.getTrailers().get(); + Assert.assertFalse(fields.containsKey(QueryResource.ERROR_MESSAGE_TRAILER_HEADER)); + + Assert.assertTrue(fields.containsKey(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER)); + Assert.assertEquals(fields.get(QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER), "true"); } @Test @@ -1586,7 +1584,7 @@ private Response expectSynchronousRequestFlow( return queryResource.doPost(new ByteArrayInputStream(bytes), null, req); } - public org.eclipse.jetty.server.Response jettyResponseforRequest(MockHttpServletRequest req) throws IOException + private org.eclipse.jetty.server.Response jettyResponseforRequest(MockHttpServletRequest req) throws IOException { HttpChannel channelMock = EasyMock.mock(HttpChannel.class); HttpOutput outputMock = EasyMock.mock(HttpOutput.class); diff --git a/server/src/test/java/org/apache/druid/server/QueryResultPusherTest.java b/server/src/test/java/org/apache/druid/server/QueryResultPusherTest.java index e8f8b8f8828c..5b123b7cc2ac 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResultPusherTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResultPusherTest.java @@ -114,8 +114,7 @@ public QueryResponse getQueryResponse() counter, queryId, contentType, - extraHeaders, - false) + extraHeaders) { @Override diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java index feaaf252469e..4adea5d8d84e 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java +++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java @@ -27,7 +27,6 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.query.QueryContexts; import org.apache.druid.server.DruidNode; import org.apache.druid.server.QueryResource; import org.apache.druid.server.QueryResponse; @@ -227,8 +226,7 @@ public SqlResourceQueryResultPusher( SqlResource.QUERY_METRIC_COUNTER, sqlQueryId, MediaType.APPLICATION_JSON_TYPE, - headers, - sqlQuery.queryContext().getBoolean(QueryContexts.INCLUDE_TRAILER_HEADER, false) + headers ); this.sqlQueryId = sqlQueryId; this.stmt = stmt; From f4ed6335191d2e5549a7ff65f60a545fd88c3a50 Mon Sep 17 00:00:00 2001 From: Vivek Dhiman Date: Tue, 3 Sep 2024 02:03:35 -0700 Subject: [PATCH 5/8] retrigger checks From fce4f48e8ddd398fca4352efce076b0da20e8c86 Mon Sep 17 00:00:00 2001 From: Vivek Dhiman Date: Tue, 17 Sep 2024 18:04:09 -0700 Subject: [PATCH 6/8] Addressed review comments --- .../main/java/org/apache/druid/server/QueryResultPusher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java index 488385a4c522..710c8ccc9199 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java +++ b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java @@ -56,7 +56,7 @@ public abstract class QueryResultPusher { private static final Logger log = new Logger(QueryResultPusher.class); - protected static final String RESULT_TRAILER_HEADERS = QueryResource.ERROR_MESSAGE_TRAILER_HEADER + "," + QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER; + protected static final String RESULT_TRAILER_HEADERS = QueryResource.RESPONSE_COMPLETE_TRAILER_HEADER; private final HttpServletRequest request; private final String queryId; From 398203fb023e41bc2e4515f12fe22c170de937ee Mon Sep 17 00:00:00 2001 From: Vivek Dhiman Date: Thu, 19 Sep 2024 11:03:45 -0700 Subject: [PATCH 7/8] Fixed style issue after merge --- .../test/java/org/apache/druid/server/QueryResourceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java index dc9c9f05a4c9..c0e1a19552c3 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java @@ -87,12 +87,12 @@ import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.Resource; import org.apache.http.HttpStatus; -import org.hamcrest.CoreMatchers; import org.easymock.EasyMock; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpOutput; +import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; import org.joda.time.Interval; import org.junit.Assert; From 91a72af9e2d8e7e047da8ae0281d443c15de1577 Mon Sep 17 00:00:00 2001 From: Vivek Dhiman Date: Thu, 19 Sep 2024 13:08:37 -0700 Subject: [PATCH 8/8] Fixed UT after master merge --- .../druid/server/QueryResourceTest.java | 42 +------------------ 1 file changed, 1 insertion(+), 41 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java index c0e1a19552c3..4d827a008f3f 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java @@ -479,47 +479,7 @@ public void testSuccessResponseWithTrailerHeader() throws IOException queryResource = new QueryResource( new QueryLifecycleFactory( WAREHOUSE, - new QuerySegmentWalker() - { - @Override - public QueryRunner getQueryRunnerForIntervals( - Query query, - Iterable intervals - ) - { - return (queryPlus, responseContext) -> new Sequence() - { - @Override - public OutType accumulate(OutType initValue, Accumulator accumulator) - { - if (accumulator instanceof QueryResultPusher.StreamingHttpResponseAccumulator) { - try { - ((QueryResultPusher.StreamingHttpResponseAccumulator) accumulator).flush(); // initialized - } - catch (IOException ignore) { - } - } - - return initValue; - } - - @Override - public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) - { - return Yielders.done(initValue, null); - } - }; - } - - @Override - public QueryRunner getQueryRunnerForSegments( - Query query, - Iterable specs - ) - { - throw new UnsupportedOperationException(); - } - }, + TEST_SEGMENT_WALKER, new DefaultGenericQueryMetricsFactory(), new NoopServiceEmitter(), testRequestLogger,