Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,56 +20,45 @@
package io.druid.indexing.common.actions;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoverySelector;
import com.metamx.http.client.response.FullResponseHolder;
import io.druid.discovery.DruidLeaderClient;
import io.druid.indexing.common.RetryPolicy;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.task.Task;
import io.druid.java.util.common.IOE;
import io.druid.java.util.common.jackson.JacksonUtils;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;

import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Random;

public class RemoteTaskActionClient implements TaskActionClient
{
private final Task task;
private final HttpClient httpClient;
private final ServerDiscoverySelector selector;
private final RetryPolicyFactory retryPolicyFactory;
private final ObjectMapper jsonMapper;
private final DruidLeaderClient druidLeaderClient;
private final Random random = new Random();

private static final Logger log = new Logger(RemoteTaskActionClient.class);

public RemoteTaskActionClient(
Task task,
HttpClient httpClient,
ServerDiscoverySelector selector,
DruidLeaderClient druidLeaderClient,
RetryPolicyFactory retryPolicyFactory,
ObjectMapper jsonMapper
)
{
this.task = task;
this.httpClient = httpClient;
this.selector = selector;
this.retryPolicyFactory = retryPolicyFactory;
this.jsonMapper = jsonMapper;
this.druidLeaderClient = druidLeaderClient;
}

@Override
Expand All @@ -83,46 +72,34 @@ public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOExcepti

while (true) {
try {
final Server server;
final URI serviceUri;
try {
server = getServiceInstance();
serviceUri = makeServiceUri(server);
}
catch (Exception e) {
// Want to retry, so throw an IOException.
throw new IOException("Failed to locate service uri", e);
}

final StatusResponseHolder response;
final FullResponseHolder fullResponseHolder;

log.info("Submitting action for task[%s] to overlord[%s]: %s", task.getId(), serviceUri, taskAction);
log.info("Submitting action for task[%s] to overlord: [%s].", task.getId(), taskAction);

try {
response = httpClient.go(
new Request(HttpMethod.POST, serviceUri.toURL())
.setContent(MediaType.APPLICATION_JSON, dataToSend),
new StatusResponseHandler(Charsets.UTF_8)
).get();
fullResponseHolder = druidLeaderClient.go(
druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/action")
.setContent(MediaType.APPLICATION_JSON, dataToSend)
);
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e.getCause(), IOException.class);
Throwables.propagateIfInstanceOf(e.getCause(), ChannelException.class);
throw Throwables.propagate(e);
}

if (response.getStatus().getCode() / 100 == 2) {
if (fullResponseHolder.getStatus().getCode() / 100 == 2) {
final Map<String, Object> responseDict = jsonMapper.readValue(
response.getContent(),
fullResponseHolder.getContent(),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
return jsonMapper.convertValue(responseDict.get("result"), taskAction.getReturnTypeReference());
} else {
// Want to retry, so throw an IOException.
throw new IOE(
"Scary HTTP status returned: %s. Check your overlord[%s] logs for exceptions.",
response.getStatus(),
server.getHost()
"Scary HTTP status returned: %s. Check your overlord logs for exceptions.",
fullResponseHolder.getStatus()
);
}
}
Expand Down Expand Up @@ -152,19 +129,4 @@ private long jitter(long input)
long retval = input + (long) jitter;
return retval < 0 ? 0 : retval;
}

private URI makeServiceUri(final Server instance) throws URISyntaxException
{
return new URI(instance.getScheme(), null, instance.getAddress(), instance.getPort(), "/druid/indexer/v1/action", null, null);
}

private Server getServiceInstance()
{
final Server instance = selector.pick();
if (instance == null) {
throw new ISE("Cannot find instance of indexer to talk to!");
} else {
return instance;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,34 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.http.client.HttpClient;
import io.druid.client.indexing.IndexingService;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.annotations.Global;
import io.druid.discovery.DruidLeaderClient;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.task.Task;

/**
*/
public class RemoteTaskActionClientFactory implements TaskActionClientFactory
{
private final HttpClient httpClient;
private final ServerDiscoverySelector selector;
private final DruidLeaderClient druidLeaderClient;
private final RetryPolicyFactory retryPolicyFactory;
private final ObjectMapper jsonMapper;

@Inject
public RemoteTaskActionClientFactory(
@Global HttpClient httpClient,
@IndexingService ServerDiscoverySelector selector,
@IndexingService DruidLeaderClient leaderHttpClient,
RetryPolicyFactory retryPolicyFactory,
ObjectMapper jsonMapper
)
{
this.httpClient = httpClient;
this.selector = selector;
this.druidLeaderClient = leaderHttpClient;
this.retryPolicyFactory = retryPolicyFactory;
this.jsonMapper = jsonMapper;
}

@Override
public TaskActionClient create(Task task)
{
return new RemoteTaskActionClient(task, httpClient, selector, retryPolicyFactory, jsonMapper);
return new RemoteTaskActionClient(task, druidLeaderClient, retryPolicyFactory, jsonMapper);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,10 @@

package io.druid.indexing.common.actions;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Futures;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoverySelector;
import com.metamx.http.client.response.FullResponseHolder;
import io.druid.discovery.DruidLeaderClient;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.TaskLock;
Expand All @@ -36,64 +31,33 @@
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Intervals;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;

public class RemoteTaskActionClientTest
{

private HttpClient httpClient;
private ServerDiscoverySelector selector;
private Server server;
private DruidLeaderClient druidLeaderClient;
List<TaskLock> result = null;
private ObjectMapper objectMapper = new DefaultObjectMapper();

@Before
public void setUp()
{
httpClient = createMock(HttpClient.class);
selector = createMock(ServerDiscoverySelector.class);

server = new Server()
{

@Override
public String getScheme()
{
return "http";
}

@Override
public int getPort()
{
return 8080;
}

@Override
public String getHost()
{
return "localhost";
}

@Override
public String getAddress()
{
return "localhost";
}
};
druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class);

long now = System.currentTimeMillis();

Expand All @@ -106,29 +70,30 @@ public String getAddress()
}

@Test
public void testSubmitSimple() throws JsonProcessingException
public void testSubmitSimple() throws Exception
{
Request request = new Request(HttpMethod.POST, new URL("http://localhost:1234/xx"));
expect(druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/action"))
.andReturn(request);

// return status code 200 and a list with size equals 1
Map<String, Object> responseBody = new HashMap<String, Object>();
responseBody.put("result", result);
String strResult = objectMapper.writeValueAsString(responseBody);
StatusResponseHolder responseHolder = new StatusResponseHolder(
FullResponseHolder responseHolder = new FullResponseHolder(
HttpResponseStatus.OK,
EasyMock.createNiceMock(HttpResponse.class),
new StringBuilder().append(strResult)
);

// set up mocks
expect(selector.pick()).andReturn(server);
replay(selector);
expect(druidLeaderClient.go(request)).andReturn(responseHolder);
replay(druidLeaderClient);

expect(httpClient.go(anyObject(Request.class), anyObject(StatusResponseHandler.class))).andReturn(
Futures.immediateFuture(responseHolder)
);
replay(httpClient);

Task task = new NoopTask("id", 0, 0, null, null, null);
RemoteTaskActionClient client = new RemoteTaskActionClient(
task, httpClient, selector, new RetryPolicyFactory(
task, druidLeaderClient, new RetryPolicyFactory(
new RetryPolicyConfig()
), objectMapper
);
Expand All @@ -140,33 +105,35 @@ task, httpClient, selector, new RetryPolicyFactory(
}

Assert.assertEquals(1, result.size());
EasyMock.verify(selector, httpClient);
EasyMock.verify(druidLeaderClient);
}

@Test(expected = IOException.class)
public void testSubmitWithIllegalStatusCode() throws IOException
public void testSubmitWithIllegalStatusCode() throws Exception
{
// return status code 400 and a list with size equals 1
Request request = new Request(HttpMethod.POST, new URL("http://localhost:1234/xx"));
expect(druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/action"))
.andReturn(request);

// return status code 200 and a list with size equals 1
Map<String, Object> responseBody = new HashMap<String, Object>();
responseBody.put("result", result);
String strResult = objectMapper.writeValueAsString(responseBody);
StatusResponseHolder responseHolder = new StatusResponseHolder(
FullResponseHolder responseHolder = new FullResponseHolder(
HttpResponseStatus.BAD_REQUEST,
EasyMock.createNiceMock(HttpResponse.class),
new StringBuilder().append(strResult)
);

// set up mocks
expect(selector.pick()).andReturn(server);
replay(selector);
expect(druidLeaderClient.go(request)).andReturn(responseHolder);
replay(druidLeaderClient);

expect(httpClient.go(anyObject(Request.class), anyObject(StatusResponseHandler.class))).andReturn(
Futures.immediateFuture(responseHolder)
);
replay(httpClient);

Task task = new NoopTask("id", 0, 0, null, null, null);
RemoteTaskActionClient client = new RemoteTaskActionClient(
task, httpClient, selector, new RetryPolicyFactory(
task, druidLeaderClient, new RetryPolicyFactory(
objectMapper.readValue("{\"maxRetryCount\":0}", RetryPolicyConfig.class)
), objectMapper
);
Expand Down
Loading