Merged
DruidAdminClient.java
@@ -21,20 +21,19 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.store.druid.druid.SimpleDatasourceInfo;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import okhttp3.Response;

import java.io.InputStream;
import java.io.IOException;
import java.util.List;

public class DruidAdminClient {
private static final Logger logger = LoggerFactory.getLogger(DruidAdminClient.class);

private static final String DATASOURCES_BASE_URI = "/druid/coordinator/v1/datasources?simple";
private static final String DEFAULT_ENCODING = "UTF-8";
private static final ObjectMapper mapper = new ObjectMapper();

private final String coordinatorAddress;
@@ -47,18 +46,19 @@ public DruidAdminClient(String coordinatorAddress, RestClient restClient) {

public List<SimpleDatasourceInfo> getDataSources() throws IOException {
String url = this.coordinatorAddress + DATASOURCES_BASE_URI;
HttpResponse response = restClient.get(url);
try (Response response = restClient.get(url)) {
if (!response.isSuccessful()) {
// TODO: Add a CustomErrorContext when this plugin is converted to EVF.
throw UserException
.dataReadError()
.message("Error getting druid datasources. HTTP request failed")
.addContext("Response code", response.code())
.addContext("Response message", response.message())
.build(logger);
}

if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
throw UserException
.dataReadError()
.message("Error getting druid datasources. HTTP request failed")
.addContext("Response code", response.getStatusLine().getStatusCode())
.addContext("Response message", response.getStatusLine().getReasonPhrase())
.build(logger);
InputStream responseStream = response.body().byteStream();
return mapper.readValue(responseStream, new TypeReference<List<SimpleDatasourceInfo>>(){});
}

String responseJson = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING);
return mapper.readValue(responseJson, new TypeReference<List<SimpleDatasourceInfo>>(){});
}
}
DruidQueryClient.java
@@ -22,12 +22,12 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.store.druid.druid.DruidScanResponse;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import okhttp3.Response;

import java.io.InputStream;
import java.util.ArrayList;

public class DruidQueryClient {
@@ -48,20 +48,22 @@ public DruidQueryClient(String brokerURI, RestClient restClient) {

public DruidScanResponse executeQuery(String query) throws Exception {
logger.debug("Executing Query - {}", query);
HttpResponse response = restClient.post(queryUrl, query);

if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
throw UserException
.dataReadError()
.message("Error executing druid query. HTTP request failed")
.addContext("Response code", response.getStatusLine().getStatusCode())
.addContext("Response message", response.getStatusLine().getReasonPhrase())
.build(logger);
}
try (Response response = restClient.post(queryUrl, query)) {
if (!response.isSuccessful()) {
// TODO: Add a CustomErrorContext when this plugin is converted to EVF.
throw UserException
[Review thread]
Contributor: Would it be possible to get the errorContext from the DruidBatchReader so that we get good error messages?

Contributor: I see that these objects are created in the DruidPlugin, but maybe we could add a method addErrorContext or something like that and call it in the constructor of the batch reader.

Contributor Author (@jnturton, Sep 16, 2022): @cgivre thanks, I'll check that out. I'd have thought that any error information available in the batch reader would also have to be present here in the HTTP response. Even if that's true, there might be some more response body unpacking to do to expose it.

Contributor Author (@jnturton): @cgivre I wasn't aware of the CustomErrorContext, thanks for pointing it out. It looks to me like it's an EVF construct that isn't accessible to pre-EVF plugins like storage-druid. If I've got that right, will we only be able to start using it in this plugin's error messages once its conversion to EVF arrives?

Contributor: Ahh... you're right. In that case, would you please put some TODO comments in the rest clients and then we can update once the EVF conversion is done?

Contributor: I'm good with merging this once that's done. +1

(A hedged sketch of passing an error context through the client follows this file's diff.)
[End review thread]
.dataReadError()
.message("Error executing druid query. HTTP request failed")
.addContext("Response code", response.code())
.addContext("Response message", response.message())
.build(logger);
}

String data = EntityUtils.toString(response.getEntity());
ArrayNode responses = mapper.readValue(data, ArrayNode.class);
return parseResponse(responses);
InputStream responseStream = response.body().byteStream();
ArrayNode responses = mapper.readValue(responseStream, ArrayNode.class);
return parseResponse(responses);
}
}

private DruidScanResponse parseResponse(ArrayNode responses) {
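The thread above leaves error-context propagation as a TODO. As a rough sketch only (not part of this PR), and assuming that Drill's CustomErrorContext lives in org.apache.drill.common.exceptions and that UserException.Builder has an addContext(CustomErrorContext) overload, a post-EVF version of this error handling might let the batch reader hand its context to the query client:

import org.apache.drill.common.exceptions.CustomErrorContext;  // assumed import path
import org.apache.drill.common.exceptions.UserException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import okhttp3.Response;

// Hypothetical post-EVF sketch: the batch reader builds a CustomErrorContext and
// passes it in, so HTTP failures carry operator and query details.
public class DruidQueryErrorSketch {
  private static final Logger logger = LoggerFactory.getLogger(DruidQueryErrorSketch.class);

  private final CustomErrorContext errorContext;  // supplied by the batch reader (assumption)

  public DruidQueryErrorSketch(CustomErrorContext errorContext) {
    this.errorContext = errorContext;
  }

  void failIfUnsuccessful(Response response) {
    if (!response.isSuccessful()) {
      throw UserException
        .dataReadError()
        .message("Error executing druid query. HTTP request failed")
        .addContext("Response code", response.code())
        .addContext("Response message", response.message())
        .addContext(errorContext)  // assumed overload that folds in the reader's context
        .build(logger);
    }
  }
}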
RestClient.java
@@ -17,11 +17,25 @@
*/
package org.apache.drill.exec.store.druid.rest;

import org.apache.http.HttpResponse;
import okhttp3.Response;

import java.io.IOException;

public interface RestClient {
HttpResponse get(String url) throws IOException;
HttpResponse post(String url, String body) throws IOException;
/**
* Executes an HTTP GET.
* @param url request URL
* @return a Response object that the caller is responsible for closing.
* @throws IOException
*/
Response get(String url) throws IOException;

/**
* Executes an HTTP POST.
* @param url request URL.
* @param body request body.
* @return a Response object that the caller is responsible for closing.
* @throws IOException
*/
Response post(String url, String body) throws IOException;
}
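Since the javadoc above makes the caller responsible for closing the returned Response, here is a minimal caller sketch (illustration only, not part of the PR) of the try-with-resources pattern that the updated clients in this PR follow:

package org.apache.drill.exec.store.druid.rest;

import java.io.IOException;

import okhttp3.Response;

class RestClientUsageSketch {
  // string() reads the whole body, and try-with-resources guarantees the Response
  // is closed even if reading or later parsing throws.
  static String fetch(RestClient client, String url) throws IOException {
    try (Response response = client.get(url)) {
      return response.body().string();
    }
  }
}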
RestClientWrapper.java
@@ -17,38 +17,36 @@
*/
package org.apache.drill.exec.store.druid.rest;

import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;

import javax.ws.rs.core.HttpHeaders;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
import static org.apache.http.protocol.HTTP.CONTENT_TYPE;
import java.nio.charset.StandardCharsets;
import java.io.IOException;

public class RestClientWrapper implements RestClient {
private static final HttpClient httpClient = new DefaultHttpClient();
private static final Charset DEFAULT_ENCODING = StandardCharsets.UTF_8;
// OkHttp client is designed to be shared across threads.
private final OkHttpClient httpClient = new OkHttpClient();

public Response get(String url) throws IOException {
Request get = new Request.Builder()
.url(url)
.addHeader("Content-Type", "application/json")
[Review thread]
Member: Why would a GET request need a Content-Type? Content-Type on a request refers to the type of the request body, but for GETs this is empty. Maybe I've misunderstood the intent, but shouldn't this be an Accept header? An Accept header is used to tell the HTTP server that the client (us) would prefer to get the response with a particular content type. See https://en.wikipedia.org/wiki/List_of_HTTP_header_fields, which has a description of what the headers mean.

Contributor Author (@jnturton): @pjfanning: also a valid question, but remember that this PR was a thread-safety fix intended for backporting to stable, so I strove to disturb as little as possible while fixing the bug. Since the plugin specified this Content-Type here before, and we have no bug report related to it doing so, it continues to do that after this PR.

Maybe when @cgivre sends in the EVF version of this plugin we can see whether Druid is indeed happy for this Content-Type header to disappear. I'm almost sure you're right and that will be the case.

(A sketch of the suggested Accept-header variant follows this get method.)
[End review thread]
.build();

public HttpResponse get(String url) throws IOException {
HttpGet httpget = new HttpGet(url);
httpget.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON);
return httpClient.execute(httpget);
return httpClient.newCall(get).execute();
}
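For reference against the thread above, a sketch of the Accept-header variant the reviewer describes (illustration only; this PR deliberately keeps the existing Content-Type header, and whether Druid behaves identically with Accept alone is an open question):

import java.io.IOException;

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

class AcceptHeaderGetSketch {
  private final OkHttpClient httpClient = new OkHttpClient();

  // A GET has no body, so instead of Content-Type this sends an Accept header,
  // which asks the server to return JSON.
  Response get(String url) throws IOException {
    Request get = new Request.Builder()
      .url(url)
      .addHeader("Accept", "application/json")
      .build();
    return httpClient.newCall(get).execute();
  }
}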

public HttpResponse post(String url, String body) throws IOException {
HttpPost httppost = new HttpPost(url);
httppost.addHeader(CONTENT_TYPE, APPLICATION_JSON);
HttpEntity entity = new StringEntity(body, DEFAULT_ENCODING);
httppost.setEntity(entity);
public Response post(String url, String body) throws IOException {
RequestBody postBody = RequestBody.create(body.getBytes(StandardCharsets.UTF_8));
[Review thread]
Member: Would it be possible to use the RequestBody.create method that takes a String instead of converting to a byte[]? See https://square.github.io/okhttp/#post-to-a-server.

Contributor Author (@jnturton, Sep 18, 2022): Ugh, I missed that. @cgivre please may we incorporate @pjfanning's suggestion here in the EVF conversion PR.

(A sketch of the String-based RequestBody follows this file's diff.)
[End review thread]
Request post = new Request.Builder()
.url(url)
.addHeader("Content-Type", "application/json")
.post(postBody)
.build();

return httpClient.execute(httppost);
return httpClient.newCall(post).execute();
}
}
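For the RequestBody thread above, a sketch of the String-based overload the reviewer points to (illustration only, assuming the OkHttp 4.x RequestBody.create(String, MediaType) overload is available; this is the change deferred to the EVF conversion PR, not something this PR does):

import java.io.IOException;

import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

class StringBodyPostSketch {
  private static final MediaType JSON = MediaType.get("application/json; charset=utf-8");
  private final OkHttpClient httpClient = new OkHttpClient();

  // Passing the String together with a MediaType lets OkHttp encode the body and set
  // the Content-Type header itself, so no manual getBytes() or addHeader() is needed.
  Response post(String url, String body) throws IOException {
    RequestBody postBody = RequestBody.create(body, JSON);
    Request post = new Request.Builder()
      .url(url)
      .post(postBody)
      .build();
    return httpClient.newCall(post).execute();
  }
}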
@@ -35,15 +35,15 @@ public static void setUpBeforeClass() throws Exception {
startCluster(ClusterFixture.builder(dirTestWatcher));
pluginRegistry = cluster.drillbit().getContext().getStorage();

DruidTestSuit.initDruid();
DruidTestSuite.initDruid();
initDruidStoragePlugin();
}

private static void initDruidStoragePlugin() throws Exception {
pluginRegistry
.put(
DruidStoragePluginConfig.NAME,
DruidTestSuit.getDruidStoragePluginConfig());
DruidTestSuite.getDruidStoragePluginConfig());
}

@AfterClass
DruidTestSuite.java
@@ -43,8 +43,8 @@
DruidScanSpecBuilderTest.class
})
@Category({SlowTest.class, DruidStorageTest.class})
public class DruidTestSuit {
private static final Logger logger = LoggerFactory.getLogger(DruidTestSuit.class);
public class DruidTestSuite {
private static final Logger logger = LoggerFactory.getLogger(DruidTestSuite.class);

private static final ObjectMapper mapper = new ObjectMapper();

TestDataGenerator.java
@@ -23,38 +23,28 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.shaded.guava.com.google.common.io.Resources;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.core.HttpHeaders;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

import java.io.InputStream;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
import static org.apache.http.protocol.HTTP.CONTENT_TYPE;

public class TestDataGenerator {
private static final Logger logger = LoggerFactory.getLogger(TestDataGenerator.class);

private static final HttpClient httpClient = new DefaultHttpClient();
private static final OkHttpClient httpClient = new OkHttpClient();

private static final ObjectMapper mapper = new ObjectMapper();

private static final String DEFAULT_ENCODING = "UTF-8";

private static final String RESPONSE_SUCCESS = "SUCCESS";

public static void startImport(DruidStoragePluginConfig druidStoragePluginConfig) throws Exception {
@@ -72,11 +62,13 @@ public static void startImport(DruidStoragePluginConfig druidStoragePluginConfig
private static boolean isDruidRunning(DruidStoragePluginConfig druidStoragePluginConfig) {
try {
String healthCheckUrl = druidStoragePluginConfig.getCoordinatorAddress() + "/status/health";
HttpGet httpGet = new HttpGet(healthCheckUrl);
HttpResponse response = httpClient.execute(httpGet);
StatusLine statusLine = response.getStatusLine();
String status = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING);
return statusLine.getStatusCode() == HttpStatus.SC_OK && status.equalsIgnoreCase("true");
Request get = new Request.Builder()
.url(healthCheckUrl)
.build();

try (Response resp = httpClient.newCall(get).execute()) {
return resp.isSuccessful() && resp.body().string().equalsIgnoreCase("true");
}
} catch (Exception ex) {
logger.error("Error getting druid status", ex);
return false;
@@ -90,18 +82,24 @@ private static String taskUrl(DruidStoragePluginConfig druidStoragePluginConfig)
private static String startImportTask(DruidStoragePluginConfig druidStoragePluginConfig) throws URISyntaxException, IOException {
try {
String url = taskUrl(druidStoragePluginConfig);
byte[] taskConfig = Files.readAllBytes(Paths.get(Resources.getResource("wikipedia-index.json").toURI()));

HttpPost httpPost = new HttpPost(url);
httpPost.addHeader(CONTENT_TYPE, APPLICATION_JSON);
HttpEntity entity = new ByteArrayEntity(taskConfig);
httpPost.setEntity(entity);

HttpResponse response = httpClient.execute(httpPost);
String data = EntityUtils.toString(response.getEntity());
TaskStartResponse taskStartResponse = mapper.readValue(data, TaskStartResponse.class);
logger.debug("Started Indexing Task - " + taskStartResponse.getTaskId());
return taskStartResponse.getTaskId();
RequestBody postBody = RequestBody.create(
Files.readAllBytes(Paths.get(Resources.getResource("wikipedia-index.json").toURI()))
);
Request post = new Request.Builder()
.url(url)
.addHeader("Content-Type", "application/json")
.post(postBody)
.build();

try (Response resp = httpClient.newCall(post).execute()) {
String respBodyStr = resp.body().string();
TaskStartResponse taskStartResponse = mapper.readValue(
respBodyStr,
TaskStartResponse.class
);
logger.debug("Started Indexing Task - {}", taskStartResponse.getTaskId());
return taskStartResponse.getTaskId();
}
} catch (Exception ex) {
logger.error("Error starting Indexing Task");
throw ex;
@@ -114,14 +112,17 @@ private static void waitForIndexingTaskToFinish(String taskId, DruidStoragePlugi
Thread.sleep(TimeUnit.MINUTES.toMillis(sleepMinutes));

String url = taskUrl(druidStoragePluginConfig) + "/" + taskId + "/status";
HttpGet httpget = new HttpGet(url);
httpget.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON);

HttpResponse response = httpClient.execute(httpget);
String responseJson = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING);
TaskStatusResponse taskStatusResponse = mapper.readValue(responseJson, TaskStatusResponse.class);
if (!taskStatusResponse.taskStatus.status.equalsIgnoreCase(RESPONSE_SUCCESS)) {
throw new Exception(String.format("Task %s finished with status %s", taskId, taskStatusResponse.taskStatus.status));
Request get = new Request.Builder()
.url(url)
.addHeader("Content-Type", "application/json")
.build();

try (Response resp = httpClient.newCall(get).execute()) {
InputStream jsonStream = resp.body().byteStream();
TaskStatusResponse taskStatusResponse = mapper.readValue(jsonStream, TaskStatusResponse.class);
if (!taskStatusResponse.taskStatus.status.equalsIgnoreCase(RESPONSE_SUCCESS)) {
throw new Exception(String.format("Task %s finished with status %s", taskId, taskStatusResponse.taskStatus.status));
}
}

logger.debug("Task {} finished successfully", taskId);