diff --git a/distribution/docker/peon.sh b/distribution/docker/peon.sh index 8103f475ccb3..3b4dfc4326b9 100755 --- a/distribution/docker/peon.sh +++ b/distribution/docker/peon.sh @@ -161,4 +161,4 @@ fi # If TASK_JSON is not set, CliPeon will pull the task.json file from deep storage. mkdir -p ${TASK_DIR}; [ -n "$TASK_JSON" ] && echo ${TASK_JSON} | base64 -d | gzip -d > ${TASK_DIR}/task.json; -exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon $@ +exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon --taskId ${TASK_ID} $@ diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java index f89b0fafde95..535598bc2d70 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java @@ -207,6 +207,26 @@ public void killOlderThan(long timestamp) throws IOException } } } + + @Override + public void pushTaskPayload(String taskId, File taskPayloadFile) throws IOException + { + final Path path = getTaskPayloadFileFromId(taskId); + log.info("Pushing payload for task[%s] to location[%s]", taskId, path); + pushTaskFile(path, taskPayloadFile); + } + + @Override + public Optional streamTaskPayload(String taskId) throws IOException + { + final Path path = getTaskPayloadFileFromId(taskId); + return streamTaskFile(path, 0); + } + + private Path getTaskPayloadFileFromId(String taskId) + { + return new Path(mergePaths(config.getDirectory(), taskId.replace(':', '_') + ".payload.json")); + } } diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java index 9d0273d1a95f..125fbd288721 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java @@ -50,7 +50,7 @@ public void testStream() throws Exception final File tmpDir = tempFolder.newFolder(); final File logDir = new File(tmpDir, "logs"); final File logFile = new File(tmpDir, "log"); - Files.write("blah", logFile, StandardCharsets.UTF_8); + Files.asCharSink(logFile, StandardCharsets.UTF_8).write("blah"); final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration()); taskLogs.pushTaskLog("foo", logFile); @@ -69,11 +69,11 @@ public void testOverwrite() throws Exception final File logFile = new File(tmpDir, "log"); final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration()); - Files.write("blah", logFile, StandardCharsets.UTF_8); + Files.asCharSink(logFile, StandardCharsets.UTF_8).write("blah"); taskLogs.pushTaskLog("foo", logFile); Assert.assertEquals("blah", readLog(taskLogs, "foo", 0)); - Files.write("blah blah", logFile, StandardCharsets.UTF_8); + Files.asCharSink(logFile, StandardCharsets.UTF_8).write("blah blah"); taskLogs.pushTaskLog("foo", logFile); Assert.assertEquals("blah blah", readLog(taskLogs, "foo", 0)); } @@ -87,7 +87,7 @@ public void test_taskStatus() throws Exception final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration()); - Files.write("{}", statusFile, StandardCharsets.UTF_8); + Files.asCharSink(statusFile, StandardCharsets.UTF_8).write("{}"); taskLogs.pushTaskStatus("id", statusFile); Assert.assertEquals( "{}", @@ -95,6 +95,19 @@ public void test_taskStatus() throws Exception ); } + @Test + public void test_taskPayload() throws Exception + { + final File tmpDir = tempFolder.newFolder(); + final File logDir = new File(tmpDir, "logs"); + final File payload = new File(tmpDir, "payload.json"); + final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration()); + + Files.asCharSink(payload, StandardCharsets.UTF_8).write("{}"); + taskLogs.pushTaskPayload("id", payload); + Assert.assertEquals("{}", StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskPayload("id").get()))); + } + @Test public void testKill() throws Exception { @@ -107,7 +120,7 @@ public void testKill() throws Exception final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration()); - Files.write("log1content", logFile, StandardCharsets.UTF_8); + Files.asCharSink(logFile, StandardCharsets.UTF_8).write("log1content"); taskLogs.pushTaskLog("log1", logFile); Assert.assertEquals("log1content", readLog(taskLogs, "log1", 0)); @@ -118,7 +131,7 @@ public void testKill() throws Exception long time = (System.currentTimeMillis() / 1000) * 1000; Assert.assertTrue(fs.getFileStatus(new Path(logDirPath, "log1")).getModificationTime() < time); - Files.write("log2content", logFile, StandardCharsets.UTF_8); + Files.asCharSink(logFile, StandardCharsets.UTF_8).write("log2content"); taskLogs.pushTaskLog("log2", logFile); Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0)); Assert.assertTrue(fs.getFileStatus(new Path(logDirPath, "log2")).getModificationTime() >= time);