From 3b37c3dd9db245d42a98d63d3c7515caa23e22f7 Mon Sep 17 00:00:00 2001 From: frank chen Date: Tue, 22 Sep 2020 14:14:21 +0800 Subject: [PATCH 1/3] show leader of coordinators/overlords in the Services view --- .../sql/calcite/schema/SystemSchema.java | 58 ++++++++++++++- .../sql/calcite/schema/SystemSchemaTest.java | 71 +++++++++++++++++-- 2 files changed, 121 insertions(+), 8 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 0237b86512c7..e823f0195a59 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -85,6 +85,8 @@ import javax.annotation.Nullable; import javax.servlet.http.HttpServletResponse; import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -213,7 +215,11 @@ public SystemSchema( Preconditions.checkNotNull(serverView, "serverView"); this.tableMap = ImmutableMap.of( SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView, authorizerMapper), - SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper), + SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, + serverInventoryView, + authorizerMapper, + coordinatorDruidLeaderClient, + overlordDruidLeaderClient), SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper), TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper), SUPERVISOR_TABLE, new SupervisorsTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper) @@ -460,16 +466,22 @@ static class ServersTable extends AbstractTable implements ScannableTable private final AuthorizerMapper authorizerMapper; private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; private final InventoryView serverInventoryView; + private final DruidLeaderClient coordinatorDruidLeaderClient; + private final DruidLeaderClient overlordDruidLeaderClient; public ServersTable( DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, InventoryView serverInventoryView, - AuthorizerMapper authorizerMapper + AuthorizerMapper authorizerMapper, + DruidLeaderClient coordinatorDruidLeaderClient, + DruidLeaderClient overlordDruidLeaderClient ) { this.authorizerMapper = authorizerMapper; this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; this.serverInventoryView = serverInventoryView; + this.coordinatorDruidLeaderClient = coordinatorDruidLeaderClient; + this.overlordDruidLeaderClient = overlordDruidLeaderClient; } @Override @@ -494,6 +506,9 @@ public Enumerable scan(DataContext root) ); checkStateReadAccessForServers(authenticationResult, authorizerMapper); + final URL leaderCoordinator = getLeaderOfNodes(coordinatorDruidLeaderClient); + final URL leaderOverlord = getLeaderOfNodes(overlordDruidLeaderClient); + final FluentIterable results = FluentIterable .from(() -> druidServers) .transform((DiscoveryDruidNode discoveryDruidNode) -> { @@ -514,12 +529,49 @@ public Enumerable scan(DataContext root) return buildRowForNonDataServer(discoveryDruidNode); } } else { - return buildRowForNonDataServer(discoveryDruidNode); + switch (discoveryDruidNode.getNodeRole()) { + case COORDINATOR: + return buildRowForLeader(discoveryDruidNode, leaderCoordinator); + case OVERLORD: + return buildRowForLeader(discoveryDruidNode, leaderOverlord); + default: + return buildRowForNonDataServer(discoveryDruidNode); + } } }); return Linq4j.asEnumerable(results); } + private URL getLeaderOfNodes(DruidLeaderClient leaderClient) + { + try { + return new URL(leaderClient.findCurrentLeader()); + } + catch (MalformedURLException | ISE ignored) { + return null; + } + } + + /** + * Returns a row for nodes which have a leader. + * The leader of the nodes is marked in the 'tier' field + */ + private static Object[] buildRowForLeader(DiscoveryDruidNode discoveryDruidNode, URL leader) + { + final DruidNode node = discoveryDruidNode.getDruidNode(); + final boolean isLeader = node.getHost().equals(leader.getHost()) && node.getPortToUse() == leader.getPort(); + return new Object[]{ + node.getHostAndPortToUse(), + node.getHost(), + (long) node.getPlaintextPort(), + (long) node.getTlsPort(), + StringUtils.toLowerCase(discoveryDruidNode.getNodeRole().toString()), + isLeader ? "leader" : null, + UNKNOWN_SIZE, + UNKNOWN_SIZE + }; + } + /** * Returns a row for all node types which don't serve data. The returned row contains only static information. */ diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index a6db177116d8..8d5cb35157d9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -370,12 +370,24 @@ public Authorizer getAuthorizer(String name) ImmutableMap.of() ); + private final DiscoveryDruidNode coordinator2 = new DiscoveryDruidNode( + new DruidNode("s1", "localhost", false, 8082, null, true, false), + NodeRole.COORDINATOR, + ImmutableMap.of() + ); + private final DiscoveryDruidNode overlord = new DiscoveryDruidNode( new DruidNode("s2", "localhost", false, 8090, null, true, false), NodeRole.OVERLORD, ImmutableMap.of() ); + private final DiscoveryDruidNode overlord2 = new DiscoveryDruidNode( + new DruidNode("s2", "localhost", false, 8091, null, true, false), + NodeRole.OVERLORD, + ImmutableMap.of() + ); + private final DiscoveryDruidNode broker1 = new DiscoveryDruidNode( new DruidNode("s3", "localhost", false, 8082, null, true, false), NodeRole.BROKER, @@ -708,12 +720,15 @@ private void verifyRow( @Test public void testServersTable() { - + DruidLeaderClient coordinatorClient = EasyMock.createMock(DruidLeaderClient.class); + DruidLeaderClient overlordClient = EasyMock.createMock(DruidLeaderClient.class); SystemSchema.ServersTable serversTable = EasyMock.createMockBuilder(SystemSchema.ServersTable.class) .withConstructor( druidNodeDiscoveryProvider, serverInventoryView, - authMapper + authMapper, + coordinatorClient, + overlordClient ) .createMock(); EasyMock.replay(serversTable); @@ -746,8 +761,8 @@ public void testServersTable() .once(); EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.PEON)).andReturn(peonNodeDiscovery).once(); - EasyMock.expect(coordinatorNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(coordinator)).once(); - EasyMock.expect(overlordNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(overlord)).once(); + EasyMock.expect(coordinatorNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(coordinator, coordinator2)).once(); + EasyMock.expect(overlordNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(overlord, overlord2)).once(); EasyMock.expect(brokerNodeDiscovery.getAllNodes()) .andReturn(ImmutableList.of(broker1, broker2, brokerWithBroadcastSegments)) .once(); @@ -759,6 +774,14 @@ public void testServersTable() EasyMock.expect(peonNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(peon1, peon2)).once(); EasyMock.expect(indexerNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(indexer)).once(); + // set leaders + EasyMock.expect(coordinatorClient.findCurrentLeader()) + .andReturn(coordinator.getDruidNode().getUriToUse().toString()) + .once(); + EasyMock.expect(overlordClient.findCurrentLeader()) + .andReturn(overlord.getDruidNode().getUriToUse().toString()) + .once(); + final List servers = new ArrayList<>(); servers.add(mockDataServer(historical1.getDruidNode().getHostAndPortToUse(), 200L, 1000L, "tier")); servers.add(mockDataServer(historical2.getDruidNode().getHostAndPortToUse(), 400L, 1000L, "tier")); @@ -782,7 +805,9 @@ public void testServersTable() historicalNodeDiscovery, mmNodeDiscovery, peonNodeDiscovery, - indexerNodeDiscovery + indexerNodeDiscovery, + coordinatorClient, + overlordClient ); DataContext dataContext = new DataContext() @@ -850,6 +875,8 @@ public Object get(String name) createExpectedRow("lameHost:8083", "lameHost", 8083, -1, NodeRole.HISTORICAL, "tier", 0L, 1000L) ); expectedRows.add(createExpectedRow("localhost:8080", "localhost", 8080, -1, NodeRole.PEON, "tier", 0L, 1000L)); + + // coordinator expectedRows.add( createExpectedRow( "localhost:8081", @@ -857,11 +884,25 @@ public Object get(String name) 8081, -1, NodeRole.COORDINATOR, + "leader", + 0L, + 0L + ) + ); + expectedRows.add( + createExpectedRow( + "localhost:8082", + "localhost", + 8082, + -1, + NodeRole.COORDINATOR, null, 0L, 0L ) ); + + //broker expectedRows.add( createExpectedRow( "localhost:8082", @@ -874,9 +915,13 @@ public Object get(String name) 0L ) ); + + //historical expectedRows.add( createExpectedRow("localhost:8083", "localhost", 8083, -1, NodeRole.HISTORICAL, "tier", 200L, 1000L) ); + + //overlord expectedRows.add( createExpectedRow( "localhost:8090", @@ -884,11 +929,25 @@ public Object get(String name) 8090, -1, NodeRole.OVERLORD, + "leader", + 0L, + 0L + ) + ); + expectedRows.add( + createExpectedRow( + "localhost:8091", + "localhost", + 8091, + -1, + NodeRole.OVERLORD, null, 0L, 0L ) ); + + //router expectedRows.add( createExpectedRow( "localhost:8888", @@ -901,6 +960,8 @@ public Object get(String name) 0L ) ); + + //middle manager expectedRows.add( createExpectedRow( "mmHost:8091", From ff235c877fe887b2daf9324e625de883aabf2f81 Mon Sep 17 00:00:00 2001 From: frank chen Date: Tue, 22 Sep 2020 14:30:00 +0800 Subject: [PATCH 2/3] avoid NPE --- .../java/org/apache/druid/sql/calcite/schema/SystemSchema.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index e823f0195a59..1189e67a9b72 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -559,7 +559,8 @@ private URL getLeaderOfNodes(DruidLeaderClient leaderClient) private static Object[] buildRowForLeader(DiscoveryDruidNode discoveryDruidNode, URL leader) { final DruidNode node = discoveryDruidNode.getDruidNode(); - final boolean isLeader = node.getHost().equals(leader.getHost()) && node.getPortToUse() == leader.getPort(); + final boolean isLeader = leader == null ? + false : node.getPortToUse() == leader.getPort() && node.getHost().equals(leader.getHost()); return new Object[]{ node.getHostAndPortToUse(), node.getHost(), From bd3b967a92b68ee9057f4226c9065f2540e2b7a2 Mon Sep 17 00:00:00 2001 From: frank chen Date: Tue, 29 Sep 2020 18:26:40 +0800 Subject: [PATCH 3/3] fix failure of unit test cases --- .../java/org/apache/druid/sql/calcite/util/CalciteTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index d8088ae7ed87..6333aa1378f8 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -1014,6 +1014,7 @@ NodeRole.COORDINATOR, new FakeDruidNodeDiscovery(ImmutableMap.of(NodeRole.COORDI throw new UnsupportedOperationException(); } ); + druidLeaderClient.start(); return new SystemSchema( druidSchema,