Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -310,28 +310,30 @@ public void run()
}
}
);
firstRunTime = DateTime.now().plus(ioConfig.getStartDelay());
scheduledExec.scheduleAtFixedRate(
buildRunTask(),
ioConfig.getStartDelay().getMillis(),
Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
TimeUnit.MILLISECONDS
);

started = true;
log.info(
"Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]",
dataSource,
ioConfig.getStartDelay(),
spec.toString()
);
}
catch (Exception e) {
if (consumer != null) {
consumer.close();
}
log.makeAlert(e, "Exception starting KafkaSupervisor[%s]", dataSource)
.emit();
throw Throwables.propagate(e);
}

firstRunTime = DateTime.now().plus(ioConfig.getStartDelay());
scheduledExec.scheduleAtFixedRate(
buildRunTask(),
ioConfig.getStartDelay().getMillis(),
Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
TimeUnit.MILLISECONDS
);

started = true;
log.info(
"Started KafkaSupervisor[%s], first run in [%s], with spec: [%s]",
dataSource,
ioConfig.getStartDelay(),
spec.toString()
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStart;
Expand Down Expand Up @@ -155,7 +156,7 @@ public boolean resetSupervisor(String id)

/**
* Stops a supervisor with a given id and then removes it from the list.
* <p>
* <p/>
* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be
* starting and stopping supervisors.
*
Expand All @@ -179,7 +180,7 @@ private boolean possiblyStopAndRemoveSupervisorInternal(String id, boolean write

/**
* Creates a supervisor from the provided spec and starts it if there is not already a supervisor with that id.
* <p>
* <p/>
* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be
* starting and stopping supervisors.
*
Expand All @@ -192,13 +193,23 @@ private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean pe
return false;
}

Supervisor supervisor = spec.createSupervisor();
supervisor.start(); // try starting the supervisor first so we don't persist a bad spec

if (persistSpec) {
metadataSupervisorManager.insert(id, spec);
}

Supervisor supervisor = null;
try {
supervisor = spec.createSupervisor();
supervisor.start();
}
catch (Exception e) {
// Supervisor creation or start failed write tombstone only when trying to start a new supervisor
if (persistSpec) {
metadataSupervisorManager.insert(id, new NoopSupervisorSpec());
}
Throwables.propagate(e);
}

supervisors.put(id, Pair.of(supervisor, spec));
return true;
}
Expand Down
3 changes: 3 additions & 0 deletions server/src/main/java/io/druid/server/BrokerQueryResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import com.sun.jersey.spi.container.ResourceFilters;
import io.druid.client.ServerViewUtil;
import io.druid.client.TimelineServerView;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.query.Query;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.http.security.StateResourceFilter;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import io.druid.server.security.AuthConfig;
Expand Down Expand Up @@ -76,6 +78,7 @@ public BrokerQueryResource(
@Path("/candidates")
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE, APPLICATION_SMILE})
@ResourceFilters(StateResourceFilter.class)
public Response getQueryTargets(
InputStream in,
@QueryParam("pretty") String pretty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public boolean isApplicable(String requestPath)
requestPath.startsWith("druid/coordinator/v1/tiers") ||
requestPath.startsWith("druid/worker/v1") ||
requestPath.startsWith("druid/coordinator/v1/servers") ||
requestPath.startsWith("druid/v2") ||
requestPath.startsWith("status");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.Iterables;
import com.google.inject.Injector;
import com.sun.jersey.spi.container.ResourceFilter;
import io.druid.server.BrokerQueryResource;
import io.druid.server.ClientInfoResource;
import io.druid.server.QueryResource;
import io.druid.server.StatusResource;
Expand Down Expand Up @@ -67,7 +68,8 @@ public static Collection<Object[]> data()
getRequestPaths(ClientInfoResource.class),
getRequestPaths(CoordinatorDynamicConfigsResource.class),
getRequestPaths(QueryResource.class),
getRequestPaths(StatusResource.class)
getRequestPaths(StatusResource.class),
getRequestPaths(BrokerQueryResource.class)
)
);
}
Expand Down