Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -213,7 +215,11 @@ public SystemSchema(
Preconditions.checkNotNull(serverView, "serverView");
this.tableMap = ImmutableMap.of(
SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView, authorizerMapper),
SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper),
SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider,
serverInventoryView,
authorizerMapper,
coordinatorDruidLeaderClient,
overlordDruidLeaderClient),
SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper),
TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper),
SUPERVISOR_TABLE, new SupervisorsTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper)
Expand Down Expand Up @@ -460,16 +466,22 @@ static class ServersTable extends AbstractTable implements ScannableTable
private final AuthorizerMapper authorizerMapper;
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private final InventoryView serverInventoryView;
private final DruidLeaderClient coordinatorDruidLeaderClient;
private final DruidLeaderClient overlordDruidLeaderClient;

public ServersTable(
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
InventoryView serverInventoryView,
AuthorizerMapper authorizerMapper
AuthorizerMapper authorizerMapper,
DruidLeaderClient coordinatorDruidLeaderClient,
DruidLeaderClient overlordDruidLeaderClient
)
{
this.authorizerMapper = authorizerMapper;
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
this.serverInventoryView = serverInventoryView;
this.coordinatorDruidLeaderClient = coordinatorDruidLeaderClient;
this.overlordDruidLeaderClient = overlordDruidLeaderClient;
}

@Override
Expand All @@ -494,6 +506,9 @@ public Enumerable<Object[]> scan(DataContext root)
);
checkStateReadAccessForServers(authenticationResult, authorizerMapper);

final URL leaderCoordinator = getLeaderOfNodes(coordinatorDruidLeaderClient);
final URL leaderOverlord = getLeaderOfNodes(overlordDruidLeaderClient);

final FluentIterable<Object[]> results = FluentIterable
.from(() -> druidServers)
.transform((DiscoveryDruidNode discoveryDruidNode) -> {
Expand All @@ -514,12 +529,50 @@ public Enumerable<Object[]> scan(DataContext root)
return buildRowForNonDataServer(discoveryDruidNode);
}
} else {
return buildRowForNonDataServer(discoveryDruidNode);
switch (discoveryDruidNode.getNodeRole()) {
case COORDINATOR:
return buildRowForLeader(discoveryDruidNode, leaderCoordinator);
case OVERLORD:
return buildRowForLeader(discoveryDruidNode, leaderOverlord);
default:
return buildRowForNonDataServer(discoveryDruidNode);
}
}
});
return Linq4j.asEnumerable(results);
}

private URL getLeaderOfNodes(DruidLeaderClient leaderClient)
{
try {
return new URL(leaderClient.findCurrentLeader());
}
catch (MalformedURLException | ISE ignored) {
return null;
}
}

/**
* Returns a row for nodes which have a leader.
* The leader of the nodes is marked in the 'tier' field
*/
private static Object[] buildRowForLeader(DiscoveryDruidNode discoveryDruidNode, URL leader)
{
final DruidNode node = discoveryDruidNode.getDruidNode();
final boolean isLeader = leader == null ?
false : node.getPortToUse() == leader.getPort() && node.getHost().equals(leader.getHost());
return new Object[]{
node.getHostAndPortToUse(),
node.getHost(),
(long) node.getPlaintextPort(),
(long) node.getTlsPort(),
StringUtils.toLowerCase(discoveryDruidNode.getNodeRole().toString()),
isLeader ? "leader" : null,
UNKNOWN_SIZE,
UNKNOWN_SIZE
};
}

/**
* Returns a row for all node types which don't serve data. The returned row contains only static information.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,12 +370,24 @@ public Authorizer getAuthorizer(String name)
ImmutableMap.of()
);

private final DiscoveryDruidNode coordinator2 = new DiscoveryDruidNode(
new DruidNode("s1", "localhost", false, 8082, null, true, false),
NodeRole.COORDINATOR,
ImmutableMap.of()
);

private final DiscoveryDruidNode overlord = new DiscoveryDruidNode(
new DruidNode("s2", "localhost", false, 8090, null, true, false),
NodeRole.OVERLORD,
ImmutableMap.of()
);

private final DiscoveryDruidNode overlord2 = new DiscoveryDruidNode(
new DruidNode("s2", "localhost", false, 8091, null, true, false),
NodeRole.OVERLORD,
ImmutableMap.of()
);

private final DiscoveryDruidNode broker1 = new DiscoveryDruidNode(
new DruidNode("s3", "localhost", false, 8082, null, true, false),
NodeRole.BROKER,
Expand Down Expand Up @@ -708,12 +720,15 @@ private void verifyRow(
@Test
public void testServersTable()
{

DruidLeaderClient coordinatorClient = EasyMock.createMock(DruidLeaderClient.class);
DruidLeaderClient overlordClient = EasyMock.createMock(DruidLeaderClient.class);
SystemSchema.ServersTable serversTable = EasyMock.createMockBuilder(SystemSchema.ServersTable.class)
.withConstructor(
druidNodeDiscoveryProvider,
serverInventoryView,
authMapper
authMapper,
coordinatorClient,
overlordClient
)
.createMock();
EasyMock.replay(serversTable);
Expand Down Expand Up @@ -746,8 +761,8 @@ public void testServersTable()
.once();
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.PEON)).andReturn(peonNodeDiscovery).once();

EasyMock.expect(coordinatorNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(coordinator)).once();
EasyMock.expect(overlordNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(overlord)).once();
EasyMock.expect(coordinatorNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(coordinator, coordinator2)).once();
EasyMock.expect(overlordNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(overlord, overlord2)).once();
EasyMock.expect(brokerNodeDiscovery.getAllNodes())
.andReturn(ImmutableList.of(broker1, broker2, brokerWithBroadcastSegments))
.once();
Expand All @@ -759,6 +774,14 @@ public void testServersTable()
EasyMock.expect(peonNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(peon1, peon2)).once();
EasyMock.expect(indexerNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(indexer)).once();

// set leaders
EasyMock.expect(coordinatorClient.findCurrentLeader())
.andReturn(coordinator.getDruidNode().getUriToUse().toString())
.once();
EasyMock.expect(overlordClient.findCurrentLeader())
.andReturn(overlord.getDruidNode().getUriToUse().toString())
.once();

final List<DruidServer> servers = new ArrayList<>();
servers.add(mockDataServer(historical1.getDruidNode().getHostAndPortToUse(), 200L, 1000L, "tier"));
servers.add(mockDataServer(historical2.getDruidNode().getHostAndPortToUse(), 400L, 1000L, "tier"));
Expand All @@ -782,7 +805,9 @@ public void testServersTable()
historicalNodeDiscovery,
mmNodeDiscovery,
peonNodeDiscovery,
indexerNodeDiscovery
indexerNodeDiscovery,
coordinatorClient,
overlordClient
);

DataContext dataContext = new DataContext()
Expand Down Expand Up @@ -850,18 +875,34 @@ public Object get(String name)
createExpectedRow("lameHost:8083", "lameHost", 8083, -1, NodeRole.HISTORICAL, "tier", 0L, 1000L)
);
expectedRows.add(createExpectedRow("localhost:8080", "localhost", 8080, -1, NodeRole.PEON, "tier", 0L, 1000L));

// coordinator
expectedRows.add(
createExpectedRow(
"localhost:8081",
"localhost",
8081,
-1,
NodeRole.COORDINATOR,
"leader",
0L,
0L
)
);
expectedRows.add(
createExpectedRow(
"localhost:8082",
"localhost",
8082,
-1,
NodeRole.COORDINATOR,
null,
0L,
0L
)
);

//broker
expectedRows.add(
createExpectedRow(
"localhost:8082",
Expand All @@ -874,21 +915,39 @@ public Object get(String name)
0L
)
);

//historical
expectedRows.add(
createExpectedRow("localhost:8083", "localhost", 8083, -1, NodeRole.HISTORICAL, "tier", 200L, 1000L)
);

//overlord
expectedRows.add(
createExpectedRow(
"localhost:8090",
"localhost",
8090,
-1,
NodeRole.OVERLORD,
"leader",
0L,
0L
)
);
expectedRows.add(
createExpectedRow(
"localhost:8091",
"localhost",
8091,
-1,
NodeRole.OVERLORD,
null,
0L,
0L
)
);

//router
expectedRows.add(
createExpectedRow(
"localhost:8888",
Expand All @@ -901,6 +960,8 @@ public Object get(String name)
0L
)
);

//middle manager
expectedRows.add(
createExpectedRow(
"mmHost:8091",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,7 @@ NodeRole.COORDINATOR, new FakeDruidNodeDiscovery(ImmutableMap.of(NodeRole.COORDI
throw new UnsupportedOperationException();
}
);
druidLeaderClient.start();

return new SystemSchema(
druidSchema,
Expand Down