From efa30cd535c29af9309ab8109acc0594b6fcbe39 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 10 Jun 2020 17:37:58 -0700 Subject: [PATCH 1/5] Fix broadcast rule drop and docs --- docs/operations/rule-configuration.md | 18 +- .../duty/UnloadUnusedSegments.java | 71 +++-- .../duty/UnloadUnusedSegmentsTest.java | 275 ++++++++++++++++++ 3 files changed, 333 insertions(+), 31 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java diff --git a/docs/operations/rule-configuration.md b/docs/operations/rule-configuration.md index 4228994a30d3..1ea074f72394 100644 --- a/docs/operations/rule-configuration.md +++ b/docs/operations/rule-configuration.md @@ -170,8 +170,9 @@ The interval of a segment will be compared against the specified period. The per ## Broadcast Rules -Broadcast rules indicate how segments of different datasources should be co-located in Historical processes. -Once a broadcast rule is configured for a datasource, all segments of the datasource are broadcasted to the servers holding _any segments_ of the co-located datasources. +Broadcast rules indicate that segments of a data source should be loaded by all servers of a cluster of the following types: historicals, brokers, tasks, and indexers. + +Note that the broadcast segments are only directly queryable through the historicals, but they are currently loaded on other server types to support join queries. ### Forever Broadcast Rule @@ -179,13 +180,13 @@ Forever broadcast rules are of the form: ```json { - "type" : "broadcastForever", - "colocatedDataSources" : [ "target_source1", "target_source2" ] + "type" : "broadcastForever" } ``` * `type` - this should always be "broadcastForever" -* `colocatedDataSources` - A JSON List containing datasource names to be co-located. `null` and empty list means broadcasting to every process in the cluster. + +This rule applies to all segments of a datasource, covering all intervals. ### Interval Broadcast Rule @@ -194,13 +195,11 @@ Interval broadcast rules are of the form: ```json { "type" : "broadcastByInterval", - "colocatedDataSources" : [ "target_source1", "target_source2" ], "interval" : "2012-01-01/2013-01-01" } ``` * `type` - this should always be "broadcastByInterval" -* `colocatedDataSources` - A JSON List containing datasource names to be co-located. `null` and empty list means broadcasting to every process in the cluster. * `interval` - A JSON Object representing ISO-8601 Periods. Only the segments of the interval will be broadcasted. ### Period Broadcast Rule @@ -210,22 +209,17 @@ Period broadcast rules are of the form: ```json { "type" : "broadcastByPeriod", - "colocatedDataSources" : [ "target_source1", "target_source2" ], "period" : "P1M", "includeFuture" : true } ``` * `type` - this should always be "broadcastByPeriod" -* `colocatedDataSources` - A JSON List containing datasource names to be co-located. `null` and empty list means broadcasting to every process in the cluster. * `period` - A JSON Object representing ISO-8601 Periods * `includeFuture` - A JSON Boolean indicating whether the load period should include the future. This property is optional, Default is true. The interval of a segment will be compared against the specified period. The period is from some time in the past to the future or to the current time, which depends on `includeFuture` is true or false. The rule matches if the period *overlaps* the interval. -> broadcast rules don't guarantee that segments of the datasources are always co-located because segments for the colocated datasources are not loaded together atomically. -> If you want to always co-locate the segments of some datasources together, it is recommended to leave colocatedDataSources empty. - ## Permanently deleting data Druid can fully drop data from the cluster, wipe the metadata store entry, and remove the data from deep storage for any diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java index bea7a9d1cc79..8b58e410f46a 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java @@ -33,7 +33,7 @@ import java.util.SortedSet; /** - * Unloads segments that are no longer marked as used from Historical servers. + * Unloads segments that are no longer marked as used from servers. */ public class UnloadUnusedSegments implements CoordinatorDuty { @@ -48,29 +48,62 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) for (SortedSet serverHolders : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serverHolders) { - ImmutableDruidServer server = serverHolder.getServer(); + handleUnusedSegmentsForServer( + serverHolder, + usedSegments, + params, + stats + ); + } + } - for (ImmutableDruidDataSource dataSource : server.getDataSources()) { - for (DataSegment segment : dataSource.getSegments()) { - if (!usedSegments.contains(segment)) { - LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName()); + for (ServerHolder serverHolder : cluster.getBrokers()) { + handleUnusedSegmentsForServer( + serverHolder, + usedSegments, + params, + stats + ); + } - if (!queuePeon.getSegmentsToDrop().contains(segment)) { - queuePeon.dropSegment(segment, () -> {}); - stats.addToTieredStat("unneededCount", server.getTier(), 1); - log.info( - "Dropping uneeded segment [%s] from server [%s] in tier [%s]", - segment.getId(), - server.getName(), - server.getTier() - ); - } - } + for (ServerHolder serverHolder : cluster.getRealtimes()) { + handleUnusedSegmentsForServer( + serverHolder, + usedSegments, + params, + stats + ); + } + + return params.buildFromExisting().withCoordinatorStats(stats).build(); + } + + private void handleUnusedSegmentsForServer( + ServerHolder serverHolder, + Set usedSegments, + DruidCoordinatorRuntimeParams params, + CoordinatorStats stats + ) + { + ImmutableDruidServer server = serverHolder.getServer(); + + for (ImmutableDruidDataSource dataSource : server.getDataSources()) { + for (DataSegment segment : dataSource.getSegments()) { + if (!usedSegments.contains(segment)) { + LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName()); + + if (!queuePeon.getSegmentsToDrop().contains(segment)) { + queuePeon.dropSegment(segment, () -> {}); + stats.addToTieredStat("unneededCount", server.getTier(), 1); + log.info( + "Dropping uneeded segment [%s] from server [%s] in tier [%s]", + segment.getId(), + server.getName(), + server.getTier() + ); } } } } - - return params.buildFromExisting().withCoordinatorStats(stats).build(); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java new file mode 100644 index 000000000000..1bcfcb508c5a --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.duty; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.client.DruidServer; +import org.apache.druid.client.ImmutableDruidDataSource; +import org.apache.druid.client.ImmutableDruidServer; +import org.apache.druid.client.ImmutableDruidServerTests; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers; +import org.apache.druid.server.coordinator.CoordinatorStats; +import org.apache.druid.server.coordinator.DruidClusterBuilder; +import org.apache.druid.server.coordinator.DruidCoordinator; +import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.LoadQueuePeonTester; +import org.apache.druid.server.coordinator.ServerHolder; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NoneShardSpec; +import org.easymock.EasyMock; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Set; + +public class UnloadUnusedSegmentsTest +{ + private DruidCoordinator coordinator; + private ImmutableDruidServer historicalServer; + private ImmutableDruidServer historicalServerTier2; + private ImmutableDruidServer brokerServer; + private ImmutableDruidServer indexerServer; + private LoadQueuePeonTester historicalPeon; + private LoadQueuePeonTester historicalTier2Peon; + private LoadQueuePeonTester brokerPeon; + private LoadQueuePeonTester indexerPeon; + private DataSegment segment1; + private DataSegment segment2; + private List segments; + private ImmutableDruidDataSource dataSource1; + private ImmutableDruidDataSource dataSource2; + private List dataSources; + + @Before + public void setUp() + { + coordinator = EasyMock.createMock(DruidCoordinator.class); + historicalServer = EasyMock.createMock(ImmutableDruidServer.class); + historicalServerTier2 = EasyMock.createMock(ImmutableDruidServer.class); + brokerServer = EasyMock.createMock(ImmutableDruidServer.class); + indexerServer = EasyMock.createMock(ImmutableDruidServer.class); + segment1 = EasyMock.createMock(DataSegment.class); + segment2 = EasyMock.createMock(DataSegment.class); + + DateTime start1 = DateTimes.of("2012-01-01"); + DateTime start2 = DateTimes.of("2012-02-01"); + DateTime version = DateTimes.of("2012-03-01"); + segment1 = new DataSegment( + "datasource1", + new Interval(start1, start1.plusHours(1)), + version.toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + 0, + 11L + ); + segment2 = new DataSegment( + "datasource2", + new Interval(start2, start2.plusHours(1)), + version.toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + 0, + 7L + ); + + segments = new ArrayList<>(); + segments.add(segment1); + segments.add(segment2); + + historicalPeon = new LoadQueuePeonTester(); + historicalTier2Peon = new LoadQueuePeonTester(); + brokerPeon = new LoadQueuePeonTester(); + indexerPeon = new LoadQueuePeonTester(); + + dataSource1 = new ImmutableDruidDataSource( + "dataSource1", + Collections.emptyMap(), + Collections.singleton(segment1) + ); + + dataSource2 = new ImmutableDruidDataSource( + "dataSource1", + Collections.emptyMap(), + Collections.singleton(segment2) + ); + + dataSources = ImmutableList.of(dataSource1, dataSource2); + } + + @After + public void tearDown() + { + EasyMock.verify(coordinator); + EasyMock.verify(historicalServer); + EasyMock.verify(historicalServerTier2); + EasyMock.verify(brokerServer); + EasyMock.verify(indexerServer); + } + + @Test + public void test_unloadUnusedSegmentsFromAllServers() + { + mockDruidServer( + historicalServer, + ServerType.HISTORICAL, + "historical", + DruidServer.DEFAULT_TIER, + 30L, + 100L, + segments, + dataSources + ); + mockDruidServer( + historicalServerTier2, + ServerType.HISTORICAL, + "historicalTier2", + "tier2", + 30L, + 100L, + segments, + dataSources + ); + mockDruidServer( + brokerServer, + ServerType.BROKER, + "broker", + DruidServer.DEFAULT_TIER, + 30L, + 100L, + segments, + dataSources + ); + mockDruidServer( + indexerServer, + ServerType.INDEXER_EXECUTOR, + "indexer", + DruidServer.DEFAULT_TIER, + 30L, + 100L, + segments, + dataSources + ); + + // Mock stuff that the coordinator needs + mockCoordinator(coordinator); + + // We keep datasource2, drop datasource1 from all servers + Set usedSegments = Collections.singleton(segment2); + + DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers + .newBuilder() + .withDruidCluster( + DruidClusterBuilder + .newBuilder() + .addTier( + DruidServer.DEFAULT_TIER, + new ServerHolder(historicalServer, historicalPeon, false) + ) + .addTier( + "tier2", + new ServerHolder(historicalServerTier2, historicalTier2Peon, false) + ) + .withBrokers( + new ServerHolder(brokerServer, brokerPeon, false) + ) + .withRealtimes( + new ServerHolder(indexerServer, indexerPeon, false) + ) + .build() + ) + .withLoadManagementPeons( + ImmutableMap.of( + "historical", historicalPeon, + "historicalTier2", historicalTier2Peon, + "broker", brokerPeon, + "indexer", indexerPeon + ) + ) + .withUsedSegmentsInTest(usedSegments) + .build(); + + params = new UnloadUnusedSegments().run(params); + CoordinatorStats stats = params.getCoordinatorStats(); + Assert.assertEquals(3, stats.getTieredStat("unneededCount", DruidServer.DEFAULT_TIER)); + Assert.assertEquals(1, stats.getTieredStat("unneededCount", "tier2")); + Assert.assertEquals(historicalPeon.getSegmentsToDrop(), Collections.singleton(segment1)); + Assert.assertEquals(historicalTier2Peon.getSegmentsToDrop(), Collections.singleton(segment1)); + Assert.assertEquals(brokerPeon.getSegmentsToDrop(), Collections.singleton(segment1)); + Assert.assertEquals(indexerPeon.getSegmentsToDrop(), Collections.singleton(segment1)); + + } + + private static void mockDruidServer( + ImmutableDruidServer druidServer, + ServerType serverType, + String name, + String tier, + long currentSize, + long maxSize, + List segments, + List dataSources + ) + { + EasyMock.expect(druidServer.getName()).andReturn(name).anyTimes(); + EasyMock.expect(druidServer.getTier()).andReturn(tier).anyTimes(); + EasyMock.expect(druidServer.getCurrSize()).andReturn(currentSize).anyTimes(); + EasyMock.expect(druidServer.getMaxSize()).andReturn(maxSize).anyTimes(); + ImmutableDruidServerTests.expectSegments(druidServer, segments); + EasyMock.expect(druidServer.getHost()).andReturn(name).anyTimes(); + EasyMock.expect(druidServer.getType()).andReturn(serverType).anyTimes(); + EasyMock.expect(druidServer.getDataSources()).andReturn(dataSources).anyTimes(); + if (!segments.isEmpty()) { + segments.forEach( + s -> EasyMock.expect(druidServer.getSegment(s.getId())).andReturn(s).anyTimes() + ); + } + EasyMock.expect(druidServer.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); + EasyMock.replay(druidServer); + } + + private static void mockCoordinator(DruidCoordinator coordinator) + { + coordinator.moveSegment( + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject(), + EasyMock.anyObject() + ); + EasyMock.expectLastCall().anyTimes(); + EasyMock.replay(coordinator); + } +} From 47b2cc55b7039c0e3e6d08f4c43b68f1f476da06 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 10 Jun 2020 21:03:42 -0700 Subject: [PATCH 2/5] Remove racy test check --- .../server/coordinator/duty/UnloadUnusedSegmentsTest.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java index 1bcfcb508c5a..2bd8fa66a1e1 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java @@ -225,11 +225,6 @@ public void test_unloadUnusedSegmentsFromAllServers() CoordinatorStats stats = params.getCoordinatorStats(); Assert.assertEquals(3, stats.getTieredStat("unneededCount", DruidServer.DEFAULT_TIER)); Assert.assertEquals(1, stats.getTieredStat("unneededCount", "tier2")); - Assert.assertEquals(historicalPeon.getSegmentsToDrop(), Collections.singleton(segment1)); - Assert.assertEquals(historicalTier2Peon.getSegmentsToDrop(), Collections.singleton(segment1)); - Assert.assertEquals(brokerPeon.getSegmentsToDrop(), Collections.singleton(segment1)); - Assert.assertEquals(indexerPeon.getSegmentsToDrop(), Collections.singleton(segment1)); - } private static void mockDruidServer( From af1aff4eb35d9135af1939f05a3c40843afc4555 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 11 Jun 2020 00:10:39 -0700 Subject: [PATCH 3/5] Don't drop non-broadcast segments on tasks, add overshadowing handling --- .../MarkAsUnusedOvershadowedSegments.java | 36 ++++++--- .../duty/UnloadUnusedSegments.java | 24 +++++- .../MarkAsUnusedOvershadowedSegmentsTest.java | 17 ++++- .../duty/UnloadUnusedSegmentsTest.java | 75 ++++++++++++++++--- 4 files changed, 126 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegments.java index febf9a4deafd..3c08a0adb5d7 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegments.java @@ -58,19 +58,18 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) for (SortedSet serverHolders : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serverHolders) { - ImmutableDruidServer server = serverHolder.getServer(); - - for (ImmutableDruidDataSource dataSource : server.getDataSources()) { - VersionedIntervalTimeline timeline = timelines - .computeIfAbsent( - dataSource.getName(), - dsName -> new VersionedIntervalTimeline<>(Comparator.naturalOrder()) - ); - VersionedIntervalTimeline.addSegments(timeline, dataSource.getSegments().iterator()); - } + addSegmentsFromServer(serverHolder, timelines); } } + for (ServerHolder serverHolder : cluster.getBrokers()) { + addSegmentsFromServer(serverHolder, timelines); + } + + for (ServerHolder serverHolder : cluster.getRealtimes()) { + addSegmentsFromServer(serverHolder, timelines); + } + // Mark all segments as unused in db that are overshadowed by served segments for (DataSegment dataSegment : params.getUsedSegments()) { VersionedIntervalTimeline timeline = timelines.get(dataSegment.getDataSource()); @@ -83,4 +82,21 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) return params.buildFromExisting().withCoordinatorStats(stats).build(); } + + private void addSegmentsFromServer( + ServerHolder serverHolder, + Map> timelines + ) + { + ImmutableDruidServer server = serverHolder.getServer(); + + for (ImmutableDruidDataSource dataSource : server.getDataSources()) { + VersionedIntervalTimeline timeline = timelines + .computeIfAbsent( + dataSource.getName(), + dsName -> new VersionedIntervalTimeline<>(Comparator.naturalOrder()) + ); + VersionedIntervalTimeline.addSegments(timeline, dataSource.getSegments().iterator()); + } + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java index 8b58e410f46a..7c89cfdf09d6 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java @@ -52,7 +52,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) serverHolder, usedSegments, params, - stats + stats, + false ); } } @@ -62,7 +63,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) serverHolder, usedSegments, params, - stats + stats, + false ); } @@ -71,7 +73,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) serverHolder, usedSegments, params, - stats + stats, + true ); } @@ -82,12 +85,25 @@ private void handleUnusedSegmentsForServer( ServerHolder serverHolder, Set usedSegments, DruidCoordinatorRuntimeParams params, - CoordinatorStats stats + CoordinatorStats stats, + boolean dropBroadcastOnly ) { ImmutableDruidServer server = serverHolder.getServer(); for (ImmutableDruidDataSource dataSource : server.getDataSources()) { + // The coordinator tracks used segments by examining the metadata store. + // For tasks, the segments they create are unpublished, so those segments will get dropped + // unless we exclude them here. We currently drop only broadcast segments in that case. + // This check relies on the assumption that queryable stream tasks will never + // ingest data to a broadcast datasource. If a broadcast datasource is switched to become a non-broadcast + // datasource, this will result in the those segments not being dropped from tasks. + // A more robust solution which requires a larger rework could be to expose + // the set of segments that were created by a task/indexer here, and exclude them. + if (dropBroadcastOnly && !params.getBroadcastDatasources().contains(dataSource.getName())) { + continue; + } + for (DataSegment segment : dataSource.getSegments()) { if (!usedSegments.contains(segment)) { LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName()); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegmentsTest.java index 73b95ee960ce..3e06388a3794 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegmentsTest.java @@ -21,6 +21,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.java.util.common.DateTimes; @@ -39,9 +41,11 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Test; +import org.junit.runner.RunWith; import java.util.List; +@RunWith(JUnitParamsRunner.class) public class MarkAsUnusedOvershadowedSegmentsTest { MarkAsUnusedOvershadowedSegments markAsUnusedOvershadowedSegments; @@ -69,8 +73,17 @@ public class MarkAsUnusedOvershadowedSegmentsTest .build(); @Test - public void testRun() + @Parameters( + { + "historical", + "broker", + "indexer-executor" + } + ) + public void testRun(String serverTypeString) { + ServerType serverType = ServerType.fromString(serverTypeString); + markAsUnusedOvershadowedSegments = new MarkAsUnusedOvershadowedSegments(coordinator); usedSegments = ImmutableList.of(segmentV1, segmentV0, segmentV2); @@ -95,7 +108,7 @@ public void testRun() .andReturn("") .anyTimes(); EasyMock.expect(druidServer.getType()) - .andReturn(ServerType.HISTORICAL) + .andReturn(serverType) .anyTimes(); EasyMock.expect(druidServer.getDataSources()) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java index 2bd8fa66a1e1..0b48de2a9336 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; @@ -63,10 +64,17 @@ public class UnloadUnusedSegmentsTest private LoadQueuePeonTester indexerPeon; private DataSegment segment1; private DataSegment segment2; + private DataSegment broadcastSegment; + private DataSegment realtimeOnlySegment; private List segments; + private List segmentsForRealtime; private ImmutableDruidDataSource dataSource1; private ImmutableDruidDataSource dataSource2; + private ImmutableDruidDataSource dataSource2ForRealtime; + private ImmutableDruidDataSource broadcastDatasource; private List dataSources; + private List dataSourcesForRealtime; + private Set broadcastDatasourceNames; @Before public void setUp() @@ -81,7 +89,7 @@ public void setUp() DateTime start1 = DateTimes.of("2012-01-01"); DateTime start2 = DateTimes.of("2012-02-01"); - DateTime version = DateTimes.of("2012-03-01"); + DateTime version = DateTimes.of("2012-05-01"); segment1 = new DataSegment( "datasource1", new Interval(start1, start1.plusHours(1)), @@ -94,6 +102,17 @@ public void setUp() 11L ); segment2 = new DataSegment( + "datasource2", + new Interval(start1, start1.plusHours(1)), + version.toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + 0, + 7L + ); + realtimeOnlySegment = new DataSegment( "datasource2", new Interval(start2, start2.plusHours(1)), version.toString(), @@ -104,10 +123,26 @@ public void setUp() 0, 7L ); + broadcastSegment = new DataSegment( + "broadcastDatasource", + new Interval(start1, start1.plusHours(1)), + version.toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + 0, + 7L + ); segments = new ArrayList<>(); segments.add(segment1); segments.add(segment2); + segments.add(broadcastSegment); + + segmentsForRealtime = new ArrayList<>(); + segmentsForRealtime.add(realtimeOnlySegment); + segmentsForRealtime.add(broadcastSegment); historicalPeon = new LoadQueuePeonTester(); historicalTier2Peon = new LoadQueuePeonTester(); @@ -119,14 +154,29 @@ public void setUp() Collections.emptyMap(), Collections.singleton(segment1) ); - dataSource2 = new ImmutableDruidDataSource( - "dataSource1", + "dataSource2", Collections.emptyMap(), Collections.singleton(segment2) ); - dataSources = ImmutableList.of(dataSource1, dataSource2); + broadcastDatasourceNames = Collections.singleton("broadcastDatasource"); + broadcastDatasource = new ImmutableDruidDataSource( + "broadcastDatasource", + Collections.emptyMap(), + Collections.singleton(broadcastSegment) + ); + + dataSources = ImmutableList.of(dataSource1, dataSource2, broadcastDatasource); + + // This simulates a task that is ingesting to an existing non-broadcast datasource, with unpublished segments, + // while also having a broadcast segment loaded. + dataSource2ForRealtime = new ImmutableDruidDataSource( + "dataSource2", + Collections.emptyMap(), + Collections.singleton(realtimeOnlySegment) + ); + dataSourcesForRealtime = ImmutableList.of(dataSource2ForRealtime, broadcastDatasource); } @After @@ -179,15 +229,17 @@ public void test_unloadUnusedSegmentsFromAllServers() DruidServer.DEFAULT_TIER, 30L, 100L, - segments, - dataSources + segmentsForRealtime, + dataSourcesForRealtime ); // Mock stuff that the coordinator needs mockCoordinator(coordinator); - // We keep datasource2, drop datasource1 from all servers - Set usedSegments = Collections.singleton(segment2); + // We keep datasource2 segments only, drop datasource1 and broadcastDatasource from all servers + // realtimeSegment is intentionally missing from the set, to match how a realtime tasks's unpublished segments + // will not appear in the coordinator's view of used segments. + Set usedSegments = ImmutableSet.of(segment2); DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers .newBuilder() @@ -219,12 +271,15 @@ public void test_unloadUnusedSegmentsFromAllServers() ) ) .withUsedSegmentsInTest(usedSegments) + .withBroadcastDatasources(broadcastDatasourceNames) .build(); params = new UnloadUnusedSegments().run(params); CoordinatorStats stats = params.getCoordinatorStats(); - Assert.assertEquals(3, stats.getTieredStat("unneededCount", DruidServer.DEFAULT_TIER)); - Assert.assertEquals(1, stats.getTieredStat("unneededCount", "tier2")); + + // We drop segment1 and broadcast1 from all servers, realtimeSegment is not dropped by the indexer + Assert.assertEquals(5, stats.getTieredStat("unneededCount", DruidServer.DEFAULT_TIER)); + Assert.assertEquals(2, stats.getTieredStat("unneededCount", "tier2")); } private static void mockDruidServer( From 75eb2877010789ed6f6b19a842a87ca8f1cdd243 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 11 Jun 2020 00:20:51 -0700 Subject: [PATCH 4/5] Don't use realtimes for overshadowing --- .../coordinator/duty/MarkAsUnusedOvershadowedSegments.java | 5 ++--- .../duty/MarkAsUnusedOvershadowedSegmentsTest.java | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegments.java index 3c08a0adb5d7..e278a7582302 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegments.java @@ -66,9 +66,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) addSegmentsFromServer(serverHolder, timelines); } - for (ServerHolder serverHolder : cluster.getRealtimes()) { - addSegmentsFromServer(serverHolder, timelines); - } + // Note that we do not include segments from ingestion services such as tasks or indexers, + // to prevent unpublished segments from prematurely overshadowing segments. // Mark all segments as unused in db that are overshadowed by served segments for (DataSegment dataSegment : params.getUsedSegments()) { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegmentsTest.java index 3e06388a3794..301b3bd8f9f9 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkAsUnusedOvershadowedSegmentsTest.java @@ -76,8 +76,7 @@ public class MarkAsUnusedOvershadowedSegmentsTest @Parameters( { "historical", - "broker", - "indexer-executor" + "broker" } ) public void testRun(String serverTypeString) From e9519fdc28aa0237d00c9616e97ef0cd4fdff119 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 11 Jun 2020 02:15:27 -0700 Subject: [PATCH 5/5] Fix dropping for ingestion services --- .../server/coordinator/duty/RunRules.java | 21 ++++++--- .../duty/UnloadUnusedSegments.java | 39 +++++++++++++--- .../duty/UnloadUnusedSegmentsTest.java | 45 +++++++++++++++++-- 3 files changed, 90 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java index 3dc7b4d2f918..d8c207148ffe 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java @@ -20,6 +20,7 @@ package org.apache.druid.server.coordinator.duty; import com.google.common.collect.Lists; +import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.MetadataRuleManager; @@ -103,7 +104,21 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) final List segmentsWithMissingRules = Lists.newArrayListWithCapacity(MAX_MISSING_RULES); int missingRules = 0; + final Set broadcastDatasources = new HashSet<>(); + for (ImmutableDruidDataSource dataSource : params.getDataSourcesSnapshot().getDataSourcesMap().values()) { + List rules = databaseRuleManager.getRulesWithDefault(dataSource.getName()); + for (Rule rule : rules) { + // A datasource is considered a broadcast datasource if it has any broadcast rules. + // The set of broadcast datasources is used by BalanceSegments, so it's important that RunRules + // executes before BalanceSegments. + if (rule instanceof BroadcastDistributionRule) { + broadcastDatasources.add(dataSource.getName()); + break; + } + } + } + for (DataSegment segment : params.getUsedSegments()) { if (overshadowed.contains(segment.getId())) { // Skipping overshadowed segments @@ -115,12 +130,6 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) if (rule.appliesTo(segment, now)) { stats.accumulate(rule.run(coordinator, paramsWithReplicationManager, segment)); foundMatchingRule = true; - - // The set of broadcast datasources is used by BalanceSegments, so it's important that RunRules - // executes before BalanceSegments - if (rule instanceof BroadcastDistributionRule) { - broadcastDatasources.add(segment.getDataSource()); - } break; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java index 7c89cfdf09d6..bd8b2c30d550 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java @@ -27,8 +27,13 @@ import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.LoadQueuePeon; import org.apache.druid.server.coordinator.ServerHolder; +import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule; +import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.timeline.DataSegment; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.SortedSet; @@ -46,6 +51,11 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) Set usedSegments = params.getUsedSegments(); DruidCluster cluster = params.getDruidCluster(); + Map broadcastStatusByDatasource = new HashMap<>(); + for (String broadcastDatasource : params.getBroadcastDatasources()) { + broadcastStatusByDatasource.put(broadcastDatasource, true); + } + for (SortedSet serverHolders : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serverHolders) { handleUnusedSegmentsForServer( @@ -53,7 +63,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) usedSegments, params, stats, - false + false, + broadcastStatusByDatasource ); } } @@ -64,7 +75,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) usedSegments, params, stats, - false + false, + broadcastStatusByDatasource ); } @@ -74,7 +86,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) usedSegments, params, stats, - true + true, + broadcastStatusByDatasource ); } @@ -86,12 +99,26 @@ private void handleUnusedSegmentsForServer( Set usedSegments, DruidCoordinatorRuntimeParams params, CoordinatorStats stats, - boolean dropBroadcastOnly + boolean dropBroadcastOnly, + Map broadcastStatusByDatasource ) { ImmutableDruidServer server = serverHolder.getServer(); - for (ImmutableDruidDataSource dataSource : server.getDataSources()) { + boolean isBroadcastDatasource = broadcastStatusByDatasource.computeIfAbsent( + dataSource.getName(), + (dataSourceName) -> { + List rules = params.getDatabaseRuleManager().getRulesWithDefault(dataSource.getName()); + for (Rule rule : rules) { + // A datasource is considered a broadcast datasource if it has any broadcast rules. + if (rule instanceof BroadcastDistributionRule) { + return true; + } + } + return false; + } + ); + // The coordinator tracks used segments by examining the metadata store. // For tasks, the segments they create are unpublished, so those segments will get dropped // unless we exclude them here. We currently drop only broadcast segments in that case. @@ -100,7 +127,7 @@ private void handleUnusedSegmentsForServer( // datasource, this will result in the those segments not being dropped from tasks. // A more robust solution which requires a larger rework could be to expose // the set of segments that were created by a task/indexer here, and exclude them. - if (dropBroadcastOnly && !params.getBroadcastDatasources().contains(dataSource.getName())) { + if (dropBroadcastOnly && !isBroadcastDatasource) { continue; } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java index 0b48de2a9336..f74762c7ad9d 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegmentsTest.java @@ -27,6 +27,7 @@ import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.ImmutableDruidServerTests; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.metadata.MetadataRuleManager; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers; import org.apache.druid.server.coordinator.CoordinatorStats; @@ -35,6 +36,8 @@ import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.LoadQueuePeonTester; import org.apache.druid.server.coordinator.ServerHolder; +import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule; +import org.apache.druid.server.coordinator.rules.ForeverLoadRule; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; @@ -75,6 +78,7 @@ public class UnloadUnusedSegmentsTest private List dataSources; private List dataSourcesForRealtime; private Set broadcastDatasourceNames; + private MetadataRuleManager databaseRuleManager; @Before public void setUp() @@ -86,6 +90,7 @@ public void setUp() indexerServer = EasyMock.createMock(ImmutableDruidServer.class); segment1 = EasyMock.createMock(DataSegment.class); segment2 = EasyMock.createMock(DataSegment.class); + databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class); DateTime start1 = DateTimes.of("2012-01-01"); DateTime start2 = DateTimes.of("2012-02-01"); @@ -150,12 +155,12 @@ public void setUp() indexerPeon = new LoadQueuePeonTester(); dataSource1 = new ImmutableDruidDataSource( - "dataSource1", + "datasource1", Collections.emptyMap(), Collections.singleton(segment1) ); dataSource2 = new ImmutableDruidDataSource( - "dataSource2", + "datasource2", Collections.emptyMap(), Collections.singleton(segment2) ); @@ -172,7 +177,7 @@ public void setUp() // This simulates a task that is ingesting to an existing non-broadcast datasource, with unpublished segments, // while also having a broadcast segment loaded. dataSource2ForRealtime = new ImmutableDruidDataSource( - "dataSource2", + "datasource2", Collections.emptyMap(), Collections.singleton(realtimeOnlySegment) ); @@ -187,6 +192,7 @@ public void tearDown() EasyMock.verify(historicalServerTier2); EasyMock.verify(brokerServer); EasyMock.verify(indexerServer); + EasyMock.verify(databaseRuleManager); } @Test @@ -236,6 +242,8 @@ public void test_unloadUnusedSegmentsFromAllServers() // Mock stuff that the coordinator needs mockCoordinator(coordinator); + mockRuleManager(databaseRuleManager); + // We keep datasource2 segments only, drop datasource1 and broadcastDatasource from all servers // realtimeSegment is intentionally missing from the set, to match how a realtime tasks's unpublished segments // will not appear in the coordinator's view of used segments. @@ -272,6 +280,7 @@ public void test_unloadUnusedSegmentsFromAllServers() ) .withUsedSegmentsInTest(usedSegments) .withBroadcastDatasources(broadcastDatasourceNames) + .withDatabaseRuleManager(databaseRuleManager) .build(); params = new UnloadUnusedSegments().run(params); @@ -322,4 +331,34 @@ private static void mockCoordinator(DruidCoordinator coordinator) EasyMock.expectLastCall().anyTimes(); EasyMock.replay(coordinator); } + + private static void mockRuleManager(MetadataRuleManager metadataRuleManager) + { + EasyMock.expect(metadataRuleManager.getRulesWithDefault("datasource1")).andReturn( + Collections.singletonList( + new ForeverLoadRule( + ImmutableMap.of( + DruidServer.DEFAULT_TIER, 1, + "tier2", 1 + ) + ) + )).anyTimes(); + + EasyMock.expect(metadataRuleManager.getRulesWithDefault("datasource2")).andReturn( + Collections.singletonList( + new ForeverLoadRule( + ImmutableMap.of( + DruidServer.DEFAULT_TIER, 1, + "tier2", 1 + ) + ) + )).anyTimes(); + + EasyMock.expect(metadataRuleManager.getRulesWithDefault("broadcastDatasource")).andReturn( + Collections.singletonList( + new ForeverBroadcastDistributionRule() + )).anyTimes(); + + EasyMock.replay(metadataRuleManager); + } }