From 61dab0ce643d34fd0544eeb0e9ce5555249e7a41 Mon Sep 17 00:00:00 2001 From: GWphua Date: Thu, 20 Feb 2025 15:28:28 +0800 Subject: [PATCH 1/4] Support task payload for HDFS --- .../storage/hdfs/tasklog/HdfsTaskLogs.java | 31 +++++++++++++++++++ .../common/tasklogs/HdfsTaskLogsTest.java | 14 +++++++++ 2 files changed, 45 insertions(+) diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java index f89b0fafde95..7a8476e54956 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java @@ -207,6 +207,37 @@ public void killOlderThan(long timestamp) throws IOException } } } + + /** + * Save payload into HDFS, so it can be retrieved later. + * + * @throws IOException When Druid fails to push the task payload. + */ + @Override + public void pushTaskPayload(String taskId, File taskPayloadFile) throws IOException + { + final Path path = getTaskPayloadFromId(taskId); + log.info("Pushing task payload [%s] to location [%s]", taskPayloadFile, path); + pushTaskFile(path, taskPayloadFile); + } + + /** + * Stream payload from HDFS for a task. + * + * @return InputStream for this taskPayload, if available. + * @throws IOException When Druid fails to read the task payload. 
+ */ + @Override + public Optional streamTaskPayload(String taskId) throws IOException + { + final Path path = getTaskPayloadFromId(taskId); + return streamTaskFile(path, 0); + } + + private Path getTaskPayloadFromId(String taskId) + { + return new Path(mergePaths(config.getDirectory(), taskId.replace(':', '_') + ".payload.json")); + } } diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java index 9d0273d1a95f..7b50bf3baf01 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java @@ -95,6 +95,20 @@ public void test_taskStatus() throws Exception ); } + @Test + public void test_taskPayload() throws Exception + { + final File tmpDir = tempFolder.newFolder(); + final File logDir = new File(tmpDir, "logs"); + final File payload = new File(tmpDir, "payload.json"); + final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration()); + + Files.write("{}", payload, StandardCharsets.UTF_8); + taskLogs.pushTaskLog("id", payload); + Assert.assertEquals("{}", + StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskPayload("id").get()))); + } + @Test public void testKill() throws Exception { From 161dbe958d825f2d3c834c9a3ae95c0978954d97 Mon Sep 17 00:00:00 2001 From: GWphua Date: Thu, 20 Feb 2025 15:32:35 +0800 Subject: [PATCH 2/4] Fix mistake in Unit Test --- .../druid/indexing/common/tasklogs/HdfsTaskLogsTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java 
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java index 7b50bf3baf01..8c3773070fce 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java @@ -104,9 +104,8 @@ public void test_taskPayload() throws Exception final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration()); Files.write("{}", payload, StandardCharsets.UTF_8); - taskLogs.pushTaskLog("id", payload); - Assert.assertEquals("{}", - StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskPayload("id").get()))); + taskLogs.pushTaskPayload("id", payload); + Assert.assertEquals("{}", StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskPayload("id").get()))); } @Test From c40a6a07edc42f806cd49b4812369175ec0f3f4c Mon Sep 17 00:00:00 2001 From: GWphua Date: Fri, 21 Feb 2025 14:39:06 +0800 Subject: [PATCH 3/4] PR comments - Remove deprecated Files.write() and Javadocs, method name --- .../storage/hdfs/tasklog/HdfsTaskLogs.java | 19 ++++--------------- .../common/tasklogs/HdfsTaskLogsTest.java | 14 +++++++------- 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java index 7a8476e54956..535598bc2d70 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java @@ -208,33 +208,22 @@ public void killOlderThan(long timestamp) throws IOException } } - /** - * Save payload into HDFS, so it can be retrieved later. 
- * - * @throws IOException When Druid fails to push the task payload. - */ @Override public void pushTaskPayload(String taskId, File taskPayloadFile) throws IOException { - final Path path = getTaskPayloadFromId(taskId); - log.info("Pushing task payload [%s] to location [%s]", taskPayloadFile, path); + final Path path = getTaskPayloadFileFromId(taskId); + log.info("Pushing payload for task[%s] to location[%s]", taskId, path); pushTaskFile(path, taskPayloadFile); } - /** - * Stream payload from HDFS for a task. - * - * @return InputStream for this taskPayload, if available. - * @throws IOException When Druid fails to read the task payload. - */ @Override public Optional streamTaskPayload(String taskId) throws IOException { - final Path path = getTaskPayloadFromId(taskId); + final Path path = getTaskPayloadFileFromId(taskId); return streamTaskFile(path, 0); } - private Path getTaskPayloadFromId(String taskId) + private Path getTaskPayloadFileFromId(String taskId) { return new Path(mergePaths(config.getDirectory(), taskId.replace(':', '_') + ".payload.json")); } diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java index 8c3773070fce..125fbd288721 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java @@ -50,7 +50,7 @@ public void testStream() throws Exception final File tmpDir = tempFolder.newFolder(); final File logDir = new File(tmpDir, "logs"); final File logFile = new File(tmpDir, "log"); - Files.write("blah", logFile, StandardCharsets.UTF_8); + Files.asCharSink(logFile, StandardCharsets.UTF_8).write("blah"); final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration()); 
taskLogs.pushTaskLog("foo", logFile); @@ -69,11 +69,11 @@ public void testOverwrite() throws Exception final File logFile = new File(tmpDir, "log"); final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration()); - Files.write("blah", logFile, StandardCharsets.UTF_8); + Files.asCharSink(logFile, StandardCharsets.UTF_8).write("blah"); taskLogs.pushTaskLog("foo", logFile); Assert.assertEquals("blah", readLog(taskLogs, "foo", 0)); - Files.write("blah blah", logFile, StandardCharsets.UTF_8); + Files.asCharSink(logFile, StandardCharsets.UTF_8).write("blah blah"); taskLogs.pushTaskLog("foo", logFile); Assert.assertEquals("blah blah", readLog(taskLogs, "foo", 0)); } @@ -87,7 +87,7 @@ public void test_taskStatus() throws Exception final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration()); - Files.write("{}", statusFile, StandardCharsets.UTF_8); + Files.asCharSink(statusFile, StandardCharsets.UTF_8).write("{}"); taskLogs.pushTaskStatus("id", statusFile); Assert.assertEquals( "{}", @@ -103,7 +103,7 @@ public void test_taskPayload() throws Exception final File payload = new File(tmpDir, "payload.json"); final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration()); - Files.write("{}", payload, StandardCharsets.UTF_8); + Files.asCharSink(payload, StandardCharsets.UTF_8).write("{}"); taskLogs.pushTaskPayload("id", payload); Assert.assertEquals("{}", StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskPayload("id").get()))); } @@ -120,7 +120,7 @@ public void testKill() throws Exception final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration()); - Files.write("log1content", logFile, StandardCharsets.UTF_8); + Files.asCharSink(logFile, StandardCharsets.UTF_8).write("log1content"); taskLogs.pushTaskLog("log1", logFile); Assert.assertEquals("log1content", readLog(taskLogs, "log1", 0)); @@ 
-131,7 +131,7 @@ public void testKill() throws Exception long time = (System.currentTimeMillis() / 1000) * 1000; Assert.assertTrue(fs.getFileStatus(new Path(logDirPath, "log1")).getModificationTime() < time); - Files.write("log2content", logFile, StandardCharsets.UTF_8); + Files.asCharSink(logFile, StandardCharsets.UTF_8).write("log2content"); taskLogs.pushTaskLog("log2", logFile); Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0)); Assert.assertTrue(fs.getFileStatus(new Path(logDirPath, "log2")).getModificationTime() >= time); From 376c7d547fc17f6aecf8db38fbdfb65f084463e4 Mon Sep 17 00:00:00 2001 From: GWphua Date: Fri, 21 Feb 2025 14:42:06 +0800 Subject: [PATCH 4/4] Add taskID pass as parameters --- distribution/docker/peon.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distribution/docker/peon.sh b/distribution/docker/peon.sh index 8103f475ccb3..3b4dfc4326b9 100755 --- a/distribution/docker/peon.sh +++ b/distribution/docker/peon.sh @@ -161,4 +161,4 @@ fi # If TASK_JSON is not set, CliPeon will pull the task.json file from deep storage. mkdir -p ${TASK_DIR}; [ -n "$TASK_JSON" ] && echo ${TASK_JSON} | base64 -d | gzip -d > ${TASK_DIR}/task.json; -exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon $@ +exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon --taskId "${TASK_ID}" "$@"