diff --git a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java index e0bb9875240d..a91959ca20bb 100644 --- a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java +++ b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java @@ -19,7 +19,7 @@ package org.apache.druid.server; -import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Iterables; @@ -62,7 +62,6 @@ import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; - import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; @@ -434,7 +433,7 @@ private boolean isSerializeDateTimeAsLong() || (!shouldFinalize && queryContext.isSerializeDateTimeAsLongInner(false)); } - public ObjectWriter newOutputWriter(ResourceIOReaderWriter ioReaderWriter) + public ObjectMapper newOutputWriter(ResourceIOReaderWriter ioReaderWriter) { return ioReaderWriter.getResponseWriter().newOutputWriter( getToolChest(), diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index 2db205ca0bed..61696dd5cec3 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -19,11 +19,12 @@ package org.apache.druid.server; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; -import com.fasterxml.jackson.databind.SequenceWriter; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.datatype.joda.ser.DateTimeSerializer; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; @@ -37,6 +38,7 @@ import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.BadJsonQueryException; import org.apache.druid.query.Query; @@ -374,7 +376,7 @@ String getResponseType() return responseType; } - ObjectWriter newOutputWriter( + ObjectMapper newOutputWriter( @Nullable QueryToolChest> toolChest, @Nullable Query query, boolean serializeDateTimeAsLong @@ -387,7 +389,7 @@ ObjectWriter newOutputWriter( } else { decoratedMapper = mapper; } - return isPretty ? decoratedMapper.writerWithDefaultPrettyPrinter() : decoratedMapper.writer(); + return isPretty ? decoratedMapper.copy().enable(SerializationFeature.INDENT_OUTPUT) : decoratedMapper; } Response ok(Object object) throws IOException @@ -531,35 +533,7 @@ public QueryResponse getQueryResponse() @Override public Writer makeWriter(OutputStream out) throws IOException { - final ObjectWriter objectWriter = queryLifecycle.newOutputWriter(io); - final SequenceWriter sequenceWriter = objectWriter.writeValuesAsArray(out); - return new Writer() - { - - @Override - public void writeResponseStart() - { - // Do nothing - } - - @Override - public void writeRow(Object obj) throws IOException - { - sequenceWriter.write(obj); - } - - @Override - public void writeResponseEnd() - { - // Do nothing - } - - @Override - public void close() throws IOException - { - sequenceWriter.close(); - } - }; + return new NativeQueryWriter(queryLifecycle.newOutputWriter(io), out); } @Override @@ -585,8 +559,49 @@ public void close() @Override public void writeException(Exception e, OutputStream out) throws IOException { - final ObjectWriter objectWriter = queryLifecycle.newOutputWriter(io); - out.write(objectWriter.writeValueAsBytes(e)); + final ObjectMapper objectMapper = queryLifecycle.newOutputWriter(io); + out.write(objectMapper.writeValueAsBytes(e)); + } + } + + static class NativeQueryWriter implements QueryResultPusher.Writer + { + private final SerializerProvider serializers; + private final JsonGenerator jsonGenerator; + + public NativeQueryWriter(final ObjectMapper responseMapper, final OutputStream out) throws IOException + { + // Don't use objectWriter.writeValuesAsArray(out), because that causes an end array ] to be written when the + // writer is closed, even if it's closed in case of an exception. This causes valid JSON to be emitted in case + // of an exception, which makes it difficult for callers to detect problems. Note: this means that if an error + // occurs on a Historical (or other data server) after it started to push results to the Broker, the Broker + // will experience that as "JsonEOFException: Unexpected end-of-input: expected close marker for Array". + this.serializers = responseMapper.getSerializerProviderInstance(); + this.jsonGenerator = responseMapper.createGenerator(out); + } + + @Override + public void writeResponseStart() throws IOException + { + jsonGenerator.writeStartArray(); + } + + @Override + public void writeRow(Object obj) throws IOException + { + JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializers, obj); + } + + @Override + public void writeResponseEnd() throws IOException + { + jsonGenerator.writeEndArray(); + } + + @Override + public void close() throws IOException + { + jsonGenerator.close(); } } } diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java index 43bce13e2c89..768ae528abfa 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java @@ -19,6 +19,7 @@ package org.apache.druid.server; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; @@ -80,12 +81,14 @@ import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.Resource; import org.apache.http.HttpStatus; +import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.internal.matchers.ThrowableMessageMatcher; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -98,6 +101,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -424,7 +428,8 @@ public QueryLifecycle factorize() overrideConfig, new AuthConfig(), System.currentTimeMillis(), - System.nanoTime()) + System.nanoTime() + ) { @Override public void emitLogsAndMetrics(@Nullable Throwable e, @Nullable String remoteAddress, long bytesWritten) @@ -453,7 +458,8 @@ public void emitLogsAndMetrics(@Nullable Throwable e, @Nullable String remoteAdd entity.getUnderlyingException(), new DruidExceptionMatcher( DruidException.Persona.OPERATOR, - DruidException.Category.RUNTIME_FAILURE, "legacyQueryException") + DruidException.Category.RUNTIME_FAILURE, "legacyQueryException" + ) .expectMessageIs("something") ); } @@ -1250,6 +1256,46 @@ public void testTooManyQueryInLaneImplicitFromDurationThreshold() throws Interru } } + @Test + public void testNativeQueryWriter_goodResponse() throws IOException + { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final QueryResultPusher.Writer writer = new QueryResource.NativeQueryWriter(jsonMapper, baos); + writer.writeResponseStart(); + writer.writeRow(Arrays.asList("foo", "bar")); + writer.writeRow(Collections.singletonList("baz")); + writer.writeResponseEnd(); + writer.close(); + + Assert.assertEquals( + ImmutableList.of( + ImmutableList.of("foo", "bar"), + ImmutableList.of("baz") + ), + jsonMapper.readValue(baos.toByteArray(), Object.class) + ); + } + + @Test + public void testNativeQueryWriter_truncatedResponse() throws IOException + { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final QueryResultPusher.Writer writer = new QueryResource.NativeQueryWriter(jsonMapper, baos); + writer.writeResponseStart(); + writer.writeRow(Arrays.asList("foo", "bar")); + writer.close(); // Simulate an error that occurs midstream; close writer without calling writeResponseEnd. + + final JsonProcessingException e = Assert.assertThrows( + JsonProcessingException.class, + () -> jsonMapper.readValue(baos.toByteArray(), Object.class) + ); + + MatcherAssert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("expected close marker for Array")) + ); + } + private void createScheduledQueryResource( QueryScheduler scheduler, Collection beforeScheduler,