From 372237b2ede47a6752e0348efeee642f44699d73 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 21 Mar 2018 17:12:06 -0700 Subject: [PATCH 1/7] Add support for task reports, upload reports to deep storage --- .../java/io/druid/tasklogs/NoopTaskLogs.java | 7 +++ .../java/io/druid/tasklogs/TaskLogPusher.java | 4 ++ .../io/druid/tasklogs/TaskLogStreamer.java | 5 ++ .../io/druid/storage/azure/AzureTaskLogs.java | 30 ++++++++- .../druid/storage/google/GoogleTaskLogs.java | 28 +++++++++ .../storage/hdfs/tasklog/HdfsTaskLogs.java | 38 +++++++++++- .../druid/indexing/kafka/KafkaIndexTask.java | 10 +-- .../java/io/druid/storage/s3/S3TaskLogs.java | 31 ++++++++-- .../io/druid/indexing/common/TaskReport.java | 56 +++++++++++++++++ .../common/TaskStatusWithReports.java | 61 +++++++++++++++++++ .../AppenderatorDriverRealtimeIndexTask.java | 5 +- .../indexing/common/task/ArchiveTask.java | 5 +- .../indexing/common/task/CompactionTask.java | 4 +- .../common/task/ConvertSegmentTask.java | 11 ++-- .../common/task/HadoopConverterTask.java | 7 ++- .../indexing/common/task/HadoopIndexTask.java | 9 +-- .../druid/indexing/common/task/IndexTask.java | 7 ++- .../druid/indexing/common/task/KillTask.java | 5 +- .../indexing/common/task/MergeTaskBase.java | 7 ++- .../druid/indexing/common/task/MoveTask.java | 5 +- .../druid/indexing/common/task/NoopTask.java | 5 +- .../common/task/RealtimeIndexTask.java | 5 +- .../indexing/common/task/RestoreTask.java | 5 +- .../common/task/SameIntervalMergeTask.java | 7 ++- .../io/druid/indexing/common/task/Task.java | 4 +- .../common/tasklogs/FileTaskLogs.java | 40 ++++++++++-- .../tasklogs/SwitchingTaskLogStreamer.java | 13 ++++ .../indexing/overlord/ForkingTaskRunner.java | 3 + .../overlord/ThreadPoolTaskRunner.java | 24 +++++--- .../overlord/http/OverlordResource.java | 27 ++++++++ .../worker/executor/ExecutorLifecycle.java | 16 +++++ .../executor/ExecutorLifecycleConfig.java | 15 +++++ .../indexing/common/TestRealtimeTask.java | 4 +- .../io/druid/indexing/common/TestTasks.java | 8 +-- .../indexing/common/task/HadoopTaskTest.java | 4 +- .../indexing/overlord/RealtimeishTask.java | 5 +- .../indexing/overlord/TaskLifecycleTest.java | 13 ++-- .../overlord/http/OverlordResourceTest.java | 3 +- .../src/main/java/io/druid/cli/CliPeon.java | 1 + 39 files changed, 454 insertions(+), 83 deletions(-) create mode 100644 indexing-service/src/main/java/io/druid/indexing/common/TaskReport.java create mode 100644 indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java diff --git a/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java b/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java index 6fb0f309cfe2..d54c63cce182 100644 --- a/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java +++ b/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java @@ -24,6 +24,7 @@ import io.druid.java.util.common.logger.Logger; import java.io.File; +import java.io.IOException; public class NoopTaskLogs implements TaskLogs { @@ -41,6 +42,12 @@ public void pushTaskLog(String taskid, File logFile) log.info("Not pushing logs for task: %s", taskid); } + @Override + public void pushTaskReports(String taskid, File reportFile) throws IOException + { + log.info("Not pushing reports for task: %s", taskid); + } + @Override public void killAll() { diff --git a/api/src/main/java/io/druid/tasklogs/TaskLogPusher.java b/api/src/main/java/io/druid/tasklogs/TaskLogPusher.java index a904a16f5d1e..6329aac866d8 100644 --- a/api/src/main/java/io/druid/tasklogs/TaskLogPusher.java +++ b/api/src/main/java/io/druid/tasklogs/TaskLogPusher.java @@ -31,4 +31,8 @@ public interface TaskLogPusher { void pushTaskLog(String taskid, File logFile) throws IOException; + + default void pushTaskReports(String taskid, File reportFile) throws IOException + { + } } diff --git a/api/src/main/java/io/druid/tasklogs/TaskLogStreamer.java b/api/src/main/java/io/druid/tasklogs/TaskLogStreamer.java index b685c7b7659b..7569cdd145ba 100644 --- a/api/src/main/java/io/druid/tasklogs/TaskLogStreamer.java +++ b/api/src/main/java/io/druid/tasklogs/TaskLogStreamer.java @@ -40,4 +40,9 @@ public interface TaskLogStreamer * @return input supplier for this log, if available from this provider */ Optional streamTaskLog(String taskid, long offset) throws IOException; + + default Optional streamTaskReports(final String taskid) throws IOException + { + return Optional.absent(); + } } diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java index 2cd17c1c41c1..8fe5f3b39266 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java @@ -54,7 +54,19 @@ public void pushTaskLog(final String taskid, final File logFile) { final String taskKey = getTaskLogKey(taskid); log.info("Pushing task log %s to: %s", logFile, taskKey); + pushTaskFile(taskid, logFile, taskKey); + } + + @Override + public void pushTaskReports(String taskid, File reportFile) throws IOException + { + final String taskKey = getTaskReportsKey(taskid); + log.info("Pushing task reports %s to: %s", reportFile, taskKey); + pushTaskFile(taskid, reportFile, taskKey); + } + private void pushTaskFile(final String taskId, final File logFile, String taskKey) + { try { AzureUtils.retryAzureOperation( () -> { @@ -71,9 +83,19 @@ public void pushTaskLog(final String taskid, final File logFile) @Override public Optional streamTaskLog(final String taskid, final long offset) throws IOException + { + return streamTaskFile(taskid, offset, getTaskLogKey(taskid)); + } + + @Override + public Optional streamTaskReports(String taskid) throws IOException + { + return streamTaskFile(taskid, 0, getTaskReportsKey(taskid)); + } + + private Optional streamTaskFile(final String taskid, final long offset, String taskKey) throws IOException { final String container = config.getContainer(); - final String taskKey = getTaskLogKey(taskid); try { if (!azureStorage.getBlobExists(container, taskKey)) { @@ -116,12 +138,16 @@ public InputStream openStream() throws IOException } } - private String getTaskLogKey(String taskid) { return StringUtils.format("%s/%s/log", config.getPrefix(), taskid); } + private String getTaskReportsKey(String taskid) + { + return StringUtils.format("%s/%s/report.json", config.getPrefix(), taskid); + } + @Override public void killAll() { diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java index 6ed64576266f..d379f3445abb 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java @@ -51,7 +51,19 @@ public void pushTaskLog(final String taskid, final File logFile) throws IOExcept { final String taskKey = getTaskLogKey(taskid); LOG.info("Pushing task log %s to: %s", logFile, taskKey); + pushTaskFile(taskid, logFile, taskKey); + } + + @Override + public void pushTaskReports(String taskid, File reportFile) throws IOException + { + final String taskKey = getTaskReportKey(taskid); + LOG.info("Pushing task reports %s to: %s", reportFile, taskKey); + pushTaskFile(taskid, reportFile, taskKey); + } + private void pushTaskFile(final String taskid, final File logFile, final String taskKey) throws IOException + { FileInputStream fileSteam = new FileInputStream(logFile); InputStreamContent mediaContent = new InputStreamContent("text/plain", fileSteam); @@ -64,7 +76,18 @@ public void pushTaskLog(final String taskid, final File logFile) throws IOExcept public Optional streamTaskLog(final String taskid, final long offset) throws IOException { final String taskKey = getTaskLogKey(taskid); + return streamTaskFile(taskid, offset, taskKey); + } + @Override + public Optional streamTaskReports(String taskid) throws IOException + { + final String taskKey = getTaskReportKey(taskid); + return streamTaskFile(taskid, 0, taskKey); + } + + private Optional streamTaskFile(final String taskid, final long offset, String taskKey) throws IOException + { try { if (!storage.exists(config.getBucket(), taskKey)) { return Optional.absent(); @@ -111,6 +134,11 @@ private String getTaskLogKey(String taskid) return config.getPrefix() + "/" + taskid.replaceAll(":", "_"); } + private String getTaskReportKey(String taskid) + { + return config.getPrefix() + "/" + taskid.replaceAll(":", "_") + ".report.json"; + } + @Override public void killAll() { diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java index 5851c2fa48a9..61da166187f0 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java @@ -61,6 +61,21 @@ public void pushTaskLog(String taskId, File logFile) throws IOException { final Path path = getTaskLogFileFromId(taskId); log.info("Writing task log to: %s", path); + pushTaskFile(path, logFile); + log.info("Wrote task log to: %s", path); + } + + @Override + public void pushTaskReports(String taskId, File reportFile) throws IOException + { + final Path path = getTaskReportsFileFromId(taskId); + log.info("Writing task reports to: %s", path); + pushTaskFile(path, reportFile); + log.info("Wrote task reports to: %s", path); + } + + private void pushTaskFile(Path path, File logFile) throws IOException + { final FileSystem fs = path.getFileSystem(hadoopConfig); try ( final InputStream in = new FileInputStream(logFile); @@ -68,14 +83,24 @@ public void pushTaskLog(String taskId, File logFile) throws IOException ) { ByteStreams.copy(in, out); } - - log.info("Wrote task log to: %s", path); } @Override public Optional streamTaskLog(final String taskId, final long offset) throws IOException { final Path path = getTaskLogFileFromId(taskId); + return streamTaskFile(path, offset); + } + + @Override + public Optional streamTaskReports(String taskId) throws IOException + { + final Path path = getTaskReportsFileFromId(taskId); + return streamTaskFile(path, 0); + } + + private Optional streamTaskFile(final Path path, final long offset) throws IOException + { final FileSystem fs = path.getFileSystem(hadoopConfig); if (fs.exists(path)) { return Optional.of( @@ -113,6 +138,15 @@ private Path getTaskLogFileFromId(String taskId) return new Path(mergePaths(config.getDirectory(), taskId.replaceAll(":", "_"))); } + /** + * Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in + * path names. So we format paths differently for HDFS. + */ + private Path getTaskReportsFileFromId(String taskId) + { + return new Path(mergePaths(config.getDirectory(), taskId.replaceAll(":", "_") + ".reports.json")); + } + // some hadoop version Path.mergePaths does not exist private static String mergePaths(String path1, String path2) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index a325948a6506..7092ac47eacd 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -51,7 +51,7 @@ import io.druid.discovery.LookupNodeService; import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; -import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.CheckPointDataSourceMetadataAction; import io.druid.indexing.common.actions.ResetDataSourceMetadataAction; @@ -412,7 +412,7 @@ private void createAndStartPublishExecutor() } @Override - public TaskStatus run(final TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(final TaskToolbox toolbox) throws Exception { // for backwards compatibility, should be remove from versions greater than 0.12.x if (useLegacy) { @@ -904,10 +904,10 @@ public void onFailure(Throwable t) toolbox.getDataSegmentServerAnnouncer().unannounce(); } - return success(); + return new TaskStatusWithReports(success(), null); } - private TaskStatus runLegacy(final TaskToolbox toolbox) throws Exception + private TaskStatusWithReports runLegacy(final TaskToolbox toolbox) throws Exception { log.info("Starting up!"); startTime = DateTimes.nowUtc(); @@ -1272,7 +1272,7 @@ public String apply(DataSegment input) toolbox.getDataSegmentServerAnnouncer().unannounce(); } - return success(); + return new TaskStatusWithReports(success(), null); } private void checkAndMaybeThrowException() diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java index 426221f508b3..7448842d0f44 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java @@ -56,8 +56,19 @@ public S3TaskLogs(S3TaskLogsConfig config, RestS3Service service) @Override public Optional streamTaskLog(final String taskid, final long offset) throws IOException { - final String taskKey = getTaskLogKey(taskid); + final String taskKey = getTaskLogKey(taskid, "log"); + return streamTaskFile(offset, taskKey); + } + + @Override + public Optional streamTaskReports(String taskid) throws IOException + { + final String taskKey = getTaskLogKey(taskid, "report.json"); + return streamTaskFile(0, taskKey); + } + private Optional streamTaskFile(final long offset, String taskKey) throws IOException + { try { final StorageObject objectDetails = service.getObjectDetails(config.getS3Bucket(), taskKey, null, null, null, null); @@ -111,9 +122,21 @@ public InputStream openStream() throws IOException @Override public void pushTaskLog(final String taskid, final File logFile) throws IOException { - final String taskKey = getTaskLogKey(taskid); + final String taskKey = getTaskLogKey(taskid, "log"); log.info("Pushing task log %s to: %s", logFile, taskKey); + pushTaskFile(logFile, taskKey); + } + + @Override + public void pushTaskReports(String taskid, File reportFile) throws IOException + { + final String taskKey = getTaskLogKey(taskid, "report.json"); + log.info("Pushing task reports %s to: %s", reportFile, taskKey); + pushTaskFile(reportFile, taskKey); + } + private void pushTaskFile(final File logFile, String taskKey) throws IOException + { try { S3Utils.retryS3Operation( () -> { @@ -130,9 +153,9 @@ public void pushTaskLog(final String taskid, final File logFile) throws IOExcept } } - private String getTaskLogKey(String taskid) + private String getTaskLogKey(String taskid, String filename) { - return StringUtils.format("%s/%s/log", config.getS3Prefix(), taskid); + return StringUtils.format("%s/%s/%s", config.getS3Prefix(), taskid, filename); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskReport.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskReport.java new file mode 100644 index 000000000000..68e8bf8115e8 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskReport.java @@ -0,0 +1,56 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.collect.Maps; + +import java.util.Map; + +/** + * TaskReport objects contain additional information about an indexing task, such as row statistics, errors, and + * published segments. They are kept in deep storage along with task logs. + */ +/** + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { +}) +public interface TaskReport +{ + String getTaskId(); + + String getReportKey(); + + /** + * @return A JSON-serializable Object that contains a TaskReport's information + */ + Object getPayload(); + + static Map buildTaskReports(TaskReport... taskReports) + { + Map taskReportMap = Maps.newHashMap(); + for (TaskReport taskReport : taskReports) { + taskReportMap.put(taskReport.getReportKey(), taskReport); + } + return taskReportMap; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java new file mode 100644 index 000000000000..19f3c883a0cc --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java @@ -0,0 +1,61 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Map; + +public class TaskStatusWithReports extends TaskStatus +{ + @JsonProperty + private TaskStatus taskStatus; + + @JsonProperty + private Map taskReports; + + @JsonCreator + public TaskStatusWithReports( + @JsonProperty("taskStatus") TaskStatus taskStatus, + @JsonProperty("taskReports") Map taskReports + ) + { + super( + taskStatus.getId(), + taskStatus.getStatusCode(), + taskStatus.getDuration() + ); + this.taskStatus = taskStatus; + this.taskReports = taskReports; + } + + @JsonProperty + public TaskStatus getTaskStatus() + { + return taskStatus; + } + + @JsonProperty + public Map getTaskReports() + { + return taskReports; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 3181b252544a..a09e5293341e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -38,6 +38,7 @@ import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskActionClient; @@ -185,7 +186,7 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatus run(final TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(final TaskToolbox toolbox) throws Exception { runThread = Thread.currentThread(); @@ -326,7 +327,7 @@ dataSchema, new RealtimeIOConfig(null, null, null), null } log.info("Job done!"); - return TaskStatus.success(getId()); + return new TaskStatusWithReports(TaskStatus.success(getId()), null); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java index 32787755a8f1..339b5cf89957 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java @@ -24,6 +24,7 @@ import com.google.common.collect.Iterables; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUnusedAction; import io.druid.indexing.common.actions.SegmentMetadataUpdateAction; @@ -61,7 +62,7 @@ public String getType() } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception { // Confirm we have a lock (will throw if there isn't exactly one element) final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox.getTaskActionClient())); @@ -103,6 +104,6 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception } } - return TaskStatus.success(getId()); + return new TaskStatusWithReports(TaskStatus.success(getId()), null); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java index 411715a814fd..d51185bee1d6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java @@ -40,7 +40,7 @@ import io.druid.data.input.impl.NoopInputRowParser; import io.druid.data.input.impl.StringDimensionSchema; import io.druid.data.input.impl.TimeAndDimsParseSpec; -import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.actions.TaskActionClient; @@ -178,7 +178,7 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception } @Override - public TaskStatus run(final TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(final TaskToolbox toolbox) throws Exception { if (indexTaskSpec == null) { final IndexIngestionSpec ingestionSpec = createIngestionSchema( diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java index 0e752cb988ce..b2e8f3d7ac7b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java @@ -28,6 +28,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.SegmentListUsedAction; @@ -241,7 +242,7 @@ public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception { final Iterable segmentsToUpdate; if (segment == null) { @@ -285,10 +286,10 @@ public boolean apply(DataSegment segment) for (final Task subTask : generateSubTasks(getGroupId(), segmentsToUpdate, indexSpec, force, validate, getContext())) { final TaskStatus status = subTask.run(toolbox); if (!status.isSuccess()) { - return TaskStatus.fromCode(getId(), status.getStatusCode()); + return new TaskStatusWithReports(TaskStatus.fromCode(getId(), status.getStatusCode()), null); } } - return success(); + return new TaskStatusWithReports(success(), null); } protected Iterable generateSubTasks( @@ -398,7 +399,7 @@ public String getType() } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception { log.info("Subs are good! Italian BMT and Meatball are probably my favorite."); try { @@ -408,7 +409,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception log.error(e, "Conversion failed."); throw e; } - return success(); + return new TaskStatusWithReports(success(), null); } private void convertSegment(TaskToolbox toolbox) throws SegmentLoadingException, IOException diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopConverterTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopConverterTask.java index 5e027ac23453..ba34f0d4505d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopConverterTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopConverterTask.java @@ -29,6 +29,7 @@ import io.druid.indexer.updater.HadoopConverterJob; import io.druid.indexer.updater.HadoopDruidConverterConfig; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.java.util.common.UOE; @@ -216,7 +217,7 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception { final Map hadoopProperties = new HashMap<>(); final Properties properties = injector.getInstance(Properties.class); @@ -244,7 +245,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception loader ); if (finishedSegmentString == null) { - return TaskStatus.failure(getId()); + return new TaskStatusWithReports(TaskStatus.failure(getId()), null); } final List finishedSegments = HadoopDruidConverterConfig.jsonMapper.readValue( finishedSegmentString, @@ -254,7 +255,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception ); log.debug("Found new segments %s", Arrays.toString(finishedSegments.toArray())); toolbox.publishSegments(finishedSegments); - return success(); + return new TaskStatusWithReports(success(), null); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 4386b5b35953..20d33fe7f2fe 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -38,6 +38,7 @@ import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockTryAcquireAction; @@ -170,7 +171,7 @@ public String getClasspathPrefix() @SuppressWarnings("unchecked") @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception { final ClassLoader loader = buildClassLoader(toolbox); boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent(); @@ -229,7 +230,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception specVersion, version ); - return TaskStatus.failure(getId()); + return new TaskStatusWithReports(TaskStatus.failure(getId()), null); } } @@ -253,9 +254,9 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception ); toolbox.publishSegments(publishedSegments); - return TaskStatus.success(getId()); + return new TaskStatusWithReports(TaskStatus.success(getId()), null); } else { - return TaskStatus.failure(getId()); + return new TaskStatusWithReports(TaskStatus.failure(getId()), null); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 7a4dff26c85b..04d017cae97d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -43,6 +43,7 @@ import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskActionClient; @@ -216,7 +217,7 @@ public IndexIngestionSpec getIngestionSchema() } @Override - public TaskStatus run(final TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(final TaskToolbox toolbox) throws Exception { final boolean determineIntervals = !ingestionSchema.getDataSchema() .getGranularitySpec() @@ -262,9 +263,9 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception } if (generateAndPublishSegments(toolbox, dataSchema, shardSpecs, versions, firehoseFactory, firehoseTempDir)) { - return TaskStatus.success(getId()); + return new TaskStatusWithReports(TaskStatus.success(getId()), null); } else { - return TaskStatus.failure(getId()); + return new TaskStatusWithReports(TaskStatus.failure(getId()), null); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java index 3f5d48f2c909..df04876b1c37 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java @@ -26,6 +26,7 @@ import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUnusedAction; import io.druid.indexing.common.actions.SegmentNukeAction; @@ -66,7 +67,7 @@ public String getType() } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception { // Confirm we have a lock (will throw if there isn't exactly one element) final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox.getTaskActionClient())); @@ -104,6 +105,6 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.of(segment))); } - return TaskStatus.success(getId()); + return new TaskStatusWithReports(TaskStatus.success(getId()), null); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java index 747cf58d4162..9b94460cacef 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java @@ -34,6 +34,7 @@ import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.hash.Hashing; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.java.util.emitter.EmittingLogger; import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.java.util.emitter.service.ServiceMetricEvent; @@ -139,7 +140,7 @@ public int getPriority() } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception { final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox.getTaskActionClient())); final ServiceEmitter emitter = toolbox.getEmitter(); @@ -196,14 +197,14 @@ public String apply(DataSegment input) toolbox.publishSegments(ImmutableList.of(uploadedSegment)); - return TaskStatus.success(getId()); + return new TaskStatusWithReports(TaskStatus.success(getId()), null); } catch (Exception e) { log.makeAlert(e, "Exception merging[%s]", mergedSegment.getDataSource()) .addData("interval", mergedSegment.getInterval()) .emit(); - return TaskStatus.failure(getId()); + return new TaskStatusWithReports(TaskStatus.failure(getId()), null); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java index 6bafa61254d7..e006fa1272b4 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java @@ -26,6 +26,7 @@ import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUnusedAction; import io.druid.indexing.common.actions.SegmentMetadataUpdateAction; @@ -70,7 +71,7 @@ public String getType() } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception { // Confirm we have a lock (will throw if there isn't exactly one element) final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox.getTaskActionClient())); @@ -108,7 +109,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(movedSegment))); } - return TaskStatus.success(getId()); + return new TaskStatusWithReports(TaskStatus.success(getId()), null); } @JsonProperty("target") diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java index f76895576a67..a355a8812dd5 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java @@ -27,6 +27,7 @@ import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.java.util.common.DateTimes; @@ -136,7 +137,7 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception { if (firehoseFactory != null) { log.info("Connecting firehose"); @@ -147,7 +148,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception log.info("Sleeping for %,d millis.", runTime); Thread.sleep(runTime); log.info("Woke up!"); - return TaskStatus.success(getId()); + return new TaskStatusWithReports(TaskStatus.success(getId()), null); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index c18354450ad1..3bab47c302bf 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -36,6 +36,7 @@ import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockReleaseAction; @@ -198,7 +199,7 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatus run(final TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(final TaskToolbox toolbox) throws Exception { runThread = Thread.currentThread(); @@ -478,7 +479,7 @@ public void run() } log.info("Job done!"); - return TaskStatus.success(getId()); + return new TaskStatusWithReports(TaskStatus.success(getId()), null); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java index 62e15edab3ce..c91634bf0c58 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java @@ -24,6 +24,7 @@ import com.google.common.collect.Iterables; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUnusedAction; import io.druid.indexing.common.actions.SegmentMetadataUpdateAction; @@ -62,7 +63,7 @@ public String getType() } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception { // Confirm we have a lock (will throw if there isn't exactly one element) final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox.getTaskActionClient())); @@ -117,6 +118,6 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception ); } - return TaskStatus.success(getId()); + return new TaskStatusWithReports(TaskStatus.success(getId()), null); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/SameIntervalMergeTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/SameIntervalMergeTask.java index cdb60be12f6c..e4d75e1200e2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/SameIntervalMergeTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/SameIntervalMergeTask.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.java.util.common.DateTimes; @@ -120,7 +121,7 @@ public String getType() } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception { final List segments = toolbox.getTaskActionClient().submit( new SegmentListUsedAction( @@ -141,9 +142,9 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception ); final TaskStatus status = mergeTask.run(toolbox); if (!status.isSuccess()) { - return TaskStatus.fromCode(getId(), status.getStatusCode()); + return new TaskStatusWithReports(TaskStatus.fromCode(getId(), status.getStatusCode()), null); } - return success(); + return new TaskStatusWithReports(success(), null); } public static class SubTask extends MergeTask diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index e3eb3e77c5e5..36f19c5507d2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -21,7 +21,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.query.Query; @@ -178,7 +178,7 @@ default int getPriority() * * @throws Exception if this task failed */ - TaskStatus run(TaskToolbox toolbox) throws Exception; + TaskStatusWithReports run(TaskToolbox toolbox) throws Exception; Map getContext(); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java index 1c09b56cd8f9..579234e6c07e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java @@ -53,7 +53,7 @@ public FileTaskLogs( public void pushTaskLog(final String taskid, File file) throws IOException { if (config.getDirectory().exists() || config.getDirectory().mkdirs()) { - final File outputFile = fileForTask(taskid); + final File outputFile = fileForTask(taskid, file.getName()); Files.copy(file, outputFile); log.info("Wrote task log to: %s", outputFile); } else { @@ -61,10 +61,22 @@ public void pushTaskLog(final String taskid, File file) throws IOException } } + @Override + public void pushTaskReports(String taskid, File reportFile) throws IOException + { + if (config.getDirectory().exists() || config.getDirectory().mkdirs()) { + final File outputFile = fileForTask(taskid, reportFile.getName()); + Files.copy(reportFile, outputFile); + log.info("Wrote task report to: %s", outputFile); + } else { + throw new IOE("Unable to create task report dir[%s]", config.getDirectory()); + } + } + @Override public Optional streamTaskLog(final String taskid, final long offset) { - final File file = fileForTask(taskid); + final File file = fileForTask(taskid, "log"); if (file.exists()) { return Optional.of( new ByteSource() @@ -81,9 +93,29 @@ public InputStream openStream() throws IOException } } - private File fileForTask(final String taskid) + @Override + public Optional streamTaskReports(final String taskid) + { + final File file = fileForTask(taskid, "report.json"); + if (file.exists()) { + return Optional.of( + new ByteSource() + { + @Override + public InputStream openStream() throws IOException + { + return LogUtils.streamFile(file, 0); + } + } + ); + } else { + return Optional.absent(); + } + } + + private File fileForTask(final String taskid, String filename) { - return new File(config.getDirectory(), StringUtils.format("%s.log", taskid)); + return new File(config.getDirectory(), StringUtils.format("%s.%s", taskid, filename)); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java index cb8e0b34325d..3b7e33baa76c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java @@ -53,4 +53,17 @@ public Optional streamTaskLog(String taskid, long offset) throws IOE return Optional.absent(); } + + @Override + public Optional streamTaskReports(String taskid) throws IOException + { + for (TaskLogStreamer provider : providers) { + final Optional stream = provider.streamTaskReports(taskid); + if (stream.isPresent()) { + return stream; + } + } + + return Optional.absent(); + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index 519c172f35f6..bd3550452e23 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -260,6 +260,7 @@ public TaskStatus call() final File taskFile = new File(taskDir, "task.json"); final File statusFile = new File(attemptDir, "status.json"); final File logFile = new File(taskDir, "log"); + final File reportsFile = new File(attemptDir, "report.json"); // time to adjust process holders synchronized (tasks) { @@ -408,6 +409,7 @@ public TaskStatus call() command.add("peon"); command.add(taskFile.toString()); command.add(statusFile.toString()); + command.add(reportsFile.toString()); String nodeType = task.getNodeType(); if (nodeType != null) { command.add("--nodeType"); @@ -459,6 +461,7 @@ public TaskStatus call() Thread.currentThread().setName(priorThreadName); // Upload task logs taskLogPusher.pushTaskLog(task.getId(), logFile); + taskLogPusher.pushTaskReports(task.getId(), reportsFile); } TaskStatus status; diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index 06e6342356be..4d2613478e8b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.java.util.emitter.EmittingLogger; import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.java.util.emitter.service.ServiceMetricEvent; @@ -262,7 +263,7 @@ public ListenableFuture run(final Task task) } } final ListenableFuture statusFuture = exec.get(taskPriority) - .submit(new ThreadPoolTaskRunnerCallable( + .submit(new ThreadPoolTaskRunnerCallable( task, location, toolbox @@ -439,11 +440,11 @@ public ThreadPoolTaskRunnerCallable(Task task, TaskLocation location, TaskToolbo } @Override - public TaskStatus call() + public TaskStatusWithReports call() { final long startTime = System.currentTimeMillis(); - TaskStatus status; + TaskStatusWithReports statusAndReports; try { log.info("Running task: %s", task.getId()); @@ -453,7 +454,7 @@ public TaskStatus call() location ); TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.running(task.getId())); - status = task.run(toolbox); + statusAndReports = task.run(toolbox); } catch (InterruptedException e) { // Don't reset the interrupt flag of the thread, as we do want to continue to the end of this callable. @@ -464,21 +465,24 @@ public TaskStatus call() // Not stopping, this is definitely unexpected. log.warn(e, "Interrupted while running task[%s]", task); } - - status = TaskStatus.failure(task.getId()); + statusAndReports = new TaskStatusWithReports(TaskStatus.failure(task.getId()), null); } catch (Exception e) { log.error(e, "Exception while running task[%s]", task); - status = TaskStatus.failure(task.getId()); + statusAndReports = new TaskStatusWithReports(TaskStatus.failure(task.getId()), null); } catch (Throwable t) { log.error(t, "Uncaught Throwable while running task[%s]", task); throw t; } - status = status.withDuration(System.currentTimeMillis() - startTime); - TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); - return status; + TaskStatus statusWithDuration = statusAndReports.getTaskStatus() + .withDuration(System.currentTimeMillis() - startTime); + TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), statusWithDuration); + return new TaskStatusWithReports( + statusWithDuration, + statusAndReports.getTaskReports() + ); } } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index 8be1dcdbe95e..1fa59cebd1a8 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -732,6 +732,33 @@ public Response doGetLog( } } + @GET + @Path("/task/{taskid}/reports") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(TaskResourceFilter.class) + public Response doGetReports( + @PathParam("taskid") final String taskid + ) + { + try { + final Optional stream = taskLogStreamer.streamTaskReports(taskid); + if (stream.isPresent()) { + return Response.ok(stream.get().openStream()).build(); + } else { + return Response.status(Response.Status.NOT_FOUND) + .entity( + "No task reports were found for this task. " + + "The task may not exist, or it may not have completed yet." + ) + .build(); + } + } + catch (Exception e) { + log.warn(e, "Failed to stream task reports for task %s", taskid); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); + } + } + @GET @Path("/dataSources/{dataSource}") @Produces(MediaType.APPLICATION_JSON) diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java index d06830eaf6d7..d8bdc95ee12a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java @@ -26,6 +26,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.java.util.emitter.EmittingLogger; import io.druid.java.util.common.concurrent.Execs; import io.druid.indexing.common.TaskStatus; @@ -88,6 +89,7 @@ public void start() throws InterruptedException { final File taskFile = Preconditions.checkNotNull(taskExecutorConfig.getTaskFile(), "taskFile"); final File statusFile = Preconditions.checkNotNull(taskExecutorConfig.getStatusFile(), "statusFile"); + final File reportsFile = Preconditions.checkNotNull(taskExecutorConfig.getReportsFile(), "reportsFile"); final InputStream parentStream = Preconditions.checkNotNull(taskExecutorConfig.getParentStream(), "parentStream"); try { @@ -188,12 +190,26 @@ public TaskStatus apply(TaskStatus taskStatus) jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(taskStatus) ); + if (taskStatus instanceof TaskStatusWithReports) { + TaskStatusWithReports taskStatusWithReports = (TaskStatusWithReports) taskStatus; + final File reportsFileParent = reportsFile.getParentFile(); + if (reportsFileParent != null) { + reportsFileParent.mkdirs(); + } + jsonMapper.writeValue(reportsFile, taskStatusWithReports.getTaskReports()); + + // we've uploaded the reports, remove them from the returned taskStatus to avoid storing huge + // TaskStatus objects in metadata storage or zookeeper. + taskStatus = taskStatusWithReports.getTaskStatus(); + } + final File statusFileParent = statusFile.getParentFile(); if (statusFileParent != null) { statusFileParent.mkdirs(); } jsonMapper.writeValue(statusFile, taskStatus); + return taskStatus; } catch (Exception e) { diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycleConfig.java b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycleConfig.java index 44f75ccd4548..c4ea12298c12 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycleConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycleConfig.java @@ -39,6 +39,10 @@ public class ExecutorLifecycleConfig @NotNull private File statusFile = null; + @JsonProperty + @NotNull + private File reportsFile = null; + @JsonProperty @Pattern(regexp = "\\{stdin\\}") private String parentStreamName = "stdin"; @@ -75,6 +79,17 @@ public ExecutorLifecycleConfig setStatusFile(File statusFile) return this; } + public File getReportsFile() + { + return reportsFile; + } + + public ExecutorLifecycleConfig setReportsFile(File reportsFile) + { + this.reportsFile = reportsFile; + return this; + } + public InputStream getParentStream() { if ("stdin".equals(parentStreamName)) { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java index 57fcf7cae95a..2a71dab7eb2a 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java @@ -75,8 +75,8 @@ public String getType() } @Override - public TaskStatus run(TaskToolbox toolbox) + public TaskStatusWithReports run(TaskToolbox toolbox) { - return status; + return new TaskStatusWithReports(status, null); } } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestTasks.java b/indexing-service/src/test/java/io/druid/indexing/common/TestTasks.java index a7730ba41c65..7b711ca64004 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestTasks.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestTasks.java @@ -68,9 +68,9 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatus run(TaskToolbox toolbox) + public TaskStatusWithReports run(TaskToolbox toolbox) { - return TaskStatus.success(getId()); + return new TaskStatusWithReports(TaskStatus.success(getId()), null); } } @@ -96,13 +96,13 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception { while (!Thread.currentThread().isInterrupted()) { Thread.sleep(1000); } - return TaskStatus.failure(getId()); + return new TaskStatusWithReports(TaskStatus.failure(getId()), null); } } } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/HadoopTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/HadoopTaskTest.java index 77de5b14cd9f..24215d815470 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/HadoopTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/HadoopTaskTest.java @@ -22,7 +22,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.druid.indexer.updater.HadoopDruidConverterConfig; -import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.config.TaskConfig; @@ -64,7 +64,7 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatus run(TaskToolbox toolbox) + public TaskStatusWithReports run(TaskToolbox toolbox) { return null; } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java index bcbeb3de0193..e3fba04e5770 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java @@ -24,6 +24,7 @@ import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockListAction; @@ -61,7 +62,7 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception { final Interval interval1 = Intervals.of("2010-01-01T00/PT1H"); final Interval interval2 = Intervals.of("2010-01-01T01/PT1H"); @@ -129,6 +130,6 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception Assert.assertEquals("locks4", ImmutableList.of(), locks4); // Exit - return TaskStatus.success(getId()); + return new TaskStatusWithReports(TaskStatus.success(getId()), null); } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 5fb348fc363d..9c7ec5094cf6 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -47,6 +47,7 @@ import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolboxFactory; import io.druid.indexing.common.TestUtils; @@ -871,7 +872,7 @@ public String getType() } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception { final TaskLock myLock = Iterables.getOnlyElement( toolbox.getTaskActionClient() @@ -885,7 +886,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception .build(); toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment))); - return TaskStatus.success(getId()); + return new TaskStatusWithReports(TaskStatus.success(getId()), null); } }; @@ -908,7 +909,7 @@ public String getType() } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception { final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); @@ -919,7 +920,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception .build(); toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment))); - return TaskStatus.success(getId()); + return new TaskStatusWithReports(TaskStatus.success(getId()), null); } }; @@ -942,7 +943,7 @@ public String getType() } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception { final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); @@ -953,7 +954,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception .build(); toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment))); - return TaskStatus.success(getId()); + return new TaskStatusWithReports(TaskStatus.success(getId()), null); } }; diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java index 9c6917d9dac7..91513c51dc03 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java @@ -28,6 +28,7 @@ import io.druid.indexer.TaskLocation; import io.druid.indexer.TaskStatusPlus; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.task.AbstractTask; @@ -439,7 +440,7 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatus run(TaskToolbox toolbox) + public TaskStatusWithReports run(TaskToolbox toolbox) { return null; } diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index a205dfab01b3..1d3e3eb40428 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -185,6 +185,7 @@ public void configure(Binder binder) new ExecutorLifecycleConfig() .setTaskFile(new File(taskAndStatusFile.get(0))) .setStatusFile(new File(taskAndStatusFile.get(1))) + .setReportsFile(new File(taskAndStatusFile.get(2))) ); binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class); From 61ea998bd577579d1a04111ecdb9101b2d0726d8 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 28 Mar 2018 14:50:06 -0700 Subject: [PATCH 2/7] PR comments --- .../druid/indexing/kafka/KafkaIndexTask.java | 3 ++- .../common/TaskStatusWithReports.java | 13 ++++----- .../AppenderatorDriverRealtimeIndexTask.java | 2 +- .../indexing/common/task/ArchiveTask.java | 5 ++-- .../indexing/common/task/CompactionTask.java | 4 +-- .../common/task/ConvertSegmentTask.java | 11 ++++---- .../common/task/HadoopConverterTask.java | 7 +++-- .../indexing/common/task/HadoopIndexTask.java | 2 +- .../druid/indexing/common/task/IndexTask.java | 2 +- .../druid/indexing/common/task/KillTask.java | 5 ++-- .../indexing/common/task/MergeTaskBase.java | 7 +++-- .../druid/indexing/common/task/MoveTask.java | 5 ++-- .../druid/indexing/common/task/NoopTask.java | 5 ++-- .../common/task/RealtimeIndexTask.java | 5 ++-- .../indexing/common/task/RestoreTask.java | 5 ++-- .../common/task/SameIntervalMergeTask.java | 7 +++-- .../io/druid/indexing/common/task/Task.java | 4 +-- .../overlord/ThreadPoolTaskRunner.java | 27 ++++++++++--------- .../worker/executor/ExecutorLifecycle.java | 6 ++--- .../indexing/common/TestRealtimeTask.java | 4 +-- .../io/druid/indexing/common/TestTasks.java | 8 +++--- .../indexing/common/task/HadoopTaskTest.java | 4 +-- .../indexing/overlord/RealtimeishTask.java | 5 ++-- .../indexing/overlord/TaskLifecycleTest.java | 13 +++++---- .../overlord/http/OverlordResourceTest.java | 3 +-- 25 files changed, 77 insertions(+), 85 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 7092ac47eacd..8f6f044a4231 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -51,6 +51,7 @@ import io.druid.discovery.LookupNodeService; import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; +import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.CheckPointDataSourceMetadataAction; @@ -412,7 +413,7 @@ private void createAndStartPublishExecutor() } @Override - public TaskStatusWithReports run(final TaskToolbox toolbox) throws Exception + public TaskStatus run(final TaskToolbox toolbox) throws Exception { // for backwards compatibility, should be remove from versions greater than 0.12.x if (useLegacy) { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java index 19f3c883a0cc..7ddab0abeaff 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java @@ -20,15 +20,13 @@ package io.druid.indexing.common; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.Map; public class TaskStatusWithReports extends TaskStatus { - @JsonProperty - private TaskStatus taskStatus; - @JsonProperty private Map taskReports; @@ -43,14 +41,17 @@ public TaskStatusWithReports( taskStatus.getStatusCode(), taskStatus.getDuration() ); - this.taskStatus = taskStatus; this.taskReports = taskReports; } - @JsonProperty + @JsonIgnore public TaskStatus getTaskStatus() { - return taskStatus; + return new TaskStatus( + getId(), + getStatusCode(), + getDuration() + ); } @JsonProperty diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index a09e5293341e..dc7317a6ea5e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -186,7 +186,7 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatusWithReports run(final TaskToolbox toolbox) throws Exception + public TaskStatus run(final TaskToolbox toolbox) throws Exception { runThread = Thread.currentThread(); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java index 339b5cf89957..32787755a8f1 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java @@ -24,7 +24,6 @@ import com.google.common.collect.Iterables; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUnusedAction; import io.druid.indexing.common.actions.SegmentMetadataUpdateAction; @@ -62,7 +61,7 @@ public String getType() } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskToolbox toolbox) throws Exception { // Confirm we have a lock (will throw if there isn't exactly one element) final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox.getTaskActionClient())); @@ -104,6 +103,6 @@ public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception } } - return new TaskStatusWithReports(TaskStatus.success(getId()), null); + return TaskStatus.success(getId()); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java index d51185bee1d6..411715a814fd 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java @@ -40,7 +40,7 @@ import io.druid.data.input.impl.NoopInputRowParser; import io.druid.data.input.impl.StringDimensionSchema; import io.druid.data.input.impl.TimeAndDimsParseSpec; -import io.druid.indexing.common.TaskStatusWithReports; +import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.actions.TaskActionClient; @@ -178,7 +178,7 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception } @Override - public TaskStatusWithReports run(final TaskToolbox toolbox) throws Exception + public TaskStatus run(final TaskToolbox toolbox) throws Exception { if (indexTaskSpec == null) { final IndexIngestionSpec ingestionSpec = createIngestionSchema( diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java index b2e8f3d7ac7b..0e752cb988ce 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java @@ -28,7 +28,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.SegmentListUsedAction; @@ -242,7 +241,7 @@ public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskToolbox toolbox) throws Exception { final Iterable segmentsToUpdate; if (segment == null) { @@ -286,10 +285,10 @@ public boolean apply(DataSegment segment) for (final Task subTask : generateSubTasks(getGroupId(), segmentsToUpdate, indexSpec, force, validate, getContext())) { final TaskStatus status = subTask.run(toolbox); if (!status.isSuccess()) { - return new TaskStatusWithReports(TaskStatus.fromCode(getId(), status.getStatusCode()), null); + return TaskStatus.fromCode(getId(), status.getStatusCode()); } } - return new TaskStatusWithReports(success(), null); + return success(); } protected Iterable generateSubTasks( @@ -399,7 +398,7 @@ public String getType() } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskToolbox toolbox) throws Exception { log.info("Subs are good! Italian BMT and Meatball are probably my favorite."); try { @@ -409,7 +408,7 @@ public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception log.error(e, "Conversion failed."); throw e; } - return new TaskStatusWithReports(success(), null); + return success(); } private void convertSegment(TaskToolbox toolbox) throws SegmentLoadingException, IOException diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopConverterTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopConverterTask.java index ba34f0d4505d..5e027ac23453 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopConverterTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopConverterTask.java @@ -29,7 +29,6 @@ import io.druid.indexer.updater.HadoopConverterJob; import io.druid.indexer.updater.HadoopDruidConverterConfig; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.java.util.common.UOE; @@ -217,7 +216,7 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskToolbox toolbox) throws Exception { final Map hadoopProperties = new HashMap<>(); final Properties properties = injector.getInstance(Properties.class); @@ -245,7 +244,7 @@ public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception loader ); if (finishedSegmentString == null) { - return new TaskStatusWithReports(TaskStatus.failure(getId()), null); + return TaskStatus.failure(getId()); } final List finishedSegments = HadoopDruidConverterConfig.jsonMapper.readValue( finishedSegmentString, @@ -255,7 +254,7 @@ public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception ); log.debug("Found new segments %s", Arrays.toString(finishedSegments.toArray())); toolbox.publishSegments(finishedSegments); - return new TaskStatusWithReports(success(), null); + return success(); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 20d33fe7f2fe..7315b497ac2a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -171,7 +171,7 @@ public String getClasspathPrefix() @SuppressWarnings("unchecked") @Override - public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskToolbox toolbox) throws Exception { final ClassLoader loader = buildClassLoader(toolbox); boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent(); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 04d017cae97d..8847fe3fbdbf 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -217,7 +217,7 @@ public IndexIngestionSpec getIngestionSchema() } @Override - public TaskStatusWithReports run(final TaskToolbox toolbox) throws Exception + public TaskStatus run(final TaskToolbox toolbox) throws Exception { final boolean determineIntervals = !ingestionSchema.getDataSchema() .getGranularitySpec() diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java index df04876b1c37..3f5d48f2c909 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java @@ -26,7 +26,6 @@ import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUnusedAction; import io.druid.indexing.common.actions.SegmentNukeAction; @@ -67,7 +66,7 @@ public String getType() } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskToolbox toolbox) throws Exception { // Confirm we have a lock (will throw if there isn't exactly one element) final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox.getTaskActionClient())); @@ -105,6 +104,6 @@ public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.of(segment))); } - return new TaskStatusWithReports(TaskStatus.success(getId()), null); + return TaskStatus.success(getId()); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java index 9b94460cacef..747cf58d4162 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java @@ -34,7 +34,6 @@ import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.hash.Hashing; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.java.util.emitter.EmittingLogger; import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.java.util.emitter.service.ServiceMetricEvent; @@ -140,7 +139,7 @@ public int getPriority() } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskToolbox toolbox) throws Exception { final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox.getTaskActionClient())); final ServiceEmitter emitter = toolbox.getEmitter(); @@ -197,14 +196,14 @@ public String apply(DataSegment input) toolbox.publishSegments(ImmutableList.of(uploadedSegment)); - return new TaskStatusWithReports(TaskStatus.success(getId()), null); + return TaskStatus.success(getId()); } catch (Exception e) { log.makeAlert(e, "Exception merging[%s]", mergedSegment.getDataSource()) .addData("interval", mergedSegment.getInterval()) .emit(); - return new TaskStatusWithReports(TaskStatus.failure(getId()), null); + return TaskStatus.failure(getId()); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java index e006fa1272b4..6bafa61254d7 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java @@ -26,7 +26,6 @@ import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUnusedAction; import io.druid.indexing.common.actions.SegmentMetadataUpdateAction; @@ -71,7 +70,7 @@ public String getType() } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskToolbox toolbox) throws Exception { // Confirm we have a lock (will throw if there isn't exactly one element) final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox.getTaskActionClient())); @@ -109,7 +108,7 @@ public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(movedSegment))); } - return new TaskStatusWithReports(TaskStatus.success(getId()), null); + return TaskStatus.success(getId()); } @JsonProperty("target") diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java index a355a8812dd5..f76895576a67 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java @@ -27,7 +27,6 @@ import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.java.util.common.DateTimes; @@ -137,7 +136,7 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskToolbox toolbox) throws Exception { if (firehoseFactory != null) { log.info("Connecting firehose"); @@ -148,7 +147,7 @@ public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception log.info("Sleeping for %,d millis.", runTime); Thread.sleep(runTime); log.info("Woke up!"); - return new TaskStatusWithReports(TaskStatus.success(getId()), null); + return TaskStatus.success(getId()); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 3bab47c302bf..c18354450ad1 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -36,7 +36,6 @@ import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockReleaseAction; @@ -199,7 +198,7 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatusWithReports run(final TaskToolbox toolbox) throws Exception + public TaskStatus run(final TaskToolbox toolbox) throws Exception { runThread = Thread.currentThread(); @@ -479,7 +478,7 @@ public void run() } log.info("Job done!"); - return new TaskStatusWithReports(TaskStatus.success(getId()), null); + return TaskStatus.success(getId()); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java index c91634bf0c58..62e15edab3ce 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java @@ -24,7 +24,6 @@ import com.google.common.collect.Iterables; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUnusedAction; import io.druid.indexing.common.actions.SegmentMetadataUpdateAction; @@ -63,7 +62,7 @@ public String getType() } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskToolbox toolbox) throws Exception { // Confirm we have a lock (will throw if there isn't exactly one element) final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox.getTaskActionClient())); @@ -118,6 +117,6 @@ public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception ); } - return new TaskStatusWithReports(TaskStatus.success(getId()), null); + return TaskStatus.success(getId()); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/SameIntervalMergeTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/SameIntervalMergeTask.java index e4d75e1200e2..cdb60be12f6c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/SameIntervalMergeTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/SameIntervalMergeTask.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.java.util.common.DateTimes; @@ -121,7 +120,7 @@ public String getType() } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskToolbox toolbox) throws Exception { final List segments = toolbox.getTaskActionClient().submit( new SegmentListUsedAction( @@ -142,9 +141,9 @@ public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception ); final TaskStatus status = mergeTask.run(toolbox); if (!status.isSuccess()) { - return new TaskStatusWithReports(TaskStatus.fromCode(getId(), status.getStatusCode()), null); + return TaskStatus.fromCode(getId(), status.getStatusCode()); } - return new TaskStatusWithReports(success(), null); + return success(); } public static class SubTask extends MergeTask diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index 36f19c5507d2..e3eb3e77c5e5 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -21,7 +21,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import io.druid.indexing.common.TaskStatusWithReports; +import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.query.Query; @@ -178,7 +178,7 @@ default int getPriority() * * @throws Exception if this task failed */ - TaskStatusWithReports run(TaskToolbox toolbox) throws Exception; + TaskStatus run(TaskToolbox toolbox) throws Exception; Map getContext(); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index 4d2613478e8b..45470d480de0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -440,11 +440,11 @@ public ThreadPoolTaskRunnerCallable(Task task, TaskLocation location, TaskToolbo } @Override - public TaskStatusWithReports call() + public TaskStatus call() { final long startTime = System.currentTimeMillis(); - TaskStatusWithReports statusAndReports; + TaskStatus status; try { log.info("Running task: %s", task.getId()); @@ -454,7 +454,7 @@ public TaskStatusWithReports call() location ); TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.running(task.getId())); - statusAndReports = task.run(toolbox); + status = task.run(toolbox); } catch (InterruptedException e) { // Don't reset the interrupt flag of the thread, as we do want to continue to the end of this callable. @@ -465,24 +465,27 @@ public TaskStatusWithReports call() // Not stopping, this is definitely unexpected. log.warn(e, "Interrupted while running task[%s]", task); } - statusAndReports = new TaskStatusWithReports(TaskStatus.failure(task.getId()), null); + + status = TaskStatus.failure(task.getId()); } catch (Exception e) { log.error(e, "Exception while running task[%s]", task); - statusAndReports = new TaskStatusWithReports(TaskStatus.failure(task.getId()), null); + status = TaskStatus.failure(task.getId()); } catch (Throwable t) { log.error(t, "Uncaught Throwable while running task[%s]", task); throw t; } - TaskStatus statusWithDuration = statusAndReports.getTaskStatus() - .withDuration(System.currentTimeMillis() - startTime); - TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), statusWithDuration); - return new TaskStatusWithReports( - statusWithDuration, - statusAndReports.getTaskReports() - ); + status = status.withDuration(System.currentTimeMillis() - startTime); + TaskStatus statusForNotification; + if (status instanceof TaskStatusWithReports) { + statusForNotification = ((TaskStatusWithReports) status).getTaskStatus(); + } else { + statusForNotification = status; + } + TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), statusForNotification); + return status; } } } diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java index d8bdc95ee12a..e1b9fadd872b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java @@ -38,6 +38,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.commons.io.FileUtils; import java.io.File; import java.io.IOException; @@ -194,7 +195,7 @@ public TaskStatus apply(TaskStatus taskStatus) TaskStatusWithReports taskStatusWithReports = (TaskStatusWithReports) taskStatus; final File reportsFileParent = reportsFile.getParentFile(); if (reportsFileParent != null) { - reportsFileParent.mkdirs(); + FileUtils.forceMkdir(reportsFileParent); } jsonMapper.writeValue(reportsFile, taskStatusWithReports.getTaskReports()); @@ -205,11 +206,10 @@ public TaskStatus apply(TaskStatus taskStatus) final File statusFileParent = statusFile.getParentFile(); if (statusFileParent != null) { - statusFileParent.mkdirs(); + FileUtils.forceMkdir(statusFileParent); } jsonMapper.writeValue(statusFile, taskStatus); - return taskStatus; } catch (Exception e) { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java index 2a71dab7eb2a..57fcf7cae95a 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java @@ -75,8 +75,8 @@ public String getType() } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) + public TaskStatus run(TaskToolbox toolbox) { - return new TaskStatusWithReports(status, null); + return status; } } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestTasks.java b/indexing-service/src/test/java/io/druid/indexing/common/TestTasks.java index 7b711ca64004..a7730ba41c65 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestTasks.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestTasks.java @@ -68,9 +68,9 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) + public TaskStatus run(TaskToolbox toolbox) { - return new TaskStatusWithReports(TaskStatus.success(getId()), null); + return TaskStatus.success(getId()); } } @@ -96,13 +96,13 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskToolbox toolbox) throws Exception { while (!Thread.currentThread().isInterrupted()) { Thread.sleep(1000); } - return new TaskStatusWithReports(TaskStatus.failure(getId()), null); + return TaskStatus.failure(getId()); } } } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/HadoopTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/HadoopTaskTest.java index 24215d815470..77de5b14cd9f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/HadoopTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/HadoopTaskTest.java @@ -22,7 +22,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.druid.indexer.updater.HadoopDruidConverterConfig; -import io.druid.indexing.common.TaskStatusWithReports; +import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.config.TaskConfig; @@ -64,7 +64,7 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) + public TaskStatus run(TaskToolbox toolbox) { return null; } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java index e3fba04e5770..bcbeb3de0193 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java @@ -24,7 +24,6 @@ import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockListAction; @@ -62,7 +61,7 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskToolbox toolbox) throws Exception { final Interval interval1 = Intervals.of("2010-01-01T00/PT1H"); final Interval interval2 = Intervals.of("2010-01-01T01/PT1H"); @@ -130,6 +129,6 @@ public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception Assert.assertEquals("locks4", ImmutableList.of(), locks4); // Exit - return new TaskStatusWithReports(TaskStatus.success(getId()), null); + return TaskStatus.success(getId()); } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 9c7ec5094cf6..5fb348fc363d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -47,7 +47,6 @@ import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolboxFactory; import io.druid.indexing.common.TestUtils; @@ -872,7 +871,7 @@ public String getType() } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskToolbox toolbox) throws Exception { final TaskLock myLock = Iterables.getOnlyElement( toolbox.getTaskActionClient() @@ -886,7 +885,7 @@ public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception .build(); toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment))); - return new TaskStatusWithReports(TaskStatus.success(getId()), null); + return TaskStatus.success(getId()); } }; @@ -909,7 +908,7 @@ public String getType() } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskToolbox toolbox) throws Exception { final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); @@ -920,7 +919,7 @@ public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception .build(); toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment))); - return new TaskStatusWithReports(TaskStatus.success(getId()), null); + return TaskStatus.success(getId()); } }; @@ -943,7 +942,7 @@ public String getType() } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskToolbox toolbox) throws Exception { final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction())); @@ -954,7 +953,7 @@ public TaskStatusWithReports run(TaskToolbox toolbox) throws Exception .build(); toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment))); - return new TaskStatusWithReports(TaskStatus.success(getId()), null); + return TaskStatus.success(getId()); } }; diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java index 91513c51dc03..9c6917d9dac7 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java @@ -28,7 +28,6 @@ import io.druid.indexer.TaskLocation; import io.druid.indexer.TaskStatusPlus; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.task.AbstractTask; @@ -440,7 +439,7 @@ public boolean isReady(TaskActionClient taskActionClient) } @Override - public TaskStatusWithReports run(TaskToolbox toolbox) + public TaskStatus run(TaskToolbox toolbox) { return null; } From 1f888906185db4ddcd35d239e47b0c00a0e42a8e Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 29 Mar 2018 16:25:21 -0700 Subject: [PATCH 3/7] Better name for method --- .../indexing/common/TaskStatusWithReports.java | 14 +++++++------- .../indexing/overlord/ThreadPoolTaskRunner.java | 2 +- .../worker/executor/ExecutorLifecycle.java | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java index 7ddab0abeaff..8339a169820c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java @@ -44,8 +44,14 @@ public TaskStatusWithReports( this.taskReports = taskReports; } + @JsonProperty + public Map getTaskReports() + { + return taskReports; + } + @JsonIgnore - public TaskStatus getTaskStatus() + public TaskStatus makeTaskStatusWithoutReports() { return new TaskStatus( getId(), @@ -53,10 +59,4 @@ public TaskStatus getTaskStatus() getDuration() ); } - - @JsonProperty - public Map getTaskReports() - { - return taskReports; - } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index 45470d480de0..42cab08d2988 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -480,7 +480,7 @@ public TaskStatus call() status = status.withDuration(System.currentTimeMillis() - startTime); TaskStatus statusForNotification; if (status instanceof TaskStatusWithReports) { - statusForNotification = ((TaskStatusWithReports) status).getTaskStatus(); + statusForNotification = ((TaskStatusWithReports) status).makeTaskStatusWithoutReports(); } else { statusForNotification = status; } diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java index e1b9fadd872b..a0579a2d7869 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java @@ -201,7 +201,7 @@ public TaskStatus apply(TaskStatus taskStatus) // we've uploaded the reports, remove them from the returned taskStatus to avoid storing huge // TaskStatus objects in metadata storage or zookeeper. - taskStatus = taskStatusWithReports.getTaskStatus(); + taskStatus = taskStatusWithReports.makeTaskStatusWithoutReports(); } final File statusFileParent = statusFile.getParentFile(); From 7bf2329e4d2fd5113d77edf28e93de9f8b00c029 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 29 Mar 2018 18:35:08 -0700 Subject: [PATCH 4/7] Fix report file upload --- .../io/druid/indexing/common/TaskStatusWithReports.java | 6 ++++++ .../java/io/druid/indexing/overlord/ForkingTaskRunner.java | 4 +++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java index 8339a169820c..0d0a316ef5dc 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java @@ -59,4 +59,10 @@ public TaskStatus makeTaskStatusWithoutReports() getDuration() ); } + + @Override + public TaskStatus withDuration(long _duration) + { + return new TaskStatusWithReports(super.withDuration(_duration), taskReports); + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index bd3550452e23..041dfd879f9d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -461,7 +461,9 @@ public TaskStatus call() Thread.currentThread().setName(priorThreadName); // Upload task logs taskLogPusher.pushTaskLog(task.getId(), logFile); - taskLogPusher.pushTaskReports(task.getId(), reportsFile); + if (reportsFile.exists()) { + taskLogPusher.pushTaskReports(task.getId(), reportsFile); + } } TaskStatus status; From 20947d4240f9b23cad23a3e9c6e80459a3810248 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Fri, 30 Mar 2018 14:46:17 -0700 Subject: [PATCH 5/7] Use TaskReportFileWriter --- .../druid/indexing/kafka/KafkaIndexTask.java | 9 +-- .../indexing/kafka/KafkaIndexTaskTest.java | 4 +- .../indexing/common/TaskReportFileWriter.java | 58 ++++++++++++++++ .../common/TaskStatusWithReports.java | 68 ------------------- .../io/druid/indexing/common/TaskToolbox.java | 11 ++- .../indexing/common/TaskToolboxFactory.java | 8 ++- .../AppenderatorDriverRealtimeIndexTask.java | 4 +- .../indexing/common/task/HadoopIndexTask.java | 10 +-- .../druid/indexing/common/task/IndexTask.java | 7 +- .../overlord/ThreadPoolTaskRunner.java | 9 +-- .../worker/executor/ExecutorLifecycle.java | 15 ---- .../executor/ExecutorLifecycleConfig.java | 15 ---- .../indexing/common/TaskToolboxTest.java | 4 +- ...penderatorDriverRealtimeIndexTaskTest.java | 3 +- .../common/task/CompactionTaskTest.java | 3 +- .../indexing/common/task/IndexTaskTest.java | 3 +- .../common/task/NoopTestTaskFileWriter.java | 38 +++++++++++ .../common/task/RealtimeIndexTaskTest.java | 3 +- .../task/SameIntervalMergeTaskTest.java | 3 +- .../IngestSegmentFirehoseFactoryTest.java | 4 +- ...estSegmentFirehoseFactoryTimelineTest.java | 4 +- .../indexing/overlord/TaskLifecycleTest.java | 4 +- .../worker/WorkerTaskManagerTest.java | 4 +- .../worker/WorkerTaskMonitorTest.java | 4 +- .../src/main/java/io/druid/cli/CliPeon.java | 8 ++- 25 files changed, 168 insertions(+), 135 deletions(-) create mode 100644 indexing-service/src/main/java/io/druid/indexing/common/TaskReportFileWriter.java delete mode 100644 indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java create mode 100644 indexing-service/src/test/java/io/druid/indexing/common/task/NoopTestTaskFileWriter.java diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 8f6f044a4231..82558f0c796a 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -52,7 +52,6 @@ import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.CheckPointDataSourceMetadataAction; import io.druid.indexing.common.actions.ResetDataSourceMetadataAction; @@ -905,10 +904,11 @@ public void onFailure(Throwable t) toolbox.getDataSegmentServerAnnouncer().unannounce(); } - return new TaskStatusWithReports(success(), null); + toolbox.getTaskReportFileWriter().write(null); + return success(); } - private TaskStatusWithReports runLegacy(final TaskToolbox toolbox) throws Exception + private TaskStatus runLegacy(final TaskToolbox toolbox) throws Exception { log.info("Starting up!"); startTime = DateTimes.nowUtc(); @@ -1273,7 +1273,8 @@ public String apply(DataSegment input) toolbox.getDataSegmentServerAnnouncer().unannounce(); } - return new TaskStatusWithReports(success(), null); + toolbox.getTaskReportFileWriter().write(null); + return success(); } private void checkAndMaybeThrowException() diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 8bdd3c7f087f..9fb0495284f2 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -57,6 +57,7 @@ import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskStorageConfig; +import io.druid.indexing.common.task.NoopTestTaskFileWriter; import io.druid.indexing.common.task.Task; import io.druid.indexing.kafka.supervisor.KafkaSupervisor; import io.druid.indexing.kafka.test.TestBroker; @@ -2032,7 +2033,8 @@ public List getLocations() EasyMock.createNiceMock(DruidNodeAnnouncer.class), EasyMock.createNiceMock(DruidNode.class), new LookupNodeService("tier"), - new DataNodeService("tier", 1, ServerType.INDEXER_EXECUTOR, 0) + new DataNodeService("tier", 1, ServerType.INDEXER_EXECUTOR, 0), + new NoopTestTaskFileWriter() ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskReportFileWriter.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskReportFileWriter.java new file mode 100644 index 000000000000..eb5e9d9db40b --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskReportFileWriter.java @@ -0,0 +1,58 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.java.util.common.logger.Logger; +import org.apache.commons.io.FileUtils; + +import java.io.File; + +public class TaskReportFileWriter +{ + private static final Logger log = new Logger(TaskReportFileWriter.class); + + private final File reportsFile; + private ObjectMapper objectMapper; + + public TaskReportFileWriter(File reportFile) + { + this.reportsFile = reportFile; + } + + public void write(TaskReport report) + { + try { + final File reportsFileParent = reportsFile.getParentFile(); + if (reportsFileParent != null) { + FileUtils.forceMkdir(reportsFileParent); + } + objectMapper.writeValue(reportsFile, report); + } + catch (Exception e) { + log.error(e, "Encountered exception in write()."); + } + } + + public void setObjectMapper(ObjectMapper objectMapper) + { + this.objectMapper = objectMapper; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java deleted file mode 100644 index 0d0a316ef5dc..000000000000 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskStatusWithReports.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.indexing.common; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.Map; - -public class TaskStatusWithReports extends TaskStatus -{ - @JsonProperty - private Map taskReports; - - @JsonCreator - public TaskStatusWithReports( - @JsonProperty("taskStatus") TaskStatus taskStatus, - @JsonProperty("taskReports") Map taskReports - ) - { - super( - taskStatus.getId(), - taskStatus.getStatusCode(), - taskStatus.getDuration() - ); - this.taskReports = taskReports; - } - - @JsonProperty - public Map getTaskReports() - { - return taskReports; - } - - @JsonIgnore - public TaskStatus makeTaskStatusWithoutReports() - { - return new TaskStatus( - getId(), - getStatusCode(), - getDuration() - ); - } - - @Override - public TaskStatus withDuration(long _duration) - { - return new TaskStatusWithReports(super.withDuration(_duration), taskReports); - } -} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index dd132769192f..9deab2bc497c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -90,6 +90,7 @@ public class TaskToolbox private final Cache cache; private final CacheConfig cacheConfig; private final IndexMergerV9 indexMergerV9; + private final TaskReportFileWriter taskReportFileWriter; private final DruidNodeAnnouncer druidNodeAnnouncer; private final DruidNode druidNode; @@ -120,7 +121,8 @@ public TaskToolbox( DruidNodeAnnouncer druidNodeAnnouncer, DruidNode druidNode, LookupNodeService lookupNodeService, - DataNodeService dataNodeService + DataNodeService dataNodeService, + TaskReportFileWriter taskReportFileWriter ) { this.config = config; @@ -147,6 +149,8 @@ public TaskToolbox( this.druidNode = druidNode; this.lookupNodeService = lookupNodeService; this.dataNodeService = dataNodeService; + this.taskReportFileWriter = taskReportFileWriter; + this.taskReportFileWriter.setObjectMapper(this.objectMapper); } public TaskConfig getConfig() @@ -303,4 +307,9 @@ public DruidNode getDruidNode() { return druidNode; } + + public TaskReportFileWriter getTaskReportFileWriter() + { + return taskReportFileWriter; + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index c17b23fe210a..1a35ec040f8e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -78,6 +78,7 @@ public class TaskToolboxFactory private final DruidNode druidNode; private final LookupNodeService lookupNodeService; private final DataNodeService dataNodeService; + private final TaskReportFileWriter taskReportFileWriter; @Inject public TaskToolboxFactory( @@ -103,7 +104,8 @@ public TaskToolboxFactory( DruidNodeAnnouncer druidNodeAnnouncer, @RemoteChatHandler DruidNode druidNode, LookupNodeService lookupNodeService, - DataNodeService dataNodeService + DataNodeService dataNodeService, + TaskReportFileWriter taskReportFileWriter ) { this.config = config; @@ -129,6 +131,7 @@ public TaskToolboxFactory( this.druidNode = druidNode; this.lookupNodeService = lookupNodeService; this.dataNodeService = dataNodeService; + this.taskReportFileWriter = taskReportFileWriter; } public TaskToolbox build(Task task) @@ -158,7 +161,8 @@ public TaskToolbox build(Task task) druidNodeAnnouncer, druidNode, lookupNodeService, - dataNodeService + dataNodeService, + taskReportFileWriter ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index dc7317a6ea5e..cf408e14aaa0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -38,7 +38,6 @@ import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskActionClient; @@ -327,7 +326,8 @@ dataSchema, new RealtimeIOConfig(null, null, null), null } log.info("Job done!"); - return new TaskStatusWithReports(TaskStatus.success(getId()), null); + toolbox.getTaskReportFileWriter().write(null); + return TaskStatus.success(getId()); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 7315b497ac2a..f8e80e569a36 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -38,7 +38,6 @@ import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockTryAcquireAction; @@ -230,7 +229,8 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception specVersion, version ); - return new TaskStatusWithReports(TaskStatus.failure(getId()), null); + toolbox.getTaskReportFileWriter().write(null); + return TaskStatus.failure(getId()); } } @@ -254,9 +254,11 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception ); toolbox.publishSegments(publishedSegments); - return new TaskStatusWithReports(TaskStatus.success(getId()), null); + toolbox.getTaskReportFileWriter().write(null); + return TaskStatus.success(getId()); } else { - return new TaskStatusWithReports(TaskStatus.failure(getId()), null); + toolbox.getTaskReportFileWriter().write(null); + return TaskStatus.failure(getId()); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 8847fe3fbdbf..08f857ea4212 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -43,7 +43,6 @@ import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskActionClient; @@ -263,9 +262,11 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception } if (generateAndPublishSegments(toolbox, dataSchema, shardSpecs, versions, firehoseFactory, firehoseTempDir)) { - return new TaskStatusWithReports(TaskStatus.success(getId()), null); + toolbox.getTaskReportFileWriter().write(null); + return TaskStatus.success(getId()); } else { - return new TaskStatusWithReports(TaskStatus.failure(getId()), null); + toolbox.getTaskReportFileWriter().write(null); + return TaskStatus.failure(getId()); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index 42cab08d2988..e08d497e841f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -29,7 +29,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.java.util.emitter.EmittingLogger; import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.java.util.emitter.service.ServiceMetricEvent; @@ -478,13 +477,7 @@ public TaskStatus call() } status = status.withDuration(System.currentTimeMillis() - startTime); - TaskStatus statusForNotification; - if (status instanceof TaskStatusWithReports) { - statusForNotification = ((TaskStatusWithReports) status).makeTaskStatusWithoutReports(); - } else { - statusForNotification = status; - } - TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), statusForNotification); + TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); return status; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java index a0579a2d7869..98e6382a43ae 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java @@ -26,7 +26,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; -import io.druid.indexing.common.TaskStatusWithReports; import io.druid.java.util.emitter.EmittingLogger; import io.druid.java.util.common.concurrent.Execs; import io.druid.indexing.common.TaskStatus; @@ -90,7 +89,6 @@ public void start() throws InterruptedException { final File taskFile = Preconditions.checkNotNull(taskExecutorConfig.getTaskFile(), "taskFile"); final File statusFile = Preconditions.checkNotNull(taskExecutorConfig.getStatusFile(), "statusFile"); - final File reportsFile = Preconditions.checkNotNull(taskExecutorConfig.getReportsFile(), "reportsFile"); final InputStream parentStream = Preconditions.checkNotNull(taskExecutorConfig.getParentStream(), "parentStream"); try { @@ -191,19 +189,6 @@ public TaskStatus apply(TaskStatus taskStatus) jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(taskStatus) ); - if (taskStatus instanceof TaskStatusWithReports) { - TaskStatusWithReports taskStatusWithReports = (TaskStatusWithReports) taskStatus; - final File reportsFileParent = reportsFile.getParentFile(); - if (reportsFileParent != null) { - FileUtils.forceMkdir(reportsFileParent); - } - jsonMapper.writeValue(reportsFile, taskStatusWithReports.getTaskReports()); - - // we've uploaded the reports, remove them from the returned taskStatus to avoid storing huge - // TaskStatus objects in metadata storage or zookeeper. - taskStatus = taskStatusWithReports.makeTaskStatusWithoutReports(); - } - final File statusFileParent = statusFile.getParentFile(); if (statusFileParent != null) { FileUtils.forceMkdir(statusFileParent); diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycleConfig.java b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycleConfig.java index c4ea12298c12..44f75ccd4548 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycleConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycleConfig.java @@ -39,10 +39,6 @@ public class ExecutorLifecycleConfig @NotNull private File statusFile = null; - @JsonProperty - @NotNull - private File reportsFile = null; - @JsonProperty @Pattern(regexp = "\\{stdin\\}") private String parentStreamName = "stdin"; @@ -79,17 +75,6 @@ public ExecutorLifecycleConfig setStatusFile(File statusFile) return this; } - public File getReportsFile() - { - return reportsFile; - } - - public ExecutorLifecycleConfig setReportsFile(File reportsFile) - { - this.reportsFile = reportsFile; - return this; - } - public InputStream getParentStream() { if ("stdin".equals(parentStreamName)) { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java index 52bfe1ea891a..f73da9d2128f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java @@ -25,6 +25,7 @@ import io.druid.client.cache.CacheConfig; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.task.NoopTestTaskFileWriter; import io.druid.indexing.common.task.Task; import io.druid.java.util.common.Intervals; import io.druid.java.util.emitter.service.ServiceEmitter; @@ -114,7 +115,8 @@ public void setUp() throws IOException null, null, null, - null + null, + new NoopTestTaskFileWriter() ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 2761aacbdb64..abd451a9ed30 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1243,7 +1243,8 @@ public List getLocations() EasyMock.createNiceMock(DruidNodeAnnouncer.class), EasyMock.createNiceMock(DruidNode.class), new LookupNodeService("tier"), - new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0) + new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0), + new NoopTestTaskFileWriter() ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java index 440ed912f0e0..27ecd4bb66a2 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java @@ -507,7 +507,8 @@ private static class TestTaskToolbox extends TaskToolbox null, null, null, - null + null, + new NoopTestTaskFileWriter() ); this.segmentFileMap = segmentFileMap; } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index f449c3ab05d1..8106c79122e8 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -1043,7 +1043,8 @@ public Map makeLoadSpec(URI uri) null, null, null, - null + null, + new NoopTestTaskFileWriter() ); indexTask.isReady(box.getTaskActionClient()); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/NoopTestTaskFileWriter.java b/indexing-service/src/test/java/io/druid/indexing/common/task/NoopTestTaskFileWriter.java new file mode 100644 index 000000000000..a4d9ca1cca72 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/NoopTestTaskFileWriter.java @@ -0,0 +1,38 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.task; + +import io.druid.indexing.common.TaskReport; +import io.druid.indexing.common.TaskReportFileWriter; + +import java.io.File; + +public class NoopTestTaskFileWriter extends TaskReportFileWriter +{ + public NoopTestTaskFileWriter() + { + super(null); + } + + @Override + public void write(TaskReport report) + { + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 518d1d63b5d0..916c925e0d02 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -1084,7 +1084,8 @@ public List getLocations() EasyMock.createNiceMock(DruidNodeAnnouncer.class), EasyMock.createNiceMock(DruidNode.class), new LookupNodeService("tier"), - new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0) + new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0), + new NoopTestTaskFileWriter() ); return toolboxFactory.build(task); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java index c1cd914a8a37..e3c232db23c8 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java @@ -256,7 +256,8 @@ public void cleanup(DataSegment segment) null, null, null, - null + null, + new NoopTestTaskFileWriter() ) ); diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index e311b514da8d..d85a1e44bd22 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -50,6 +50,7 @@ import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.NoopTask; +import io.druid.indexing.common.task.NoopTestTaskFileWriter; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.TaskLockbox; @@ -312,7 +313,8 @@ public DataSegment restore(DataSegment segment) null, null, null, - null + null, + new NoopTestTaskFileWriter() ); Collection values = new LinkedList<>(); for (InputRowParser parser : Arrays.asList( diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index ff1a738260b9..14a52cbfcad7 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -47,6 +47,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.NoopTask; +import io.druid.indexing.common.task.NoopTestTaskFileWriter; import io.druid.indexing.common.task.Task; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.Intervals; @@ -343,7 +344,8 @@ public TaskActionClient create(Task task) null, null, null, - null + null, + new NoopTestTaskFileWriter() ); final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory( DATA_SOURCE, diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 5fb348fc363d..c17452f654aa 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -63,6 +63,7 @@ import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; import io.druid.indexing.common.task.IndexTask.IndexTuningConfig; import io.druid.indexing.common.task.KillTask; +import io.druid.indexing.common.task.NoopTestTaskFileWriter; import io.druid.indexing.common.task.RealtimeIndexTask; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; @@ -605,7 +606,8 @@ public void unannounceSegments(Iterable segments) EasyMock.createNiceMock(DruidNodeAnnouncer.class), EasyMock.createNiceMock(DruidNode.class), new LookupNodeService("tier"), - new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0) + new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0), + new NoopTestTaskFileWriter() ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java index 3b3ca7393fb5..9be677ae07ba 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskManagerTest.java @@ -35,6 +35,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.NoopTask; +import io.druid.indexing.common.task.NoopTestTaskFileWriter; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Tasks; import io.druid.indexing.overlord.ThreadPoolTaskRunner; @@ -125,7 +126,8 @@ public List getLocations() null, null, null, - null + null, + new NoopTestTaskFileWriter() ), taskConfig, new NoopServiceEmitter(), diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 1b74f0377388..51d27f0b2a48 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -36,6 +36,7 @@ import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.task.NoopTestTaskFileWriter; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig; import io.druid.indexing.overlord.ThreadPoolTaskRunner; @@ -190,7 +191,8 @@ public List getLocations() null, null, null, - null + null, + new NoopTestTaskFileWriter() ), taskConfig, new NoopServiceEmitter(), diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 1d3e3eb40428..ae14efaedb1a 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -54,6 +54,7 @@ import io.druid.guice.annotations.Smile; import io.druid.indexing.common.RetryPolicyConfig; import io.druid.indexing.common.RetryPolicyFactory; +import io.druid.indexing.common.TaskReportFileWriter; import io.druid.indexing.common.TaskToolboxFactory; import io.druid.indexing.common.actions.LocalTaskActionClientFactory; import io.druid.indexing.common.actions.RemoteTaskActionClientFactory; @@ -185,7 +186,12 @@ public void configure(Binder binder) new ExecutorLifecycleConfig() .setTaskFile(new File(taskAndStatusFile.get(0))) .setStatusFile(new File(taskAndStatusFile.get(1))) - .setReportsFile(new File(taskAndStatusFile.get(2))) + ); + + binder.bind(TaskReportFileWriter.class).toInstance( + new TaskReportFileWriter( + new File(taskAndStatusFile.get(2)) + ) ); binder.bind(TaskRunner.class).to(ThreadPoolTaskRunner.class); From 573381616dfd2c982d040eafeb655f9eaa485dc6 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Fri, 30 Mar 2018 16:39:32 -0700 Subject: [PATCH 6/7] Checkstyle --- .../io/druid/indexing/common/task/NoopTestTaskFileWriter.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/NoopTestTaskFileWriter.java b/indexing-service/src/test/java/io/druid/indexing/common/task/NoopTestTaskFileWriter.java index a4d9ca1cca72..cebee6c624f6 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/NoopTestTaskFileWriter.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/NoopTestTaskFileWriter.java @@ -22,8 +22,6 @@ import io.druid.indexing.common.TaskReport; import io.druid.indexing.common.TaskReportFileWriter; -import java.io.File; - public class NoopTestTaskFileWriter extends TaskReportFileWriter { public NoopTestTaskFileWriter() From cb0bcac84988bd3a99cdd2d360af7fca3163f4ad Mon Sep 17 00:00:00 2001 From: jon-wei Date: Mon, 2 Apr 2018 10:54:01 -0700 Subject: [PATCH 7/7] More PR comments --- .../io/druid/indexing/common/TaskReport.java | 2 -- .../overlord/ThreadPoolTaskRunner.java | 2 +- .../src/main/java/io/druid/cli/CliPeon.java | 21 +++++++++++++++---- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskReport.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskReport.java index 68e8bf8115e8..eff6520741ba 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskReport.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskReport.java @@ -29,8 +29,6 @@ * TaskReport objects contain additional information about an indexing task, such as row statistics, errors, and * published segments. They are kept in deep storage along with task logs. */ -/** - */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { }) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index e08d497e841f..06e6342356be 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -262,7 +262,7 @@ public ListenableFuture run(final Task task) } } final ListenableFuture statusFuture = exec.get(taskPriority) - .submit(new ThreadPoolTaskRunnerCallable( + .submit(new ThreadPoolTaskRunnerCallable( task, location, toolbox diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index ae14efaedb1a..d7da80bae62e 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -114,9 +114,18 @@ ) public class CliPeon extends GuiceRunnable { - @Arguments(description = "task.json status.json", required = true) + @Arguments(description = "task.json status.json report.json", required = true) public List taskAndStatusFile; + // path to store the task's stdout log + private String taskLogPath; + + // path to store the task's TaskStatus + private String taskStatusPath; + + // path to store the task's TaskReport objects + private String taskReportPath; + @Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK") public String nodeType = "indexer-executor"; @@ -142,6 +151,10 @@ protected List getModules() @Override public void configure(Binder binder) { + taskLogPath = taskAndStatusFile.get(0); + taskStatusPath = taskAndStatusFile.get(1); + taskReportPath = taskAndStatusFile.get(2); + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/peon"); binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); @@ -184,13 +197,13 @@ public void configure(Binder binder) LifecycleModule.register(binder, ExecutorLifecycle.class); binder.bind(ExecutorLifecycleConfig.class).toInstance( new ExecutorLifecycleConfig() - .setTaskFile(new File(taskAndStatusFile.get(0))) - .setStatusFile(new File(taskAndStatusFile.get(1))) + .setTaskFile(new File(taskLogPath)) + .setStatusFile(new File(taskStatusPath)) ); binder.bind(TaskReportFileWriter.class).toInstance( new TaskReportFileWriter( - new File(taskAndStatusFile.get(2)) + new File(taskReportPath) ) );