Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
Expand Down Expand Up @@ -57,13 +58,17 @@ public class CommonCacheNotifier
{
private static final EmittingLogger LOG = new EmittingLogger(CommonCacheNotifier.class);

private static final List<String> NODE_TYPES = Arrays.asList(
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD,
DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL,
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER,
DruidNodeDiscoveryProvider.NODE_TYPE_MM
/**
* {@link NodeType#COORDINATOR} is intentionally omitted because it gets its information about the auth state directly
* from metadata storage.
*/
private static final List<NodeType> NODE_TYPES = Arrays.asList(
NodeType.BROKER,
NodeType.OVERLORD,
NodeType.HISTORICAL,
NodeType.PEON,
NodeType.ROUTER,
NodeType.MIDDLE_MANAGER
);

private final DruidNodeDiscoveryProvider discoveryProvider;
Expand Down Expand Up @@ -154,7 +159,7 @@ public void addUpdate(String updatedItemName, byte[] updatedItemData)
private List<ListenableFuture<StatusResponseHolder>> sendUpdate(String updatedAuthorizerPrefix, byte[] serializedUserMap)
{
List<ListenableFuture<StatusResponseHolder>> futures = new ArrayList<>();
for (String nodeType : NODE_TYPES) {
for (NodeType nodeType : NODE_TYPES) {
DruidNodeDiscovery nodeDiscovery = discoveryProvider.getForNodeType(nodeType);
Collection<DiscoveryDruidNode> nodes = nodeDiscovery.getAllNodes();
for (DiscoveryDruidNode node : nodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
Expand Down Expand Up @@ -310,7 +310,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
new LookupNodeService(lookupTier);
DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
toolbox.getDruidNode(),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
NodeType.PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
lookupNodeService.getName(), lookupNodeService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
Expand Down Expand Up @@ -255,7 +255,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
new LookupNodeService(lookupTier);
DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
toolbox.getDruidNode(),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
NodeType.PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
lookupNodeService.getName(), lookupNodeService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
Expand Down Expand Up @@ -672,7 +672,7 @@ private DiscoveryDruidNode createDiscoveryDruidNode(TaskToolbox toolbox)
new LookupNodeService(getContextValue(CTX_KEY_LOOKUP_TIER));
return new DiscoveryDruidNode(
toolbox.getDruidNode(),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
NodeType.PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
lookupNodeService.getName(), lookupNodeService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
Expand Down Expand Up @@ -364,7 +364,7 @@ public String getVersion(final Interval interval)
new LookupNodeService((String) getContextValue(CTX_KEY_LOOKUP_TIER));
DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
toolbox.getDruidNode(),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
NodeType.PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
lookupNodeService.getName(), lookupNodeService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.discovery.WorkerNodeService;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
Expand Down Expand Up @@ -445,7 +446,7 @@ private void taskComplete(
private void startWorkersHandling() throws InterruptedException
{
final CountDownLatch workerViewInitialized = new CountDownLatch(1);
DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM);
DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER);
druidNodeDiscovery.registerListener(
new DruidNodeDiscovery.Listener()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.discovery.WorkerNodeService;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
Expand Down Expand Up @@ -84,7 +85,7 @@ public void testFreshStart() throws Exception
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM))
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);

Expand Down Expand Up @@ -129,15 +130,15 @@ protected WorkerHolder createWorkerHolder(

DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
new DruidNode("service", "host1", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0")
)
);

DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
new DruidNode("service", "host2", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0")
)
Expand Down Expand Up @@ -172,7 +173,7 @@ public void testOneStuckTaskAssignmentDoesntBlockOthers() throws Exception
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM))
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);

Expand Down Expand Up @@ -221,15 +222,15 @@ protected WorkerHolder createWorkerHolder(

DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
new DruidNode("service", "host1", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0")
)
);

DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
new DruidNode("service", "host2", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0")
)
Expand Down Expand Up @@ -259,7 +260,7 @@ public void testTaskRunnerRestart() throws Exception
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM))
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);

Expand Down Expand Up @@ -324,7 +325,7 @@ protected WorkerHolder createWorkerHolder(

DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
new DruidNode("service", "host", 1234, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0")
)
Expand Down Expand Up @@ -404,7 +405,7 @@ public void testWorkerDisapperAndReappearBeforeItsCleanup() throws Exception
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM))
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);

Expand Down Expand Up @@ -458,7 +459,7 @@ protected WorkerHolder createWorkerHolder(

DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
new DruidNode("service", "host", 1234, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0")
)
Expand Down Expand Up @@ -567,7 +568,7 @@ public void testWorkerDisapperAndReappearAfterItsCleanup() throws Exception
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM))
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);

Expand Down Expand Up @@ -621,7 +622,7 @@ protected WorkerHolder createWorkerHolder(

DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
new DruidNode("service", "host", 1234, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0")
)
Expand Down Expand Up @@ -730,7 +731,7 @@ public void testMarkWorkersLazy() throws Exception
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM))
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);

Expand Down Expand Up @@ -788,7 +789,7 @@ protected WorkerHolder createWorkerHolder(

DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
new DruidNode("service", "host1", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0")
)
Expand Down Expand Up @@ -832,7 +833,7 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0")

DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
new DruidNode("service", "host2", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0")
)
Expand Down Expand Up @@ -865,7 +866,7 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0")

DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode(
new DruidNode("service", "host3", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0")
)
Expand Down Expand Up @@ -1159,7 +1160,7 @@ private HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated(
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM))
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,7 @@ public class CuratorDruidNodeAnnouncer implements DruidNodeAnnouncer
private final ObjectMapper jsonMapper;

@Inject
public CuratorDruidNodeAnnouncer(
Announcer announcer,
ZkPathsConfig config,
@Json ObjectMapper jsonMapper
)
public CuratorDruidNodeAnnouncer(Announcer announcer, ZkPathsConfig config, @Json ObjectMapper jsonMapper)
{
this.announcer = announcer;
this.config = config;
Expand All @@ -59,14 +55,12 @@ public void announce(DiscoveryDruidNode discoveryDruidNode)
try {
log.info("Announcing [%s].", discoveryDruidNode);

announcer.announce(
ZKPaths.makePath(
config.getInternalDiscoveryPath(),
discoveryDruidNode.getNodeType(),
discoveryDruidNode.getDruidNode().getHostAndPortToUse()
),
jsonMapper.writeValueAsBytes(discoveryDruidNode)
String path = ZKPaths.makePath(
config.getInternalDiscoveryPath(),
discoveryDruidNode.getNodeType().toString(),
discoveryDruidNode.getDruidNode().getHostAndPortToUse()
);
announcer.announce(path, jsonMapper.writeValueAsBytes(discoveryDruidNode));

log.info("Announced [%s].", discoveryDruidNode);
}
Expand All @@ -80,13 +74,12 @@ public void unannounce(DiscoveryDruidNode discoveryDruidNode)
{
log.info("Unannouncing [%s].", discoveryDruidNode);

announcer.unannounce(
ZKPaths.makePath(
config.getInternalDiscoveryPath(),
discoveryDruidNode.getNodeType(),
discoveryDruidNode.getDruidNode().getHostAndPortToUse()
)
String path = ZKPaths.makePath(
config.getInternalDiscoveryPath(),
discoveryDruidNode.getNodeType().toString(),
discoveryDruidNode.getDruidNode().getHostAndPortToUse()
);
announcer.unannounce(path);

log.info("Unannounced [%s].", discoveryDruidNode);
}
Expand Down
Loading