diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index 100c06a016c3..4231f318efd8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -19,9 +19,12 @@ package org.apache.druid.indexing.common.task; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; +import com.google.common.io.Files; import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; @@ -34,6 +37,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; +import java.nio.charset.StandardCharsets; import java.util.Map; public class TaskReportSerdeTest @@ -47,6 +51,7 @@ public TaskReportSerdeTest() { TestUtils testUtils = new TestUtils(); jsonMapper = testUtils.getTestObjectMapper(); + jsonMapper.registerSubtypes(ExceptionalTaskReport.class); } @Test @@ -87,4 +92,47 @@ public void testSerde() throws Exception ); Assert.assertEquals(reportMap1, reportMap2); } + + @Test + public void testExceptionWhileWritingReport() throws Exception + { + final File reportFile = temporaryFolder.newFile(); + final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile); + writer.setObjectMapper(jsonMapper); + writer.write("theTask", ImmutableMap.of("report", new ExceptionalTaskReport())); + + // Read the file, ensure it's incomplete and not valid JSON. This allows callers to determine the report was + // not complete when written. + Assert.assertEquals( + "{\"report\":{\"type\":\"exceptional\"", + Files.asCharSource(reportFile, StandardCharsets.UTF_8).read() + ); + } + + /** + * Task report that throws an exception while being serialized. + */ + @JsonTypeName("exceptional") + private static class ExceptionalTaskReport implements TaskReport + { + @Override + @JsonProperty + public String getTaskId() + { + throw new UnsupportedOperationException("cannot serialize task ID"); + } + + @Override + public String getReportKey() + { + return "report"; + } + + @Override + @JsonProperty + public Object getPayload() + { + throw new UnsupportedOperationException("cannot serialize payload"); + } + } } diff --git a/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java b/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java index 1a6186bbc8a8..4b0d8abb23f5 100644 --- a/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java +++ b/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java @@ -20,6 +20,7 @@ package org.apache.druid.jackson; import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JavaType; @@ -35,7 +36,6 @@ import org.apache.druid.java.util.common.StringUtils; import javax.annotation.Nullable; - import java.io.IOException; /** @@ -81,6 +81,10 @@ public DefaultObjectMapper(JsonFactory factory, @Nullable String serviceName) configure(SerializationFeature.INDENT_OUTPUT, false); configure(SerializationFeature.FLUSH_AFTER_WRITE_VALUE, false); + // Disable automatic JSON termination, so readers can detect truncated responses when a JsonGenerator is + // closed after an exception is thrown while writing. + configure(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT, false); + addHandler(new DefaultDeserializationProblemHandler(serviceName)); } diff --git a/sql/src/main/java/org/apache/druid/sql/http/ArrayWriter.java b/sql/src/main/java/org/apache/druid/sql/http/ArrayWriter.java index e70f7ecfdf65..4726e70a0e6b 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/ArrayWriter.java +++ b/sql/src/main/java/org/apache/druid/sql/http/ArrayWriter.java @@ -43,9 +43,6 @@ public ArrayWriter(final OutputStream outputStream, final ObjectMapper jsonMappe this.serializers = jsonMapper.getSerializerProviderInstance(); this.jsonGenerator = jsonMapper.getFactory().createGenerator(outputStream); this.outputStream = outputStream; - - // Disable automatic JSON termination, so clients can detect truncated responses. - jsonGenerator.configure(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT, false); } @Override diff --git a/sql/src/main/java/org/apache/druid/sql/http/ObjectWriter.java b/sql/src/main/java/org/apache/druid/sql/http/ObjectWriter.java index 6545ce80eacb..27a027d0abd6 100644 --- a/sql/src/main/java/org/apache/druid/sql/http/ObjectWriter.java +++ b/sql/src/main/java/org/apache/druid/sql/http/ObjectWriter.java @@ -46,9 +46,6 @@ public ObjectWriter(final OutputStream outputStream, final ObjectMapper jsonMapp this.serializers = jsonMapper.getSerializerProviderInstance(); this.jsonGenerator = jsonMapper.getFactory().createGenerator(outputStream); this.outputStream = outputStream; - - // Disable automatic JSON termination, so clients can detect truncated responses. - jsonGenerator.configure(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT, false); } @Override