diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 0237b86512c7..1189e67a9b72 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -85,6 +85,8 @@ import javax.annotation.Nullable; import javax.servlet.http.HttpServletResponse; import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -213,7 +215,11 @@ public SystemSchema( Preconditions.checkNotNull(serverView, "serverView"); this.tableMap = ImmutableMap.of( SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView, authorizerMapper), - SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper), + SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, + serverInventoryView, + authorizerMapper, + coordinatorDruidLeaderClient, + overlordDruidLeaderClient), SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper), TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper), SUPERVISOR_TABLE, new SupervisorsTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper) @@ -460,16 +466,22 @@ static class ServersTable extends AbstractTable implements ScannableTable private final AuthorizerMapper authorizerMapper; private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; private final InventoryView serverInventoryView; + private final DruidLeaderClient coordinatorDruidLeaderClient; + private final DruidLeaderClient overlordDruidLeaderClient; public ServersTable( DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, InventoryView serverInventoryView, - AuthorizerMapper authorizerMapper + AuthorizerMapper authorizerMapper, + DruidLeaderClient coordinatorDruidLeaderClient, + DruidLeaderClient overlordDruidLeaderClient ) { this.authorizerMapper = authorizerMapper; this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; this.serverInventoryView = serverInventoryView; + this.coordinatorDruidLeaderClient = coordinatorDruidLeaderClient; + this.overlordDruidLeaderClient = overlordDruidLeaderClient; } @Override @@ -494,6 +506,9 @@ public Enumerable scan(DataContext root) ); checkStateReadAccessForServers(authenticationResult, authorizerMapper); + final URL leaderCoordinator = getLeaderOfNodes(coordinatorDruidLeaderClient); + final URL leaderOverlord = getLeaderOfNodes(overlordDruidLeaderClient); + final FluentIterable results = FluentIterable .from(() -> druidServers) .transform((DiscoveryDruidNode discoveryDruidNode) -> { @@ -514,12 +529,50 @@ public Enumerable scan(DataContext root) return buildRowForNonDataServer(discoveryDruidNode); } } else { - return buildRowForNonDataServer(discoveryDruidNode); + switch (discoveryDruidNode.getNodeRole()) { + case COORDINATOR: + return buildRowForLeader(discoveryDruidNode, leaderCoordinator); + case OVERLORD: + return buildRowForLeader(discoveryDruidNode, leaderOverlord); + default: + return buildRowForNonDataServer(discoveryDruidNode); + } } }); return Linq4j.asEnumerable(results); } + private URL getLeaderOfNodes(DruidLeaderClient leaderClient) + { + try { + return new URL(leaderClient.findCurrentLeader()); + } + catch (MalformedURLException | ISE ignored) { + return null; + } + } + + /** + * Returns a row for nodes which have a leader. + * The leader of the nodes is marked in the 'tier' field + */ + private static Object[] buildRowForLeader(DiscoveryDruidNode discoveryDruidNode, URL leader) + { + final DruidNode node = discoveryDruidNode.getDruidNode(); + final boolean isLeader = leader == null ? + false : node.getPortToUse() == leader.getPort() && node.getHost().equals(leader.getHost()); + return new Object[]{ + node.getHostAndPortToUse(), + node.getHost(), + (long) node.getPlaintextPort(), + (long) node.getTlsPort(), + StringUtils.toLowerCase(discoveryDruidNode.getNodeRole().toString()), + isLeader ? "leader" : null, + UNKNOWN_SIZE, + UNKNOWN_SIZE + }; + } + /** * Returns a row for all node types which don't serve data. The returned row contains only static information. */ diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index a6db177116d8..8d5cb35157d9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -370,12 +370,24 @@ public Authorizer getAuthorizer(String name) ImmutableMap.of() ); + private final DiscoveryDruidNode coordinator2 = new DiscoveryDruidNode( + new DruidNode("s1", "localhost", false, 8082, null, true, false), + NodeRole.COORDINATOR, + ImmutableMap.of() + ); + private final DiscoveryDruidNode overlord = new DiscoveryDruidNode( new DruidNode("s2", "localhost", false, 8090, null, true, false), NodeRole.OVERLORD, ImmutableMap.of() ); + private final DiscoveryDruidNode overlord2 = new DiscoveryDruidNode( + new DruidNode("s2", "localhost", false, 8091, null, true, false), + NodeRole.OVERLORD, + ImmutableMap.of() + ); + private final DiscoveryDruidNode broker1 = new DiscoveryDruidNode( new DruidNode("s3", "localhost", false, 8082, null, true, false), NodeRole.BROKER, @@ -708,12 +720,15 @@ private void verifyRow( @Test public void testServersTable() { - + DruidLeaderClient coordinatorClient = EasyMock.createMock(DruidLeaderClient.class); + DruidLeaderClient overlordClient = EasyMock.createMock(DruidLeaderClient.class); SystemSchema.ServersTable serversTable = EasyMock.createMockBuilder(SystemSchema.ServersTable.class) .withConstructor( druidNodeDiscoveryProvider, serverInventoryView, - authMapper + authMapper, + coordinatorClient, + overlordClient ) .createMock(); EasyMock.replay(serversTable); @@ -746,8 +761,8 @@ public void testServersTable() .once(); EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.PEON)).andReturn(peonNodeDiscovery).once(); - EasyMock.expect(coordinatorNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(coordinator)).once(); - EasyMock.expect(overlordNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(overlord)).once(); + EasyMock.expect(coordinatorNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(coordinator, coordinator2)).once(); + EasyMock.expect(overlordNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(overlord, overlord2)).once(); EasyMock.expect(brokerNodeDiscovery.getAllNodes()) .andReturn(ImmutableList.of(broker1, broker2, brokerWithBroadcastSegments)) .once(); @@ -759,6 +774,14 @@ public void testServersTable() EasyMock.expect(peonNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(peon1, peon2)).once(); EasyMock.expect(indexerNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(indexer)).once(); + // set leaders + EasyMock.expect(coordinatorClient.findCurrentLeader()) + .andReturn(coordinator.getDruidNode().getUriToUse().toString()) + .once(); + EasyMock.expect(overlordClient.findCurrentLeader()) + .andReturn(overlord.getDruidNode().getUriToUse().toString()) + .once(); + final List servers = new ArrayList<>(); servers.add(mockDataServer(historical1.getDruidNode().getHostAndPortToUse(), 200L, 1000L, "tier")); servers.add(mockDataServer(historical2.getDruidNode().getHostAndPortToUse(), 400L, 1000L, "tier")); @@ -782,7 +805,9 @@ public void testServersTable() historicalNodeDiscovery, mmNodeDiscovery, peonNodeDiscovery, - indexerNodeDiscovery + indexerNodeDiscovery, + coordinatorClient, + overlordClient ); DataContext dataContext = new DataContext() @@ -850,6 +875,8 @@ public Object get(String name) createExpectedRow("lameHost:8083", "lameHost", 8083, -1, NodeRole.HISTORICAL, "tier", 0L, 1000L) ); expectedRows.add(createExpectedRow("localhost:8080", "localhost", 8080, -1, NodeRole.PEON, "tier", 0L, 1000L)); + + // coordinator expectedRows.add( createExpectedRow( "localhost:8081", @@ -857,11 +884,25 @@ public Object get(String name) 8081, -1, NodeRole.COORDINATOR, + "leader", + 0L, + 0L + ) + ); + expectedRows.add( + createExpectedRow( + "localhost:8082", + "localhost", + 8082, + -1, + NodeRole.COORDINATOR, null, 0L, 0L ) ); + + //broker expectedRows.add( createExpectedRow( "localhost:8082", @@ -874,9 +915,13 @@ public Object get(String name) 0L ) ); + + //historical expectedRows.add( createExpectedRow("localhost:8083", "localhost", 8083, -1, NodeRole.HISTORICAL, "tier", 200L, 1000L) ); + + //overlord expectedRows.add( createExpectedRow( "localhost:8090", @@ -884,11 +929,25 @@ public Object get(String name) 8090, -1, NodeRole.OVERLORD, + "leader", + 0L, + 0L + ) + ); + expectedRows.add( + createExpectedRow( + "localhost:8091", + "localhost", + 8091, + -1, + NodeRole.OVERLORD, null, 0L, 0L ) ); + + //router expectedRows.add( createExpectedRow( "localhost:8888", @@ -901,6 +960,8 @@ public Object get(String name) 0L ) ); + + //middle manager expectedRows.add( createExpectedRow( "mmHost:8091", diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index d8088ae7ed87..6333aa1378f8 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -1014,6 +1014,7 @@ NodeRole.COORDINATOR, new FakeDruidNodeDiscovery(ImmutableMap.of(NodeRole.COORDI throw new UnsupportedOperationException(); } ); + druidLeaderClient.start(); return new SystemSchema( druidSchema,