diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java index 38f715f848aa..653dc8a08aae 100644 --- a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java +++ b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java @@ -21,13 +21,15 @@ import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import java.util.ArrayList; import java.util.List; public class StubServiceEmitter extends ServiceEmitter { - private List events = new ArrayList<>(); + private final List events = new ArrayList<>(); + private final List metricEvents = new ArrayList<>(); public StubServiceEmitter(String service, String host) { @@ -37,14 +39,28 @@ public StubServiceEmitter(String service, String host) @Override public void emit(Event event) { + if (event instanceof ServiceMetricEvent) { + metricEvents.add((ServiceMetricEvent) event); + } events.add(event); } + /** + * Gets all the events emitted since the previous {@link #flush()}. + */ public List getEvents() { return events; } + /** + * Gets all the metric events emitted since the previous {@link #flush()}. + */ + public List getMetricEvents() + { + return metricEvents; + } + @Override public void start() { @@ -53,6 +69,8 @@ public void start() @Override public void flush() { + events.clear(); + metricEvents.clear(); } @Override diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 6b1e29d49167..df13471aa819 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -973,6 +973,14 @@ public List getDuties() { return duties; } + + @Override + public String toString() + { + return "DutiesRunnable{" + + "dutiesRunnableAlias='" + dutiesRunnableAlias + '\'' + + '}'; + } } /** diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java index d2a1c4c8daaf..198a7cf5e8f6 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java @@ -92,6 +92,7 @@ private void balanceTier( ) { + log.info("Balancing segments in tier [%s]", tier); if (params.getUsedSegments().size() == 0) { log.info("Metadata segments are not available. Cannot balance."); // suppress emit zero stats diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java index d4a89abb3d23..a7c594fe094c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java @@ -30,7 +30,6 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; -import org.hamcrest.Matchers; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.After; @@ -277,9 +276,9 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove() params = new BalanceSegmentsTester(coordinator).run(params); EasyMock.verify(strategy); Assert.assertEquals(3L, params.getCoordinatorStats().getTieredStat("movedCount", "normal")); - Assert.assertThat( - peon3.getSegmentsToLoad(), - Matchers.is(Matchers.equalTo(ImmutableSet.of(segment1, segment3, segment4))) + Assert.assertEquals( + ImmutableSet.of(segment1, segment3, segment4), + peon3.getSegmentsToLoad() ); } @@ -289,7 +288,7 @@ public void testZeroDecommissioningMaxPercentOfMaxSegmentsToMove() DruidCoordinatorRuntimeParams params = setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(0); params = new BalanceSegmentsTester(coordinator).run(params); Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal")); - Assert.assertThat(peon3.getSegmentsToLoad(), Matchers.is(Matchers.equalTo(ImmutableSet.of(segment1)))); + Assert.assertEquals(ImmutableSet.of(segment1), peon3.getSegmentsToLoad()); } @Test @@ -298,7 +297,7 @@ public void testMaxDecommissioningMaxPercentOfMaxSegmentsToMove() DruidCoordinatorRuntimeParams params = setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(10); params = new BalanceSegmentsTester(coordinator).run(params); Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal")); - Assert.assertThat(peon3.getSegmentsToLoad(), Matchers.is(Matchers.equalTo(ImmutableSet.of(segment2)))); + Assert.assertEquals(ImmutableSet.of(segment2), peon3.getSegmentsToLoad()); } /** @@ -347,9 +346,9 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissi params = new BalanceSegmentsTester(coordinator).run(params); EasyMock.verify(strategy); Assert.assertEquals(3L, params.getCoordinatorStats().getTieredStat("movedCount", "normal")); - Assert.assertThat( - peon3.getSegmentsToLoad(), - Matchers.is(Matchers.equalTo(ImmutableSet.of(segment2, segment3, segment4))) + Assert.assertEquals( + ImmutableSet.of(segment2, segment3, segment4), + peon3.getSegmentsToLoad() ); } @@ -603,10 +602,7 @@ public void testThatDynamicConfigIsHonoredWhenPickingSegmentToMove() params = new BalanceSegmentsTester(coordinator).run(params); EasyMock.verify(strategy); Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal")); - Assert.assertThat( - peon3.getSegmentsToLoad(), - Matchers.is(Matchers.equalTo(ImmutableSet.of(segment3))) - ); + Assert.assertEquals(ImmutableSet.of(segment3), peon3.getSegmentsToLoad()); } @Test diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java b/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java new file mode 100644 index 000000000000..8f2c1238915d --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Test utility to create {@link DataSegment}s for a given datasource. + */ +public class CreateDataSegments +{ + private final String datasource; + + private DateTime startTime; + private Granularity granularity; + private int numPartitions; + private int numIntervals; + + public static CreateDataSegments ofDatasource(String datasource) + { + return new CreateDataSegments(datasource); + } + + private CreateDataSegments(String datasource) + { + this.datasource = datasource; + } + + public CreateDataSegments forIntervals(int numIntervals, Granularity intervalSize) + { + this.numIntervals = numIntervals; + this.granularity = intervalSize; + return this; + } + + public CreateDataSegments startingAt(String startOfFirstInterval) + { + this.startTime = DateTimes.of(startOfFirstInterval); + return this; + } + + public CreateDataSegments withNumPartitions(int numPartitions) + { + this.numPartitions = numPartitions; + return this; + } + + public List eachOfSizeInMb(long sizeMb) + { + final List segments = new ArrayList<>(); + + int uniqueIdInInterval = 0; + DateTime nextStart = startTime; + for (int numInterval = 0; numInterval < numIntervals; ++numInterval) { + Interval nextInterval = new Interval(nextStart, granularity.increment(nextStart)); + for (int numPartition = 0; numPartition < numPartitions; ++numPartition) { + segments.add( + new NumberedDataSegment( + datasource, + nextInterval, + new NumberedShardSpec(numPartition, numPartitions), + ++uniqueIdInInterval, + sizeMb + ) + ); + } + nextStart = granularity.increment(nextStart); + } + + return Collections.unmodifiableList(segments); + } + + /** + * Simple implementation of DataSegment with a unique integer id to make debugging easier. + */ + private static class NumberedDataSegment extends DataSegment + { + private final int uniqueId; + + private NumberedDataSegment( + String datasource, + Interval interval, + NumberedShardSpec shardSpec, + int uinqueId, + long size + ) + { + super( + datasource, + interval, + "1", + Collections.emptyMap(), + Collections.emptyList(), + Collections.emptyList(), + shardSpec, + IndexIO.CURRENT_VERSION_ID, + size + ); + this.uniqueId = uinqueId; + } + + @Override + public String toString() + { + return "{" + getDataSource() + "::" + uniqueId + "}"; + } + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java index dc4a6f0abe43..eb3be4c89510 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java @@ -26,6 +26,7 @@ import org.apache.druid.client.DruidServer; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; @@ -41,8 +42,6 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; -import org.joda.time.DateTime; -import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -81,24 +80,11 @@ public void setUp() databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class); segmentsMetadataManager = EasyMock.createNiceMock(SegmentsMetadataManager.class); - DateTime start = DateTimes.of("2012-01-01"); - usedSegments = new ArrayList<>(); - for (int i = 0; i < 24; i++) { - usedSegments.add( - new DataSegment( - "test", - new Interval(start, start.plusHours(1)), - DateTimes.nowUtc().toString(), - new HashMap<>(), - new ArrayList<>(), - new ArrayList<>(), - NoneShardSpec.instance(), - IndexIO.CURRENT_VERSION_ID, - 1 - ) - ); - start = start.plusHours(1); - } + usedSegments = CreateDataSegments.ofDatasource("test") + .forIntervals(24, Granularities.HOUR) + .startingAt("2012-01-01") + .withNumPartitions(1) + .eachOfSizeInMb(1); ruleRunner = new RunRules(new ReplicationThrottler(24, 1, false), coordinator); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java new file mode 100644 index 000000000000..fc59a6bd9d43 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.simulate; + +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.logger.Logger; + +import java.util.Collection; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * An executor that keeps submitted tasks in a queue until they are explicitly + * invoked by calling one of these methods: + *
    + *
  • {@link #finishNextPendingTask()}
  • + *
  • {@link #finishNextPendingTasks(int)}
  • + *
  • {@link #finishAllPendingTasks()}
  • + *
+ */ +public class BlockingExecutorService implements ExecutorService +{ + private static final Logger log = new Logger(BlockingExecutorService.class); + + private final String nameFormat; + private final Queue> taskQueue = new ConcurrentLinkedQueue<>(); + + public BlockingExecutorService(String nameFormat) + { + this.nameFormat = nameFormat; + } + + public boolean hasPendingTasks() + { + return !taskQueue.isEmpty(); + } + + /** + * Executes the next pending task on the calling thread itself. + */ + public int finishNextPendingTask() + { + log.debug("[%s] Executing next pending task", nameFormat); + Task task = taskQueue.poll(); + if (task != null) { + task.executeNow(); + return 1; + } else { + return 0; + } + } + + /** + * Executes the next {@code numTasksToExecute} pending tasks on the calling + * thread itself. + */ + public int finishNextPendingTasks(int numTasksToExecute) + { + log.debug("[%s] Executing %d pending tasks", nameFormat, numTasksToExecute); + int executedTaskCount = 0; + for (; executedTaskCount < numTasksToExecute; ++executedTaskCount) { + Task task = taskQueue.poll(); + if (task == null) { + break; + } else { + task.executeNow(); + } + } + return executedTaskCount; + } + + /** + * Executes all the remaining pending tasks on the calling thread itself. + *

+ * Note: This method can keep running forever if another thread keeps submitting + * new tasks to the executor. + */ + public int finishAllPendingTasks() + { + log.debug("[%s] Executing all pending tasks", nameFormat); + Task task; + int executedTaskCount = 0; + while ((task = taskQueue.poll()) != null) { + task.executeNow(); + ++executedTaskCount; + } + + return executedTaskCount; + } + + // Task submission operations + @Override + public Future submit(Callable task) + { + return addTaskToQueue(task); + } + + @Override + public Future submit(Runnable task, T result) + { + return addTaskToQueue(() -> { + task.run(); + return result; + }); + } + + @Override + public Future submit(Runnable task) + { + return addTaskToQueue(() -> { + task.run(); + return null; + }); + } + + @Override + public void execute(Runnable command) + { + submit(command); + } + + private Future addTaskToQueue(Callable callable) + { + Task task = new Task<>(callable); + taskQueue.add(task); + return task.future; + } + + // Termination operations + @Override + public void shutdown() + { + taskQueue.clear(); + } + + @Override + public List shutdownNow() + { + return null; + } + + @Override + public boolean isShutdown() + { + return false; + } + + @Override + public boolean isTerminated() + { + return false; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) + { + return false; + } + + // Unsupported operations + @Override + public List> invokeAll(Collection> tasks) + { + throw new UnsupportedOperationException(); + } + + @Override + public List> invokeAll( + Collection> tasks, + long timeout, + TimeUnit unit + ) + { + throw new UnsupportedOperationException(); + } + + @Override + public T invokeAny(Collection> tasks) + { + throw new UnsupportedOperationException(); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + { + throw new UnsupportedOperationException(); + } + + /** + * Task that can be invoked to complete the corresponding future. + */ + private static class Task + { + private final Callable callable; + private final CompletableFuture future = new CompletableFuture<>(); + + private Task(Callable callable) + { + this.callable = callable; + } + + private void executeNow() + { + try { + T result = callable.call(); + future.complete(result); + } + catch (Exception e) { + throw new ISE("Error while executing task", e); + } + } + } + +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java new file mode 100644 index 000000000000..6822419f79a0 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.simulate; + +import org.apache.druid.client.DruidServer; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; + +import java.util.List; + +/** + * Runner for a coordinator simulation. + */ +public interface CoordinatorSimulation +{ + /** + * Starts the simulation if not already started. + */ + void start(); + + /** + * Stops the simulation. + */ + void stop(); + + /** + * State of the coordinator during the simulation. + */ + CoordinatorState coordinator(); + + /** + * State of the cluster during the simulation. + */ + ClusterState cluster(); + + static CoordinatorSimulationBuilder builder() + { + return new CoordinatorSimulationBuilder(); + } + + interface CoordinatorState + { + /** + * Runs a single coordinator cycle. + */ + void runCoordinatorCycle(); + + /** + * Synchronizes the inventory view maintained by the coordinator with the + * actual state of the cluster. + */ + void syncInventoryView(); + + /** + * Sets the CoordinatorDynamicConfig. + */ + void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig); + + /** + * Gets the inventory view of the specified server as maintained by the + * coordinator. + */ + DruidServer getInventoryView(String serverName); + + /** + * Returns the metric events emitted in the previous coordinator run. + */ + List getMetricEvents(); + + /** + * Gets the load percentage of the specified datasource as seen by the coordinator. + */ + double getLoadPercentage(String datasource); + } + + interface ClusterState + { + /** + * Finishes load of all the segments that were queued in the previous + * coordinator run. Also handles the responses and executes the respective + * callbacks on the coordinator. + */ + void loadQueuedSegments(); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java new file mode 100644 index 000000000000..60e8e428244c --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.simulate; + +import org.apache.druid.client.DruidServer; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.coordinator.CreateDataSegments; +import org.apache.druid.server.coordinator.rules.ForeverLoadRule; +import org.apache.druid.server.coordinator.rules.Rule; +import org.apache.druid.timeline.DataSegment; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Base test for coordinator simulations. + *

+ * Each test must call {@link #startSimulation(CoordinatorSimulation)} to start + * the simulation. {@link CoordinatorSimulation#stop()} should not be called as + * the simulation is stopped when cleaning up after the test in {@link #tearDown()}. + *

+ * Tests that verify balancing behaviour should set + * {@link CoordinatorDynamicConfig#useBatchedSegmentSampler()} to true. + * Otherwise, the segment sampling is random and can produce repeated values + * leading to flakiness in the tests. The simulation sets this field to true by + * default. + */ +public abstract class CoordinatorSimulationBaseTest + implements CoordinatorSimulation.CoordinatorState, CoordinatorSimulation.ClusterState +{ + static final double DOUBLE_DELTA = 10e-9; + + private CoordinatorSimulation sim; + private final Map> latestMetricEvents = new HashMap<>(); + + @Before + public abstract void setUp(); + + @After + public void tearDown() + { + if (sim != null) { + sim.stop(); + sim = null; + } + } + + /** + * This must be called to start the simulation and set the correct state. + */ + void startSimulation(CoordinatorSimulation simulation) + { + this.sim = simulation; + simulation.start(); + } + + @Override + public void runCoordinatorCycle() + { + latestMetricEvents.clear(); + sim.coordinator().runCoordinatorCycle(); + + // Extract the metric values of this run + for (ServiceMetricEvent event : sim.coordinator().getMetricEvents()) { + latestMetricEvents.computeIfAbsent(event.getMetric(), m -> new ArrayList<>()) + .add(event); + } + } + + @Override + public List getMetricEvents() + { + return sim.coordinator().getMetricEvents(); + } + + @Override + public DruidServer getInventoryView(String serverName) + { + return sim.coordinator().getInventoryView(serverName); + } + + @Override + public void syncInventoryView() + { + sim.coordinator().syncInventoryView(); + } + + @Override + public void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig) + { + sim.coordinator().setDynamicConfig(dynamicConfig); + } + + @Override + public void loadQueuedSegments() + { + sim.cluster().loadQueuedSegments(); + } + + @Override + public double getLoadPercentage(String datasource) + { + return sim.coordinator().getLoadPercentage(datasource); + } + + // Verification methods + void verifyDatasourceIsFullyLoaded(String datasource) + { + Assert.assertEquals(100.0, getLoadPercentage(datasource), DOUBLE_DELTA); + } + + void verifyNoEvent(String metricName) + { + Assert.assertTrue(getMetricValues(metricName, null).isEmpty()); + } + + /** + * Verifies the value of the specified metric emitted in the previous run. + */ + void verifyValue(String metricName, Number expectedValue) + { + verifyValue(metricName, null, expectedValue); + } + + /** + * Verifies the value of the event corresponding to the specified metric and + * dimensionFilters emitted in the previous run. + */ + void verifyValue(String metricName, Map dimensionFilters, Number expectedValue) + { + Assert.assertEquals(expectedValue, getValue(metricName, dimensionFilters)); + } + + /** + * Gets the value of the event corresponding to the specified metric and + * dimensionFilters emitted in the previous run. + */ + Number getValue(String metricName, Map dimensionFilters) + { + List values = getMetricValues(metricName, dimensionFilters); + Assert.assertEquals( + "Metric must have been emitted exactly once for the given dimensions.", + 1, + values.size() + ); + return values.get(0); + } + + private List getMetricValues(String metricName, Map dimensionFilters) + { + final List values = new ArrayList<>(); + final List events = latestMetricEvents.getOrDefault(metricName, Collections.emptyList()); + final Map filters = dimensionFilters == null + ? Collections.emptyMap() : dimensionFilters; + for (ServiceMetricEvent event : events) { + final Map userDims = event.getUserDims(); + boolean match = filters.keySet().stream() + .map(d -> filters.get(d).equals(userDims.get(d))) + .reduce((a, b) -> a && b) + .orElse(true); + if (match) { + values.add(event.getValue()); + } + } + + return values; + } + + // Utility methods + + /** + * Creates a {@link CoordinatorDynamicConfig} with the specified values of: + * {@code maxSegmentsToMove, maxSegmentsInNodeLoadingQueue and replicationThrottleLimit}. + * The created config always has {@code useBatchedSegmentSampler=true} to avoid + * flakiness in tests. + * + * @see CoordinatorSimulationBaseTest + */ + static CoordinatorDynamicConfig createDynamicConfig( + int maxSegmentsToMove, + int maxSegmentsInNodeLoadingQueue, + int replicationThrottleLimit + ) + { + return CoordinatorDynamicConfig.builder() + .withMaxSegmentsToMove(maxSegmentsToMove) + .withReplicationThrottleLimit(replicationThrottleLimit) + .withMaxSegmentsInNodeLoadingQueue(maxSegmentsInNodeLoadingQueue) + .withUseBatchedSegmentSampler(true) + .build(); + } + + /** + * Creates a map containing dimension key-values to filter out metric events. + */ + static Map filter(String... dimensionValues) + { + if (dimensionValues.length < 2 || dimensionValues.length % 2 == 1) { + throw new IllegalArgumentException("Dimension key-values must be specified in pairs."); + } + + final Map filters = new HashMap<>(); + for (int i = 0; i < dimensionValues.length; ) { + filters.put(dimensionValues[i], dimensionValues[i + 1]); + i += 2; + } + return filters; + } + + /** + * Creates a historical. The {@code uniqueIdInTier} must be correctly specified + * as it is used to identify the historical throughout the simulation. + */ + static DruidServer createHistorical(int uniqueIdInTier, String tier, long serverSizeMb) + { + final String name = tier + "__" + "hist__" + uniqueIdInTier; + return new DruidServer(name, name, name, serverSizeMb, ServerType.HISTORICAL, tier, 1); + } + + // Utility and constant holder classes + + static class DS + { + static final String WIKI = "wiki"; + } + + static class Tier + { + static final String T1 = "tier_t1"; + static final String T2 = "tier_t2"; + static final String T3 = "tier_t3"; + } + + static class Metric + { + static final String ASSIGNED_COUNT = "segment/assigned/count"; + static final String MOVED_COUNT = "segment/moved/count"; + static final String DROPPED_COUNT = "segment/dropped/count"; + static final String LOAD_QUEUE_COUNT = "segment/loadQueue/count"; + } + + static class Segments + { + /** + * Segments of datasource {@link DS#WIKI}, size 500 MB each, + * spanning 1 day containing 10 partitions. + */ + static final List WIKI_10X1D = + CreateDataSegments.ofDatasource(DS.WIKI) + .forIntervals(1, Granularities.DAY) + .startingAt("2022-01-01") + .withNumPartitions(10) + .eachOfSizeInMb(500); + } + + /** + * Builder for a load rule. + */ + static class Load + { + private final Map tieredReplicants = new HashMap<>(); + + static Load on(String tier, int numReplicas) + { + Load load = new Load(); + load.tieredReplicants.put(tier, numReplicas); + return load; + } + + Load andOn(String tier, int numReplicas) + { + tieredReplicants.put(tier, numReplicas); + return this; + } + + Rule forever() + { + return new ForeverLoadRule(tieredReplicants); + } + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java new file mode 100644 index 000000000000..29301ea033d4 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java @@ -0,0 +1,554 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.simulate; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.druid.client.DruidServer; +import org.apache.druid.common.config.JacksonConfigManager; +import org.apache.druid.curator.ZkEnablementConfig; +import org.apache.druid.curator.discovery.ServiceAnnouncer; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.concurrent.DirectExecutorService; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import org.apache.druid.java.util.common.lifecycle.Lifecycle; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.server.coordinator.BalancerStrategyFactory; +import org.apache.druid.server.coordinator.CachingCostBalancerStrategyConfig; +import org.apache.druid.server.coordinator.CachingCostBalancerStrategyFactory; +import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.coordinator.CostBalancerStrategyFactory; +import org.apache.druid.server.coordinator.DruidCoordinator; +import org.apache.druid.server.coordinator.DruidCoordinatorConfig; +import org.apache.druid.server.coordinator.LoadQueueTaskMaster; +import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; +import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups; +import org.apache.druid.server.coordinator.rules.Rule; +import org.apache.druid.server.initialization.ZkPathsConfig; +import org.apache.druid.server.lookup.cache.LookupCoordinatorManager; +import org.apache.druid.timeline.DataSegment; +import org.easymock.EasyMock; +import org.joda.time.Duration; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Builder for {@link CoordinatorSimulation}. + */ +public class CoordinatorSimulationBuilder +{ + private static final long DEFAULT_COORDINATOR_PERIOD = 100L; + private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper() + .setInjectableValues( + new InjectableValues.Std().addValue( + DataSegment.PruneSpecsHolder.class, + DataSegment.PruneSpecsHolder.DEFAULT + ) + ); + + private BalancerStrategyFactory balancerStrategyFactory; + private CoordinatorDynamicConfig dynamicConfig = + CoordinatorDynamicConfig.builder() + .withUseBatchedSegmentSampler(true) + .build(); + private List servers; + private List segments; + private final Map> datasourceRules = new HashMap<>(); + private boolean loadImmediately = false; + private boolean autoSyncInventory = true; + + /** + * Specifies the balancer strategy to be used. + *

+ * Default: "cost" ({@link CostBalancerStrategyFactory}) + */ + public CoordinatorSimulationBuilder withBalancer(BalancerStrategyFactory strategyFactory) + { + this.balancerStrategyFactory = strategyFactory; + return this; + } + + public CoordinatorSimulationBuilder withServers(List servers) + { + this.servers = servers; + return this; + } + + public CoordinatorSimulationBuilder withServers(DruidServer... servers) + { + return withServers(Arrays.asList(servers)); + } + + public CoordinatorSimulationBuilder withSegments(List segments) + { + this.segments = segments; + return this; + } + + public CoordinatorSimulationBuilder withRules(String datasource, Rule... rules) + { + this.datasourceRules.put(datasource, Arrays.asList(rules)); + return this; + } + + /** + * Specifies whether segments should be loaded as soon as they are queued. + *

+ * Default: false + */ + public CoordinatorSimulationBuilder withImmediateSegmentLoading(boolean loadImmediately) + { + this.loadImmediately = loadImmediately; + return this; + } + + /** + * Specifies whether the inventory view maintained by the coordinator + * should be auto-synced as soon as any change is made to the cluster. + *

+ * Default: true + */ + public CoordinatorSimulationBuilder withAutoInventorySync(boolean autoSync) + { + this.autoSyncInventory = autoSync; + return this; + } + + /** + * Specifies the CoordinatorDynamicConfig to be used in the simulation. + *

+ * Default values: {@code useBatchedSegmentSampler = true}, other params as + * specified in {@link CoordinatorDynamicConfig.Builder}. + *

+ * Tests that verify balancing behaviour should set + * {@link CoordinatorDynamicConfig#useBatchedSegmentSampler()} to true. + * Otherwise, the segment sampling is random and can produce repeated values + * leading to flakiness in the tests. The simulation sets this field to true by + * default. + */ + public CoordinatorSimulationBuilder withDynamicConfig(CoordinatorDynamicConfig dynamicConfig) + { + this.dynamicConfig = dynamicConfig; + return this; + } + + public CoordinatorSimulation build() + { + Preconditions.checkArgument( + servers != null && !servers.isEmpty(), + "Cannot run simulation for an empty cluster" + ); + + // Prepare the environment + final TestServerInventoryView serverInventoryView = new TestServerInventoryView(); + servers.forEach(serverInventoryView::addServer); + + final TestSegmentsMetadataManager segmentManager = new TestSegmentsMetadataManager(); + if (segments != null) { + segments.forEach(segmentManager::addSegment); + } + + final TestMetadataRuleManager ruleManager = new TestMetadataRuleManager(); + datasourceRules.forEach( + (datasource, rules) -> + ruleManager.overrideRule(datasource, rules, null) + ); + + final Environment env = new Environment( + serverInventoryView, + segmentManager, + ruleManager, + dynamicConfig, + loadImmediately, + autoSyncInventory + ); + + // Build the coordinator + final DruidCoordinator coordinator = new DruidCoordinator( + env.coordinatorConfig, + new ZkPathsConfig(), + env.jacksonConfigManager, + env.segmentManager, + env.coordinatorInventoryView, + env.ruleManager, + () -> null, + env.serviceEmitter, + env.executorFactory, + null, + env.loadQueueTaskMaster, + new ServiceAnnouncer.Noop(), + null, + Collections.emptySet(), + null, + new CoordinatorCustomDutyGroups(Collections.emptySet()), + balancerStrategyFactory != null ? balancerStrategyFactory + : new CostBalancerStrategyFactory(), + env.lookupCoordinatorManager, + env.leaderSelector, + OBJECT_MAPPER, + ZkEnablementConfig.ENABLED + ); + + return new SimulationImpl(coordinator, env); + } + + private BalancerStrategyFactory buildCachingCostBalancerStrategy(Environment env) + { + try { + return new CachingCostBalancerStrategyFactory( + env.coordinatorInventoryView, + env.lifecycle, + new CachingCostBalancerStrategyConfig() + ); + } + catch (Exception e) { + throw new ISE(e, "Error building balancer strategy"); + } + } + + /** + * Implementation of {@link CoordinatorSimulation}. + */ + private static class SimulationImpl implements CoordinatorSimulation, + CoordinatorSimulation.CoordinatorState, CoordinatorSimulation.ClusterState + { + private final AtomicBoolean running = new AtomicBoolean(false); + + private final Environment env; + private final DruidCoordinator coordinator; + + private SimulationImpl(DruidCoordinator coordinator, Environment env) + { + this.env = env; + this.coordinator = coordinator; + } + + @Override + public void start() + { + if (!running.compareAndSet(false, true)) { + throw new ISE("Simulation is already running"); + } + + try { + env.setUp(); + coordinator.start(); + } + catch (Exception e) { + throw new ISE(e, "Exception while running simulation"); + } + } + + @Override + public void stop() + { + coordinator.stop(); + env.leaderSelector.stopBeingLeader(); + env.tearDown(); + } + + @Override + public CoordinatorState coordinator() + { + return this; + } + + @Override + public ClusterState cluster() + { + return this; + } + + @Override + public void runCoordinatorCycle() + { + verifySimulationRunning(); + env.serviceEmitter.flush(); + + // Invoke historical duties and metadata duties + env.executorFactory.coordinatorRunner.finishNextPendingTasks(2); + } + + @Override + public void syncInventoryView() + { + verifySimulationRunning(); + Preconditions.checkState( + !env.autoSyncInventory, + "Cannot invoke syncInventoryView as simulation is running in auto-sync mode." + ); + env.coordinatorInventoryView.sync(env.historicalInventoryView); + } + + @Override + public void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig) + { + env.setDynamicConfig(dynamicConfig); + } + + @Override + public DruidServer getInventoryView(String serverName) + { + return env.coordinatorInventoryView.getInventoryValue(serverName); + } + + @Override + public void loadQueuedSegments() + { + verifySimulationRunning(); + Preconditions.checkState( + !env.loadImmediately, + "Cannot invoke loadQueuedSegments as simulation is running in immediate loading mode." + ); + + final BlockingExecutorService loadQueueExecutor = env.executorFactory.loadQueueExecutor; + while (loadQueueExecutor.hasPendingTasks()) { + // Drain all the items from the load queue executor + // This sends at most 1 load/drop request to each server + loadQueueExecutor.finishAllPendingTasks(); + + // Load all the queued segments, handle their responses and execute callbacks + int loadedSegments = env.executorFactory.historicalLoader.finishAllPendingTasks(); + loadQueueExecutor.finishNextPendingTasks(loadedSegments); + env.executorFactory.loadCallbackExecutor.finishAllPendingTasks(); + } + } + + private void verifySimulationRunning() + { + if (!running.get()) { + throw new ISE("Simulation hasn't been started yet."); + } + } + + @Override + public double getLoadPercentage(String datasource) + { + return coordinator.getLoadStatus().get(datasource); + } + + @Override + public List getMetricEvents() + { + return new ArrayList<>(env.serviceEmitter.getMetricEvents()); + } + } + + /** + * Environment for a coordinator simulation. + */ + private static class Environment + { + private final Lifecycle lifecycle = new Lifecycle("coord-sim"); + + // Executors + private final ExecutorFactory executorFactory; + + private final TestDruidLeaderSelector leaderSelector = new TestDruidLeaderSelector(); + private final TestSegmentsMetadataManager segmentManager; + private final TestMetadataRuleManager ruleManager; + private final TestServerInventoryView historicalInventoryView; + + private final LoadQueueTaskMaster loadQueueTaskMaster; + private final StubServiceEmitter serviceEmitter + = new StubServiceEmitter("coordinator", "coordinator"); + private final TestServerInventoryView coordinatorInventoryView; + + private final AtomicReference dynamicConfig = new AtomicReference<>(); + private final JacksonConfigManager jacksonConfigManager; + private final LookupCoordinatorManager lookupCoordinatorManager; + private final DruidCoordinatorConfig coordinatorConfig; + private final boolean loadImmediately; + private final boolean autoSyncInventory; + + private final List mocks = new ArrayList<>(); + + private Environment( + TestServerInventoryView clusterInventory, + TestSegmentsMetadataManager segmentManager, + TestMetadataRuleManager ruleManager, + CoordinatorDynamicConfig dynamicConfig, + boolean loadImmediately, + boolean autoSyncInventory + ) + { + this.historicalInventoryView = clusterInventory; + this.segmentManager = segmentManager; + this.ruleManager = ruleManager; + this.loadImmediately = loadImmediately; + this.autoSyncInventory = autoSyncInventory; + + this.coordinatorConfig = new TestDruidCoordinatorConfig.Builder() + .withCoordinatorStartDelay(new Duration(1L)) + .withCoordinatorPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD)) + .withCoordinatorKillPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD)) + .withLoadQueuePeonRepeatDelay(new Duration("PT0S")) + .withLoadQueuePeonType("http") + .withCoordinatorKillIgnoreDurationToRetain(false) + .build(); + + this.executorFactory = new ExecutorFactory(loadImmediately); + this.coordinatorInventoryView = autoSyncInventory + ? clusterInventory + : new TestServerInventoryView(); + HttpClient httpClient = new TestSegmentLoadingHttpClient( + OBJECT_MAPPER, + clusterInventory::getChangeHandlerForHost, + executorFactory.create(1, ExecutorFactory.HISTORICAL_LOADER) + ); + + this.loadQueueTaskMaster = new LoadQueueTaskMaster( + null, + OBJECT_MAPPER, + executorFactory.create(1, ExecutorFactory.LOAD_QUEUE_EXECUTOR), + executorFactory.create(1, ExecutorFactory.LOAD_CALLBACK_EXECUTOR), + coordinatorConfig, + httpClient, + null + ); + + this.jacksonConfigManager = mockConfigManager(); + setDynamicConfig(dynamicConfig); + + this.lookupCoordinatorManager = EasyMock.createNiceMock(LookupCoordinatorManager.class); + mocks.add(jacksonConfigManager); + mocks.add(lookupCoordinatorManager); + } + + private void setUp() throws Exception + { + EmittingLogger.registerEmitter(serviceEmitter); + historicalInventoryView.setUp(); + coordinatorInventoryView.setUp(); + lifecycle.start(); + executorFactory.setUp(); + leaderSelector.becomeLeader(); + EasyMock.replay(mocks.toArray()); + } + + private void tearDown() + { + EasyMock.verify(mocks.toArray()); + executorFactory.tearDown(); + lifecycle.stop(); + } + + private void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig) + { + this.dynamicConfig.set(dynamicConfig); + } + + private JacksonConfigManager mockConfigManager() + { + final JacksonConfigManager jacksonConfigManager + = EasyMock.createMock(JacksonConfigManager.class); + EasyMock.expect( + jacksonConfigManager.watch( + EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY), + EasyMock.eq(CoordinatorDynamicConfig.class), + EasyMock.anyObject() + ) + ).andReturn(dynamicConfig).anyTimes(); + + EasyMock.expect( + jacksonConfigManager.watch( + EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY), + EasyMock.eq(CoordinatorCompactionConfig.class), + EasyMock.anyObject() + ) + ).andReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())).anyTimes(); + + return jacksonConfigManager; + } + } + + /** + * Implementation of {@link ScheduledExecutorFactory} used to create and keep + * a handle on the various executors used inside the coordinator. + */ + private static class ExecutorFactory implements ScheduledExecutorFactory + { + static final String HISTORICAL_LOADER = "historical-loader-%d"; + static final String LOAD_QUEUE_EXECUTOR = "load-queue-%d"; + static final String LOAD_CALLBACK_EXECUTOR = "load-callback-%d"; + static final String COORDINATOR_RUNNER = "Coordinator-Exec--%d"; + + private final Map blockingExecutors = new HashMap<>(); + private final boolean directExecution; + + private BlockingExecutorService historicalLoader; + private BlockingExecutorService loadQueueExecutor; + private BlockingExecutorService loadCallbackExecutor; + private BlockingExecutorService coordinatorRunner; + + private ExecutorFactory(boolean directExecution) + { + this.directExecution = directExecution; + } + + @Override + public ScheduledExecutorService create(int corePoolSize, String nameFormat) + { + boolean isCoordinatorRunner = COORDINATOR_RUNNER.equals(nameFormat); + + // Coordinator running executor must always be blocked + final ExecutorService executorService = + (directExecution && !isCoordinatorRunner) + ? new DirectExecutorService() + : blockingExecutors.computeIfAbsent(nameFormat, BlockingExecutorService::new); + + return new WrappingScheduledExecutorService(nameFormat, executorService, !isCoordinatorRunner); + } + + private BlockingExecutorService findExecutor(String nameFormat) + { + return blockingExecutors.get(nameFormat); + } + + private void setUp() + { + coordinatorRunner = findExecutor(COORDINATOR_RUNNER); + historicalLoader = findExecutor(HISTORICAL_LOADER); + loadQueueExecutor = findExecutor(LOAD_QUEUE_EXECUTOR); + loadCallbackExecutor = findExecutor(LOAD_CALLBACK_EXECUTOR); + } + + private void tearDown() + { + blockingExecutors.values().forEach(BlockingExecutorService::shutdown); + } + } + +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md b/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md new file mode 100644 index 000000000000..a1562c518754 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md @@ -0,0 +1,141 @@ + + +# Coordinator simulations + +The simulation framework allows developers to recreate arbitrary cluster setups and verify coordinator behaviour. Tests +written using the framework can also help identify performance bottlenecks or potential bugs in the system and even +compare different balancing strategies. + +As opposed to unit tests, simulations are meant to test the coordinator as a whole and verify the interactions of all +the underlying parts. In that regard, these simulations resemble integration tests more closely. + +## Test targets + +The primary test target is the `DruidCoordinator` itself. The behaviour of the following entities can also be verified +using simulations: + +- `LoadQueuePeon`, `LoadQueueTaskMaster` +- All coordinator duties, e.g. `BalanceSegments`, `RunRules` +- All retention rules + +## Capabilities + +The framework provides control over the following aspects of the setup: + +| Input | Details | Actions | +|-------|---------|---------| +|cluster | server name, type, tier, size | add a server, remove a server| +|segment |datasource, interval, version, partition num, size | add/remove from server, mark used/unused, publish new segments| +|rules | type (foreverLoad, drop, etc), replica count per tier | set rules for a datasource| +|configs |coordinator period, load queue type, load queue size, max segments to balance | set or update a config | + +The above actions can be performed at any point after building the simulation. So, you could even recreate scenarios +where during a coordinator run, a server crashes or the retention rules of a datasource change, and verify the behaviour +of the coordinator in these situations. + +## Design + +1. __Execution__: A tight dependency on time durations such as the period of a repeating task or the delay before a + scheduled task makes it difficult to reliably reproduce a test scenario. As a result, the tests become flaky. Thus, + all the executors required for coordinator operations have been allowed only two possible modes of execution: + - __immediate__: Execute tasks on the calling thread itself. + - __blocked__: Keep tasks in a queue until explicitly invoked. +2. __Internal dependencies__: In order to allow realistic reproductions of the coordinator behaviour, none of the + internal parts of the coordinator have been mocked in the framework and new tests need not mock anything at all. +3. __External dependencies__: Since these tests are meant to verify the behaviour of only the coordinator, the + interfaces to communicate with external dependencies have been provided as simple in-memory implementations: + - communication with metadata store: `SegmentMetadataManager`, `MetadataRuleManager` + - communication with historicals: `HttpClient`, `ServerInventoryView` +4. __Inventory__: The coordinator maintains an inventory view of the cluster state. Simulations can choose from two + modes of inventory update - auto and manual. In auto update mode, any change made to the cluster is immediately + reflected in the inventory view. In manual update mode, the inventory must be explicitly synchronized with the + cluster state. + +## Limitations + +- The framework does not expose the coordinator HTTP endpoints. +- It should not be used to verify the absolute values of execution latencies, e.g. the time taken to compute the + balancing cost of a segment. But the relative values can still be a good indicator while doing comparisons between, + say two balancing strategies. + +## Usage + +Writing a test class: + +- Extend `CoordinatorSimulationBaseTest`. This base test exposes methods to get or set the state of the cluster and + coordinator during a simulation. +- Build a simulation using `CoordinatorSimulation.builder()` with specified segments, servers, rules and configs. +- Start the simulation with `startSimulation(simulation)`. +- Invoke coordinator runs with `runCoordinatorCycle()` +- Verify emitted metrics and current cluster state + +Example: + +```java +public class SimpleSimulationTest extends CoordinatorSimulationBaseTest +{ + @Test + public void testShiftSegmentsToDifferentTier() + { + // Create segments + List segments = + CreateDataSegments.ofDatasource("wiki") + .forIntervals(30, Granularities.DAY) + .startingAt("2022-01-01") + .withNumPartitions(10) + .eachOfSizeInMb(500); + + // Create servers + DruidServer historicalTier1 = createHistoricalTier(1, "tier_1", 10000); + DruidServer historicalTier2 = createHistoricalTier(1, "tier_2", 20000); + + // Build simulation + CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withServers(historicalTier1, historicalTier2) + .withSegments(segments) + .withRules("wiki".Load.on("tier_2", 1).forever()) + .build(); + + // Start the simulation with all segments loaded on tier_1 + segments.forEach(historicalTier1::addSegment); + startSimulation(sim); + + // Run a few coordinator cycles + int totalLoadedOnT2 = 0; + int totalDroppedFromT1 = 0; + for (int i = 0; i < 10; ++i) { + runCoordinatorCycle(); + loadQueuedSegments(); + totalLoadedOnT2 += getValue("segment/assigned/count", filter("tier", "tier_2")); + totalDroppedFromT1 += getValue("segment/dropped/count", filter("tier", "tier_1")); + } + + // Verify that some segments have been loaded/dropped + Assert.assertTrue(totalLoadedOnT2 > 0 && totalLoadedOnT2 <= segments.size()); + Assert.assertTrue(totalDroppedFromT1 > 0 && totalDroppedFromT1 <= segments.size()); + Assert.assertTrue(totalDroppedFromT1 <= totalLoadedOnT2); + } +} +``` + +## More examples + +See `org.apache.druid.server.coordinator.simulate.SegmentLoadingTest` diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java new file mode 100644 index 000000000000..77b9820a9533 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.simulate; + +import org.apache.druid.client.DruidServer; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.timeline.DataSegment; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +/** + * Coordinator simulation test to verify behaviour of segment balancing. + */ +public class SegmentBalancingTest extends CoordinatorSimulationBaseTest +{ + private DruidServer historicalT11; + private DruidServer historicalT12; + + private final String datasource = DS.WIKI; + private final List segments = Segments.WIKI_10X1D; + + @Override + public void setUp() + { + // Setup historicals for 2 tiers, size 10 GB each + historicalT11 = createHistorical(1, Tier.T1, 10_000); + historicalT12 = createHistorical(2, Tier.T1, 10_000); + } + + @Test + public void testBalancingWithSyncedInventory() + { + // maxSegmentsToMove = 10, unlimited load queue, replicationThrottleLimit = 10 + CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(10, 0, 10); + + // historicals = 2(T1), replicas = 1(T1) + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withSegments(segments) + .withServers(historicalT11, historicalT12) + .withRules(datasource, Load.on(Tier.T1, 1).forever()) + .withDynamicConfig(dynamicConfig) + .withAutoInventorySync(true) + .build(); + + // Put all the segments on histT11 + segments.forEach(historicalT11::addDataSegment); + + startSimulation(sim); + runCoordinatorCycle(); + + // Verify that segments have been chosen for balancing + verifyValue(Metric.MOVED_COUNT, 5L); + + loadQueuedSegments(); + + // Verify that segments have now been balanced out + Assert.assertEquals(5, historicalT11.getTotalSegments()); + Assert.assertEquals(5, historicalT12.getTotalSegments()); + verifyDatasourceIsFullyLoaded(datasource); + } + + @Test + public void testBalancingOfFullyReplicatedSegment() + { + // maxSegmentsToMove = 10, unlimited load queue, replicationThrottleLimit = 10 + CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(10, 0, 10); + + // historicals = 2(in T1), replicas = 1(T1) + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withSegments(segments) + .withServers(historicalT11, historicalT12) + .withDynamicConfig(dynamicConfig) + .withRules(datasource, Load.on(Tier.T1, 1).forever()) + .build(); + + // Put all the segments on histT11 + segments.forEach(historicalT11::addDataSegment); + + startSimulation(sim); + runCoordinatorCycle(); + + // Verify that there are segments in the load queue for balancing + verifyValue(Metric.MOVED_COUNT, 5L); + verifyValue( + Metric.LOAD_QUEUE_COUNT, + filter(DruidMetrics.SERVER, historicalT12.getName()), + 5 + ); + + runCoordinatorCycle(); + + // Verify that the segments in the load queue are not considered as over-replicated + verifyValue("segment/dropped/count", 0L); + verifyValue( + Metric.LOAD_QUEUE_COUNT, + filter(DruidMetrics.SERVER, historicalT12.getName()), + 5 + ); + + // Finish and verify balancing + loadQueuedSegments(); + Assert.assertEquals(5, historicalT11.getTotalSegments()); + Assert.assertEquals(5, historicalT12.getTotalSegments()); + verifyDatasourceIsFullyLoaded(datasource); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingNegativeTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingNegativeTest.java new file mode 100644 index 000000000000..52dc7c0933b9 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingNegativeTest.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.simulate; + +import org.apache.druid.client.DruidServer; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.timeline.DataSegment; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +/** + * Contains negative tests that verify existing erroneous behaviour of segment + * loading. The underlying issues should be fixed and the modified tests + * should be migrated to {@link SegmentLoadingTest}. + *

+ * Identified issues: + * Apache #12881 + */ +public class SegmentLoadingNegativeTest extends CoordinatorSimulationBaseTest +{ + private DruidServer historicalT11; + private DruidServer historicalT12; + private DruidServer historicalT21; + + private final String datasource = DS.WIKI; + private final List segments = Segments.WIKI_10X1D; + + @Override + public void setUp() + { + // Setup historicals for 2 tiers, size 10 GB each + historicalT11 = createHistorical(1, Tier.T1, 10_000); + historicalT12 = createHistorical(2, Tier.T1, 10_000); + historicalT21 = createHistorical(1, Tier.T2, 10_000); + } + + /** + * Correct behaviour: replicationThrottleLimit should not be violated even if + * segment loading is fast. + *

+ * Fix Apache #12881 to fix this test. + */ + @Test + public void testImmediateLoadingViolatesThrottleLimit() + { + // Disable balancing, infinite load queue size, replicationThrottleLimit = 2 + CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 2); + + // historicals = 2(in T1), segments = 10*1day + // replicas = 2(on T1), immediate segment loading + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withSegments(segments) + .withServers(historicalT11, historicalT12) + .withRules(datasource, Load.on(Tier.T1, 2).forever()) + .withImmediateSegmentLoading(true) + .withDynamicConfig(dynamicConfig) + .build(); + + // Put the first replica of all the segments on histT11 + segments.forEach(historicalT11::addDataSegment); + + startSimulation(sim); + runCoordinatorCycle(); + + // Verify that number of replicas assigned exceeds the replicationThrottleLimit + verifyValue(Metric.ASSIGNED_COUNT, 10L); + + Assert.assertEquals(10, historicalT11.getTotalSegments()); + Assert.assertEquals(10, historicalT12.getTotalSegments()); + verifyDatasourceIsFullyLoaded(datasource); + } + + /** + * Correct behaviour: The first replica on any tier should not be throttled. + *

+ * Fix Apache #12881 to fix this test. + */ + @Test + public void testFirstReplicaOnAnyTierIsThrottled() + { + // Disable balancing, infinite load queue size, replicateThrottleLimit = 2 + CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 2); + + // historicals = 1(in T1) + 1(in T2) + // replicas = 1(on T1) + 1(on T2) + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withSegments(segments) + .withServers(historicalT11, historicalT21) + .withDynamicConfig(dynamicConfig) + .withRules( + datasource, + Load.on(Tier.T1, 1).andOn(Tier.T2, 1).forever() + ) + .build(); + + // Put the first replica of all the segments on T1 + segments.forEach(historicalT11::addDataSegment); + + startSimulation(sim); + runCoordinatorCycle(); + + // Verify that num replicas assigned to T2 are equal to the replicationthrottleLimit + verifyValue( + Metric.ASSIGNED_COUNT, + filter(DruidMetrics.TIER, Tier.T2), + 2L + ); + + loadQueuedSegments(); + + verifyDatasourceIsFullyLoaded(datasource); + Assert.assertEquals(10, historicalT11.getTotalSegments()); + Assert.assertEquals(2, historicalT21.getTotalSegments()); + } + + /** + * Correct behaviour: Historical should not get overassigned even if loading is fast. + *

+ * Fix Apache #12881 to fix this test. + */ + @Test + public void testImmediateLoadingOverassignsHistorical() + { + // historicals = 1(in T1), size 1 GB + final DruidServer historicalT11 = createHistorical(1, Tier.T1, 1000); + + // disable balancing, unlimited load queue, replicationThrottleLimit = 10 + CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 10); + + // segments = 10*1day, size 500 MB + // strategy = cost, replicas = 1(T1) + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withSegments(segments) + .withServers(historicalT11) + .withDynamicConfig(dynamicConfig) + .withRules(datasource, Load.on(Tier.T1, 1).forever()) + .withImmediateSegmentLoading(true) + .build(); + + startSimulation(sim); + runCoordinatorCycle(); + + // The historical is assigned several segments but loads only upto its capacity + verifyValue(Metric.ASSIGNED_COUNT, 10L); + Assert.assertEquals(2, historicalT11.getTotalSegments()); + } + + /** + * Correct behaviour: For a fully replicated segment, items that are in the load + * queue should get cancelled so that the coordinator does not have to wait + * for the loads to finish and then take remedial action. + *

+ * Fix Apache #12881 to fix this test case. + */ + @Test + public void testLoadOfFullyReplicatedSegmentIsNotCancelled() + { + // disable balancing, unlimited load queue, replicationThrottleLimit = 10 + CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 10); + + // historicals = 2(in T1), replicas = 2(on T1) + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withSegments(segments) + .withServers(historicalT11, historicalT12) + .withDynamicConfig(dynamicConfig) + .withRules(datasource, Load.on(Tier.T1, 2).forever()) + .build(); + + // Put the first replica of all the segments on histT11 + segments.forEach(historicalT11::addDataSegment); + + startSimulation(sim); + runCoordinatorCycle(); + + // Verify that there are segments in the load queue + verifyValue(Metric.ASSIGNED_COUNT, 10L); + verifyValue( + Metric.LOAD_QUEUE_COUNT, + filter(DruidMetrics.SERVER, historicalT12.getName()), + 10 + ); + + // Put the second replica of all the segments on histT12 + segments.forEach(historicalT12::addDataSegment); + + runCoordinatorCycle(); + + // Verify that the segments are still in the load queue + verifyValue( + Metric.LOAD_QUEUE_COUNT, + filter(DruidMetrics.SERVER, historicalT12.getName()), + 10 + ); + } + + /** + * Correct behaviour: Balancing should never cause over-replication, even when + * the inventory view is not updated. + *

+ * Fix Apache #12881 to fix this test. + */ + @Test + public void testBalancingWithStaleInventoryCausesOverReplication() + { + // maxSegmentsToMove = 10, unlimited load queue, replicationThrottleLimit = 10 + CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(10, 0, 10); + + // historicals = 2(T1), replicas = 1(T1) + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withSegments(segments) + .withServers(historicalT11, historicalT12) + .withRules(datasource, Load.on(Tier.T1, 1).forever()) + .withDynamicConfig(dynamicConfig) + .withAutoInventorySync(false) + .build(); + + // Put all the segments on histT11 + segments.forEach(historicalT11::addDataSegment); + + startSimulation(sim); + syncInventoryView(); + runCoordinatorCycle(); + + // Verify that segments have been chosen for balancing + verifyValue(Metric.MOVED_COUNT, 5L); + + loadQueuedSegments(); + + // Verify that segments have now been balanced out + Assert.assertEquals(10, historicalT11.getTotalSegments()); + Assert.assertEquals(5, historicalT12.getTotalSegments()); + verifyDatasourceIsFullyLoaded(datasource); + } + +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java new file mode 100644 index 000000000000..1edeab8a370f --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.simulate; + +import org.apache.druid.client.DruidServer; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.timeline.DataSegment; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +/** + * Coordinator simulation test to verify behaviour of segment loading. + */ +public class SegmentLoadingTest extends CoordinatorSimulationBaseTest +{ + private DruidServer historicalT11; + private DruidServer historicalT12; + private DruidServer historicalT21; + private DruidServer historicalT22; + + private final String datasource = DS.WIKI; + private final List segments = Segments.WIKI_10X1D; + + @Override + public void setUp() + { + // Setup historicals for 2 tiers, size 10 GB each + historicalT11 = createHistorical(1, Tier.T1, 10_000); + historicalT12 = createHistorical(2, Tier.T1, 10_000); + + historicalT21 = createHistorical(1, Tier.T2, 10_000); + historicalT22 = createHistorical(2, Tier.T2, 10_000); + } + + @Test + public void testSecondReplicaOnAnyTierIsThrottled() + { + // Disable balancing, infinite load queue size, replicateThrottleLimit = 2 + CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 2); + + // historicals = 2(in T1) + // replicas = 2(on T1) + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withSegments(segments) + .withServers(historicalT11, historicalT12) + .withRules(datasource, Load.on(Tier.T1, 2).forever()) + .withDynamicConfig(dynamicConfig) + .build(); + + // Put the first replica of all the segments on histT11 + segments.forEach(historicalT11::addDataSegment); + + startSimulation(sim); + runCoordinatorCycle(); + + // Verify that that replicationThrottleLimit is honored + verifyValue(Metric.ASSIGNED_COUNT, 2L); + + loadQueuedSegments(); + Assert.assertEquals(10, historicalT11.getTotalSegments()); + Assert.assertEquals(2, historicalT12.getTotalSegments()); + } + + @Test + public void testLoadingDoesNotOverassignHistorical() + { + // historicals = 1(in T1), size 1 GB + final DruidServer historicalT11 = createHistorical(1, Tier.T1, 1000); + + // disable balancing, unlimited load queue, replicationThrottleLimit = 10 + CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 10); + + // segments = 10*1day, size 500 MB + // strategy = cost, replicas = 1(T1) + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withSegments(segments) + .withServers(historicalT11) + .withDynamicConfig(dynamicConfig) + .withRules(datasource, Load.on(Tier.T1, 1).forever()) + .withImmediateSegmentLoading(false) + .build(); + + startSimulation(sim); + runCoordinatorCycle(); + + // Verify that the number of segments assigned is within the historical capacity + verifyValue(Metric.ASSIGNED_COUNT, 2L); + loadQueuedSegments(); + Assert.assertEquals(2, historicalT11.getTotalSegments()); + } + + @Test + public void testDropHappensAfterTargetReplicationOnEveryTier() + { + // maxNonPrimaryReplicants = 33 ensures that all target replicas (total 4) + // are assigned for some segments in the first run itself (pigeon-hole) + CoordinatorDynamicConfig dynamicConfig = + CoordinatorDynamicConfig.builder() + .withMaxSegmentsToMove(0) + .withReplicationThrottleLimit(10) + .withMaxNonPrimaryReplicantsToLoad(33) + .build(); + + // historicals = 1(in T1) + 2(in T2) + 2(in T3) + // segments = 10 * 1day, replicas = 2(T2) + 2(T3) + final DruidServer historicalT31 = createHistorical(1, Tier.T3, 10_000); + final DruidServer historicalT32 = createHistorical(2, Tier.T3, 10_000); + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withSegments(segments) + .withDynamicConfig(dynamicConfig) + .withRules(datasource, Load.on(Tier.T2, 2).andOn(Tier.T3, 2).forever()) + .withServers( + historicalT11, + historicalT21, + historicalT22, + historicalT31, + historicalT32 + ) + .build(); + + // At the start, T1 has all the segments + segments.forEach(historicalT11::addDataSegment); + + // Run 1: Nothing is dropped from T1 but things are assigned to T2 and T3 + startSimulation(sim); + runCoordinatorCycle(); + + verifyNoEvent(Metric.DROPPED_COUNT); + int totalAssignedInRun1 + = getValue(Metric.ASSIGNED_COUNT, filter(DruidMetrics.TIER, Tier.T2)).intValue() + + getValue(Metric.ASSIGNED_COUNT, filter(DruidMetrics.TIER, Tier.T3)).intValue(); + Assert.assertTrue(totalAssignedInRun1 > 0 && totalAssignedInRun1 < 40); + + // Run 2: Segments still queued, nothing is dropped from T1 + runCoordinatorCycle(); + loadQueuedSegments(); + + verifyNoEvent(Metric.DROPPED_COUNT); + int totalLoadedAfterRun2 + = historicalT21.getTotalSegments() + historicalT22.getTotalSegments() + + historicalT31.getTotalSegments() + historicalT32.getTotalSegments(); + Assert.assertEquals(totalAssignedInRun1, totalLoadedAfterRun2); + + // Run 3: Some segments have been loaded + // segments fully replicated on T2 and T3 will now be dropped from T1 + runCoordinatorCycle(); + loadQueuedSegments(); + + int totalDroppedInRun3 + = getValue(Metric.DROPPED_COUNT, filter(DruidMetrics.TIER, Tier.T1)).intValue(); + Assert.assertTrue(totalDroppedInRun3 > 0 && totalDroppedInRun3 < 10); + int totalLoadedAfterRun3 + = historicalT21.getTotalSegments() + historicalT22.getTotalSegments() + + historicalT31.getTotalSegments() + historicalT32.getTotalSegments(); + Assert.assertEquals(40, totalLoadedAfterRun3); + + // Run 4: All segments are fully replicated on T2 and T3 + runCoordinatorCycle(); + loadQueuedSegments(); + + int totalDroppedInRun4 + = getValue(Metric.DROPPED_COUNT, filter(DruidMetrics.TIER, Tier.T1)).intValue(); + + Assert.assertEquals(10, totalDroppedInRun3 + totalDroppedInRun4); + Assert.assertEquals(0, historicalT11.getTotalSegments()); + verifyDatasourceIsFullyLoaded(datasource); + } + +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java new file mode 100644 index 000000000000..d84cbcff6efe --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.simulate; + +import org.apache.druid.discovery.DruidLeaderSelector; + +import javax.annotation.Nullable; +import java.util.concurrent.atomic.AtomicBoolean; + +public class TestDruidLeaderSelector implements DruidLeaderSelector +{ + private final AtomicBoolean isLeader = new AtomicBoolean(false); + private volatile Listener listener; + + public void becomeLeader() + { + if (isLeader.compareAndSet(false, true) && listener != null) { + listener.becomeLeader(); + } + } + + public void stopBeingLeader() + { + if (isLeader.compareAndSet(true, false) && listener != null) { + listener.stopBeingLeader(); + } + } + + @Nullable + @Override + public String getCurrentLeader() + { + return "me"; + } + + @Override + public boolean isLeader() + { + return isLeader.get(); + } + + @Override + public int localTerm() + { + return 0; + } + + @Override + public void registerListener(Listener listener) + { + this.listener = listener; + if (isLeader()) { + listener.becomeLeader(); + } + } + + @Override + public void unregisterListener() + { + listener = null; + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestMetadataRuleManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestMetadataRuleManager.java new file mode 100644 index 000000000000..9ca037b0cfbf --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestMetadataRuleManager.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.simulate; + +import org.apache.druid.audit.AuditInfo; +import org.apache.druid.metadata.MetadataRuleManager; +import org.apache.druid.server.coordinator.rules.ForeverLoadRule; +import org.apache.druid.server.coordinator.rules.Rule; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestMetadataRuleManager implements MetadataRuleManager +{ + private final Map> rules = new HashMap<>(); + + private static final String DEFAULT_DATASOURCE = "_default"; + + public TestMetadataRuleManager() + { + rules.put( + DEFAULT_DATASOURCE, + Collections.singletonList(new ForeverLoadRule(null)) + ); + } + + @Override + public void start() + { + // do nothing + } + + @Override + public void stop() + { + // do nothing + } + + @Override + public void poll() + { + // do nothing + } + + @Override + public Map> getAllRules() + { + return rules; + } + + @Override + public List getRules(final String dataSource) + { + List retVal = rules.get(dataSource); + return retVal == null ? new ArrayList<>() : retVal; + } + + @Override + public List getRulesWithDefault(final String dataSource) + { + List retVal = new ArrayList<>(); + final Map> theRules = rules; + if (theRules.get(dataSource) != null) { + retVal.addAll(theRules.get(dataSource)); + } + if (theRules.get(DEFAULT_DATASOURCE) != null) { + retVal.addAll(theRules.get(DEFAULT_DATASOURCE)); + } + return retVal; + } + + @Override + public boolean overrideRule(final String dataSource, final List newRules, final AuditInfo auditInfo) + { + rules.put(dataSource, newRules); + return true; + } + + @Override + public int removeRulesForEmptyDatasourcesOlderThan(long timestamp) + { + return 0; + } + + public void removeRulesForDatasource(String dataSource) + { + if (!DEFAULT_DATASOURCE.equals(dataSource)) { + rules.remove(dataSource); + } + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java new file mode 100644 index 000000000000..0b91e7009026 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.simulate; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.HttpResponseHandler; +import org.apache.druid.server.coordination.DataSegmentChangeCallback; +import org.apache.druid.server.coordination.DataSegmentChangeHandler; +import org.apache.druid.server.coordination.DataSegmentChangeRequest; +import org.apache.druid.server.coordination.SegmentLoadDropHandler; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.codec.http.HttpVersion; +import org.joda.time.Duration; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class TestSegmentLoadingHttpClient implements HttpClient +{ + private static final HttpResponseHandler.TrafficCop NOOP_TRAFFIC_COP = checkNum -> 0L; + private static final DataSegmentChangeCallback NOOP_CALLBACK = () -> { + }; + + private final ObjectMapper objectMapper; + private final Function hostToHandler; + + private final ListeningScheduledExecutorService executorService; + + public TestSegmentLoadingHttpClient( + ObjectMapper objectMapper, + Function hostToHandler, + ScheduledExecutorService executorService + ) + { + this.objectMapper = objectMapper; + this.hostToHandler = hostToHandler; + this.executorService = MoreExecutors.listeningDecorator(executorService); + } + + @Override + public ListenableFuture go( + Request request, + HttpResponseHandler handler + ) + { + return go(request, handler, null); + } + + @Override + public ListenableFuture go( + Request request, + HttpResponseHandler handler, + Duration readTimeout + ) + { + return executorService.submit(() -> processRequest(request, handler)); + } + + private Final processRequest( + Request request, + HttpResponseHandler handler + ) + { + try { + // Fail the request if there is no handler for this host + final DataSegmentChangeHandler changeHandler = hostToHandler + .apply(request.getUrl().getHost()); + if (changeHandler == null) { + final HttpResponse failureResponse = + new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND); + failureResponse.setContent(ChannelBuffers.EMPTY_BUFFER); + handler.handleResponse(failureResponse, NOOP_TRAFFIC_COP); + return (Final) new ByteArrayInputStream(new byte[0]); + } + + // Handle change requests and serialize + final byte[] serializedContent; + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + objectMapper.writeValue(baos, processRequest(request, changeHandler)); + serializedContent = baos.toByteArray(); + } + + // Set response content and status + final HttpResponse response = + new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.setContent(ChannelBuffers.EMPTY_BUFFER); + handler.handleResponse(response, NOOP_TRAFFIC_COP); + return (Final) new ByteArrayInputStream(serializedContent); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Processes all the changes in the request. + */ + private List processRequest( + Request request, + DataSegmentChangeHandler changeHandler + ) throws IOException + { + final List changeRequests = objectMapper.readValue( + request.getContent().array(), + new TypeReference>() + { + } + ); + + return changeRequests + .stream() + .map(changeRequest -> processRequest(changeRequest, changeHandler)) + .collect(Collectors.toList()); + } + + /** + * Processes each DataSegmentChangeRequest using the handler. + */ + private SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus processRequest( + DataSegmentChangeRequest request, + DataSegmentChangeHandler handler + ) + { + SegmentLoadDropHandler.Status status; + try { + request.go(handler, NOOP_CALLBACK); + status = SegmentLoadDropHandler.Status.SUCCESS; + } + catch (Exception e) { + status = SegmentLoadDropHandler.Status.failed(e.getMessage()); + } + + return new SegmentLoadDropHandler + .DataSegmentChangeRequestAndStatus(request, status); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java new file mode 100644 index 000000000000..43a96d600707 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.simulate; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.client.DataSourcesSnapshot; +import org.apache.druid.client.ImmutableDruidDataSource; +import org.apache.druid.metadata.SegmentsMetadataManager; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.Partitions; +import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.VersionedIntervalTimeline; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class TestSegmentsMetadataManager implements SegmentsMetadataManager +{ + private final ConcurrentMap segments = new ConcurrentHashMap<>(); + private final ConcurrentMap usedSegments = new ConcurrentHashMap<>(); + + public void addSegment(DataSegment segment) + { + segments.put(segment.getId().toString(), segment); + usedSegments.put(segment.getId().toString(), segment); + } + + public void removeSegment(DataSegment segment) + { + segments.remove(segment.getId().toString()); + usedSegments.remove(segment.getId().toString()); + } + + @Override + public void startPollingDatabasePeriodically() + { + + } + + @Override + public void stopPollingDatabasePeriodically() + { + + } + + @Override + public boolean isPollingDatabasePeriodically() + { + return true; + } + + @Override + public int markAsUsedAllNonOvershadowedSegmentsInDataSource(String dataSource) + { + return 0; + } + + @Override + public int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval) + { + return 0; + } + + @Override + public int markAsUsedNonOvershadowedSegments(String dataSource, Set segmentIds) + { + return 0; + } + + @Override + public boolean markSegmentAsUsed(String segmentId) + { + if (!segments.containsKey(segmentId)) { + return false; + } + + usedSegments.put(segmentId, segments.get(segmentId)); + return true; + } + + @Override + public int markAsUnusedAllSegmentsInDataSource(String dataSource) + { + return 0; + } + + @Override + public int markAsUnusedSegmentsInInterval(String dataSource, Interval interval) + { + return 0; + } + + @Override + public int markSegmentsAsUnused(Set segmentIds) + { + int numModifiedSegments = 0; + for (SegmentId segmentId : segmentIds) { + if (usedSegments.remove(segmentId.toString()) != null) { + ++numModifiedSegments; + } + } + return numModifiedSegments; + } + + @Override + public boolean markSegmentAsUnused(SegmentId segmentId) + { + return usedSegments.remove(segmentId.toString()) != null; + } + + @Nullable + @Override + public ImmutableDruidDataSource getImmutableDataSourceWithUsedSegments(String dataSource) + { + return null; + } + + @Override + public Collection getImmutableDataSourcesWithAllUsedSegments() + { + return getSnapshotOfDataSourcesWithAllUsedSegments().getDataSourcesWithAllUsedSegments(); + } + + @Override + public Set getOvershadowedSegments() + { + return getSnapshotOfDataSourcesWithAllUsedSegments().getOvershadowedSegments(); + } + + @Override + public DataSourcesSnapshot getSnapshotOfDataSourcesWithAllUsedSegments() + { + return DataSourcesSnapshot.fromUsedSegments(usedSegments.values(), ImmutableMap.of()); + } + + @Override + public Iterable iterateAllUsedSegments() + { + return usedSegments.values(); + } + + @Override + public Optional> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( + String datasource, + Interval interval, + boolean requiresLatest + ) + { + VersionedIntervalTimeline usedSegmentsTimeline + = getSnapshotOfDataSourcesWithAllUsedSegments().getUsedSegmentsTimelinesPerDataSource().get(datasource); + return Optional.fromNullable(usedSegmentsTimeline) + .transform(timeline -> timeline.findNonOvershadowedObjectsInInterval( + interval, + Partitions.ONLY_COMPLETE + )); + } + + @Override + public Set retrieveAllDataSourceNames() + { + return null; + } + + @Override + public List getUnusedSegmentIntervals(String dataSource, DateTime maxEndTime, int limit) + { + return null; + } + + @Override + public void poll() + { + + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java new file mode 100644 index 000000000000..fedafc45c9d6 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.simulate; + +import org.apache.druid.client.DruidServer; +import org.apache.druid.client.ServerInventoryView; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.coordination.DataSegmentChangeCallback; +import org.apache.druid.server.coordination.DataSegmentChangeHandler; +import org.apache.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; + +public class TestServerInventoryView implements ServerInventoryView +{ + private static final Logger log = new Logger(TestServerInventoryView.class); + + private final ConcurrentHashMap servers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap segmentChangeHandlers = new ConcurrentHashMap<>(); + + private final ConcurrentHashMap segmentCallbacks = new ConcurrentHashMap<>(); + private final List serverChangeHandlers = new ArrayList<>(); + + public void setUp() + { + segmentCallbacks.forEach( + (segmentCallback, executor) -> + executor.execute(segmentCallback::segmentViewInitialized) + ); + } + + /** + * Synchronizes this inventory view with the given inventory view. + */ + public void sync(ServerInventoryView other) + { + // Clear the current inventory + for (ServerChangeHandler handler : serverChangeHandlers) { + servers.values().forEach(handler::removeServer); + } + servers.clear(); + segmentChangeHandlers.clear(); + + for (DruidServer server : other.getInventory()) { + addServer(new DruidServer( + server.getName(), + server.getHostAndPort(), + server.getHostAndTlsPort(), + server.getMaxSize(), + server.getType(), + server.getTier(), + server.getPriority() + )); + DataSegmentChangeHandler handler = getChangeHandlerForHost(server.getName()); + for (DataSegment segment : server.iterateAllSegments()) { + handler.addSegment(segment, null); + } + } + } + + public void addServer(DruidServer server) + { + servers.put(server.getName(), server); + segmentChangeHandlers.put(server.getName(), new SegmentChangeHandler(server)); + } + + public void removeServer(DruidServer server) + { + servers.remove(server.getName()); + segmentChangeHandlers.remove(server.getName()); + + for (ServerChangeHandler handler : serverChangeHandlers) { + handler.removeServer(server); + } + } + + public DataSegmentChangeHandler getChangeHandlerForHost(String serverName) + { + return segmentChangeHandlers.get(serverName); + } + + @Nullable + @Override + public DruidServer getInventoryValue(String serverKey) + { + return servers.get(serverKey); + } + + @Override + public Collection getInventory() + { + return Collections.unmodifiableCollection(servers.values()); + } + + @Override + public boolean isStarted() + { + return true; + } + + @Override + public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment) + { + DruidServer server = servers.get(serverKey); + return server != null && server.getSegment(segment.getId()) != null; + } + + @Override + public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + { + serverChangeHandlers.add(new ServerChangeHandler(callback, exec)); + } + + @Override + public void registerSegmentCallback(Executor exec, SegmentCallback callback) + { + segmentCallbacks.put(callback, exec); + } + + private class SegmentChangeHandler implements DataSegmentChangeHandler + { + private final DruidServer server; + + private SegmentChangeHandler(DruidServer server) + { + this.server = server; + } + + @Override + public void addSegment( + DataSegment segment, + @Nullable DataSegmentChangeCallback callback + ) + { + log.debug("Adding segment [%s] to server [%s]", segment.getId(), server.getName()); + + if (server.getMaxSize() - server.getCurrSize() >= segment.getSize()) { + server.addDataSegment(segment); + segmentCallbacks.forEach( + (segmentCallback, executor) -> executor.execute( + () -> segmentCallback.segmentAdded(server.getMetadata(), segment) + ) + ); + } else { + throw new ISE( + "Not enough free space on server %s. Segment size [%d]. Current free space [%d]", + server.getName(), + segment.getSize(), + server.getMaxSize() - server.getCurrSize() + ); + } + } + + @Override + public void removeSegment( + DataSegment segment, + @Nullable DataSegmentChangeCallback callback + ) + { + log.debug("Removing segment [%s] from server [%s]", segment.getId(), server.getName()); + server.removeDataSegment(segment.getId()); + segmentCallbacks.forEach( + (segmentCallback, executor) -> executor.execute( + () -> segmentCallback.segmentAdded(server.getMetadata(), segment) + ) + ); + } + } + + private static class ServerChangeHandler + { + private final Executor executor; + private final ServerRemovedCallback callback; + + private ServerChangeHandler(ServerRemovedCallback callback, Executor executor) + { + this.callback = callback; + this.executor = executor; + } + + private void removeServer(DruidServer server) + { + executor.execute(() -> callback.serverRemoved(server)); + } + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/WrappingScheduledExecutorService.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/WrappingScheduledExecutorService.java new file mode 100644 index 000000000000..334651ee30f5 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/WrappingScheduledExecutorService.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.simulate; + +import org.apache.druid.java.util.common.logger.Logger; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Wraps an {@link ExecutorService} into a {@link ScheduledExecutorService}. + */ +public class WrappingScheduledExecutorService implements ScheduledExecutorService +{ + private static final Logger log = new Logger(WrappingScheduledExecutorService.class); + + private final String nameFormat; + private final ExecutorService delegate; + private final boolean ignoreScheduledTasks; + + public WrappingScheduledExecutorService( + String nameFormat, + ExecutorService delegate, + boolean ignoreScheduledTasks + ) + { + this.nameFormat = nameFormat; + this.delegate = delegate; + this.ignoreScheduledTasks = ignoreScheduledTasks; + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) + { + if (ignoreScheduledTasks) { + log.debug("[%s] Ignoring scheduled task", nameFormat); + return new WrappingScheduledFuture<>(CompletableFuture.completedFuture(null)); + } + + // Ignore the delay and just queue the task + return new WrappingScheduledFuture<>(submit(command)); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) + { + if (ignoreScheduledTasks) { + log.debug("[%s] Ignoring scheduled task", nameFormat); + return new WrappingScheduledFuture<>(CompletableFuture.completedFuture(null)); + } + + // Ignore the delay and just queue the task + return new WrappingScheduledFuture<>(submit(callable)); + } + + @Override + public ScheduledFuture scheduleAtFixedRate( + Runnable command, + long initialDelay, + long period, + TimeUnit unit + ) + { + throw new UnsupportedOperationException(); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay( + Runnable command, + long initialDelay, + long delay, + TimeUnit unit + ) + { + throw new UnsupportedOperationException(); + } + + @Override + public void shutdown() + { + delegate.shutdown(); + } + + @Override + public List shutdownNow() + { + return delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() + { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() + { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException + { + return delegate.awaitTermination(timeout, unit); + } + + @Override + public Future submit(Callable task) + { + return delegate.submit(task); + } + + @Override + public Future submit(Runnable task, T result) + { + return delegate.submit(task, result); + } + + @Override + public Future submit(Runnable task) + { + return delegate.submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException + { + return delegate.invokeAll(tasks); + } + + @Override + public List> invokeAll( + Collection> tasks, long timeout, TimeUnit unit + ) throws InterruptedException + { + return delegate.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException + { + return delegate.invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException + { + return delegate.invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) + { + delegate.execute(command); + } + + /** + * Wraps a Future into a ScheduledFuture. + */ + private static class WrappingScheduledFuture implements ScheduledFuture + { + private final Future future; + + private WrappingScheduledFuture(Future future) + { + this.future = future; + } + + @Override + public long getDelay(TimeUnit unit) + { + return 0; + } + + @Override + public int compareTo(Delayed o) + { + return 0; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) + { + return future.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() + { + return future.isCancelled(); + } + + @Override + public boolean isDone() + { + return future.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException + { + return future.get(); + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException + { + return future.get(timeout, unit); + } + } +}