From 3bce92137c606e783904a44a76cb03423240cc74 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sat, 14 Sep 2024 15:32:49 -0700 Subject: [PATCH] QueryResource: Don't close JSON content on error. (#17034) * QueryResource: Don't close JSON content on error. Following similar issues fixed in #11685 and #15880, this patch fixes a bug where QueryResource would write a closing array marker if it encountered an exception after starting to push results. This makes it difficult for callers to detect errors. The prior patches didn't catch this problem because QueryResource uses the ObjectMapper in a unique way, through writeValuesAsArray, which doesn't respect the global AUTO_CLOSE_JSON_CONTENT setting. * Fix usage of customized ObjectMappers. --- .../apache/druid/server/QueryLifecycle.java | 5 +- .../apache/druid/server/QueryResource.java | 85 +++++++++++-------- .../druid/server/QueryResourceTest.java | 50 ++++++++++- 3 files changed, 100 insertions(+), 40 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java index e0bb9875240d..a91959ca20bb 100644 --- a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java +++ b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java @@ -19,7 +19,7 @@ package org.apache.druid.server; -import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Iterables; @@ -62,7 +62,6 @@ import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; - import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; @@ -434,7 +433,7 @@ private boolean isSerializeDateTimeAsLong() || (!shouldFinalize && queryContext.isSerializeDateTimeAsLongInner(false)); } - public ObjectWriter newOutputWriter(ResourceIOReaderWriter ioReaderWriter) + public ObjectMapper newOutputWriter(ResourceIOReaderWriter ioReaderWriter) { return ioReaderWriter.getResponseWriter().newOutputWriter( getToolChest(), diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java index 2db205ca0bed..61696dd5cec3 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResource.java +++ b/server/src/main/java/org/apache/druid/server/QueryResource.java @@ -19,11 +19,12 @@ package org.apache.druid.server; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; -import com.fasterxml.jackson.databind.SequenceWriter; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.datatype.joda.ser.DateTimeSerializer; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; @@ -37,6 +38,7 @@ import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.BadJsonQueryException; import org.apache.druid.query.Query; @@ -374,7 +376,7 @@ String getResponseType() return responseType; } - ObjectWriter newOutputWriter( + ObjectMapper newOutputWriter( @Nullable QueryToolChest> toolChest, @Nullable Query query, boolean serializeDateTimeAsLong @@ -387,7 +389,7 @@ ObjectWriter newOutputWriter( } else { decoratedMapper = mapper; } - return isPretty ? decoratedMapper.writerWithDefaultPrettyPrinter() : decoratedMapper.writer(); + return isPretty ? decoratedMapper.copy().enable(SerializationFeature.INDENT_OUTPUT) : decoratedMapper; } Response ok(Object object) throws IOException @@ -531,35 +533,7 @@ public QueryResponse getQueryResponse() @Override public Writer makeWriter(OutputStream out) throws IOException { - final ObjectWriter objectWriter = queryLifecycle.newOutputWriter(io); - final SequenceWriter sequenceWriter = objectWriter.writeValuesAsArray(out); - return new Writer() - { - - @Override - public void writeResponseStart() - { - // Do nothing - } - - @Override - public void writeRow(Object obj) throws IOException - { - sequenceWriter.write(obj); - } - - @Override - public void writeResponseEnd() - { - // Do nothing - } - - @Override - public void close() throws IOException - { - sequenceWriter.close(); - } - }; + return new NativeQueryWriter(queryLifecycle.newOutputWriter(io), out); } @Override @@ -585,8 +559,49 @@ public void close() @Override public void writeException(Exception e, OutputStream out) throws IOException { - final ObjectWriter objectWriter = queryLifecycle.newOutputWriter(io); - out.write(objectWriter.writeValueAsBytes(e)); + final ObjectMapper objectMapper = queryLifecycle.newOutputWriter(io); + out.write(objectMapper.writeValueAsBytes(e)); + } + } + + static class NativeQueryWriter implements QueryResultPusher.Writer + { + private final SerializerProvider serializers; + private final JsonGenerator jsonGenerator; + + public NativeQueryWriter(final ObjectMapper responseMapper, final OutputStream out) throws IOException + { + // Don't use objectWriter.writeValuesAsArray(out), because that causes an end array ] to be written when the + // writer is closed, even if it's closed in case of an exception. This causes valid JSON to be emitted in case + // of an exception, which makes it difficult for callers to detect problems. Note: this means that if an error + // occurs on a Historical (or other data server) after it started to push results to the Broker, the Broker + // will experience that as "JsonEOFException: Unexpected end-of-input: expected close marker for Array". + this.serializers = responseMapper.getSerializerProviderInstance(); + this.jsonGenerator = responseMapper.createGenerator(out); + } + + @Override + public void writeResponseStart() throws IOException + { + jsonGenerator.writeStartArray(); + } + + @Override + public void writeRow(Object obj) throws IOException + { + JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializers, obj); + } + + @Override + public void writeResponseEnd() throws IOException + { + jsonGenerator.writeEndArray(); + } + + @Override + public void close() throws IOException + { + jsonGenerator.close(); } } } diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java index 43bce13e2c89..768ae528abfa 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java @@ -19,6 +19,7 @@ package org.apache.druid.server; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; @@ -80,12 +81,14 @@ import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.Resource; import org.apache.http.HttpStatus; +import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.internal.matchers.ThrowableMessageMatcher; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -98,6 +101,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -424,7 +428,8 @@ public QueryLifecycle factorize() overrideConfig, new AuthConfig(), System.currentTimeMillis(), - System.nanoTime()) + System.nanoTime() + ) { @Override public void emitLogsAndMetrics(@Nullable Throwable e, @Nullable String remoteAddress, long bytesWritten) @@ -453,7 +458,8 @@ public void emitLogsAndMetrics(@Nullable Throwable e, @Nullable String remoteAdd entity.getUnderlyingException(), new DruidExceptionMatcher( DruidException.Persona.OPERATOR, - DruidException.Category.RUNTIME_FAILURE, "legacyQueryException") + DruidException.Category.RUNTIME_FAILURE, "legacyQueryException" + ) .expectMessageIs("something") ); } @@ -1250,6 +1256,46 @@ public void testTooManyQueryInLaneImplicitFromDurationThreshold() throws Interru } } + @Test + public void testNativeQueryWriter_goodResponse() throws IOException + { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final QueryResultPusher.Writer writer = new QueryResource.NativeQueryWriter(jsonMapper, baos); + writer.writeResponseStart(); + writer.writeRow(Arrays.asList("foo", "bar")); + writer.writeRow(Collections.singletonList("baz")); + writer.writeResponseEnd(); + writer.close(); + + Assert.assertEquals( + ImmutableList.of( + ImmutableList.of("foo", "bar"), + ImmutableList.of("baz") + ), + jsonMapper.readValue(baos.toByteArray(), Object.class) + ); + } + + @Test + public void testNativeQueryWriter_truncatedResponse() throws IOException + { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final QueryResultPusher.Writer writer = new QueryResource.NativeQueryWriter(jsonMapper, baos); + writer.writeResponseStart(); + writer.writeRow(Arrays.asList("foo", "bar")); + writer.close(); // Simulate an error that occurs midstream; close writer without calling writeResponseEnd. + + final JsonProcessingException e = Assert.assertThrows( + JsonProcessingException.class, + () -> jsonMapper.readValue(baos.toByteArray(), Object.class) + ); + + MatcherAssert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("expected close marker for Array")) + ); + } + private void createScheduledQueryResource( QueryScheduler scheduler, Collection beforeScheduler,