From 1a830eb649e5a986b39fad46b69f0bd5d5825bdb Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Sun, 9 May 2021 23:58:53 -0700 Subject: [PATCH 01/13] add auto clean up datasource metadata --- docs/configuration/index.md | 3 + docs/operations/metrics.md | 1 + .../IndexerMetadataStorageCoordinator.java | 9 + .../IndexerSQLMetadataStorageCoordinator.java | 39 ++++ .../coordinator/DruidCoordinatorConfig.java | 8 + .../duty/KillDatasourceMetadata.java | 107 +++++++++ ...exerSQLMetadataStorageCoordinatorTest.java | 78 +++++++ .../CuratorDruidCoordinatorTest.java | 2 + .../coordinator/DruidCoordinatorTest.java | 2 + .../coordinator/HttpLoadQueuePeonTest.java | 2 + .../server/coordinator/LoadQueuePeonTest.java | 6 + .../coordinator/LoadQueuePeonTester.java | 2 + .../TestDruidCoordinatorConfig.java | 18 ++ .../coordinator/duty/KillAuditLogTest.java | 8 + .../duty/KillDatasourceMetadataTest.java | 219 ++++++++++++++++++ .../coordinator/duty/KillRulesTest.java | 8 + .../coordinator/duty/KillSupervisorsTest.java | 8 + .../duty/KillUnusedSegmentsTest.java | 2 + .../org/apache/druid/cli/CliCoordinator.java | 6 + 19 files changed, 528 insertions(+) create mode 100644 server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 0d76a7193d68..ccbcaf995361 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -756,6 +756,9 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.kill.rule.on`| Boolean value for whether to enable automatic deletion of rules. If set to true, Coordinator will periodically remove rules of inactive datasource (datasource with no used and unused segments) from the rule table in metadata storage.| No | False| |`druid.coordinator.kill.rule.period`| How often to do automatic deletion of rules in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be equal to or greater than `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.rule.on` is set to "True".| No| `P1D`| |`druid.coordinator.kill.rule.durationToRetain`| Duration of rules to be retained from created time in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.rule.on` is set to "True".| Yes if `druid.coordinator.kill.rule.on` is set to "True".| None| +|`druid.coordinator.kill.datasource.on`| Boolean value for whether to enable automatic deletion of datasource metadata (Note: datasource metadata only exists for datasource created from supervisor). If set to true, Coordinator will periodically remove datasource metadata of terminated supervisor from the datasource table in metadata storage. | No | False| +|`druid.coordinator.kill.datasource.period`| How often to do automatic deletion of datasource metadata in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be equal to or greater than `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.datasource.on` is set to "True".| No| `P1D`| +|`druid.coordinator.kill.datasource.durationToRetain`| Duration of datasource metadata to be retained from created time in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.datasource.on` is set to "True".| Yes if `druid.coordinator.kill.datasource.on` is set to "True".| None| ##### Segment Management |Property|Possible Values|Description|Default| diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index b68dcca58307..a4aa9abbe142 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -259,6 +259,7 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina |`metadata/kill/supervisor/count`|Total number of terminated supervisors that were automatically deleted from metadata store per each Coordinator kill supervisor duty run. This metric can help adjust `druid.coordinator.kill.supervisor.durationToRetain` configuration based on whether more or less terminated supervisors need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.supervisor.on` is set to true.| |Varies.| |`metadata/kill/audit/count`|Total number of audit logs that were automatically deleted from metadata store per each Coordinator kill audit duty run. This metric can help adjust `druid.coordinator.kill.audit.durationToRetain` configuration based on whether more or less audit logs need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.audit.on` is set to true.| |Varies.| |`metadata/kill/rule/count`|Total number of rules that were automatically deleted from metadata store per each Coordinator kill rule duty run. This metric can help adjust `druid.coordinator.kill.rule.durationToRetain` configuration based on whether more or less rules need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.rule.on` is set to true.| |Varies.| +|`metadata/kill/datasource/count`|Total number of datasource metadata that were automatically deleted from metadata store per each Coordinator kill datasource duty run (Note: datasource metadata only exists for datasource created from supervisor). This metric can help adjust `druid.coordinator.kill.datasource.durationToRetain` configuration based on whether more or less datasource metadata need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.datasource.on` is set to true.| |Varies.| If `emitBalancingStats` is set to `true` in the Coordinator [dynamic configuration](../configuration/index.md#dynamic-configuration), then [log entries](../configuration/logging.md) for class diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 3a1d2578c5a0..5931f43c5343 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -267,6 +267,15 @@ SegmentPublishResult announceHistoricalSegments( */ boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata); + /** + * Remove datasource metadata created before the given timestamp and not in given excludeDatasources set. + * + * @param timestamp timestamp in milliseconds + * @param excludeDatasources set of datasource names to exclude from removal + * @return number of datasource metadata removed + */ + int removeDataSourceMetadataOlderThan(long timestamp, @Nullable Set excludeDatasources); + /** * Similar to {@link #announceHistoricalSegments(Set)}, but meant for streaming ingestion tasks for handling * the case where the task ingested no records and created no segments, but still needs to update the metadata diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 05fcb563dad4..974016a70779 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -37,6 +37,8 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; @@ -57,6 +59,7 @@ import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionIds; import org.apache.druid.timeline.partition.SingleDimensionShardSpec; +import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.chrono.ISOChronology; import org.skife.jdbi.v2.Batch; @@ -1423,4 +1426,40 @@ public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata me .execute() ); } + + @Override + public int removeDataSourceMetadataOlderThan(long timestamp, @Nullable Set excludeDatasources) + { + DateTime dateTime = DateTimes.utc(timestamp); + List datasources = connector.getDBI().withHandle( + handle -> handle + .createQuery( + StringUtils.format( + "SELECT dataSource FROM %1$s WHERE created_date < '%2$s'", + dbTables.getDataSourceTable(), + dateTime.toString() + ) + ) + .mapTo(String.class) + .list() + ); + return connector.getDBI().withHandle( + handle -> { + final PreparedBatch batch = handle.prepareBatch( + StringUtils.format( + "DELETE FROM %1$s WHERE dataSource = :dataSource AND created_date < '%2$s'", + dbTables.getDataSourceTable(), + dateTime.toString() + ) + ); + for (String datasourceMetadataInDb : datasources) { + if (!excludeDatasources.contains(datasourceMetadataInDb)) { + batch.bind("dataSource", datasourceMetadataInDb).add(); + } + } + int[] result = batch.execute(); + return IntStream.of(result).sum(); + } + ); + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java index 7985b5c48329..14da3c8afbed 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java @@ -79,6 +79,14 @@ public abstract class DruidCoordinatorConfig @Default("PT-1s") public abstract Duration getCoordinatorRuleKillDurationToRetain(); + @Config("druid.coordinator.kill.datasource.period") + @Default("P1D") + public abstract Duration getCoordinatorDatasourceKillPeriod(); + + @Config("druid.coordinator.kill.datasource.durationToRetain") + @Default("PT-1s") + public abstract Duration getCoordinatorDatasourceKillDurationToRetain(); + @Config("druid.coordinator.load.timeout") public Duration getLoadTimeoutDelay() { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java new file mode 100644 index 000000000000..6d7ac8a8299a --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.duty; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.inject.Inject; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.metadata.MetadataSupervisorManager; +import org.apache.druid.server.coordinator.DruidCoordinatorConfig; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * CoordinatorDuty for automatic deletion of datasource metadata from the datasource table in metadata storage. + * (Note: datasource metadata only exists for datasource created from supervisor). + */ +public class KillDatasourceMetadata implements CoordinatorDuty +{ + private static final Logger log = new Logger(KillDatasourceMetadata.class); + + private final long period; + private final long retainDuration; + private long lastKillTime = 0; + + private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; + private final MetadataSupervisorManager metadataSupervisorManager; + + @Inject + public KillDatasourceMetadata( + DruidCoordinatorConfig config, + IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, + MetadataSupervisorManager metadataSupervisorManager + ) + { + this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator; + this.metadataSupervisorManager = metadataSupervisorManager; + this.period = config.getCoordinatorDatasourceKillPeriod().getMillis(); + Preconditions.checkArgument( + this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(), + "Coordinator datasource metadata kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod" + ); + this.retainDuration = config.getCoordinatorDatasourceKillDurationToRetain().getMillis(); + Preconditions.checkArgument(this.retainDuration >= 0, "Coordinator datasource metadata kill retainDuration must be >= 0"); + log.debug( + "Datasource Metadata Kill Task scheduling enabled with period [%s], retainDuration [%s]", + this.period, + this.retainDuration + ); + } + + @Override + public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + { + if ((lastKillTime + period) < System.currentTimeMillis()) { + lastKillTime = System.currentTimeMillis(); + long timestamp = System.currentTimeMillis() - retainDuration; + // Get all datasources with active supervisor + Map allSupervisor = metadataSupervisorManager.getLatest(); + Set allDatasourceWithActiveSupervisor = allSupervisor.values() + .stream() + .filter(supervisorSpec -> !(supervisorSpec instanceof NoopSupervisorSpec)) + .map(supervisorSpec -> supervisorSpec.getDataSources()) + .flatMap(Collection::stream) + .filter(datasource -> !Strings.isNullOrEmpty(datasource)) + .collect(Collectors.toSet()); + + int datasourceMetadataRemovedCount = indexerMetadataStorageCoordinator.removeDataSourceMetadataOlderThan(timestamp, allDatasourceWithActiveSupervisor); + ServiceEmitter emitter = params.getEmitter(); + emitter.emit( + new ServiceMetricEvent.Builder().build( + "metadata/kill/datasource/count", + datasourceMetadataRemovedCount + ) + ); + log.info("Finished running KillDatasourceMetadata duty. Removed %,d datasource metadata", datasourceMetadataRemovedCount); + } + return params; + } +} diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 924ad92bdcd4..bbffb5088327 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -1471,4 +1471,82 @@ public void testDropSegmentsWithHandleForSegmentThatDoesNotExist() Assert.assertEquals(IndexerSQLMetadataStorageCoordinator.DataStoreMetadataUpdateResult.TRY_AGAIN, result); } } + + @Test + public void testRemoveDataSourceMetadataOlderThanDatasourceActiveShouldNotBeDeleted() throws Exception + { + coordinator.announceHistoricalSegments( + ImmutableSet.of(defaultSegment), + ImmutableSet.of(), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "bar")) + ); + + Assert.assertEquals( + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + coordinator.retrieveDataSourceMetadata("fooDataSource") + ); + + // Try delete. Datasource should not be deleted as it is in excluded set + int deletedCount = coordinator.removeDataSourceMetadataOlderThan(System.currentTimeMillis(), ImmutableSet.of("fooDataSource")); + + // Datasource should not be deleted + Assert.assertEquals( + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + coordinator.retrieveDataSourceMetadata("fooDataSource") + ); + Assert.assertEquals(0, deletedCount); + } + + @Test + public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveAndOlderThanTimeShouldBeDeleted() throws Exception + { + coordinator.announceHistoricalSegments( + ImmutableSet.of(defaultSegment), + ImmutableSet.of(), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "bar")) + ); + + Assert.assertEquals( + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + coordinator.retrieveDataSourceMetadata("fooDataSource") + ); + + // Try delete. Datasource should be deleted as it is not in excluded set and created time older than given time + int deletedCount = coordinator.removeDataSourceMetadataOlderThan(System.currentTimeMillis(), ImmutableSet.of()); + + // Datasource should be deleted + Assert.assertNull( + coordinator.retrieveDataSourceMetadata("fooDataSource") + ); + Assert.assertEquals(0, deletedCount); + } + + @Test + public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveButNotOlderThanTimeShouldNotBeDeleted() throws Exception + { + coordinator.announceHistoricalSegments( + ImmutableSet.of(defaultSegment), + ImmutableSet.of(), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "bar")) + ); + + Assert.assertEquals( + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + coordinator.retrieveDataSourceMetadata("fooDataSource") + ); + + // Do delete. Datasource metadata should not be deleted. Datasource is not active but it was created just now so it's + // created timestamp will be later than the timestamp 2012-01-01T00:00:00Z + int deletedCount = coordinator.removeDataSourceMetadataOlderThan(DateTimes.of("2012-01-01T00:00:00Z").getMillis(), ImmutableSet.of()); + + // Datasource should not be deleted + Assert.assertEquals( + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + coordinator.retrieveDataSourceMetadata("fooDataSource") + ); + Assert.assertEquals(0, deletedCount); + } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java index b8e3103681ef..adad57e7a8ea 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java @@ -178,6 +178,8 @@ public void setUp() throws Exception null, null, null, + null, + null, 10, new Duration("PT0s") ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 97c220d3626c..2d88d6ee4f13 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -150,6 +150,8 @@ public void setUp() throws Exception null, null, null, + null, + null, 10, new Duration("PT0s") ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java index 2a8550003998..4991f1373f2f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java @@ -87,6 +87,8 @@ public class HttpLoadQueuePeonTest null, null, null, + null, + null, 10, Duration.ZERO ) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java index 8b2f486d1ace..ea4322a02f8a 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java @@ -102,6 +102,8 @@ public void testMultipleLoadDropSegments() throws Exception null, null, null, + null, + null, 10, Duration.millis(0) ) @@ -304,6 +306,8 @@ public void testFailAssignForNonTimeoutFailures() throws Exception null, null, null, + null, + null, 10, new Duration("PT1s") ) @@ -363,6 +367,8 @@ public void testFailAssignForLoadDropTimeout() throws Exception null, null, null, + null, + null, 10, new Duration("PT1s") ) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java index 57ac65b88b8c..326bc764113c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java @@ -51,6 +51,8 @@ public LoadQueuePeonTester() null, null, null, + null, + null, 10, new Duration("PT1s") ) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java index 2c1b7067cc4a..e2c05703dc6e 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java @@ -37,6 +37,8 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig private final Duration coordinatorAuditKillDurationToRetain; private final Duration coordinatorRuleKillPeriod; private final Duration coordinatorRuleKillDurationToRetain; + private final Duration coordinatorDatasourceKillPeriod; + private final Duration coordinatorDatasourceKillDurationToRetain; private final Duration getLoadQueuePeonRepeatDelay; private final int coordinatorKillMaxSegments; @@ -54,6 +56,8 @@ public TestDruidCoordinatorConfig( Duration coordinatorAuditKillDurationToRetain, Duration coordinatorRuleKillPeriod, Duration coordinatorRuleKillDurationToRetain, + Duration coordinatorDatasourceKillPeriod, + Duration coordinatorDatasourceKillDurationToRetain, int coordinatorKillMaxSegments, Duration getLoadQueuePeonRepeatDelay ) @@ -71,6 +75,8 @@ public TestDruidCoordinatorConfig( this.coordinatorAuditKillDurationToRetain = coordinatorAuditKillDurationToRetain; this.coordinatorRuleKillPeriod = coordinatorRuleKillPeriod; this.coordinatorRuleKillDurationToRetain = coordinatorRuleKillDurationToRetain; + this.coordinatorDatasourceKillPeriod = coordinatorDatasourceKillPeriod; + this.coordinatorDatasourceKillDurationToRetain = coordinatorDatasourceKillDurationToRetain; this.coordinatorKillMaxSegments = coordinatorKillMaxSegments; this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay; } @@ -147,6 +153,18 @@ public Duration getCoordinatorRuleKillDurationToRetain() return coordinatorRuleKillDurationToRetain; } + @Override + public Duration getCoordinatorDatasourceKillPeriod() + { + return coordinatorDatasourceKillPeriod; + } + + @Override + public Duration getCoordinatorDatasourceKillDurationToRetain() + { + return coordinatorDatasourceKillDurationToRetain; + } + @Override public int getCoordinatorKillMaxSegments() { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java index aeb0bd9cbc5d..4b883f38e783 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillAuditLogTest.java @@ -68,6 +68,8 @@ public void testRunSkipIfLastRunLessThanPeriod() new Duration("PT1S"), null, null, + null, + null, 10, null ); @@ -94,6 +96,8 @@ public void testRunNotSkipIfLastRunMoreThanPeriod() new Duration("PT1S"), null, null, + null, + null, 10, null ); @@ -120,6 +124,8 @@ public void testConstructorFailIfInvalidPeriod() new Duration("PT1S"), null, null, + null, + null, 10, null ); @@ -145,6 +151,8 @@ public void testConstructorFailIfInvalidRetainDuration() new Duration("PT-1S"), null, null, + null, + null, 10, null ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java new file mode 100644 index 000000000000..ac28838af15b --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.duty; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.Supervisor; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; +import org.apache.druid.metadata.MetadataSupervisorManager; +import org.apache.druid.metadata.TestSupervisorSpec; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Map; + +@RunWith(MockitoJUnitRunner.class) +public class KillDatasourceMetadataTest +{ + @Mock + private IndexerMetadataStorageCoordinator mockIndexerMetadataStorageCoordinator; + + @Mock + private MetadataSupervisorManager mockMetadataSupervisorManager; + + @Mock + private DruidCoordinatorRuntimeParams mockDruidCoordinatorRuntimeParams; + + @Mock + private TestSupervisorSpec mockKinesisSupervisorSpec; + + @Mock + private ServiceEmitter mockServiceEmitter; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + private KillDatasourceMetadata killDatasourceMetadata; + + @Test + public void testRunSkipIfLastRunLessThanPeriod() + { + TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( + null, + null, + null, + new Duration("PT5S"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + new Duration(Long.MAX_VALUE), + new Duration("PT1S"), + 10, + null + ); + killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); + killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams); + Mockito.verifyZeroInteractions(mockIndexerMetadataStorageCoordinator); + Mockito.verifyZeroInteractions(mockMetadataSupervisorManager); + } + + @Test + public void testRunNotSkipIfLastRunMoreThanPeriod() + { + Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); + + TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( + null, + null, + null, + new Duration("PT5S"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + new Duration("PT6S"), + new Duration("PT1S"), + 10, + null + ); + killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); + killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams); + Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(), ArgumentMatchers.anySet()); + Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); + } + + @Test + public void testConstructorFailIfInvalidPeriod() + { + TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( + null, + null, + null, + new Duration("PT5S"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + new Duration("PT3S"), + new Duration("PT1S"), + 10, + null + ); + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Coordinator datasource metadata kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"); + killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); + } + + @Test + public void testConstructorFailIfInvalidRetainDuration() + { + TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( + null, + null, + null, + new Duration("PT5S"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + new Duration("PT6S"), + new Duration("PT-1S"), + 10, + null + ); + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Coordinator datasource metadata kill retainDuration must be >= 0"); + killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); + } + + @Test + public void testRunWithFilterExcludedDatasource() + { + String terminatedDatasource = "datasource-1"; + String activeDatasource = "datasource-2"; + Mockito.when(mockKinesisSupervisorSpec.getDataSources()).thenReturn(ImmutableList.of(activeDatasource)); + Map existingSpecs = ImmutableMap.of( + "id1", new NoopSupervisorSpec(null, ImmutableList.of(terminatedDatasource)), + "id2", mockKinesisSupervisorSpec + ); + Mockito.when(mockMetadataSupervisorManager.getLatest()).thenReturn(existingSpecs); + Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); + + TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( + null, + null, + null, + new Duration("PT5S"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + new Duration("PT6S"), + new Duration("PT1S"), + 10, + null + ); + killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); + killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams); + Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(), ArgumentMatchers.eq(ImmutableSet.of(activeDatasource))); + Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java index d3268f21150f..8d63e9136554 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillRulesTest.java @@ -75,6 +75,8 @@ public void testRunSkipIfLastRunLessThanPeriod() null, new Duration(Long.MAX_VALUE), new Duration("PT1S"), + null, + null, 10, null ); @@ -101,6 +103,8 @@ public void testRunNotSkipIfLastRunMoreThanPeriod() null, new Duration("PT6S"), new Duration("PT1S"), + null, + null, 10, null ); @@ -127,6 +131,8 @@ public void testConstructorFailIfInvalidPeriod() null, new Duration("PT3S"), new Duration("PT1S"), + null, + null, 10, null ); @@ -152,6 +158,8 @@ public void testConstructorFailIfInvalidRetainDuration() null, new Duration("PT6S"), new Duration("PT-1S"), + null, + null, 10, null ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java index 9aeb992f9b14..9ab3fb6b3084 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillSupervisorsTest.java @@ -68,6 +68,8 @@ public void testRunSkipIfLastRunLessThanPeriod() null, null, null, + null, + null, 10, null ); @@ -94,6 +96,8 @@ public void testRunNotSkipIfLastRunMoreThanPeriod() null, null, null, + null, + null, 10, null ); @@ -120,6 +124,8 @@ public void testConstructorFailIfInvalidPeriod() null, null, null, + null, + null, 10, null ); @@ -145,6 +151,8 @@ public void testConstructorFailIfInvalidRetainDuration() null, null, null, + null, + null, 10, null ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 34d020f5cd72..8a6eef3b62f4 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -115,6 +115,8 @@ private void testFindIntervalForKill(List segmentIntervals, Interval e null, null, null, + null, + null, 1000, Duration.ZERO ) diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 584023c4d6f9..469ad866c98f 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -73,6 +73,7 @@ import org.apache.druid.server.coordinator.LoadQueueTaskMaster; import org.apache.druid.server.coordinator.duty.CoordinatorDuty; import org.apache.druid.server.coordinator.duty.KillAuditLog; +import org.apache.druid.server.coordinator.duty.KillDatasourceMetadata; import org.apache.druid.server.coordinator.duty.KillRules; import org.apache.druid.server.coordinator.duty.KillSupervisors; import org.apache.druid.server.coordinator.duty.KillUnusedSegments; @@ -268,6 +269,11 @@ public void configure(Binder binder) Predicates.equalTo("true"), KillRules.class ); + conditionalMetadataStoreManagementDutyMultibind.addConditionBinding( + "druid.coordinator.kill.datasource.on", + Predicates.equalTo("true"), + KillDatasourceMetadata.class + ); bindNodeRoleAndAnnouncer( binder, From b09699120a6847c07634dbfefba1fd9f7510f081 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Sun, 9 May 2021 23:59:50 -0700 Subject: [PATCH 02/13] add test --- .../metadata/IndexerSQLMetadataStorageCoordinatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index bbffb5088327..7acd90f0f3f4 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -1520,7 +1520,7 @@ public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveAndOlderThan Assert.assertNull( coordinator.retrieveDataSourceMetadata("fooDataSource") ); - Assert.assertEquals(0, deletedCount); + Assert.assertEquals(1, deletedCount); } @Test From c2c4c327145e3fa11d029ac2400c64aa5daa3109 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 10 May 2021 00:01:43 -0700 Subject: [PATCH 03/13] fix checkstyle --- .../druid/metadata/IndexerSQLMetadataStorageCoordinator.java | 2 -- .../druid/server/coordinator/duty/KillDatasourceMetadata.java | 1 - .../server/coordinator/duty/KillDatasourceMetadataTest.java | 1 - 3 files changed, 4 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 974016a70779..6c652fcb2cc2 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -37,8 +37,6 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; -import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec; -import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java index 6d7ac8a8299a..53511351728f 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java @@ -25,7 +25,6 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java index ac28838af15b..473c210f185f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java @@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec; -import org.apache.druid.indexing.overlord.supervisor.Supervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; From 92c37abd744cfdd5c1550fb7be97e0eaf5a08374 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 10 May 2021 00:14:06 -0700 Subject: [PATCH 04/13] add comments --- .../server/coordinator/duty/KillDatasourceMetadata.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java index 53511351728f..f229f69be54a 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java @@ -81,16 +81,20 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) if ((lastKillTime + period) < System.currentTimeMillis()) { lastKillTime = System.currentTimeMillis(); long timestamp = System.currentTimeMillis() - retainDuration; - // Get all datasources with active supervisor + // Datasource metadata only exists for datasource with supervisor + // To determine if datasource metadata is still active, we check if the supervisor for that particular datasource + // is still active or not Map allSupervisor = metadataSupervisorManager.getLatest(); Set allDatasourceWithActiveSupervisor = allSupervisor.values() .stream() + // Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec + // (NoopSupervisorSpec is used as a tombstone marker) .filter(supervisorSpec -> !(supervisorSpec instanceof NoopSupervisorSpec)) .map(supervisorSpec -> supervisorSpec.getDataSources()) .flatMap(Collection::stream) .filter(datasource -> !Strings.isNullOrEmpty(datasource)) .collect(Collectors.toSet()); - + // We exclude removing datasource metadata with active supervisor int datasourceMetadataRemovedCount = indexerMetadataStorageCoordinator.removeDataSourceMetadataOlderThan(timestamp, allDatasourceWithActiveSupervisor); ServiceEmitter emitter = params.getEmitter(); emitter.emit( From 244783089ea0bdc1bb8b066ced078ba6ce1158b1 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 10 May 2021 10:45:21 -0700 Subject: [PATCH 05/13] fix error --- .../test/TestIndexerMetadataStorageCoordinator.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index decaff87f875..fb274f348459 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -34,6 +34,7 @@ import org.apache.druid.timeline.partition.PartialShardSpec; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -139,6 +140,13 @@ public SegmentPublishResult commitMetadataOnly( throw new UnsupportedOperationException("Not implemented, no test uses this currently."); } + @Override + public int removeDataSourceMetadataOlderThan(long timestamp, @Nullable Set excludeDatasources) + { + throw new UnsupportedOperationException("Not implemented, no test uses this currently."); + } + + @Override public SegmentIdWithShardSpec allocatePendingSegment( String dataSource, From 5032ecfcff07b991a84c9a84e9e921abf5c1a68f Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 10 May 2021 12:42:53 -0700 Subject: [PATCH 06/13] address comments --- .../IndexerMetadataStorageCoordinator.java | 3 +- .../IndexerSQLMetadataStorageCoordinator.java | 4 +- .../duty/KillDatasourceMetadata.java | 65 +++++++++++-------- .../duty/KillDatasourceMetadataTest.java | 32 +++++++++ 4 files changed, 76 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 5931f43c5343..513f5b98c598 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -26,6 +26,7 @@ import org.joda.time.Interval; import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -274,7 +275,7 @@ SegmentPublishResult announceHistoricalSegments( * @param excludeDatasources set of datasource names to exclude from removal * @return number of datasource metadata removed */ - int removeDataSourceMetadataOlderThan(long timestamp, @Nullable Set excludeDatasources); + int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set excludeDatasources); /** * Similar to {@link #announceHistoricalSegments(Set)}, but meant for streaming ingestion tasks for handling diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 6c652fcb2cc2..a78210474c57 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -74,6 +74,7 @@ import org.skife.jdbi.v2.util.ByteArrayMapper; import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; import java.io.IOException; import java.sql.ResultSet; import java.util.ArrayList; @@ -1426,7 +1427,7 @@ public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata me } @Override - public int removeDataSourceMetadataOlderThan(long timestamp, @Nullable Set excludeDatasources) + public int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set excludeDatasources) { DateTime dateTime = DateTimes.utc(timestamp); List datasources = connector.getDBI().withHandle( @@ -1441,6 +1442,7 @@ public int removeDataSourceMetadataOlderThan(long timestamp, @Nullable Set datasourcesToDelete = return connector.getDBI().withHandle( handle -> { final PreparedBatch batch = handle.prepareBatch( diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java index f229f69be54a..c2f637c7b2e5 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java @@ -78,32 +78,45 @@ public KillDatasourceMetadata( @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { - if ((lastKillTime + period) < System.currentTimeMillis()) { - lastKillTime = System.currentTimeMillis(); - long timestamp = System.currentTimeMillis() - retainDuration; - // Datasource metadata only exists for datasource with supervisor - // To determine if datasource metadata is still active, we check if the supervisor for that particular datasource - // is still active or not - Map allSupervisor = metadataSupervisorManager.getLatest(); - Set allDatasourceWithActiveSupervisor = allSupervisor.values() - .stream() - // Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec - // (NoopSupervisorSpec is used as a tombstone marker) - .filter(supervisorSpec -> !(supervisorSpec instanceof NoopSupervisorSpec)) - .map(supervisorSpec -> supervisorSpec.getDataSources()) - .flatMap(Collection::stream) - .filter(datasource -> !Strings.isNullOrEmpty(datasource)) - .collect(Collectors.toSet()); - // We exclude removing datasource metadata with active supervisor - int datasourceMetadataRemovedCount = indexerMetadataStorageCoordinator.removeDataSourceMetadataOlderThan(timestamp, allDatasourceWithActiveSupervisor); - ServiceEmitter emitter = params.getEmitter(); - emitter.emit( - new ServiceMetricEvent.Builder().build( - "metadata/kill/datasource/count", - datasourceMetadataRemovedCount - ) - ); - log.info("Finished running KillDatasourceMetadata duty. Removed %,d datasource metadata", datasourceMetadataRemovedCount); + long currentTimeMillis = System.currentTimeMillis(); + if ((lastKillTime + period) < currentTimeMillis) { + lastKillTime = currentTimeMillis; + long timestamp = currentTimeMillis - retainDuration; + try { + // Datasource metadata only exists for datasource with supervisor + // To determine if datasource metadata is still active, we check if the supervisor for that particular datasource + // is still active or not + Map allSupervisor = metadataSupervisorManager.getLatest(); + Set allDatasourceWithActiveSupervisor = allSupervisor.values() + .stream() + // Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec + // (NoopSupervisorSpec is used as a tombstone marker) + .filter(supervisorSpec -> !(supervisorSpec instanceof NoopSupervisorSpec)) + .map(supervisorSpec -> supervisorSpec.getDataSources()) + .flatMap(Collection::stream) + .filter(datasource -> !Strings.isNullOrEmpty( + datasource)) + .collect(Collectors.toSet()); + // We exclude removing datasource metadata with active supervisor + int datasourceMetadataRemovedCount = indexerMetadataStorageCoordinator.removeDataSourceMetadataOlderThan( + timestamp, + allDatasourceWithActiveSupervisor + ); + ServiceEmitter emitter = params.getEmitter(); + emitter.emit( + new ServiceMetricEvent.Builder().build( + "metadata/kill/datasource/count", + datasourceMetadataRemovedCount + ) + ); + log.info( + "Finished running KillDatasourceMetadata duty. Removed %,d datasource metadata", + datasourceMetadataRemovedCount + ); + } + catch (Exception e) { + log.error(e, "Failed to kill datasource metadata"); + } } return params; } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java index 473c210f185f..9af62283c683 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java @@ -215,4 +215,36 @@ public void testRunWithFilterExcludedDatasource() Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(), ArgumentMatchers.eq(ImmutableSet.of(activeDatasource))); Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); } + + @Test + public void testRunWithEmptyFilterExcludedDatasource() + { + Map existingSpecs = ImmutableMap.of(); + Mockito.when(mockMetadataSupervisorManager.getLatest()).thenReturn(existingSpecs); + Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); + + TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( + null, + null, + null, + new Duration("PT5S"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + new Duration("PT6S"), + new Duration("PT1S"), + 10, + null + ); + killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); + killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams); + Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(), ArgumentMatchers.eq(ImmutableSet.of())); + Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); + } } From 3bda833ac4400955a34a111d4e66da9f456bcde0 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 10 May 2021 13:52:29 -0700 Subject: [PATCH 07/13] Address comments --- .../IndexerSQLMetadataStorageCoordinator.java | 10 ++--- .../metadata/MetadataSupervisorManager.java | 19 +++++++++ .../SQLMetadataSupervisorManager.java | 41 +++++++++++++++---- .../server/coordinator/duty/KillAuditLog.java | 32 +++++++++------ .../duty/KillDatasourceMetadata.java | 20 ++++----- .../server/coordinator/duty/KillRules.java | 32 +++++++++------ .../coordinator/duty/KillSupervisors.java | 32 +++++++++------ .../SQLMetadataSupervisorManagerTest.java | 40 ++++++++++++++++++ 8 files changed, 161 insertions(+), 65 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index a78210474c57..46d6c0e069d3 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1430,7 +1430,7 @@ public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata me public int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set excludeDatasources) { DateTime dateTime = DateTimes.utc(timestamp); - List datasources = connector.getDBI().withHandle( + List datasourcesToDelete = connector.getDBI().withHandle( handle -> handle .createQuery( StringUtils.format( @@ -1442,7 +1442,7 @@ public int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set datasourcesToDelete = + datasourcesToDelete.removeAll(excludeDatasources); return connector.getDBI().withHandle( handle -> { final PreparedBatch batch = handle.prepareBatch( @@ -1452,10 +1452,8 @@ public int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set> getAll(); + /** + * Return latest supervisors (both active and terminated) + * + * @return latest terminated supervisors + */ Map getLatest(); + /** + * Only return the latest active supervisors + * + * @return latest active supervisors + */ + Map getLatestActiveOnly(); + + /** + * Only return the latest terminated supervisors + * + * @return latest terminated supervisors + */ + Map getLatestTerminatedOnly(); + /** * Remove terminated supervisors created before the given timestamp. * diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java index e995a14aac16..0480c451f9f4 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java @@ -253,11 +253,41 @@ public Map fold( ); } + @Override + public Map getLatestActiveOnly() + { + Map supervisors = getLatest(); + Map activeSupervisors = new HashMap<>(); + for (Map.Entry entry : supervisors.entrySet()) { + // Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec + // (NoopSupervisorSpec is used as a tombstone marker) + if (!(entry.getValue() instanceof NoopSupervisorSpec)) { + activeSupervisors.put(entry.getKey(), entry.getValue()); + } + } + return ImmutableMap.copyOf(activeSupervisors); + } + + @Override + public Map getLatestTerminatedOnly() + { + Map supervisors = getLatest(); + Map activeSupervisors = new HashMap<>(); + for (Map.Entry entry : supervisors.entrySet()) { + // Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec + // (NoopSupervisorSpec is used as a tombstone marker) + if (entry.getValue() instanceof NoopSupervisorSpec) { + activeSupervisors.put(entry.getKey(), entry.getValue()); + } + } + return ImmutableMap.copyOf(activeSupervisors); + } + @Override public int removeTerminatedSupervisorsOlderThan(long timestamp) { DateTime dateTime = DateTimes.utc(timestamp); - Map supervisors = getLatest(); + Map terminatedSupervisors = getLatestTerminatedOnly(); return dbi.withHandle( handle -> { final PreparedBatch batch = handle.prepareBatch( @@ -267,13 +297,8 @@ public int removeTerminatedSupervisorsOlderThan(long timestamp) dateTime.toString() ) ); - for (Map.Entry supervisor : supervisors.entrySet()) { - final SupervisorSpec spec = supervisor.getValue(); - // Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec - // (NoopSupervisorSpec is used as a tombstone marker) - if (spec instanceof NoopSupervisorSpec) { - batch.bind("spec_id", supervisor.getKey()).add(); - } + for (Map.Entry supervisor : terminatedSupervisors.entrySet()) { + batch.bind("spec_id", supervisor.getKey()).add(); } int[] result = batch.execute(); return IntStream.of(result).sum(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java index 651cf6b60c96..ac735c6594aa 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillAuditLog.java @@ -51,6 +51,7 @@ public KillAuditLog( ); this.retainDuration = config.getCoordinatorAuditKillDurationToRetain().getMillis(); Preconditions.checkArgument(this.retainDuration >= 0, "coordinator audit kill retainDuration must be >= 0"); + Preconditions.checkArgument(this.retainDuration < System.currentTimeMillis(), "Coordinator audit kill retainDuration cannot be greater than current time in ms"); log.debug( "Audit Kill Task scheduling enabled with period [%s], retainDuration [%s]", this.period, @@ -62,19 +63,24 @@ public KillAuditLog( @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { - if ((lastKillTime + period) < System.currentTimeMillis()) { - lastKillTime = System.currentTimeMillis(); - - long timestamp = System.currentTimeMillis() - retainDuration; - int auditRemoved = auditManager.removeAuditLogsOlderThan(timestamp); - ServiceEmitter emitter = params.getEmitter(); - emitter.emit( - new ServiceMetricEvent.Builder().build( - "metadata/kill/audit/count", - auditRemoved - ) - ); - log.info("Finished running KillAuditLog duty. Removed %,d audit logs", auditRemoved); + long currentTimeMillis = System.currentTimeMillis(); + if ((lastKillTime + period) < currentTimeMillis) { + lastKillTime = currentTimeMillis; + long timestamp = currentTimeMillis - retainDuration; + try { + int auditRemoved = auditManager.removeAuditLogsOlderThan(timestamp); + ServiceEmitter emitter = params.getEmitter(); + emitter.emit( + new ServiceMetricEvent.Builder().build( + "metadata/kill/audit/count", + auditRemoved + ) + ); + log.info("Finished running KillAuditLog duty. Removed %,d audit logs", auditRemoved); + } + catch (Exception e) { + log.error(e, "Failed to kill audit log"); + } } return params; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java index c2f637c7b2e5..40fc5b263a55 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java @@ -23,7 +23,6 @@ import com.google.common.base.Strings; import com.google.inject.Inject; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; -import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -68,6 +67,7 @@ public KillDatasourceMetadata( ); this.retainDuration = config.getCoordinatorDatasourceKillDurationToRetain().getMillis(); Preconditions.checkArgument(this.retainDuration >= 0, "Coordinator datasource metadata kill retainDuration must be >= 0"); + Preconditions.checkArgument(this.retainDuration < System.currentTimeMillis(), "Coordinator datasource metadata kill retainDuration cannot be greater than current time in ms"); log.debug( "Datasource Metadata Kill Task scheduling enabled with period [%s], retainDuration [%s]", this.period, @@ -86,17 +86,13 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) // Datasource metadata only exists for datasource with supervisor // To determine if datasource metadata is still active, we check if the supervisor for that particular datasource // is still active or not - Map allSupervisor = metadataSupervisorManager.getLatest(); - Set allDatasourceWithActiveSupervisor = allSupervisor.values() - .stream() - // Terminated supervisor will have it's latest supervisorSpec as NoopSupervisorSpec - // (NoopSupervisorSpec is used as a tombstone marker) - .filter(supervisorSpec -> !(supervisorSpec instanceof NoopSupervisorSpec)) - .map(supervisorSpec -> supervisorSpec.getDataSources()) - .flatMap(Collection::stream) - .filter(datasource -> !Strings.isNullOrEmpty( - datasource)) - .collect(Collectors.toSet()); + Map allActiveSupervisor = metadataSupervisorManager.getLatestActiveOnly(); + Set allDatasourceWithActiveSupervisor = allActiveSupervisor.values() + .stream() + .map(supervisorSpec -> supervisorSpec.getDataSources()) + .flatMap(Collection::stream) + .filter(datasource -> !Strings.isNullOrEmpty(datasource)) + .collect(Collectors.toSet()); // We exclude removing datasource metadata with active supervisor int datasourceMetadataRemovedCount = indexerMetadataStorageCoordinator.removeDataSourceMetadataOlderThan( timestamp, diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java index eb4f0186cdd4..50b1740e8643 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillRules.java @@ -47,6 +47,7 @@ public KillRules( ); this.retainDuration = config.getCoordinatorRuleKillDurationToRetain().getMillis(); Preconditions.checkArgument(this.retainDuration >= 0, "coordinator rule kill retainDuration must be >= 0"); + Preconditions.checkArgument(this.retainDuration < System.currentTimeMillis(), "Coordinator rule kill retainDuration cannot be greater than current time in ms"); log.debug( "Rule Kill Task scheduling enabled with period [%s], retainDuration [%s]", this.period, @@ -57,19 +58,24 @@ public KillRules( @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { - if ((lastKillTime + period) < System.currentTimeMillis()) { - lastKillTime = System.currentTimeMillis(); - - long timestamp = System.currentTimeMillis() - retainDuration; - int ruleRemoved = params.getDatabaseRuleManager().removeRulesForEmptyDatasourcesOlderThan(timestamp); - ServiceEmitter emitter = params.getEmitter(); - emitter.emit( - new ServiceMetricEvent.Builder().build( - "metadata/kill/rule/count", - ruleRemoved - ) - ); - log.info("Finished running KillRules duty. Removed %,d rule", ruleRemoved); + long currentTimeMillis = System.currentTimeMillis(); + if ((lastKillTime + period) < currentTimeMillis) { + lastKillTime = currentTimeMillis; + long timestamp = currentTimeMillis - retainDuration; + try { + int ruleRemoved = params.getDatabaseRuleManager().removeRulesForEmptyDatasourcesOlderThan(timestamp); + ServiceEmitter emitter = params.getEmitter(); + emitter.emit( + new ServiceMetricEvent.Builder().build( + "metadata/kill/rule/count", + ruleRemoved + ) + ); + log.info("Finished running KillRules duty. Removed %,d rule", ruleRemoved); + } + catch (Exception e) { + log.error(e, "Failed to kill rules metadata"); + } } return params; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java index a1c2a3f28326..8a0b484f3eaa 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillSupervisors.java @@ -55,6 +55,7 @@ public KillSupervisors( ); this.retainDuration = config.getCoordinatorSupervisorKillDurationToRetain().getMillis(); Preconditions.checkArgument(this.retainDuration >= 0, "Coordinator supervisor kill retainDuration must be >= 0"); + Preconditions.checkArgument(this.retainDuration < System.currentTimeMillis(), "Coordinator supervisor kill retainDuration cannot be greater than current time in ms"); log.debug( "Supervisor Kill Task scheduling enabled with period [%s], retainDuration [%s]", this.period, @@ -65,19 +66,24 @@ public KillSupervisors( @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { - if ((lastKillTime + period) < System.currentTimeMillis()) { - lastKillTime = System.currentTimeMillis(); - - long timestamp = System.currentTimeMillis() - retainDuration; - int supervisorRemoved = metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(timestamp); - ServiceEmitter emitter = params.getEmitter(); - emitter.emit( - new ServiceMetricEvent.Builder().build( - "metadata/kill/supervisor/count", - supervisorRemoved - ) - ); - log.info("Finished running KillSupervisors duty. Removed %,d supervisor", supervisorRemoved); + long currentTimeMillis = System.currentTimeMillis(); + if ((lastKillTime + period) < currentTimeMillis) { + lastKillTime = currentTimeMillis; + long timestamp = currentTimeMillis - retainDuration; + try { + int supervisorRemoved = metadataSupervisorManager.removeTerminatedSupervisorsOlderThan(timestamp); + ServiceEmitter emitter = params.getEmitter(); + emitter.emit( + new ServiceMetricEvent.Builder().build( + "metadata/kill/supervisor/count", + supervisorRemoved + ) + ); + log.info("Finished running KillSupervisors duty. Removed %,d supervisor", supervisorRemoved); + } + catch (Exception e) { + log.error(e, "Failed to kill terminated supervisor metadata"); + } } return params; } diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java index 5e4f4833b091..9547387bbbf0 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java @@ -243,6 +243,46 @@ public void testSkipDeserializingBadSpecs() Assert.assertNull(specs.get(0).getSpec()); } + @Test + public void testGetLatestActiveOnly() + { + final String supervisor1 = "test-supervisor-1"; + final String datasource1 = "datasource-1"; + final String supervisor2 = "test-supervisor-2"; + final Map data1rev1 = ImmutableMap.of("key1-1", "value1-1-1", "key1-2", "value1-2-1"); + Assert.assertTrue(supervisorManager.getAll().isEmpty()); + supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev1)); + // supervisor1 is terminated + supervisorManager.insert(supervisor1, new NoopSupervisorSpec(supervisor1, ImmutableList.of(datasource1))); + // supervisor2 is still active + supervisorManager.insert(supervisor2, new TestSupervisorSpec(supervisor2, data1rev1)); + // get latest active should only return supervisor2 + Map actual = supervisorManager.getLatestActiveOnly(); + Assert.assertEquals(1, actual.size()); + Assert.assertTrue(actual.containsKey(supervisor2)); + } + + + @Test + public void testGetLatestTerminatedOnly() + { + final String supervisor1 = "test-supervisor-1"; + final String datasource1 = "datasource-1"; + final String supervisor2 = "test-supervisor-2"; + final Map data1rev1 = ImmutableMap.of("key1-1", "value1-1-1", "key1-2", "value1-2-1"); + Assert.assertTrue(supervisorManager.getAll().isEmpty()); + supervisorManager.insert(supervisor1, new TestSupervisorSpec(supervisor1, data1rev1)); + // supervisor1 is terminated + supervisorManager.insert(supervisor1, new NoopSupervisorSpec(supervisor1, ImmutableList.of(datasource1))); + // supervisor2 is still active + supervisorManager.insert(supervisor2, new TestSupervisorSpec(supervisor2, data1rev1)); + // get latest terminated should only return supervisor1 + Map actual = supervisorManager.getLatestTerminatedOnly(); + Assert.assertEquals(1, actual.size()); + Assert.assertTrue(actual.containsKey(supervisor1)); + } + + private static class BadSupervisorSpec implements SupervisorSpec { private final String id; From 5ab230842a9b843da444f29783b780505fb7f02f Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 10 May 2021 14:06:11 -0700 Subject: [PATCH 08/13] fix test --- .../duty/KillDatasourceMetadataTest.java | 38 ------------------- 1 file changed, 38 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java index 9af62283c683..592005393d75 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java @@ -178,44 +178,6 @@ public void testConstructorFailIfInvalidRetainDuration() killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); } - @Test - public void testRunWithFilterExcludedDatasource() - { - String terminatedDatasource = "datasource-1"; - String activeDatasource = "datasource-2"; - Mockito.when(mockKinesisSupervisorSpec.getDataSources()).thenReturn(ImmutableList.of(activeDatasource)); - Map existingSpecs = ImmutableMap.of( - "id1", new NoopSupervisorSpec(null, ImmutableList.of(terminatedDatasource)), - "id2", mockKinesisSupervisorSpec - ); - Mockito.when(mockMetadataSupervisorManager.getLatest()).thenReturn(existingSpecs); - Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); - - TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( - null, - null, - null, - new Duration("PT5S"), - null, - null, - null, - null, - null, - null, - null, - null, - null, - new Duration("PT6S"), - new Duration("PT1S"), - 10, - null - ); - killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager); - killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams); - Mockito.verify(mockIndexerMetadataStorageCoordinator).removeDataSourceMetadataOlderThan(ArgumentMatchers.anyLong(), ArgumentMatchers.eq(ImmutableSet.of(activeDatasource))); - Mockito.verify(mockServiceEmitter).emit(ArgumentMatchers.any(ServiceEventBuilder.class)); - } - @Test public void testRunWithEmptyFilterExcludedDatasource() { From 411bf44271c3541515cf15bb7c7cb4aad3bb3b1f Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 10 May 2021 14:06:20 -0700 Subject: [PATCH 09/13] fix test --- .../server/coordinator/duty/KillDatasourceMetadataTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java index 592005393d75..2fef6cf2a2bc 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java @@ -19,11 +19,9 @@ package org.apache.druid.server.coordinator.duty; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; -import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; From a3d57e386e2ea5c28549af746d99883e20607fb5 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 10 May 2021 16:04:50 -0700 Subject: [PATCH 10/13] fix typo --- services/src/main/java/org/apache/druid/cli/CliCoordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 469ad866c98f..c864592b3ee9 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -255,7 +255,7 @@ public void configure(Binder binder) CoordinatorMetadataStoreManagementDuty.class ); conditionalMetadataStoreManagementDutyMultibind.addConditionBinding( - "druid.coordinator.kill.rule.on", + "druid.coordinator.kill.supervisor.on", Predicates.equalTo("true"), KillSupervisors.class ); From f9c912c7289109782d46e69d286d19a35c0acb80 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 10 May 2021 16:32:21 -0700 Subject: [PATCH 11/13] add comment --- .../druid/server/coordinator/duty/KillDatasourceMetadata.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java index 40fc5b263a55..9c9535b2502a 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadata.java @@ -39,6 +39,8 @@ /** * CoordinatorDuty for automatic deletion of datasource metadata from the datasource table in metadata storage. * (Note: datasource metadata only exists for datasource created from supervisor). + * Note that this class relies on the supervisorSpec.getDataSources names to match with the + * 'datasource' column of the datasource metadata table. */ public class KillDatasourceMetadata implements CoordinatorDuty { From 50fd55b49d5d16013292b6663e418409014319f5 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 10 May 2021 22:25:34 -0700 Subject: [PATCH 12/13] fix test --- .../server/coordinator/duty/KillDatasourceMetadataTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java index 2fef6cf2a2bc..1204857dfdb9 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java @@ -179,8 +179,6 @@ public void testConstructorFailIfInvalidRetainDuration() @Test public void testRunWithEmptyFilterExcludedDatasource() { - Map existingSpecs = ImmutableMap.of(); - Mockito.when(mockMetadataSupervisorManager.getLatest()).thenReturn(existingSpecs); Mockito.when(mockDruidCoordinatorRuntimeParams.getEmitter()).thenReturn(mockServiceEmitter); TestDruidCoordinatorConfig druidCoordinatorConfig = new TestDruidCoordinatorConfig( From 118973d59657b7c9adc72a10b479128f9b34b4e1 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 10 May 2021 22:26:19 -0700 Subject: [PATCH 13/13] fix test --- .../server/coordinator/duty/KillDatasourceMetadataTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java index 1204857dfdb9..ff29136cc2e8 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillDatasourceMetadataTest.java @@ -19,10 +19,8 @@ package org.apache.druid.server.coordinator.duty; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; -import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; import org.apache.druid.metadata.MetadataSupervisorManager; @@ -39,8 +37,6 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; -import java.util.Map; - @RunWith(MockitoJUnitRunner.class) public class KillDatasourceMetadataTest {