From c8cc2c66574e976fdaabe34d9904790d979982e1 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 9 Nov 2016 15:57:50 -0600 Subject: [PATCH 1/3] overlord helpers framework and tasklog auto cleanup --- .../java/io/druid/tasklogs/NoopTaskLogs.java | 6 ++ .../java/io/druid/tasklogs/TaskLogKiller.java | 1 + .../content/configuration/indexing-service.md | 9 ++ .../io/druid/storage/azure/AzureTaskLogs.java | 6 ++ .../storage/hdfs/tasklog/HdfsTaskLogs.java | 24 +++++ .../common/tasklogs/HdfsTaskLogsTest.java | 40 +++++++- .../java/io/druid/storage/s3/S3TaskLogs.java | 6 ++ .../common/tasklogs/FileTaskLogs.java | 28 ++++++ .../druid/indexing/overlord/TaskMaster.java | 5 +- .../overlord/helpers/OverlordHelper.java | 30 ++++++ .../helpers/OverlordHelperManager.java | 91 +++++++++++++++++++ .../overlord/helpers/TaskLogAutoCleaner.java | 79 ++++++++++++++++ .../helpers/TaskLogAutoCleanerConfig.java | 91 +++++++++++++++++++ .../common/tasklogs/FileTaskLogsTest.java | 37 ++++++++ .../helpers/TaskLogAutoCleanerConfigTest.java | 57 ++++++++++++ .../indexing/overlord/http/OverlordTest.java | 4 +- .../main/java/io/druid/cli/CliOverlord.java | 14 ++- 17 files changed, 520 insertions(+), 8 deletions(-) create mode 100644 indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelper.java create mode 100644 indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelperManager.java create mode 100644 indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java create mode 100644 indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java create mode 100644 indexing-service/src/test/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfigTest.java diff --git a/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java b/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java index 4eaf72d14cf0..33dd315235b9 100644 --- a/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java +++ 
b/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java @@ -48,4 +48,10 @@ public void killAll() throws IOException { log.info("Noop: No task logs are deleted."); } + + @Override + public void kill(long beforeTimestamp) throws IOException + { + log.info("Noop: No task logs are deleted."); + } } diff --git a/api/src/main/java/io/druid/tasklogs/TaskLogKiller.java b/api/src/main/java/io/druid/tasklogs/TaskLogKiller.java index 39a879a0ccd5..fa56fb9fa880 100644 --- a/api/src/main/java/io/druid/tasklogs/TaskLogKiller.java +++ b/api/src/main/java/io/druid/tasklogs/TaskLogKiller.java @@ -26,4 +26,5 @@ public interface TaskLogKiller { void killAll() throws IOException; + void kill(long beforeTimestamp) throws IOException; } diff --git a/docs/content/configuration/indexing-service.md b/docs/content/configuration/indexing-service.md index 6eca7bc1f503..0f314d8fb862 100644 --- a/docs/content/configuration/indexing-service.md +++ b/docs/content/configuration/indexing-service.md @@ -25,6 +25,15 @@ If you are running the indexing service in remote mode, the task logs must be st |--------|-----------|-------| |`druid.indexer.logs.type`|Choices:noop, s3, azure, google, hdfs, file. Where to store task logs|file| +You can also configure the Overlord to automatically retain the task logs only for last x seconds by configuring following additional properties. + +|Property|Description|Default| +|--------|-----------|-------| +|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. |false| +|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In seconds, task logs to be retained created in last x seconds. |None| +|`druid.indexer.logs.kill.initialDelay`| Optional. Number of seconds after overlord start when first auto kill is run. |random value less than 300 (5 mins)| +|`druid.indexer.logs.kill.delay`|Optional. Number of seconds of delay between successive executions of auto kill run. 
|21600 (6 hours)| + ##### File Task Logs Store task logs in the local filesystem. diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java index 5ea9a3242998..c5411c1a7bf0 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java @@ -120,4 +120,10 @@ public void killAll() throws IOException { throw new UnsupportedOperationException("not implemented"); } + + @Override + public void kill(long beforeTimestamp) throws IOException + { + throw new UnsupportedOperationException("not implemented"); + } } diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java index 811cb012de28..8f62eb7398a3 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java @@ -28,7 +28,9 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import java.io.File; import java.io.FileInputStream; @@ -123,6 +125,28 @@ public void killAll() throws IOException FileSystem fs = taskLogDir.getFileSystem(hadoopConfig); fs.delete(taskLogDir, true); } + + @Override + public void kill(long beforeTimestamp) throws IOException + { + Path taskLogDir = new Path(config.getDirectory()); + FileSystem fs = taskLogDir.getFileSystem(hadoopConfig); + if (fs.exists(taskLogDir)) { + RemoteIterator iter = fs.listLocatedStatus(taskLogDir); + while (iter.hasNext()) { 
+ LocatedFileStatus file = iter.next(); + if (file.getModificationTime() < beforeTimestamp) { + Path p = file.getPath(); + log.info("Deleting hdfs task log [%s].", p.toUri().toString()); + fs.delete(p, true); + } + + if (Thread.interrupted()) { + throw new IOException("Thread interrupted. Couldn't delete all tasklogs."); + } + } + } + } } diff --git a/extensions-core/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java b/extensions-core/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java index c3b253096a31..8e9ea3daca9a 100644 --- a/extensions-core/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java +++ b/extensions-core/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java @@ -53,7 +53,7 @@ public void testStream() throws Exception final Map expected = ImmutableMap.of(0L, "blah", 1L, "lah", -2L, "ah", -5L, "blah"); for (Map.Entry entry : expected.entrySet()) { - final String string = readLog(taskLogs, entry.getKey()); + final String string = readLog(taskLogs, "foo", entry.getKey()); Assert.assertEquals(String.format("Read with offset %,d", entry.getKey()), string, entry.getValue()); } } @@ -68,17 +68,47 @@ public void testOverwrite() throws Exception Files.write("blah", logFile, Charsets.UTF_8); taskLogs.pushTaskLog("foo", logFile); - Assert.assertEquals("blah", readLog(taskLogs, 0)); + Assert.assertEquals("blah", readLog(taskLogs, "foo", 0)); Files.write("blah blah", logFile, Charsets.UTF_8); taskLogs.pushTaskLog("foo", logFile); - Assert.assertEquals("blah blah", readLog(taskLogs, 0)); + Assert.assertEquals("blah blah", readLog(taskLogs, "foo", 0)); } - private String readLog(TaskLogs taskLogs, long offset) throws IOException + @Test + public void testKill() throws Exception + { + final File tmpDir = tempFolder.newFolder(); + final File logDir = new File(tmpDir, "logs"); + final File logFile = new File(tmpDir, "log"); + + final TaskLogs 
taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration()); + + Files.write("log1content", logFile, Charsets.UTF_8); + taskLogs.pushTaskLog("log1", logFile); + Assert.assertEquals("log1content", readLog(taskLogs, "log1", 0)); + + //File modification timestamp is only maintained to seconds resolution, so artificial delay + //is necessary to separate 2 file creations by a timestamp that would result in only one + //of them getting deleted + Thread.sleep(1500); + long time = (System.currentTimeMillis()/1000)*1000; + + Files.write("log2content", logFile, Charsets.UTF_8); + taskLogs.pushTaskLog("log2", logFile); + Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0)); + + taskLogs.kill(time); + + Assert.assertFalse(taskLogs.streamTaskLog("log1", 0).isPresent()); + Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0)); + + } + + private String readLog(TaskLogs taskLogs, String logFile, long offset) throws IOException { return new String( - ByteStreams.toByteArray(taskLogs.streamTaskLog("foo", offset).get().openStream()), + ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile, offset).get().openStream()), Charsets.UTF_8 ); } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java index 7f854e86f30e..5c4d8ae294d2 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java @@ -144,4 +144,10 @@ public void killAll() throws IOException { throw new UnsupportedOperationException("not implemented"); } + + @Override + public void kill(long beforeTimestamp) throws IOException + { + throw new UnsupportedOperationException("not implemented"); + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java 
b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java index 4b24e6a09ee9..b665a5c17c0b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java @@ -29,6 +29,7 @@ import org.apache.commons.io.FileUtils; import java.io.File; +import java.io.FileFilter; import java.io.IOException; import java.io.InputStream; @@ -89,4 +90,31 @@ public void killAll() throws IOException log.info("Deleting all task logs from local dir [%s].", config.getDirectory().getAbsolutePath()); FileUtils.deleteDirectory(config.getDirectory()); } + + @Override + public void kill(final long beforeTimestamp) throws IOException + { + File taskLogDir = config.getDirectory(); + if (taskLogDir.exists()) { + File[] files = taskLogDir.listFiles( + new FileFilter() + { + @Override + public boolean accept(File f) + { + return f.lastModified() < beforeTimestamp; + } + } + ); + + for (File file : files) { + log.info("Deleting local task log [%s].", file.getAbsolutePath()); + FileUtils.forceDelete(file); + + if (Thread.interrupted()) { + throw new IOException("Thread interrupted. 
Couldn't delete all tasklogs."); + } + } + } + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java index 253b2c9e463a..93ced507882c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java @@ -31,6 +31,7 @@ import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.autoscaling.ScalingStats; import io.druid.indexing.overlord.config.TaskQueueConfig; +import io.druid.indexing.overlord.helpers.OverlordHelperManager; import io.druid.indexing.overlord.supervisor.SupervisorManager; import io.druid.java.util.common.lifecycle.Lifecycle; import io.druid.java.util.common.lifecycle.LifecycleStart; @@ -78,7 +79,8 @@ public TaskMaster( final CuratorFramework curator, final ServiceAnnouncer serviceAnnouncer, final ServiceEmitter emitter, - final SupervisorManager supervisorManager + final SupervisorManager supervisorManager, + final OverlordHelperManager overlordHelperManager ) { this.supervisorManager = supervisorManager; @@ -120,6 +122,7 @@ public void takeLeadership(CuratorFramework client) throws Exception leaderLifecycle.addManagedInstance(taskRunner); leaderLifecycle.addManagedInstance(taskQueue); leaderLifecycle.addManagedInstance(supervisorManager); + leaderLifecycle.addManagedInstance(overlordHelperManager); leaderLifecycle.addHandler( new Lifecycle.Handler() diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelper.java b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelper.java new file mode 100644 index 000000000000..0b5cbf919228 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelper.java @@ -0,0 +1,30 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. 
See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexing.overlord.helpers; + +import java.util.concurrent.ScheduledExecutorService; + +/** + */ +public interface OverlordHelper +{ + boolean isEnabled(); + void schedule(ScheduledExecutorService exec); +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelperManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelperManager.java new file mode 100644 index 000000000000..912ab3c23db4 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelperManager.java @@ -0,0 +1,91 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexing.overlord.helpers; + +import com.google.inject.Inject; +import io.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import io.druid.java.util.common.lifecycle.LifecycleStart; +import io.druid.java.util.common.lifecycle.LifecycleStop; +import io.druid.java.util.common.logger.Logger; + +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; + +/** + */ +public class OverlordHelperManager +{ + private static final Logger log = new Logger(OverlordHelperManager.class); + + private final ScheduledExecutorFactory execFactory; + private final Set helpers; + + private volatile ScheduledExecutorService exec; + private final Object lock = new Object(); + private volatile boolean started = false; + + @Inject + public OverlordHelperManager( + ScheduledExecutorFactory scheduledExecutorFactory, + Set helpers) + { + this.execFactory = scheduledExecutorFactory; + this.helpers = helpers; + } + + @LifecycleStart + public void start() + { + synchronized (lock) { + if (!started) { + log.info("OverlordHelperManager is starting."); + + for (OverlordHelper helper : helpers) { + if (helper.isEnabled()) { + if (exec == null) { + exec = execFactory.create(1, "Overlord-Helper-Manager-Exec--%d"); + } + helper.schedule(exec); + } + } + started = true; + + log.info("OverlordHelperManager is started."); + } + } + } + + @LifecycleStop + public void stop() + { + synchronized (lock) { + if (started) { + log.info("OverlordHelperManager is stopping."); + if (exec != null) { + exec.shutdownNow(); + exec = null; + } + started = 
false; + + log.info("OverlordHelperManager is stopped."); + } + } + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java new file mode 100644 index 000000000000..73c0fb5db800 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java @@ -0,0 +1,79 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package io.druid.indexing.overlord.helpers; + +import com.google.inject.Inject; +import io.druid.java.util.common.concurrent.ScheduledExecutors; +import io.druid.java.util.common.logger.Logger; +import io.druid.tasklogs.TaskLogKiller; +import org.joda.time.Duration; + +import java.util.concurrent.ScheduledExecutorService; + +/** + */ +public class TaskLogAutoCleaner implements OverlordHelper +{ + private static final Logger log = new Logger(TaskLogAutoCleaner.class); + + private final TaskLogKiller taskLogKiller; + private final TaskLogAutoCleanerConfig config; + + @Inject + public TaskLogAutoCleaner( + TaskLogKiller taskLogKiller, + TaskLogAutoCleanerConfig config + ) + { + this.taskLogKiller = taskLogKiller; + this.config = config; + } + + @Override + public boolean isEnabled() + { + return config.isEnabled(); + } + + @Override + public void schedule(ScheduledExecutorService exec) + { + log.info("Scheduling TaskLogAutoCleaner with config [%s].", config.toString()); + + ScheduledExecutors.scheduleWithFixedDelay( + exec, + Duration.standardSeconds(config.getInitialDelay()), + Duration.standardSeconds(config.getDelay()), + new Runnable() + { + @Override + public void run() + { + try { + taskLogKiller.kill(System.currentTimeMillis() - config.getDurationToRetain()*1000); + } + catch (Exception ex) { + log.error(ex, "Failed to clean-up the task logs"); + } + } + } + ); + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java new file mode 100644 index 000000000000..9c7e5af29622 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java @@ -0,0 +1,91 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. 
See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexing.overlord.helpers; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import java.util.Random; + +/** + */ +public class TaskLogAutoCleanerConfig +{ + @JsonProperty + private final boolean enabled; + + @JsonProperty + private final long initialDelay; + + @JsonProperty + private final long delay; + + @JsonProperty + private final long durationToRetain; + + @JsonCreator + public TaskLogAutoCleanerConfig( + @JsonProperty("enabled") boolean enabled, + @JsonProperty("initialDelay") Long initialDelay, + @JsonProperty("delay") Long delay, + @JsonProperty("durationToRetain") Long durationToRetain + ){ + if (enabled) { + Preconditions.checkNotNull(durationToRetain, "durationToRetain must be provided."); + } + + this.enabled = enabled; + this.initialDelay = initialDelay == null ? 60 + new Random().nextInt(4*60) : initialDelay.longValue(); + this.delay = delay == null ? 6*60*60 : delay.longValue(); + this.durationToRetain = durationToRetain == null ? 
Long.MAX_VALUE : durationToRetain.longValue(); + } + + public boolean isEnabled() + { + return enabled; + } + + public long getInitialDelay() + { + return initialDelay; + } + + public long getDelay() + { + return delay; + } + + public long getDurationToRetain() + { + return durationToRetain; + } + + @Override + public String toString() + { + return "TaskLogAutoCleanerConfig{" + + "enabled=" + enabled + + ", initialDelay=" + initialDelay + + ", delay=" + delay + + ", durationToRetain=" + durationToRetain + + '}'; + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java b/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java index 79415c07310f..26e355db693f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java @@ -86,4 +86,41 @@ public void testPushTaskLogDirCreationFails() throws Exception expectedException.expectMessage("Unable to create task log dir"); taskLogs.pushTaskLog("foo", logFile); } + + @Test + public void testKill() throws Exception + { + final File tmpDir = temporaryFolder.newFolder(); + final File logDir = new File(tmpDir, "logs"); + final File logFile = new File(tmpDir, "log"); + final TaskLogs taskLogs = new FileTaskLogs(new FileTaskLogsConfig(logDir)); + + Files.write("log1content", logFile, Charsets.UTF_8); + taskLogs.pushTaskLog("log1", logFile); + Assert.assertEquals("log1content", readLog(taskLogs, "log1", 0)); + + //File modification timestamp is only maintained to seconds resolution, so artificial delay + //is necessary to separate 2 file creations by a timestamp that would result in only one + //of them getting deleted + Thread.sleep(1500); + long time = (System.currentTimeMillis()/1000)*1000; + + Files.write("log2content", logFile, Charsets.UTF_8); + taskLogs.pushTaskLog("log2", logFile); + Assert.assertEquals("log2content", 
readLog(taskLogs, "log2", 0)); + + taskLogs.kill(time); + + Assert.assertFalse(taskLogs.streamTaskLog("log1", 0).isPresent()); + Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0)); + + } + + private String readLog(TaskLogs taskLogs, String logFile, long offset) throws IOException + { + return new String( + ByteStreams.toByteArray(taskLogs.streamTaskLog(logFile, offset).get().openStream()), + Charsets.UTF_8 + ); + } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfigTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfigTest.java new file mode 100644 index 000000000000..01579618e157 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfigTest.java @@ -0,0 +1,57 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package io.druid.indexing.overlord.helpers; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.TestUtil; +import org.junit.Assert; +import org.junit.Test; + +/** + */ +public class TaskLogAutoCleanerConfigTest +{ + @Test + public void testSerde() throws Exception + { + String json = "{\n" + + " \"enabled\": true,\n" + + " \"initialDelay\": 10,\n" + + " \"delay\": 40,\n" + + " \"durationToRetain\": 30\n" + + "}"; + + ObjectMapper mapper = TestUtil.MAPPER; + + TaskLogAutoCleanerConfig config = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + json, + TaskLogAutoCleanerConfig.class + ) + ), TaskLogAutoCleanerConfig.class + ); + + Assert.assertEquals(true, config.isEnabled()); + Assert.assertEquals(10, config.getInitialDelay()); + Assert.assertEquals(40, config.getDelay()); + Assert.assertEquals(30, config.getDurationToRetain()); + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java index e39d07886ae5..7ac6c94a16d9 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java @@ -48,6 +48,7 @@ import io.druid.indexing.overlord.TaskStorageQueryAdapter; import io.druid.indexing.overlord.autoscaling.ScalingStats; import io.druid.indexing.overlord.config.TaskQueueConfig; +import io.druid.indexing.overlord.helpers.OverlordHelperManager; import io.druid.indexing.overlord.supervisor.SupervisorManager; import io.druid.java.util.common.Pair; import io.druid.java.util.common.guava.CloseQuietly; @@ -183,7 +184,8 @@ public void announce(DruidNode node) } }, serviceEmitter, - supervisorManager + supervisorManager, + EasyMock.createNiceMock(OverlordHelperManager.class) ); EmittingLogger.registerEmitter(serviceEmitter); } diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java 
b/services/src/main/java/io/druid/cli/CliOverlord.java index 0c37f4e34bb6..39aa998bf0ae 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -26,10 +26,10 @@ import com.google.inject.Module; import com.google.inject.TypeLiteral; import com.google.inject.multibindings.MapBinder; +import com.google.inject.multibindings.Multibinder; import com.google.inject.name.Names; import com.google.inject.servlet.GuiceFilter; import com.google.inject.util.Providers; - import io.airlift.airline.Command; import io.druid.audit.AuditManager; import io.druid.client.indexing.IndexingServiceSelectorConfig; @@ -67,6 +67,9 @@ import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementConfig; import io.druid.indexing.overlord.autoscaling.SimpleWorkerResourceManagementStrategy; import io.druid.indexing.overlord.config.TaskQueueConfig; +import io.druid.indexing.overlord.helpers.OverlordHelper; +import io.druid.indexing.overlord.helpers.TaskLogAutoCleaner; +import io.druid.indexing.overlord.helpers.TaskLogAutoCleanerConfig; import io.druid.indexing.overlord.http.OverlordRedirectInfo; import io.druid.indexing.overlord.http.OverlordResource; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; @@ -149,6 +152,7 @@ public void configure(Binder binder) configureTaskStorage(binder); configureAutoscale(binder); configureRunners(binder); + configureOverlordHelpers(binder); binder.bind(AuditManager.class) .toProvider(AuditManagerProvider.class) @@ -232,6 +236,14 @@ private void configureAutoscale(Binder binder) biddy.addBinding("pendingTaskBased").to(PendingTaskBasedWorkerResourceManagementStrategy.class); } + + private void configureOverlordHelpers(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.indexer.logs.kill", TaskLogAutoCleanerConfig.class); + Multibinder.newSetBinder(binder, OverlordHelper.class) + .addBinding() + .to(TaskLogAutoCleaner.class); + } }, new 
IndexingServiceFirehoseModule(), new IndexingServiceTaskLogsModule() From e9dae87edc6b515d6579e084faba9662cbcf55e0 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 14 Nov 2016 10:56:11 -0600 Subject: [PATCH 2/3] review comment changes --- api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java | 2 +- api/src/main/java/io/druid/tasklogs/TaskLogKiller.java | 2 +- docs/content/configuration/indexing-service.md | 9 +++++---- .../java/io/druid/storage/azure/AzureTaskLogs.java | 2 +- .../io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java | 10 ++++++---- .../indexing/common/tasklogs/HdfsTaskLogsTest.java | 9 ++++++++- .../src/main/java/io/druid/storage/s3/S3TaskLogs.java | 2 +- .../druid/indexing/common/tasklogs/FileTaskLogs.java | 10 ++++++---- .../indexing/overlord/helpers/TaskLogAutoCleaner.java | 6 +++--- .../overlord/helpers/TaskLogAutoCleanerConfig.java | 4 ++-- .../indexing/common/tasklogs/FileTaskLogsTest.java | 4 +++- 11 files changed, 37 insertions(+), 23 deletions(-) diff --git a/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java b/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java index 33dd315235b9..f82bf1d7b586 100644 --- a/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java +++ b/api/src/main/java/io/druid/tasklogs/NoopTaskLogs.java @@ -50,7 +50,7 @@ public void killAll() throws IOException } @Override - public void kill(long beforeTimestamp) throws IOException + public void killOlderThan(long timestamp) throws IOException { log.info("Noop: No task logs are deleted."); } diff --git a/api/src/main/java/io/druid/tasklogs/TaskLogKiller.java b/api/src/main/java/io/druid/tasklogs/TaskLogKiller.java index fa56fb9fa880..5296ea29a8f6 100644 --- a/api/src/main/java/io/druid/tasklogs/TaskLogKiller.java +++ b/api/src/main/java/io/druid/tasklogs/TaskLogKiller.java @@ -26,5 +26,5 @@ public interface TaskLogKiller { void killAll() throws IOException; - void kill(long beforeTimestamp) throws IOException; + void killOlderThan(long timestamp) throws IOException; 
} diff --git a/docs/content/configuration/indexing-service.md b/docs/content/configuration/indexing-service.md index 0f314d8fb862..348b6b167960 100644 --- a/docs/content/configuration/indexing-service.md +++ b/docs/content/configuration/indexing-service.md @@ -25,14 +25,15 @@ If you are running the indexing service in remote mode, the task logs must be st |--------|-----------|-------| |`druid.indexer.logs.type`|Choices:noop, s3, azure, google, hdfs, file. Where to store task logs|file| -You can also configure the Overlord to automatically retain the task logs only for last x seconds by configuring following additional properties. +You can also configure the Overlord to automatically retain the task logs only for last x milliseconds by configuring following additional properties. +Caution: Automatic log file deletion typically works based on log file modification timestamp on the backing store, so large clock skews between druid nodes and backing store nodes might result in un-intended behavior. |Property|Description|Default| |--------|-----------|-------| |`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. |false| -|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In seconds, task logs to be retained created in last x seconds. |None| -|`druid.indexer.logs.kill.initialDelay`| Optional. Number of seconds after overlord start when first auto kill is run. |random value less than 300 (5 mins)| -|`druid.indexer.logs.kill.delay`|Optional. Number of seconds of delay between successive executions of auto kill run. |21600 (6 hours)| +|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In milliseconds, task logs to be retained created in last x milliseconds. |None| +|`druid.indexer.logs.kill.initialDelay`| Optional. Number of milliseconds after overlord start when first auto kill is run. |random value less than 300000 (5 mins)| +|`druid.indexer.logs.kill.delay`|Optional. 
Number of milliseconds of delay between successive executions of auto kill run. |21600000 (6 hours)| ##### File Task Logs diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java index c5411c1a7bf0..1ec0c2c7b67c 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/storage/azure/AzureTaskLogs.java @@ -122,7 +122,7 @@ public void killAll() throws IOException } @Override - public void kill(long beforeTimestamp) throws IOException + public void killOlderThan(long timestamp) throws IOException { throw new UnsupportedOperationException("not implemented"); } diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java index 8f62eb7398a3..62f38303c77e 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java @@ -127,7 +127,7 @@ public void killAll() throws IOException } @Override - public void kill(long beforeTimestamp) throws IOException + public void killOlderThan(long timestamp) throws IOException { Path taskLogDir = new Path(config.getDirectory()); FileSystem fs = taskLogDir.getFileSystem(hadoopConfig); @@ -135,14 +135,16 @@ public void kill(long beforeTimestamp) throws IOException RemoteIterator iter = fs.listLocatedStatus(taskLogDir); while (iter.hasNext()) { LocatedFileStatus file = iter.next(); - if (file.getModificationTime() < beforeTimestamp) { + if (file.getModificationTime() < timestamp) { Path p = file.getPath(); log.info("Deleting hdfs task log [%s].", p.toUri().toString()); fs.delete(p, true); } - if (Thread.interrupted()) { - 
throw new IOException("Thread interrupted. Couldn't delete all tasklogs."); + if (Thread.currentThread().isInterrupted()) { + throw new IOException( + new InterruptedException("Thread interrupted. Couldn't delete all tasklogs.") + ); } } } diff --git a/extensions-core/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java b/extensions-core/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java index 8e9ea3daca9a..a90164ee67ed 100644 --- a/extensions-core/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java +++ b/extensions-core/hdfs-storage/src/test/java/io/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java @@ -27,6 +27,8 @@ import io.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig; import io.druid.tasklogs.TaskLogs; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -82,6 +84,9 @@ public void testKill() throws Exception final File logDir = new File(tmpDir, "logs"); final File logFile = new File(tmpDir, "log"); + final Path logDirPath = new Path(logDir.toString()); + FileSystem fs = new Path(logDir.toString()).getFileSystem(new Configuration()); + final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration()); Files.write("log1content", logFile, Charsets.UTF_8); @@ -93,12 +98,14 @@ public void testKill() throws Exception //of them getting deleted Thread.sleep(1500); long time = (System.currentTimeMillis()/1000)*1000; + Assert.assertTrue(fs.getFileStatus(new Path(logDirPath, "log1")).getModificationTime() < time); Files.write("log2content", logFile, Charsets.UTF_8); taskLogs.pushTaskLog("log2", logFile); Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0)); + Assert.assertTrue(fs.getFileStatus(new Path(logDirPath, "log2")).getModificationTime() >= time); - taskLogs.kill(time); + 
taskLogs.killOlderThan(time); Assert.assertFalse(taskLogs.streamTaskLog("log1", 0).isPresent()); Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0)); diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java index 5c4d8ae294d2..0a3cdcebe0f4 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java @@ -146,7 +146,7 @@ public void killAll() throws IOException } @Override - public void kill(long beforeTimestamp) throws IOException + public void killOlderThan(long timestamp) throws IOException { throw new UnsupportedOperationException("not implemented"); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java index b665a5c17c0b..61f731a91f07 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java @@ -92,7 +92,7 @@ public void killAll() throws IOException } @Override - public void kill(final long beforeTimestamp) throws IOException + public void killOlderThan(final long timestamp) throws IOException { File taskLogDir = config.getDirectory(); if (taskLogDir.exists()) { @@ -102,7 +102,7 @@ public void kill(final long beforeTimestamp) throws IOException @Override public boolean accept(File f) { - return f.lastModified() < beforeTimestamp; + return f.lastModified() < timestamp; } } ); @@ -111,8 +111,10 @@ public boolean accept(File f) log.info("Deleting local task log [%s].", file.getAbsolutePath()); FileUtils.forceDelete(file); - if (Thread.interrupted()) { - throw new IOException("Thread interrupted. 
Couldn't delete all tasklogs."); + if (Thread.currentThread().isInterrupted()) { + throw new IOException( + new InterruptedException("Thread interrupted. Couldn't delete all tasklogs.") + ); } } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java index 73c0fb5db800..d413d6ae46f3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java @@ -59,15 +59,15 @@ public void schedule(ScheduledExecutorService exec) ScheduledExecutors.scheduleWithFixedDelay( exec, - Duration.standardSeconds(config.getInitialDelay()), - Duration.standardSeconds(config.getDelay()), + Duration.millis(config.getInitialDelay()), + Duration.millis(config.getDelay()), new Runnable() { @Override public void run() { try { - taskLogKiller.kill(System.currentTimeMillis() - config.getDurationToRetain()*1000); + taskLogKiller.killOlderThan(System.currentTimeMillis() - config.getDurationToRetain()); } catch (Exception ex) { log.error(ex, "Failed to clean-up the task logs"); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java index 9c7e5af29622..60c41bd7bfd0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java @@ -53,8 +53,8 @@ public TaskLogAutoCleanerConfig( } this.enabled = enabled; - this.initialDelay = initialDelay == null ? 60 + new Random().nextInt(4*60) : initialDelay.longValue(); - this.delay = delay == null ? 6*60*60 : delay.longValue(); + this.initialDelay = initialDelay == null ? 
60000 + new Random().nextInt(4*60000) : initialDelay.longValue(); + this.delay = delay == null ? 6*60*60*1000 : delay.longValue(); this.durationToRetain = durationToRetain == null ? Long.MAX_VALUE : durationToRetain.longValue(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java b/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java index 26e355db693f..64c97666ffa0 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/tasklogs/FileTaskLogsTest.java @@ -104,12 +104,14 @@ public void testKill() throws Exception //of them getting deleted Thread.sleep(1500); long time = (System.currentTimeMillis()/1000)*1000; + Assert.assertTrue(new File(logDir, "log1.log").lastModified() < time); Files.write("log2content", logFile, Charsets.UTF_8); taskLogs.pushTaskLog("log2", logFile); Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0)); + Assert.assertTrue(new File(logDir, "log2.log").lastModified() >= time); - taskLogs.kill(time); + taskLogs.killOlderThan(time); Assert.assertFalse(taskLogs.streamTaskLog("log1", 0).isPresent()); Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0)); From 8293b42102ba03c6d491d57aa7460f18feb2fa6c Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 12 Dec 2016 13:19:32 -0600 Subject: [PATCH 3/3] further review comments addressed --- .../druid/storage/google/GoogleTaskLogs.java | 6 +++++ .../storage/hdfs/tasklog/HdfsTaskLogs.java | 5 ++++ .../common/tasklogs/FileTaskLogs.java | 5 ++++ .../helpers/OverlordHelperManager.java | 6 ++--- .../helpers/TaskLogAutoCleanerConfig.java | 6 ++++- .../helpers/TaskLogAutoCleanerConfigTest.java | 24 ++++++++++++++++++- 6 files changed, 47 insertions(+), 5 deletions(-) diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java 
b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java index 90cf4738150f..08cf00d82f0b 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/storage/google/GoogleTaskLogs.java @@ -106,4 +106,10 @@ public void killAll() throws IOException { throw new UnsupportedOperationException("not implemented"); } + + @Override + public void killOlderThan(long timestamp) throws IOException + { + throw new UnsupportedOperationException("not implemented"); + } } diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java index 62f38303c77e..f438b6643673 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/tasklog/HdfsTaskLogs.java @@ -132,6 +132,11 @@ public void killOlderThan(long timestamp) throws IOException Path taskLogDir = new Path(config.getDirectory()); FileSystem fs = taskLogDir.getFileSystem(hadoopConfig); if (fs.exists(taskLogDir)) { + + if (!fs.isDirectory(taskLogDir)) { + throw new IOException(String.format("taskLogDir [%s] must be a directory.", taskLogDir)); + } + RemoteIterator iter = fs.listLocatedStatus(taskLogDir); while (iter.hasNext()) { LocatedFileStatus file = iter.next(); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java index 61f731a91f07..9e36fdca8185 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/tasklogs/FileTaskLogs.java @@ -96,6 +96,11 @@ public void killOlderThan(final long timestamp) throws 
IOException { File taskLogDir = config.getDirectory(); if (taskLogDir.exists()) { + + if (!taskLogDir.isDirectory()) { + throw new IOException(String.format("taskLogDir [%s] must be a directory.", taskLogDir)); + } + File[] files = taskLogDir.listFiles( new FileFilter() { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelperManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelperManager.java index 912ab3c23db4..ee95def43c20 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelperManager.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/OverlordHelperManager.java @@ -38,7 +38,7 @@ public class OverlordHelperManager private final Set helpers; private volatile ScheduledExecutorService exec; - private final Object lock = new Object(); + private final Object startStopLock = new Object(); private volatile boolean started = false; @Inject @@ -53,7 +53,7 @@ public OverlordHelperManager( @LifecycleStart public void start() { - synchronized (lock) { + synchronized (startStopLock) { if (!started) { log.info("OverlordHelperManager is starting."); @@ -75,7 +75,7 @@ public void start() @LifecycleStop public void stop() { - synchronized (lock) { + synchronized (startStopLock) { if (started) { log.info("OverlordHelperManager is stopping."); if (exec != null) { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java index 60c41bd7bfd0..b5907e9778cb 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfig.java @@ -56,11 +56,15 @@ public TaskLogAutoCleanerConfig( this.initialDelay = initialDelay == null ? 
60000 + new Random().nextInt(4*60000) : initialDelay.longValue(); this.delay = delay == null ? 6*60*60*1000 : delay.longValue(); this.durationToRetain = durationToRetain == null ? Long.MAX_VALUE : durationToRetain.longValue(); + + Preconditions.checkArgument(this.initialDelay > 0, "initialDelay must be > 0."); + Preconditions.checkArgument(this.delay > 0, "delay must be > 0."); + Preconditions.checkArgument(this.durationToRetain > 0, "durationToRetain must be > 0."); } public boolean isEnabled() { - return true; + return this.enabled; } public long getInitialDelay() diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfigTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfigTest.java index 01579618e157..1a5b0e54c23b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfigTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/helpers/TaskLogAutoCleanerConfigTest.java @@ -49,9 +49,31 @@ public void testSerde() throws Exception ), TaskLogAutoCleanerConfig.class ); - Assert.assertEquals(true, config.isEnabled()); + Assert.assertTrue(config.isEnabled()); Assert.assertEquals(10, config.getInitialDelay()); Assert.assertEquals(40, config.getDelay()); Assert.assertEquals(30, config.getDurationToRetain()); } + + @Test + public void testSerdeWithDefaults() throws Exception + { + String json = "{}"; + + ObjectMapper mapper = TestUtil.MAPPER; + + TaskLogAutoCleanerConfig config = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + json, + TaskLogAutoCleanerConfig.class + ) + ), TaskLogAutoCleanerConfig.class + ); + + Assert.assertFalse(config.isEnabled()); + Assert.assertTrue(config.getInitialDelay() >= 60000 && config.getInitialDelay() <= 300000); + Assert.assertEquals(6*60*60*1000, config.getDelay()); + Assert.assertEquals(Long.MAX_VALUE, config.getDurationToRetain()); + } }