Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -672,58 +672,63 @@ public CoordinatorHistoricalManagerRunnable(final int startingLeaderCounter)
super(
ImmutableList.of(
new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this),
params -> {
// Display info about all historical servers
Iterable<ImmutableDruidServer> servers = FunctionalIterable
.create(serverInventoryView.getInventory())
.filter(DruidServer::segmentReplicatable)
.transform(DruidServer::toImmutableDruidServer);

if (log.isDebugEnabled()) {
log.debug("Servers");
for (ImmutableDruidServer druidServer : servers) {
log.debug(" %s", druidServer);
log.debug(" -- DataSources");
for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) {
log.debug(" %s", druidDataSource);
new DruidCoordinatorHelper()
{
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
// Display info about all historical servers
Iterable<ImmutableDruidServer> servers = FunctionalIterable
.create(serverInventoryView.getInventory())
.filter(DruidServer::segmentReplicatable)
.transform(DruidServer::toImmutableDruidServer);

if (log.isDebugEnabled()) {
log.debug("Servers");
for (ImmutableDruidServer druidServer : servers) {
log.debug(" %s", druidServer);
log.debug(" -- DataSources");
for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) {
log.debug(" %s", druidDataSource);
}
}
}
}

// Find all historical servers, group them by subType and sort by ascending usage
final DruidCluster cluster = new DruidCluster();
for (ImmutableDruidServer server : servers) {
if (!loadManagementPeons.containsKey(server.getName())) {
LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server);
loadQueuePeon.start();
log.info("Created LoadQueuePeon for server[%s].", server.getName());
// Find all historical servers, group them by subType and sort by ascending usage
final DruidCluster cluster = new DruidCluster();
for (ImmutableDruidServer server : servers) {
if (!loadManagementPeons.containsKey(server.getName())) {
LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server);
loadQueuePeon.start();
log.info("Created LoadQueuePeon for server[%s].", server.getName());

loadManagementPeons.put(server.getName(), loadQueuePeon);
loadManagementPeons.put(server.getName(), loadQueuePeon);
}

cluster.add(new ServerHolder(server, loadManagementPeons.get(server.getName())));
}

cluster.add(new ServerHolder(server, loadManagementPeons.get(server.getName())));
}
segmentReplicantLookup = SegmentReplicantLookup.make(cluster);

segmentReplicantLookup = SegmentReplicantLookup.make(cluster);
// Stop peons for servers that aren't there anymore.
final Set<String> disappeared = Sets.newHashSet(loadManagementPeons.keySet());
for (ImmutableDruidServer server : servers) {
disappeared.remove(server.getName());
}
for (String name : disappeared) {
log.info("Removing listener for server[%s] which is no longer there.", name);
LoadQueuePeon peon = loadManagementPeons.remove(name);
peon.stop();
}

// Stop peons for servers that aren't there anymore.
final Set<String> disappeared = Sets.newHashSet(loadManagementPeons.keySet());
for (ImmutableDruidServer server : servers) {
disappeared.remove(server.getName());
return params.buildFromExisting()
.withDruidCluster(cluster)
.withDatabaseRuleManager(metadataRuleManager)
.withLoadManagementPeons(loadManagementPeons)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerReferenceTimestamp(DateTimes.nowUtc())
.build();
}
for (String name : disappeared) {
log.info("Removing listener for server[%s] which is no longer there.", name);
LoadQueuePeon peon = loadManagementPeons.remove(name);
peon.stop();
}

return params.buildFromExisting()
.withDruidCluster(cluster)
.withDatabaseRuleManager(metadataRuleManager)
.withLoadManagementPeons(loadManagementPeons)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerReferenceTimestamp(DateTimes.nowUtc())
.build();
},
new DruidCoordinatorRuleRunner(DruidCoordinator.this),
new DruidCoordinatorCleanupUnneeded(DruidCoordinator.this),
Expand Down