Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/content/configuration/coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ The coordinator node uses several of the global configs in [Configuration](../co
|`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)|
|`druid.coordinator.kill.durationToRetain`| Do not kill segments in last `durationToRetain`, must be greater than or equal to 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|PT-1S (-1 seconds)|
|`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0|
|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. Use `diskNormalized` to distribute segments among nodes so that the disks fill up uniformly and use `random` to randomly pick nodes to distribute segments.|`cost`|

### Metadata Retrieval

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@
*/
package io.druid.server.coordinator;

import org.joda.time.DateTime;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.util.concurrent.ListeningExecutorService;

import java.io.Closeable;

public interface BalancerStrategyFactory extends Closeable
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl = CostBalancerStrategyFactory.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "diskNormalized", value = DiskNormalizedCostBalancerStrategyFactory.class),
@JsonSubTypes.Type(name = "cost", value = CostBalancerStrategyFactory.class),
@JsonSubTypes.Type(name = "random", value = RandomBalancerStrategyFactory.class),
})
public interface BalancerStrategyFactory
{
public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp);
public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,12 @@
package io.druid.server.coordinator;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.joda.time.DateTime;

import java.io.IOException;
import java.util.concurrent.Executors;

public class CostBalancerStrategyFactory implements BalancerStrategyFactory
{
private final ListeningExecutorService exec;

public CostBalancerStrategyFactory(int costBalancerStrategyThreadCount)
{
this.exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(costBalancerStrategyThreadCount));
}

@Override
public CostBalancerStrategy createBalancerStrategy(DateTime referenceTimestamp)
public CostBalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
{
return new CostBalancerStrategy(exec);
}

@Override
public void close() throws IOException
{
exec.shutdownNow();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.coordinator;

import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.timeline.DataSegment;

public class DiskNormalizedCostBalancerStrategy extends CostBalancerStrategy
{
public DiskNormalizedCostBalancerStrategy(ListeningExecutorService exec)
{
super(exec);
}

/**
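* Normalizes the cost obtained from CostBalancerStrategy by dividing it by the number of segments on the server
* (treated as 1 when the server has none), then weights the result by the server's disk usage ratio
* (currSize / maxSize). A server with a lower percentage of disk used therefore gets a lower cost, which steers
* all the hosts toward the same % disk utilization. An infinite cost from CostBalancerStrategy is returned as is.
* NOTE(review): a server reporting maxSize of 0 would make the usage ratio NaN or infinite; confirm that such
* servers cannot reach this method.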
*/
@Override
protected double computeCost(
final DataSegment proposalSegment, final ServerHolder server, final boolean includeCurrentServer
)
{
double cost = super.computeCost(proposalSegment, server, includeCurrentServer);

if(cost == Double.POSITIVE_INFINITY){
return cost;
}

int nSegments = 1;
if(server.getServer().getSegments().size() > 0)
{
nSegments = server.getServer().getSegments().size();
}

double normalizedCost = cost/nSegments;
double usageRatio = (double)server.getServer().getCurrSize()/(double)server.getServer().getMaxSize();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the server's currSize is 0, this will zero out the cost, likely not what we want.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currSize = 0 will ensure that the host has the lowest cost and is assigned the segments.


return normalizedCost*usageRatio;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did this change from the initial version of this PR? I don't remember anything being tied to number of segments.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it did change.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason why this is better than the previous one? I guess I'm not sure I fully understand normalization, a comment might be worthwhile.

Copy link
Copy Markdown
Member

@xvrl xvrl Nov 29, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bump, @niketh can you explain why normalizedCost * usageRatio is better than simply using cost * usageRatio to achieve the desired distribution?

Copy link
Copy Markdown
Contributor Author

@niketh niketh Dec 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xvrl @cheddar The above cost function is better than cost * usageRatio because the cost is directly proportional to the number of segments: if there are more segments, the cost will be higher. Disk-normalized cost ensures that the cost does not increase just because there are more segments. Further, normalized cost * usage ratio ensures that a host with a higher percentage of free space is preferred over a host with a lower percentage. I didn't add a reply here since I added a comment in the code.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, technically cost is not proportional to the number of segments, but if your segments all cover the same interval length, using that approximation (essentially multiplying your cost by the average segment size) might work.

I've thought about it a little bit more and I was thinking a more consistent approach could be to add a "weight" factor to the existing cost balancer, where we assign each segment a weight proportional to the size of the segment. Essentially, if we just added two parameters `double weightA, double weightB` to `computeJointSegmentsCost(final DataSegment segmentA, final DataSegment segmentB)` and expanded the multiplier to be `multiplier = multiplier * weightA * weightB`, then you should only have to divide the resulting cost by the server maxSize to achieve your objective.

I don't want to hold up this PR any longer, but how hard would it be to try this out and see if you get similar results in your setup? As long as we can commit to trying it out, I think we can merge what we have right now, so we can get the refactoring in.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xvrl We can merge this now and I will try your suggestion this week and will share the results that we see on our cluster

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@niketh 👍

}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordinator;

import com.google.common.util.concurrent.ListeningExecutorService;

/**
 * {@link BalancerStrategyFactory} implementation that hands out
 * {@link DiskNormalizedCostBalancerStrategy} instances.
 */
public class DiskNormalizedCostBalancerStrategyFactory implements BalancerStrategyFactory
{
  /**
   * Builds a new disk-normalized cost strategy on every call.
   *
   * @param exec executor passed straight through to the strategy's constructor
   * @return a freshly constructed {@link DiskNormalizedCostBalancerStrategy}
   */
  @Override
  public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
  {
    final BalancerStrategy diskNormalizedStrategy = new DiskNormalizedCostBalancerStrategy(exec);
    return diskNormalizedStrategy;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
Expand Down Expand Up @@ -84,6 +86,7 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -129,7 +132,7 @@ public Interval apply(DataSegment segment)
private volatile int leaderCounter = 0;
private volatile boolean leader = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;

private final BalancerStrategyFactory factory;

@Inject
public DruidCoordinator(
Expand All @@ -146,7 +149,8 @@ public DruidCoordinator(
LoadQueueTaskMaster taskMaster,
ServiceAnnouncer serviceAnnouncer,
@Self DruidNode self,
@CoordinatorIndexingServiceHelper Set<DruidCoordinatorHelper> indexingServiceHelpers
@CoordinatorIndexingServiceHelper Set<DruidCoordinatorHelper> indexingServiceHelpers,
BalancerStrategyFactory factory
)
{
this(
Expand All @@ -164,7 +168,8 @@ public DruidCoordinator(
serviceAnnouncer,
self,
Maps.<String, LoadQueuePeon>newConcurrentMap(),
indexingServiceHelpers
indexingServiceHelpers,
factory
);
}

Expand All @@ -183,7 +188,8 @@ public DruidCoordinator(
ServiceAnnouncer serviceAnnouncer,
DruidNode self,
ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap,
Set<DruidCoordinatorHelper> indexingServiceHelpers
Set<DruidCoordinatorHelper> indexingServiceHelpers,
BalancerStrategyFactory factory
)
{
this.config = config;
Expand All @@ -205,6 +211,7 @@ public DruidCoordinator(

this.leaderLatch = new AtomicReference<>(null);
this.loadManagementPeons = loadQueuePeonMap;
this.factory = factory;
}

public boolean isLeader()
Expand Down Expand Up @@ -664,6 +671,7 @@ protected CoordinatorRunnable(List<DruidCoordinatorHelper> helpers, final int st
@Override
public void run()
{
ListeningExecutorService balancerExec = null;
try {
synchronized (lock) {
final LeaderLatch latch = leaderLatch.get();
Expand All @@ -686,27 +694,32 @@ public void run()
}
}

try (BalancerStrategyFactory factory =
new CostBalancerStrategyFactory(getDynamicConfigs().getBalancerComputeThreads())) {
// Do coordinator stuff.
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDatasources(metadataSegmentManager.getInventory())
.withDynamicConfigs(getDynamicConfigs())
.withEmitter(emitter)
.withBalancerStrategyFactory(factory)
.build();
for (DruidCoordinatorHelper helper : helpers) {
// Don't read state and run state in the same helper otherwise racy conditions may exist
if (leader && startingLeaderCounter == leaderCounter) {
params = helper.run(params);
}
balancerExec = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(getDynamicConfigs().getBalancerComputeThreads()));
BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec);

// Do coordinator stuff.
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDatasources(metadataSegmentManager.getInventory())
.withDynamicConfigs(getDynamicConfigs())
.withEmitter(emitter)
.withBalancerStrategy(balancerStrategy)
.build();
for (DruidCoordinatorHelper helper : helpers) {
// Don't read state and run state in the same helper otherwise racy conditions may exist
if (leader && startingLeaderCounter == leaderCounter) {
params = helper.run(params);
}
}
}
catch (Exception e) {
log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit();
} finally {
if(balancerExec != null){
balancerExec.shutdownNow();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class DruidCoordinatorRuntimeParams
private final CoordinatorDynamicConfig coordinatorDynamicConfig;
private final CoordinatorStats stats;
private final DateTime balancerReferenceTimestamp;
private final BalancerStrategyFactory strategyFactory;
private final BalancerStrategy balancerStrategy;

public DruidCoordinatorRuntimeParams(
long startTime,
Expand All @@ -63,7 +63,7 @@ public DruidCoordinatorRuntimeParams(
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorStats stats,
DateTime balancerReferenceTimestamp,
BalancerStrategyFactory strategyFactory
BalancerStrategy balancerStrategy
)
{
this.startTime = startTime;
Expand All @@ -78,7 +78,7 @@ public DruidCoordinatorRuntimeParams(
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.strategyFactory = strategyFactory;
this.balancerStrategy = balancerStrategy;
}

public long getStartTime()
Expand Down Expand Up @@ -141,9 +141,9 @@ public DateTime getBalancerReferenceTimestamp()
return balancerReferenceTimestamp;
}

public BalancerStrategyFactory getBalancerStrategyFactory()
public BalancerStrategy getBalancerStrategy()
{
return strategyFactory;
return balancerStrategy;
}

public boolean hasDeletionWaitTimeElapsed()
Expand Down Expand Up @@ -171,7 +171,7 @@ public Builder buildFromExisting()
coordinatorDynamicConfig,
stats,
balancerReferenceTimestamp,
strategyFactory
balancerStrategy
);
}

Expand All @@ -190,7 +190,7 @@ public Builder buildFromExistingWithoutAvailableSegments()
coordinatorDynamicConfig,
stats,
balancerReferenceTimestamp,
strategyFactory
balancerStrategy
);
}

Expand All @@ -208,7 +208,7 @@ public static class Builder
private CoordinatorDynamicConfig coordinatorDynamicConfig;
private CoordinatorStats stats;
private DateTime balancerReferenceTimestamp;
private BalancerStrategyFactory strategyFactory;
private BalancerStrategy balancerStrategy;

Builder()
{
Expand Down Expand Up @@ -239,7 +239,7 @@ public static class Builder
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorStats stats,
DateTime balancerReferenceTimestamp,
BalancerStrategyFactory strategyFactory
BalancerStrategy balancerStrategy
)
{
this.startTime = startTime;
Expand All @@ -254,7 +254,7 @@ public static class Builder
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.strategyFactory=strategyFactory;
this.balancerStrategy=balancerStrategy;
}

public DruidCoordinatorRuntimeParams build()
Expand All @@ -272,7 +272,7 @@ public DruidCoordinatorRuntimeParams build()
coordinatorDynamicConfig,
stats,
balancerReferenceTimestamp,
strategyFactory
balancerStrategy
);
}

Expand Down Expand Up @@ -348,9 +348,9 @@ public Builder withBalancerReferenceTimestamp(DateTime balancerReferenceTimestam
return this;
}

public Builder withBalancerStrategyFactory(BalancerStrategyFactory strategyFactory)
public Builder withBalancerStrategy(BalancerStrategy balancerStrategy)
{
this.strategyFactory=strategyFactory;
this.balancerStrategy=balancerStrategy;
return this;
}
}
Expand Down
Loading