diff --git a/docs/content/querying/querying.md b/docs/content/querying/querying.md index 4f090e8c0c6f..8804ab6a9e73 100644 --- a/docs/content/querying/querying.md +++ b/docs/content/querying/querying.md @@ -11,10 +11,35 @@ query is expressed in JSON and each of these node types expose the same REST query interface. For normal Druid operations, queries should be issued to the broker nodes. Queries can be posted to the queryable nodes like this - - ```bash - curl -X POST ':/druid/v2/?pretty' -H 'Content-Type:application/json' -d @ - ``` - +```bash +curl -X POST ':/druid/v2/?pretty' -H 'Content-Type:application/json' -d @ +``` +In response, Druid returns a JSON array of result data objects like below. For example, here is response for a sample groupBy query. + +```json +[ + { + "version" : "v1", + "timestamp" : "2012-01-01T00:00:00.000Z", + "event" : { + "country" : "US", + "name" : "test name1", + "data_transfer" : 500 + } + }, + { + "version" : "v1", + "timestamp" : "2012-01-01T00:00:00.000Z", + "event" : { + "country" : "US", + "name" : "test name2", + "data_transfer" : 354 + } + } + ... +] +``` + Druid's native query language is JSON over HTTP, although many members of the community have contributed different [client libraries](../development/libraries.html) in other languages to query Druid. @@ -22,6 +47,26 @@ Druid's native query is relatively low level, mapping closely to how computation are designed to be lightweight and complete very quickly. This means that for more complex analysis, or to build more complex visualizations, multiple Druid queries may be required. + +
+Another experimental HTTP query endpoint, `/druid/v3`. +
+Sometimes it is desirable to be able to send some contextual information back to druid client in addition to the result data objects. +Existing response format of JSON array did not allow that. So, a new HTTP endpoint is created to handle this use case. +If client sends a druid query to `/druid/v3` like +```bash +curl -X POST ':/druid/v3/?pretty' -H 'Content-Type:application/json' -d @ +``` +Then the response format is as follows. + +```json +{ + "result": [ ...... result data objects ..... ], + "context": { .... context object.... } +} +``` +JSON array contained in "result" attribute is same as the full response object from `/druid/v2` . Note that, first "results" are streamed back to the client and then context object follows. + Available Queries ----------------- diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 5267ea62f037..10ad261fd899 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -241,76 +241,7 @@ public Object accumulate(Object accumulated, Object in) } ); - try { - final Query theQuery = query; - final QueryToolChest theToolChest = toolChest; - Response.ResponseBuilder builder = Response - .ok( - new StreamingOutput() - { - @Override - public void write(OutputStream outputStream) throws IOException, WebApplicationException - { - // json serializer will always close the yielder - CountingOutputStream os = new CountingOutputStream(outputStream); - jsonWriter.writeValue(os, yielder); - - os.flush(); // Some types of OutputStream suppress flush errors in the .close() method. - os.close(); - - final long queryTime = System.currentTimeMillis() - start; - emitter.emit( - DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr()) - .setDimension("success", "true") - .build("query/time", queryTime) - ); - emitter.emit( - DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr()) - .build("query/bytes", os.getCount()) - ); - - requestLogger.log( - new RequestLogLine( - new DateTime(start), - req.getRemoteAddr(), - theQuery, - new QueryStats( - ImmutableMap.of( - "query/time", queryTime, - "query/bytes", os.getCount(), - "success", true - ) - ) - ) - ); - } - }, - contentType - ) - .header("X-Druid-Query-Id", queryId); - - //Limit the response-context header, see https://github.com/druid-io/druid/issues/2331 - //Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString() - //and encodes the string using ASCII, so 1 char is = 1 byte - String responseCtxString = jsonMapper.writeValueAsString(responseContext); - if (responseCtxString.length() > RESPONSE_CTX_HEADER_LEN_LIMIT) { - log.warn("Response Context truncated for id [%s] . Full context is [%s].", queryId, responseCtxString); - responseCtxString = responseCtxString.substring(0, RESPONSE_CTX_HEADER_LEN_LIMIT); - } - - return builder - .header("X-Druid-Response-Context", responseCtxString) - .build(); - } - catch (Exception e) { - // make sure to close yielder if anything happened before starting to serialize the response. - yielder.close(); - throw Throwables.propagate(e); - } - finally { - // do not close yielder here, since we do not want to close the yielder prior to - // StreamingOutput having iterated over all the results - } + return buildQueryResponse(req, query, toolChest, jsonWriter, contentType, start, yielder, responseContext); } catch (QueryInterruptedException e) { try { @@ -406,4 +337,89 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE Thread.currentThread().setName(currThreadName); } } + + protected Response buildQueryResponse( + final HttpServletRequest req, + final Query query, + final QueryToolChest toolChest, + final ObjectWriter jsonWriter, + final String contentType, + final long startTime, + final Yielder yielder, + final Map responseContext + ) + throws IOException + { + try { + Response.ResponseBuilder builder = Response + .ok( + new StreamingOutput() + { + @Override + public void write(OutputStream outputStream) throws IOException, WebApplicationException + { + // json serializer will always close the yielder + CountingOutputStream os = new CountingOutputStream(outputStream); + + try { + jsonWriter.writeValue(os, yielder); + os.flush(); // Some types of OutputStream suppress flush errors in the .close() method. + } finally { + os.close(); + } + + final long queryTime = System.currentTimeMillis() - startTime; + emitter.emit( + DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr()) + .setDimension("success", "true") + .build("query/time", queryTime) + ); + emitter.emit( + DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr()) + .build("query/bytes", os.getCount()) + ); + + requestLogger.log( + new RequestLogLine( + new DateTime(startTime), + req.getRemoteAddr(), + query, + new QueryStats( + ImmutableMap.of( + "query/time", queryTime, + "query/bytes", os.getCount(), + "success", true + ) + ) + ) + ); + } + }, + contentType + ) + .header("X-Druid-Query-Id", query.getId()); + + //Limit the response-context header, see https://github.com/druid-io/druid/issues/2331 + //Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString() + //and encodes the string using ASCII, so 1 char is = 1 byte + String responseCtxString = jsonMapper.writeValueAsString(responseContext); + if (responseCtxString.length() > RESPONSE_CTX_HEADER_LEN_LIMIT) { + log.warn("Response Context truncated for id [%s] . Full context is [%s].", query.getId(), responseCtxString); + responseCtxString = responseCtxString.substring(0, RESPONSE_CTX_HEADER_LEN_LIMIT); + } + + return builder + .header("X-Druid-Response-Context", responseCtxString) + .build(); + } + catch (Exception e) { + // make sure to close yielder if anything happened before starting to serialize the response. + yielder.close(); + throw Throwables.propagate(e); + } + finally { + // do not close yielder here, since we do not want to close the yielder prior to + // StreamingOutput having iterated over all the results + } + } } diff --git a/server/src/main/java/io/druid/server/QueryResourceV3.java b/server/src/main/java/io/druid/server/QueryResourceV3.java new file mode 100644 index 000000000000..dbea8d722465 --- /dev/null +++ b/server/src/main/java/io/druid/server/QueryResourceV3.java @@ -0,0 +1,170 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.CountingOutputStream; +import com.google.inject.Inject; +import com.metamx.common.guava.Yielder; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.guice.annotations.Json; +import io.druid.guice.annotations.Smile; +import io.druid.query.DruidMetrics; +import io.druid.query.Query; +import io.druid.query.QuerySegmentWalker; +import io.druid.query.QueryToolChest; +import io.druid.query.QueryToolChestWarehouse; +import io.druid.server.initialization.ServerConfig; +import io.druid.server.log.RequestLogger; +import io.druid.server.security.AuthConfig; +import org.joda.time.DateTime; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Path; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; + +/** + */ +@Path("/druid/v3/") +public class QueryResourceV3 extends QueryResource +{ + private final ServiceEmitter emitter; + private final ObjectMapper jsonMapper; + private final RequestLogger requestLogger; + + @Inject + public QueryResourceV3( + QueryToolChestWarehouse warehouse, + ServerConfig config, + @Json ObjectMapper jsonMapper, + @Smile ObjectMapper smileMapper, + QuerySegmentWalker texasRanger, + ServiceEmitter emitter, + RequestLogger requestLogger, + QueryManager queryManager, + AuthConfig authConfig + ) + { + super(warehouse, config, jsonMapper, smileMapper, texasRanger, emitter, requestLogger, queryManager, authConfig); + this.emitter = emitter; + this.jsonMapper = jsonMapper; + this.requestLogger = requestLogger; + } + + @Override + protected Response buildQueryResponse( + final HttpServletRequest req, + final Query query, + final QueryToolChest toolChest, + final ObjectWriter jsonWriter, + final String contentType, + final long startTime, + final Yielder yielder, + final Map responseContext + ) + throws IOException + { + try { + return Response + .ok( + new StreamingOutput() + { + @Override + public void write(OutputStream outputStream) throws IOException, WebApplicationException + { + CountingOutputStream os = new CountingOutputStream(outputStream); + + //jsonSerializer would close the yielder. + try { + serializeResponseToOS( + jsonWriter, + os, + ImmutableMap.of( + "result", yielder, + "context", responseContext + ) + ); + os.flush(); // Some types of OutputStream suppress flush errors in the .close() method. + } finally { + os.close(); + } + + final long queryTime = System.currentTimeMillis() - startTime; + emitter.emit( + DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr()) + .setDimension("success", "true") + .build("query/time", queryTime) + ); + emitter.emit( + DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr()) + .build("query/bytes", os.getCount()) + ); + + requestLogger.log( + new RequestLogLine( + new DateTime(startTime), + req.getRemoteAddr(), + query, + new QueryStats( + ImmutableMap.of( + "query/time", queryTime, + "query/bytes", os.getCount(), + "success", true + ) + ) + ) + ); + } + }, + contentType + ) + .header("X-Druid-Query-Id", query.getId()) + .build(); + } + catch (Exception e) { + // make sure to close yielder if anything happened before starting to serialize the response. + yielder.close(); + throw Throwables.propagate(e); + } + finally { + // do not close yielder here, since we do not want to close the yielder prior to + // StreamingOutput having iterated over all the results + } + } + + @VisibleForTesting + static void serializeResponseToOS(ObjectWriter jsonWriter, OutputStream os, Map responseObj) + throws IOException + { + jsonWriter.writeValue( + os, + responseObj + ); + } +} diff --git a/server/src/test/java/io/druid/server/QueryResourceV3Test.java b/server/src/test/java/io/druid/server/QueryResourceV3Test.java new file mode 100644 index 000000000000..9bd06819c6d7 --- /dev/null +++ b/server/src/test/java/io/druid/server/QueryResourceV3Test.java @@ -0,0 +1,195 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.server; + +import com.google.common.collect.ImmutableMap; +import com.metamx.common.guava.Yielder; +import io.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + */ +public class QueryResourceV3Test +{ + @Test + public void testYielderCloseOnSerializationSuccess() throws Exception + { + final AtomicBoolean isClosed = new AtomicBoolean(false); + + Yielder yielder = new Yielder() + { + @Override + public Object get() + { + throw new RuntimeException(); + } + + @Override + public Yielder next(Object initValue) + { + throw new RuntimeException(); + } + + @Override + public boolean isDone() + { + return true; + } + + @Override + public void close() throws IOException + { + isClosed.set(true); + } + }; + + Map response = ImmutableMap.of( + "result", yielder, + "context", ImmutableMap.of() + ); + + try { + QueryResourceV3.serializeResponseToOS( + TestHelper.getObjectMapper().writer(), + new ByteArrayOutputStream(), + response + ); + } catch (Exception ex) { + Assert.fail(); + } + + Assert.assertTrue(isClosed.get()); + } + + @Test + public void testYielderCloseOnSerializationFailure() throws Exception + { + final AtomicBoolean isClosed = new AtomicBoolean(false); + + Yielder yielder = new Yielder() + { + @Override + public Object get() + { + throw new RuntimeException(); + } + + @Override + public Yielder next(Object initValue) + { + throw new RuntimeException(); + } + + @Override + public boolean isDone() + { + throw new RuntimeException(); + } + + @Override + public void close() throws IOException + { + isClosed.set(true); + } + }; + + Map response = ImmutableMap.of( + "result", yielder, + "context", ImmutableMap.of() + ); + + try { + QueryResourceV3.serializeResponseToOS( + TestHelper.getObjectMapper().writer(), + new ByteArrayOutputStream(), + response + ); + Assert.fail(); + } catch (Exception ex) { + } + + Assert.assertTrue(isClosed.get()); + } + + @Test + public void testYielderCloseOnOutputStreamFailure() throws Exception + { + final AtomicBoolean isClosed = new AtomicBoolean(false); + + Yielder yielder = new Yielder() + { + @Override + public Object get() + { + throw new RuntimeException(); + } + + @Override + public Yielder next(Object initValue) + { + throw new RuntimeException(); + } + + @Override + public boolean isDone() + { + return true; + } + + @Override + public void close() throws IOException + { + isClosed.set(true); + } + }; + + Map response = ImmutableMap.of( + "result", yielder, + "context", ImmutableMap.of() + ); + + try { + QueryResourceV3.serializeResponseToOS( + TestHelper.getObjectMapper().writer(), + new OutputStream() + { + @Override + public void write(int b) throws IOException + { + throw new RuntimeException("write failed."); + } + }, + response + ); + + Assert.fail(); + } catch (Exception ex) { + } + + Assert.assertTrue(isClosed.get()); + } +} diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java index 3748c832bdaa..a2402be7f285 100644 --- a/services/src/main/java/io/druid/cli/CliBroker.java +++ b/services/src/main/java/io/druid/cli/CliBroker.java @@ -39,14 +39,13 @@ import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; -import io.druid.query.MapQueryToolChestWarehouse; import io.druid.query.QuerySegmentWalker; -import io.druid.query.QueryToolChestWarehouse; import io.druid.query.RetryQueryRunnerConfig; import io.druid.query.lookup.LookupModule; import io.druid.server.ClientInfoResource; import io.druid.server.ClientQuerySegmentWalker; import io.druid.server.QueryResource; +import io.druid.server.QueryResourceV3; import io.druid.server.coordination.broker.DruidBroker; import io.druid.server.http.BrokerResource; import io.druid.server.initialization.jetty.JettyServerInitializer; @@ -102,9 +101,11 @@ public void configure(Binder binder) binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); Jerseys.addResource(binder, QueryResource.class); + Jerseys.addResource(binder, QueryResourceV3.class); Jerseys.addResource(binder, BrokerResource.class); Jerseys.addResource(binder, ClientInfoResource.class); LifecycleModule.register(binder, QueryResource.class); + LifecycleModule.register(binder, QueryResourceV3.class); LifecycleModule.register(binder, DruidBroker.class); MetricsModule.register(binder, CacheMonitor.class);