From 48937f61313f4354f49b518796fb6b4e5d53792d Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Thu, 20 Jan 2022 13:52:04 -0800 Subject: [PATCH 1/8] Enable auto-kill by default --- docs/configuration/index.md | 8 +++---- .../coordinator/CoordinatorDynamicConfig.java | 24 ++++++++++--------- .../coordinator/DruidCoordinatorConfig.java | 4 ++-- .../org/apache/druid/cli/CliCoordinator.java | 1 + 4 files changed, 20 insertions(+), 17 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 336f950df56f..b976f8f4d121 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -796,10 +796,10 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.startDelay`|The operation of the Coordinator works on the assumption that it has an up-to-date view of the state of the world when it runs, the current ZK interaction code, however, is written in a way that doesn’t allow the Coordinator to know for a fact that it’s done loading the current state of the world. This delay is a hack to give it enough time to believe that it has all the data.|PT300S| |`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical process.|PT15M| |`druid.coordinator.kill.pendingSegments.on`|Boolean flag for whether or not the Coordinator clean up old entries in the `pendingSegments` table of metadata store. If set to true, Coordinator will check the created time of most recently complete task. If it doesn't exist, it finds the created time of the earliest running/pending/waiting tasks. Once the created time is found, then for all dataSources not in the `killPendingSegmentsSkipList` (see [Dynamic configuration](#dynamic-configuration)), Coordinator will ask the Overlord to clean up the entries 1 day or more older than the found created time in the `pendingSegments` table. This will be done periodically based on `druid.coordinator.period.indexingPeriod` specified.|true| -|`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill task for unused segments, that is, hard delete them from metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), Coordinator will submit tasks periodically based on `period` specified. These kill tasks will delete all unused segments except for the last `durationToRetain` period. Whitelist or All can be set via dynamic configuration `killAllDataSources` and `killDataSourceWhitelist` described later.|false| +|`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill task for unused segments, that is, hard delete them from metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), Coordinator will submit tasks periodically based on `period` specified. These kill tasks will delete all unused segments except for the last `durationToRetain` period. Whitelist or All can be set via dynamic configuration `killAllDataSources` and `killDataSourceWhitelist` described later.|true| |`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)| -|`druid.coordinator.kill.durationToRetain`| Do not kill unused segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|PT-1S (-1 seconds)| -|`druid.coordinator.kill.maxSegments`|Kill at most n unused segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0| +|`druid.coordinator.kill.durationToRetain`| Do not kill unused segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on.|P90D| +|`druid.coordinator.kill.maxSegments`|Kill at most n unused segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on.|100| |`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy for the coordinator to use to distribute segments among the historicals. `cachingCost` is logically equivalent to `cost` but is more CPU-efficient on large clusters. `diskNormalized` weights the costs according to the servers' disk usage ratios - there are known issues with this strategy distributing segments unevenly across the cluster. `random` distributes segments among services randomly.|`cost`| |`druid.coordinator.balancer.cachingCost.awaitInitialization`|Whether to wait for segment view initialization before creating the `cachingCost` balancing strategy. This property is enabled only when `druid.coordinator.balancer.strategy` is `cachingCost`. If set to 'true', the Coordinator will not start to assign segments, until the segment view is initialized. If set to 'false', the Coordinator will fallback to use the `cost` balancing strategy only if the segment view is not initialized yet. Notes, it may take much time to wait for the initialization since the `cachingCost` balancing strategy involves much computing to build itself.|false| |`druid.coordinator.loadqueuepeon.repeatDelay`|The start and repeat delay for the loadqueuepeon, which manages the load and drop of segments.|PT0.050S (50 ms)| @@ -902,7 +902,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1| |`emitBalancingStats`|Boolean flag for whether or not we should emit balancing stats. This is an expensive operation.|false| |`killDataSourceWhitelist`|List of specific data sources for which kill tasks are sent if property `druid.coordinator.kill.on` is true. This can be a list of comma-separated data source names or a JSON array.|none| -|`killAllDataSources`|Send kill tasks for ALL dataSources if property `druid.coordinator.kill.on` is true. If this is set to true then `killDataSourceWhitelist` must not be specified or be empty list.|false| +|`killAllDataSources`|Send kill tasks for ALL dataSources if property `druid.coordinator.kill.on` is true. If this is set to true then `killDataSourceWhitelist` must not be specified or be empty list.|true if `killDataSourceWhitelist` is not set or empty| |`killPendingSegmentsSkipList`|List of data sources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. This can be a list of comma-separated data sources or a JSON array.|none| |`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up segments loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too much segments scheduled to be replicated to some particular node (faster loading could be preferred to better segments distribution). Desired value depends on segments loading speed, acceptable replication time and number of nodes. Value 1000 could be a start point for a rather big cluster. Default value is 100. |100| |`decommissioningNodes`| List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `decommissioningMaxPercentOfMaxSegmentsToMove`.|none| diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 9a37e2b98b37..14073ff6dd64 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -65,7 +65,8 @@ public class CoordinatorDynamicConfig /** * If true {@link KillUnusedSegments} sends kill tasks for unused segments in all data sources. */ - private final boolean killUnusedSegmentsInAllDataSources; + @Nullable + private final Boolean killUnusedSegmentsInAllDataSources; /** * List of specific data sources for which kill tasks are sent in {@link KillUnusedSegments}. @@ -131,7 +132,7 @@ public CoordinatorDynamicConfig( @JsonProperty("killDataSourceWhitelist") Object specificDataSourcesToKillUnusedSegmentsIn, // Keeping the legacy 'killAllDataSources' property name for backward compatibility. When the project is // updated to Jackson 2.9 it could be changed, see https://github.com/apache/druid/issues/7152 - @JsonProperty("killAllDataSources") boolean killUnusedSegmentsInAllDataSources, + @JsonProperty("killAllDataSources") Boolean killUnusedSegmentsInAllDataSources, // Type is Object here so that we can support both string and list as Coordinator console can not send array of // strings in the update request, as well as for specificDataSourcesToKillUnusedSegmentsIn. // Keeping the legacy 'killPendingSegmentsSkipList' property name for backward compatibility. When the project is @@ -172,8 +173,8 @@ public CoordinatorDynamicConfig( this.replicationThrottleLimit = replicationThrottleLimit; this.balancerComputeThreads = Math.max(balancerComputeThreads, 1); this.emitBalancingStats = emitBalancingStats; - this.killUnusedSegmentsInAllDataSources = killUnusedSegmentsInAllDataSources; this.specificDataSourcesToKillUnusedSegmentsIn = parseJsonStringOrArray(specificDataSourcesToKillUnusedSegmentsIn); + this.killUnusedSegmentsInAllDataSources = killUnusedSegmentsInAllDataSources; this.dataSourcesToNotKillStalePendingSegmentsIn = parseJsonStringOrArray(dataSourcesToNotKillStalePendingSegmentsIn); this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue == null @@ -186,7 +187,9 @@ public CoordinatorDynamicConfig( ); this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove; - if (this.killUnusedSegmentsInAllDataSources && !this.specificDataSourcesToKillUnusedSegmentsIn.isEmpty()) { + if (this.killUnusedSegmentsInAllDataSources != null && + this.killUnusedSegmentsInAllDataSources && + !this.specificDataSourcesToKillUnusedSegmentsIn.isEmpty()) { throw new IAE( "can't have killUnusedSegmentsInAllDataSources and non-empty specificDataSourcesToKillUnusedSegmentsIn" ); @@ -315,7 +318,9 @@ public Set getSpecificDataSourcesToKillUnusedSegmentsIn() @JsonProperty("killAllDataSources") public boolean isKillUnusedSegmentsInAllDataSources() { - return killUnusedSegmentsInAllDataSources; + return killUnusedSegmentsInAllDataSources != null + ? killUnusedSegmentsInAllDataSources + : specificDataSourcesToKillUnusedSegmentsIn.isEmpty(); } @JsonProperty("killPendingSegmentsSkipList") @@ -453,7 +458,7 @@ public boolean equals(Object o) if (emitBalancingStats != that.emitBalancingStats) { return false; } - if (killUnusedSegmentsInAllDataSources != that.killUnusedSegmentsInAllDataSources) { + if (!Objects.equals(killUnusedSegmentsInAllDataSources, that.killUnusedSegmentsInAllDataSources)) { return false; } if (maxSegmentsInNodeLoadingQueue != that.maxSegmentsInNodeLoadingQueue) { @@ -523,7 +528,6 @@ public static class Builder private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1; private static final boolean DEFAULT_EMIT_BALANCING_STATS = false; private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = false; - private static final boolean DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES = false; private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100; private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70; private static final boolean DEFAULT_PAUSE_COORDINATION = false; @@ -727,9 +731,7 @@ public CoordinatorDynamicConfig build() balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : balancerComputeThreads, emitBalancingStats == null ? DEFAULT_EMIT_BALANCING_STATS : emitBalancingStats, specificDataSourcesToKillUnusedSegmentsIn, - killUnusedSegmentsInAllDataSources == null - ? DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES - : killUnusedSegmentsInAllDataSources, + killUnusedSegmentsInAllDataSources, dataSourcesToNotKillStalePendingSegmentsIn, maxSegmentsInNodeLoadingQueue == null ? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE @@ -766,7 +768,7 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults) ? defaults.getSpecificDataSourcesToKillUnusedSegmentsIn() : specificDataSourcesToKillUnusedSegmentsIn, killUnusedSegmentsInAllDataSources == null - ? defaults.isKillUnusedSegmentsInAllDataSources() + ? defaults.killUnusedSegmentsInAllDataSources : killUnusedSegmentsInAllDataSources, dataSourcesToNotKillStalePendingSegmentsIn == null ? defaults.getDataSourcesToNotKillStalePendingSegmentsIn() diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java index 14e3ce08ffc7..41139a714d9f 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java @@ -48,11 +48,11 @@ public abstract class DruidCoordinatorConfig public abstract Duration getCoordinatorKillPeriod(); @Config("druid.coordinator.kill.durationToRetain") - @Default("PT-1s") + @Default("P90D") public abstract Duration getCoordinatorKillDurationToRetain(); @Config("druid.coordinator.kill.maxSegments") - @Default("0") + @Default("100") public abstract int getCoordinatorKillMaxSegments(); @Config("druid.coordinator.kill.supervisor.period") diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 26cdade8eb07..48ae70866b71 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -264,6 +264,7 @@ public void configure(Binder binder) } conditionalIndexingServiceDutyMultibind.addConditionBinding( "druid.coordinator.kill.on", + "true", Predicates.equalTo("true"), KillUnusedSegments.class ); From 031822a52f08e3a881fdb00065f0c7bd420d7f39 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Fri, 21 Jan 2022 08:37:51 -0800 Subject: [PATCH 2/8] tests --- .../http/CoordinatorDynamicConfigTest.java | 162 +++++++++++++++++- 1 file changed, 161 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index b96591a16646..206f6745b4cc 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -225,6 +225,106 @@ public void testSerde() throws Exception 10 ); + // If the default specifies killAllDataSources as false, the builder should build a config with killAllDatasources as false + // even if there are no specific datasources to kill specified. + actual = CoordinatorDynamicConfig.builder().withSpecificDataSourcesToKillUnusedSegmentsIn(ImmutableSet.of()).build(actual); + assertConfig( + actual, + 1, + 1, + 1, + 1, + 10, + 1, + 1, + 2, + true, + ImmutableSet.of(), + false, + 1, + ImmutableSet.of("host1"), + 5, + true, + true, + 10 + ); + } + + @Test + public void testConstructorWithNullsShouldKillUnusedSegmentsInAllDataSources() + { + CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(1, + 1, + 1, + 1, + null, + false, + 1, + 2, + 10, + true, + null, + null, + null, + null, + ImmutableSet.of("host1"), + 5, + true, + true, + 10); + Assert.assertTrue(config.isKillUnusedSegmentsInAllDataSources()); + Assert.assertTrue(config.getSpecificDataSourcesToKillUnusedSegmentsIn().isEmpty()); + } + + @Test + public void testConstructorWithSpecificDataSourcesToKillShouldNotKillUnusedSegmentsInAllDatasources() + { + CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(1, + 1, + 1, + 1, + null, + false, + 1, + 2, + 10, + true, + ImmutableSet.of("test1"), + null, + null, + null, + ImmutableSet.of("host1"), + 5, + true, + true, + 10); + Assert.assertFalse(config.isKillUnusedSegmentsInAllDataSources()); + Assert.assertEquals(ImmutableSet.of("test1"), config.getSpecificDataSourcesToKillUnusedSegmentsIn()); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructorWithSpecificDataSourcesToKillAndKillAllDatasourcesShouldThrowException() + { + CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(1, + 1, + 1, + 1, + null, + false, + 1, + 2, + 10, + true, + ImmutableSet.of("test1"), + true, + null, + null, + ImmutableSet.of("host1"), + 5, + true, + true, + 10); + Assert.fail(); } @Test @@ -618,7 +718,7 @@ public void testBuilderDefaults() 1, false, emptyList, - false, + true, EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE, emptyList, 70, @@ -628,6 +728,66 @@ public void testBuilderDefaults() ); } + @Test + public void testBuilderWithDefaultKillUnusedSegmentsInAllDatasourcesSpecified() + { + CoordinatorDynamicConfig defaultConfig = + CoordinatorDynamicConfig.builder() + .withKillUnusedSegmentsInAllDataSources(false) + .build(); + CoordinatorDynamicConfig config = CoordinatorDynamicConfig.builder().build(defaultConfig); + assertConfig( + config, + 900000, + 524288000, + 100, + 5, + 100, + 15, + 10, + 1, + false, + ImmutableSet.of(), + false, + EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE, + ImmutableSet.of(), + 70, + false, + false, + Integer.MAX_VALUE + ); + } + + @Test + public void testBuilderWithDefaultSpecificDataSourcesToKillUnusedSegmentsInSpecified() + { + CoordinatorDynamicConfig defaultConfig = + CoordinatorDynamicConfig.builder() + .withSpecificDataSourcesToKillUnusedSegmentsIn(ImmutableSet.of("DATASOURCE")) + .build(); + CoordinatorDynamicConfig config = CoordinatorDynamicConfig.builder().build(defaultConfig); + assertConfig( + config, + 900000, + 524288000, + 100, + 5, + 100, + 15, + 10, + 1, + false, + ImmutableSet.of("DATASOURCE"), + false, + EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE, + ImmutableSet.of(), + 70, + false, + false, + Integer.MAX_VALUE + ); + } + @Test public void testUpdate() { From cc40ac37ce6531aa12060e3e1b07c2fc53398a81 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Tue, 25 Jan 2022 12:36:23 -0800 Subject: [PATCH 3/8] wip --- .../coordinator/CoordinatorDynamicConfig.java | 43 ++----------------- .../coordinator/duty/KillUnusedSegments.java | 14 ++---- .../server/coordinator/RunRulesTest.java | 1 - .../http/CoordinatorDynamicConfigTest.java | 5 --- 4 files changed, 6 insertions(+), 57 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 14073ff6dd64..90574780eabe 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -20,11 +20,11 @@ package org.apache.druid.server.coordinator; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import org.apache.druid.common.config.JacksonConfigManager; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.coordinator.duty.KillUnusedSegments; @@ -62,12 +62,6 @@ public class CoordinatorDynamicConfig private final int balancerComputeThreads; private final boolean emitBalancingStats; - /** - * If true {@link KillUnusedSegments} sends kill tasks for unused segments in all data sources. - */ - @Nullable - private final Boolean killUnusedSegmentsInAllDataSources; - /** * List of specific data sources for which kill tasks are sent in {@link KillUnusedSegments}. */ @@ -130,9 +124,6 @@ public CoordinatorDynamicConfig( // Keeping the legacy 'killDataSourceWhitelist' property name for backward compatibility. When the project is // updated to Jackson 2.9 it could be changed, see https://github.com/apache/druid/issues/7152 @JsonProperty("killDataSourceWhitelist") Object specificDataSourcesToKillUnusedSegmentsIn, - // Keeping the legacy 'killAllDataSources' property name for backward compatibility. When the project is - // updated to Jackson 2.9 it could be changed, see https://github.com/apache/druid/issues/7152 - @JsonProperty("killAllDataSources") Boolean killUnusedSegmentsInAllDataSources, // Type is Object here so that we can support both string and list as Coordinator console can not send array of // strings in the update request, as well as for specificDataSourcesToKillUnusedSegmentsIn. // Keeping the legacy 'killPendingSegmentsSkipList' property name for backward compatibility. When the project is @@ -174,7 +165,6 @@ public CoordinatorDynamicConfig( this.balancerComputeThreads = Math.max(balancerComputeThreads, 1); this.emitBalancingStats = emitBalancingStats; this.specificDataSourcesToKillUnusedSegmentsIn = parseJsonStringOrArray(specificDataSourcesToKillUnusedSegmentsIn); - this.killUnusedSegmentsInAllDataSources = killUnusedSegmentsInAllDataSources; this.dataSourcesToNotKillStalePendingSegmentsIn = parseJsonStringOrArray(dataSourcesToNotKillStalePendingSegmentsIn); this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue == null @@ -187,13 +177,6 @@ public CoordinatorDynamicConfig( ); this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove; - if (this.killUnusedSegmentsInAllDataSources != null && - this.killUnusedSegmentsInAllDataSources && - !this.specificDataSourcesToKillUnusedSegmentsIn.isEmpty()) { - throw new IAE( - "can't have killUnusedSegmentsInAllDataSources and non-empty specificDataSourcesToKillUnusedSegmentsIn" - ); - } this.pauseCoordination = pauseCoordination; this.replicateAfterLoadTimeout = replicateAfterLoadTimeout; @@ -315,12 +298,10 @@ public Set getSpecificDataSourcesToKillUnusedSegmentsIn() return specificDataSourcesToKillUnusedSegmentsIn; } - @JsonProperty("killAllDataSources") + @JsonIgnore public boolean isKillUnusedSegmentsInAllDataSources() { - return killUnusedSegmentsInAllDataSources != null - ? killUnusedSegmentsInAllDataSources - : specificDataSourcesToKillUnusedSegmentsIn.isEmpty(); + return specificDataSourcesToKillUnusedSegmentsIn.isEmpty(); } @JsonProperty("killPendingSegmentsSkipList") @@ -403,7 +384,6 @@ public String toString() ", replicationThrottleLimit=" + replicationThrottleLimit + ", balancerComputeThreads=" + balancerComputeThreads + ", emitBalancingStats=" + emitBalancingStats + - ", killUnusedSegmentsInAllDataSources=" + killUnusedSegmentsInAllDataSources + ", specificDataSourcesToKillUnusedSegmentsIn=" + specificDataSourcesToKillUnusedSegmentsIn + ", dataSourcesToNotKillStalePendingSegmentsIn=" + dataSourcesToNotKillStalePendingSegmentsIn + ", maxSegmentsInNodeLoadingQueue=" + maxSegmentsInNodeLoadingQueue + @@ -458,9 +438,6 @@ public boolean equals(Object o) if (emitBalancingStats != that.emitBalancingStats) { return false; } - if (!Objects.equals(killUnusedSegmentsInAllDataSources, that.killUnusedSegmentsInAllDataSources)) { - return false; - } if (maxSegmentsInNodeLoadingQueue != that.maxSegmentsInNodeLoadingQueue) { return false; } @@ -499,7 +476,6 @@ public int hashCode() replicationThrottleLimit, balancerComputeThreads, emitBalancingStats, - killUnusedSegmentsInAllDataSources, maxSegmentsInNodeLoadingQueue, specificDataSourcesToKillUnusedSegmentsIn, dataSourcesToNotKillStalePendingSegmentsIn, @@ -545,7 +521,6 @@ public static class Builder private Boolean emitBalancingStats; private Integer balancerComputeThreads; private Object specificDataSourcesToKillUnusedSegmentsIn; - private Boolean killUnusedSegmentsInAllDataSources; private Object dataSourcesToNotKillStalePendingSegmentsIn; private Integer maxSegmentsInNodeLoadingQueue; private Object decommissioningNodes; @@ -572,7 +547,6 @@ public Builder( @JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads, @JsonProperty("emitBalancingStats") @Nullable Boolean emitBalancingStats, @JsonProperty("killDataSourceWhitelist") @Nullable Object specificDataSourcesToKillUnusedSegmentsIn, - @JsonProperty("killAllDataSources") @Nullable Boolean killUnusedSegmentsInAllDataSources, @JsonProperty("killPendingSegmentsSkipList") @Nullable Object dataSourcesToNotKillStalePendingSegmentsIn, @JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue, @JsonProperty("decommissioningNodes") @Nullable Object decommissioningNodes, @@ -595,7 +569,6 @@ public Builder( this.balancerComputeThreads = balancerComputeThreads; this.emitBalancingStats = emitBalancingStats; this.specificDataSourcesToKillUnusedSegmentsIn = specificDataSourcesToKillUnusedSegmentsIn; - this.killUnusedSegmentsInAllDataSources = killUnusedSegmentsInAllDataSources; this.dataSourcesToNotKillStalePendingSegmentsIn = dataSourcesToNotKillStalePendingSegmentsIn; this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue; this.decommissioningNodes = decommissioningNodes; @@ -672,12 +645,6 @@ public Builder withSpecificDataSourcesToKillUnusedSegmentsIn(Set dataSou return this; } - public Builder withKillUnusedSegmentsInAllDataSources(boolean killUnusedSegmentsInAllDataSources) - { - this.killUnusedSegmentsInAllDataSources = killUnusedSegmentsInAllDataSources; - return this; - } - public Builder withMaxSegmentsInNodeLoadingQueue(int maxSegmentsInNodeLoadingQueue) { this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue; @@ -731,7 +698,6 @@ public CoordinatorDynamicConfig build() balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : balancerComputeThreads, emitBalancingStats == null ? DEFAULT_EMIT_BALANCING_STATS : emitBalancingStats, specificDataSourcesToKillUnusedSegmentsIn, - killUnusedSegmentsInAllDataSources, dataSourcesToNotKillStalePendingSegmentsIn, maxSegmentsInNodeLoadingQueue == null ? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE @@ -767,9 +733,6 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults) specificDataSourcesToKillUnusedSegmentsIn == null ? defaults.getSpecificDataSourcesToKillUnusedSegmentsIn() : specificDataSourcesToKillUnusedSegmentsIn, - killUnusedSegmentsInAllDataSources == null - ? defaults.killUnusedSegmentsInAllDataSources - : killUnusedSegmentsInAllDataSources, dataSourcesToNotKillStalePendingSegmentsIn == null ? defaults.getDataSourcesToNotKillStalePendingSegmentsIn() : dataSourcesToNotKillStalePendingSegmentsIn, diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index 9c94cf4b2d1d..72eb1242d8b8 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -29,6 +29,7 @@ import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -86,19 +87,10 @@ public KillUnusedSegments( @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { - boolean killAllDataSources = params.getCoordinatorDynamicConfig().isKillUnusedSegmentsInAllDataSources(); - Collection specificDataSourcesToKill = + Collection dataSourcesToKill = params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn(); - if (killAllDataSources && specificDataSourcesToKill != null && !specificDataSourcesToKill.isEmpty()) { - log.error( - "killAllDataSources can't be true when specificDataSourcesToKill is non-empty. No kill tasks are scheduled." - ); - return params; - } - - Collection dataSourcesToKill = specificDataSourcesToKill; - if (killAllDataSources) { + if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) { dataSourcesToKill = segmentsMetadataManager.retrieveAllDataSourceNames(); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java index d43b603e9982..dc4a6f0abe43 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java @@ -1487,7 +1487,6 @@ private CoordinatorDynamicConfig createCoordinatorDynamicConfig() .withBalancerComputeThreads(0) .withEmitBalancingStats(false) .withSpecificDataSourcesToKillUnusedSegmentsIn(null) - .withKillUnusedSegmentsInAllDataSources(false) .withMaxSegmentsInNodeLoadingQueue(1000) .withPauseCoordination(false) .build(); diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index 206f6745b4cc..aef02e5170af 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -266,7 +266,6 @@ public void testConstructorWithNullsShouldKillUnusedSegmentsInAllDataSources() null, null, null, - null, ImmutableSet.of("host1"), 5, true, @@ -292,7 +291,6 @@ public void testConstructorWithSpecificDataSourcesToKillShouldNotKillUnusedSegme ImmutableSet.of("test1"), null, null, - null, ImmutableSet.of("host1"), 5, true, @@ -316,7 +314,6 @@ public void testConstructorWithSpecificDataSourcesToKillAndKillAllDatasourcesSho 10, true, ImmutableSet.of("test1"), - true, null, null, ImmutableSet.of("host1"), @@ -733,7 +730,6 @@ public void testBuilderWithDefaultKillUnusedSegmentsInAllDatasourcesSpecified() { CoordinatorDynamicConfig defaultConfig = CoordinatorDynamicConfig.builder() - .withKillUnusedSegmentsInAllDataSources(false) .build(); CoordinatorDynamicConfig config = CoordinatorDynamicConfig.builder().build(defaultConfig); assertConfig( @@ -816,7 +812,6 @@ public void testUpdate() null, null, null, - null, null ).build(current) ); From d25e5fda9bc23a53ddf226a5347795c51f71dc97 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Wed, 26 Jan 2022 09:38:14 -0800 Subject: [PATCH 4/8] test --- .../druid/server/coordinator/DruidCoordinatorConfigTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java index 084e6320dc6b..9978c33736bd 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java @@ -43,8 +43,8 @@ public void testDeserialization() Assert.assertEquals(new Duration("PT60s"), config.getCoordinatorPeriod()); Assert.assertEquals(new Duration("PT1800s"), config.getCoordinatorIndexingPeriod()); Assert.assertEquals(86400000, config.getCoordinatorKillPeriod().getMillis()); - Assert.assertEquals(-1000, config.getCoordinatorKillDurationToRetain().getMillis()); - Assert.assertEquals(0, config.getCoordinatorKillMaxSegments()); + Assert.assertEquals(7776000000L, config.getCoordinatorKillDurationToRetain().getMillis()); + Assert.assertEquals(100, config.getCoordinatorKillMaxSegments()); Assert.assertEquals(new Duration(15 * 60 * 1000), config.getLoadTimeoutDelay()); Assert.assertEquals(Duration.millis(50), config.getLoadQueuePeonRepeatDelay()); Assert.assertTrue(config.getCompactionSkipLockedIntervals()); From ae653cd6099b8c410072fa5593831da98f0a9a9f Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Wed, 2 Feb 2022 11:23:39 -0800 Subject: [PATCH 5/8] fix IT --- integration-tests/docker/environment-configs/coordinator | 2 ++ 1 file changed, 2 insertions(+) diff --git a/integration-tests/docker/environment-configs/coordinator b/integration-tests/docker/environment-configs/coordinator index 0acc457cd0ce..84b563c3a444 100644 --- a/integration-tests/docker/environment-configs/coordinator +++ b/integration-tests/docker/environment-configs/coordinator @@ -40,4 +40,6 @@ druid_auth_basic_common_cacheDirectory=/tmp/authCache/coordinator druid_auth_unsecuredPaths=["/druid/coordinator/v1/loadqueue"] druid_server_https_crlPath=/tls/revocations.crl druid_coordinator_period_indexingPeriod=PT180000S +# 2x indexing period so that kill period is valid +druid_coordinator_kill_period=PT360000S druid_coordinator_period=PT1S From 806dfe6bbe7daf0ac51dac356def9e738ee1fccd Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Fri, 4 Feb 2022 16:22:04 -0800 Subject: [PATCH 6/8] fix it --- docs/configuration/index.md | 2 +- .../http/CoordinatorDynamicConfigTest.java | 108 +++--------------- 2 files changed, 15 insertions(+), 95 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 4890c7fb2097..97564bef1f49 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -798,7 +798,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.kill.pendingSegments.on`|Boolean flag for whether or not the Coordinator clean up old entries in the `pendingSegments` table of metadata store. If set to true, Coordinator will check the created time of most recently complete task. If it doesn't exist, it finds the created time of the earliest running/pending/waiting tasks. Once the created time is found, then for all dataSources not in the `killPendingSegmentsSkipList` (see [Dynamic configuration](#dynamic-configuration)), Coordinator will ask the Overlord to clean up the entries 1 day or more older than the found created time in the `pendingSegments` table. This will be done periodically based on `druid.coordinator.period.indexingPeriod` specified.|true| |`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill task for unused segments, that is, hard delete them from metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), Coordinator will submit tasks periodically based on `period` specified. These kill tasks will delete all unused segments except for the last `durationToRetain` period. Whitelist or All can be set via dynamic configuration `killAllDataSources` and `killDataSourceWhitelist` described later.|true| |`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)| -|`druid.coordinator.kill.durationToRetain`| Do not kill unused segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on.|P90D| +|`druid.coordinator.kill.durationToRetain`| Do not kill unused segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on.|`P90D`| |`druid.coordinator.kill.maxSegments`|Kill at most n unused segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on.|100| |`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy for the coordinator to use to distribute segments among the historicals. `cachingCost` is logically equivalent to `cost` but is more CPU-efficient on large clusters. `diskNormalized` weights the costs according to the servers' disk usage ratios - there are known issues with this strategy distributing segments unevenly across the cluster. `random` distributes segments among services randomly.|`cost`| |`druid.coordinator.balancer.cachingCost.awaitInitialization`|Whether to wait for segment view initialization before creating the `cachingCost` balancing strategy. This property is enabled only when `druid.coordinator.balancer.strategy` is `cachingCost`. If set to 'true', the Coordinator will not start to assign segments, until the segment view is initialized. If set to 'false', the Coordinator will fallback to use the `cost` balancing strategy only if the segment view is not initialized yet. Notes, it may take much time to wait for the initialization since the `cachingCost` balancing strategy involves much computing to build itself.|false| diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index aef02e5170af..73f7cd29e4fd 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.segment.TestHelper; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.junit.Assert; @@ -224,30 +223,6 @@ public void testSerde() throws Exception true, 10 ); - - // If the default specifies killAllDataSources as false, the builder should build a config with killAllDatasources as false - // even if there are no specific datasources to kill specified. - actual = CoordinatorDynamicConfig.builder().withSpecificDataSourcesToKillUnusedSegmentsIn(ImmutableSet.of()).build(actual); - assertConfig( - actual, - 1, - 1, - 1, - 1, - 10, - 1, - 1, - 2, - true, - ImmutableSet.of(), - false, - 1, - ImmutableSet.of("host1"), - 5, - true, - true, - 10 - ); } @Test @@ -300,30 +275,6 @@ public void testConstructorWithSpecificDataSourcesToKillShouldNotKillUnusedSegme Assert.assertEquals(ImmutableSet.of("test1"), config.getSpecificDataSourcesToKillUnusedSegmentsIn()); } - @Test(expected = IllegalArgumentException.class) - public void testConstructorWithSpecificDataSourcesToKillAndKillAllDatasourcesShouldThrowException() - { - CoordinatorDynamicConfig config = new CoordinatorDynamicConfig(1, - 1, - 1, - 1, - null, - false, - 1, - 2, - 10, - true, - ImmutableSet.of("test1"), - null, - null, - ImmutableSet.of("host1"), - 5, - true, - true, - 10); - Assert.fail(); - } - @Test public void testDecommissioningParametersBackwardCompatibility() throws Exception { @@ -631,23 +582,21 @@ public void testSerdeWithKillAllDataSources() throws Exception Integer.MAX_VALUE ); - //ensure whitelist is empty when killAllDataSources is true - try { - jsonStr = "{\n" - + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" - + " \"killAllDataSources\": true,\n" - + " \"percentOfSegmentsToConsiderPerMove\": 1\n" - + "}\n"; - mapper.readValue( - jsonStr, - CoordinatorDynamicConfig.class - ); + // killAllDataSources is a config in versions 0.22.x and older and is no longer used. + // This used to be an invalid config, but as of 0.23.0 the killAllDataSources flag no longer exsist, + // so this is a valid config + jsonStr = "{\n" + + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" + + " \"killAllDataSources\": true,\n" + + " \"percentOfSegmentsToConsiderPerMove\": 1\n" + + "}\n"; + actual = mapper.readValue( + jsonStr, + CoordinatorDynamicConfig.class + ); - Assert.fail("deserialization should fail."); - } - catch (JsonMappingException e) { - Assert.assertTrue(e.getCause() instanceof IAE); - } + Assert.assertFalse(actual.isKillUnusedSegmentsInAllDataSources()); + Assert.assertEquals(2, actual.getSpecificDataSourcesToKillUnusedSegmentsIn().size()); } @Test @@ -725,35 +674,6 @@ public void testBuilderDefaults() ); } - @Test - public void testBuilderWithDefaultKillUnusedSegmentsInAllDatasourcesSpecified() - { - CoordinatorDynamicConfig defaultConfig = - CoordinatorDynamicConfig.builder() - .build(); - CoordinatorDynamicConfig config = CoordinatorDynamicConfig.builder().build(defaultConfig); - assertConfig( - config, - 900000, - 524288000, - 100, - 5, - 100, - 15, - 10, - 1, - false, - ImmutableSet.of(), - false, - EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE, - ImmutableSet.of(), - 70, - false, - false, - Integer.MAX_VALUE - ); - } - @Test public void testBuilderWithDefaultSpecificDataSourcesToKillUnusedSegmentsInSpecified() { From d569e25b4ea2d24f28032cddb22cc00d4739507c Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Fri, 4 Feb 2022 16:26:18 -0800 Subject: [PATCH 7/8] remove from docs --- docs/configuration/index.md | 3 +-- docs/operations/clean-metadata-store.md | 8 ++++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 97564bef1f49..41557895edbb 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -796,7 +796,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.startDelay`|The operation of the Coordinator works on the assumption that it has an up-to-date view of the state of the world when it runs, the current ZK interaction code, however, is written in a way that doesn’t allow the Coordinator to know for a fact that it’s done loading the current state of the world. This delay is a hack to give it enough time to believe that it has all the data.|PT300S| |`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical process.|PT15M| |`druid.coordinator.kill.pendingSegments.on`|Boolean flag for whether or not the Coordinator clean up old entries in the `pendingSegments` table of metadata store. If set to true, Coordinator will check the created time of most recently complete task. If it doesn't exist, it finds the created time of the earliest running/pending/waiting tasks. Once the created time is found, then for all dataSources not in the `killPendingSegmentsSkipList` (see [Dynamic configuration](#dynamic-configuration)), Coordinator will ask the Overlord to clean up the entries 1 day or more older than the found created time in the `pendingSegments` table. This will be done periodically based on `druid.coordinator.period.indexingPeriod` specified.|true| -|`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill task for unused segments, that is, hard delete them from metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), Coordinator will submit tasks periodically based on `period` specified. These kill tasks will delete all unused segments except for the last `durationToRetain` period. Whitelist or All can be set via dynamic configuration `killAllDataSources` and `killDataSourceWhitelist` described later.|true| +|`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill task for unused segments, that is, hard delete them from metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), Coordinator will submit tasks periodically based on `period` specified. These kill tasks will delete all unused segments except for the last `durationToRetain` period. A whitelist can be set via dynamic configuration `killDataSourceWhitelist` described later.|true| |`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)| |`druid.coordinator.kill.durationToRetain`| Do not kill unused segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on.|`P90D`| |`druid.coordinator.kill.maxSegments`|Kill at most n unused segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on.|100| @@ -902,7 +902,6 @@ Issuing a GET request at the same URL will return the spec that is currently in |`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1| |`emitBalancingStats`|Boolean flag for whether or not we should emit balancing stats. This is an expensive operation.|false| |`killDataSourceWhitelist`|List of specific data sources for which kill tasks are sent if property `druid.coordinator.kill.on` is true. This can be a list of comma-separated data source names or a JSON array.|none| -|`killAllDataSources`|Send kill tasks for ALL dataSources if property `druid.coordinator.kill.on` is true. If this is set to true then `killDataSourceWhitelist` must not be specified or be empty list.|true if `killDataSourceWhitelist` is not set or empty| |`killPendingSegmentsSkipList`|List of data sources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. This can be a list of comma-separated data sources or a JSON array.|none| |`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up segments loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too much segments scheduled to be replicated to some particular node (faster loading could be preferred to better segments distribution). Desired value depends on segments loading speed, acceptable replication time and number of nodes. Value 1000 could be a start point for a rather big cluster. Default value is 100. |100| |`decommissioningNodes`| List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `decommissioningMaxPercentOfMaxSegmentsToMove`.|none| diff --git a/docs/operations/clean-metadata-store.md b/docs/operations/clean-metadata-store.md index fe16eb885342..0338d9aa702c 100644 --- a/docs/operations/clean-metadata-store.md +++ b/docs/operations/clean-metadata-store.md @@ -75,13 +75,13 @@ If you want to skip the details, check out the [example](#example) for configuri Segment records and segments in deep storage become eligible for deletion when both of the following conditions hold: -- When they meet the eligibility requirement of kill task datasource configuration according to `killDataSourceWhitelist` and `killAllDataSources` set in the Coordinator dynamic configuration. See [Dynamic configuration](../configuration/index.md#dynamic-configuration). +- When they meet the eligibility requirement of kill task datasource configuration according to `killDataSourceWhitelist` set in the Coordinator dynamic configuration. See [Dynamic configuration](../configuration/index.md#dynamic-configuration). - When the `durationToRetain` time has passed since their creation. Kill tasks use the following configuration: - `druid.coordinator.kill.on`: When `true`, enables the Coordinator to submit a kill task for unused segments, which deletes them completely from metadata store and from deep storage. Only applies to the specified datasources in the dynamic configuration parameter `killDataSourceWhitelist`. -If `killDataSourceWhitelist` is not set or empty, `killAllDataSources` defaults to true so that kill tasks can be submitted for all datasources. +If `killDataSourceWhitelist` is not set or empty, then kill tasks can be submitted for all datasources. - `druid.coordinator.kill.period`: Defines the frequency in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) for the cleanup job to check for and delete eligible segments. Defaults to `P1D`. Must be greater than `druid.coordinator.period.indexingPeriod`. - `druid.coordinator.kill.durationToRetain`: Defines the retention period in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) after creation that segments become eligible for deletion. - `druid.coordinator.kill.maxSegments`: Defines the maximum number of segments to delete per kill task. @@ -189,8 +189,8 @@ druid.coordinator.period.metadataStoreManagementPeriod=P1H # Set a kill task to poll every day to delete Segment records and segments # in deep storage > 4 days old. When druid.coordinator.kill.on is set to true, -# you must set either killAllDataSources or killDataSourceWhitelist in the dynamic -# configuration. For this example, assume killAllDataSources is set to true. +# you can set killDataSourceWhitelist in the dynamic configuration to limit +# the datasources that can be killed. # Required also for automated cleanup of rules and compaction configuration. druid.coordinator.kill.on=true From 0d644a031d85814bb29362ad912345199119f0db Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Fri, 4 Feb 2022 18:36:20 -0800 Subject: [PATCH 8/8] make coverage bot happy --- .../duty/KillUnusedSegmentsTest.java | 247 ++++++++++++------ 1 file changed, 161 insertions(+), 86 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 371e64b57f28..a1e962693155 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -20,112 +20,187 @@ package org.apache.druid.server.coordinator.duty; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.metadata.SegmentsMetadataManager; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.coordinator.DruidCoordinatorConfig; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; import org.easymock.EasyMock; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import java.util.Collections; import java.util.List; +import java.util.Set; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; /** */ +@RunWith(Enclosed.class) public class KillUnusedSegmentsTest { - @Test - public void testFindIntervalForKill() + /** + * Standing up new tests with mocks was easier than trying to move the existing tests to use mocks for consistency. + * In the future, if all tests are moved to use the same structure, this inner static class can be gotten rid of. + */ + @RunWith(MockitoJUnitRunner.class) + public static class MockedTest { - testFindIntervalForKill(null, null); - testFindIntervalForKill(ImmutableList.of(), null); - - testFindIntervalForKill(ImmutableList.of(Intervals.of("2014/2015")), Intervals.of("2014/2015")); - - testFindIntervalForKill( - ImmutableList.of(Intervals.of("2014/2015"), Intervals.of("2016/2017")), - Intervals.of("2014/2017") - ); - - testFindIntervalForKill( - ImmutableList.of(Intervals.of("2014/2015"), Intervals.of("2015/2016")), - Intervals.of("2014/2016") - ); - - testFindIntervalForKill( - ImmutableList.of(Intervals.of("2015/2016"), Intervals.of("2014/2015")), - Intervals.of("2014/2016") - ); - - testFindIntervalForKill( - ImmutableList.of(Intervals.of("2015/2017"), Intervals.of("2014/2016")), - Intervals.of("2014/2017") - ); - - testFindIntervalForKill( - ImmutableList.of( - Intervals.of("2015/2019"), - Intervals.of("2014/2016"), - Intervals.of("2018/2020") - ), - Intervals.of("2014/2020") - ); - - testFindIntervalForKill( - ImmutableList.of( - Intervals.of("2015/2019"), - Intervals.of("2014/2016"), - Intervals.of("2018/2020"), - Intervals.of("2021/2022") - ), - Intervals.of("2014/2022") - ); + private static final Set ALL_DATASOURCES = ImmutableSet.of("DS1", "DS2", "DS3"); + private static final int MAX_SEGMENTS_TO_KILL = 10; + private static final Duration COORDINATOR_KILL_PERIOD = Duration.standardMinutes(2); + private static final Duration DURATION_TO_RETAIN = Duration.standardDays(1); + private static final Duration INDEXING_PERIOD = Duration.standardMinutes(1); + + @Mock + private SegmentsMetadataManager segmentsMetadataManager; + @Mock + private IndexingServiceClient indexingServiceClient; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private DruidCoordinatorConfig config; + + @Mock + private DruidCoordinatorRuntimeParams params; + @Mock + private CoordinatorDynamicConfig coordinatorDynamicConfig; + private KillUnusedSegments target; + + @Before + public void setup() + { + Mockito.doReturn(coordinatorDynamicConfig).when(params).getCoordinatorDynamicConfig(); + Mockito.doReturn(ALL_DATASOURCES).when(segmentsMetadataManager).retrieveAllDataSourceNames(); + Mockito.doReturn(COORDINATOR_KILL_PERIOD).when(config).getCoordinatorKillPeriod(); + Mockito.doReturn(DURATION_TO_RETAIN).when(config).getCoordinatorKillDurationToRetain(); + Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod(); + Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments(); + target = new KillUnusedSegments(segmentsMetadataManager, indexingServiceClient, config); + } + @Test + public void testRunWihNoIntervalShouldNotKillAnySegments() + { + target.run(params); + Mockito.verify(indexingServiceClient, Mockito.never()) + .killUnusedSegments(anyString(), anyString(), any(Interval.class)); + } + + @Test + public void testRunWihSpecificDatasourceAndNoIntervalShouldNotKillAnySegments() + { + Mockito.when(coordinatorDynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn()).thenReturn(Collections.singleton("DS1")); + target.run(params); + Mockito.verify(indexingServiceClient, Mockito.never()) + .killUnusedSegments(anyString(), anyString(), any(Interval.class)); + } } - private void testFindIntervalForKill(List segmentIntervals, Interval expected) + public static class FindIntervalsTest { - SegmentsMetadataManager segmentsMetadataManager = EasyMock.createMock(SegmentsMetadataManager.class); - EasyMock.expect( - segmentsMetadataManager.getUnusedSegmentIntervals( - EasyMock.anyString(), - EasyMock.anyObject(DateTime.class), - EasyMock.anyInt() - ) - ).andReturn(segmentIntervals); - EasyMock.replay(segmentsMetadataManager); - IndexingServiceClient indexingServiceClient = EasyMock.createMock(IndexingServiceClient.class); - - KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments( - segmentsMetadataManager, - indexingServiceClient, - new TestDruidCoordinatorConfig( - null, - null, - Duration.parse("PT76400S"), - null, - new Duration(1), - Duration.parse("PT86400S"), - Duration.parse("PT86400S"), - null, - null, - null, - null, - null, - null, - null, - null, - null, - 1000, - Duration.ZERO - ) - ); - - Assert.assertEquals( - expected, - unusedSegmentsKiller.findIntervalForKill("test", 10000) - ); + @Test + public void testFindIntervalForKill() + { + testFindIntervalForKill(null, null); + testFindIntervalForKill(ImmutableList.of(), null); + + testFindIntervalForKill(ImmutableList.of(Intervals.of("2014/2015")), Intervals.of("2014/2015")); + + testFindIntervalForKill( + ImmutableList.of(Intervals.of("2014/2015"), Intervals.of("2016/2017")), + Intervals.of("2014/2017") + ); + + testFindIntervalForKill( + ImmutableList.of(Intervals.of("2014/2015"), Intervals.of("2015/2016")), + Intervals.of("2014/2016") + ); + + testFindIntervalForKill( + ImmutableList.of(Intervals.of("2015/2016"), Intervals.of("2014/2015")), + Intervals.of("2014/2016") + ); + + testFindIntervalForKill( + ImmutableList.of(Intervals.of("2015/2017"), Intervals.of("2014/2016")), + Intervals.of("2014/2017") + ); + + testFindIntervalForKill( + ImmutableList.of( + Intervals.of("2015/2019"), + Intervals.of("2014/2016"), + Intervals.of("2018/2020") + ), + Intervals.of("2014/2020") + ); + + testFindIntervalForKill( + ImmutableList.of( + Intervals.of("2015/2019"), + Intervals.of("2014/2016"), + Intervals.of("2018/2020"), + Intervals.of("2021/2022") + ), + Intervals.of("2014/2022") + ); + } + + private void testFindIntervalForKill(List segmentIntervals, Interval expected) + { + SegmentsMetadataManager segmentsMetadataManager = EasyMock.createMock(SegmentsMetadataManager.class); + EasyMock.expect( + segmentsMetadataManager.getUnusedSegmentIntervals( + EasyMock.anyString(), + EasyMock.anyObject(DateTime.class), + EasyMock.anyInt() + ) + ).andReturn(segmentIntervals); + EasyMock.replay(segmentsMetadataManager); + IndexingServiceClient indexingServiceClient = EasyMock.createMock(IndexingServiceClient.class); + + KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments( + segmentsMetadataManager, + indexingServiceClient, + new TestDruidCoordinatorConfig( + null, + null, + Duration.parse("PT76400S"), + null, + new Duration(1), + Duration.parse("PT86400S"), + Duration.parse("PT86400S"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + 1000, + Duration.ZERO + ) + ); + + Assert.assertEquals( + expected, + unusedSegmentsKiller.findIntervalForKill("test", 10000) + ); + } } }