Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.druid.server;

import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
Expand Down Expand Up @@ -62,7 +62,6 @@

import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;

import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
Expand Down Expand Up @@ -434,7 +433,7 @@ private boolean isSerializeDateTimeAsLong()
|| (!shouldFinalize && queryContext.isSerializeDateTimeAsLongInner(false));
}

public ObjectWriter newOutputWriter(ResourceIOReaderWriter ioReaderWriter)
public ObjectMapper newOutputWriter(ResourceIOReaderWriter ioReaderWriter)
{
return ioReaderWriter.getResponseWriter().newOutputWriter(
getToolChest(),
Expand Down
85 changes: 50 additions & 35 deletions server/src/main/java/org/apache/druid/server/QueryResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

package org.apache.druid.server;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SequenceWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.joda.ser.DateTimeSerializer;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
Expand All @@ -37,6 +38,7 @@
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.BadJsonQueryException;
import org.apache.druid.query.Query;
Expand Down Expand Up @@ -374,7 +376,7 @@ String getResponseType()
return responseType;
}

ObjectWriter newOutputWriter(
ObjectMapper newOutputWriter(
@Nullable QueryToolChest<?, Query<?>> toolChest,
@Nullable Query<?> query,
boolean serializeDateTimeAsLong
Expand All @@ -387,7 +389,7 @@ ObjectWriter newOutputWriter(
} else {
decoratedMapper = mapper;
}
return isPretty ? decoratedMapper.writerWithDefaultPrettyPrinter() : decoratedMapper.writer();
return isPretty ? decoratedMapper.copy().enable(SerializationFeature.INDENT_OUTPUT) : decoratedMapper;
}

Response ok(Object object) throws IOException
Expand Down Expand Up @@ -531,35 +533,7 @@ public QueryResponse<Object> getQueryResponse()
@Override
public Writer makeWriter(OutputStream out) throws IOException
{
final ObjectWriter objectWriter = queryLifecycle.newOutputWriter(io);
final SequenceWriter sequenceWriter = objectWriter.writeValuesAsArray(out);
return new Writer()
{

@Override
public void writeResponseStart()
{
// Do nothing
}

@Override
public void writeRow(Object obj) throws IOException
{
sequenceWriter.write(obj);
}

@Override
public void writeResponseEnd()
{
// Do nothing
}

@Override
public void close() throws IOException
{
sequenceWriter.close();
}
};
return new NativeQueryWriter(queryLifecycle.newOutputWriter(io), out);
}

@Override
Expand All @@ -585,8 +559,49 @@ public void close()
@Override
public void writeException(Exception e, OutputStream out) throws IOException
{
final ObjectWriter objectWriter = queryLifecycle.newOutputWriter(io);
out.write(objectWriter.writeValueAsBytes(e));
final ObjectMapper objectMapper = queryLifecycle.newOutputWriter(io);
out.write(objectMapper.writeValueAsBytes(e));
}
}

static class NativeQueryWriter implements QueryResultPusher.Writer
{
private final SerializerProvider serializers;
private final JsonGenerator jsonGenerator;

public NativeQueryWriter(final ObjectMapper responseMapper, final OutputStream out) throws IOException
{
// Don't use objectWriter.writeValuesAsArray(out), because that causes an end array ] to be written when the
// writer is closed, even if it's closed in case of an exception. This causes valid JSON to be emitted in case
// of an exception, which makes it difficult for callers to detect problems. Note: this means that if an error
// occurs on a Historical (or other data server) after it started to push results to the Broker, the Broker
// will experience that as "JsonEOFException: Unexpected end-of-input: expected close marker for Array".
this.serializers = responseMapper.getSerializerProviderInstance();
this.jsonGenerator = responseMapper.createGenerator(out);
}

@Override
public void writeResponseStart() throws IOException
{
jsonGenerator.writeStartArray();
}

@Override
public void writeRow(Object obj) throws IOException
{
JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializers, obj);
}

@Override
public void writeResponseEnd() throws IOException
{
jsonGenerator.writeEndArray();
}

@Override
public void close() throws IOException
{
jsonGenerator.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.server;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
Expand Down Expand Up @@ -80,12 +81,14 @@
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.Resource;
import org.apache.http.HttpStatus;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -98,6 +101,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -424,7 +428,8 @@ public QueryLifecycle factorize()
overrideConfig,
new AuthConfig(),
System.currentTimeMillis(),
System.nanoTime())
System.nanoTime()
)
{
@Override
public void emitLogsAndMetrics(@Nullable Throwable e, @Nullable String remoteAddress, long bytesWritten)
Expand Down Expand Up @@ -453,7 +458,8 @@ public void emitLogsAndMetrics(@Nullable Throwable e, @Nullable String remoteAdd
entity.getUnderlyingException(),
new DruidExceptionMatcher(
DruidException.Persona.OPERATOR,
DruidException.Category.RUNTIME_FAILURE, "legacyQueryException")
DruidException.Category.RUNTIME_FAILURE, "legacyQueryException"
)
.expectMessageIs("something")
);
}
Expand Down Expand Up @@ -1250,6 +1256,46 @@ public void testTooManyQueryInLaneImplicitFromDurationThreshold() throws Interru
}
}

@Test
public void testNativeQueryWriter_goodResponse() throws IOException
{
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final QueryResultPusher.Writer writer = new QueryResource.NativeQueryWriter(jsonMapper, baos);
writer.writeResponseStart();
writer.writeRow(Arrays.asList("foo", "bar"));
writer.writeRow(Collections.singletonList("baz"));
writer.writeResponseEnd();
writer.close();

Assert.assertEquals(
ImmutableList.of(
ImmutableList.of("foo", "bar"),
ImmutableList.of("baz")
),
jsonMapper.readValue(baos.toByteArray(), Object.class)
);
}

@Test
public void testNativeQueryWriter_truncatedResponse() throws IOException
{
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final QueryResultPusher.Writer writer = new QueryResource.NativeQueryWriter(jsonMapper, baos);
writer.writeResponseStart();
writer.writeRow(Arrays.asList("foo", "bar"));
writer.close(); // Simulate an error that occurs midstream; close writer without calling writeResponseEnd.

final JsonProcessingException e = Assert.assertThrows(
JsonProcessingException.class,
() -> jsonMapper.readValue(baos.toByteArray(), Object.class)
);

MatcherAssert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("expected close marker for Array"))
);
}

private void createScheduledQueryResource(
QueryScheduler scheduler,
Collection<CountDownLatch> beforeScheduler,
Expand Down