From 21af9699abe6033ff279329bc824b0435e2747d8 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 3 Aug 2016 13:48:09 -0500 Subject: [PATCH 1/5] introduce /druid/v3 query endpoint that gives query response context in response json instead of HTTP header --- .../java/io/druid/server/QueryResource.java | 156 +++++++++-------- .../java/io/druid/server/QueryResourceV3.java | 161 ++++++++++++++++++ .../src/main/java/io/druid/cli/CliBroker.java | 4 +- 3 files changed, 249 insertions(+), 72 deletions(-) create mode 100644 server/src/main/java/io/druid/server/QueryResourceV3.java diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 06681e658115..5ccc074130b8 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -241,76 +241,7 @@ public Object accumulate(Object accumulated, Object in) } ); - try { - final Query theQuery = query; - final QueryToolChest theToolChest = toolChest; - Response.ResponseBuilder builder = Response - .ok( - new StreamingOutput() - { - @Override - public void write(OutputStream outputStream) throws IOException, WebApplicationException - { - // json serializer will always close the yielder - CountingOutputStream os = new CountingOutputStream(outputStream); - jsonWriter.writeValue(os, yielder); - - os.flush(); // Some types of OutputStream suppress flush errors in the .close() method. - os.close(); - - final long queryTime = System.currentTimeMillis() - start; - emitter.emit( - DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr()) - .setDimension("success", "true") - .build("query/time", queryTime) - ); - emitter.emit( - DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr()) - .build("query/bytes", os.getCount()) - ); - - requestLogger.log( - new RequestLogLine( - new DateTime(start), - req.getRemoteAddr(), - theQuery, - new QueryStats( - ImmutableMap.of( - "query/time", queryTime, - "query/bytes", os.getCount(), - "success", true - ) - ) - ) - ); - } - }, - contentType - ) - .header("X-Druid-Query-Id", queryId); - - //Limit the response-context header, see https://github.com/druid-io/druid/issues/2331 - //Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString() - //and encodes the string using ASCII, so 1 char is = 1 byte - String responseCtxString = jsonMapper.writeValueAsString(responseContext); - if (responseCtxString.length() > RESPONSE_CTX_HEADER_LEN_LIMIT) { - log.warn("Response Context truncated for id [%s] . Full context is [%s].", queryId, responseCtxString); - responseCtxString = responseCtxString.substring(0, RESPONSE_CTX_HEADER_LEN_LIMIT); - } - - return builder - .header("X-Druid-Response-Context", responseCtxString) - .build(); - } - catch (Exception e) { - // make sure to close yielder if anything happened before starting to serialize the response. - yielder.close(); - throw Throwables.propagate(e); - } - finally { - // do not close yielder here, since we do not want to close the yielder prior to - // StreamingOutput having iterated over all the results - } + return buildQueryResponse(req, query, toolChest, jsonWriter, contentType, start, yielder, responseContext); } catch (QueryInterruptedException e) { try { @@ -398,4 +329,89 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE Thread.currentThread().setName(currThreadName); } } + + protected Response buildQueryResponse( + final HttpServletRequest req, + final Query query, + final QueryToolChest toolChest, + final ObjectWriter jsonWriter, + final String contentType, + final long startTime, + final Yielder yielder, + final Map responseContext + ) + throws IOException + { + try { + Response.ResponseBuilder builder = Response + .ok( + new StreamingOutput() + { + @Override + public void write(OutputStream outputStream) throws IOException, WebApplicationException + { + // json serializer will always close the yielder + CountingOutputStream os = new CountingOutputStream(outputStream); + + try { + jsonWriter.writeValue(os, yielder); + } finally { + os.flush(); // Some types of OutputStream suppress flush errors in the .close() method. + os.close(); + } + + final long queryTime = System.currentTimeMillis() - startTime; + emitter.emit( + DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr()) + .setDimension("success", "true") + .build("query/time", queryTime) + ); + emitter.emit( + DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr()) + .build("query/bytes", os.getCount()) + ); + + requestLogger.log( + new RequestLogLine( + new DateTime(startTime), + req.getRemoteAddr(), + query, + new QueryStats( + ImmutableMap.of( + "query/time", queryTime, + "query/bytes", os.getCount(), + "success", true + ) + ) + ) + ); + } + }, + contentType + ) + .header("X-Druid-Query-Id", query.getId()); + + //Limit the response-context header, see https://github.com/druid-io/druid/issues/2331 + //Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString() + //and encodes the string using ASCII, so 1 char is = 1 byte + String responseCtxString = jsonMapper.writeValueAsString(responseContext); + if (responseCtxString.length() > RESPONSE_CTX_HEADER_LEN_LIMIT) { + log.warn("Response Context truncated for id [%s] . Full context is [%s].", query.getId(), responseCtxString); + responseCtxString = responseCtxString.substring(0, RESPONSE_CTX_HEADER_LEN_LIMIT); + } + + return builder + .header("X-Druid-Response-Context", responseCtxString) + .build(); + } + catch (Exception e) { + // make sure to close yielder if anything happened before starting to serialize the response. + yielder.close(); + throw Throwables.propagate(e); + } + finally { + // do not close yielder here, since we do not want to close the yielder prior to + // StreamingOutput having iterated over all the results + } + } } diff --git a/server/src/main/java/io/druid/server/QueryResourceV3.java b/server/src/main/java/io/druid/server/QueryResourceV3.java new file mode 100644 index 000000000000..8904727f5c8e --- /dev/null +++ b/server/src/main/java/io/druid/server/QueryResourceV3.java @@ -0,0 +1,161 @@ +/* + * + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * / + * + */ + +package io.druid.server; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.CountingOutputStream; +import com.google.inject.Inject; +import com.metamx.common.guava.Yielder; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.guice.annotations.Json; +import io.druid.guice.annotations.Smile; +import io.druid.query.DruidMetrics; +import io.druid.query.Query; +import io.druid.query.QuerySegmentWalker; +import io.druid.query.QueryToolChest; +import io.druid.query.QueryToolChestWarehouse; +import io.druid.server.initialization.ServerConfig; +import io.druid.server.log.RequestLogger; +import io.druid.server.security.AuthConfig; +import org.joda.time.DateTime; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Path; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; + +/** + */ +@Path("/druid/v3/") +public class QueryResourceV3 extends QueryResource +{ + private final ServiceEmitter emitter; + private final ObjectMapper jsonMapper; + private final RequestLogger requestLogger; + + @Inject + public QueryResourceV3( + QueryToolChestWarehouse warehouse, + ServerConfig config, + @Json ObjectMapper jsonMapper, + @Smile ObjectMapper smileMapper, + QuerySegmentWalker texasRanger, + ServiceEmitter emitter, + RequestLogger requestLogger, + QueryManager queryManager, + AuthConfig authConfig + ) + { + super(warehouse, config, jsonMapper, smileMapper, texasRanger, emitter, requestLogger, queryManager, authConfig); + this.emitter = emitter; + this.jsonMapper = jsonMapper; + this.requestLogger = requestLogger; + } + + @Override + protected Response buildQueryResponse( + final HttpServletRequest req, + final Query query, + final QueryToolChest toolChest, + final ObjectWriter jsonWriter, + final String contentType, + final long startTime, + final Yielder yielder, + final Map responseContext + ) + throws IOException + { + try { + return Response + .ok( + new StreamingOutput() + { + @Override + public void write(OutputStream outputStream) throws IOException, WebApplicationException + { + CountingOutputStream os = new CountingOutputStream(outputStream); + + //jsonSerializer would close the yielder. + try { + jsonWriter.writeValue( + os, + ImmutableMap.of( + "result", yielder, + "context", responseContext + ) + ); + } finally { + os.flush(); // Some types of OutputStream suppress flush errors in the .close() method. + os.close(); + } + + final long queryTime = System.currentTimeMillis() - startTime; + emitter.emit( + DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr()) + .setDimension("success", "true") + .build("query/time", queryTime) + ); + emitter.emit( + DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr()) + .build("query/bytes", os.getCount()) + ); + + requestLogger.log( + new RequestLogLine( + new DateTime(startTime), + req.getRemoteAddr(), + query, + new QueryStats( + ImmutableMap.of( + "query/time", queryTime, + "query/bytes", os.getCount(), + "success", true + ) + ) + ) + ); + } + }, + contentType + ) + .header("X-Druid-Query-Id", query.getId()) + .build(); + } + catch (Exception e) { + // make sure to close yielder if anything happened before starting to serialize the response. + yielder.close(); + throw Throwables.propagate(e); + } + finally { + // do not close yielder here, since we do not want to close the yielder prior to + // StreamingOutput having iterated over all the results + } + } +} diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java index 3748c832bdaa..a74aaf39aac3 100644 --- a/services/src/main/java/io/druid/cli/CliBroker.java +++ b/services/src/main/java/io/druid/cli/CliBroker.java @@ -39,14 +39,13 @@ import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; -import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.QuerySegmentWalker; -import io.druid.query.QueryToolChestWarehouse; import io.druid.query.RetryQueryRunnerConfig; import io.druid.query.lookup.LookupModule; import io.druid.server.ClientInfoResource; import io.druid.server.ClientQuerySegmentWalker; import io.druid.server.QueryResource; +import io.druid.server.QueryResourceV3; import io.druid.server.coordination.broker.DruidBroker; import io.druid.server.http.BrokerResource; import io.druid.server.initialization.jetty.JettyServerInitializer; @@ -102,6 +101,7 @@ public void configure(Binder binder) binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); Jerseys.addResource(binder, QueryResource.class); + Jerseys.addResource(binder, QueryResourceV3.class); Jerseys.addResource(binder, BrokerResource.class); Jerseys.addResource(binder, ClientInfoResource.class); LifecycleModule.register(binder, QueryResource.class); From 834d0d1a9b9ccb70c8daf2e6bacab275caedbc23 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 3 Aug 2016 22:02:48 -0500 Subject: [PATCH 2/5] define static vars for result and context in QueryResourceV3 --- server/src/main/java/io/druid/server/QueryResourceV3.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/io/druid/server/QueryResourceV3.java b/server/src/main/java/io/druid/server/QueryResourceV3.java index 8904727f5c8e..f143d521409e 100644 --- a/server/src/main/java/io/druid/server/QueryResourceV3.java +++ b/server/src/main/java/io/druid/server/QueryResourceV3.java @@ -56,6 +56,9 @@ @Path("/druid/v3/") public class QueryResourceV3 extends QueryResource { + public static final String KEY_RESULT = "result"; + public static final String KEY_CONTEXT = "context"; + private final ServiceEmitter emitter; private final ObjectMapper jsonMapper; private final RequestLogger requestLogger; @@ -107,8 +110,8 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE jsonWriter.writeValue( os, ImmutableMap.of( - "result", yielder, - "context", responseContext + KEY_RESULT, yielder, + KEY_CONTEXT, responseContext ) ); } finally { From d04b19898c3ebbde4da86e0b8374338255b460fb Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 3 Aug 2016 22:03:24 -0500 Subject: [PATCH 3/5] JsonParserV3ResponseIterator and its UTs --- .../io/druid/client/DirectDruidClient.java | 138 +++++++++++++++++- .../JsonParserV3ResponseIteratorTest.java | 116 +++++++++++++++ 2 files changed, 252 insertions(+), 2 deletions(-) create mode 100644 server/src/test/java/io/druid/client/JsonParserV3ResponseIteratorTest.java diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index efceff75fb29..5d368b9661e5 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -29,7 +29,6 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.google.common.base.Charsets; -import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.google.common.io.ByteSource; @@ -54,7 +53,6 @@ import com.metamx.http.client.response.StatusResponseHolder; import io.druid.query.BaseQuery; import io.druid.query.BySegmentResultValueClass; -import io.druid.query.DruidMetrics; import io.druid.query.Query; import io.druid.query.QueryInterruptedException; import io.druid.query.QueryRunner; @@ -63,6 +61,7 @@ import io.druid.query.QueryWatcher; import io.druid.query.Result; import io.druid.query.aggregation.MetricManipulatorFns; +import io.druid.server.QueryResourceV3; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBufferInputStream; import org.jboss.netty.handler.codec.http.HttpChunk; @@ -503,4 +502,139 @@ public void close() throws IOException } } } + + // Mostly same as JsonParserIterator with updates to handle v3 query response of type + // { "result": [....], "context": { .... } } + // adds all the context entries from response into the Map responseContext passed to it. + // against the key + static class JsonParserV3ResponseIterator implements Iterator, Closeable + { + private JsonParser jp; + private ObjectCodec objectCodec; + private final JavaType typeRef; + private final Future future; + private final String url; + + private final ObjectMapper objectMapper; + private final String host; + private final Map responseContext; + + public JsonParserV3ResponseIterator( + JavaType typeRef, + Future future, + String url, + ObjectMapper objectMapper, + String host, + Map responseContext + ) + { + this.typeRef = typeRef; + this.future = future; + this.url = url; + jp = null; + this.objectMapper = objectMapper; + this.host = host; + this.responseContext = responseContext; + } + + @Override + public boolean hasNext() + { + init(); + + if (jp.isClosed()) { + return false; + } + + if (jp.getCurrentToken() == JsonToken.END_ARRAY) { + // now we are finised reading all the result values. + try { + jp.nextToken(); //read off FIELD_NAME token for "context" + jp.nextToken(); + Map ctx = objectCodec.readValue(jp, Map.class); + responseContext.put(host, ctx); + jp.nextToken(); //read off END_OBJECT token + jp.close(); + } catch (IOException ex) { + throw Throwables.propagate(ex); + } + + return false; + } + + + return true; + } + + @Override + public T next() + { + init(); + try { + final T retVal = objectCodec.readValue(jp, typeRef); + jp.nextToken(); + return retVal; + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + + private void init() + { + if (jp == null) { + try { + jp = objectMapper.getFactory().createParser(future.get()); + if (jp.nextToken() == JsonToken.START_OBJECT) { + if (jp.nextToken() == JsonToken.FIELD_NAME) { + if (QueryResourceV3.KEY_RESULT.equals(jp.getCurrentName())) { + if (jp.nextToken() == JsonToken.START_ARRAY) { + jp.nextToken(); + objectCodec = jp.getCodec(); + } else { + throw new IAE("result must be array, token was[%s] from url [%s]", jp.getCurrentToken(), url); + } + } else { + QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class); + //case we get an exception with an unknown message. + if (cause.isNotKnown()) { + throw new QueryInterruptedException( + QueryInterruptedException.UNKNOWN_EXCEPTION, + cause.getMessage(), + host + ); + } else { + throw new QueryInterruptedException(cause, host); + } + } + } else { + throw new IAE("Next token wasn't a FIELD_NAME, was[%s] from url [%s]", jp.getCurrentToken(), url); + } + } else { + throw new IAE("expecting json object, was[%s] from url [%s]", jp.getCurrentToken(), url); + } + } + catch (IOException | InterruptedException | ExecutionException e) { + throw new RE(e, "Failure getting results from[%s] because of [%s]", url, e.getMessage()); + } + catch (CancellationException e) { + throw new QueryInterruptedException(e, host); + } + } + } + + @Override + public void close() throws IOException + { + if (jp != null) { + jp.close(); + } + } + } } diff --git a/server/src/test/java/io/druid/client/JsonParserV3ResponseIteratorTest.java b/server/src/test/java/io/druid/client/JsonParserV3ResponseIteratorTest.java new file mode 100644 index 000000000000..6c4d8c99f81c --- /dev/null +++ b/server/src/test/java/io/druid/client/JsonParserV3ResponseIteratorTest.java @@ -0,0 +1,116 @@ +/* + * + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * / + * + */ + +package io.druid.client; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.TypeFactory; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Futures; +import io.druid.query.QueryInterruptedException; +import io.druid.segment.TestHelper; +import org.apache.commons.io.IOUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +/** + */ +public class JsonParserV3ResponseIteratorTest +{ + @Test + public void testSimpleResponse() throws Exception + { + String response = "{\n" + + " \"result\": [\"v1\", \"v2\"],\n" + + " \"context\": {\n" + + " \"k\": \"v\"\n" + + " }\n" + + "}"; + + doTest(response, ImmutableList.of("v1", "v2"), ImmutableMap.of("k", "v")); + } + + @Test + public void testEmptyResponse() throws Exception + { + String response = "{\n" + + " \"result\": [],\n" + + " \"context\": { }\n" + + "}"; + + doTest(response, Collections.emptyList(), Collections.emptyMap()); + } + + @Test (expected = QueryInterruptedException.class) + public void testErrorResponse() throws Exception + { + String response = "{\n" + + " \"error\": \"Query timeout\"\n" + + "}"; + doTest(response, null, null); + } + + private void doTest(String response, List expectedValues, Map expectedContext) + throws Exception + { + ObjectMapper objectMapper = TestHelper.JSON_MAPPER; + + final TypeFactory typeFactory = objectMapper.getTypeFactory(); + JavaType baseType = typeFactory.constructType(new TypeReference(){}); + + Future responseFuture = Futures.immediateFuture( + IOUtils.toInputStream(response) + ); + + Map context = new HashMap<>(); + + DirectDruidClient.JsonParserV3ResponseIterator parser = new DirectDruidClient.JsonParserV3ResponseIterator<>( + baseType, + responseFuture, + "http://dummy/", + objectMapper, + "host1", + context + ); + + List resultValues = new ArrayList<>(); + while (parser.hasNext()) { + resultValues.add(parser.next()); + } + + Assert.assertEquals(resultValues, expectedValues); + Assert.assertEquals(context, ImmutableMap.of("host1", expectedContext)); + + Assert.assertEquals(responseFuture.get().available(), 0); + } +} From f49c3287a45eaebda87c5414bd0335efe47672ce Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 3 Aug 2016 22:19:25 -0500 Subject: [PATCH 4/5] configuration to enable v3 query url use in DirectDruidClient at the Broker --- .../io/druid/client/BrokerServerView.java | 17 ++++-- .../io/druid/client/DirectDruidClient.java | 53 +++++++++++++------ .../druid/client/DirectDruidClientConfig.java | 38 +++++++++++++ .../io/druid/client/BrokerServerViewTest.java | 3 +- .../druid/client/DirectDruidClientTest.java | 12 +++-- .../src/main/java/io/druid/cli/CliBroker.java | 2 + .../main/java/io/druid/cli/CliHistorical.java | 2 + 7 files changed, 104 insertions(+), 23 deletions(-) create mode 100644 server/src/main/java/io/druid/client/DirectDruidClientConfig.java diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java index 95826bb60ca5..e7425d1b940b 100644 --- a/server/src/main/java/io/druid/client/BrokerServerView.java +++ b/server/src/main/java/io/druid/client/BrokerServerView.java @@ -44,7 +44,6 @@ import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; -import javax.annotation.Nullable; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -72,6 +71,8 @@ public class BrokerServerView implements TimelineServerView private final ServiceEmitter emitter; private final Predicate> segmentFilter; + private final DirectDruidClientConfig directDruidClientConfig; + private volatile boolean initialized = false; @Inject @@ -83,7 +84,8 @@ public BrokerServerView( FilteredServerInventoryView baseView, TierSelectorStrategy tierSelectorStrategy, ServiceEmitter emitter, - final BrokerSegmentWatcherConfig segmentWatcherConfig + final BrokerSegmentWatcherConfig segmentWatcherConfig, + final DirectDruidClientConfig directDruidClientConfig ) { this.warehouse = warehouse; @@ -93,6 +95,7 @@ public BrokerServerView( this.baseView = baseView; this.tierSelectorStrategy = tierSelectorStrategy; this.emitter = emitter; + this.directDruidClientConfig = directDruidClientConfig; this.clients = Maps.newConcurrentMap(); this.selectors = Maps.newHashMap(); this.timelines = Maps.newHashMap(); @@ -200,7 +203,15 @@ private QueryableDruidServer addServer(DruidServer server) private DirectDruidClient makeDirectClient(DruidServer server) { - return new DirectDruidClient(warehouse, queryWatcher, smileMapper, httpClient, server.getHost(), emitter); + return new DirectDruidClient( + warehouse, + queryWatcher, + smileMapper, + httpClient, + server.getHost(), + emitter, + directDruidClientConfig.isUseV3QueryUrl() + ); } private QueryableDruidServer removeServer(DruidServer server) diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 5d368b9661e5..e62fb6050a5a 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -105,13 +105,16 @@ public class DirectDruidClient implements QueryRunner private final AtomicInteger openConnections; private final boolean isSmile; + private final boolean useV3QueryUrl; + public DirectDruidClient( QueryToolChestWarehouse warehouse, QueryWatcher queryWatcher, ObjectMapper objectMapper, HttpClient httpClient, String host, - ServiceEmitter emitter + ServiceEmitter emitter, + boolean useV3QueryUrl ) { this.warehouse = warehouse; @@ -120,6 +123,7 @@ public DirectDruidClient( this.httpClient = httpClient; this.host = host; this.emitter = emitter; + this.useV3QueryUrl = useV3QueryUrl; this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory; this.openConnections = new AtomicInteger(); @@ -155,7 +159,7 @@ public Sequence run(final Query query, final Map context) } final ListenableFuture future; - final String url = String.format("http://%s/druid/v2/", host); + final String url = String.format("http://%s/druid/%s/", host, useV3QueryUrl ? "v3" : "v2"); final String cancelUrl = String.format("http://%s/druid/v2/%s", host, query.getId()); try { @@ -385,22 +389,42 @@ public void onFailure(Throwable t) throw Throwables.propagate(e); } - Sequence retVal = new BaseSequence<>( - new BaseSequence.IteratorMaker>() - { - @Override - public JsonParserIterator make() + Sequence retVal; + if (useV3QueryUrl) { + retVal = new BaseSequence<>( + new BaseSequence.IteratorMaker>() { - return new JsonParserIterator(typeRef, future, url); - } + @Override + public JsonParserV3ResponseIterator make() + { + return new JsonParserV3ResponseIterator(typeRef, future, url, objectMapper, host, context); + } - @Override - public void cleanup(JsonParserIterator iterFromMake) + @Override + public void cleanup(JsonParserV3ResponseIterator iterFromMake) + { + CloseQuietly.close(iterFromMake); + } + } + ); + } else { + retVal = new BaseSequence<>( + new BaseSequence.IteratorMaker>() { - CloseQuietly.close(iterFromMake); + @Override + public JsonParserIterator make() + { + return new JsonParserIterator(typeRef, future, url); + } + + @Override + public void cleanup(JsonParserIterator iterFromMake) + { + CloseQuietly.close(iterFromMake); + } } - } - ); + ); + } // bySegment queries are de-serialized after caching results in order to // avoid the cost of de-serializing and then re-serializing again when adding to cache @@ -554,7 +578,6 @@ public boolean hasNext() Map ctx = objectCodec.readValue(jp, Map.class); responseContext.put(host, ctx); jp.nextToken(); //read off END_OBJECT token - jp.close(); } catch (IOException ex) { throw Throwables.propagate(ex); } diff --git a/server/src/main/java/io/druid/client/DirectDruidClientConfig.java b/server/src/main/java/io/druid/client/DirectDruidClientConfig.java new file mode 100644 index 000000000000..19bfc8d40ea4 --- /dev/null +++ b/server/src/main/java/io/druid/client/DirectDruidClientConfig.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * / + * + */ + +package io.druid.client; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + */ +public class DirectDruidClientConfig +{ + @JsonProperty + private boolean useV3QueryUrl = false; + + public boolean isUseV3QueryUrl() + { + return useV3QueryUrl; + } +} diff --git a/server/src/test/java/io/druid/client/BrokerServerViewTest.java b/server/src/test/java/io/druid/client/BrokerServerViewTest.java index 8e2282439350..11989eeaaeb6 100644 --- a/server/src/test/java/io/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/io/druid/client/BrokerServerViewTest.java @@ -336,7 +336,8 @@ public CallbackAction segmentViewInitialized() baseView, new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), new NoopServiceEmitter(), - new BrokerSegmentWatcherConfig() + new BrokerSegmentWatcherConfig(), + new DirectDruidClientConfig() ); baseView.start(); diff --git a/server/src/test/java/io/druid/client/DirectDruidClientTest.java b/server/src/test/java/io/druid/client/DirectDruidClientTest.java index 1b222539f060..b31d300697f5 100644 --- a/server/src/test/java/io/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/io/druid/client/DirectDruidClientTest.java @@ -122,7 +122,8 @@ public void testRun() throws Exception new DefaultObjectMapper(), httpClient, "foo", - new NoopServiceEmitter() + new NoopServiceEmitter(), + false ); DirectDruidClient client2 = new DirectDruidClient( new ReflectionQueryToolChestWarehouse(), @@ -130,7 +131,8 @@ public void testRun() throws Exception new DefaultObjectMapper(), httpClient, "foo2", - new NoopServiceEmitter() + new NoopServiceEmitter(), + false ); QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer( @@ -232,7 +234,8 @@ public void testCancel() throws Exception new DefaultObjectMapper(), httpClient, "foo", - new NoopServiceEmitter() + new NoopServiceEmitter(), + false ); QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer( @@ -301,7 +304,8 @@ public void testQueryInterruptionExceptionLogMessage() throws JsonProcessingExce new DefaultObjectMapper(), httpClient, hostName, - new NoopServiceEmitter() + new NoopServiceEmitter(), + false ); QueryableDruidServer queryableDruidServer = new QueryableDruidServer( diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java index a74aaf39aac3..8f79877fedbe 100644 --- a/services/src/main/java/io/druid/cli/CliBroker.java +++ b/services/src/main/java/io/druid/cli/CliBroker.java @@ -28,6 +28,7 @@ import io.druid.client.BrokerSegmentWatcherConfig; import io.druid.client.BrokerServerView; import io.druid.client.CachingClusteredClient; +import io.druid.client.DirectDruidClientConfig; import io.druid.client.TimelineServerView; import io.druid.client.cache.CacheConfig; import io.druid.client.cache.CacheMonitor; @@ -96,6 +97,7 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.broker.balancer", ServerSelectorStrategy.class); JsonConfigProvider.bind(binder, "druid.broker.retryPolicy", RetryQueryRunnerConfig.class); JsonConfigProvider.bind(binder, "druid.broker.segment", BrokerSegmentWatcherConfig.class); + JsonConfigProvider.bind(binder, "druid.broker.httpClient", DirectDruidClientConfig.class); binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class); diff --git a/services/src/main/java/io/druid/cli/CliHistorical.java b/services/src/main/java/io/druid/cli/CliHistorical.java index b6b6ad34f644..cb2f1dbd868d 100644 --- a/services/src/main/java/io/druid/cli/CliHistorical.java +++ b/services/src/main/java/io/druid/cli/CliHistorical.java @@ -37,6 +37,7 @@ import io.druid.query.QuerySegmentWalker; import io.druid.query.lookup.LookupModule; import io.druid.server.QueryResource; +import io.druid.server.QueryResourceV3; import io.druid.server.coordination.ServerManager; import io.druid.server.coordination.ZkCoordinator; import io.druid.server.http.HistoricalResource; @@ -82,6 +83,7 @@ public void configure(Binder binder) binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("historical")); binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); Jerseys.addResource(binder, QueryResource.class); + Jerseys.addResource(binder, QueryResourceV3.class); Jerseys.addResource(binder, HistoricalResource.class); LifecycleModule.register(binder, QueryResource.class); LifecycleModule.register(binder, ZkCoordinator.class); From aa40da506be8c7efaf5d97c7dc03842ae0e76f84 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 5 Aug 2016 23:47:17 -0500 Subject: [PATCH 5/5] dump query performance stats in response context --- .../main/java/io/druid/query/BaseQuery.java | 5 + .../query/MetricsEmittingQueryRunner.java | 21 +- .../java/io/druid/query/QueryContextKeys.java | 1 + .../java/io/druid/query/QueryPerfStats.java | 196 ++++++++++++++++++ .../io/druid/client/DirectDruidClient.java | 29 +-- .../java/io/druid/server/QueryResource.java | 15 ++ .../java/io/druid/server/QueryResourceV3.java | 50 ++++- 7 files changed, 298 insertions(+), 19 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/QueryPerfStats.java diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java index 69f297a48618..9f3e0abac23e 100644 --- a/processing/src/main/java/io/druid/query/BaseQuery.java +++ b/processing/src/main/java/io/druid/query/BaseQuery.java @@ -67,6 +67,11 @@ public static int getContextUncoveredIntervalsLimit(Query query, int defa return parseInt(query, "uncoveredIntervalsLimit", defaultValue); } + public static int getContextDumpPerf(Query query, int defaultValue) + { + return parseInt(query, QueryContextKeys.DUMP_QUERY_PERF, defaultValue); + } + private static int parseInt(Query query, String key, int defaultValue) { Object val = query.getContextValue(key); diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java index a7633705d36a..59f5906ef654 100644 --- a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java @@ -20,8 +20,6 @@ package io.druid.query; import com.google.common.base.Function; -import com.google.common.base.Strings; -import com.google.common.collect.Maps; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Yielder; @@ -36,6 +34,8 @@ */ public class MetricsEmittingQueryRunner implements QueryRunner { + private static final String METRIC_QUERY_WAIT_TIME = "query/wait/time"; + private final ServiceEmitter emitter; private final Function, ServiceMetricEvent.Builder> builderFn; private final QueryRunner queryRunner; @@ -112,12 +112,27 @@ public OutType accumulate(OutType outType, Accumulator acc throw e; } finally { + MetricsEmittingQueryRunnerStats stats = null; + QueryPerfStats perfStats = (QueryPerfStats) responseContext.get(QueryPerfStats.KEY_CTX); + if (perfStats != null) { + stats = new MetricsEmittingQueryRunnerStats(userDimensions); + perfStats.addMetricsEmittingQueryRunnerStats(stats); + } + + long timeTaken = System.currentTimeMillis() - startTime; emitter.emit(builder.build(metricName, timeTaken)); + if (stats != null) { + stats.addMetric(metricName, timeTaken); + } if (creationTime > 0) { - emitter.emit(builder.build("query/wait/time", startTime - creationTime)); + long waitTime = startTime - creationTime; + emitter.emit(builder.build(METRIC_QUERY_WAIT_TIME, waitTime)); + if (stats != null) { + stats.addMetric(METRIC_QUERY_WAIT_TIME, waitTime); + } } } diff --git a/processing/src/main/java/io/druid/query/QueryContextKeys.java b/processing/src/main/java/io/druid/query/QueryContextKeys.java index 480dcd551f4f..10863da73303 100644 --- a/processing/src/main/java/io/druid/query/QueryContextKeys.java +++ b/processing/src/main/java/io/druid/query/QueryContextKeys.java @@ -24,4 +24,5 @@ public class QueryContextKeys public static final String PRIORITY = "priority"; public static final String TIMEOUT = "timeout"; public static final String CHUNK_PERIOD = "chunkPeriod"; + public static final String DUMP_QUERY_PERF = "dumpQueryPerf"; } diff --git a/processing/src/main/java/io/druid/query/QueryPerfStats.java b/processing/src/main/java/io/druid/query/QueryPerfStats.java new file mode 100644 index 000000000000..938423939c9f --- /dev/null +++ b/processing/src/main/java/io/druid/query/QueryPerfStats.java @@ -0,0 +1,196 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class QueryPerfStats +{ + public static final String KEY_CTX = "queryPerfStats"; + + private long queryResultTime; + private long queryResultBytes; + + //holders for historical/realtime stats as reported from DirectDruidClient + private Map serverPerf; + + //holders for per segment stats on historical/realtime node + private List metricsEmittingQueryRunnerStats; + + //limit maximum number of per segment stats, to keep context size bounded + private final int maxNumMetricsEmittingQueryRunnerStats; + + public QueryPerfStats(int maxNumMetricsEmittingQueryRunnerStats) + { + this.maxNumMetricsEmittingQueryRunnerStats = maxNumMetricsEmittingQueryRunnerStats; + } + + @JsonProperty + public long getQueryResultTime() + { + return queryResultTime; + } + + @JsonProperty + public long getQueryResultBytes() + { + return queryResultBytes; + } + + @JsonProperty + public Map getServerPerf() + { + return serverPerf; + } + + @JsonProperty + public List getMetricsEmittingQueryRunnerStats() + { + return metricsEmittingQueryRunnerStats; + } + + public void updateQueryResultTime(long queryTime) { + this.queryResultTime = queryTime; + } + + public void updateQueryResultBytes(long queryBytes) { + this.queryResultBytes = queryBytes; + } + + public void updateServerTime(String host, long timeTaken) + { + getServerPerfStats(host).setQueryNodeTime(timeTaken); + } + + public void updateServerBytes(String host, long l) + { + getServerPerfStats(host).setQueryNodeBytes(l); + } + + public void updateServerTTFB(String host, long timeTaken) + { + getServerPerfStats(host).setQueryNodeTTFB(timeTaken); + } + + public void addMetricsEmittingQueryRunnerStats(MetricsEmittingQueryRunnerStats stats) + { + if (metricsEmittingQueryRunnerStats == null) { + metricsEmittingQueryRunnerStats = new ArrayList<>(); + } + + //its possible for multiple threads to end up putting more entries than maxNumMetricsEmittingQueryRunnerStats + //but that is OK, limit does not have to be so strict. + if (metricsEmittingQueryRunnerStats.size() < maxNumMetricsEmittingQueryRunnerStats) { + metricsEmittingQueryRunnerStats.add(stats); + } + } + + private ServerPerfStats getServerPerfStats(String host) + { + //This is only called in single thread, so concurrent Map is not necessary. + if (serverPerf == null) { + serverPerf = new HashMap<>(); + } + + ServerPerfStats sps = serverPerf.get(host); + if (sps == null) { + sps = new ServerPerfStats(); + serverPerf.put(host, sps); + } + return sps; + } +} + +class ServerPerfStats +{ + private long queryNodeTTFB; + private long queryNodeTime; + private long queryNodeBytes; + + @JsonProperty + public long getQueryNodeTTFB() + { + return queryNodeTTFB; + } + + public void setQueryNodeTTFB(long queryNodeTTFB) + { + this.queryNodeTTFB = queryNodeTTFB; + } + + @JsonProperty + public long getQueryNodeTime() + { + return queryNodeTime; + } + + public void setQueryNodeTime(long queryNodeTime) + { + this.queryNodeTime = queryNodeTime; + } + + @JsonProperty + public long getQueryNodeBytes() + { + return queryNodeBytes; + } + + public void setQueryNodeBytes(long queryNodeBytes) + { + this.queryNodeBytes = queryNodeBytes; + } +} + +class MetricsEmittingQueryRunnerStats +{ + private final Map dimensions; + private final Map metrics = new HashMap<>(); + + @JsonProperty + public Map getDimensions() + { + return dimensions; + } + + @JsonProperty + public Map getMetrics() + { + return metrics; + } + + public MetricsEmittingQueryRunnerStats(Map dimensions) + { + this.dimensions = dimensions; + } + + public void addMetric(String metricName, Object value) + { + metrics.put(metricName, value); + } +} diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index e62fb6050a5a..f452971d0280 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -55,6 +55,7 @@ import io.druid.query.BySegmentResultValueClass; import io.druid.query.Query; import io.druid.query.QueryInterruptedException; +import io.druid.query.QueryPerfStats; import io.druid.query.QueryRunner; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; @@ -182,7 +183,12 @@ public ClientResponse handleResponse(HttpResponse response) { log.debug("Initial response from url[%s] for queryId[%s]", url, query.getId()); responseStartTime = System.currentTimeMillis(); - emitter.emit(builder.build("query/node/ttfb", responseStartTime - requestStartTime)); + long timeTaken = responseStartTime - requestStartTime; + emitter.emit(builder.build("query/node/ttfb", timeTaken)); + QueryPerfStats perfStats = (QueryPerfStats) context.get(QueryPerfStats.KEY_CTX); + if (perfStats != null) { + perfStats.updateServerTTFB(host, timeTaken); + } try { final String responseContext = response.headers().get("X-Druid-Response-Context"); @@ -280,8 +286,16 @@ public ClientResponse done(ClientResponse clientRespon stopTime - responseStartTime, byteCount.get() / (0.0001 * (stopTime - responseStartTime)) ); - emitter.emit(builder.build("query/node/time", stopTime - requestStartTime)); + long timeTaken = stopTime - requestStartTime; + emitter.emit(builder.build("query/node/time", timeTaken)); emitter.emit(builder.build("query/node/bytes", byteCount.get())); + + QueryPerfStats perfStats = (QueryPerfStats) context.get(QueryPerfStats.KEY_CTX); + if (perfStats != null) { + perfStats.updateServerTime(host, timeTaken); + perfStats.updateServerBytes(host, byteCount.get()); + } + synchronized (done) { try { // An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out @@ -625,16 +639,7 @@ private void init() } } else { QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class); - //case we get an exception with an unknown message. - if (cause.isNotKnown()) { - throw new QueryInterruptedException( - QueryInterruptedException.UNKNOWN_EXCEPTION, - cause.getMessage(), - host - ); - } else { - throw new QueryInterruptedException(cause, host); - } + throw new QueryInterruptedException(cause, host); } } else { throw new IAE("Next token wasn't a FIELD_NAME, was[%s] from url [%s]", jp.getCurrentToken(), url); diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 5ccc074130b8..4d06e101ca05 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -37,10 +37,12 @@ import com.metamx.emitter.service.ServiceEmitter; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; +import io.druid.query.BaseQuery; import io.druid.query.DruidMetrics; import io.druid.query.Query; import io.druid.query.QueryContextKeys; import io.druid.query.QueryInterruptedException; +import io.druid.query.QueryPerfStats; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; @@ -193,6 +195,7 @@ public Response doPost( ) ); } + toolChest = warehouse.getToolChest(query); Thread.currentThread() @@ -220,6 +223,18 @@ public Response doPost( } final Map responseContext = new MapMaker().makeMap(); + + int dumpPerf = BaseQuery.getContextDumpPerf(query, 0); + if (dumpPerf > 0) { + responseContext.put(QueryPerfStats.KEY_CTX, new QueryPerfStats(dumpPerf * 10)); + query = query.withOverriddenContext( + ImmutableMap.of( + QueryContextKeys.DUMP_QUERY_PERF, + dumpPerf - 1 + ) + ); + } + final Sequence res = query.run(texasRanger, responseContext); final Sequence results; if (res == null) { diff --git a/server/src/main/java/io/druid/server/QueryResourceV3.java b/server/src/main/java/io/druid/server/QueryResourceV3.java index f143d521409e..bfc1cc3dbda4 100644 --- a/server/src/main/java/io/druid/server/QueryResourceV3.java +++ b/server/src/main/java/io/druid/server/QueryResourceV3.java @@ -22,6 +22,7 @@ package io.druid.server; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; import com.google.common.base.Throwables; @@ -34,6 +35,7 @@ import io.druid.guice.annotations.Smile; import io.druid.query.DruidMetrics; import io.druid.query.Query; +import io.druid.query.QueryPerfStats; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; import io.druid.query.QueryToolChestWarehouse; @@ -109,10 +111,7 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE try { jsonWriter.writeValue( os, - ImmutableMap.of( - KEY_RESULT, yielder, - KEY_CONTEXT, responseContext - ) + new ResponseObj(yielder, responseContext, startTime, os) ); } finally { os.flush(); // Some types of OutputStream suppress flush errors in the .close() method. @@ -161,4 +160,47 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE // StreamingOutput having iterated over all the results } } + + private static class ResponseObj + { + private final Yielder result; + private final Map context; + + private final long startTime; + private final CountingOutputStream stream; + + private final long id = System.currentTimeMillis(); + + ResponseObj(Yielder result, Map context, long startTime, CountingOutputStream stream) + { + this.result = result; + this.context = context; + + this.startTime = startTime; + this.stream = stream; + } + + @JsonProperty + public Yielder getResult() + { + return result; + } + + @JsonProperty + public Map getContext() + { + QueryPerfStats perfStats = (QueryPerfStats) context.get(QueryPerfStats.KEY_CTX); + + if (perfStats != null) { + //Note: It is assumed the Json Serializer serializes result first and then context. + //Or else the time reported here would be incorrect. + long queryTime = System.currentTimeMillis() - startTime; + long queryBytes = stream.getCount(); + perfStats.updateQueryResultTime(queryTime); + perfStats.updateQueryResultBytes(queryBytes); + } + + return context; + } + } }