From c0caaffb5ff44ae40e7f8a213ec0393e3eeaed1a Mon Sep 17 00:00:00 2001 From: seoeun Date: Fri, 9 Nov 2018 11:46:57 +0900 Subject: [PATCH 1/6] tasks tables in metadata storage are not cleared --- .../MetadataStorageActionHandler.java | 32 ++++++++--- .../overlord/HeapMemoryTaskStorage.java | 19 +++++++ .../overlord/MetadataTaskStorage.java | 7 +++ .../druid/indexing/overlord/TaskStorage.java | 24 ++++++-- .../overlord/helpers/TaskLogAutoCleaner.java | 8 ++- .../SQLMetadataStorageActionHandler.java | 20 +++++++ .../SQLMetadataStorageActionHandlerTest.java | 55 +++++++++++++++++++ 7 files changed, 149 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java index 1614464d76a7..941652e61c61 100644 --- a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java +++ b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java @@ -32,13 +32,14 @@ public interface MetadataStorageActionHandler getEntry(String entryId); @@ -74,6 +77,7 @@ void insert( * Retrieve the status for the entry with the given id. * * @param entryId entry id + * * @return optional status, absent if entry does not exist or status is not set */ Optional getStatus(String entryId); @@ -107,7 +111,8 @@ List> getCompletedTaskInfo( * Add a lock to the given entry * * @param entryId entry id - * @param lock lock to add + * @param lock lock to add + * * @return true if the lock was added */ boolean addLock(String entryId, LockType lock); @@ -130,6 +135,13 @@ List> getCompletedTaskInfo( */ void removeLock(long lockId); + /** + * Remove the tasks created order than the given createdTime. + * + * @param createdTime datetime to check the {@code created_date} of tasks + */ + void removeTasksBefore(DateTime createdTime); + /** * Add a log to the entry with the given id. * @@ -143,6 +155,7 @@ List> getCompletedTaskInfo( * Returns the logs for the entry with the given id. * * @param entryId entry id + * * @return list of logs */ List getLogs(String entryId); @@ -151,6 +164,7 @@ List> getCompletedTaskInfo( * Returns the locks for the given entry * * @param entryId entry id + * * @return map of lockId to lock */ Map getLocks(String entryId); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index 006286ac8417..4ae68fb13fcd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -362,6 +362,25 @@ public void removeLock(final String taskid, final TaskLock taskLock) } } + @Override + public void removeTasksBefore(final DateTime createdTime) + { + giant.lock(); + + try { + List taskIds = tasks.entrySet().stream() + .filter(entry -> entry.getValue().getStatus().isComplete() + && entry.getValue().getCreatedDate().isBefore(createdTime)) + .map(entry -> entry.getKey()) + .collect(Collectors.toList()); + + taskIds.stream().forEach(taskId -> tasks.remove(taskId)); + } + finally { + giant.unlock(); + } + } + @Override public List getLocks(final String taskid) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 808fdb797295..e216a5d387b6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -44,6 +44,7 @@ import org.apache.druid.metadata.MetadataStorageActionHandlerTypes; import org.apache.druid.metadata.MetadataStorageConnector; import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.joda.time.DateTime; import org.joda.time.Duration; import javax.annotation.Nullable; @@ -276,6 +277,12 @@ public void removeLock(String taskid, TaskLock taskLockToRemove) } } + @Override + public void removeTasksBefore(DateTime createdTime) + { + handler.removeTasksBefore(createdTime); + } + @Override public List getLocks(String taskid) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java index b2f55f0c9d43..400b64e4d84a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java @@ -26,6 +26,7 @@ import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.metadata.EntryExistsException; +import org.joda.time.DateTime; import org.joda.time.Duration; import javax.annotation.Nullable; @@ -36,8 +37,9 @@ public interface TaskStorage /** * Adds a task to the storage facility with a particular status. * - * @param task task to add + * @param task task to add * @param status task status + * * @throws EntryExistsException if the task ID already exists */ void insert(Task task, TaskStatus status) throws EntryExistsException; @@ -52,7 +54,8 @@ public interface TaskStorage /** * Persists lock state in the storage facility. - * @param taskid task ID + * + * @param taskid task ID * @param taskLock lock state */ void addLock(String taskid, TaskLock taskLock); @@ -70,11 +73,18 @@ public interface TaskStorage * Removes lock state from the storage facility. It is harmless to keep old locks in the storage facility, but * this method can help reclaim wasted space. * - * @param taskid task ID + * @param taskid task ID * @param taskLock lock state */ void removeLock(String taskid, TaskLock taskLock); + /** + * Remove the tasks created order than the given createdTime. + * + * @param createdTime datetime to check the {@code created_date} of tasks + */ + void removeTasksBefore(DateTime createdTime); + /** * Returns task as stored in the storage facility. If the task ID does not exist, this will return an * absentee Optional. @@ -91,6 +101,7 @@ public interface TaskStorage * an absentee Optional. * * @param taskid task ID + * * @return task status */ Optional getStatus(String taskid); @@ -101,10 +112,9 @@ public interface TaskStorage /** * Add an action taken by a task to the audit log. * - * @param task task to record action for + * @param task task to record action for * @param taskAction task action to record - * - * @param task action return type + * @param task action return type */ @Deprecated void addAuditLog(Task task, TaskAction taskAction); @@ -113,6 +123,7 @@ public interface TaskStorage * Returns all actions taken by a task. * * @param taskid task ID + * * @return list of task actions */ @Deprecated @@ -156,6 +167,7 @@ List> getRecentlyFinishedTaskInfo( * Returns a list of locks for a particular task. * * @param taskid task ID + * * @return list of TaskLocks for the given task */ List getLocks(String taskid); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java index 18da374fae71..1f92e69ee00b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java @@ -20,6 +20,8 @@ package org.apache.druid.indexing.overlord.helpers; import com.google.inject.Inject; +import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.tasklogs.TaskLogKiller; @@ -35,15 +37,18 @@ public class TaskLogAutoCleaner implements OverlordHelper private final TaskLogKiller taskLogKiller; private final TaskLogAutoCleanerConfig config; + private final TaskStorage taskStorage; @Inject public TaskLogAutoCleaner( TaskLogKiller taskLogKiller, - TaskLogAutoCleanerConfig config + TaskLogAutoCleanerConfig config, + TaskStorage taskStorage ) { this.taskLogKiller = taskLogKiller; this.config = config; + this.taskStorage = taskStorage; } @Override @@ -68,6 +73,7 @@ public void run() { try { taskLogKiller.killOlderThan(System.currentTimeMillis() - config.getDurationToRetain()); + taskStorage.removeTasksBefore(DateTimes.nowUtc().minus(config.getDurationToRetain())); } catch (Exception ex) { log.error(ex, "Failed to clean-up the task logs"); diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index ebee2c192ff8..9c7fd40c52ac 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -439,6 +439,26 @@ public Void withHandle(Handle handle) ); } + @Override + public void removeTasksBefore(final DateTime createdTime) + { + connector.retryWithHandle( + (HandleCallback) handle -> { + log.info("Deleting tasks before [%s] and inactive at metastore.", createdTime.toString()); + handle.createStatement( + StringUtils.format( + "DELETE FROM %s WHERE created_date < :created_date AND active = false", + entryTable + ) + ) + .bind("created_date", createdTime.toString()) + .execute(); + + return null; + } + ); + } + private int removeLock(Handle handle, long lockId) { return handle.createStatement(StringUtils.format("DELETE FROM %s WHERE id = :id", lockTable)) diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java index 46dc7f49049d..ac1f38fb5cce 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java @@ -381,4 +381,59 @@ public void testGetLockId() throws EntryExistsException Assert.assertNotNull(handler.getLockId(entryId, lock1)); Assert.assertNull(handler.getLockId(entryId, lock2)); } + + @Test + public void testRemoveTasksBefore() throws Exception + { + final String entryId1 = "1234"; + Map entry1 = ImmutableMap.of("numericId", 1234); + Map status1 = ImmutableMap.of("count", 42, "temp", 1); + handler.insert(entryId1, DateTimes.of("2014-01-01T00:00:00.123"), "testDataSource", entry1, false, status1); + + final String entryId2 = "ABC123"; + Map entry2 = ImmutableMap.of("a", 1); + Map status2 = ImmutableMap.of("count", 42); + handler.insert(entryId2, DateTimes.of("2014-01-01T00:00:00.123"), "test", entry2, true, status2); + + final String entryId3 = "DEF5678"; + Map entry3 = ImmutableMap.of("numericId", 5678); + Map status3 = ImmutableMap.of("count", 21, "temp", 2); + handler.insert(entryId3, DateTimes.of("2014-01-02T12:00:00.123"), "testDataSource", entry3, false, status3); + + Assert.assertEquals(Optional.of(entry1), handler.getEntry(entryId1)); + Assert.assertEquals(Optional.of(entry2), handler.getEntry(entryId2)); + Assert.assertEquals(Optional.of(entry3), handler.getEntry(entryId3)); + + Assert.assertEquals( + ImmutableList.of(entryId2), + handler.getActiveTaskInfo(null).stream() + .map(taskInfo -> taskInfo.getId()) + .collect(Collectors.toList()) + ); + + Assert.assertEquals( + ImmutableList.of(entryId3, entryId1), + handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null).stream() + .map(taskInfo -> taskInfo.getId()) + .collect(Collectors.toList()) + + ); + + handler.removeTasksBefore(DateTimes.of("2014-01-02")); + // active task not removed. + Assert.assertEquals( + ImmutableList.of(entryId2), + handler.getActiveTaskInfo(null).stream() + .map(taskInfo -> taskInfo.getId()) + .collect(Collectors.toList()) + ); + Assert.assertEquals( + ImmutableList.of(entryId3), + handler.getCompletedTaskInfo(DateTimes.of("2014-01-02"), null, null).stream() + .map(taskInfo -> taskInfo.getId()) + .collect(Collectors.toList()) + + ); + + } } From eb90baaa31db2c0fa4dac09fa70f0c6a4f51f431 Mon Sep 17 00:00:00 2001 From: seoeun Date: Mon, 12 Nov 2018 18:38:58 +0900 Subject: [PATCH 2/6] address comments. remove tasklogs and revert obsolete changes --- .../MetadataStorageActionHandler.java | 31 +++++++------------ .../overlord/HeapMemoryTaskStorage.java | 7 +++-- .../overlord/MetadataTaskStorage.java | 5 ++- .../druid/indexing/overlord/TaskStorage.java | 21 +++++-------- .../overlord/helpers/TaskLogAutoCleaner.java | 6 ++-- .../DerbyMetadataStorageActionHandler.java | 9 ++++++ ...ostgreSQLMetadataStorageActionHandler.java | 10 ++++++ .../SQLMetadataStorageActionHandler.java | 30 +++++++++++++++++- .../SQLMetadataStorageActionHandlerTest.java | 14 ++++++--- 9 files changed, 87 insertions(+), 46 deletions(-) diff --git a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java index 941652e61c61..86ac8059ec39 100644 --- a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java +++ b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java @@ -32,14 +32,13 @@ public interface MetadataStorageActionHandler getEntry(String entryId); @@ -77,7 +74,6 @@ void insert( * Retrieve the status for the entry with the given id. * * @param entryId entry id - * * @return optional status, absent if entry does not exist or status is not set */ Optional getStatus(String entryId); @@ -111,8 +107,7 @@ List> getCompletedTaskInfo( * Add a lock to the given entry * * @param entryId entry id - * @param lock lock to add - * + * @param lock lock to add * @return true if the lock was added */ boolean addLock(String entryId, LockType lock); @@ -137,10 +132,10 @@ List> getCompletedTaskInfo( /** * Remove the tasks created order than the given createdTime. - * - * @param createdTime datetime to check the {@code created_date} of tasks + * + * @param timestamp timestamp to check the {@code created_date} of tasks */ - void removeTasksBefore(DateTime createdTime); + void removeTasksOlderThan(long timestamp); /** * Add a log to the entry with the given id. @@ -155,7 +150,6 @@ List> getCompletedTaskInfo( * Returns the logs for the entry with the given id. * * @param entryId entry id - * * @return list of logs */ List getLogs(String entryId); @@ -164,7 +158,6 @@ List> getCompletedTaskInfo( * Returns the locks for the given entry * * @param entryId entry id - * * @return map of lockId to lock */ Map getLocks(String entryId); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index 4ae68fb13fcd..0480e179783b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -363,18 +363,19 @@ public void removeLock(final String taskid, final TaskLock taskLock) } @Override - public void removeTasksBefore(final DateTime createdTime) + public void removeTasksOlderThan(final long timestamp) { giant.lock(); try { List taskIds = tasks.entrySet().stream() .filter(entry -> entry.getValue().getStatus().isComplete() - && entry.getValue().getCreatedDate().isBefore(createdTime)) + && entry.getValue().getCreatedDate().isBefore(timestamp)) .map(entry -> entry.getKey()) .collect(Collectors.toList()); - taskIds.stream().forEach(taskId -> tasks.remove(taskId)); + taskIds.forEach(taskActions::removeAll); + taskIds.forEach(tasks::remove); } finally { giant.unlock(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index e216a5d387b6..a93e385edd20 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -44,7 +44,6 @@ import org.apache.druid.metadata.MetadataStorageActionHandlerTypes; import org.apache.druid.metadata.MetadataStorageConnector; import org.apache.druid.metadata.MetadataStorageTablesConfig; -import org.joda.time.DateTime; import org.joda.time.Duration; import javax.annotation.Nullable; @@ -278,9 +277,9 @@ public void removeLock(String taskid, TaskLock taskLockToRemove) } @Override - public void removeTasksBefore(DateTime createdTime) + public void removeTasksOlderThan(long timestamp) { - handler.removeTasksBefore(createdTime); + handler.removeTasksOlderThan(timestamp); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java index 400b64e4d84a..6b079b3a20da 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java @@ -26,7 +26,6 @@ import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.metadata.EntryExistsException; -import org.joda.time.DateTime; import org.joda.time.Duration; import javax.annotation.Nullable; @@ -37,9 +36,8 @@ public interface TaskStorage /** * Adds a task to the storage facility with a particular status. * - * @param task task to add + * @param task task to add * @param status task status - * * @throws EntryExistsException if the task ID already exists */ void insert(Task task, TaskStatus status) throws EntryExistsException; @@ -54,8 +52,7 @@ public interface TaskStorage /** * Persists lock state in the storage facility. - * - * @param taskid task ID + * @param taskid task ID * @param taskLock lock state */ void addLock(String taskid, TaskLock taskLock); @@ -73,7 +70,7 @@ public interface TaskStorage * Removes lock state from the storage facility. It is harmless to keep old locks in the storage facility, but * this method can help reclaim wasted space. * - * @param taskid task ID + * @param taskid task ID * @param taskLock lock state */ void removeLock(String taskid, TaskLock taskLock); @@ -81,9 +78,9 @@ public interface TaskStorage /** * Remove the tasks created order than the given createdTime. * - * @param createdTime datetime to check the {@code created_date} of tasks + * @param timestamp timestamp to check the {@code created_date} of tasks */ - void removeTasksBefore(DateTime createdTime); + void removeTasksOlderThan(long timestamp); /** * Returns task as stored in the storage facility. If the task ID does not exist, this will return an @@ -101,7 +98,6 @@ public interface TaskStorage * an absentee Optional. * * @param taskid task ID - * * @return task status */ Optional getStatus(String taskid); @@ -112,9 +108,10 @@ public interface TaskStorage /** * Add an action taken by a task to the audit log. * - * @param task task to record action for + * @param task task to record action for * @param taskAction task action to record - * @param task action return type + * + * @param task action return type */ @Deprecated void addAuditLog(Task task, TaskAction taskAction); @@ -123,7 +120,6 @@ public interface TaskStorage * Returns all actions taken by a task. * * @param taskid task ID - * * @return list of task actions */ @Deprecated @@ -167,7 +163,6 @@ List> getRecentlyFinishedTaskInfo( * Returns a list of locks for a particular task. * * @param taskid task ID - * * @return list of TaskLocks for the given task */ List getLocks(String taskid); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java index 1f92e69ee00b..4ab544049250 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/helpers/TaskLogAutoCleaner.java @@ -21,7 +21,6 @@ import com.google.inject.Inject; import org.apache.druid.indexing.overlord.TaskStorage; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.tasklogs.TaskLogKiller; @@ -72,8 +71,9 @@ public void schedule(ScheduledExecutorService exec) public void run() { try { - taskLogKiller.killOlderThan(System.currentTimeMillis() - config.getDurationToRetain()); - taskStorage.removeTasksBefore(DateTimes.nowUtc().minus(config.getDurationToRetain())); + long timestamp = System.currentTimeMillis() - config.getDurationToRetain(); + taskLogKiller.killOlderThan(timestamp); + taskStorage.removeTasksOlderThan(timestamp); } catch (Exception ex) { log.error(ex, "Failed to clean-up the task logs"); diff --git a/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java index 039a6427f02e..bcfb10ee3b5d 100644 --- a/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java @@ -91,4 +91,13 @@ private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String data } return sql; } + + @Deprecated + @Override + public String getSqlRemoveLogsOlderThan() + { + return StringUtils.format("DELETE FROM %s WHERE %s_id in (" + + " SELECT id FROM %s WHERE created_date < :created_date and active = false)", + getLogTable(), getEntryTypeName(), getEntryTable()); + } } diff --git a/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java index 2f902365e743..6156743219a5 100644 --- a/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java @@ -89,4 +89,14 @@ private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String data } return sql; } + + @Deprecated + @Override + public String getSqlRemoveLogsOlderThan() + { + return StringUtils.format("DELETE FROM %s USING %s " + + "WHERE %s_id = %s.id AND created_date < :created_date and active = false", + getLogTable(), getEntryTable(), getEntryTypeName(), getEntryTable()); + } + } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index 9c7fd40c52ac..74c6032566d7 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -113,6 +113,16 @@ protected String getEntryTable() return entryTable; } + protected String getLogTable() + { + return logTable; + } + + protected String getEntryTypeName() + { + return entryTypeName; + } + public TypeReference getEntryType() { return entryType; @@ -440,11 +450,20 @@ public Void withHandle(Handle handle) } @Override - public void removeTasksBefore(final DateTime createdTime) + public void removeTasksOlderThan(final long timestamp) { connector.retryWithHandle( (HandleCallback) handle -> { + DateTime createdTime = DateTimes.utc(timestamp); log.info("Deleting tasks before [%s] and inactive at metastore.", createdTime.toString()); + handle.createStatement( + StringUtils.format( + getSqlRemoveLogsOlderThan(), + logTable, entryTable + ) + ) + .bind("created_date", createdTime.toString()) + .execute(); handle.createStatement( StringUtils.format( "DELETE FROM %s WHERE created_date < :created_date AND active = false", @@ -528,6 +547,15 @@ public List withHandle(Handle handle) ); } + @Deprecated + public String getSqlRemoveLogsOlderThan() + { + return StringUtils.format("DELETE a FROM %s a INNER JOIN %s b ON a.%s_id = b.id " + + "WHERE b.created_date < :created_date and b.active = false", + logTable, entryTable, entryTypeName + ); + } + @Override public Map getLocks(final String entryId) { diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java index ac1f38fb5cce..c34ff93ebbcf 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java @@ -383,22 +383,25 @@ public void testGetLockId() throws EntryExistsException } @Test - public void testRemoveTasksBefore() throws Exception + public void testRemoveTasksOlderThan() throws Exception { final String entryId1 = "1234"; Map entry1 = ImmutableMap.of("numericId", 1234); Map status1 = ImmutableMap.of("count", 42, "temp", 1); handler.insert(entryId1, DateTimes.of("2014-01-01T00:00:00.123"), "testDataSource", entry1, false, status1); + Assert.assertTrue(handler.addLog(entryId1, ImmutableMap.of("logentry", "created"))); final String entryId2 = "ABC123"; Map entry2 = ImmutableMap.of("a", 1); Map status2 = ImmutableMap.of("count", 42); handler.insert(entryId2, DateTimes.of("2014-01-01T00:00:00.123"), "test", entry2, true, status2); + Assert.assertTrue(handler.addLog(entryId2, ImmutableMap.of("logentry", "created"))); final String entryId3 = "DEF5678"; Map entry3 = ImmutableMap.of("numericId", 5678); Map status3 = ImmutableMap.of("count", 21, "temp", 2); handler.insert(entryId3, DateTimes.of("2014-01-02T12:00:00.123"), "testDataSource", entry3, false, status3); + Assert.assertTrue(handler.addLog(entryId3, ImmutableMap.of("logentry", "created"))); Assert.assertEquals(Optional.of(entry1), handler.getEntry(entryId1)); Assert.assertEquals(Optional.of(entry2), handler.getEntry(entryId2)); @@ -419,7 +422,7 @@ public void testRemoveTasksBefore() throws Exception ); - handler.removeTasksBefore(DateTimes.of("2014-01-02")); + handler.removeTasksOlderThan(DateTimes.of("2014-01-02").getMillis()); // active task not removed. Assert.assertEquals( ImmutableList.of(entryId2), @@ -429,11 +432,14 @@ public void testRemoveTasksBefore() throws Exception ); Assert.assertEquals( ImmutableList.of(entryId3), - handler.getCompletedTaskInfo(DateTimes.of("2014-01-02"), null, null).stream() + handler.getCompletedTaskInfo(DateTimes.of("2014-01-01"), null, null).stream() .map(taskInfo -> taskInfo.getId()) .collect(Collectors.toList()) ); - + // tasklogs + Assert.assertEquals(0, handler.getLogs(entryId1).size()); + Assert.assertEquals(1, handler.getLogs(entryId2).size()); + Assert.assertEquals(1, handler.getLogs(entryId3).size()); } } From acbc4c898fb404913338b1d1cb56ca46f1a28da3 Mon Sep 17 00:00:00 2001 From: seoeun Date: Wed, 14 Nov 2018 14:08:13 +0900 Subject: [PATCH 3/6] address comments. change comment and update doc. --- .../druid/metadata/MetadataStorageActionHandler.java | 4 ++-- docs/content/configuration/index.md | 4 ++-- .../apache/druid/indexing/overlord/TaskStorage.java | 4 ++-- .../metadata/DerbyMetadataStorageActionHandler.java | 2 +- .../PostgreSQLMetadataStorageActionHandler.java | 2 +- .../metadata/SQLMetadataStorageActionHandler.java | 12 ++++++------ 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java index 86ac8059ec39..d9e1e8c85e39 100644 --- a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java +++ b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java @@ -131,9 +131,9 @@ List> getCompletedTaskInfo( void removeLock(long lockId); /** - * Remove the tasks created order than the given createdTime. + * Remove the tasks with timestamp older than given timestamp. * - * @param timestamp timestamp to check the {@code created_date} of tasks + * @param timestamp timestamp in milliseconds */ void removeTasksOlderThan(long timestamp); diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index f30c555eb445..f27082562b49 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -525,12 +525,12 @@ If you are running the indexing service in remote mode, the task logs must be st |--------|-----------|-------| |`druid.indexer.logs.type`|Choices:noop, s3, azure, google, hdfs, file. Where to store task logs|file| -You can also configure the Overlord to automatically retain the task logs only for last x milliseconds by configuring following additional properties. +You can also configure the Overlord to automatically retain the task logs in log directory and metadata store only for last x milliseconds by configuring following additional properties. Caution: Automatic log file deletion typically works based on log file modification timestamp on the backing store, so large clock skews between druid nodes and backing store nodes might result in un-intended behavior. |Property|Description|Default| |--------|-----------|-------| -|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. |false| +|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs in log directory and metadata store. |false| |`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In milliseconds, task logs to be retained created in last x milliseconds. |None| |`druid.indexer.logs.kill.initialDelay`| Optional. Number of milliseconds after overlord start when first auto kill is run. |random value less than 300000 (5 mins)| |`druid.indexer.logs.kill.delay`|Optional. Number of milliseconds of delay between successive executions of auto kill run. |21600000 (6 hours)| diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java index 6b079b3a20da..a880085ce308 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java @@ -76,9 +76,9 @@ public interface TaskStorage void removeLock(String taskid, TaskLock taskLock); /** - * Remove the tasks created order than the given createdTime. + * Remove the tasks created older than the given timestamp. * - * @param timestamp timestamp to check the {@code created_date} of tasks + * @param timestamp timestamp in milliseconds */ void removeTasksOlderThan(long timestamp); diff --git a/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java index bcfb10ee3b5d..bff95785f8f3 100644 --- a/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java @@ -97,7 +97,7 @@ private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String data public String getSqlRemoveLogsOlderThan() { return StringUtils.format("DELETE FROM %s WHERE %s_id in (" - + " SELECT id FROM %s WHERE created_date < :created_date and active = false)", + + " SELECT id FROM %s WHERE created_date < :date_time and active = false)", getLogTable(), getEntryTypeName(), getEntryTable()); } } diff --git a/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java index 6156743219a5..e967fb8f09e3 100644 --- a/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java @@ -95,7 +95,7 @@ private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String data public String getSqlRemoveLogsOlderThan() { return StringUtils.format("DELETE FROM %s USING %s " - + "WHERE %s_id = %s.id AND created_date < :created_date and active = false", + + "WHERE %s_id = %s.id AND created_date < :date_time and active = false", getLogTable(), getEntryTable(), getEntryTypeName(), getEntryTable()); } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index 74c6032566d7..63e852390017 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -454,23 +454,23 @@ public void removeTasksOlderThan(final long timestamp) { connector.retryWithHandle( (HandleCallback) handle -> { - DateTime createdTime = DateTimes.utc(timestamp); - log.info("Deleting tasks before [%s] and inactive at metastore.", createdTime.toString()); + DateTime dateTime = DateTimes.utc(timestamp); + log.info("Deleting tasks older than [%s] and inactive at metastore.", dateTime.toString()); handle.createStatement( StringUtils.format( getSqlRemoveLogsOlderThan(), logTable, entryTable ) ) - .bind("created_date", createdTime.toString()) + .bind("date_time", dateTime.toString()) .execute(); handle.createStatement( StringUtils.format( - "DELETE FROM %s WHERE created_date < :created_date AND active = false", + "DELETE FROM %s WHERE created_date < :date_time AND active = false", entryTable ) ) - .bind("created_date", createdTime.toString()) + .bind("date_time", dateTime.toString()) .execute(); return null; @@ -551,7 +551,7 @@ public List withHandle(Handle handle) public String getSqlRemoveLogsOlderThan() { return StringUtils.format("DELETE a FROM %s a INNER JOIN %s b ON a.%s_id = b.id " - + "WHERE b.created_date < :created_date and b.active = false", + + "WHERE b.created_date < :date_time and b.active = false", logTable, entryTable, entryTypeName ); } From c31b4fed3a53cdb0856e6243f00d851cee572ba3 Mon Sep 17 00:00:00 2001 From: seoeun Date: Tue, 20 Nov 2018 15:32:48 +0900 Subject: [PATCH 4/6] address comments. update doc more detailed --- docs/content/configuration/index.md | 4 ++-- .../metadata/SQLMetadataStorageActionHandler.java | 11 +++-------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index f27082562b49..7657c5fcef14 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -525,12 +525,12 @@ If you are running the indexing service in remote mode, the task logs must be st |--------|-----------|-------| |`druid.indexer.logs.type`|Choices:noop, s3, azure, google, hdfs, file. Where to store task logs|file| -You can also configure the Overlord to automatically retain the task logs in log directory and metadata store only for last x milliseconds by configuring following additional properties. +You can also configure the Overlord to automatically retain the task logs in log directory and task-related metadata storage tables only for last x milliseconds by configuring following additional properties. Caution: Automatic log file deletion typically works based on log file modification timestamp on the backing store, so large clock skews between druid nodes and backing store nodes might result in un-intended behavior. |Property|Description|Default| |--------|-----------|-------| -|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs in log directory and metadata store. |false| +|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. If set to true, overlord will submit tasks periodically based on `delay` specified. These kill tasks will delete task logs from log directory and tasks and tasklogs table in metadata storage except for the last `durationToRetain` period. |false| |`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In milliseconds, task logs to be retained created in last x milliseconds. |None| |`druid.indexer.logs.kill.initialDelay`| Optional. Number of milliseconds after overlord start when first auto kill is run. |random value less than 300000 (5 mins)| |`druid.indexer.logs.kill.delay`|Optional. Number of milliseconds of delay between successive executions of auto kill run. |21600000 (6 hours)| diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index 63e852390017..0488e1673501 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -452,16 +452,11 @@ public Void withHandle(Handle handle) @Override public void removeTasksOlderThan(final long timestamp) { + DateTime dateTime = DateTimes.utc(timestamp); + log.info("Deleting tasks older than [%s] and inactive at metastore.", dateTime.toString()); connector.retryWithHandle( (HandleCallback) handle -> { - DateTime dateTime = DateTimes.utc(timestamp); - log.info("Deleting tasks older than [%s] and inactive at metastore.", dateTime.toString()); - handle.createStatement( - StringUtils.format( - getSqlRemoveLogsOlderThan(), - logTable, entryTable - ) - ) + handle.createStatement(getSqlRemoveLogsOlderThan()) .bind("date_time", dateTime.toString()) .execute(); handle.createStatement( From f428cbaeac678851696803d4184f73d304a91b7b Mon Sep 17 00:00:00 2001 From: seoeun Date: Wed, 21 Nov 2018 10:47:59 +0900 Subject: [PATCH 5/6] address comments. remove redundant log and update doc more detailed. --- docs/content/configuration/index.md | 4 ++-- .../druid/metadata/SQLMetadataStorageActionHandler.java | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 7657c5fcef14..4a9368efbe92 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -530,8 +530,8 @@ Caution: Automatic log file deletion typically works based on log file modificat |Property|Description|Default| |--------|-----------|-------| -|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. If set to true, overlord will submit tasks periodically based on `delay` specified. These kill tasks will delete task logs from log directory and tasks and tasklogs table in metadata storage except for the last `durationToRetain` period. |false| -|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In milliseconds, task logs to be retained created in last x milliseconds. |None| +|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. If set to true, overlord will submit kill tasks periodically based on `delay` specified, which will delete task logs from the log directory as well as tasks and tasklogs table in metadata storage except for tasks created in the last `durationToRetain` period. |false| +|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In milliseconds, task logs and task-related metadata storage tables to be retained created in last x milliseconds. |None| |`druid.indexer.logs.kill.initialDelay`| Optional. Number of milliseconds after overlord start when first auto kill is run. |random value less than 300000 (5 mins)| |`druid.indexer.logs.kill.delay`|Optional. Number of milliseconds of delay between successive executions of auto kill run. |21600000 (6 hours)| diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index 0488e1673501..da8bd61b9eef 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -453,7 +453,6 @@ public Void withHandle(Handle handle) public void removeTasksOlderThan(final long timestamp) { DateTime dateTime = DateTimes.utc(timestamp); - log.info("Deleting tasks older than [%s] and inactive at metastore.", dateTime.toString()); connector.retryWithHandle( (HandleCallback) handle -> { handle.createStatement(getSqlRemoveLogsOlderThan()) From b60e06d8004e79e58bb7e60313e812483119484c Mon Sep 17 00:00:00 2001 From: seoeun Date: Wed, 21 Nov 2018 18:02:48 +0900 Subject: [PATCH 6/6] address comments. update document --- .../apache/druid/metadata/MetadataStorageActionHandler.java | 2 +- docs/content/configuration/index.md | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java index d9e1e8c85e39..d25829cd0bc0 100644 --- a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java +++ b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java @@ -131,7 +131,7 @@ List> getCompletedTaskInfo( void removeLock(long lockId); /** - * Remove the tasks with timestamp older than given timestamp. + * Remove the tasks created older than the given timestamp. * * @param timestamp timestamp in milliseconds */ diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 4a9368efbe92..eceab6344432 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -525,13 +525,13 @@ If you are running the indexing service in remote mode, the task logs must be st |--------|-----------|-------| |`druid.indexer.logs.type`|Choices:noop, s3, azure, google, hdfs, file. Where to store task logs|file| -You can also configure the Overlord to automatically retain the task logs in log directory and task-related metadata storage tables only for last x milliseconds by configuring following additional properties. +You can also configure the Overlord to automatically retain the task logs in log directory and entries in task-related metadata storage tables only for last x milliseconds by configuring following additional properties. Caution: Automatic log file deletion typically works based on log file modification timestamp on the backing store, so large clock skews between druid nodes and backing store nodes might result in un-intended behavior. |Property|Description|Default| |--------|-----------|-------| -|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. If set to true, overlord will submit kill tasks periodically based on `delay` specified, which will delete task logs from the log directory as well as tasks and tasklogs table in metadata storage except for tasks created in the last `durationToRetain` period. |false| -|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In milliseconds, task logs and task-related metadata storage tables to be retained created in last x milliseconds. |None| +|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. If set to true, overlord will submit kill tasks periodically based on `druid.indexer.logs.kill.delay` specified, which will delete task logs from the log directory as well as tasks and tasklogs table entries in metadata storage except for tasks created in the last `druid.indexer.logs.kill.durationToRetain` period. |false| +|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In milliseconds, task logs and entries in task-related metadata storage tables to be retained created in last x milliseconds. |None| |`druid.indexer.logs.kill.initialDelay`| Optional. Number of milliseconds after overlord start when first auto kill is run. |random value less than 300000 (5 mins)| |`druid.indexer.logs.kill.delay`|Optional. Number of milliseconds of delay between successive executions of auto kill run. |21600000 (6 hours)|