diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 0d76a7193d68..ccbcaf995361 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -756,6 +756,9 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.kill.rule.on`| Boolean value for whether to enable automatic deletion of rules. If set to true, Coordinator will periodically remove rules of inactive datasource (datasource with no used and unused segments) from the rule table in metadata storage.| No | False| |`druid.coordinator.kill.rule.period`| How often to do automatic deletion of rules in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be equal to or greater than `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.rule.on` is set to "True".| No| `P1D`| |`druid.coordinator.kill.rule.durationToRetain`| Duration of rules to be retained from created time in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.rule.on` is set to "True".| Yes if `druid.coordinator.kill.rule.on` is set to "True".| None| +|`druid.coordinator.kill.datasource.on`| Boolean value for whether to enable automatic deletion of datasource metadata (Note: datasource metadata only exists for datasource created from supervisor). If set to true, Coordinator will periodically remove datasource metadata of terminated supervisor from the datasource table in metadata storage. | No | False| +|`druid.coordinator.kill.datasource.period`| How often to do automatic deletion of datasource metadata in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be equal to or greater than `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.datasource.on` is set to "True".| No| `P1D`| +|`druid.coordinator.kill.datasource.durationToRetain`| Duration of datasource metadata to be retained from created time in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.datasource.on` is set to "True".| Yes if `druid.coordinator.kill.datasource.on` is set to "True".| None| ##### Segment Management |Property|Possible Values|Description|Default| diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index b68dcca58307..a4aa9abbe142 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -259,6 +259,7 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina |`metadata/kill/supervisor/count`|Total number of terminated supervisors that were automatically deleted from metadata store per each Coordinator kill supervisor duty run. This metric can help adjust `druid.coordinator.kill.supervisor.durationToRetain` configuration based on whether more or less terminated supervisors need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.supervisor.on` is set to true.| |Varies.| |`metadata/kill/audit/count`|Total number of audit logs that were automatically deleted from metadata store per each Coordinator kill audit duty run. This metric can help adjust `druid.coordinator.kill.audit.durationToRetain` configuration based on whether more or less audit logs need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.audit.on` is set to true.| |Varies.| |`metadata/kill/rule/count`|Total number of rules that were automatically deleted from metadata store per each Coordinator kill rule duty run. This metric can help adjust `druid.coordinator.kill.rule.durationToRetain` configuration based on whether more or less rules need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.rule.on` is set to true.| |Varies.| +|`metadata/kill/datasource/count`|Total number of datasource metadata that were automatically deleted from metadata store per each Coordinator kill datasource duty run (Note: datasource metadata only exists for datasource created from supervisor). This metric can help adjust `druid.coordinator.kill.datasource.durationToRetain` configuration based on whether more or less datasource metadata need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.datasource.on` is set to true.| |Varies.| If `emitBalancingStats` is set to `true` in the Coordinator [dynamic configuration](../configuration/index.md#dynamic-configuration), then [log entries](../configuration/logging.md) for class diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index decaff87f875..fb274f348459 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -34,6 +34,7 @@ import org.apache.druid.timeline.partition.PartialShardSpec; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -139,6 +140,13 @@ public SegmentPublishResult commitMetadataOnly( throw new UnsupportedOperationException("Not implemented, no test uses this currently."); } + @Override + public int removeDataSourceMetadataOlderThan(long timestamp, @Nullable Set excludeDatasources) + { + throw new UnsupportedOperationException("Not implemented, no test uses this currently."); + } + + @Override public SegmentIdWithShardSpec allocatePendingSegment( String dataSource, diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 3a1d2578c5a0..513f5b98c598 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -26,6 +26,7 @@ import org.joda.time.Interval; import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -267,6 +268,15 @@ SegmentPublishResult announceHistoricalSegments( */ boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata); + /** + * Remove datasource metadata created before the given timestamp and not in given excludeDatasources set. + * + * @param timestamp timestamp in milliseconds + * @param excludeDatasources set of datasource names to exclude from removal + * @return number of datasource metadata removed + */ + int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set excludeDatasources); + /** * Similar to {@link #announceHistoricalSegments(Set)}, but meant for streaming ingestion tasks for handling * the case where the task ingested no records and created no segments, but still needs to update the metadata diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 05fcb563dad4..46d6c0e069d3 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -57,6 +57,7 @@ import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionIds; import org.apache.druid.timeline.partition.SingleDimensionShardSpec; +import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.chrono.ISOChronology; import org.skife.jdbi.v2.Batch; @@ -73,6 +74,7 @@ import org.skife.jdbi.v2.util.ByteArrayMapper; import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; import java.io.IOException; import java.sql.ResultSet; import java.util.ArrayList; @@ -1423,4 +1425,39 @@ public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata me .execute() ); } + + @Override + public int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set excludeDatasources) + { + DateTime dateTime = DateTimes.utc(timestamp); + List datasourcesToDelete = connector.getDBI().withHandle( + handle -> handle + .createQuery( + StringUtils.format( + "SELECT dataSource FROM %1$s WHERE created_date < '%2$s'", + dbTables.getDataSourceTable(), + dateTime.toString() + ) + ) + .mapTo(String.class) + .list() + ); + datasourcesToDelete.removeAll(excludeDatasources); + return connector.getDBI().withHandle( + handle -> { + final PreparedBatch batch = handle.prepareBatch( + StringUtils.format( + "DELETE FROM %1$s WHERE dataSource = :dataSource AND created_date < '%2$s'", + dbTables.getDataSourceTable(), + dateTime.toString() + ) + ); + for (String datasource : datasourcesToDelete) { + batch.bind("dataSource", datasource).add(); + } + int[] result = batch.execute(); + return IntStream.of(result).sum(); + } + ); + } } diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java index 591dee280a91..9eb254ff52e8 100644 --- a/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java +++ b/server/src/main/java/org/apache/druid/metadata/MetadataSupervisorManager.java @@ -33,8 +33,27 @@ public interface MetadataSupervisorManager Map> getAll(); + /** + * Return latest supervisors (both active and terminated) + * + * @return latest terminated supervisors + */ Map getLatest(); + /** + * Only return the latest active supervisors + * + * @return latest active supervisors + */ + Map getLatestActiveOnly(); + + /** + * Only return the latest terminated supervisors + * + * @return latest terminated supervisors + */ + Map getLatestTerminatedOnly(); + /** * Remove terminated supervisors created before the given timestamp. * diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java index e995a14aac16..0480c451f9f4 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java @@ -253,11 +253,41 @@ public Map fold( ); } + @Override + public Map getLatestActiveOnly() + { + Map supervisors = getLatest(); + Map activeSupervisors = new HashMap<>(); + for (Map.Entry entry : supervisors.entrySet()) { + // Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec + // (NoopSupervisorSpec is used as a tombstone marker) + if (!(entry.getValue() instanceof NoopSupervisorSpec)) { + activeSupervisors.put(entry.getKey(), entry.getValue()); + } + } + return ImmutableMap.copyOf(activeSupervisors); + } + + @Override + public Map getLatestTerminatedOnly() + { + Map supervisors = getLatest(); + Map activeSupervisors = new HashMap<>(); + for (Map.Entry entry : supervisors.entrySet()) { + // Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec + // (NoopSupervisorSpec is used as a tombstone marker) + if (entry.getValue() instanceof NoopSupervisorSpec) { + activeSupervisors.put(entry.getKey(), entry.getValue()); + } + } + return ImmutableMap.copyOf(activeSupervisors); + } + @Override public int removeTerminatedSupervisorsOlderThan(long timestamp) { DateTime dateTime = DateTimes.utc(timestamp); - Map supervisors = getLatest(); + Map terminatedSupervisors = getLatestTerminatedOnly(); return dbi.withHandle( handle -> { final PreparedBatch batch = handle.prepareBatch( @@ -267,13 +297,8 @@ public int removeTerminatedSupervisorsOlderThan(long timestamp) dateTime.toString() ) ); - for (Map.Entry supervisor : supervisors.entrySet()) { - final SupervisorSpec spec = supervisor.getValue(); - // Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec - // (NoopSupervisorSpec is used as a tombstone marker) - if (spec instanceof NoopSupervisorSpec) { - batch.bind("spec_id", supervisor.getKey()).add(); - } + for (Map.Entry supervisor : terminatedSupervisors.entrySet()) { + batch.bind("spec_id", supervisor.getKey()).add(); } int[] result = batch.execute(); return IntStream.of(result).sum(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java index 7985b5c48329..14da3c8afbed 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java @@ -79,6 +79,14 @@ public abstract class DruidCoordinatorConfig @Default("PT-1s") public abstract Duration getCoordinatorRuleKillDurationToRetain(); + @Config("druid.coordinator.kill.datasource.period") + @Default("P1D") + public abstract Duration getCoordinatorDatasourceKillPeriod(); + + @Config("druid.coordinator.kill.datasource.durationToRetain") + @Default("PT-1s") + public abstract Duration getCoordinatorDatasourceKillDurationToRetain(); + @Config("druid.coordinator.load.timeout") public Duration getLoadTimeoutDelay() { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java index 651cf6b60c96..ac735c6594aa 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java @@ -51,6 +51,7 @@ public KillAuditLog( ); this.retainDuration = config.getCoordinatorAuditKillDurationToRetain().getMillis(); Preconditions.checkArgument(this.retainDuration >= 0, "coordinator audit kill retainDuration must be >= 0"); + Preconditions.checkArgument(this.retainDuration < System.currentTimeMillis(), "Coordinator audit kill retainDuration cannot be greater than current time in ms"); log.debug( "Audit Kill Task scheduling enabled with period [%s], retainDuration [%s]", this.period, @@ -62,19 +63,24 @@ public KillAuditLog( @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { - if ((lastKillTime + period) < System.currentTimeMillis()) { - lastKillTime = System.currentTimeMillis(); - - long timestamp = System.currentTimeMillis() - retainDuration; - int auditRemoved = auditManager.removeAuditLogsOlderThan(timestamp); - ServiceEmitter emitter = params.getEmitter(); - emitter.emit( - new ServiceMetricEvent.Builder().build( - "metadata/kill/audit/count", - auditRemoved - ) - ); - log.info("Finished running KillAuditLog duty. Removed %,d audit logs", auditRemoved); + long currentTimeMillis = System.currentTimeMillis(); + if ((lastKillTime + period) < currentTimeMillis) { + lastKillTime = currentTimeMillis; + long timestamp = currentTimeMillis - retainDuration; + try { + int auditRemoved = auditManager.removeAuditLogsOlderThan(timestamp); + ServiceEmitter emitter = params.getEmitter(); + emitter.emit( + new ServiceMetricEvent.Builder().build( + "metadata/kill/audit/count", + auditRemoved + ) + ); + log.info("Finished running KillAuditLog duty. Removed %,d audit logs", auditRemoved); + } + catch (Exception e) { + log.error(e, "Failed to kill audit log"); + } } return params; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java new file mode 100644 index 000000000000..9c9535b2502a --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.duty; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.inject.Inject; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.metadata.MetadataSupervisorManager; +import org.apache.druid.server.coordinator.DruidCoordinatorConfig; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * CoordinatorDuty for automatic deletion of datasource metadata from the datasource table in metadata storage. + * (Note: datasource metadata only exists for datasource created from supervisor). + * Note that this class relies on the supervisorSpec.getDataSources names to match with the + * 'datasource' column of the datasource metadata table. + */ +public class KillDatasourceMetadata implements CoordinatorDuty +{ + private static final Logger log = new Logger(KillDatasourceMetadata.class); + + private final long period; + private final long retainDuration; + private long lastKillTime = 0; + + private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; + private final MetadataSupervisorManager metadataSupervisorManager; + + @Inject + public KillDatasourceMetadata( + DruidCoordinatorConfig config, + IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, + MetadataSupervisorManager metadataSupervisorManager + ) + { + this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator; + this.metadataSupervisorManager = metadataSupervisorManager; + this.period = config.getCoordinatorDatasourceKillPeriod().getMillis(); + Preconditions.checkArgument( + this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(), + "Coordinator datasource metadata kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod" + ); + this.retainDuration = config.getCoordinatorDatasourceKillDurationToRetain().getMillis(); + Preconditions.checkArgument(this.retainDuration >= 0, "Coordinator datasource metadata kill retainDuration must be >= 0"); + Preconditions.checkArgument(this.retainDuration < System.currentTimeMillis(), "Coordinator datasource metadata kill retainDuration cannot be greater than current time in ms"); + log.debug( + "Datasource Metadata Kill Task scheduling enabled with period [%s], retainDuration [%s]", + this.period, + this.retainDuration + ); + } + + @Override + public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + { + long currentTimeMillis = System.currentTimeMillis(); + if ((lastKillTime + period) < currentTimeMillis) { + lastKillTime = currentTimeMillis; + long timestamp = currentTimeMillis - retainDuration; + try { + // Datasource metadata only exists for datasource with supervisor + // To determine if datasource metadata is still active, we check if the supervisor for that particular datasource + // is still active or not + Map allActiveSupervisor = metadataSupervisorManager.getLatestActiveOnly(); + Set allDatasourceWithActiveSupervisor = allActiveSupervisor.values() + .stream() + .map(supervisorSpec -> supervisorSpec.getDataSources()) + .flatMap(Collection::stream) + .filter(datasource -> !Strings.isNullOrEmpty(datasource)) + .collect(Collectors.toSet()); + // We exclude removing datasource metadata with active supervisor + int datasourceMetadataRemovedCount = indexerMetadataStorageCoordinator.removeDataSourceMetadataOlderThan( + timestamp, + allDatasourceWithActiveSupervisor + ); + ServiceEmitter emitter = params.getEmitter(); + emitter.emit( + new ServiceMetricEvent.Builder().build( + "metadata/kill/datasource/count", + datasourceMetadataRemovedCount + ) + ); + log.info( + "Finished running KillDatasourceMetadata duty. Removed %,d datasource metadata", + datasourceMetadataRemovedCount + ); + } + catch (Exception e) { + log.error(e, "Failed to kill datasource metadata"); + } + } + return params; + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java index eb4f0186cdd4..50b1740e8643 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java @@ -47,6 +47,7 @@ public KillRules( ); this.retainDuration = config.getCoordinatorRuleKillDurationToRetain().getMillis(); Preconditions.checkArgument(this.retainDuration >= 0, "coordinator rule kill retainDuration must be >= 0"); + Preconditions.checkArgument(this.retainDuration < System.currentTimeMillis(), "Coordinator rule kill retainDuration cannot be greater than current time in ms"); log.debug( "Rule Kill Task scheduling enabled with period [%s], retainDuration [%s]", this.period, @@ -57,19 +58,24 @@ public KillRules( @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { - if ((lastKillTime + period) < System.currentTimeMillis()) { - lastKillTime = System.currentTimeMillis(); - - long timestamp = System.currentTimeMillis() - retainDuration; - int ruleRemoved = params.getDatabaseRuleManager().removeRulesForEmptyDatasourcesOlderThan(timestamp); - ServiceEmitter emitter = params.getEmitter(); - emitter.emit( - new ServiceMetricEvent.Builder().build( - "metadata/kill/rule/count", - ruleRemoved - ) - ); - log.info("Finished running KillRules duty. Removed %,d rule", ruleRemoved); + long currentTimeMillis = System.currentTimeMillis(); + if ((lastKillTime + period) < currentTimeMillis) { + lastKillTime = currentTimeMillis; + long timestamp = currentTimeMillis - retainDuration; + try { + int ruleRemoved = params.getDatabaseRuleManager().removeRulesForEmptyDatasourcesOlderThan(timestamp); + ServiceEmitter emitter = params.getEmitter(); + emitter.emit( + new ServiceMetricEvent.Builder().build( + "metadata/kill/rule/count", + ruleRemoved + ) + ); + log.info("Finished running KillRules duty. Removed %,d rule", ruleRemoved); + } + catch (Exception e) { + log.error(e, "Failed to kill rules metadata"); + } } return params; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java index a1c2a3f28326..8a0b484f3eaa 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java @@ -55,6 +55,7 @@ public KillSupervisors( ); this.retainDuration = config.getCoordinatorSupervisorKillDurationToRetain().getMillis(); Preconditions.checkArgument(this.retainDuration >= 0, "Coordinator supervisor kill retainDuration must be >= 0"); + Preconditions.checkArgument(this.retainDuration < System.currentTimeMillis(), "Coordinator supervisor kill retainDuration cannot be greater than current time in ms"); log.debug( "Supervisor Kill Task scheduling enabled with period [%s], retainDuration [%s]", this.period, @@ -65,19 +66,24 @@ public KillSupervisors( @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { - if ((lastKillTime + period) < System.currentTimeMillis()) { - lastKillTime = System.currentTimeMillis(); - - long timestamp = System.currentTimeMillis() - retainDuration; - int supervisorRemoved = metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(timestamp); - ServiceEmitter emitter = params.getEmitter(); - emitter.emit( - new ServiceMetricEvent.Builder().build( - "metadata/kill/supervisor/count", - supervisorRemoved - ) - ); - log.info("Finished running KillSupervisors duty. Removed %,d supervisor", supervisorRemoved); + long currentTimeMillis = System.currentTimeMillis(); + if ((lastKillTime + period) < currentTimeMillis) { + lastKillTime = currentTimeMillis; + long timestamp = currentTimeMillis - retainDuration; + try { + int supervisorRemoved = metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(timestamp); + ServiceEmitter emitter = params.getEmitter(); + emitter.emit( + new ServiceMetricEvent.Builder().build( + "metadata/kill/supervisor/count", + supervisorRemoved + ) + ); + log.info("Finished running KillSupervisors duty. Removed %,d supervisor", supervisorRemoved); + } + catch (Exception e) { + log.error(e, "Failed to kill terminated supervisor metadata"); + } } return params; } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 924ad92bdcd4..7acd90f0f3f4 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -1471,4 +1471,82 @@ public void testDropSegmentsWithHandleForSegmentThatDoesNotExist() Assert.assertEquals(IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult.TRY_AGAIN, result); } } + + @Test + public void testRemoveDataSourceMetadataOlderThanDatasourceActiveShouldNotBeDeleted() throws Exception + { + coordinator.announceHistoricalSegments( + ImmutableSet.of(defaultSegment), + ImmutableSet.of(), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "bar")) + ); + + Assert.assertEquals( + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + coordinator.retrieveDataSourceMetadata("fooDataSource") + ); + + // Try delete. Datasource should not be deleted as it is in excluded set + int deletedCount = coordinator.removeDataSourceMetadataOlderThan(System.currentTimeMillis(), ImmutableSet.of("fooDataSource")); + + // Datasource should not be deleted + Assert.assertEquals( + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + coordinator.retrieveDataSourceMetadata("fooDataSource") + ); + Assert.assertEquals(0, deletedCount); + } + + @Test + public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveAndOlderThanTimeShouldBeDeleted() throws Exception + { + coordinator.announceHistoricalSegments( + ImmutableSet.of(defaultSegment), + ImmutableSet.of(), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "bar")) + ); + + Assert.assertEquals( + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + coordinator.retrieveDataSourceMetadata("fooDataSource") + ); + + // Try delete. Datasource should be deleted as it is not in excluded set and created time older than given time + int deletedCount = coordinator.removeDataSourceMetadataOlderThan(System.currentTimeMillis(), ImmutableSet.of()); + + // Datasource should be deleted + Assert.assertNull( + coordinator.retrieveDataSourceMetadata("fooDataSource") + ); + Assert.assertEquals(1, deletedCount); + } + + @Test + public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveButNotOlderThanTimeShouldNotBeDeleted() throws Exception + { + coordinator.announceHistoricalSegments( + ImmutableSet.of(defaultSegment), + ImmutableSet.of(), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "bar")) + ); + + Assert.assertEquals( + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + coordinator.retrieveDataSourceMetadata("fooDataSource") + ); + + // Do delete. Datasource metadata should not be deleted. Datasource is not active but it was created just now so it's + // created timestamp will be later than the timestamp 2012-01-01T00:00:00Z + int deletedCount = coordinator.removeDataSourceMetadataOlderThan(DateTimes.of("2012-01-01T00:00:00Z").getMillis(), ImmutableSet.of()); + + // Datasource should not be deleted + Assert.assertEquals( + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + coordinator.retrieveDataSourceMetadata("fooDataSource") + ); + Assert.assertEquals(0, deletedCount); + } } diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java index 5e4f4833b091..9547387bbbf0 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java @@ -243,6 +243,46 @@ public void testSkipDeserializingBadSpecs() Assert.assertNull(specs.get(0).getSpec()); } + @Test + public void testGetLatestActiveOnly() + { + final String supervisor1 = "test-supervisor-1"; + final String datasource1 = "datasource-1"; + final String supervisor2 = "test-supervisor-2"; + final Map data1rev1 = ImmutableMap.of("key1-1", "value1-1-1", "key1-2", "value1-2-1"); + Assert.assertTrue(supervisorManager.getAll().isEmpty()); + supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev1)); + // supervisor1 is terminated + supervisorManager.insert(supervisor1, new NoopSupervisorSpec(supervisor1, ImmutableList.of(datasource1))); + // supervisor2 is still active + supervisorManager.insert(supervisor2, new TestSupervisorSpec(supervisor2, data1rev1)); + // get latest active should only return supervisor2 + Map actual = supervisorManager.getLatestActiveOnly(); + Assert.assertEquals(1, actual.size()); + Assert.assertTrue(actual.containsKey(supervisor2)); + } + + + @Test + public void testGetLatestTerminatedOnly() + { + final String supervisor1 = "test-supervisor-1"; + final String datasource1 = "datasource-1"; + final String supervisor2 = "test-supervisor-2"; + final Map data1rev1 = ImmutableMap.of("key1-1", "value1-1-1", "key1-2", "value1-2-1"); + Assert.assertTrue(supervisorManager.getAll().isEmpty()); + supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev1)); + // supervisor1 is terminated + supervisorManager.insert(supervisor1, new NoopSupervisorSpec(supervisor1, ImmutableList.of(datasource1))); + // supervisor2 is still active + supervisorManager.insert(supervisor2, new TestSupervisorSpec(supervisor2, data1rev1)); + // get latest terminated should only return supervisor1 + Map actual = supervisorManager.getLatestTerminatedOnly(); + Assert.assertEquals(1, actual.size()); + Assert.assertTrue(actual.containsKey(supervisor1)); + } + + private static class BadSupervisorSpec implements SupervisorSpec { private final String id; diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java index b8e3103681ef..adad57e7a8ea 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java @@ -178,6 +178,8 @@ public void setUp() throws Exception null, null, null, + null, + null, 10, new Duration("PT0s") ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 97c220d3626c..2d88d6ee4f13 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -150,6 +150,8 @@ public void setUp() throws Exception null, null, null, + null, + null, 10, new Duration("PT0s") ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java index 2a8550003998..4991f1373f2f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java @@ -87,6 +87,8 @@ public class HttpLoadQueuePeonTest null, null, null, + null, + null, 10, Duration.ZERO ) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java index 8b2f486d1ace..ea4322a02f8a 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java @@ -102,6 +102,8 @@ public void testMultipleLoadDropSegments() throws Exception null, null, null, + null, + null, 10, Duration.millis(0) ) @@ -304,6 +306,8 @@ public void testFailAssignForNonTimeoutFailures() throws Exception null, null, null, + null, + null, 10, new Duration("PT1s") ) @@ -363,6 +367,8 @@ public void testFailAssignForLoadDropTimeout() throws Exception null, null, null, + null, + null, 10, new Duration("PT1s") ) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java index 57ac65b88b8c..326bc764113c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java @@ -51,6 +51,8 @@ public LoadQueuePeonTester() null, null, null, + null, + null, 10, new Duration("PT1s") ) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java index 2c1b7067cc4a..e2c05703dc6e 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java @@ -37,6 +37,8 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig private final Duration coordinatorAuditKillDurationToRetain; private final Duration coordinatorRuleKillPeriod; private final Duration coordinatorRuleKillDurationToRetain; + private final Duration coordinatorDatasourceKillPeriod; + private final Duration coordinatorDatasourceKillDurationToRetain; private final Duration getLoadQueuePeonRepeatDelay; private final int coordinatorKillMaxSegments; @@ -54,6 +56,8 @@ public TestDruidCoordinatorConfig( Duration coordinatorAuditKillDurationToRetain, Duration coordinatorRuleKillPeriod, Duration coordinatorRuleKillDurationToRetain, + Duration coordinatorDatasourceKillPeriod, + Duration coordinatorDatasourceKillDurationToRetain, int coordinatorKillMaxSegments, Duration getLoadQueuePeonRepeatDelay ) @@ -71,6 +75,8 @@ public TestDruidCoordinatorConfig( this.coordinatorAuditKillDurationToRetain = coordinatorAuditKillDurationToRetain; this.coordinatorRuleKillPeriod = coordinatorRuleKillPeriod; this.coordinatorRuleKillDurationToRetain = coordinatorRuleKillDurationToRetain; + this.coordinatorDatasourceKillPeriod = coordinatorDatasourceKillPeriod; + this.coordinatorDatasourceKillDurationToRetain = coordinatorDatasourceKillDurationToRetain; this.coordinatorKillMaxSegments = coordinatorKillMaxSegments; this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay; } @@ -147,6 +153,18 @@ public Duration getCoordinatorRuleKillDurationToRetain() return coordinatorRuleKillDurationToRetain; } + @Override + public Duration getCoordinatorDatasourceKillPeriod() + { + return coordinatorDatasourceKillPeriod; + } + + @Override + public Duration getCoordinatorDatasourceKillDurationToRetain() + { + return coordinatorDatasourceKillDurationToRetain; + } + @Override public int getCoordinatorKillMaxSegments() { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java index aeb0bd9cbc5d..4b883f38e783 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java @@ -68,6 +68,8 @@ public void testRunSkipIfLastRunLessThanPeriod() new Duration("PT1S"), null, null, + null, + null, 10, null ); @@ -94,6 +96,8 @@ public void testRunNotSkipIfLastRunMoreThanPeriod() new Duration("PT1S"), null, null, + null, + null, 10, null ); @@ -120,6 +124,8 @@ public void testConstructorFailIfInvalidPeriod() new Duration("PT1S"), null, null, + null, + null, 10, null ); @@ -145,6 +151,8 @@ public void testConstructorFailIfInvalidRetainDuration() new Duration("PT-1S"), null, null, + null, + null, 10, null ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java new file mode 100644 index 000000000000..ff29136cc2e8 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.duty; + +import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; +import org.apache.druid.metadata.MetadataSupervisorManager; +import org.apache.druid.metadata.TestSupervisorSpec; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class KillDatasourceMetadataTest +{ + @Mock + private IndexerMetadataStorageCoordinator mockIndexerMetadataStorageCoordinator; + + @Mock + private MetadataSupervisorManager mockMetadataSupervisorManager; + + @Mock + private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams; + + @Mock + private TestSupervisorSpec mockKinesisSupervisorSpec; + + @Mock + private ServiceEmitter mockServiceEmitter; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + private KillDatasourceMetadata killDatasourceMetadata; + + @Test + public void testRunSkipIfLastRunLessThanPeriod() + { + TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( + null, + null, + null, + new Duration("PT5S"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + new Duration(Long.MAX_VALUE), + new Duration("PT1S"), + 10, + null + ); + killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); + killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams); + Mockito.verifyZeroInteractions(mockIndexerMetadataStorageCoordinator); + Mockito.verifyZeroInteractions(mockMetadataSupervisorManager); + } + + @Test + public void testRunNotSkipIfLastRunMoreThanPeriod() + { + Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); + + TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( + null, + null, + null, + new Duration("PT5S"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + new Duration("PT6S"), + new Duration("PT1S"), + 10, + null + ); + killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); + killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams); + Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(), ArgumentMatchers.anySet()); + Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); + } + + @Test + public void testConstructorFailIfInvalidPeriod() + { + TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( + null, + null, + null, + new Duration("PT5S"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + new Duration("PT3S"), + new Duration("PT1S"), + 10, + null + ); + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Coordinator datasource metadata kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"); + killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); + } + + @Test + public void testConstructorFailIfInvalidRetainDuration() + { + TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( + null, + null, + null, + new Duration("PT5S"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + new Duration("PT6S"), + new Duration("PT-1S"), + 10, + null + ); + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Coordinator datasource metadata kill retainDuration must be >= 0"); + killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); + } + + @Test + public void testRunWithEmptyFilterExcludedDatasource() + { + Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); + + TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( + null, + null, + null, + new Duration("PT5S"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + new Duration("PT6S"), + new Duration("PT1S"), + 10, + null + ); + killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); + killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams); + Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(), ArgumentMatchers.eq(ImmutableSet.of())); + Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java index d3268f21150f..8d63e9136554 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java @@ -75,6 +75,8 @@ public void testRunSkipIfLastRunLessThanPeriod() null, new Duration(Long.MAX_VALUE), new Duration("PT1S"), + null, + null, 10, null ); @@ -101,6 +103,8 @@ public void testRunNotSkipIfLastRunMoreThanPeriod() null, new Duration("PT6S"), new Duration("PT1S"), + null, + null, 10, null ); @@ -127,6 +131,8 @@ public void testConstructorFailIfInvalidPeriod() null, new Duration("PT3S"), new Duration("PT1S"), + null, + null, 10, null ); @@ -152,6 +158,8 @@ public void testConstructorFailIfInvalidRetainDuration() null, new Duration("PT6S"), new Duration("PT-1S"), + null, + null, 10, null ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java index 9aeb992f9b14..9ab3fb6b3084 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java @@ -68,6 +68,8 @@ public void testRunSkipIfLastRunLessThanPeriod() null, null, null, + null, + null, 10, null ); @@ -94,6 +96,8 @@ public void testRunNotSkipIfLastRunMoreThanPeriod() null, null, null, + null, + null, 10, null ); @@ -120,6 +124,8 @@ public void testConstructorFailIfInvalidPeriod() null, null, null, + null, + null, 10, null ); @@ -145,6 +151,8 @@ public void testConstructorFailIfInvalidRetainDuration() null, null, null, + null, + null, 10, null ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 34d020f5cd72..8a6eef3b62f4 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -115,6 +115,8 @@ private void testFindIntervalForKill(List segmentIntervals, Interval e null, null, null, + null, + null, 1000, Duration.ZERO ) diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 584023c4d6f9..c864592b3ee9 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -73,6 +73,7 @@ import org.apache.druid.server.coordinator.LoadQueueTaskMaster; import org.apache.druid.server.coordinator.duty.CoordinatorDuty; import org.apache.druid.server.coordinator.duty.KillAuditLog; +import org.apache.druid.server.coordinator.duty.KillDatasourceMetadata; import org.apache.druid.server.coordinator.duty.KillRules; import org.apache.druid.server.coordinator.duty.KillSupervisors; import org.apache.druid.server.coordinator.duty.KillUnusedSegments; @@ -254,7 +255,7 @@ public void configure(Binder binder) CoordinatorMetadataStoreManagementDuty.class ); conditionalMetadataStoreManagementDutyMultibind.addConditionBinding( - "druid.coordinator.kill.rule.on", + "druid.coordinator.kill.supervisor.on", Predicates.equalTo("true"), KillSupervisors.class ); @@ -268,6 +269,11 @@ public void configure(Binder binder) Predicates.equalTo("true"), KillRules.class ); + conditionalMetadataStoreManagementDutyMultibind.addConditionBinding( + "druid.coordinator.kill.datasource.on", + Predicates.equalTo("true"), + KillDatasourceMetadata.class + ); bindNodeRoleAndAnnouncer( binder,