Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,10 @@ public void killAll() throws IOException
{
log.info("Noop: No task logs are deleted.");
}

/**
 * No-op: this implementation never stores task logs, so there is nothing to delete.
 *
 * @param timestamp ignored; cutoff in milliseconds since epoch
 */
@Override
public void killOlderThan(long timestamp) throws IOException
{
  log.info("Noop: No task logs are deleted.");
}
}
1 change: 1 addition & 0 deletions api/src/main/java/io/druid/tasklogs/TaskLogKiller.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@
/**
 * Deletes stored task logs, either wholesale or by age.
 */
public interface TaskLogKiller
{
  /** Deletes all stored task logs. */
  void killAll() throws IOException;

  /**
   * Deletes task logs last modified before the given timestamp.
   *
   * @param timestamp cutoff in milliseconds since epoch; logs older than this are deleted
   */
  void killOlderThan(long timestamp) throws IOException;
}
10 changes: 10 additions & 0 deletions docs/content/configuration/indexing-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ If you are running the indexing service in remote mode, the task logs must be st
|--------|-----------|-------|
|`druid.indexer.logs.type`|Choices: noop, s3, azure, google, hdfs, file. Where to store task logs|file|

You can also configure the Overlord to automatically retain only the task logs created within the last _x_ milliseconds by configuring the following additional properties.
Caution: Automatic log file deletion typically works based on the log file's modification timestamp on the backing store, so a large clock skew between Druid nodes and backing store nodes might result in unintended behavior.

|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. |false|
|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. Duration (in milliseconds) for which task logs are retained; only logs created within the last _x_ milliseconds are kept. |None|
|`druid.indexer.logs.kill.initialDelay`| Optional. Number of milliseconds after overlord start when first auto kill is run. |random value less than 300000 (5 mins)|
|`druid.indexer.logs.kill.delay`|Optional. Number of milliseconds of delay between successive executions of auto kill run. |21600000 (6 hours)|

##### File Task Logs

Store task logs in the local filesystem.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,10 @@ public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
}

/**
 * Deleting old task logs is not supported by this task log implementation.
 * NOTE(review): enclosing file header is not visible here — presumably one of the
 * cloud deep-storage task log classes; confirm against the full file.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void killOlderThan(long timestamp) throws IOException
{
  throw new UnsupportedOperationException("not implemented");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,10 @@ public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
}

/**
 * Deleting old task logs is not supported by this task log implementation.
 * NOTE(review): enclosing file header is not visible here — presumably one of the
 * cloud deep-storage task log classes; confirm against the full file.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void killOlderThan(long timestamp) throws IOException
{
  throw new UnsupportedOperationException("not implemented");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

import java.io.File;
import java.io.FileInputStream;
Expand Down Expand Up @@ -123,6 +125,35 @@ public void killAll() throws IOException
FileSystem fs = taskLogDir.getFileSystem(hadoopConfig);
fs.delete(taskLogDir, true);
}

/**
 * Deletes every task log file under the configured HDFS log directory whose
 * modification time is older than the given cutoff.
 *
 * @param timestamp cutoff in milliseconds since epoch; files modified before this are deleted
 * @throws IOException if the log path exists but is not a directory, if a filesystem
 *                     operation fails, or (wrapping InterruptedException) if the thread
 *                     is interrupted mid-sweep
 */
@Override
public void killOlderThan(long timestamp) throws IOException
{
  Path taskLogDir = new Path(config.getDirectory());
  FileSystem fs = taskLogDir.getFileSystem(hadoopConfig);

  // Nothing to clean up if no logs were ever pushed.
  if (!fs.exists(taskLogDir)) {
    return;
  }

  if (!fs.isDirectory(taskLogDir)) {
    throw new IOException(String.format("taskLogDir [%s] must be a directory.", taskLogDir));
  }

  RemoteIterator<LocatedFileStatus> logFiles = fs.listLocatedStatus(taskLogDir);
  while (logFiles.hasNext()) {
    LocatedFileStatus status = logFiles.next();
    if (status.getModificationTime() < timestamp) {
      Path logPath = status.getPath();
      log.info("Deleting hdfs task log [%s].", logPath.toUri().toString());
      fs.delete(logPath, true);
    }

    // Bail out promptly on shutdown rather than finishing a potentially long sweep.
    if (Thread.currentThread().isInterrupted()) {
      throw new IOException(
          new InterruptedException("Thread interrupted. Couldn't delete all tasklogs.")
      );
    }
  }
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
import io.druid.tasklogs.TaskLogs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -53,7 +55,7 @@ public void testStream() throws Exception

final Map<Long, String> expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah");
for (Map.Entry<Long, String> entry : expected.entrySet()) {
final String string = readLog(taskLogs, entry.getKey());
final String string = readLog(taskLogs, "foo", entry.getKey());
Assert.assertEquals(String.format("Read with offset %,d", entry.getKey()), string, entry.getValue());
}
}
Expand All @@ -68,17 +70,52 @@ public void testOverwrite() throws Exception

Files.write("blah", logFile, Charsets.UTF_8);
taskLogs.pushTaskLog("foo", logFile);
Assert.assertEquals("blah", readLog(taskLogs, 0));
Assert.assertEquals("blah", readLog(taskLogs, "foo", 0));

Files.write("blah blah", logFile, Charsets.UTF_8);
taskLogs.pushTaskLog("foo", logFile);
Assert.assertEquals("blah blah", readLog(taskLogs, 0));
Assert.assertEquals("blah blah", readLog(taskLogs, "foo", 0));
}

private String readLog(TaskLogs taskLogs, long offset) throws IOException
/**
 * Verifies that killOlderThan deletes only logs modified before the cutoff:
 * pushes two logs separated by a real-time delay, kills with a cutoff between
 * them, and checks log1 is gone while log2 survives.
 */
@Test
public void testKill() throws Exception
{
  final File tmpDir = tempFolder.newFolder();
  final File logDir = new File(tmpDir, "logs");
  final File logFile = new File(tmpDir, "log");

  final Path logDirPath = new Path(logDir.toString());
  FileSystem fs = new Path(logDir.toString()).getFileSystem(new Configuration());

  final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());

  Files.write("log1content", logFile, Charsets.UTF_8);
  taskLogs.pushTaskLog("log1", logFile);
  Assert.assertEquals("log1content", readLog(taskLogs, "log1", 0));

  // File modification timestamp is only maintained to seconds resolution, so an artificial
  // delay is necessary to separate the two file creations by a timestamp difference large
  // enough that only one of them gets deleted.
  Thread.sleep(1500);
  // Truncate to whole seconds to match the filesystem's timestamp resolution.
  long time = (System.currentTimeMillis()/1000)*1000;
  Assert.assertTrue(fs.getFileStatus(new Path(logDirPath, "log1")).getModificationTime() < time);

  Files.write("log2content", logFile, Charsets.UTF_8);
  taskLogs.pushTaskLog("log2", logFile);
  Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0));
  Assert.assertTrue(fs.getFileStatus(new Path(logDirPath, "log2")).getModificationTime() >= time);

  taskLogs.killOlderThan(time);

  // log1 predates the cutoff and must be gone; log2 must be untouched.
  Assert.assertFalse(taskLogs.streamTaskLog("log1", 0).isPresent());
  Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0));

}

/**
 * Reads a task log through the TaskLogs API and decodes it as UTF-8.
 *
 * @param taskLogs the task log store under test
 * @param logFile  task id whose log to read
 * @param offset   byte offset to start streaming from
 * @return the log contents from the given offset, decoded as UTF-8
 */
private String readLog(TaskLogs taskLogs, String logFile, long offset) throws IOException
{
  // Note: the span being replaced contained a stale pre-change diff line
  // (streaming the hard-coded "foo" task id) alongside its replacement;
  // only the parameterized version below is the intended final code.
  return new String(
      ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile, offset).get().openStream()),
      Charsets.UTF_8
  );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,10 @@ public void killAll() throws IOException
{
throw new UnsupportedOperationException("not implemented");
}

/**
 * Deleting old task logs is not supported by this task log implementation.
 * NOTE(review): enclosing file header is not visible here — presumably one of the
 * cloud deep-storage task log classes; confirm against the full file.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void killOlderThan(long timestamp) throws IOException
{
  throw new UnsupportedOperationException("not implemented");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.InputStream;

Expand Down Expand Up @@ -89,4 +90,38 @@ public void killAll() throws IOException
log.info("Deleting all task logs from local dir [%s].", config.getDirectory().getAbsolutePath());
FileUtils.deleteDirectory(config.getDirectory());
}

@Override
public void killOlderThan(final long timestamp) throws IOException
{
File taskLogDir = config.getDirectory();
if (taskLogDir.exists()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest && taskLogDir.isDirectory()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, throwing an IOException if it is not a directory would be a reasonable alternative.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


if (!taskLogDir.isDirectory()) {
throw new IOException(String.format("taskLogDir [%s] must be a directory.", taskLogDir));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For future reference IOE would work best here, not worth blocking on though.

}

File[] files = taskLogDir.listFiles(
new FileFilter()
{
@Override
public boolean accept(File f)
{
return f.lastModified() < timestamp;
}
}
);

for (File file : files) {
log.info("Deleting local task log [%s].", file.getAbsolutePath());
FileUtils.forceDelete(file);

if (Thread.currentThread().isInterrupted()) {
throw new IOException(
new InterruptedException("Thread interrupted. Couldn't delete all tasklogs.")
);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.ScalingStats;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.indexing.overlord.helpers.OverlordHelperManager;
import io.druid.indexing.overlord.supervisor.SupervisorManager;
import io.druid.java.util.common.lifecycle.Lifecycle;
import io.druid.java.util.common.lifecycle.LifecycleStart;
Expand Down Expand Up @@ -78,7 +79,8 @@ public TaskMaster(
final CuratorFramework curator,
final ServiceAnnouncer serviceAnnouncer,
final ServiceEmitter emitter,
final SupervisorManager supervisorManager
final SupervisorManager supervisorManager,
final OverlordHelperManager overlordHelperManager
)
{
this.supervisorManager = supervisorManager;
Expand Down Expand Up @@ -120,6 +122,7 @@ public void takeLeadership(CuratorFramework client) throws Exception
leaderLifecycle.addManagedInstance(taskRunner);
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addManagedInstance(supervisorManager);
leaderLifecycle.addManagedInstance(overlordHelperManager);

leaderLifecycle.addHandler(
new Lifecycle.Handler()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.indexing.overlord.helpers;

import java.util.concurrent.ScheduledExecutorService;

/**
*/
/**
 * A periodic background task run by the Overlord leader (e.g. automatic task log
 * cleanup). Each helper schedules itself on the executor supplied by
 * OverlordHelperManager, so helpers can have independent schedules and can
 * potentially run in parallel.
 */
public interface OverlordHelper
{
  /**
   * Whether this helper should run at all; disabled helpers are never scheduled.
   */
  boolean isEnabled();

  /**
   * Schedules this helper's periodic work on the given executor. Called once,
   * at Overlord leader startup, and only if {@link #isEnabled()} returns true.
   *
   * @param exec shared executor owned by the manager; shut down when leadership is lost
   */
  void schedule(ScheduledExecutorService exec);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.indexing.overlord.helpers;

import com.google.inject.Inject;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.common.logger.Logger;

import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;

/**
*/
public class OverlordHelperManager
{
private static final Logger log = new Logger(OverlordHelperManager.class);

private final ScheduledExecutorFactory execFactory;
private final Set<OverlordHelper> helpers;

private volatile ScheduledExecutorService exec;
private final Object startStopLock = new Object();
private volatile boolean started = false;

@Inject
public OverlordHelperManager(
ScheduledExecutorFactory scheduledExecutorFactory,
Set<OverlordHelper> helpers)
{
this.execFactory = scheduledExecutorFactory;
this.helpers = helpers;
}

@LifecycleStart
public void start()
{
synchronized (startStopLock) {
if (!started) {
log.info("OverlordHelperManager is starting.");

for (OverlordHelper helper : helpers) {
if (helper.isEnabled()) {
if (exec == null) {
exec = execFactory.create(1, "Overlord-Helper-Manager-Exec--%d");
}
helper.schedule(exec);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic is basically just repeating what the coordinator is doing

high level comment, can we have this as a coordinator helper?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it is similar. but, i did it on overlord because tasks and tasklogs are owned by overlord and their retention should also be handled at the overlord.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the executor is having issues and rejecting execution, what is the intended behavior here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that would be a fatal situation, will fail the lifecycle start and consequently prohibit druid process startup.... which is intended behavior in this case.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool

}
}
started = true;

log.info("OverlordHelperManager is started.");
}
}
}

@LifecycleStop
public void stop()
{
synchronized (startStopLock) {
if (started) {
log.info("OverlordHelperManager is stopping.");
if (exec != null) {
exec.shutdownNow();
exec = null;
}
started = false;

log.info("OverlordHelperManager is stopped.");
}
}
}
}
Loading