diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 67e86a051dac..623458fc1cc7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -181,7 +181,11 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer // ZK_CLEANUP_TODO : Remove these when RemoteTaskRunner and WorkerTaskMonitor are removed. private static final Joiner JOINER = Joiner.on("/"); + + @Nullable // Null, if zk is disabled private final CuratorFramework cf; + + @Nullable // Null, if zk is disabled private final ScheduledExecutorService zkCleanupExec; private final IndexerZkConfig indexerZkConfig; @@ -193,7 +197,7 @@ public HttpRemoteTaskRunner( ProvisioningStrategy provisioningStrategy, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, TaskStorage taskStorage, - CuratorFramework cf, + @Nullable CuratorFramework cf, IndexerZkConfig indexerZkConfig ) { @@ -218,12 +222,18 @@ public HttpRemoteTaskRunner( ScheduledExecutors.fixed(1, "HttpRemoteTaskRunner-Worker-Cleanup-%d") ); - this.cf = cf; + if (cf != null) { + this.cf = cf; + this.zkCleanupExec = ScheduledExecutors.fixed( + 1, + "HttpRemoteTaskRunner-zk-cleanup-%d" + ); + } else { + this.cf = null; + this.zkCleanupExec = null; + } + this.indexerZkConfig = indexerZkConfig; - this.zkCleanupExec = ScheduledExecutors.fixed( - 1, - "HttpRemoteTaskRunner-zk-cleanup-%d" - ); this.provisioningStrategy = provisioningStrategy; } @@ -270,6 +280,10 @@ public void start() private void scheduleCompletedTaskStatusCleanupFromZk() { + if (cf == null) { + return; + } + zkCleanupExec.scheduleAtFixedRate( () -> { try { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerFactory.java index 99f6f6607dc0..6b81bb0782f0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerFactory.java @@ -22,7 +22,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; import com.google.inject.Inject; +import com.google.inject.Provider; import org.apache.curator.framework.CuratorFramework; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.guice.annotations.EscalatedGlobal; import org.apache.druid.guice.annotations.Smile; @@ -36,6 +38,8 @@ import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.server.initialization.IndexerZkConfig; +import javax.annotation.Nullable; + /** */ public class HttpRemoteTaskRunnerFactory implements TaskRunnerFactory @@ -52,6 +56,7 @@ public class HttpRemoteTaskRunnerFactory implements TaskRunnerFactory cfProvider, + final IndexerZkConfig indexerZkConfig, + final ZkEnablementConfig zkEnablementConfig ) { this.smileMapper = smileMapper; @@ -77,8 +83,13 @@ public HttpRemoteTaskRunnerFactory( this.provisioningStrategy = provisioningStrategy; this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; this.taskStorage = taskStorage; - this.cf = cf; this.indexerZkConfig = indexerZkConfig; + + if (zkEnablementConfig.isEnabled()) { + this.cf = cfProvider.get(); + } else { + this.cf = null; + } } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java index 3f497768eaad..19598d05dc03 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java @@ -76,7 +76,7 @@ * starts running and completed task on disk is deleted based on a periodic schedule where overlord is asked for * active tasks to see which completed tasks are safe to delete. */ -public abstract class WorkerTaskManager +public class WorkerTaskManager { private static final EmittingLogger log = new EmittingLogger(WorkerTaskManager.class); @@ -596,6 +596,12 @@ public void workerDisabled() } } + public boolean isWorkerEnabled() + { + Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.SECONDS), "not started"); + return !disabled.get(); + } + private static class TaskDetails { private final Task task; @@ -776,7 +782,13 @@ public void handle() //watches task assignments and updates task statuses inside Zookeeper. When the transition to HTTP is complete //in Overlord as well as MiddleManagers then WorkerTaskMonitor should be deleted, this class should no longer be abstract //and the methods below should be removed. - protected abstract void taskStarted(String taskId); + protected void taskStarted(String taskId) + { - protected abstract void taskAnnouncementChanged(TaskAnnouncement announcement); + } + + protected void taskAnnouncementChanged(TaskAnnouncement announcement) + { + + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java index 7b4b2b38f960..57046ff80a03 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java @@ -31,11 +31,10 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.hrtr.WorkerHolder; import org.apache.druid.indexing.worker.WorkerHistoryItem; -import org.apache.druid.indexing.worker.WorkerTaskMonitor; +import org.apache.druid.indexing.worker.WorkerTaskManager; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.coordination.ChangeRequestHistory; import org.apache.druid.server.coordination.ChangeRequestsSnapshot; -import org.apache.druid.server.http.SegmentListerResource; import org.apache.druid.server.http.security.StateResourceFilter; import javax.servlet.AsyncContext; @@ -61,22 +60,22 @@ @ResourceFilters(StateResourceFilter.class) public class TaskManagementResource { - protected static final EmittingLogger log = new EmittingLogger(SegmentListerResource.class); + protected static final EmittingLogger log = new EmittingLogger(TaskManagementResource.class); protected final ObjectMapper jsonMapper; protected final ObjectMapper smileMapper; - private final WorkerTaskMonitor workerTaskMonitor; + private final WorkerTaskManager workerTaskManager; @Inject public TaskManagementResource( @Json ObjectMapper jsonMapper, @Smile ObjectMapper smileMapper, - WorkerTaskMonitor workerTaskMonitor + WorkerTaskManager workerTaskManager ) { this.jsonMapper = jsonMapper; this.smileMapper = smileMapper; - this.workerTaskMonitor = workerTaskMonitor; + this.workerTaskManager = workerTaskManager; } /** @@ -119,7 +118,7 @@ public Void getWorkerState( final ResponseContext context = createContext(req.getHeader("Accept")); - final ListenableFuture> future = workerTaskMonitor.getChangesSince( + final ListenableFuture> future = workerTaskManager.getChangesSince( new ChangeRequestHistory.Counter( counter, hash @@ -205,7 +204,7 @@ public void onFailure(Throwable th) public Response assignTask(Task task) { try { - workerTaskMonitor.assignTask(task); + workerTaskManager.assignTask(task); return Response.ok().build(); } catch (RuntimeException ex) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java index a66a4e7f896a..06c414b008dd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java @@ -26,13 +26,15 @@ import com.google.common.collect.Lists; import com.google.common.io.ByteSource; import com.google.inject.Inject; +import com.google.inject.Provider; import com.sun.jersey.spi.container.ResourceFilters; import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.WorkerCuratorCoordinator; -import org.apache.druid.indexing.worker.WorkerTaskMonitor; +import org.apache.druid.indexing.worker.WorkerTaskManager; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.http.HttpMediaType; @@ -40,6 +42,7 @@ import org.apache.druid.server.http.security.StateResourceFilter; import org.apache.druid.tasklogs.TaskLogStreamer; +import javax.annotation.Nullable; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -60,23 +63,32 @@ public class WorkerResource private static String DISABLED_VERSION = ""; private final Worker enabledWorker; + + @Nullable // Null, if zk is disabled private final WorkerCuratorCoordinator curatorCoordinator; + private final TaskRunner taskRunner; - private final WorkerTaskMonitor workerTaskManager; + private final WorkerTaskManager workerTaskManager; @Inject public WorkerResource( Worker worker, - WorkerCuratorCoordinator curatorCoordinator, + Provider curatorCoordinatorProvider, TaskRunner taskRunner, - WorkerTaskMonitor workerTaskManager + WorkerTaskManager workerTaskManager, + ZkEnablementConfig zkEnablementConfig ) { this.enabledWorker = worker; - this.curatorCoordinator = curatorCoordinator; this.taskRunner = taskRunner; this.workerTaskManager = workerTaskManager; + + if (zkEnablementConfig.isEnabled()) { + this.curatorCoordinator = curatorCoordinatorProvider.get(); + } else { + this.curatorCoordinator = null; + } } @@ -87,17 +99,19 @@ public WorkerResource( public Response doDisable() { try { - final Worker disabledWorker = new Worker( - enabledWorker.getScheme(), - enabledWorker.getHost(), - enabledWorker.getIp(), - enabledWorker.getCapacity(), - DISABLED_VERSION, - enabledWorker.getCategory() - ); - curatorCoordinator.updateWorkerAnnouncement(disabledWorker); + if (curatorCoordinator != null) { + final Worker disabledWorker = new Worker( + enabledWorker.getScheme(), + enabledWorker.getHost(), + enabledWorker.getIp(), + enabledWorker.getCapacity(), + DISABLED_VERSION, + enabledWorker.getCategory() + ); + curatorCoordinator.updateWorkerAnnouncement(disabledWorker); + } workerTaskManager.workerDisabled(); - return Response.ok(ImmutableMap.of(disabledWorker.getHost(), "disabled")).build(); + return Response.ok(ImmutableMap.of(enabledWorker.getHost(), "disabled")).build(); } catch (Exception e) { return Response.serverError().build(); @@ -111,7 +125,9 @@ public Response doDisable() public Response doEnable() { try { - curatorCoordinator.updateWorkerAnnouncement(enabledWorker); + if (curatorCoordinator != null) { + curatorCoordinator.updateWorkerAnnouncement(enabledWorker); + } workerTaskManager.workerEnabled(); return Response.ok(ImmutableMap.of(enabledWorker.getHost(), "enabled")).build(); } @@ -127,9 +143,7 @@ public Response doEnable() public Response isEnabled() { try { - final Worker theWorker = curatorCoordinator.getWorker(); - final boolean enabled = !theWorker.getVersion().equalsIgnoreCase(DISABLED_VERSION); - return Response.ok(ImmutableMap.of(theWorker.getHost(), enabled)).build(); + return Response.ok(ImmutableMap.of(enabledWorker.getHost(), workerTaskManager.isWorkerEnabled())).build(); } catch (Exception e) { return Response.serverError().build(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java index 3667ef74e27a..0ad900dd8c44 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java @@ -25,6 +25,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; import org.apache.druid.curator.PotentiallyGzippedCompressionProvider; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.WorkerCuratorCoordinator; @@ -100,9 +101,10 @@ public String getBase() workerResource = new WorkerResource( worker, - curatorCoordinator, + () -> curatorCoordinator, null, - EasyMock.createNiceMock(WorkerTaskMonitor.class) + EasyMock.createNiceMock(WorkerTaskMonitor.class), + ZkEnablementConfig.ENABLED ); } diff --git a/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryRetryTest.java b/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryRetryTest.java index 6663e99ceb58..6ef34b11de96 100644 --- a/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryRetryTest.java +++ b/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryRetryTest.java @@ -43,6 +43,7 @@ public CliHistoricalForQueryRetryTest() } @Inject + @Override public void configure(Properties properties) { log.info("Historical is configured for testing query retry on missing segments"); diff --git a/server/src/main/java/org/apache/druid/curator/CuratorModule.java b/server/src/main/java/org/apache/druid/curator/CuratorModule.java index d5f516b898b3..093265ea6482 100644 --- a/server/src/main/java/org/apache/druid/curator/CuratorModule.java +++ b/server/src/main/java/org/apache/druid/curator/CuratorModule.java @@ -63,6 +63,7 @@ public class CuratorModule implements Module @Override public void configure(Binder binder) { + JsonConfigProvider.bind(binder, CURATOR_CONFIG_PREFIX, ZkEnablementConfig.class); JsonConfigProvider.bind(binder, CURATOR_CONFIG_PREFIX, CuratorConfig.class); JsonConfigProvider.bind(binder, EXHIBITOR_CONFIG_PREFIX, ExhibitorConfig.class); } @@ -70,8 +71,12 @@ public void configure(Binder binder) @Provides @LazySingleton @SuppressForbidden(reason = "System#err") - public CuratorFramework makeCurator(CuratorConfig config, EnsembleProvider ensembleProvider, Lifecycle lifecycle) + public CuratorFramework makeCurator(ZkEnablementConfig zkEnablementConfig, CuratorConfig config, EnsembleProvider ensembleProvider, Lifecycle lifecycle) { + if (!zkEnablementConfig.isEnabled()) { + throw new RuntimeException("Zookeeper is disabled, Can't create CuratorFramework."); + } + final CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); if (!Strings.isNullOrEmpty(config.getZkUser()) && !Strings.isNullOrEmpty(config.getZkPwd())) { builder.authorization( diff --git a/server/src/main/java/org/apache/druid/curator/ZkEnablementConfig.java b/server/src/main/java/org/apache/druid/curator/ZkEnablementConfig.java new file mode 100644 index 000000000000..8553509ecbe0 --- /dev/null +++ b/server/src/main/java/org/apache/druid/curator/ZkEnablementConfig.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.curator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.java.util.common.StringUtils; + +import java.util.Properties; + +public class ZkEnablementConfig +{ + private static final String PROP_KEY_ENABLED = StringUtils.format("%s.enabled", CuratorModule.CURATOR_CONFIG_PREFIX); + + public static final ZkEnablementConfig ENABLED = new ZkEnablementConfig(true); + + @JsonProperty + private final boolean enabled; + + @JsonCreator + public ZkEnablementConfig(@JsonProperty("enabled") Boolean enabled) + { + this.enabled = enabled == null ? true : enabled.booleanValue(); + } + + public boolean isEnabled() + { + return enabled; + } + + public static boolean isEnabled(Properties properties) + { + String value = properties.getProperty(PROP_KEY_ENABLED); + return value == null ? true : Boolean.parseBoolean(value); + } +} diff --git a/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java index 78c56691967d..384b4d8832e2 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java @@ -46,6 +46,7 @@ import org.apache.curator.x.discovery.details.ServiceCacheListener; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.indexing.IndexingService; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.discovery.DruidNodeAnnouncer; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; @@ -66,6 +67,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Properties; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -86,6 +88,14 @@ public class DiscoveryModule implements Module private static final String INTERNAL_DISCOVERY_PROP = "druid.discovery.type"; private static final String CURATOR_KEY = "curator"; + private boolean isZkEnabled = true; + + @Inject + public void configure(Properties properties) + { + isZkEnabled = ZkEnablementConfig.isEnabled(properties); + } + /** * Requests that the un-annotated DruidNode instance be injected and published as part of the lifecycle. * @@ -155,9 +165,16 @@ public void configure(Binder binder) // Build the binder so that it will at a minimum inject an empty set. DruidBinders.discoveryAnnouncementBinder(binder); - binder.bind(ServiceAnnouncer.class) - .to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME))) - .in(LazySingleton.class); + if (isZkEnabled) { + binder.bind(ServiceAnnouncer.class) + .to(Key.get(CuratorServiceAnnouncer.class, Names.named(NAME))) + .in(LazySingleton.class); + } else { + binder.bind(Key.get(ServiceAnnouncer.Noop.class, Names.named(NAME))).toInstance(new ServiceAnnouncer.Noop()); + binder.bind(ServiceAnnouncer.class) + .to(Key.get(ServiceAnnouncer.Noop.class, Names.named(NAME))) + .in(LazySingleton.class); + } // internal discovery bindings. PolyBind.createChoiceWithDefault(binder, INTERNAL_DISCOVERY_PROP, Key.get(DruidNodeAnnouncer.class), CURATOR_KEY); @@ -532,7 +549,7 @@ public void close() private static class DruidLeaderSelectorProvider implements Provider { @Inject - private CuratorFramework curatorFramework; + private Provider curatorFramework; @Inject @Self @@ -552,7 +569,7 @@ private static class DruidLeaderSelectorProvider implements Provider announcerProvider, + ObjectMapper jsonMapper, + ZkEnablementConfig zkEnablementConfig ) { this.config = config; - this.announcer = announcer; this.jsonMapper = jsonMapper; this.server = server; @@ -99,13 +107,28 @@ public BatchDataSegmentAnnouncer( return rv; }; - if (this.config.isSkipSegmentAnnouncementOnZk()) { + isSkipSegmentAnnouncementOnZk = !zkEnablementConfig.isEnabled() || config.isSkipSegmentAnnouncementOnZk(); + if (isSkipSegmentAnnouncementOnZk) { dummyZnode = new SegmentZNode("PLACE_HOLDER_ONLY"); + this.announcer = null; } else { dummyZnode = null; + this.announcer = announcerProvider.get(); } } + @VisibleForTesting + public BatchDataSegmentAnnouncer( + DruidServerMetadata server, + final BatchDataSegmentAnnouncerConfig config, + ZkPathsConfig zkPaths, + Announcer announcer, + ObjectMapper jsonMapper + ) + { + this(server, config, zkPaths, () -> announcer, jsonMapper, ZkEnablementConfig.ENABLED); + } + @Override public void announceSegment(DataSegment segment) throws IOException { @@ -124,7 +147,7 @@ public void announceSegment(DataSegment segment) throws IOException changes.addChangeRequest(new SegmentChangeRequestLoad(toAnnounce)); - if (config.isSkipSegmentAnnouncementOnZk()) { + if (isSkipSegmentAnnouncementOnZk) { segmentLookup.put(segment, dummyZnode); return; } @@ -192,7 +215,7 @@ public void unannounceSegment(DataSegment segment) changes.addChangeRequest(new SegmentChangeRequestDrop(segment)); - if (config.isSkipSegmentAnnouncementOnZk()) { + if (isSkipSegmentAnnouncementOnZk) { return; } @@ -231,7 +254,7 @@ public void announceSegments(Iterable segments) throws IOException changesBatch.add(new SegmentChangeRequestLoad(segment)); - if (config.isSkipSegmentAnnouncementOnZk()) { + if (isSkipSegmentAnnouncementOnZk) { segmentLookup.put(segment, dummyZnode); continue; } @@ -262,7 +285,7 @@ public void announceSegments(Iterable segments) throws IOException changes.addChangeRequests(changesBatch); - if (!config.isSkipSegmentAnnouncementOnZk()) { + if (!isSkipSegmentAnnouncementOnZk) { segmentZNode.addSegments(batch); announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes()); } diff --git a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentServerAnnouncer.java b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentServerAnnouncer.java index 6d1497831c4b..b7ad83cfa436 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentServerAnnouncer.java +++ b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentServerAnnouncer.java @@ -27,4 +27,20 @@ public interface DataSegmentServerAnnouncer { void announce(); void unannounce(); + + class Noop implements DataSegmentServerAnnouncer + { + + @Override + public void announce() + { + + } + + @Override + public void unannounce() + { + + } + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 3300e5907e7a..678f440d7e0e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -26,6 +26,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; +import com.google.inject.Provider; import it.unimi.dsi.fastutil.objects.Object2IntMap; import it.unimi.dsi.fastutil.objects.Object2IntMaps; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; @@ -41,6 +42,7 @@ import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.common.config.JacksonConfigManager; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.curator.discovery.ServiceAnnouncer; import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.guice.ManageLifecycle; @@ -132,7 +134,10 @@ public class DruidCoordinator private final SegmentsMetadataManager segmentsMetadataManager; private final ServerInventoryView serverInventoryView; private final MetadataRuleManager metadataRuleManager; + + @Nullable // Null if zk is disabled private final CuratorFramework curator; + private final ServiceEmitter emitter; private final IndexingServiceClient indexingServiceClient; private final ScheduledExecutorService exec; @@ -161,7 +166,7 @@ public DruidCoordinator( SegmentsMetadataManager segmentsMetadataManager, ServerInventoryView serverInventoryView, MetadataRuleManager metadataRuleManager, - CuratorFramework curator, + Provider curatorProvider, ServiceEmitter emitter, ScheduledExecutorFactory scheduledExecutorFactory, IndexingServiceClient indexingServiceClient, @@ -172,7 +177,8 @@ public DruidCoordinator( BalancerStrategyFactory factory, LookupCoordinatorManager lookupCoordinatorManager, @Coordinator DruidLeaderSelector coordLeaderSelector, - CompactSegments compactSegments + CompactSegments compactSegments, + ZkEnablementConfig zkEnablementConfig ) { this( @@ -182,7 +188,7 @@ public DruidCoordinator( segmentsMetadataManager, serverInventoryView, metadataRuleManager, - curator, + curatorProvider, emitter, scheduledExecutorFactory, indexingServiceClient, @@ -194,7 +200,8 @@ public DruidCoordinator( factory, lookupCoordinatorManager, coordLeaderSelector, - compactSegments + compactSegments, + zkEnablementConfig ); } @@ -205,7 +212,7 @@ public DruidCoordinator( SegmentsMetadataManager segmentsMetadataManager, ServerInventoryView serverInventoryView, MetadataRuleManager metadataRuleManager, - CuratorFramework curator, + Provider curatorProvider, ServiceEmitter emitter, ScheduledExecutorFactory scheduledExecutorFactory, IndexingServiceClient indexingServiceClient, @@ -217,7 +224,8 @@ public DruidCoordinator( BalancerStrategyFactory factory, LookupCoordinatorManager lookupCoordinatorManager, DruidLeaderSelector coordLeaderSelector, - CompactSegments compactSegments + CompactSegments compactSegments, + ZkEnablementConfig zkEnablementConfig ) { this.config = config; @@ -227,7 +235,11 @@ public DruidCoordinator( this.segmentsMetadataManager = segmentsMetadataManager; this.serverInventoryView = serverInventoryView; this.metadataRuleManager = metadataRuleManager; - this.curator = curator; + if (zkEnablementConfig.isEnabled()) { + this.curator = curatorProvider.get(); + } else { + this.curator = null; + } this.emitter = emitter; this.indexingServiceClient = indexingServiceClient; this.taskMaster = taskMaster; @@ -472,7 +484,7 @@ public void moveSegment( () -> { try { if (serverInventoryView.isSegmentLoadedByServer(toServer.getName(), segment) && - curator.checkExists().forPath(toLoadQueueSegPath) == null && + (curator == null || curator.checkExists().forPath(toLoadQueueSegPath) == null) && !dropPeon.getSegmentsToDrop().contains(segment)) { dropPeon.dropSegment(segment, loadPeonCallback); } else { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/LoadQueueTaskMaster.java b/server/src/main/java/org/apache/druid/server/coordinator/LoadQueueTaskMaster.java index 30263094a3d8..e4f22832e9a1 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/LoadQueueTaskMaster.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/LoadQueueTaskMaster.java @@ -20,6 +20,7 @@ package org.apache.druid.server.coordinator; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Provider; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.utils.ZKPaths; import org.apache.druid.client.ImmutableDruidServer; @@ -34,7 +35,7 @@ */ public class LoadQueueTaskMaster { - private final CuratorFramework curator; + private final Provider curatorFrameworkProvider; private final ObjectMapper jsonMapper; private final ScheduledExecutorService peonExec; private final ExecutorService callbackExec; @@ -43,7 +44,7 @@ public class LoadQueueTaskMaster private final ZkPathsConfig zkPaths; public LoadQueueTaskMaster( - CuratorFramework curator, + Provider curatorFrameworkProvider, ObjectMapper jsonMapper, ScheduledExecutorService peonExec, ExecutorService callbackExec, @@ -52,7 +53,7 @@ public LoadQueueTaskMaster( ZkPathsConfig zkPaths ) { - this.curator = curator; + this.curatorFrameworkProvider = curatorFrameworkProvider; this.jsonMapper = jsonMapper; this.peonExec = peonExec; this.callbackExec = callbackExec; @@ -67,7 +68,7 @@ public LoadQueuePeon giveMePeon(ImmutableDruidServer server) return new HttpLoadQueuePeon(server.getURL(), jsonMapper, httpClient, config, peonExec, callbackExec); } else { return new CuratorLoadQueuePeon( - curator, + curatorFrameworkProvider.get(), ZKPaths.makePath(zkPaths.getLoadQueuePath(), server.getName()), jsonMapper, peonExec, diff --git a/server/src/main/java/org/apache/druid/server/http/HistoricalResource.java b/server/src/main/java/org/apache/druid/server/http/HistoricalResource.java index c338854dca20..4bc48f444df1 100644 --- a/server/src/main/java/org/apache/druid/server/http/HistoricalResource.java +++ b/server/src/main/java/org/apache/druid/server/http/HistoricalResource.java @@ -21,7 +21,7 @@ import com.google.common.collect.ImmutableMap; import com.sun.jersey.spi.container.ResourceFilters; -import org.apache.druid.server.coordination.ZkCoordinator; +import org.apache.druid.server.coordination.SegmentLoadDropHandler; import org.apache.druid.server.http.security.StateResourceFilter; import javax.inject.Inject; @@ -34,14 +34,14 @@ @Path("/druid/historical/v1") public class HistoricalResource { - private final ZkCoordinator coordinator; + private final SegmentLoadDropHandler segmentLoadDropHandler; @Inject public HistoricalResource( - ZkCoordinator coordinator + SegmentLoadDropHandler segmentLoadDropHandler ) { - this.coordinator = coordinator; + this.segmentLoadDropHandler = segmentLoadDropHandler; } @GET @@ -50,14 +50,14 @@ public HistoricalResource( @Produces(MediaType.APPLICATION_JSON) public Response getLoadStatus() { - return Response.ok(ImmutableMap.of("cacheInitialized", coordinator.isStarted())).build(); + return Response.ok(ImmutableMap.of("cacheInitialized", segmentLoadDropHandler.isStarted())).build(); } @GET @Path("/readiness") public Response getReadiness() { - if (coordinator.isStarted()) { + if (segmentLoadDropHandler.isStarted()) { return Response.ok().build(); } else { return Response.status(Response.Status.SERVICE_UNAVAILABLE).build(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java index c44c0d8d3fb3..dd382e31a65f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java @@ -38,6 +38,7 @@ import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.curator.CuratorTestBase; import org.apache.druid.curator.CuratorUtils; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.curator.discovery.NoopServiceAnnouncer; import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.jackson.DefaultObjectMapper; @@ -223,7 +224,7 @@ public String getBase() segmentsMetadataManager, baseView, metadataRuleManager, - curator, + () -> curator, new NoopServiceEmitter(), scheduledExecutorFactory, null, @@ -249,7 +250,8 @@ public void unannounce(DruidNode node) new CostBalancerStrategyFactory(), EasyMock.createNiceMock(LookupCoordinatorManager.class), new TestDruidLeaderSelector(), - null + null, + ZkEnablementConfig.ENABLED ); } @@ -521,7 +523,7 @@ public String getBase() segmentsMetadataManager, baseView, metadataRuleManager, - curator, + () -> curator, new NoopServiceEmitter(), scheduledExecutorFactory, null, @@ -547,7 +549,8 @@ public void unannounce(DruidNode node) new CostBalancerStrategyFactory(), EasyMock.createNiceMock(LookupCoordinatorManager.class), new TestDruidLeaderSelector(), - null + null, + ZkEnablementConfig.ENABLED ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 9d1756e27de7..34fe944f61e4 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -38,6 +38,7 @@ import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.curator.CuratorTestBase; import org.apache.druid.curator.CuratorUtils; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.curator.discovery.NoopServiceAnnouncer; import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.jackson.DefaultObjectMapper; @@ -188,7 +189,7 @@ public String getBase() segmentsMetadataManager, serverInventoryView, metadataRuleManager, - curator, + () -> curator, serviceEmitter, scheduledExecutorFactory, null, @@ -214,7 +215,8 @@ public void unannounce(DruidNode node) new CostBalancerStrategyFactory(), EasyMock.createNiceMock(LookupCoordinatorManager.class), new TestDruidLeaderSelector(), - null + null, + ZkEnablementConfig.ENABLED ); } @@ -692,7 +694,7 @@ public void testBalancerThreadNumber() null, null, null, - null, + () -> null, null, scheduledExecutorFactory, null, @@ -703,7 +705,8 @@ public void testBalancerThreadNumber() null, null, null, - null + null, + ZkEnablementConfig.ENABLED ); DruidCoordinator.DutiesRunnable duty = c.new DutiesRunnable(Collections.emptyList(), 0); diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index bc5d840e30df..1876a28836ec 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -25,6 +25,7 @@ import com.google.inject.Inject; import com.google.inject.Key; import com.google.inject.Module; +import com.google.inject.Provider; import com.google.inject.Provides; import com.google.inject.name.Names; import io.airlift.airline.Command; @@ -255,7 +256,7 @@ public void configure(Binder binder) @Provides @LazySingleton public LoadQueueTaskMaster getLoadQueueTaskMaster( - CuratorFramework curator, + Provider curatorFrameworkProvider, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, DruidCoordinatorConfig config, @@ -276,7 +277,7 @@ public LoadQueueTaskMaster getLoadQueueTaskMaster( } ExecutorServices.manageLifecycle(lifecycle, callBackExec); return new LoadQueueTaskMaster( - curator, + curatorFrameworkProvider, jsonMapper, factory.create(1, "Master-PeonExec--%d"), callBackExec, diff --git a/services/src/main/java/org/apache/druid/cli/CliHistorical.java b/services/src/main/java/org/apache/druid/cli/CliHistorical.java index d0c457adfd2a..815864b5be14 100644 --- a/services/src/main/java/org/apache/druid/cli/CliHistorical.java +++ b/services/src/main/java/org/apache/druid/cli/CliHistorical.java @@ -22,11 +22,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import com.google.inject.Inject; import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.name.Names; import io.airlift.airline.Command; import org.apache.druid.client.cache.CacheConfig; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.LookupNodeService; import org.apache.druid.discovery.NodeRole; @@ -59,6 +61,7 @@ import org.eclipse.jetty.server.Server; import java.util.List; +import java.util.Properties; @Command( name = "historical", @@ -68,11 +71,19 @@ public class CliHistorical extends ServerRunnable { private static final Logger log = new Logger(CliHistorical.class); + private boolean isZkEnabled = true; + public CliHistorical() { super(log); } + @Inject + public void configure(Properties properties) + { + isZkEnabled = ZkEnablementConfig.isEnabled(properties); + } + @Override protected List getModules() { @@ -99,10 +110,13 @@ protected List getModules() binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); binder.bind(QueryCountStatsProvider.class).to(QueryResource.class); Jerseys.addResource(binder, QueryResource.class); - Jerseys.addResource(binder, HistoricalResource.class); Jerseys.addResource(binder, SegmentListerResource.class); + Jerseys.addResource(binder, HistoricalResource.class); LifecycleModule.register(binder, QueryResource.class); - LifecycleModule.register(binder, ZkCoordinator.class); + + if (isZkEnabled) { + LifecycleModule.register(binder, ZkCoordinator.class); + } JsonConfigProvider.bind(binder, "druid.historical.cache", CacheConfig.class); binder.install(new CacheModule()); diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index 10a701453310..54208fcfbd91 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -28,6 +28,7 @@ import io.airlift.airline.Command; import org.apache.druid.client.DruidServer; import org.apache.druid.client.DruidServerConfig; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.LookupNodeService; import org.apache.druid.discovery.NodeRole; @@ -89,14 +90,21 @@ public class CliIndexer extends ServerRunnable { private static final Logger log = new Logger(CliIndexer.class); - @Inject private Properties properties; + private boolean isZkEnabled = true; public CliIndexer() { super(log); } + @Inject + public void configure(Properties properties) + { + this.properties = properties; + isZkEnabled = ZkEnablementConfig.isEnabled(properties); + } + @Override protected List getModules() { @@ -133,7 +141,7 @@ public void configure(Binder binder) CliPeon.bindPeonDataSegmentHandlers(binder); CliPeon.bindRealtimeCache(binder); CliPeon.bindCoordinatorHandoffNotiferAndClient(binder); - CliMiddleManager.bindWorkerManagementClasses(binder); + CliMiddleManager.bindWorkerManagementClasses(binder, isZkEnabled); binder.bind(AppenderatorsManager.class) .to(UnifiedIndexerAppenderatorsManager.class) diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index 4438a07b4e53..c6f7d45fecae 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Binder; +import com.google.inject.Inject; import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.Provides; @@ -31,6 +32,7 @@ import io.airlift.airline.Command; import org.apache.druid.client.indexing.HttpIndexingServiceClient; import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.curator.ZkEnablementConfig; import org.apache.druid.discovery.NodeRole; import org.apache.druid.discovery.WorkerNodeService; import org.apache.druid.guice.IndexingServiceFirehoseModule; @@ -54,6 +56,7 @@ import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.WorkerCuratorCoordinator; +import org.apache.druid.indexing.worker.WorkerTaskManager; import org.apache.druid.indexing.worker.WorkerTaskMonitor; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.indexing.worker.http.TaskManagementResource; @@ -74,6 +77,7 @@ import org.eclipse.jetty.server.Server; import java.util.List; +import java.util.Properties; /** * @@ -86,11 +90,19 @@ public class CliMiddleManager extends ServerRunnable { private static final Logger log = new Logger(CliMiddleManager.class); + private boolean isZkEnabled = true; + public CliMiddleManager() { super(log); } + @Inject + public void configure(Properties properties) + { + isZkEnabled = ZkEnablementConfig.isEnabled(properties); + } + @Override protected List getModules() { @@ -132,7 +144,7 @@ public void configure(Binder binder) .in(LazySingleton.class); binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class); - bindWorkerManagementClasses(binder); + bindWorkerManagementClasses(binder, isZkEnabled); binder.bind(JettyServerInitializer.class) .to(MiddleManagerJettyServerInitializer.class) @@ -192,11 +204,17 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig) ); } - public static void bindWorkerManagementClasses(Binder binder) + public static void bindWorkerManagementClasses(Binder binder, boolean isZkEnabled) { - binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class); - binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class); - LifecycleModule.register(binder, WorkerTaskMonitor.class); + if (isZkEnabled) { + binder.bind(WorkerTaskManager.class).to(WorkerTaskMonitor.class); + binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class); + binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class); + LifecycleModule.register(binder, WorkerTaskMonitor.class); + } else { + binder.bind(WorkerTaskManager.class).in(ManageLifecycle.class); + } + Jerseys.addResource(binder, WorkerResource.class); Jerseys.addResource(binder, TaskManagementResource.class); }