Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions indexing-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,25 @@
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
  <!-- Puts tools.jar on the classpath when building on an Oracle JDK,
       where it lives under $JAVA_HOME/lib (hence the ../lib systemPath). -->
  <profile>
    <id>default-tools.jar</id>
    <activation>
      <property>
        <name>java.vendor</name>
        <value>Oracle Corporation</value>
      </property>
    </activation>
    <dependencies>
      <dependency>
        <groupId>com.sun</groupId>
        <artifactId>tools</artifactId>
        <!-- Fixed: was ${java.version}} — the stray closing brace made the
             version literal invalid. -->
        <version>${java.version}</version>
        <scope>system</scope>
        <systemPath>${java.home}/../lib/tools.jar</systemPath>
      </dependency>
    </dependencies>
  </profile>
</profiles>

</project>

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.http.client.HttpClient;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
Expand All @@ -39,6 +41,7 @@ public class ForkingTaskRunnerFactory implements TaskRunnerFactory
private final ObjectMapper jsonMapper;
private final TaskLogPusher persistentTaskLogs;
private final DruidNode node;
private final HttpClient httpClient;

@Inject
public ForkingTaskRunnerFactory(
Expand All @@ -48,7 +51,8 @@ public ForkingTaskRunnerFactory(
final Properties props,
final ObjectMapper jsonMapper,
final TaskLogPusher persistentTaskLogs,
@Self DruidNode node
@Self DruidNode node,
@Global HttpClient httpClient
) {
this.config = config;
this.taskConfig = taskConfig;
Expand All @@ -57,11 +61,12 @@ public ForkingTaskRunnerFactory(
this.jsonMapper = jsonMapper;
this.persistentTaskLogs = persistentTaskLogs;
this.node = node;
this.httpClient = httpClient;
}

@Override
public TaskRunner build()
{
  // Fixed diff residue: the pre-change return statement (without httpClient) was
  // left in front of the new one, leaving two consecutive returns.
  // The @Global HttpClient is forwarded so the runner can talk to forked peons over HTTP.
  return new ForkingTaskRunner(config, taskConfig, workerConfig, props, persistentTaskLogs, jsonMapper, node, httpClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,28 @@ public synchronized int findUnusedPort()
return port;
}

public synchronized void markPortUnused(int port)
/**
* Force a port to be thought of as used.
*
* @param port The port of interest
*
* @return Result of Set.add
*/
public synchronized boolean markPortUsed(int port)
{
usedPorts.remove(port);
return usedPorts.add(port);
}

/**
 * Force removing a port from the list of used ports.
 *
 * @param port The port to remove
 *
 * @return The result of Set.remove: true if the port had been marked used
 */
public synchronized boolean markPortUnused(int port)
{
  final boolean wasUsed = usedPorts.remove(port);
  return wasUsed;
}

private int chooseNext(int start)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ public class ForkingTaskRunnerConfig
@Max(65535)
private int startPort = 8100;


/**
 * Time (in milliseconds) that the forking task runner allows a task to shut down
 * gracefully before forcibly killing it. Defaults to 30 seconds.
 */
@JsonProperty
@Min(0)
private long softShutdownTimelimit = 30_000;

@JsonProperty
@NotNull
List<String> allowedPrefixes = Lists.newArrayList(
Expand Down Expand Up @@ -85,4 +93,15 @@ public List<String> getAllowedPrefixes()
{
return allowedPrefixes;
}

// Fluent setter for the soft-shutdown time limit (milliseconds).
// NOTE(review): the setter spells "TimeLimit" while the getter and the backing
// field spell "Timelimit" — inconsistent casing; consider unifying before merge.
public ForkingTaskRunnerConfig setSoftShutdownTimeLimit(@Min(0) long limit)
{
this.softShutdownTimelimit = limit;
return this;
}

// Returns the soft-shutdown time limit in milliseconds (default 30_000).
public long getSoftShutdownTimelimit()
{
return softShutdownTimelimit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package io.druid.indexing.worker.executor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.io.ByteSink;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
Expand All @@ -29,14 +31,32 @@
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskRunner;

import io.druid.server.DruidNode;

import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutorService;

/**
Expand All @@ -51,8 +71,7 @@ public class ExecutorLifecycle
private final TaskActionClientFactory taskActionClientFactory;
private final TaskRunner taskRunner;
private final ObjectMapper jsonMapper;

private final ExecutorService parentMonitorExec = Execs.singleThreaded("parent-monitor-%d");
private final DruidNode druidNode;

private volatile ListenableFuture<TaskStatus> statusFuture = null;

Expand All @@ -61,21 +80,37 @@ public ExecutorLifecycle(
ExecutorLifecycleConfig config,
TaskActionClientFactory taskActionClientFactory,
TaskRunner taskRunner,
ObjectMapper jsonMapper
ObjectMapper jsonMapper,
@Self DruidNode druidNode
)
{
this.config = config;
this.taskActionClientFactory = taskActionClientFactory;
this.taskRunner = taskRunner;
this.jsonMapper = jsonMapper;
this.druidNode = druidNode;
}

@LifecycleStart
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think an easy way to make sure all the tasks are dead when killing a middleManager would be useful. Sometimes you really do want it to die. So IMO removing this means we should have some other way. Maybe the middleManager should heartbeat to its tasks, and they should exit if they haven't got a heartbeat in a certain amount of time? Something long enough to cover any reasonable update + restart time.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gianm How would you feel about forcing a node restart if you wanted to make absolutely sure all peons were dead?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like, restarting the actual OS? I think it's best to have some other way to do it, since I think people expect apps to be able to be lifecycled independently of the host OS

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be ok to make it a config in runtime.props, also a flag while starting up the middlemanager indicating if we want to start with a clean state by killing any left over tasks from previous run can be useful.

public void start()
{
final File taskFile = Preconditions.checkNotNull(config.getTaskFile(), "taskFile");
final File statusFile = Preconditions.checkNotNull(config.getStatusFile(), "statusFile");
final InputStream parentStream = Preconditions.checkNotNull(config.getParentStream(), "parentStream");
final File portFile = Preconditions.checkNotNull(config.getPortFile(), "portFile");

try(final FileChannel portFileChannel = FileChannel.open(portFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
final String portStr = String.format("%d", druidNode.getPort());
final FileLock lock = portFileChannel.lock();
try {
CharsetEncoder encoder = Charsets.UTF_8.newEncoder();
portFileChannel.write(encoder.encode(CharBuffer.wrap(portStr)));
}finally{
lock.release();
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}

final Task task;

Expand All @@ -91,30 +126,6 @@ public void start()
throw Throwables.propagate(e);
}

// Spawn monitor thread to keep a watch on parent's stdin
// If stdin reaches eof, the parent is gone, and we should shut down
parentMonitorExec.submit(
new Runnable()
{
@Override
public void run()
{
try {
while (parentStream.read() != -1) {
// Toss the byte
}
}
catch (Exception e) {
log.error(e, "Failed to read from stdin");
}

// Kind of gross, but best way to kill the JVM as far as I know
log.info("Triggering JVM shutdown.");
System.exit(2);
}
}
);

// Won't hurt in remote mode, and is required for setting up locks in local mode:
try {
if (!task.isReady(taskActionClientFactory.create(task))) {
Expand Down Expand Up @@ -166,6 +177,12 @@ public void join()
@LifecycleStop
public void stop()
{
  // Fixed diff residue: the old parentMonitorExec.shutdown() call was left here,
  // but the parentMonitorExec field is removed by this change set.
  final File portFile = Preconditions.checkNotNull(config.getPortFile(), "portFile");
  // Best-effort cleanup of the port file written in start(); a leftover file is
  // only worth a warning, not a failure, during shutdown.
  if (!portFile.delete()) {
    log.warn("Unable to delete task port file at [%s]", portFile.toString());
  } else {
    log.info("Deleted port file at [%s]", portFile.toString());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ public class ExecutorLifecycleConfig
private File statusFile = null;

@JsonProperty
@Pattern(regexp = "\\{stdin\\}")
private String parentStreamName = "stdin";
@NotNull
private File portFile = null;


public File getTaskFile()
{
Expand All @@ -63,24 +64,14 @@ public ExecutorLifecycleConfig setStatusFile(File statusFile)
return this;
}

public String getParentStreamName()
public File getPortFile()
{
return parentStreamName;
return portFile;
}

public ExecutorLifecycleConfig setParentStreamName(String parentStreamName)
public ExecutorLifecycleConfig setPortFile(File portFile)
{
this.parentStreamName = parentStreamName;
this.portFile = portFile;
return this;
}

// NOTE(review): this method is deleted by the change set shown in this diff —
// it reads the removed parentStreamName field; it should not survive the merge.
// Only "stdin" is recognized; any other configured name fails fast with an ISE.
public InputStream getParentStream()
{
if ("stdin".equals(parentStreamName)) {
return System.in;
}
else {
throw new ISE("Unknown stream name[%s]", parentStreamName);
}
}
}
66 changes: 66 additions & 0 deletions server/src/main/java/io/druid/server/http/ShutdownResource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.http;

import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;

import javax.ws.rs.DELETE;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Path("/shutdown")
public class ShutdownResource
{
private final Lifecycle lifecycle;
@Inject
public ShutdownResource(
Lifecycle lifecycle
){
this.lifecycle = lifecycle;
}
private static final Logger log = new Logger(ShutdownResource.class);
private final ListeningScheduledExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));

@DELETE
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DELETE seems weird here. You're not really "deleting" the /shutdown resource. I think sticking with good ol' POST is fine

public Response shutDown()
{
log.info("Received shutdown request");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you mean to call lifecycle.shutdown() somewhere here or register it as a shutdown hook. I don't see lifecycle being used in this class.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did at one point due to how peons were shutting down, but now it is all done through shutdown hooks and this will be removed

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, yeah, just saw the other PR with the update to CliPeon .

executorService.schedule(
new Runnable()
{
@Override
public void run()
{
System.exit(0);
}
},
1,
TimeUnit.SECONDS
);
executorService.shutdown();
return Response.status(Response.Status.ACCEPTED).build();
}
}
Loading