Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion distribution/docker/peon.sh
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,4 @@ fi
# If TASK_JSON is not set, CliPeon will pull the task.json file from deep storage.
mkdir -p ${TASK_DIR}; [ -n "$TASK_JSON" ] && echo ${TASK_JSON} | base64 -d | gzip -d > ${TASK_DIR}/task.json;

exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon $@
exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon --taskId ${TASK_ID} $@
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this needed

Copy link
Copy Markdown
Contributor Author

@GWphua GWphua Feb 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is absent, Druid will not be able to start tasks when we have a payload above 128 KiB, and we will receive the error log:
ERROR [main] org.apache.druid.cli.CliPeon - Stream on task id[] is null.

This means that Druid will have problems reading the taskId.
So, we will have to pass it in under peon.sh to ensure that we receive the task ID.

Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,26 @@ public void killOlderThan(long timestamp) throws IOException
}
}
}

@Override
public void pushTaskPayload(String taskId, File taskPayloadFile) throws IOException
{
  // Resolve the deep-storage location for this task's payload, then upload the file there.
  final Path payloadPath = getTaskPayloadFileFromId(taskId);
  log.info("Pushing payload for task[%s] to location[%s]", taskId, payloadPath);
  pushTaskFile(payloadPath, taskPayloadFile);
}

@Override
public Optional<InputStream> streamTaskPayload(String taskId) throws IOException
{
  // Offset 0: stream the payload file from the beginning, in full.
  return streamTaskFile(getTaskPayloadFileFromId(taskId), 0);
}

/**
 * Builds the deep-storage path for a task's payload file.
 * Colons are replaced with underscores since ':' is not a valid character in HDFS file names.
 */
private Path getTaskPayloadFileFromId(String taskId)
{
  final String payloadFileName = taskId.replace(':', '_') + ".payload.json";
  return new Path(mergePaths(config.getDirectory(), payloadFileName));
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testStream() throws Exception
final File tmpDir = tempFolder.newFolder();
final File logDir = new File(tmpDir, "logs");
final File logFile = new File(tmpDir, "log");
Files.write("blah", logFile, StandardCharsets.UTF_8);
Files.asCharSink(logFile, StandardCharsets.UTF_8).write("blah");
final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());
taskLogs.pushTaskLog("foo", logFile);

Expand All @@ -69,11 +69,11 @@ public void testOverwrite() throws Exception
final File logFile = new File(tmpDir, "log");
final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());

Files.write("blah", logFile, StandardCharsets.UTF_8);
Files.asCharSink(logFile, StandardCharsets.UTF_8).write("blah");
taskLogs.pushTaskLog("foo", logFile);
Assert.assertEquals("blah", readLog(taskLogs, "foo", 0));

Files.write("blah blah", logFile, StandardCharsets.UTF_8);
Files.asCharSink(logFile, StandardCharsets.UTF_8).write("blah blah");
taskLogs.pushTaskLog("foo", logFile);
Assert.assertEquals("blah blah", readLog(taskLogs, "foo", 0));
}
Expand All @@ -87,14 +87,27 @@ public void test_taskStatus() throws Exception
final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());


Files.write("{}", statusFile, StandardCharsets.UTF_8);
Files.asCharSink(statusFile, StandardCharsets.UTF_8).write("{}");
taskLogs.pushTaskStatus("id", statusFile);
Assert.assertEquals(
"{}",
StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskStatus("id").get()))
);
}

@Test
public void test_taskPayload() throws Exception
{
  // Push a trivial JSON payload and verify it round-trips through deep storage unchanged.
  final File tmpDir = tempFolder.newFolder();
  final File logDir = new File(tmpDir, "logs");
  final File payloadFile = new File(tmpDir, "payload.json");
  Files.asCharSink(payloadFile, StandardCharsets.UTF_8).write("{}");

  final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());
  taskLogs.pushTaskPayload("id", payloadFile);

  final byte[] payloadBytes = ByteStreams.toByteArray(taskLogs.streamTaskPayload("id").get());
  Assert.assertEquals("{}", StringUtils.fromUtf8(payloadBytes));
}

@Test
public void testKill() throws Exception
{
Expand All @@ -107,7 +120,7 @@ public void testKill() throws Exception

final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());

Files.write("log1content", logFile, StandardCharsets.UTF_8);
Files.asCharSink(logFile, StandardCharsets.UTF_8).write("log1content");
taskLogs.pushTaskLog("log1", logFile);
Assert.assertEquals("log1content", readLog(taskLogs, "log1", 0));

Expand All @@ -118,7 +131,7 @@ public void testKill() throws Exception
long time = (System.currentTimeMillis() / 1000) * 1000;
Assert.assertTrue(fs.getFileStatus(new Path(logDirPath, "log1")).getModificationTime() < time);

Files.write("log2content", logFile, StandardCharsets.UTF_8);
Files.asCharSink(logFile, StandardCharsets.UTF_8).write("log2content");
taskLogs.pushTaskLog("log2", logFile);
Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0));
Assert.assertTrue(fs.getFileStatus(new Path(logDirPath, "log2")).getModificationTime() >= time);
Expand Down