diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java index 02b20aa63005..9d9d86685333 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java @@ -20,11 +20,9 @@ package org.apache.druid.indexing.common; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; import java.util.Objects; -@JsonTypeName("ingestionStatsAndErrors") public class IngestionStatsAndErrorsTaskReport implements TaskReport { public static final String REPORT_KEY = "ingestionStatsAndErrors"; @@ -90,13 +88,4 @@ public String toString() ", payload=" + payload + '}'; } - - // TaskReports are put into a Map and serialized. - // Jackson doesn't normally serialize the TaskReports with a "type" field in that situation, - // so explictly serialize the "type" field (otherwise, deserialization fails). - @JsonProperty("type") - private String getType() - { - return "ingestionStatsAndErrors"; - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java index 5119b9886143..18313ccd79b9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java @@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.logger.Logger; import java.io.File; +import java.io.FileOutputStream; import java.util.HashMap; import java.util.Map; @@ -49,7 +50,10 @@ public void write(String taskId, Map reports) if (reportsFileParent != null) { FileUtils.mkdirp(reportsFileParent); } - objectMapper.writeValue(reportsFile, reports); + + try (final FileOutputStream outputStream = new FileOutputStream(reportsFile)) { + SingleFileTaskReportFileWriter.writeReportToStream(objectMapper, outputStream, reports); + } } catch (Exception e) { log.error(e, "Encountered exception in write()."); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java index 2e79fe0510c5..4d55dd649631 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java @@ -19,11 +19,16 @@ package org.apache.druid.indexing.common; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializerProvider; import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.common.logger.Logger; import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; import java.util.Map; public class SingleFileTaskReportFileWriter implements TaskReportFileWriter @@ -46,17 +51,39 @@ public void write(String taskId, Map reports) if (reportsFileParent != null) { FileUtils.mkdirp(reportsFileParent); } - objectMapper.writeValue(reportsFile, reports); + + try (final FileOutputStream outputStream = new FileOutputStream(reportsFile)) { + writeReportToStream(objectMapper, outputStream, reports); + } } catch (Exception e) { log.error(e, "Encountered exception in write()."); } } - @Override public void setObjectMapper(ObjectMapper objectMapper) { this.objectMapper = objectMapper; } + + public static void writeReportToStream( + final ObjectMapper objectMapper, + final OutputStream outputStream, + final Map reports + ) throws Exception + { + final SerializerProvider serializers = objectMapper.getSerializerProviderInstance(); + + try (final JsonGenerator jg = objectMapper.getFactory().createGenerator(outputStream)) { + jg.writeStartObject(); + + for (final Map.Entry entry : reports.entrySet()) { + jg.writeFieldName(entry.getKey()); + JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, entry.getValue()); + } + + jg.writeEndObject(); + } + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index cea2e83a25ef..100c06a016c3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -25,17 +25,24 @@ import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; +import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TestUtils; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import java.io.File; import java.util.Map; public class TaskReportSerdeTest { private final ObjectMapper jsonMapper; + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + public TaskReportSerdeTest() { TestUtils testUtils = new TestUtils(); @@ -61,20 +68,22 @@ public void testSerde() throws Exception ) ); String report1serialized = jsonMapper.writeValueAsString(report1); - IngestionStatsAndErrorsTaskReport report2 = jsonMapper.readValue( + IngestionStatsAndErrorsTaskReport report2 = (IngestionStatsAndErrorsTaskReport) jsonMapper.readValue( report1serialized, - IngestionStatsAndErrorsTaskReport.class + TaskReport.class ); Assert.assertEquals(report1, report2); Assert.assertEquals(report1.hashCode(), report2.hashCode()); + final File reportFile = temporaryFolder.newFile(); + final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile); + writer.setObjectMapper(jsonMapper); Map reportMap1 = TaskReport.buildTaskReports(report1); - String reportMapSerialized = jsonMapper.writeValueAsString(reportMap1); + writer.write("testID", reportMap1); + Map reportMap2 = jsonMapper.readValue( - reportMapSerialized, - new TypeReference>() - { - } + reportFile, + new TypeReference>() {} ); Assert.assertEquals(reportMap1, reportMap2); }