From fe647d50f9c62e3340387ba84ce8ec015e965aa5 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 5 Feb 2020 21:25:46 -0800 Subject: [PATCH] Add unit tests for CoordinatorRuleManager --- .../coordinator/rules/ForeverLoadRule.java | 20 +++ .../coordinator/rules/IntervalDropRule.java | 21 +++ .../server/router/CoordinatorRuleManager.java | 55 ++++--- .../server/router/TieredBrokerConfig.java | 3 +- .../router/CoordinatorRuleManagerTest.java | 149 ++++++++++++++++++ 5 files changed, 222 insertions(+), 26 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/server/router/CoordinatorRuleManagerTest.java diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverLoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverLoadRule.java index a509877aa951..ee5da0bdbb20 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverLoadRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverLoadRule.java @@ -28,6 +28,7 @@ import org.joda.time.Interval; import java.util.Map; +import java.util.Objects; /** */ @@ -76,4 +77,23 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) { return true; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ForeverLoadRule that = (ForeverLoadRule) o; + return Objects.equals(tieredReplicants, that.tieredReplicants); + } + + @Override + public int hashCode() + { + return Objects.hash(tieredReplicants); + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalDropRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalDropRule.java index 46f8f330fb38..b2959e925d28 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalDropRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalDropRule.java @@ -25,6 +25,8 @@ import org.joda.time.DateTime; import org.joda.time.Interval; +import java.util.Objects; + /** */ public class IntervalDropRule extends DropRule @@ -63,4 +65,23 @@ public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp) { return interval.contains(theInterval); } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IntervalDropRule that = (IntervalDropRule) o; + return Objects.equals(interval, that.interval); + } + + @Override + public int hashCode() + { + return Objects.hash(interval); + } } diff --git a/server/src/main/java/org/apache/druid/server/router/CoordinatorRuleManager.java b/server/src/main/java/org/apache/druid/server/router/CoordinatorRuleManager.java index d18a6d65afab..5ebe05e9a6ce 100644 --- a/server/src/main/java/org/apache/druid/server/router/CoordinatorRuleManager.java +++ b/server/src/main/java/org/apache/druid/server/router/CoordinatorRuleManager.java @@ -21,7 +21,9 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; +import com.google.common.collect.Maps; import com.google.inject.Inject; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.discovery.DruidLeaderClient; @@ -40,6 +42,7 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.Duration; +import javax.annotation.concurrent.GuardedBy; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -48,25 +51,30 @@ import java.util.concurrent.atomic.AtomicReference; /** + * */ @ManageLifecycle public class CoordinatorRuleManager { - private static final Logger log = new Logger(CoordinatorRuleManager.class); + private static final Logger LOG = new Logger(CoordinatorRuleManager.class); + + private static final TypeReference>> TYPE_REFERENCE = + new TypeReference>>() + { + }; private final ObjectMapper jsonMapper; private final Supplier config; - private final AtomicReference>> rules; - private final DruidLeaderClient druidLeaderClient; - private volatile ScheduledExecutorService exec; - private final Object lock = new Object(); private volatile boolean started = false; + @GuardedBy("lock") + private ScheduledExecutorService exec; + @Inject public CoordinatorRuleManager( @Json ObjectMapper jsonMapper, @@ -78,9 +86,7 @@ public CoordinatorRuleManager( this.config = config; this.druidLeaderClient = druidLeaderClient; - this.rules = new AtomicReference<>( - Collections.emptyMap() - ); + this.rules = new AtomicReference<>(Collections.emptyMap()); } @LifecycleStart @@ -97,14 +103,7 @@ public void start() exec, new Duration(0), config.get().getPollPeriod().toStandardDuration(), - new Runnable() - { - @Override - public void run() - { - poll(); - } - } + this::poll ); started = true; @@ -132,7 +131,6 @@ public boolean isStarted() return started; } - @SuppressWarnings("unchecked") public void poll() { try { @@ -148,16 +146,13 @@ public void poll() ); } - rules.set( - Collections.unmodifiableMap(jsonMapper.readValue( - response.getContent(), new TypeReference>>() - { - } - )) - ); + final Map> map = jsonMapper.readValue(response.getContent(), TYPE_REFERENCE); + final Map> immutableMapBuilder = Maps.newHashMapWithExpectedSize(map.size()); + map.forEach((k, list) -> immutableMapBuilder.put(k, Collections.unmodifiableList(list))); + rules.set(Collections.unmodifiableMap(immutableMapBuilder)); } catch (Exception e) { - log.error(e, "Exception while polling for rules"); + LOG.error(e, "Exception while polling for rules"); } } @@ -175,4 +170,14 @@ public List getRulesWithDefault(final String dataSource) } return rulesWithDefault; } + + /** + * Returns the current snapshot of the rules. + * This method should be used for only testing. + */ + @VisibleForTesting + Map> getRules() + { + return rules.get(); + } } diff --git a/server/src/main/java/org/apache/druid/server/router/TieredBrokerConfig.java b/server/src/main/java/org/apache/druid/server/router/TieredBrokerConfig.java index 9d3a1f9b8ecf..e18863853794 100644 --- a/server/src/main/java/org/apache/druid/server/router/TieredBrokerConfig.java +++ b/server/src/main/java/org/apache/druid/server/router/TieredBrokerConfig.java @@ -36,6 +36,7 @@ public class TieredBrokerConfig { public static final String DEFAULT_COORDINATOR_SERVICE_NAME = "druid/coordinator"; public static final String DEFAULT_BROKER_SERVICE_NAME = "druid/broker"; + public static final String DEFAULT_RULE_NAME = "_default"; @JsonProperty @NotNull @@ -46,7 +47,7 @@ public class TieredBrokerConfig @JsonProperty @NotNull - private String defaultRule = "_default"; + private String defaultRule = DEFAULT_RULE_NAME; @JsonProperty @NotNull diff --git a/server/src/test/java/org/apache/druid/server/router/CoordinatorRuleManagerTest.java b/server/src/test/java/org/apache/druid/server/router/CoordinatorRuleManagerTest.java new file mode 100644 index 000000000000..89741c0c706b --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/router/CoordinatorRuleManagerTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.router; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.discovery.DruidLeaderClient; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; +import org.apache.druid.server.coordinator.rules.ForeverDropRule; +import org.apache.druid.server.coordinator.rules.ForeverLoadRule; +import org.apache.druid.server.coordinator.rules.IntervalDropRule; +import org.apache.druid.server.coordinator.rules.PeriodLoadRule; +import org.apache.druid.server.coordinator.rules.Rule; +import org.easymock.EasyMock; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class CoordinatorRuleManagerTest +{ + private static final String DATASOURCE1 = "datasource1"; + private static final String DATASOURCE2 = "datasource2"; + private static final List DEFAULT_RULES = ImmutableList.of( + new ForeverLoadRule(ImmutableMap.of("__default", 2)) + ); + + @org.junit.Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final ObjectMapper objectMapper = new DefaultObjectMapper(); + private final TieredBrokerConfig tieredBrokerConfig = new TieredBrokerConfig(); + + @Test + public void testAddingToRulesMapThrowingError() + { + final CoordinatorRuleManager manager = new CoordinatorRuleManager( + objectMapper, + () -> tieredBrokerConfig, + mockClient() + ); + final Map> rules = manager.getRules(); + expectedException.expect(UnsupportedOperationException.class); + rules.put("testKey", Collections.emptyList()); + } + + @Test + public void testAddingToRulesListThrowingError() + { + final CoordinatorRuleManager manager = new CoordinatorRuleManager( + objectMapper, + () -> tieredBrokerConfig, + mockClient() + ); + manager.poll(); + final Map> rules = manager.getRules(); + expectedException.expect(UnsupportedOperationException.class); + rules.get(DATASOURCE1).add(new ForeverDropRule()); + } + + @Test + public void testGetRulesWithUnknownDatasourceReturningDefaultRule() + { + final CoordinatorRuleManager manager = new CoordinatorRuleManager( + objectMapper, + () -> tieredBrokerConfig, + mockClient() + ); + manager.poll(); + final List rules = manager.getRulesWithDefault("unknown"); + Assert.assertEquals(DEFAULT_RULES, rules); + } + + @Test + public void testGetRulesWithKnownDatasourceReturningAllRulesWithDefaultRule() + { + final CoordinatorRuleManager manager = new CoordinatorRuleManager( + objectMapper, + () -> tieredBrokerConfig, + mockClient() + ); + manager.poll(); + final List rules = manager.getRulesWithDefault(DATASOURCE2); + final List expectedRules = new ArrayList<>(); + expectedRules.add(new ForeverLoadRule(null)); + expectedRules.add(new IntervalDropRule(Intervals.of("2020-01-01/2020-01-02"))); + expectedRules.addAll(DEFAULT_RULES); + Assert.assertEquals(expectedRules, rules); + } + + private DruidLeaderClient mockClient() + { + final Map> rules = ImmutableMap.of( + DATASOURCE1, + ImmutableList.of(new ForeverLoadRule(null)), + DATASOURCE2, + ImmutableList.of(new ForeverLoadRule(null), new IntervalDropRule(Intervals.of("2020-01-01/2020-01-02"))), + "datasource3", + ImmutableList.of( + new PeriodLoadRule(new Period("P1M"), true, null), + new ForeverDropRule() + ), + TieredBrokerConfig.DEFAULT_RULE_NAME, + ImmutableList.of(new ForeverLoadRule(ImmutableMap.of("__default", 2))) + ); + final StringFullResponseHolder holder = EasyMock.niceMock(StringFullResponseHolder.class); + EasyMock.expect(holder.getStatus()) + .andReturn(HttpResponseStatus.OK); + try { + EasyMock.expect(holder.getContent()) + .andReturn(objectMapper.writeValueAsString(rules)); + final DruidLeaderClient client = EasyMock.niceMock(DruidLeaderClient.class); + EasyMock.expect(client.go(EasyMock.anyObject())) + .andReturn(holder); + EasyMock.replay(holder, client); + return client; + } + catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } +}