Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@
package org.apache.druid.indexing.common;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

import java.util.Objects;

@JsonTypeName("ingestionStatsAndErrors")
public class IngestionStatsAndErrorsTaskReport implements TaskReport
{
public static final String REPORT_KEY = "ingestionStatsAndErrors";
Expand Down Expand Up @@ -90,13 +88,4 @@ public String toString()
", payload=" + payload +
'}';
}

// TaskReports are put into a Map and serialized.
// Jackson doesn't normally serialize the TaskReports with a "type" field in that situation,
// so explictly serialize the "type" field (otherwise, deserialization fails).
@JsonProperty("type")
private String getType()
{
return "ingestionStatsAndErrors";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.java.util.common.logger.Logger;

import java.io.File;
import java.io.FileOutputStream;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -49,7 +50,10 @@ public void write(String taskId, Map<String, TaskReport> reports)
if (reportsFileParent != null) {
FileUtils.mkdirp(reportsFileParent);
}
objectMapper.writeValue(reportsFile, reports);

try (final FileOutputStream outputStream = new FileOutputStream(reportsFile)) {
SingleFileTaskReportFileWriter.writeReportToStream(objectMapper, outputStream, reports);
}
}
catch (Exception e) {
log.error(e, "Encountered exception in write().");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@

package org.apache.druid.indexing.common;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Map;

public class SingleFileTaskReportFileWriter implements TaskReportFileWriter
Expand All @@ -46,17 +51,39 @@ public void write(String taskId, Map<String, TaskReport> reports)
if (reportsFileParent != null) {
FileUtils.mkdirp(reportsFileParent);
}
objectMapper.writeValue(reportsFile, reports);

try (final FileOutputStream outputStream = new FileOutputStream(reportsFile)) {
writeReportToStream(objectMapper, outputStream, reports);
}
}
catch (Exception e) {
log.error(e, "Encountered exception in write().");
}
}


@Override
public void setObjectMapper(ObjectMapper objectMapper)
{
this.objectMapper = objectMapper;
}

public static void writeReportToStream(
final ObjectMapper objectMapper,
final OutputStream outputStream,
final Map<String, TaskReport> reports
) throws Exception
{
final SerializerProvider serializers = objectMapper.getSerializerProviderInstance();

try (final JsonGenerator jg = objectMapper.getFactory().createGenerator(outputStream)) {
jg.writeStartObject();

for (final Map.Entry<String, TaskReport> entry : reports.entrySet()) {
jg.writeFieldName(entry.getKey());
JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, entry.getValue());
}

jg.writeEndObject();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,24 @@
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TestUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.util.Map;

public class TaskReportSerdeTest
{
private final ObjectMapper jsonMapper;

@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();

public TaskReportSerdeTest()
{
TestUtils testUtils = new TestUtils();
Expand All @@ -61,20 +68,22 @@ public void testSerde() throws Exception
)
);
String report1serialized = jsonMapper.writeValueAsString(report1);
IngestionStatsAndErrorsTaskReport report2 = jsonMapper.readValue(
IngestionStatsAndErrorsTaskReport report2 = (IngestionStatsAndErrorsTaskReport) jsonMapper.readValue(
report1serialized,
IngestionStatsAndErrorsTaskReport.class
TaskReport.class
);
Assert.assertEquals(report1, report2);
Assert.assertEquals(report1.hashCode(), report2.hashCode());

final File reportFile = temporaryFolder.newFile();
final SingleFileTaskReportFileWriter writer = new SingleFileTaskReportFileWriter(reportFile);
writer.setObjectMapper(jsonMapper);
Map<String, TaskReport> reportMap1 = TaskReport.buildTaskReports(report1);
String reportMapSerialized = jsonMapper.writeValueAsString(reportMap1);
writer.write("testID", reportMap1);

Map<String, TaskReport> reportMap2 = jsonMapper.readValue(
reportMapSerialized,
new TypeReference<Map<String, TaskReport>>()
{
}
reportFile,
new TypeReference<Map<String, TaskReport>>() {}
);
Assert.assertEquals(reportMap1, reportMap2);
}
Expand Down