diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index b61b68349843..49942860f04e 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -311,28 +311,30 @@ public void run() } } ); + firstRunTime = DateTime.now().plus(ioConfig.getStartDelay()); + scheduledExec.scheduleAtFixedRate( + buildRunTask(), + ioConfig.getStartDelay().getMillis(), + Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS), + TimeUnit.MILLISECONDS + ); + + started = true; + log.info( + "Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]", + dataSource, + ioConfig.getStartDelay(), + spec.toString() + ); } catch (Exception e) { + if (consumer != null) { + consumer.close(); + } log.makeAlert(e, "Exception starting KafkaSupervisor[%s]", dataSource) .emit(); throw Throwables.propagate(e); } - - firstRunTime = DateTime.now().plus(ioConfig.getStartDelay()); - scheduledExec.scheduleAtFixedRate( - buildRunTask(), - ioConfig.getStartDelay().getMillis(), - Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS), - TimeUnit.MILLISECONDS - ); - - started = true; - log.info( - "Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]", - dataSource, - ioConfig.getStartDelay(), - spec.toString() - ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java index 7a9931cc7429..e8916e96c0d4 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java +++ 
b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -21,6 +21,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.inject.Inject; import com.metamx.common.Pair; import com.metamx.common.lifecycle.LifecycleStart; @@ -140,7 +141,7 @@ public Optional getSupervisorStatus(String id) /** * Stops a supervisor with a given id and then removes it from the list. - *

+ *

* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be * starting and stopping supervisors. * @@ -164,7 +165,7 @@ private boolean possiblyStopAndRemoveSupervisorInternal(String id, boolean write /** * Creates a supervisor from the provided spec and starts it if there is not already a supervisor with that id. - *

+ *

* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be * starting and stopping supervisors. * @@ -177,13 +178,23 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe return false; } - Supervisor supervisor = spec.createSupervisor(); - supervisor.start(); // try starting the supervisor first so we don't persist a bad spec - if (persistSpec) { metadataSupervisorManager.insert(id, spec); } + Supervisor supervisor = null; + try { + supervisor = spec.createSupervisor(); + supervisor.start(); + } + catch (Exception e) { + // Supervisor creation or start failed; write a tombstone only when trying to start a new supervisor + if (persistSpec) { + metadataSupervisorManager.insert(id, new NoopSupervisorSpec()); + } + Throwables.propagate(e); + } + supervisors.put(id, Pair.of(supervisor, spec)); return true; }