From 8fc82a890a26e734cc37a750b68e70f0638d32d1 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Tue, 20 Sep 2016 13:09:07 -0500 Subject: [PATCH 1/2] handle supervisor spec metadata failures close kafka consumer in case supervisor start fails --- .../kafka/supervisor/KafkaSupervisor.java | 34 ++++++++++--------- .../supervisor/SupervisorManager.java | 21 +++++++++--- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 0c1b58eb4742..4fe08e93ea27 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -310,28 +310,30 @@ public void run() } } ); + firstRunTime = DateTime.now().plus(ioConfig.getStartDelay()); + scheduledExec.scheduleAtFixedRate( + buildRunTask(), + ioConfig.getStartDelay().getMillis(), + Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS), + TimeUnit.MILLISECONDS + ); + + started = true; + log.info( + "Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]", + dataSource, + ioConfig.getStartDelay(), + spec.toString() + ); } catch (Exception e) { + if (consumer != null) { + consumer.close(); + } log.makeAlert(e, "Exception starting KafkaSupervisor[%s]", dataSource) .emit(); throw Throwables.propagate(e); } - - firstRunTime = DateTime.now().plus(ioConfig.getStartDelay()); - scheduledExec.scheduleAtFixedRate( - buildRunTask(), - ioConfig.getStartDelay().getMillis(), - Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS), - TimeUnit.MILLISECONDS - ); - - started = true; - log.info( - "Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]", - dataSource, - ioConfig.getStartDelay(), - spec.toString() - ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java index 5cf69ea40d69..72f242ad453e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -21,6 +21,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.inject.Inject; import com.metamx.common.Pair; import com.metamx.common.lifecycle.LifecycleStart; @@ -155,7 +156,7 @@ public boolean resetSupervisor(String id) /** * Stops a supervisor with a given id and then removes it from the list. - *

+ *

* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be * starting and stopping supervisors. * @@ -179,7 +180,7 @@ private boolean possiblyStopAndRemoveSupervisorInternal(String id, boolean write /** * Creates a supervisor from the provided spec and starts it if there is not already a supervisor with that id. - *

+ *

* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be * starting and stopping supervisors. * @@ -192,13 +193,23 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe return false; } - Supervisor supervisor = spec.createSupervisor(); - supervisor.start(); // try starting the supervisor first so we don't persist a bad spec - if (persistSpec) { metadataSupervisorManager.insert(id, spec); } + Supervisor supervisor = null; + try { + supervisor = spec.createSupervisor(); + supervisor.start(); + } + catch (Exception e) { + // Supervisor creation or start failed write tombstone only when trying to start a new supervisor + if (persistSpec) { + metadataSupervisorManager.insert(id, new NoopSupervisorSpec()); + } + Throwables.propagate(e); + } + supervisors.put(id, Pair.of(supervisor, spec)); return true; } From 2377eec0cc87db10abbc981420f0a92fbf77aa77 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Sat, 24 Sep 2016 23:53:54 -0500 Subject: [PATCH 2/2] secure BrokerQueryResource endpoints --- server/src/main/java/io/druid/server/BrokerQueryResource.java | 3 +++ .../io/druid/server/http/security/StateResourceFilter.java | 1 + .../server/http/security/SecurityResourceFilterTest.java | 4 +++- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/server/BrokerQueryResource.java b/server/src/main/java/io/druid/server/BrokerQueryResource.java index d0ac0b1097c2..f8c37daf96b2 100644 --- a/server/src/main/java/io/druid/server/BrokerQueryResource.java +++ b/server/src/main/java/io/druid/server/BrokerQueryResource.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.google.inject.Inject; import com.metamx.emitter.service.ServiceEmitter; +import com.sun.jersey.spi.container.ResourceFilters; import io.druid.client.ServerViewUtil; import io.druid.client.TimelineServerView; import io.druid.guice.annotations.Json; @@ -30,6 +31,7 @@ import io.druid.query.Query; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChestWarehouse; +import io.druid.server.http.security.StateResourceFilter; import io.druid.server.initialization.ServerConfig; import io.druid.server.log.RequestLogger; import io.druid.server.security.AuthConfig; @@ -76,6 +78,7 @@ public BrokerQueryResource( @Path("/candidates") @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE, APPLICATION_SMILE}) + @ResourceFilters(StateResourceFilter.class) public Response getQueryTargets( InputStream in, @QueryParam("pretty") String pretty, diff --git a/server/src/main/java/io/druid/server/http/security/StateResourceFilter.java b/server/src/main/java/io/druid/server/http/security/StateResourceFilter.java index b4d9d40195f4..960b3c123ccb 100644 --- a/server/src/main/java/io/druid/server/http/security/StateResourceFilter.java +++ b/server/src/main/java/io/druid/server/http/security/StateResourceFilter.java @@ -92,6 +92,7 @@ public boolean isApplicable(String requestPath) requestPath.startsWith("druid/coordinator/v1/tiers") || requestPath.startsWith("druid/worker/v1") || requestPath.startsWith("druid/coordinator/v1/servers") || + requestPath.startsWith("druid/v2") || requestPath.startsWith("status"); } } diff --git a/server/src/test/java/io/druid/server/http/security/SecurityResourceFilterTest.java b/server/src/test/java/io/druid/server/http/security/SecurityResourceFilterTest.java index 4a7cd0de8258..2f7079ce8b5b 100644 --- a/server/src/test/java/io/druid/server/http/security/SecurityResourceFilterTest.java +++ b/server/src/test/java/io/druid/server/http/security/SecurityResourceFilterTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.Iterables; import com.google.inject.Injector; import com.sun.jersey.spi.container.ResourceFilter; +import io.druid.server.BrokerQueryResource; import io.druid.server.ClientInfoResource; import io.druid.server.QueryResource; import io.druid.server.StatusResource; @@ -67,7 +68,8 @@ public static Collection data() getRequestPaths(ClientInfoResource.class), getRequestPaths(CoordinatorDynamicConfigsResource.class), getRequestPaths(QueryResource.class), - getRequestPaths(StatusResource.class) + getRequestPaths(StatusResource.class), + getRequestPaths(BrokerQueryResource.class) ) ); }