Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.joda.time.Interval;

import java.util.Map;
import java.util.Objects;

/**
*/
Expand Down Expand Up @@ -76,4 +77,23 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
{
return true;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ForeverLoadRule that = (ForeverLoadRule) o;
return Objects.equals(tieredReplicants, that.tieredReplicants);
}

@Override
public int hashCode()
{
return Objects.hash(tieredReplicants);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.util.Objects;

/**
*/
public class IntervalDropRule extends DropRule
Expand Down Expand Up @@ -63,4 +65,23 @@ public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp)
{
return interval.contains(theInterval);
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
IntervalDropRule that = (IntervalDropRule) o;
return Objects.equals(interval, that.interval);
}

@Override
public int hashCode()
{
return Objects.hash(interval);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.discovery.DruidLeaderClient;
Expand All @@ -40,6 +42,7 @@
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Duration;

import javax.annotation.concurrent.GuardedBy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -48,25 +51,30 @@
import java.util.concurrent.atomic.AtomicReference;

/**
*
*/
@ManageLifecycle
public class CoordinatorRuleManager
{
private static final Logger log = new Logger(CoordinatorRuleManager.class);
private static final Logger LOG = new Logger(CoordinatorRuleManager.class);

private static final TypeReference<Map<String, List<Rule>>> TYPE_REFERENCE =
new TypeReference<Map<String, List<Rule>>>()
{
};

private final ObjectMapper jsonMapper;
private final Supplier<TieredBrokerConfig> config;

private final AtomicReference<Map<String, List<Rule>>> rules;

private final DruidLeaderClient druidLeaderClient;

private volatile ScheduledExecutorService exec;

private final Object lock = new Object();

private volatile boolean started = false;

@GuardedBy("lock")
private ScheduledExecutorService exec;

@Inject
public CoordinatorRuleManager(
@Json ObjectMapper jsonMapper,
Expand All @@ -78,9 +86,7 @@ public CoordinatorRuleManager(
this.config = config;
this.druidLeaderClient = druidLeaderClient;

this.rules = new AtomicReference<>(
Collections.emptyMap()
);
this.rules = new AtomicReference<>(Collections.emptyMap());
}

@LifecycleStart
Expand All @@ -97,14 +103,7 @@ public void start()
exec,
new Duration(0),
config.get().getPollPeriod().toStandardDuration(),
new Runnable()
{
@Override
public void run()
{
poll();
}
}
this::poll
);

started = true;
Expand Down Expand Up @@ -132,7 +131,6 @@ public boolean isStarted()
return started;
}

@SuppressWarnings("unchecked")
public void poll()
{
try {
Expand All @@ -148,16 +146,13 @@ public void poll()
);
}

rules.set(
Collections.unmodifiableMap(jsonMapper.readValue(
response.getContent(), new TypeReference<Map<String, List<Rule>>>()
{
}
))
);
final Map<String, List<Rule>> map = jsonMapper.readValue(response.getContent(), TYPE_REFERENCE);
final Map<String, List<Rule>> immutableMapBuilder = Maps.newHashMapWithExpectedSize(map.size());
map.forEach((k, list) -> immutableMapBuilder.put(k, Collections.unmodifiableList(list)));
rules.set(Collections.unmodifiableMap(immutableMapBuilder));
}
catch (Exception e) {
log.error(e, "Exception while polling for rules");
LOG.error(e, "Exception while polling for rules");
}
}

Expand All @@ -175,4 +170,14 @@ public List<Rule> getRulesWithDefault(final String dataSource)
}
return rulesWithDefault;
}

/**
* Returns the current snapshot of the rules.
* This method should be used for only testing.
*/
@VisibleForTesting
Map<String, List<Rule>> getRules()
{
return rules.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class TieredBrokerConfig
{
public static final String DEFAULT_COORDINATOR_SERVICE_NAME = "druid/coordinator";
public static final String DEFAULT_BROKER_SERVICE_NAME = "druid/broker";
public static final String DEFAULT_RULE_NAME = "_default";

@JsonProperty
@NotNull
Expand All @@ -46,7 +47,7 @@ public class TieredBrokerConfig

@JsonProperty
@NotNull
private String defaultRule = "_default";
private String defaultRule = DEFAULT_RULE_NAME;

@JsonProperty
@NotNull
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.router;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.server.coordinator.rules.ForeverDropRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.IntervalDropRule;
import org.apache.druid.server.coordinator.rules.PeriodLoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class CoordinatorRuleManagerTest
{
private static final String DATASOURCE1 = "datasource1";
private static final String DATASOURCE2 = "datasource2";
private static final List<Rule> DEFAULT_RULES = ImmutableList.of(
new ForeverLoadRule(ImmutableMap.of("__default", 2))
);

@org.junit.Rule
public ExpectedException expectedException = ExpectedException.none();

private final ObjectMapper objectMapper = new DefaultObjectMapper();
private final TieredBrokerConfig tieredBrokerConfig = new TieredBrokerConfig();

@Test
public void testAddingToRulesMapThrowingError()
{
final CoordinatorRuleManager manager = new CoordinatorRuleManager(
objectMapper,
() -> tieredBrokerConfig,
mockClient()
);
final Map<String, List<Rule>> rules = manager.getRules();
expectedException.expect(UnsupportedOperationException.class);
rules.put("testKey", Collections.emptyList());
}

@Test
public void testAddingToRulesListThrowingError()
{
final CoordinatorRuleManager manager = new CoordinatorRuleManager(
objectMapper,
() -> tieredBrokerConfig,
mockClient()
);
manager.poll();
final Map<String, List<Rule>> rules = manager.getRules();
expectedException.expect(UnsupportedOperationException.class);
rules.get(DATASOURCE1).add(new ForeverDropRule());
}

@Test
public void testGetRulesWithUnknownDatasourceReturningDefaultRule()
{
final CoordinatorRuleManager manager = new CoordinatorRuleManager(
objectMapper,
() -> tieredBrokerConfig,
mockClient()
);
manager.poll();
final List<Rule> rules = manager.getRulesWithDefault("unknown");
Assert.assertEquals(DEFAULT_RULES, rules);
}

@Test
public void testGetRulesWithKnownDatasourceReturningAllRulesWithDefaultRule()
{
final CoordinatorRuleManager manager = new CoordinatorRuleManager(
objectMapper,
() -> tieredBrokerConfig,
mockClient()
);
manager.poll();
final List<Rule> rules = manager.getRulesWithDefault(DATASOURCE2);
final List<Rule> expectedRules = new ArrayList<>();
expectedRules.add(new ForeverLoadRule(null));
expectedRules.add(new IntervalDropRule(Intervals.of("2020-01-01/2020-01-02")));
expectedRules.addAll(DEFAULT_RULES);
Assert.assertEquals(expectedRules, rules);
}

private DruidLeaderClient mockClient()
{
final Map<String, List<Rule>> rules = ImmutableMap.of(
DATASOURCE1,
ImmutableList.of(new ForeverLoadRule(null)),
DATASOURCE2,
ImmutableList.of(new ForeverLoadRule(null), new IntervalDropRule(Intervals.of("2020-01-01/2020-01-02"))),
"datasource3",
ImmutableList.of(
new PeriodLoadRule(new Period("P1M"), true, null),
new ForeverDropRule()
),
TieredBrokerConfig.DEFAULT_RULE_NAME,
ImmutableList.of(new ForeverLoadRule(ImmutableMap.of("__default", 2)))
);
final StringFullResponseHolder holder = EasyMock.niceMock(StringFullResponseHolder.class);
EasyMock.expect(holder.getStatus())
.andReturn(HttpResponseStatus.OK);
try {
EasyMock.expect(holder.getContent())
.andReturn(objectMapper.writeValueAsString(rules));
final DruidLeaderClient client = EasyMock.niceMock(DruidLeaderClient.class);
EasyMock.expect(client.go(EasyMock.anyObject()))
.andReturn(holder);
EasyMock.replay(holder, client);
return client;
}
catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}