From 4ad839ba6c2a12dba8ea7137b61dac2785a46dff Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 22 Aug 2022 10:16:25 -0700 Subject: [PATCH 1/2] Fix serialization in TaskReportFileWriters. For some reason, serializing a Map would omit the "type" field. Explicitly sending each value through the ObjectMapper fixes this, because the type information does not get lost. --- .../IngestionStatsAndErrorsTaskReport.java | 10 ------- .../MultipleFileTaskReportFileWriter.java | 6 ++++- .../SingleFileTaskReportFileWriter.java | 27 +++++++++++++++++-- .../common/task/TaskReportSerdeTest.java | 23 +++++++++++----- 4 files changed, 46 insertions(+), 20 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java index 02b20aa63005..610d9a4b708e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java @@ -24,7 +24,6 @@ import java.util.Objects; -@JsonTypeName("ingestionStatsAndErrors") public class IngestionStatsAndErrorsTaskReport implements TaskReport { public static final String REPORT_KEY = "ingestionStatsAndErrors"; @@ -90,13 +89,4 @@ public String toString() ", payload=" + payload + '}'; } - - // TaskReports are put into a Map and serialized. - // Jackson doesn't normally serialize the TaskReports with a "type" field in that situation, - // so explictly serialize the "type" field (otherwise, deserialization fails). - @JsonProperty("type") - private String getType() - { - return "ingestionStatsAndErrors"; - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java index 5119b9886143..18313ccd79b9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java @@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.logger.Logger; import java.io.File; +import java.io.FileOutputStream; import java.util.HashMap; import java.util.Map; @@ -49,7 +50,10 @@ public void write(String taskId, Map reports) if (reportsFileParent != null) { FileUtils.mkdirp(reportsFileParent); } - objectMapper.writeValue(reportsFile, reports); + + try (final FileOutputStream outputStream = new FileOutputStream(reportsFile)) { + SingleFileTaskReportFileWriter.writeReportToStream(objectMapper, outputStream, reports); + } } catch (Exception e) { log.error(e, "Encountered exception in write()."); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java index 2e79fe0510c5..cc263b280fa0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java @@ -19,11 +19,14 @@ package org.apache.druid.indexing.common; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.logger.Logger; import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; import java.util.Map; public class SingleFileTaskReportFileWriter implements TaskReportFileWriter @@ -46,17 +49,37 @@ public void write(String taskId, Map reports) if (reportsFileParent != null) { FileUtils.mkdirp(reportsFileParent); } - objectMapper.writeValue(reportsFile, reports); + + try (final FileOutputStream outputStream = new FileOutputStream(reportsFile)) { + writeReportToStream(objectMapper, outputStream, reports); + } } catch (Exception e) { log.error(e, "Encountered exception in write()."); } } - @Override public void setObjectMapper(ObjectMapper objectMapper) { this.objectMapper = objectMapper; } + + public static void writeReportToStream( + final ObjectMapper objectMapper, + final OutputStream outputStream, + final Map reports + ) throws Exception + { + try (final JsonGenerator jg = objectMapper.getFactory().createGenerator(outputStream)) { + jg.writeStartObject(); + + for (final Map.Entry entry : reports.entrySet()) { + jg.writeFieldName(entry.getKey()); + objectMapper.writeValue(jg, entry.getValue()); + } + + jg.writeEndObject(); + } + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java index cea2e83a25ef..100c06a016c3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskReportSerdeTest.java @@ -25,17 +25,24 @@ import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; +import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TestUtils; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import java.io.File; import java.util.Map; public class TaskReportSerdeTest { private final ObjectMapper jsonMapper; + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + public TaskReportSerdeTest() { TestUtils testUtils = new TestUtils(); @@ -61,20 +68,22 @@ public void testSerde() throws Exception ) ); String report1serialized = jsonMapper.writeValueAsString(report1); - IngestionStatsAndErrorsTaskReport report2 = jsonMapper.readValue( + IngestionStatsAndErrorsTaskReport report2 = (IngestionStatsAndErrorsTaskReport) jsonMapper.readValue( report1serialized, - IngestionStatsAndErrorsTaskReport.class + TaskReport.class ); Assert.assertEquals(report1, report2); Assert.assertEquals(report1.hashCode(), report2.hashCode()); + final File reportFile = temporaryFolder.newFile(); + final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile); + writer.setObjectMapper(jsonMapper); Map reportMap1 = TaskReport.buildTaskReports(report1); - String reportMapSerialized = jsonMapper.writeValueAsString(reportMap1); + writer.write("testID", reportMap1); + Map reportMap2 = jsonMapper.readValue( - reportMapSerialized, - new TypeReference>() - { - } + reportFile, + new TypeReference>() {} ); Assert.assertEquals(reportMap1, reportMap2); } From de51b98fe539933b8a9c6ce058eae104a149d330 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 22 Aug 2022 13:34:55 -0700 Subject: [PATCH 2/2] Fixes for static analysis. --- .../indexing/common/IngestionStatsAndErrorsTaskReport.java | 1 - .../indexing/common/SingleFileTaskReportFileWriter.java | 6 +++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java index 610d9a4b708e..9d9d86685333 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReport.java @@ -20,7 +20,6 @@ package org.apache.druid.indexing.common; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; import java.util.Objects; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java index cc263b280fa0..4d55dd649631 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java @@ -21,7 +21,9 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializerProvider; import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.common.logger.Logger; import java.io.File; @@ -71,12 +73,14 @@ public static void writeReportToStream( final Map reports ) throws Exception { + final SerializerProvider serializers = objectMapper.getSerializerProviderInstance(); + try (final JsonGenerator jg = objectMapper.getFactory().createGenerator(outputStream)) { jg.writeStartObject(); for (final Map.Entry entry : reports.entrySet()) { jg.writeFieldName(entry.getKey()); - objectMapper.writeValue(jg, entry.getValue()); + JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, entry.getValue()); } jg.writeEndObject();