diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java index bbfd336a42d..09c99fc8340 100755 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidAdminClient.java @@ -21,12 +21,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.store.druid.druid.SimpleDatasourceInfo; -import org.apache.http.HttpResponse; -import org.apache.http.HttpStatus; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import okhttp3.Response; + +import java.io.InputStream; import java.io.IOException; import java.util.List; @@ -34,7 +34,6 @@ public class DruidAdminClient { private static final Logger logger = LoggerFactory.getLogger(DruidAdminClient.class); private static final String DATASOURCES_BASE_URI = "/druid/coordinator/v1/datasources?simple"; - private static final String DEFAULT_ENCODING = "UTF-8"; private static final ObjectMapper mapper = new ObjectMapper(); private final String coordinatorAddress; @@ -47,18 +46,19 @@ public DruidAdminClient(String coordinatorAddress, RestClient restClient) { public List getDataSources() throws IOException { String url = this.coordinatorAddress + DATASOURCES_BASE_URI; - HttpResponse response = restClient.get(url); + try (Response response = restClient.get(url)) { + if (!response.isSuccessful()) { + // TODO: Add a CustomErrorContext when this plugin is converted to EVF. + throw UserException + .dataReadError() + .message("Error getting druid datasources. HTTP request failed") + .addContext("Response code", response.code()) + .addContext("Response message", response.message()) + .build(logger); + } - if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { - throw UserException - .dataReadError() - .message("Error getting druid datasources. HTTP request failed") - .addContext("Response code", response.getStatusLine().getStatusCode()) - .addContext("Response message", response.getStatusLine().getReasonPhrase()) - .build(logger); + InputStream responseStream = response.body().byteStream(); + return mapper.readValue(responseStream, new TypeReference>(){}); } - - String responseJson = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING); - return mapper.readValue(responseJson, new TypeReference>(){}); } } diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java index bd05c3ee5d0..fe82650199e 100755 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java @@ -22,12 +22,12 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.store.druid.druid.DruidScanResponse; -import org.apache.http.HttpResponse; -import org.apache.http.HttpStatus; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import okhttp3.Response; + +import java.io.InputStream; import java.util.ArrayList; public class DruidQueryClient { @@ -48,20 +48,22 @@ public DruidQueryClient(String brokerURI, RestClient restClient) { public DruidScanResponse executeQuery(String query) throws Exception { logger.debug("Executing Query - {}", query); - HttpResponse response = restClient.post(queryUrl, query); - if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { - throw UserException - .dataReadError() - .message("Error executing druid query. HTTP request failed") - .addContext("Response code", response.getStatusLine().getStatusCode()) - .addContext("Response message", response.getStatusLine().getReasonPhrase()) - .build(logger); - } + try (Response response = restClient.post(queryUrl, query)) { + if (!response.isSuccessful()) { + // TODO: Add a CustomErrorContext when this plugin is converted to EVF. + throw UserException + .dataReadError() + .message("Error executing druid query. HTTP request failed") + .addContext("Response code", response.code()) + .addContext("Response message", response.message()) + .build(logger); + } - String data = EntityUtils.toString(response.getEntity()); - ArrayNode responses = mapper.readValue(data, ArrayNode.class); - return parseResponse(responses); + InputStream responseStream = response.body().byteStream(); + ArrayNode responses = mapper.readValue(responseStream, ArrayNode.class); + return parseResponse(responses); + } } private DruidScanResponse parseResponse(ArrayNode responses) { diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java index d88a41f5b4f..fa8a4cf5e11 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClient.java @@ -17,11 +17,25 @@ */ package org.apache.drill.exec.store.druid.rest; -import org.apache.http.HttpResponse; +import okhttp3.Response; import java.io.IOException; public interface RestClient { - HttpResponse get(String url) throws IOException; - HttpResponse post(String url, String body) throws IOException; + /** + * Executes an HTTP GET. + * @param url request URL + * @return a Response object that the caller is responsible for closing. + * @throws IOException + */ + Response get(String url) throws IOException; + + /** + * Executes an HTTP POST. + * @param url request URL. + * @param body request body. + * @return a Response object that the caller is responsible for closing. + * @throws IOException + */ + Response post(String url, String body) throws IOException; } diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java index 5a7087e8e0a..a5cb6e73f1f 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java @@ -17,38 +17,36 @@ */ package org.apache.drill.exec.store.druid.rest; -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.DefaultHttpClient; - -import javax.ws.rs.core.HttpHeaders; -import java.io.IOException; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; -import static javax.ws.rs.core.MediaType.APPLICATION_JSON; -import static org.apache.http.protocol.HTTP.CONTENT_TYPE; +import java.nio.charset.StandardCharsets; +import java.io.IOException; public class RestClientWrapper implements RestClient { - private static final HttpClient httpClient = new DefaultHttpClient(); - private static final Charset DEFAULT_ENCODING = StandardCharsets.UTF_8; + // OkHttp client is designed to be shared across threads. + private final OkHttpClient httpClient = new OkHttpClient(); + + public Response get(String url) throws IOException { + Request get = new Request.Builder() + .url(url) + .addHeader("Content-Type", "application/json") + .build(); - public HttpResponse get(String url) throws IOException { - HttpGet httpget = new HttpGet(url); - httpget.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON); - return httpClient.execute(httpget); + return httpClient.newCall(get).execute(); } - public HttpResponse post(String url, String body) throws IOException { - HttpPost httppost = new HttpPost(url); - httppost.addHeader(CONTENT_TYPE, APPLICATION_JSON); - HttpEntity entity = new StringEntity(body, DEFAULT_ENCODING); - httppost.setEntity(entity); + public Response post(String url, String body) throws IOException { + RequestBody postBody = RequestBody.create(body.getBytes(StandardCharsets.UTF_8)); + + Request post = new Request.Builder() + .url(url) + .addHeader("Content-Type", "application/json") + .post(postBody) + .build(); - return httpClient.execute(httppost); + return httpClient.newCall(post).execute(); } } diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java index ef47e26c0ed..d52a8fc28e1 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestBase.java @@ -35,7 +35,7 @@ public static void setUpBeforeClass() throws Exception { startCluster(ClusterFixture.builder(dirTestWatcher)); pluginRegistry = cluster.drillbit().getContext().getStorage(); - DruidTestSuit.initDruid(); + DruidTestSuite.initDruid(); initDruidStoragePlugin(); } @@ -43,7 +43,7 @@ private static void initDruidStoragePlugin() throws Exception { pluginRegistry .put( DruidStoragePluginConfig.NAME, - DruidTestSuit.getDruidStoragePluginConfig()); + DruidTestSuite.getDruidStoragePluginConfig()); } @AfterClass diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuit.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuite.java similarity index 98% rename from contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuit.java rename to contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuite.java index 5e1c149556d..92d3c3486a4 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuit.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestSuite.java @@ -43,8 +43,8 @@ DruidScanSpecBuilderTest.class }) @Category({SlowTest.class, DruidStorageTest.class}) -public class DruidTestSuit { - private static final Logger logger = LoggerFactory.getLogger(DruidTestSuit.class); +public class DruidTestSuite { + private static final Logger logger = LoggerFactory.getLogger(DruidTestSuite.class); private static final ObjectMapper mapper = new ObjectMapper(); diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java index f4abcd3ca5a..3b96db7cdbf 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDataGenerator.java @@ -23,38 +23,28 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.drill.shaded.guava.com.google.common.io.Resources; -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.HttpStatus; -import org.apache.http.StatusLine; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.impl.client.DefaultHttpClient; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.core.HttpHeaders; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; + +import java.io.InputStream; import java.io.IOException; import java.net.URISyntaxException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.concurrent.TimeUnit; -import static javax.ws.rs.core.MediaType.APPLICATION_JSON; -import static org.apache.http.protocol.HTTP.CONTENT_TYPE; - public class TestDataGenerator { private static final Logger logger = LoggerFactory.getLogger(TestDataGenerator.class); - private static final HttpClient httpClient = new DefaultHttpClient(); + private static final OkHttpClient httpClient = new OkHttpClient(); private static final ObjectMapper mapper = new ObjectMapper(); - private static final String DEFAULT_ENCODING = "UTF-8"; - private static final String RESPONSE_SUCCESS = "SUCCESS"; public static void startImport(DruidStoragePluginConfig druidStoragePluginConfig) throws Exception { @@ -72,11 +62,13 @@ public static void startImport(DruidStoragePluginConfig druidStoragePluginConfig private static boolean isDruidRunning(DruidStoragePluginConfig druidStoragePluginConfig) { try { String healthCheckUrl = druidStoragePluginConfig.getCoordinatorAddress() + "/status/health"; - HttpGet httpGet = new HttpGet(healthCheckUrl); - HttpResponse response = httpClient.execute(httpGet); - StatusLine statusLine = response.getStatusLine(); - String status = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING); - return statusLine.getStatusCode() == HttpStatus.SC_OK && status.equalsIgnoreCase("true"); + Request get = new Request.Builder() + .url(healthCheckUrl) + .build(); + + try (Response resp = httpClient.newCall(get).execute()) { + return resp.isSuccessful() && resp.body().string().equalsIgnoreCase("true"); + } } catch (Exception ex) { logger.error("Error getting druid status", ex); return false; @@ -90,18 +82,24 @@ private static String taskUrl(DruidStoragePluginConfig druidStoragePluginConfig) private static String startImportTask(DruidStoragePluginConfig druidStoragePluginConfig) throws URISyntaxException, IOException { try { String url = taskUrl(druidStoragePluginConfig); - byte[] taskConfig = Files.readAllBytes(Paths.get(Resources.getResource("wikipedia-index.json").toURI())); - - HttpPost httpPost = new HttpPost(url); - httpPost.addHeader(CONTENT_TYPE, APPLICATION_JSON); - HttpEntity entity = new ByteArrayEntity(taskConfig); - httpPost.setEntity(entity); - - HttpResponse response = httpClient.execute(httpPost); - String data = EntityUtils.toString(response.getEntity()); - TaskStartResponse taskStartResponse = mapper.readValue(data, TaskStartResponse.class); - logger.debug("Started Indexing Task - " + taskStartResponse.getTaskId()); - return taskStartResponse.getTaskId(); + RequestBody postBody = RequestBody.create( + Files.readAllBytes(Paths.get(Resources.getResource("wikipedia-index.json").toURI())) + ); + Request post = new Request.Builder() + .url(url) + .addHeader("Content-Type", "application/json") + .post(postBody) + .build(); + + try (Response resp = httpClient.newCall(post).execute()) { + String respBodyStr = resp.body().string(); + TaskStartResponse taskStartResponse = mapper.readValue( + respBodyStr, + TaskStartResponse.class + ); + logger.debug("Started Indexing Task - {}", taskStartResponse.getTaskId()); + return taskStartResponse.getTaskId(); + } } catch (Exception ex) { logger.error("Error starting Indexing Task"); throw ex; @@ -114,14 +112,17 @@ private static void waitForIndexingTaskToFinish(String taskId, DruidStoragePlugi Thread.sleep(TimeUnit.MINUTES.toMillis(sleepMinutes)); String url = taskUrl(druidStoragePluginConfig) + "/" + taskId + "/status"; - HttpGet httpget = new HttpGet(url); - httpget.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON); - - HttpResponse response = httpClient.execute(httpget); - String responseJson = EntityUtils.toString(response.getEntity(), DEFAULT_ENCODING); - TaskStatusResponse taskStatusResponse = mapper.readValue(responseJson, TaskStatusResponse.class); - if (!taskStatusResponse.taskStatus.status.equalsIgnoreCase(RESPONSE_SUCCESS)) { - throw new Exception(String.format("Task %s finished with status %s", taskId, taskStatusResponse.taskStatus.status)); + Request get = new Request.Builder() + .url(url) + .addHeader("Content-Type", "application/json") + .build(); + + try (Response resp = httpClient.newCall(get).execute()) { + InputStream jsonStream = resp.body().byteStream(); + TaskStatusResponse taskStatusResponse = mapper.readValue(jsonStream, TaskStatusResponse.class); + if (!taskStatusResponse.taskStatus.status.equalsIgnoreCase(RESPONSE_SUCCESS)) { + throw new Exception(String.format("Task %s finished with status %s", taskId, taskStatusResponse.taskStatus.status)); + } } logger.debug("Task {} finished successfully", taskId); diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java index dc9986ce2ac..bb065ab7b31 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java @@ -18,17 +18,13 @@ package org.apache.drill.exec.store.druid.rest; import org.apache.drill.exec.store.druid.druid.DruidScanResponse; -import org.apache.http.HttpStatus; -import org.apache.http.HttpResponse; -import org.apache.http.StatusLine; -import org.apache.http.HttpEntity; -import org.apache.http.Header; -import org.apache.http.HttpHeaders; -import org.apache.http.message.BasicHeader; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; +import okhttp3.Response; +import okhttp3.ResponseBody; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -45,31 +41,23 @@ public class DruidQueryClientTest { private RestClient restClient; @Mock - private HttpResponse httpResponse; - - @Mock - private StatusLine statusLine; + private Response httpResponse; @Mock - private HttpEntity httpEntity; + private ResponseBody httpResponseBody; private DruidQueryClient druidQueryClient; private static final String BROKER_URI = "some broker uri"; private static final String QUERY = "{\"queryType\":\"scan\",\"dataSource\":\"wikipedia\",\"descending\":false,\"filter\":{\"type\":\"and\",\"fields\":[{\"type\":\"selector\",\"dimension\":\"user\",\"value\":\"Dansker\"},{\"type\":\"search\",\"dimension\":\"comment\",\"query\":{\"type\":\"contains\",\"value\":\"Bitte\",\"caseSensitive\":false}}]},\"granularity\":\"all\",\"intervals\":[\"2016-06-27T00:00:00.000Z/2016-06-27T22:00:00.000Z\"],\"columns\":[],\"offset\":0,\"limit\":4096}"; - private static final Header ENCODING_HEADER = - new BasicHeader(HttpHeaders.CONTENT_ENCODING, StandardCharsets.UTF_8.name()); @Before public void setup() throws IOException { restClient = mock(RestClient.class); - httpResponse = mock(HttpResponse.class); - statusLine = mock(StatusLine.class); - httpEntity = mock(HttpEntity.class); - - when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); - when(httpEntity.getContentEncoding()).thenReturn(ENCODING_HEADER); - when(httpResponse.getStatusLine()).thenReturn(statusLine); - when(httpResponse.getEntity()).thenReturn(httpEntity); + httpResponse = mock(Response.class); + httpResponseBody = mock(ResponseBody.class); + + when(httpResponse.isSuccessful()).thenReturn(true); + when(httpResponse.body()).thenReturn(httpResponseBody); when(restClient.post(BROKER_URI + "/druid/v2", QUERY)) .thenReturn(httpResponse); @@ -79,7 +67,7 @@ public void setup() throws IOException { @Test(expected=Exception.class) public void executeQueryCalledDruidReturnsNon200ShouldThrowError() throws Exception { - when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR); + when(httpResponse.isSuccessful()).thenReturn(false); druidQueryClient.executeQuery(QUERY); } @@ -88,7 +76,7 @@ public void executeQueryCalledNoResponsesFoundReturnsEmptyEventList() throws Exception { InputStream inputStream = new ByteArrayInputStream("[]".getBytes(StandardCharsets.UTF_8)); - when(httpEntity.getContent()).thenReturn(inputStream); + when(httpResponseBody.byteStream()).thenReturn(inputStream); DruidScanResponse response = druidQueryClient.executeQuery(QUERY); assertThat(response.getEvents()).isEmpty(); @@ -100,7 +88,7 @@ public void executeQueryCalledSuccessfullyParseQueryResults() String result = "[{\"segmentId\":\"wikipedia_2016-06-27T14:00:00.000Z_2016-06-27T15:00:00.000Z_2021-12-11T11:12:16.106Z\",\"columns\":[\"__time\",\"channel\",\"cityName\",\"comment\",\"countryIsoCode\",\"countryName\",\"diffUrl\",\"flags\",\"isAnonymous\",\"isMinor\",\"isNew\",\"isRobot\",\"isUnpatrolled\",\"metroCode\",\"namespace\",\"page\",\"regionIsoCode\",\"regionName\",\"user\",\"sum_deleted\",\"sum_deltaBucket\",\"sum_added\",\"sum_commentLength\",\"count\",\"sum_delta\"],\"events\":[{\"__time\":1467036000000,\"channel\":\"#de.wikipedia\",\"cityName\":null,\"comment\":\"Bitte [[WP:Literatur]] beachten.\",\"countryIsoCode\":null,\"countryName\":null,\"diffUrl\":\"https://de.wikipedia.org/w/index.php?diff=155672392&oldid=155667393\",\"flags\":null,\"isAnonymous\":\"false\",\"isMinor\":\"false\",\"isNew\":\"false\",\"isRobot\":\"false\",\"isUnpatrolled\":\"false\",\"metroCode\":null,\"namespace\":\"Main\",\"page\":\"Walfang\",\"regionIsoCode\":null,\"regionName\":null,\"user\":\"Dansker\",\"sum_deleted\":133,\"sum_deltaBucket\":-200,\"sum_added\":0,\"sum_commentLength\":32,\"count\":1,\"sum_delta\":-133}]}]"; InputStream inputStream = new ByteArrayInputStream(result.getBytes(StandardCharsets.UTF_8)); - when(httpEntity.getContent()).thenReturn(inputStream); + when(httpResponseBody.byteStream()).thenReturn(inputStream); DruidScanResponse response = druidQueryClient.executeQuery(QUERY); assertThat(response.getEvents()).isNotEmpty();