From 40da93ef05e45425d40ef984073005393257cc6a Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 14 Aug 2023 11:26:34 -0400 Subject: [PATCH 01/15] * store state in auto kill duty so that we dont resubmit kill intervals --- .../metadata/SegmentsMetadataManager.java | 6 ++++ .../metadata/SqlSegmentsMetadataManager.java | 27 ++++++++++++--- .../duty/CoordinatorCustomDuty.java | 1 + .../coordinator/duty/KillUnusedSegments.java | 12 +++++-- .../SqlSegmentsMetadataManagerTest.java | 34 +++++++++++++++++++ .../simulate/TestSegmentsMetadataManager.java | 11 ++++++ 6 files changed, 84 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java index 9b41f61f9060..900685a6250b 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java @@ -144,6 +144,12 @@ Optional> iterateAllUsedNonOvershadowedSegmentsForDatasour */ List getUnusedSegmentIntervals(String dataSource, DateTime maxEndTime, int limit); + /** + * Returns top N unused segment intervals with the end time no later than the specified maxEndTime when ordered by + * segment start time, end time. + */ + List getUnusedSegmentIntervals(String dataSource, DateTime minStartTime, DateTime maxEndTime, int limit); + @VisibleForTesting void poll(); } diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 46db26a56d6c..7c52b82dc8b2 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -57,6 +57,7 @@ import org.skife.jdbi.v2.BaseResultSetMapper; import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.Query; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.TransactionCallback; import org.skife.jdbi.v2.TransactionStatus; @@ -976,6 +977,17 @@ private String getSegmentsTable() @Override public List getUnusedSegmentIntervals(final String dataSource, final DateTime maxEndTime, final int limit) + { + return getUnusedSegmentIntervals(dataSource, null, maxEndTime, limit); + } + + @Override + public List getUnusedSegmentIntervals( + final String dataSource, + @Nullable final DateTime minStartTime, + final DateTime maxEndTime, + final int limit + ) { return connector.inReadOnlyTransaction( new TransactionCallback>() @@ -983,13 +995,14 @@ public List getUnusedSegmentIntervals(final String dataSource, final D @Override public List inTransaction(Handle handle, TransactionStatus status) { - Iterator iter = handle + final Query sql = handle .createQuery( StringUtils.format( "SELECT start, %2$send%2$s FROM %1$s WHERE dataSource = :dataSource AND " - + "%2$send%2$s <= :end AND used = false ORDER BY start, %2$send%2$s", + + "%2$send%2$s <= :end AND used = false %3$s ORDER BY start, %2$send%2$s", getSegmentsTable(), - connector.getQuoteString() + connector.getQuoteString(), + null != minStartTime ? "AND start >= :start" : "" ) ) .setFetchSize(connector.getStreamingFetchSize()) @@ -1008,8 +1021,12 @@ protected Interval mapInternal(int index, Map row) ); } } - ) - .iterator(); + ); + if (null != minStartTime) { + sql.bind("start", minStartTime.toString()); + } + + Iterator iter = sql.iterator(); List result = Lists.newArrayListWithCapacity(limit); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java index d48171472ddc..df9545cd60aa 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java @@ -50,6 +50,7 @@ */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes({ + @JsonSubTypes.Type(name = "KillUnusedSegments", value = KillUnusedSegments.class), @JsonSubTypes.Type(name = "killSupervisors", value = KillSupervisorsCustomDuty.class), @JsonSubTypes.Type(name = "compactSegments", value = CompactSegments.class), }) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index bbd58bfe6381..31a9f43c80c5 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -44,6 +44,8 @@ import java.util.Collection; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * Completely removes information about unused segments who have an interval end that comes before @@ -54,7 +56,7 @@ *

* See org.apache.druid.indexing.common.task.KillUnusedSegmentsTask. */ -public class KillUnusedSegments implements CoordinatorDuty +public class KillUnusedSegments implements CoordinatorCustomDuty { public static final String KILL_TASK_TYPE = "kill"; public static final String TASK_ID_PREFIX = "coordinator-issued"; @@ -67,6 +69,8 @@ public class KillUnusedSegments implements CoordinatorDuty private final long retainDuration; private final boolean ignoreRetainDuration; private final int maxSegmentsToKill; + + private final ConcurrentMap datasourceToLastKillIntervalEnd; private long lastKillTime = 0; private final SegmentsMetadataManager segmentsMetadataManager; @@ -98,6 +102,8 @@ public KillUnusedSegments( this.maxSegmentsToKill = config.getCoordinatorKillMaxSegments(); Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0"); + datasourceToLastKillIntervalEnd = new ConcurrentHashMap<>(); + log.info( "Kill Task scheduling enabled with period [%s], retainDuration [%s], maxSegmentsToKill [%s]", this.period, @@ -179,6 +185,7 @@ private int killUnusedSegments( } final Interval intervalToKill = findIntervalForKill(dataSource); if (intervalToKill == null) { + datasourceToLastKillIntervalEnd.remove(dataSource); continue; } @@ -190,6 +197,7 @@ private int killUnusedSegments( maxSegmentsToKill ), true); ++submittedTasks; + datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd()); } catch (Exception ex) { log.error(ex, "Failed to submit kill task for dataSource [%s]", dataSource); @@ -230,7 +238,7 @@ private Interval findIntervalForKill(String dataSource) : DateTimes.nowUtc().minus(retainDuration); List unusedSegmentIntervals = segmentsMetadataManager - .getUnusedSegmentIntervals(dataSource, maxEndTime, maxSegmentsToKill); + .getUnusedSegmentIntervals(dataSource, datasourceToLastKillIntervalEnd.get(dataSource), maxEndTime, maxSegmentsToKill); if (CollectionUtils.isNullOrEmpty(unusedSegmentIntervals)) { return null; diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index 6dad542644a6..299a02f22d46 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java @@ -389,6 +389,40 @@ public void testGetUnusedSegmentIntervals() ); } + @Test + public void testGetUnusedSegmentIntervalsWithMinStartTime() + { + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + int numChangedSegments = sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource("wikipedia"); + Assert.assertEquals(2, numChangedSegments); + + Assert.assertEquals( + ImmutableList.of(segment2.getInterval()), + sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", null, DateTimes.of("3000"), 1) + ); + + // Test the DateTime maxEndTime argument of getUnusedSegmentIntervals + Assert.assertEquals( + ImmutableList.of(segment2.getInterval()), + sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", null, DateTimes.of(2012, 1, 7, 0, 0), 1) + ); + Assert.assertEquals( + ImmutableList.of(segment1.getInterval()), + sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 4, 7, 0, 0), 1) + ); + Assert.assertEquals( + ImmutableList.of(), + sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 1, 7, 0, 0), 1) + ); + + Assert.assertEquals( + ImmutableList.of(segment2.getInterval(), segment1.getInterval()), + sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", null, DateTimes.of("3000"), 5) + ); + } + @Test(timeout = 60_000) public void testMarkAsUnusedAllSegmentsInDataSource() throws IOException, InterruptedException { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java index e0d1d887efe2..f87def34e53f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java @@ -204,6 +204,17 @@ public List getUnusedSegmentIntervals(String dataSource, DateTime maxE return null; } + @Override + public List getUnusedSegmentIntervals( + final String dataSource, + @Nullable final DateTime minStartTime, + final DateTime maxEndTime, + final int limit + ) + { + return null; + } + @Override public void poll() { From 55ae7bb487121322b7d6ed4fb6deaee5419ffa8a Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 15 Aug 2023 15:16:40 -0400 Subject: [PATCH 02/15] * add tests * fix stuff --- .../metadata/SegmentsMetadataManager.java | 4 +- .../server/coordinator/DruidCoordinator.java | 44 +++++++ .../coordinator/DruidCoordinatorConfig.java | 13 +- .../coordinator/duty/KillUnusedSegments.java | 37 ++++-- .../TestDruidCoordinatorConfig.java | 18 +++ .../duty/KillUnusedSegmentsTest.java | 112 ++++++++++++++++-- .../org/apache/druid/cli/CliCoordinator.java | 7 -- 7 files changed, 202 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java index 900685a6250b..b83bca0d7726 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java @@ -145,8 +145,8 @@ Optional> iterateAllUsedNonOvershadowedSegmentsForDatasour List getUnusedSegmentIntervals(String dataSource, DateTime maxEndTime, int limit); /** - * Returns top N unused segment intervals with the end time no later than the specified maxEndTime when ordered by - * segment start time, end time. + * Returns top N unused segment intervals with the start time no earlier than the specified start tiem, and the + * end time no later than the specified maxEndTime when ordered by segment start time, end time. */ List getUnusedSegmentIntervals(String dataSource, DateTime minStartTime, DateTime maxEndTime, int limit); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 272de190109f..0e304a7a0242 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -68,6 +68,7 @@ import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups; import org.apache.druid.server.coordinator.duty.CoordinatorDuty; +import org.apache.druid.server.coordinator.duty.KillUnusedSegments; import org.apache.druid.server.coordinator.duty.MarkOvershadowedSegmentsAsUnused; import org.apache.druid.server.coordinator.duty.RunRules; import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments; @@ -154,6 +155,7 @@ public class DruidCoordinator private final LookupCoordinatorManager lookupCoordinatorManager; private final DruidLeaderSelector coordLeaderSelector; private final CompactSegments compactSegments; + private final KillUnusedSegments killUnusedSegments; private volatile boolean started = false; @@ -222,6 +224,7 @@ public DruidCoordinator( this.lookupCoordinatorManager = lookupCoordinatorManager; this.coordLeaderSelector = coordLeaderSelector; this.compactSegments = initializeCompactSegmentsDuty(compactionSegmentSearchPolicy); + this.killUnusedSegments = initializeKillUnusedSegmentsDuty(); this.loadQueueManager = loadQueueManager; } @@ -590,6 +593,9 @@ List makeIndexingServiceDuties() if (getCompactSegmentsDutyFromCustomGroups().isEmpty()) { duties.addAll(makeCompactSegmentsDuty()); } + if (null != killUnusedSegments) { + duties.addAll(makeKillUnusedSegmentsDuty()); + } log.debug( "Initialized indexing service duties [%s].", duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList()) @@ -624,6 +630,27 @@ CompactSegments initializeCompactSegmentsDuty(CompactionSegmentSearchPolicy comp } } + @Nullable + @VisibleForTesting + KillUnusedSegments initializeKillUnusedSegmentsDuty() + { + List killUnusedSegmentsDutyFromCustomGroups = getKillUnusedSegmentsDutyFromCustomGroups(); + if (!config.getAutoKillEnabled()) { + return null; + } + if (killUnusedSegmentsDutyFromCustomGroups.isEmpty()) { + return new KillUnusedSegments(segmentsMetadataManager, overlordClient, config); + } else { + if (killUnusedSegmentsDutyFromCustomGroups.size() > 1) { + log.warn( + "More than one KillUnusedSegments duty is configured in the Coordinator Custom Duty Group." + + " The first duty will be picked up." + ); + } + return killUnusedSegmentsDutyFromCustomGroups.get(0); + } + } + @VisibleForTesting List getCompactSegmentsDutyFromCustomGroups() { @@ -636,11 +663,28 @@ List getCompactSegmentsDutyFromCustomGroups() .collect(Collectors.toList()); } + @VisibleForTesting + List getKillUnusedSegmentsDutyFromCustomGroups() + { + return customDutyGroups.getCoordinatorCustomDutyGroups() + .stream() + .flatMap(coordinatorCustomDutyGroup -> + coordinatorCustomDutyGroup.getCustomDutyList().stream()) + .filter(duty -> duty instanceof KillUnusedSegments) + .map(duty -> (KillUnusedSegments) duty) + .collect(Collectors.toList()); + } + private List makeCompactSegmentsDuty() { return ImmutableList.of(compactSegments); } + private List makeKillUnusedSegmentsDuty() + { + return ImmutableList.of(killUnusedSegments); + } + private class DutiesRunnable implements Runnable { private final DateTime coordinatorStartTime = DateTimes.nowUtc(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java index 9495f30f2c87..ea7f25450c94 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java @@ -23,6 +23,8 @@ import org.skife.config.Config; import org.skife.config.Default; +import javax.annotation.Nullable; + /** */ public abstract class DruidCoordinatorConfig @@ -43,9 +45,16 @@ public abstract class DruidCoordinatorConfig @Default("PT1H") public abstract Duration getCoordinatorMetadataStoreManagementPeriod(); + @Config("druid.coordinator.kill.on") + @Default("false") + public abstract Boolean getAutoKillEnabled(); + + @Nullable @Config("druid.coordinator.kill.period") - @Default("P1D") - public abstract Duration getCoordinatorKillPeriod(); + public Duration getCoordinatorKillPeriod() + { + return null; + } @Config("druid.coordinator.kill.durationToRetain") @Default("P90D") diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index 31a9f43c80c5..aaba0bc5acce 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -19,6 +19,8 @@ package org.apache.druid.server.coordinator.duty; +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; @@ -44,8 +46,8 @@ import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; /** * Completely removes information about unused segments who have an interval end that comes before @@ -56,7 +58,7 @@ *

* See org.apache.druid.indexing.common.task.KillUnusedSegmentsTask. */ -public class KillUnusedSegments implements CoordinatorCustomDuty +public class KillUnusedSegments implements CoordinatorCustomDuty { public static final String KILL_TASK_TYPE = "kill"; public static final String TASK_ID_PREFIX = "coordinator-issued"; @@ -65,27 +67,28 @@ public class KillUnusedSegments implements CoordinatorCustomDuty && (KILL_TASK_TYPE.equals(status.getType()) && status.getId().startsWith(TASK_ID_PREFIX)); private static final Logger log = new Logger(KillUnusedSegments.class); - private final long period; + @Nullable private final Long period; private final long retainDuration; private final boolean ignoreRetainDuration; private final int maxSegmentsToKill; - private final ConcurrentMap datasourceToLastKillIntervalEnd; + private final Map datasourceToLastKillIntervalEnd; private long lastKillTime = 0; private final SegmentsMetadataManager segmentsMetadataManager; private final OverlordClient overlordClient; @Inject + @JsonCreator public KillUnusedSegments( - SegmentsMetadataManager segmentsMetadataManager, - OverlordClient overlordClient, - DruidCoordinatorConfig config + @JacksonInject SegmentsMetadataManager segmentsMetadataManager, + @JacksonInject OverlordClient overlordClient, + @JacksonInject DruidCoordinatorConfig config ) { - this.period = config.getCoordinatorKillPeriod().getMillis(); + this.period = null != config.getCoordinatorKillPeriod() ? config.getCoordinatorKillPeriod().getMillis() : null; Preconditions.checkArgument( - this.period > config.getCoordinatorIndexingPeriod().getMillis(), + null == this.period || this.period > config.getCoordinatorIndexingPeriod().getMillis(), "coordinator kill period must be greater than druid.coordinator.period.indexingPeriod" ); @@ -118,9 +121,8 @@ public KillUnusedSegments( @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { - final long currentTimeMillis = System.currentTimeMillis(); - if (lastKillTime + period > currentTimeMillis) { + if (null != period && (lastKillTime + period > currentTimeMillis)) { log.debug("Skipping kill of unused segments as kill period has not elapsed yet."); return params; } @@ -155,6 +157,9 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) } + // any datasources that are no longer being considered for kill should have their + // last kill interval removed from map. + datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill); addStats(taskStats, stats); return params; } @@ -178,7 +183,7 @@ private int killUnusedSegments( if (0 < availableKillTaskSlots && !CollectionUtils.isNullOrEmpty(dataSourcesToKill)) { for (String dataSource : dataSourcesToKill) { if (submittedTasks >= availableKillTaskSlots) { - log.info(StringUtils.format( + log.debug(StringUtils.format( "Submitted [%d] kill tasks and reached kill task slot limit [%d]. Will resume " + "on the next coordinator cycle.", submittedTasks, availableKillTaskSlots)); break; @@ -236,7 +241,6 @@ private Interval findIntervalForKill(String dataSource) final DateTime maxEndTime = ignoreRetainDuration ? DateTimes.COMPARE_DATE_AS_STRING_MAX : DateTimes.nowUtc().minus(retainDuration); - List unusedSegmentIntervals = segmentsMetadataManager .getUnusedSegmentIntervals(dataSource, datasourceToLastKillIntervalEnd.get(dataSource), maxEndTime, maxSegmentsToKill); @@ -263,6 +267,13 @@ static int getKillTaskCapacity(int totalWorkerCapacity, double killTaskSlotRatio return Math.min((int) (totalWorkerCapacity * Math.min(killTaskSlotRatio, 1.0)), maxKillTaskSlots); } + @VisibleForTesting + Map getDatasourceToLastKillIntervalEnd() + { + return datasourceToLastKillIntervalEnd; + } + + static class TaskStats { int availableTaskSlots; diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java index 271bda113143..b36f187cc284 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java @@ -28,6 +28,7 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig private final Duration coordinatorIndexingPeriod; private final Duration metadataStoreManagementPeriod; private final Duration loadTimeoutDelay; + private final Boolean autoKillEnabled; private final Duration coordinatorKillPeriod; private final Duration coordinatorKillDurationToRetain; private final Duration coordinatorSupervisorKillPeriod; @@ -54,6 +55,7 @@ public TestDruidCoordinatorConfig( Duration coordinatorIndexingPeriod, Duration metadataStoreManagementPeriod, Duration loadTimeoutDelay, + Boolean autoKillEnabled, Duration coordinatorKillPeriod, Duration coordinatorKillDurationToRetain, Duration coordinatorSupervisorKillPeriod, @@ -80,6 +82,7 @@ public TestDruidCoordinatorConfig( this.coordinatorIndexingPeriod = coordinatorIndexingPeriod; this.metadataStoreManagementPeriod = metadataStoreManagementPeriod; this.loadTimeoutDelay = loadTimeoutDelay; + this.autoKillEnabled = autoKillEnabled; this.coordinatorKillPeriod = coordinatorKillPeriod; this.coordinatorKillDurationToRetain = coordinatorKillDurationToRetain; this.coordinatorSupervisorKillPeriod = coordinatorSupervisorKillPeriod; @@ -125,6 +128,12 @@ public Duration getCoordinatorMetadataStoreManagementPeriod() return metadataStoreManagementPeriod; } + @Override + public Boolean getAutoKillEnabled() + { + return autoKillEnabled; + } + @Override public Duration getCoordinatorKillPeriod() { @@ -251,6 +260,7 @@ public static class Builder private static final Duration DEFAULT_COORDINATOR_PERIOD = new Duration("PT60s"); private static final Duration DEFAULT_COORDINATOR_INDEXING_PERIOD = new Duration("PT1800s"); private static final Duration DEFAULT_METADATA_STORE_MANAGEMENT_PERIOD = new Duration("PT3600s"); + private static final Boolean DEFAULT_AUTO_KILL_ENABLED = false; private static final Duration DEFAULT_COORDINATOR_KILL_PERIOD = new Duration("PT86400s"); private static final Duration DEFAULT_COORDINATOR_KILL_DURATION_TO_RETAION = new Duration("PT7776000s"); private static final boolean DEFAULT_COORDINATOR_KILL_IGNORE_DURATION_TO_RETAIN = false; @@ -277,6 +287,7 @@ public static class Builder private Duration coordinatorPeriod; private Duration coordinatorIndexingPeriod; private Duration metadataStoreManagementPeriod; + private Boolean autoKillEnabled; private Duration coordinatorKillPeriod; private Duration coordinatorKillDurationToRetain; private Boolean coordinatorKillIgnoreDurationToRetain; @@ -326,6 +337,12 @@ public Builder withMetadataStoreManagementPeriod(Duration metadataStoreManagemen return this; } + public Builder witAutoKillEnabled(Boolean autoKillEnabled) + { + this.autoKillEnabled = autoKillEnabled; + return this; + } + public Builder withCoordinatorKillPeriod(Duration coordinatorKillPeriod) { this.coordinatorKillPeriod = coordinatorKillPeriod; @@ -454,6 +471,7 @@ public TestDruidCoordinatorConfig build() coordinatorIndexingPeriod == null ? DEFAULT_COORDINATOR_INDEXING_PERIOD : coordinatorIndexingPeriod, metadataStoreManagementPeriod == null ? DEFAULT_METADATA_STORE_MANAGEMENT_PERIOD : metadataStoreManagementPeriod, loadTimeoutDelay == null ? DEFAULT_LOAD_TIMEOUT_DELAY : loadTimeoutDelay, + autoKillEnabled == null ? DEFAULT_AUTO_KILL_ENABLED : autoKillEnabled, coordinatorKillPeriod == null ? DEFAULT_COORDINATOR_KILL_PERIOD : coordinatorKillPeriod, coordinatorKillDurationToRetain == null ? DEFAULT_COORDINATOR_KILL_DURATION_TO_RETAION : coordinatorKillDurationToRetain, coordinatorSupervisorKillPeriod == null ? DEFAULT_COORDINATOR_SUPERVISOR_KILL_PERIOD : coordinatorSupervisorKillPeriod, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 5d0c81385c62..85e6838bf053 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -20,6 +20,7 @@ package org.apache.druid.server.coordinator.duty; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo; import org.apache.druid.indexer.RunnerTaskState; @@ -55,6 +56,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.mockito.ArgumentMatchers.any; @@ -70,6 +72,7 @@ public class KillUnusedSegmentsTest private static final Duration COORDINATOR_KILL_PERIOD = Duration.standardMinutes(2); private static final Duration DURATION_TO_RETAIN = Duration.standardDays(1); private static final Duration INDEXING_PERIOD = Duration.standardMinutes(1); + private static final String DATASOURCE = "DS1"; @Mock private SegmentsMetadataManager segmentsMetadataManager; @@ -104,7 +107,7 @@ public void setup() Mockito.doReturn(INDEXING_PERIOD).when(config).getCoordinatorIndexingPeriod(); Mockito.doReturn(MAX_SEGMENTS_TO_KILL).when(config).getCoordinatorKillMaxSegments(); - Mockito.doReturn(Collections.singleton("DS1")) + Mockito.doReturn(Collections.singleton(DATASOURCE)) .when(coordinatorDynamicConfig).getSpecificDataSourcesToKillUnusedSegmentsIn(); final DateTime now = DateTimes.nowUtc(); @@ -129,18 +132,22 @@ public void setup() segmentsMetadataManager.getUnusedSegmentIntervals( ArgumentMatchers.anyString(), ArgumentMatchers.any(), + ArgumentMatchers.any(), ArgumentMatchers.anyInt() ) ).thenAnswer(invocation -> { - DateTime maxEndTime = invocation.getArgument(1); + DateTime minStartTime = invocation.getArgument(1); + DateTime maxEndTime = invocation.getArgument(2); long maxEndMillis = maxEndTime.getMillis(); + Long minStartMillis = minStartTime != null ? minStartTime.getMillis() : null; List unusedIntervals = unusedSegments.stream() .map(DataSegment::getInterval) - .filter(i -> i.getEnd().getMillis() <= maxEndMillis) + .filter(i -> i.getEnd().getMillis() <= maxEndMillis + && (null == minStartMillis || i.getStart().getMillis() >= minStartMillis)) .collect(Collectors.toList()); - int limit = invocation.getArgument(2); + int limit = invocation.getArgument(3); return unusedIntervals.size() <= limit ? unusedIntervals : unusedIntervals.subList(0, limit); }); @@ -186,6 +193,7 @@ public void testDurationToRetain() ); mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10); runAndVerifyKillInterval(expectedKillInterval); + verifyState(ImmutableMap.of(DATASOURCE, dayOldSegment.getInterval().getEnd())); verifyStats(9, 1, 10); } @@ -204,6 +212,7 @@ public void testNegativeDurationToRetain() ); mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10); runAndVerifyKillInterval(expectedKillInterval); + verifyState(ImmutableMap.of(DATASOURCE, nextDaySegment.getInterval().getEnd())); verifyStats(9, 1, 10); } @@ -221,6 +230,7 @@ public void testIgnoreDurationToRetain() ); mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10); runAndVerifyKillInterval(expectedKillInterval); + verifyState(ImmutableMap.of(DATASOURCE, nextMonthSegment.getInterval().getEnd())); verifyStats(9, 1, 10); } @@ -234,14 +244,60 @@ public void testMaxSegmentsToKill() mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10); // Only 1 unused segment is killed runAndVerifyKillInterval(yearOldSegment.getInterval()); + verifyState(ImmutableMap.of(DATASOURCE, yearOldSegment.getInterval().getEnd())); verifyStats(9, 1, 10); } + @Test + public void testMultipleRuns() + { + Mockito.doReturn(true) + .when(config).getCoordinatorKillIgnoreDurationToRetain(); + Mockito.doReturn(1) + .when(config).getCoordinatorKillMaxSegments(); + Mockito.doReturn(2) + .when(config).getCoordinatorKillMaxSegments(); + Mockito.doReturn(null) + .when(config).getCoordinatorKillPeriod(); + target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, config); + + // All future and past unused segments are killed + Interval expectedKillInterval = new Interval( + yearOldSegment.getInterval().getStart(), + nextMonthSegment.getInterval().getEnd() + ); + + mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10); + // Kill segments 2 at a at time + + runAndVerifyKillInterval(new Interval( + yearOldSegment.getInterval().getStart(), + monthOldSegment.getInterval().getEnd() + )); + verifyState(ImmutableMap.of(DATASOURCE, monthOldSegment.getInterval().getEnd())); + + mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10); + runAndVerifyKillInterval(new Interval( + dayOldSegment.getInterval().getStart(), + hourOldSegment.getInterval().getEnd() + )); + verifyState(ImmutableMap.of(DATASOURCE, hourOldSegment.getInterval().getEnd())); + + mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10); + runAndVerifyKillInterval(new Interval( + nextDaySegment.getInterval().getStart(), + nextMonthSegment.getInterval().getEnd() + )); + verifyState(ImmutableMap.of(DATASOURCE, nextMonthSegment.getInterval().getEnd())); + verifyStats(9, 1, 10, 3); + } + @Test public void testKillTaskSlotRatioNoAvailableTaskCapacityForKill() { mockTaskSlotUsage(0.10, 10, 1, 5); runAndVerifyNoKill(); + verifyState(ImmutableMap.of()); verifyStats(0, 0, 0); } @@ -250,6 +306,8 @@ public void testMaxKillTaskSlotsNoAvailableTaskCapacityForKill() { mockTaskSlotUsage(1.0, 3, 3, 10); runAndVerifyNoKill(); + verifyState(ImmutableMap.of()); + verifyStats(0, 0, 3); } @Test @@ -300,17 +358,53 @@ private void runAndVerifyKillInterval(Interval expectedKillInterval) Mockito.verify(overlordClient, Mockito.times(1)).runKillTask( ArgumentMatchers.anyString(), - ArgumentMatchers.eq("DS1"), + ArgumentMatchers.eq(DATASOURCE), ArgumentMatchers.eq(expectedKillInterval), ArgumentMatchers.eq(limit) ); } + private void runAndVerifyKillIntervals(List expectedKillIntervals) + { + int limit = config.getCoordinatorKillMaxSegments(); + Mockito.doReturn(Futures.immediateFuture("ok")) + .when(overlordClient) + .runKillTask( + ArgumentMatchers.anyString(), + ArgumentMatchers.anyString(), + ArgumentMatchers.any(Interval.class), + ArgumentMatchers.anyInt()); + for (int i = 0; i < expectedKillIntervals.size(); i++) { + target.run(params); + verifyState(ImmutableMap.of(DATASOURCE, yearOldSegment.getInterval().getEnd())); + verifyStats(9, 1, 10); + } + + for (Interval expectedKillInterval : expectedKillIntervals) { + Mockito.verify(overlordClient, Mockito.times(1)).runKillTask( + ArgumentMatchers.anyString(), + ArgumentMatchers.eq(DATASOURCE), + ArgumentMatchers.eq(expectedKillInterval), + ArgumentMatchers.eq(limit) + ); + } + } + private void verifyStats(int availableSlots, int submittedTasks, int maxSlots) { - Mockito.verify(stats).add(Stats.Kill.AVAILABLE_SLOTS, availableSlots); - Mockito.verify(stats).add(Stats.Kill.SUBMITTED_TASKS, submittedTasks); - Mockito.verify(stats).add(Stats.Kill.MAX_SLOTS, maxSlots); + verifyStats(availableSlots, submittedTasks, maxSlots, 1); + } + + private void verifyStats(int availableSlots, int submittedTasks, int maxSlots, int times) + { + Mockito.verify(stats, Mockito.times(times)).add(Stats.Kill.AVAILABLE_SLOTS, availableSlots); + Mockito.verify(stats, Mockito.times(times)).add(Stats.Kill.SUBMITTED_TASKS, submittedTasks); + Mockito.verify(stats, Mockito.times(times)).add(Stats.Kill.MAX_SLOTS, maxSlots); + } + + private void verifyState(Map expectedDatasourceToLastKillIntervalEnd) + { + Assert.assertEquals(expectedDatasourceToLastKillIntervalEnd, target.getDatasourceToLastKillIntervalEnd()); } private void runAndVerifyNoKill() @@ -363,7 +457,7 @@ private void mockTaskSlotUsage( private DataSegment createSegmentWithEnd(DateTime endTime) { return new DataSegment( - "DS1", + DATASOURCE, new Interval(Period.days(1), endTime), DateTimes.nowUtc().toString(), new HashMap<>(), diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index dcc8c1a95ac0..1b42c8aac9ad 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -91,7 +91,6 @@ import org.apache.druid.server.coordinator.duty.KillDatasourceMetadata; import org.apache.druid.server.coordinator.duty.KillRules; import org.apache.druid.server.coordinator.duty.KillSupervisors; -import org.apache.druid.server.coordinator.duty.KillUnusedSegments; import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy; import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster; import org.apache.druid.server.http.ClusterResource; @@ -264,12 +263,6 @@ public void configure(Binder binder) + "for more details about compaction." ); } - conditionalIndexingServiceDutyMultibind.addConditionBinding( - "druid.coordinator.kill.on", - "false", - Predicates.equalTo("true"), - KillUnusedSegments.class - ); conditionalIndexingServiceDutyMultibind.addConditionBinding( "druid.coordinator.kill.pendingSegments.on", "true", From 55f53cae7ac7c972c98645f789851bec409bb3c5 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 15 Aug 2023 18:12:13 -0400 Subject: [PATCH 03/15] * fix failing test --- .../druid/server/coordinator/duty/KillUnusedSegments.java | 4 ++++ .../server/coordinator/DruidCoordinatorConfigTest.java | 2 +- .../server/coordinator/duty/KillUnusedSegmentsTest.java | 8 -------- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index aaba0bc5acce..a301959176e3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -72,6 +72,10 @@ public class KillUnusedSegments implements CoordinatorCustomDuty private final boolean ignoreRetainDuration; private final int maxSegmentsToKill; + /** + * Used to keep track of the last interval end time that was killed for each + * datasource. + */ private final Map datasourceToLastKillIntervalEnd; private long lastKillTime = 0; diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java index db264e55e02a..8249ebae7f23 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java @@ -42,7 +42,7 @@ public void testDeserialization() Assert.assertEquals(new Duration("PT300s"), config.getCoordinatorStartDelay()); Assert.assertEquals(new Duration("PT60s"), config.getCoordinatorPeriod()); Assert.assertEquals(new Duration("PT1800s"), config.getCoordinatorIndexingPeriod()); - Assert.assertEquals(86400000, config.getCoordinatorKillPeriod().getMillis()); + Assert.assertNull(config.getCoordinatorKillPeriod()); Assert.assertEquals(7776000000L, config.getCoordinatorKillDurationToRetain().getMillis()); Assert.assertEquals(100, config.getCoordinatorKillMaxSegments()); Assert.assertEquals(new Duration(15 * 60 * 1000), config.getLoadTimeoutDelay()); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 85e6838bf053..7e25f01c9cca 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -261,15 +261,7 @@ public void testMultipleRuns() .when(config).getCoordinatorKillPeriod(); target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, config); - // All future and past unused segments are killed - Interval expectedKillInterval = new Interval( - yearOldSegment.getInterval().getStart(), - nextMonthSegment.getInterval().getEnd() - ); - mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10); - // Kill segments 2 at a at time - runAndVerifyKillInterval(new Interval( yearOldSegment.getInterval().getStart(), monthOldSegment.getInterval().getEnd() From b5988101dc838fc28dbfc741f534b4c03595f824 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 15 Aug 2023 18:14:49 -0400 Subject: [PATCH 04/15] * update docs --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 753045a92a90..4355ba7c72de 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -853,7 +853,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical process.|PT15M| |`druid.coordinator.kill.pendingSegments.on`|Boolean flag for whether or not the Coordinator clean up old entries in the `pendingSegments` table of metadata store. If set to true, Coordinator will check the created time of most recently complete task. If it doesn't exist, it finds the created time of the earliest running/pending/waiting tasks. Once the created time is found, then for all dataSources not in the `killPendingSegmentsSkipList` (see [Dynamic configuration](#dynamic-configuration)), Coordinator will ask the Overlord to clean up the entries 1 day or more older than the found created time in the `pendingSegments` table. This will be done periodically based on `druid.coordinator.period.indexingPeriod` specified.|true| |`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill task for unused segments, that is, permanently delete them from metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), Coordinator will submit tasks periodically based on `period` specified. A whitelist can be set via dynamic configuration `killDataSourceWhitelist` described later.

When `druid.coordinator.kill.on` is true, segments are eligible for permanent deletion once their data intervals are older than `druid.coordinator.kill.durationToRetain` relative to the current time. If a segment's data interval is older than this threshold at the time it is marked unused, it is eligible for permanent deletion immediately after being marked unused.|false| -|`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)| +|`druid.coordinator.kill.period`| How often to send kill tasks to the indexing service. Value must be greater `druid.coordinator.period.indexingPeriod` if set. Only applies if kill is turned on. | NULL (indexing period is used) | |`druid.coordinator.kill.durationToRetain`|Only applies if you set `druid.coordinator.kill.on` to `true`. This value is ignored if `druid.coordinator.kill.ignoreDurationToRetain` is `true`. Valid configurations must be a ISO8601 period. Druid will not kill unused segments whose interval end date is beyond `now - durationToRetain`. `durationToRetain` can be a negative ISO8601 period, which would result in `now - durationToRetain` to be in the future.

Note that the `durationToRetain` parameter applies to the segment interval, not the time that the segment was last marked unused. For example, if `durationToRetain` is set to `P90D`, then a segment for a time chunk 90 days in the past is eligible for permanent deletion immediately after being marked unused.|`P90D`| |`druid.coordinator.kill.ignoreDurationToRetain`|A way to override `druid.coordinator.kill.durationToRetain` and tell the coordinator that you do not care about the end date of unused segment intervals when it comes to killing them. If true, the coordinator considers all unused segments as eligible to be killed.|false| |`druid.coordinator.kill.maxSegments`|The number of unused segments to kill per kill task. This number must be greater than 0. This only applies when `druid.coordinator.kill.on=true`.|100| From 0e785e248151e6edcfcd2dc91f29d0566af57bed Mon Sep 17 00:00:00 2001 From: zachjsh Date: Wed, 16 Aug 2023 00:39:23 -0400 Subject: [PATCH 05/15] * remove unnecessary stubbing --- .../server/coordinator/duty/KillUnusedSegmentsTest.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 7e25f01c9cca..f81d8532c106 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -157,12 +157,6 @@ public void setup() @Test public void testRunWithNoIntervalShouldNotKillAnySegments() { - Mockito.doReturn(null).when(segmentsMetadataManager).getUnusedSegmentIntervals( - ArgumentMatchers.anyString(), - ArgumentMatchers.any(), - ArgumentMatchers.anyInt() - ); - mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10); target.run(params); Mockito.verify(overlordClient, Mockito.never()) @@ -253,8 +247,6 @@ public void testMultipleRuns() { Mockito.doReturn(true) .when(config).getCoordinatorKillIgnoreDurationToRetain(); - Mockito.doReturn(1) - .when(config).getCoordinatorKillMaxSegments(); Mockito.doReturn(2) .when(config).getCoordinatorKillMaxSegments(); Mockito.doReturn(null) From 8696df14d45bb830eeb574f6cd887019db8c2c5d Mon Sep 17 00:00:00 2001 From: zachjsh Date: Fri, 18 Aug 2023 13:05:15 -0400 Subject: [PATCH 06/15] Apply suggestions from code review Co-authored-by: Suneet Saldanha Co-authored-by: Kashif Faraz --- .../org/apache/druid/server/coordinator/DruidCoordinator.java | 2 +- .../druid/server/coordinator/DruidCoordinatorConfig.java | 3 +-- .../druid/server/coordinator/TestDruidCoordinatorConfig.java | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 0e304a7a0242..b1e3f7ef2b0d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -634,10 +634,10 @@ CompactSegments initializeCompactSegmentsDuty(CompactionSegmentSearchPolicy comp @VisibleForTesting KillUnusedSegments initializeKillUnusedSegmentsDuty() { - List killUnusedSegmentsDutyFromCustomGroups = getKillUnusedSegmentsDutyFromCustomGroups(); if (!config.getAutoKillEnabled()) { return null; } + List killUnusedSegmentsDutyFromCustomGroups = getKillUnusedSegmentsDutyFromCustomGroups(); if (killUnusedSegmentsDutyFromCustomGroups.isEmpty()) { return new KillUnusedSegments(segmentsMetadataManager, overlordClient, config); } else { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java index ea7f25450c94..adc17caea74c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java @@ -49,11 +49,10 @@ public abstract class DruidCoordinatorConfig @Default("false") public abstract Boolean getAutoKillEnabled(); - @Nullable @Config("druid.coordinator.kill.period") public Duration getCoordinatorKillPeriod() { - return null; + return getCoordinatorIndexingPeriod(); } @Config("druid.coordinator.kill.durationToRetain") diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java index b36f187cc284..f346d74d3583 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java @@ -337,7 +337,7 @@ public Builder withMetadataStoreManagementPeriod(Duration metadataStoreManagemen return this; } - public Builder witAutoKillEnabled(Boolean autoKillEnabled) + public Builder withAutoKillEnabled(Boolean autoKillEnabled) { this.autoKillEnabled = autoKillEnabled; return this; From 0f6d7950859386c24e08f75b5428e0af67c4cf2b Mon Sep 17 00:00:00 2001 From: zachjsh Date: Fri, 18 Aug 2023 14:59:14 -0400 Subject: [PATCH 07/15] * address review comments --- .../metadata/SegmentsMetadataManager.java | 6 --- .../metadata/SqlSegmentsMetadataManager.java | 6 --- .../server/coordinator/DruidCoordinator.java | 8 ++-- .../coordinator/DruidCoordinatorConfig.java | 2 - .../duty/CoordinatorCustomDuty.java | 2 +- .../coordinator/duty/KillUnusedSegments.java | 15 ++++--- .../SqlSegmentsMetadataManagerTest.java | 26 ------------ .../duty/KillUnusedSegmentsTest.java | 42 ++++++++++++++++--- .../simulate/TestSegmentsMetadataManager.java | 6 --- .../org/apache/druid/cli/CliCoordinator.java | 16 ++++--- 10 files changed, 60 insertions(+), 69 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java index b83bca0d7726..44e1bf779c10 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java @@ -138,12 +138,6 @@ Optional> iterateAllUsedNonOvershadowedSegmentsForDatasour */ Set retrieveAllDataSourceNames(); - /** - * Returns top N unused segment intervals with the end time no later than the specified maxEndTime when ordered by - * segment start time, end time. - */ - List getUnusedSegmentIntervals(String dataSource, DateTime maxEndTime, int limit); - /** * Returns top N unused segment intervals with the start time no earlier than the specified start tiem, and the * end time no later than the specified maxEndTime when ordered by segment start time, end time. diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 7c52b82dc8b2..03954ecaebe0 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -975,12 +975,6 @@ private String getSegmentsTable() return dbTables.get().getSegmentsTable(); } - @Override - public List getUnusedSegmentIntervals(final String dataSource, final DateTime maxEndTime, final int limit) - { - return getUnusedSegmentIntervals(dataSource, null, maxEndTime, limit); - } - @Override public List getUnusedSegmentIntervals( final String dataSource, diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index b1e3f7ef2b0d..e49dd94ebd4e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -46,6 +46,7 @@ import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; @@ -639,13 +640,10 @@ KillUnusedSegments initializeKillUnusedSegmentsDuty() } List killUnusedSegmentsDutyFromCustomGroups = getKillUnusedSegmentsDutyFromCustomGroups(); if (killUnusedSegmentsDutyFromCustomGroups.isEmpty()) { - return new KillUnusedSegments(segmentsMetadataManager, overlordClient, config); + return new KillUnusedSegments(segmentsMetadataManager, overlordClient, config, config.getCoordinatorIndexingPeriod()); } else { if (killUnusedSegmentsDutyFromCustomGroups.size() > 1) { - log.warn( - "More than one KillUnusedSegments duty is configured in the Coordinator Custom Duty Group." - + " The first duty will be picked up." - ); + throw new IAE("More than one KillUnusedSegments duty is configured in the Coordinator Custom Duty Group."); } return killUnusedSegmentsDutyFromCustomGroups.get(0); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java index adc17caea74c..b6cfb4156a5c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java @@ -23,8 +23,6 @@ import org.skife.config.Config; import org.skife.config.Default; -import javax.annotation.Nullable; - /** */ public abstract class DruidCoordinatorConfig diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java index df9545cd60aa..690fa18f7d6d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java @@ -50,7 +50,7 @@ */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes({ - @JsonSubTypes.Type(name = "KillUnusedSegments", value = KillUnusedSegments.class), + @JsonSubTypes.Type(name = "killUnusedSegments", value = KillUnusedSegments.class), @JsonSubTypes.Type(name = "killSupervisors", value = KillSupervisorsCustomDuty.class), @JsonSubTypes.Type(name = "compactSegments", value = CompactSegments.class), }) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index a301959176e3..e97f7bd22197 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; @@ -40,6 +41,7 @@ import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -67,7 +69,7 @@ public class KillUnusedSegments implements CoordinatorCustomDuty && (KILL_TASK_TYPE.equals(status.getType()) && status.getId().startsWith(TASK_ID_PREFIX)); private static final Logger log = new Logger(KillUnusedSegments.class); - @Nullable private final Long period; + private final long period; private final long retainDuration; private final boolean ignoreRetainDuration; private final int maxSegmentsToKill; @@ -87,13 +89,14 @@ public class KillUnusedSegments implements CoordinatorCustomDuty public KillUnusedSegments( @JacksonInject SegmentsMetadataManager segmentsMetadataManager, @JacksonInject OverlordClient overlordClient, - @JacksonInject DruidCoordinatorConfig config + @JacksonInject DruidCoordinatorConfig config, + @JsonProperty("dutyPeriod") Duration dutyPeriod ) { - this.period = null != config.getCoordinatorKillPeriod() ? config.getCoordinatorKillPeriod().getMillis() : null; + this.period = config.getCoordinatorKillPeriod().getMillis(); Preconditions.checkArgument( - null == this.period || this.period > config.getCoordinatorIndexingPeriod().getMillis(), - "coordinator kill period must be greater than druid.coordinator.period.indexingPeriod" + this.period >= dutyPeriod.getMillis(), + StringUtils.format("coordinator kill period must be greater than the duty period [%s]", dutyPeriod) ); this.ignoreRetainDuration = config.getCoordinatorKillIgnoreDurationToRetain(); @@ -126,7 +129,7 @@ public KillUnusedSegments( public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { final long currentTimeMillis = System.currentTimeMillis(); - if (null != period && (lastKillTime + period > currentTimeMillis)) { + if (lastKillTime + period > currentTimeMillis) { log.debug("Skipping kill of unused segments as kill period has not elapsed yet."); return params; } diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index 299a02f22d46..579a9a248d9a 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java @@ -372,32 +372,6 @@ public void testGetUnusedSegmentIntervals() int numChangedSegments = sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource("wikipedia"); Assert.assertEquals(2, numChangedSegments); - Assert.assertEquals( - ImmutableList.of(segment2.getInterval()), - sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of("3000"), 1) - ); - - // Test the DateTime maxEndTime argument of getUnusedSegmentIntervals - Assert.assertEquals( - ImmutableList.of(segment2.getInterval()), - sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of(2012, 1, 7, 0, 0), 1) - ); - - Assert.assertEquals( - ImmutableList.of(segment2.getInterval(), segment1.getInterval()), - sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of("3000"), 5) - ); - } - - @Test - public void testGetUnusedSegmentIntervalsWithMinStartTime() - { - sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); - sqlSegmentsMetadataManager.poll(); - Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); - int numChangedSegments = sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource("wikipedia"); - Assert.assertEquals(2, numChangedSegments); - Assert.assertEquals( ImmutableList.of(segment2.getInterval()), sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", null, DateTimes.of("3000"), 1) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index f81d8532c106..8e04179f4b22 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -151,7 +151,12 @@ public void setup() return unusedIntervals.size() <= limit ? unusedIntervals : unusedIntervals.subList(0, limit); }); - target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, config); + target = new KillUnusedSegments( + segmentsMetadataManager, + overlordClient, + config, + config.getCoordinatorIndexingPeriod() + ); } @Test @@ -168,7 +173,12 @@ public void testRunWithSpecificDatasourceAndNoIntervalShouldNotKillAnySegments() { Mockito.doReturn(Duration.standardDays(400)) .when(config).getCoordinatorKillDurationToRetain(); - target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, config); + target = new KillUnusedSegments( + segmentsMetadataManager, + overlordClient, + config, + config.getCoordinatorIndexingPeriod() + ); // No unused segment is older than the retention period mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10); @@ -197,7 +207,12 @@ public void testNegativeDurationToRetain() // Duration to retain = -1 day, reinit target for config to take effect Mockito.doReturn(DURATION_TO_RETAIN.negated()) .when(config).getCoordinatorKillDurationToRetain(); - target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, config); + target = new KillUnusedSegments( + segmentsMetadataManager, + overlordClient, + config, + config.getCoordinatorIndexingPeriod() + ); // Segments upto 1 day in the future are killed Interval expectedKillInterval = new Interval( @@ -215,7 +230,12 @@ public void testIgnoreDurationToRetain() { Mockito.doReturn(true) .when(config).getCoordinatorKillIgnoreDurationToRetain(); - target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, config); + target = new KillUnusedSegments( + segmentsMetadataManager, + overlordClient, + config, + config.getCoordinatorIndexingPeriod() + ); // All future and past unused segments are killed Interval expectedKillInterval = new Interval( @@ -233,7 +253,12 @@ public void testMaxSegmentsToKill() { Mockito.doReturn(1) .when(config).getCoordinatorKillMaxSegments(); - target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, config); + target = new KillUnusedSegments( + segmentsMetadataManager, + overlordClient, + config, + config.getCoordinatorIndexingPeriod() + ); mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10); // Only 1 unused segment is killed @@ -251,7 +276,12 @@ public void testMultipleRuns() .when(config).getCoordinatorKillMaxSegments(); Mockito.doReturn(null) .when(config).getCoordinatorKillPeriod(); - target = new KillUnusedSegments(segmentsMetadataManager, overlordClient, config); + target = new KillUnusedSegments( + segmentsMetadataManager, + overlordClient, + config, + config.getCoordinatorIndexingPeriod() + ); mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10); runAndVerifyKillInterval(new Interval( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java index f87def34e53f..deb11260b5df 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java @@ -198,12 +198,6 @@ public Set retrieveAllDataSourceNames() return null; } - @Override - public List getUnusedSegmentIntervals(String dataSource, DateTime maxEndTime, int limit) - { - return null; - } - @Override public List getUnusedSegmentIntervals( final String dataSource, diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 1b42c8aac9ad..214332d18e05 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -418,6 +418,11 @@ public CoordinatorCustomDutyGroups get() } List dutyForGroup = jsonMapper.readValue(props.getProperty(dutyListProperty), new TypeReference>() {}); List coordinatorCustomDuties = new ArrayList<>(); + String groupPeriodPropKey = StringUtils.format("druid.coordinator.%s.period", coordinatorCustomDutyGroupName); + if (Strings.isNullOrEmpty(props.getProperty(groupPeriodPropKey))) { + throw new IAE("Run period for coordinator custom duty group must be set for group %s", coordinatorCustomDutyGroupName); + } + Duration groupPeriod = new Duration(props.getProperty(groupPeriodPropKey)); for (String dutyName : dutyForGroup) { final String dutyPropertyBase = StringUtils.format( "druid.coordinator.%s.duty.%s", @@ -436,6 +441,12 @@ public CoordinatorCustomDutyGroups get() } else { adjustedProps.put(typeProperty, dutyName); } + String dutyPeriodProperty = StringUtils.format("%s.dutyPeriod", dutyPropertyBase); + if (adjustedProps.containsKey(dutyPeriodProperty)) { + throw new IAE("'dutyPeriod' property [%s] is reserved.", dutyPeriodProperty); + } else { + adjustedProps.put(dutyPeriodProperty, props.getProperty(groupPeriodPropKey)); + } coordinatorCustomDutyProvider.inject(adjustedProps, configurator); CoordinatorCustomDuty coordinatorCustomDuty = coordinatorCustomDutyProvider.get(); if (coordinatorCustomDuty == null) { @@ -443,11 +454,6 @@ public CoordinatorCustomDutyGroups get() } coordinatorCustomDuties.add(coordinatorCustomDuty); } - String groupPeriodPropKey = StringUtils.format("druid.coordinator.%s.period", coordinatorCustomDutyGroupName); - if (Strings.isNullOrEmpty(props.getProperty(groupPeriodPropKey))) { - throw new IAE("Run period for coordinator custom duty group must be set for group %s", coordinatorCustomDutyGroupName); - } - Duration groupPeriod = new Duration(props.getProperty(groupPeriodPropKey)); coordinatorCustomDutyGroups.add(new CoordinatorCustomDutyGroup(coordinatorCustomDutyGroupName, groupPeriod, coordinatorCustomDuties)); } return new CoordinatorCustomDutyGroups(coordinatorCustomDutyGroups); From d444b719ef5ce67b0f3007afab2d2739268d128b Mon Sep 17 00:00:00 2001 From: zachjsh Date: Fri, 18 Aug 2023 15:33:19 -0400 Subject: [PATCH 08/15] * update docs --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 85936a4b9a92..342cfb3c1321 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -855,7 +855,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical process.|PT15M| |`druid.coordinator.kill.pendingSegments.on`|Boolean flag for whether or not the Coordinator clean up old entries in the `pendingSegments` table of metadata store. If set to true, Coordinator will check the created time of most recently complete task. If it doesn't exist, it finds the created time of the earliest running/pending/waiting tasks. Once the created time is found, then for all dataSources not in the `killPendingSegmentsSkipList` (see [Dynamic configuration](#dynamic-configuration)), Coordinator will ask the Overlord to clean up the entries 1 day or more older than the found created time in the `pendingSegments` table. This will be done periodically based on `druid.coordinator.period.indexingPeriod` specified.|true| |`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill task for unused segments, that is, permanently delete them from metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), Coordinator will submit tasks periodically based on `period` specified. A whitelist can be set via dynamic configuration `killDataSourceWhitelist` described later.

When `druid.coordinator.kill.on` is true, segments are eligible for permanent deletion once their data intervals are older than `druid.coordinator.kill.durationToRetain` relative to the current time. If a segment's data interval is older than this threshold at the time it is marked unused, it is eligible for permanent deletion immediately after being marked unused.|false| -|`druid.coordinator.kill.period`| How often to send kill tasks to the indexing service. Value must be greater `druid.coordinator.period.indexingPeriod` if set. Only applies if kill is turned on. | NULL (indexing period is used) | +|`druid.coordinator.kill.period`| How often to send kill tasks to the indexing service. Value must be greater `druid.coordinator.period.indexingPeriod` if set. Only applies if kill is turned on. | indexing period | |`druid.coordinator.kill.durationToRetain`|Only applies if you set `druid.coordinator.kill.on` to `true`. This value is ignored if `druid.coordinator.kill.ignoreDurationToRetain` is `true`. Valid configurations must be a ISO8601 period. Druid will not kill unused segments whose interval end date is beyond `now - durationToRetain`. `durationToRetain` can be a negative ISO8601 period, which would result in `now - durationToRetain` to be in the future.

Note that the `durationToRetain` parameter applies to the segment interval, not the time that the segment was last marked unused. For example, if `durationToRetain` is set to `P90D`, then a segment for a time chunk 90 days in the past is eligible for permanent deletion immediately after being marked unused.|`P90D`| |`druid.coordinator.kill.ignoreDurationToRetain`|A way to override `druid.coordinator.kill.durationToRetain` and tell the coordinator that you do not care about the end date of unused segment intervals when it comes to killing them. If true, the coordinator considers all unused segments as eligible to be killed.|false| |`druid.coordinator.kill.bufferPeriod`|The amount of time that a segment must be unused before it is able to be permanently removed from metadata and deep storage. This can serve as a buffer period to prevent data loss if data ends up being needed after being marked unused.|`P30D`| From ed0f11a8449b70040f53aa687d1ba96fe1c86a37 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Fri, 18 Aug 2023 15:54:56 -0400 Subject: [PATCH 09/15] * move minStartTime condition as last condition in query --- .../org/apache/druid/metadata/SqlSegmentsMetadataManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 9e76678d3687..14c6ef6c1fcf 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -1106,7 +1106,7 @@ public List inTransaction(Handle handle, TransactionStatus status) .createQuery( StringUtils.format( "SELECT start, %2$send%2$s FROM %1$s WHERE dataSource = :dataSource AND " - + "%2$send%2$s <= :end AND used = false %3$s AND used_status_last_updated IS NOT NULL AND used_status_last_updated <= :used_status_last_updated ORDER BY start, %2$send%2$s", + + "%2$send%2$s <= :end AND used = false AND used_status_last_updated IS NOT NULL AND used_status_last_updated <= :used_status_last_updated %3$s ORDER BY start, %2$send%2$s", getSegmentsTable(), connector.getQuoteString(), null != minStartTime ? "AND start >= :start" : "" From d0bbaa711c5bb0c4bfdf76958b7b112bd1c173cc Mon Sep 17 00:00:00 2001 From: zachjsh Date: Fri, 18 Aug 2023 17:40:58 -0400 Subject: [PATCH 10/15] * fix failing test --- .../druid/server/coordinator/DruidCoordinatorConfigTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java index 1299b15dde01..4574b43a2cc3 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java @@ -42,7 +42,7 @@ public void testDeserialization() Assert.assertEquals(new Duration("PT300s"), config.getCoordinatorStartDelay()); Assert.assertEquals(new Duration("PT60s"), config.getCoordinatorPeriod()); Assert.assertEquals(new Duration("PT1800s"), config.getCoordinatorIndexingPeriod()); - Assert.assertNull(config.getCoordinatorKillPeriod()); + Assert.assertEquals(new Duration("PT1800s"), config.getCoordinatorKillPeriod()); Assert.assertEquals(7776000000L, config.getCoordinatorKillDurationToRetain().getMillis()); Assert.assertEquals(100, config.getCoordinatorKillMaxSegments()); Assert.assertEquals(new Duration(15 * 60 * 1000), config.getLoadTimeoutDelay()); From 384b6d4d83a3b011e2208c810a718f033e345e2a Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 21 Aug 2023 13:28:30 -0400 Subject: [PATCH 11/15] * fix docs and defaults --- docs/configuration/index.md | 2 +- .../druid/server/coordinator/DruidCoordinatorConfig.java | 6 ++---- .../server/coordinator/DruidCoordinatorConfigTest.java | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 342cfb3c1321..640c60bd34c5 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -855,7 +855,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical process.|PT15M| |`druid.coordinator.kill.pendingSegments.on`|Boolean flag for whether or not the Coordinator clean up old entries in the `pendingSegments` table of metadata store. If set to true, Coordinator will check the created time of most recently complete task. If it doesn't exist, it finds the created time of the earliest running/pending/waiting tasks. Once the created time is found, then for all dataSources not in the `killPendingSegmentsSkipList` (see [Dynamic configuration](#dynamic-configuration)), Coordinator will ask the Overlord to clean up the entries 1 day or more older than the found created time in the `pendingSegments` table. This will be done periodically based on `druid.coordinator.period.indexingPeriod` specified.|true| |`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill task for unused segments, that is, permanently delete them from metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), Coordinator will submit tasks periodically based on `period` specified. A whitelist can be set via dynamic configuration `killDataSourceWhitelist` described later.

When `druid.coordinator.kill.on` is true, segments are eligible for permanent deletion once their data intervals are older than `druid.coordinator.kill.durationToRetain` relative to the current time. If a segment's data interval is older than this threshold at the time it is marked unused, it is eligible for permanent deletion immediately after being marked unused.|false| -|`druid.coordinator.kill.period`| How often to send kill tasks to the indexing service. Value must be greater `druid.coordinator.period.indexingPeriod` if set. Only applies if kill is turned on. | indexing period | +|`druid.coordinator.kill.period`| The frequency of sending kill tasks to the indexing service. The value must be greater than or equal to `druid.coordinator.period.indexingPeriod`. However, if the `killUnusedSegments` duty is assigned to a custom duty group, the value must be greater than or equal to the period set for the duty group. Only applies if kill is turned on.|P1D (1 day)| |`druid.coordinator.kill.durationToRetain`|Only applies if you set `druid.coordinator.kill.on` to `true`. This value is ignored if `druid.coordinator.kill.ignoreDurationToRetain` is `true`. Valid configurations must be a ISO8601 period. Druid will not kill unused segments whose interval end date is beyond `now - durationToRetain`. `durationToRetain` can be a negative ISO8601 period, which would result in `now - durationToRetain` to be in the future.

Note that the `durationToRetain` parameter applies to the segment interval, not the time that the segment was last marked unused. For example, if `durationToRetain` is set to `P90D`, then a segment for a time chunk 90 days in the past is eligible for permanent deletion immediately after being marked unused.|`P90D`| |`druid.coordinator.kill.ignoreDurationToRetain`|A way to override `druid.coordinator.kill.durationToRetain` and tell the coordinator that you do not care about the end date of unused segment intervals when it comes to killing them. If true, the coordinator considers all unused segments as eligible to be killed.|false| |`druid.coordinator.kill.bufferPeriod`|The amount of time that a segment must be unused before it is able to be permanently removed from metadata and deep storage. This can serve as a buffer period to prevent data loss if data ends up being needed after being marked unused.|`P30D`| diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java index d461a046a239..2e3695f258f4 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java @@ -48,10 +48,8 @@ public abstract class DruidCoordinatorConfig public abstract Boolean getAutoKillEnabled(); @Config("druid.coordinator.kill.period") - public Duration getCoordinatorKillPeriod() - { - return getCoordinatorIndexingPeriod(); - } + @Default("P1D") + public abstract Duration getCoordinatorKillPeriod(); @Config("druid.coordinator.kill.durationToRetain") @Default("P90D") diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java index 4574b43a2cc3..53c2808aa723 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java @@ -42,7 +42,7 @@ public void testDeserialization() Assert.assertEquals(new Duration("PT300s"), config.getCoordinatorStartDelay()); Assert.assertEquals(new Duration("PT60s"), config.getCoordinatorPeriod()); Assert.assertEquals(new Duration("PT1800s"), config.getCoordinatorIndexingPeriod()); - Assert.assertEquals(new Duration("PT1800s"), config.getCoordinatorKillPeriod()); + Assert.assertEquals(86400000, config.getCoordinatorKillPeriod().getMillis()); Assert.assertEquals(7776000000L, config.getCoordinatorKillDurationToRetain().getMillis()); Assert.assertEquals(100, config.getCoordinatorKillMaxSegments()); Assert.assertEquals(new Duration(15 * 60 * 1000), config.getLoadTimeoutDelay()); From b50069b7859e83e1f090eb00e172b85107e67d13 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 22 Aug 2023 18:30:09 -0400 Subject: [PATCH 12/15] * do not run auto kill in indexing duties if defined in custom group. --- .../org/apache/druid/server/coordinator/DruidCoordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index a12488994d8e..bcff83b0d39d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -622,7 +622,7 @@ List makeIndexingServiceDuties() if (getCompactSegmentsDutyFromCustomGroups().isEmpty()) { duties.addAll(makeCompactSegmentsDuty()); } - if (null != killUnusedSegments) { + if (null != killUnusedSegments && getKillUnusedSegmentsDutyFromCustomGroups().isEmpty()) { duties.addAll(makeKillUnusedSegmentsDuty()); } log.debug( From 3f9c050a8ad277d382413edb435139783f419e40 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 22 Aug 2023 23:24:10 -0400 Subject: [PATCH 13/15] * dont use custom duty group --- docs/configuration/index.md | 2 +- .../server/coordinator/DruidCoordinator.java | 42 ------------------- .../coordinator/DruidCoordinatorConfig.java | 4 -- .../coordinator/duty/KillUnusedSegments.java | 18 +++----- .../TestDruidCoordinatorConfig.java | 18 -------- .../duty/KillUnusedSegmentsTest.java | 18 +++----- .../org/apache/druid/cli/CliCoordinator.java | 23 +++++----- 7 files changed, 25 insertions(+), 100 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 640c60bd34c5..b8cd80d60ecb 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -855,7 +855,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical process.|PT15M| |`druid.coordinator.kill.pendingSegments.on`|Boolean flag for whether or not the Coordinator clean up old entries in the `pendingSegments` table of metadata store. If set to true, Coordinator will check the created time of most recently complete task. If it doesn't exist, it finds the created time of the earliest running/pending/waiting tasks. Once the created time is found, then for all dataSources not in the `killPendingSegmentsSkipList` (see [Dynamic configuration](#dynamic-configuration)), Coordinator will ask the Overlord to clean up the entries 1 day or more older than the found created time in the `pendingSegments` table. This will be done periodically based on `druid.coordinator.period.indexingPeriod` specified.|true| |`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill task for unused segments, that is, permanently delete them from metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), Coordinator will submit tasks periodically based on `period` specified. A whitelist can be set via dynamic configuration `killDataSourceWhitelist` described later.

When `druid.coordinator.kill.on` is true, segments are eligible for permanent deletion once their data intervals are older than `druid.coordinator.kill.durationToRetain` relative to the current time. If a segment's data interval is older than this threshold at the time it is marked unused, it is eligible for permanent deletion immediately after being marked unused.|false| -|`druid.coordinator.kill.period`| The frequency of sending kill tasks to the indexing service. The value must be greater than or equal to `druid.coordinator.period.indexingPeriod`. However, if the `killUnusedSegments` duty is assigned to a custom duty group, the value must be greater than or equal to the period set for the duty group. Only applies if kill is turned on.|P1D (1 day)| +|`druid.coordinator.kill.period`| The frequency of sending kill tasks to the indexing service. The value must be greater than or equal to `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 day)| |`druid.coordinator.kill.durationToRetain`|Only applies if you set `druid.coordinator.kill.on` to `true`. This value is ignored if `druid.coordinator.kill.ignoreDurationToRetain` is `true`. Valid configurations must be a ISO8601 period. Druid will not kill unused segments whose interval end date is beyond `now - durationToRetain`. `durationToRetain` can be a negative ISO8601 period, which would result in `now - durationToRetain` to be in the future.

Note that the `durationToRetain` parameter applies to the segment interval, not the time that the segment was last marked unused. For example, if `durationToRetain` is set to `P90D`, then a segment for a time chunk 90 days in the past is eligible for permanent deletion immediately after being marked unused.|`P90D`| |`druid.coordinator.kill.ignoreDurationToRetain`|A way to override `druid.coordinator.kill.durationToRetain` and tell the coordinator that you do not care about the end date of unused segment intervals when it comes to killing them. If true, the coordinator considers all unused segments as eligible to be killed.|false| |`druid.coordinator.kill.bufferPeriod`|The amount of time that a segment must be unused before it is able to be permanently removed from metadata and deep storage. This can serve as a buffer period to prevent data loss if data ends up being needed after being marked unused.|`P30D`| diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index bcff83b0d39d..832f7790adc4 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -46,7 +46,6 @@ import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; @@ -70,7 +69,6 @@ import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup; import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups; import org.apache.druid.server.coordinator.duty.CoordinatorDuty; -import org.apache.druid.server.coordinator.duty.KillUnusedSegments; import org.apache.druid.server.coordinator.duty.MarkOvershadowedSegmentsAsUnused; import org.apache.druid.server.coordinator.duty.RunRules; import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments; @@ -157,7 +155,6 @@ public class DruidCoordinator private final LookupCoordinatorManager lookupCoordinatorManager; private final DruidLeaderSelector coordLeaderSelector; private final CompactSegments compactSegments; - private final KillUnusedSegments killUnusedSegments; private volatile boolean started = false; @@ -226,7 +223,6 @@ public DruidCoordinator( this.lookupCoordinatorManager = lookupCoordinatorManager; this.coordLeaderSelector = coordLeaderSelector; this.compactSegments = initializeCompactSegmentsDuty(compactionSegmentSearchPolicy); - this.killUnusedSegments = initializeKillUnusedSegmentsDuty(); this.loadQueueManager = loadQueueManager; } @@ -622,9 +618,6 @@ List makeIndexingServiceDuties() if (getCompactSegmentsDutyFromCustomGroups().isEmpty()) { duties.addAll(makeCompactSegmentsDuty()); } - if (null != killUnusedSegments && getKillUnusedSegmentsDutyFromCustomGroups().isEmpty()) { - duties.addAll(makeKillUnusedSegmentsDuty()); - } log.debug( "Initialized indexing service duties [%s].", duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList()) @@ -659,24 +652,6 @@ CompactSegments initializeCompactSegmentsDuty(CompactionSegmentSearchPolicy comp } } - @Nullable - @VisibleForTesting - KillUnusedSegments initializeKillUnusedSegmentsDuty() - { - if (!config.getAutoKillEnabled()) { - return null; - } - List killUnusedSegmentsDutyFromCustomGroups = getKillUnusedSegmentsDutyFromCustomGroups(); - if (killUnusedSegmentsDutyFromCustomGroups.isEmpty()) { - return new KillUnusedSegments(segmentsMetadataManager, overlordClient, config, config.getCoordinatorIndexingPeriod()); - } else { - if (killUnusedSegmentsDutyFromCustomGroups.size() > 1) { - throw new IAE("More than one KillUnusedSegments duty is configured in the Coordinator Custom Duty Group."); - } - return killUnusedSegmentsDutyFromCustomGroups.get(0); - } - } - @VisibleForTesting List getCompactSegmentsDutyFromCustomGroups() { @@ -689,28 +664,11 @@ List getCompactSegmentsDutyFromCustomGroups() .collect(Collectors.toList()); } - @VisibleForTesting - List getKillUnusedSegmentsDutyFromCustomGroups() - { - return customDutyGroups.getCoordinatorCustomDutyGroups() - .stream() - .flatMap(coordinatorCustomDutyGroup -> - coordinatorCustomDutyGroup.getCustomDutyList().stream()) - .filter(duty -> duty instanceof KillUnusedSegments) - .map(duty -> (KillUnusedSegments) duty) - .collect(Collectors.toList()); - } - private List makeCompactSegmentsDuty() { return ImmutableList.of(compactSegments); } - private List makeKillUnusedSegmentsDuty() - { - return ImmutableList.of(killUnusedSegments); - } - private class DutiesRunnable implements Runnable { private final DateTime coordinatorStartTime = DateTimes.nowUtc(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java index 2e3695f258f4..4b5610f91151 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java @@ -43,10 +43,6 @@ public abstract class DruidCoordinatorConfig @Default("PT1H") public abstract Duration getCoordinatorMetadataStoreManagementPeriod(); - @Config("druid.coordinator.kill.on") - @Default("false") - public abstract Boolean getAutoKillEnabled(); - @Config("druid.coordinator.kill.period") @Default("P1D") public abstract Duration getCoordinatorKillPeriod(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index d6fbd8a2b2fb..60d5c2e1d1a4 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -19,9 +19,6 @@ package org.apache.druid.server.coordinator.duty; -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; @@ -41,7 +38,6 @@ import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; -import org.joda.time.Duration; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -60,7 +56,7 @@ *

* See org.apache.druid.indexing.common.task.KillUnusedSegmentsTask. */ -public class KillUnusedSegments implements CoordinatorCustomDuty +public class KillUnusedSegments implements CoordinatorDuty { public static final String KILL_TASK_TYPE = "kill"; public static final String TASK_ID_PREFIX = "coordinator-issued"; @@ -86,18 +82,16 @@ public class KillUnusedSegments implements CoordinatorCustomDuty private final OverlordClient overlordClient; @Inject - @JsonCreator public KillUnusedSegments( - @JacksonInject SegmentsMetadataManager segmentsMetadataManager, - @JacksonInject OverlordClient overlordClient, - @JacksonInject DruidCoordinatorConfig config, - @JsonProperty("dutyPeriod") Duration dutyPeriod + SegmentsMetadataManager segmentsMetadataManager, + OverlordClient overlordClient, + DruidCoordinatorConfig config ) { this.period = config.getCoordinatorKillPeriod().getMillis(); Preconditions.checkArgument( - this.period >= dutyPeriod.getMillis(), - StringUtils.format("coordinator kill period must be greater than the duty period [%s]", dutyPeriod) + this.period > config.getCoordinatorIndexingPeriod().getMillis(), + "coordinator kill period must be greater than druid.coordinator.period.indexingPeriod" ); this.ignoreRetainDuration = config.getCoordinatorKillIgnoreDurationToRetain(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java index a748835363d8..93b1246af165 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java @@ -28,7 +28,6 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig private final Duration coordinatorIndexingPeriod; private final Duration metadataStoreManagementPeriod; private final Duration loadTimeoutDelay; - private final Boolean autoKillEnabled; private final Duration coordinatorKillPeriod; private final Duration coordinatorKillDurationToRetain; private final Duration coordinatorSupervisorKillPeriod; @@ -55,7 +54,6 @@ public TestDruidCoordinatorConfig( Duration coordinatorIndexingPeriod, Duration metadataStoreManagementPeriod, Duration loadTimeoutDelay, - Boolean autoKillEnabled, Duration coordinatorKillPeriod, Duration coordinatorKillDurationToRetain, Duration coordinatorSupervisorKillPeriod, @@ -82,7 +80,6 @@ public TestDruidCoordinatorConfig( this.coordinatorIndexingPeriod = coordinatorIndexingPeriod; this.metadataStoreManagementPeriod = metadataStoreManagementPeriod; this.loadTimeoutDelay = loadTimeoutDelay; - this.autoKillEnabled = autoKillEnabled; this.coordinatorKillPeriod = coordinatorKillPeriod; this.coordinatorKillDurationToRetain = coordinatorKillDurationToRetain; this.coordinatorSupervisorKillPeriod = coordinatorSupervisorKillPeriod; @@ -128,12 +125,6 @@ public Duration getCoordinatorMetadataStoreManagementPeriod() return metadataStoreManagementPeriod; } - @Override - public Boolean getAutoKillEnabled() - { - return autoKillEnabled; - } - @Override public Duration getCoordinatorKillPeriod() { @@ -260,7 +251,6 @@ public static class Builder private static final Duration DEFAULT_COORDINATOR_PERIOD = new Duration("PT60s"); private static final Duration DEFAULT_COORDINATOR_INDEXING_PERIOD = new Duration("PT1800s"); private static final Duration DEFAULT_METADATA_STORE_MANAGEMENT_PERIOD = new Duration("PT3600s"); - private static final Boolean DEFAULT_AUTO_KILL_ENABLED = false; private static final Duration DEFAULT_COORDINATOR_KILL_PERIOD = new Duration("PT86400s"); private static final Duration DEFAULT_COORDINATOR_KILL_DURATION_TO_RETAION = new Duration("PT7776000s"); private static final boolean DEFAULT_COORDINATOR_KILL_IGNORE_DURATION_TO_RETAIN = false; @@ -287,7 +277,6 @@ public static class Builder private Duration coordinatorPeriod; private Duration coordinatorIndexingPeriod; private Duration metadataStoreManagementPeriod; - private Boolean autoKillEnabled; private Duration coordinatorKillPeriod; private Duration coordinatorKillDurationToRetain; private Boolean coordinatorKillIgnoreDurationToRetain; @@ -337,12 +326,6 @@ public Builder withMetadataStoreManagementPeriod(Duration metadataStoreManagemen return this; } - public Builder withAutoKillEnabled(Boolean autoKillEnabled) - { - this.autoKillEnabled = autoKillEnabled; - return this; - } - public Builder withCoordinatorKillPeriod(Duration coordinatorKillPeriod) { this.coordinatorKillPeriod = coordinatorKillPeriod; @@ -471,7 +454,6 @@ public TestDruidCoordinatorConfig build() coordinatorIndexingPeriod == null ? DEFAULT_COORDINATOR_INDEXING_PERIOD : coordinatorIndexingPeriod, metadataStoreManagementPeriod == null ? DEFAULT_METADATA_STORE_MANAGEMENT_PERIOD : metadataStoreManagementPeriod, loadTimeoutDelay == null ? DEFAULT_LOAD_TIMEOUT_DELAY : loadTimeoutDelay, - autoKillEnabled == null ? DEFAULT_AUTO_KILL_ENABLED : autoKillEnabled, coordinatorKillPeriod == null ? DEFAULT_COORDINATOR_KILL_PERIOD : coordinatorKillPeriod, coordinatorKillDurationToRetain == null ? DEFAULT_COORDINATOR_KILL_DURATION_TO_RETAION : coordinatorKillDurationToRetain, coordinatorSupervisorKillPeriod == null ? DEFAULT_COORDINATOR_SUPERVISOR_KILL_PERIOD : coordinatorSupervisorKillPeriod, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 7c823c9788c2..5adb345c9fa9 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -156,8 +156,7 @@ public void setup() target = new KillUnusedSegments( segmentsMetadataManager, overlordClient, - config, - config.getCoordinatorIndexingPeriod() + config ); } @@ -186,8 +185,7 @@ public void testRunWithSpecificDatasourceAndNoIntervalShouldNotKillAnySegments() target = new KillUnusedSegments( segmentsMetadataManager, overlordClient, - config, - config.getCoordinatorIndexingPeriod() + config ); // No unused segment is older than the retention period @@ -220,8 +218,7 @@ public void testNegativeDurationToRetain() target = new KillUnusedSegments( segmentsMetadataManager, overlordClient, - config, - config.getCoordinatorIndexingPeriod() + config ); // Segments upto 1 day in the future are killed @@ -243,8 +240,7 @@ public void testIgnoreDurationToRetain() target = new KillUnusedSegments( segmentsMetadataManager, overlordClient, - config, - config.getCoordinatorIndexingPeriod() + config ); // All future and past unused segments are killed @@ -266,8 +262,7 @@ public void testMaxSegmentsToKill() target = new KillUnusedSegments( segmentsMetadataManager, overlordClient, - config, - config.getCoordinatorIndexingPeriod() + config ); mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10); @@ -287,8 +282,7 @@ public void testMultipleRuns() target = new KillUnusedSegments( segmentsMetadataManager, overlordClient, - config, - config.getCoordinatorIndexingPeriod() + config ); mockTaskSlotUsage(1.0, Integer.MAX_VALUE, 1, 10); diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 8fda2e778618..5327f0b3a764 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -92,6 +92,7 @@ import org.apache.druid.server.coordinator.duty.KillDatasourceMetadata; import org.apache.druid.server.coordinator.duty.KillRules; import org.apache.druid.server.coordinator.duty.KillSupervisors; +import org.apache.druid.server.coordinator.duty.KillUnusedSegments; import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster; import org.apache.druid.server.http.ClusterResource; import org.apache.druid.server.http.CompactionResource; @@ -263,6 +264,12 @@ public void configure(Binder binder) + "for more details about compaction." ); } + conditionalIndexingServiceDutyMultibind.addConditionBinding( + "druid.coordinator.kill.on", + "false", + Predicates.equalTo("true"), + KillUnusedSegments.class + ); conditionalIndexingServiceDutyMultibind.addConditionBinding( "druid.coordinator.kill.pendingSegments.on", "true", @@ -418,11 +425,6 @@ public CoordinatorCustomDutyGroups get() } List dutyForGroup = jsonMapper.readValue(props.getProperty(dutyListProperty), new TypeReference>() {}); List coordinatorCustomDuties = new ArrayList<>(); - String groupPeriodPropKey = StringUtils.format("druid.coordinator.%s.period", coordinatorCustomDutyGroupName); - if (Strings.isNullOrEmpty(props.getProperty(groupPeriodPropKey))) { - throw new IAE("Run period for coordinator custom duty group must be set for group %s", coordinatorCustomDutyGroupName); - } - Duration groupPeriod = new Duration(props.getProperty(groupPeriodPropKey)); for (String dutyName : dutyForGroup) { final String dutyPropertyBase = StringUtils.format( "druid.coordinator.%s.duty.%s", @@ -441,12 +443,6 @@ public CoordinatorCustomDutyGroups get() } else { adjustedProps.put(typeProperty, dutyName); } - String dutyPeriodProperty = StringUtils.format("%s.dutyPeriod", dutyPropertyBase); - if (adjustedProps.containsKey(dutyPeriodProperty)) { - throw new IAE("'dutyPeriod' property [%s] is reserved.", dutyPeriodProperty); - } else { - adjustedProps.put(dutyPeriodProperty, props.getProperty(groupPeriodPropKey)); - } coordinatorCustomDutyProvider.inject(adjustedProps, configurator); CoordinatorCustomDuty coordinatorCustomDuty = coordinatorCustomDutyProvider.get(); if (coordinatorCustomDuty == null) { @@ -454,6 +450,11 @@ public CoordinatorCustomDutyGroups get() } coordinatorCustomDuties.add(coordinatorCustomDuty); } + String groupPeriodPropKey = StringUtils.format("druid.coordinator.%s.period", coordinatorCustomDutyGroupName); + if (Strings.isNullOrEmpty(props.getProperty(groupPeriodPropKey))) { + throw new IAE("Run period for coordinator custom duty group must be set for group %s", coordinatorCustomDutyGroupName); + } + Duration groupPeriod = new Duration(props.getProperty(groupPeriodPropKey)); coordinatorCustomDutyGroups.add(new CoordinatorCustomDutyGroup(coordinatorCustomDutyGroupName, groupPeriod, coordinatorCustomDuties)); } return new CoordinatorCustomDutyGroups(coordinatorCustomDutyGroups); From 96914355c51a715e0cded5480fe6d23f408d5001 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 22 Aug 2023 23:29:46 -0400 Subject: [PATCH 14/15] * greater than or equal to --- .../druid/server/coordinator/duty/KillUnusedSegments.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index 60d5c2e1d1a4..cd1ca59b36c7 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -90,8 +90,8 @@ public KillUnusedSegments( { this.period = config.getCoordinatorKillPeriod().getMillis(); Preconditions.checkArgument( - this.period > config.getCoordinatorIndexingPeriod().getMillis(), - "coordinator kill period must be greater than druid.coordinator.period.indexingPeriod" + this.period >= config.getCoordinatorIndexingPeriod().getMillis(), + "coordinator kill period must be greater than or equal to druid.coordinator.period.indexingPeriod" ); this.ignoreRetainDuration = config.getCoordinatorKillIgnoreDurationToRetain(); From e170d4632f71cd9ce9b7dd31c5d44ecd90b637d2 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 22 Aug 2023 23:40:20 -0400 Subject: [PATCH 15/15] * remove erroneous customDuty group --- .../druid/server/coordinator/duty/CoordinatorCustomDuty.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java index 690fa18f7d6d..d48171472ddc 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java @@ -50,7 +50,6 @@ */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes({ - @JsonSubTypes.Type(name = "killUnusedSegments", value = KillUnusedSegments.class), @JsonSubTypes.Type(name = "killSupervisors", value = KillSupervisorsCustomDuty.class), @JsonSubTypes.Type(name = "compactSegments", value = CompactSegments.class), })