From af6dce4ec19c0f0b5b5a0cddb8bf3cfd3786c176 Mon Sep 17 00:00:00 2001 From: James Turton Date: Wed, 7 Sep 2022 10:41:13 +0200 Subject: [PATCH 1/6] Close OkHttp3 response object in the event of an unsuccesful request in HTTP plugin. --- .../exec/store/http/HttpBatchReader.java | 1 + .../exec/store/http/util/SimpleHttp.java | 31 ++++++++++++------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java index 99159482f21..7f8fd59248c 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java @@ -127,6 +127,7 @@ public void addContext(UserException.Builder builder) { buildImplicitColumns(); } + // Should be closed by the JsonLoader. InputStream inStream = http.getInputStream(); populateImplicitFieldMap(http); diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java index ee5b285f1ef..e8cdd6805b0 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java @@ -35,6 +35,7 @@ import org.apache.drill.common.exceptions.EmptyErrorContext; import org.apache.drill.common.logical.OAuthConfig; import org.apache.drill.common.logical.StoragePluginConfig.AuthMode; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.CustomErrorContext; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.ExecConstants; @@ -321,7 +322,8 @@ public X509Certificate[] getAcceptedIssuers() { /** * Returns an InputStream based on the URL and config in the scanSpec. If anything goes wrong * the method throws a UserException. - * @return An Inputstream of the data from the URL call. + * @return An Inputstream of the data from the URL call. The caller is responsible for calling + * close() on the InputStream. */ public InputStream getInputStream() { @@ -369,13 +371,14 @@ public InputStream getInputStream() { // Build the request object Request request = requestBuilder.build(); + Response response = null; try { logger.debug("Executing request: {}", request); logger.debug("Headers: {}", request.headers()); // Execute the request - Response response = client + response = client .newCall(request) .execute(); @@ -392,22 +395,28 @@ public InputStream getInputStream() { paginator.notifyPartialPage(); } - // If the request is unsuccessful, throw a UserException + // If the request is unsuccessful clean up and throw a UserException if (!isSuccessful(responseCode)) { - throw UserException - .dataReadError() - .message("HTTP request failed") - .addContext("Response code", response.code()) - .addContext("Response message", response.message()) - .addContext(errorContext) - .build(logger); + try { + AutoCloseables.closeSilently(response); + } finally { + throw UserException + .dataReadError() + .message("HTTP request failed") + .addContext("Response code", response.code()) + .addContext("Response message", response.message()) + .addContext(errorContext) + .build(logger); + } } logger.debug("HTTP Request for {} successful.", url()); logger.debug("Response Headers: {} ", response.headers()); - // Return the InputStream of the response + // Return the InputStream of the response. Note that it is necessary and + // and sufficient that the caller invokes close() on the returned stream. return Objects.requireNonNull(response.body()).byteStream(); } catch (IOException e) { + // response can only be null at this location so we do not attempt to close it. throw UserException .dataReadError(e) .message("Failed to read the HTTP response body") From c2487e67f7bab12cbf0ec2d63b5d578a340f6603 Mon Sep 17 00:00:00 2001 From: James Turton Date: Wed, 7 Sep 2022 10:44:52 +0200 Subject: [PATCH 2/6] Update comment wording. --- .../java/org/apache/drill/exec/store/http/HttpBatchReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java index 7f8fd59248c..e2bfee91149 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java @@ -127,7 +127,7 @@ public void addContext(UserException.Builder builder) { buildImplicitColumns(); } - // Should be closed by the JsonLoader. + // inStream is expected to be closed by the JsonLoader. InputStream inStream = http.getInputStream(); populateImplicitFieldMap(http); From 604e3e1d84976b5f8ca9b642870558c8a2079342 Mon Sep 17 00:00:00 2001 From: James Turton Date: Wed, 7 Sep 2022 10:55:25 +0200 Subject: [PATCH 3/6] Additional calls to close() on InputStreams from SimpleHttp. --- .../drill/exec/store/http/HttpXMLBatchReader.java | 2 +- .../exec/store/http/udfs/HttpHelperFunctions.java | 4 ++++ .../drill/exec/store/http/util/SimpleHttp.java | 12 ++++++++---- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java index da4f78c5db3..bb5d217e14c 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java @@ -143,7 +143,7 @@ public boolean next() { @Override public void close() { - AutoCloseables.closeSilently(inStream); AutoCloseables.closeSilently(xmlReader); + AutoCloseables.closeSilently(inStream); } } diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java index a95bc9ff60a..d227341acba 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java @@ -93,6 +93,8 @@ public void eval() { rowWriter.start(); if (jsonLoader.parser().next()) { rowWriter.save(); + } else { + jsonLoader.close(); } } catch (Exception e) { throw org.apache.drill.common.exceptions.UserException.dataReadError(e) @@ -189,6 +191,8 @@ public void eval() { rowWriter.start(); if (jsonLoader.parser().next()) { rowWriter.save(); + } else { + jsonLoader.close(); } } catch (Exception e) { throw org.apache.drill.common.exceptions.UserException.dataReadError(e) diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java index e8cdd6805b0..6f09e2d7b31 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java @@ -428,10 +428,14 @@ public InputStream getInputStream() { public String getResultsFromApiCall() { InputStream inputStream = getInputStream(); - return new BufferedReader( - new InputStreamReader(inputStream, StandardCharsets.UTF_8)) - .lines() - .collect(Collectors.joining("\n")); + try { + return new BufferedReader( + new InputStreamReader(inputStream, StandardCharsets.UTF_8)) + .lines() + .collect(Collectors.joining("\n")); + } finally { + AutoCloseables.closeSilently(inputStream); + } } public static HttpProxyConfig getProxySettings(HttpStoragePluginConfig config, Config drillConfig, HttpUrl url) { From 783a9c53ce7d986dbb8738ffef0cd31b1afec72d Mon Sep 17 00:00:00 2001 From: James Turton Date: Wed, 7 Sep 2022 12:58:32 +0200 Subject: [PATCH 4/6] More calls to close(). --- .../exec/store/http/util/SimpleHttp.java | 53 +++++++++---------- .../exec/store/http/oauth/OAuthUtils.java | 18 ++++--- 2 files changed, 34 insertions(+), 37 deletions(-) diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java index 6f09e2d7b31..5b11d60b0fd 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java @@ -371,35 +371,29 @@ public InputStream getInputStream() { // Build the request object Request request = requestBuilder.build(); - Response response = null; try { logger.debug("Executing request: {}", request); logger.debug("Headers: {}", request.headers()); // Execute the request - response = client - .newCall(request) - .execute(); - - // Preserve the response - responseMessage = response.message(); - responseCode = response.code(); - responseProtocol = response.protocol().toString(); - responseURL = response.request().url().toString(); - - // Case for pagination without limit - if (paginator != null && ( - response.code() != 200 || response.body() == null || - response.body().contentLength() == 0)) { - paginator.notifyPartialPage(); - } + try (Response response = client.newCall(request).execute()) { + + // Preserve the response + responseMessage = response.message(); + responseCode = response.code(); + responseProtocol = response.protocol().toString(); + responseURL = response.request().url().toString(); + + // Case for pagination without limit + if (paginator != null && ( + response.code() != 200 || response.body() == null || + response.body().contentLength() == 0)) { + paginator.notifyPartialPage(); + } - // If the request is unsuccessful clean up and throw a UserException - if (!isSuccessful(responseCode)) { - try { - AutoCloseables.closeSilently(response); - } finally { + // If the request is unsuccessful clean up and throw a UserException + if (!isSuccessful(responseCode)) { throw UserException .dataReadError() .message("HTTP request failed") @@ -408,13 +402,13 @@ public InputStream getInputStream() { .addContext(errorContext) .build(logger); } - } - logger.debug("HTTP Request for {} successful.", url()); - logger.debug("Response Headers: {} ", response.headers()); + logger.debug("HTTP Request for {} successful.", url()); + logger.debug("Response Headers: {} ", response.headers()); - // Return the InputStream of the response. Note that it is necessary and - // and sufficient that the caller invokes close() on the returned stream. - return Objects.requireNonNull(response.body()).byteStream(); + // Return the InputStream of the response. Note that it is necessary and + // and sufficient that the caller invokes close() on the returned stream. + return Objects.requireNonNull(response.body()).byteStream(); + } } catch (IOException e) { // response can only be null at this location so we do not attempt to close it. throw UserException @@ -956,8 +950,9 @@ public static ResponseBody makeSimpleGetRequest(String url) throws IOException { Request request = requestBuilder.build(); // Execute the request - Response response = client.newCall(request).execute(); + try (Response response = client.newCall(request).execute()) { return response.body(); + } } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java index 573df78cba1..8b80499bb67 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java @@ -38,13 +38,13 @@ public class OAuthUtils { private static final Logger logger = LoggerFactory.getLogger(OAuthUtils.class); /** - * Craft a GET request to obtain an access token. + * Crafts a POST request to obtain an access token. * @param credentialsProvider A credential provider containing the clientID, clientSecret and authorizationCode * @param authorizationCode The authorization code from the OAuth2.0 enabled API * @param callbackURL The callback URL. For our purposes this is obtained from the incoming Drill request as it all goes to the same place. * @return A Request Body to obtain an access token */ - public static RequestBody getPostResponse(CredentialsProvider credentialsProvider, String authorizationCode, String callbackURL) { + public static RequestBody getPostRequest(CredentialsProvider credentialsProvider, String authorizationCode, String callbackURL) { return new FormBody.Builder() .add("grant_type", "authorization_code") .add("client_id", credentialsProvider.getCredentials().get(OAuthTokenCredentials.CLIENT_ID)) @@ -55,12 +55,12 @@ public static RequestBody getPostResponse(CredentialsProvider credentialsProvide } /** - * Crafts a POST response for refreshing an access token when a refresh token is present. + * Crafts a POST request for refreshing an access token when a refresh token is present. * @param credentialsProvider A credential provider containing the clientID, clientSecret and refreshToken * @param refreshToken The refresh token * @return A Request Body with the correct parameters for obtaining an access token */ - public static RequestBody getPostResponseForTokenRefresh(CredentialsProvider credentialsProvider, String refreshToken) { + public static RequestBody getPostRequestForTokenRefresh(CredentialsProvider credentialsProvider, String refreshToken) { return new FormBody.Builder() .add("grant_type", "refresh_token") .add("client_id", credentialsProvider.getCredentials().get(OAuthTokenCredentials.CLIENT_ID)) @@ -90,7 +90,7 @@ public static Request getAccessTokenRequest(CredentialsProvider credentialsProvi .url(buildAccessTokenURL(credentialsProvider)) .header("Content-Type", "application/json") .addHeader("Accept", "application/json") - .post(getPostResponse(credentialsProvider, authenticationCode, callbackURL)) + .post(getPostRequest(credentialsProvider, authenticationCode, callbackURL)) .build(); } @@ -110,7 +110,7 @@ public static Request getAccessTokenRequestFromRefreshToken(CredentialsProvider .url(tokenURI) .header("Content-Type", "application/json") .addHeader("Accept", "application/json") - .post(getPostResponseForTokenRefresh(credentialsProvider, refreshToken)) + .post(getPostRequestForTokenRefresh(credentialsProvider, refreshToken)) .build(); } @@ -127,9 +127,10 @@ public static Map getOAuthTokens(OkHttpClient client, Request re String accessToken; String refreshToken; Map tokens = new HashMap<>(); + Response response = null; try { - Response response = client.newCall(request).execute(); + response = client.newCall(request).execute(); String responseBody = response.body().string(); if (!response.isSuccessful()) { @@ -164,13 +165,14 @@ public static Map getOAuthTokens(OkHttpClient client, Request re refreshToken = (String) parsedJson.get("refresh_token"); tokens.put(OAuthTokenCredentials.REFRESH_TOKEN, refreshToken); } - response.close(); return tokens; } catch (NullPointerException | IOException e) { throw UserException.connectionError() .message("Error refreshing access OAuth2 access token. " + e.getMessage()) .build(logger); + } finally { + response.close(); } } } From a5d9b2572726629eac7a2dba2a46cbd80436881e Mon Sep 17 00:00:00 2001 From: James Turton Date: Wed, 7 Sep 2022 14:10:05 +0200 Subject: [PATCH 5/6] Fix response closing logic in SimpleHttp. --- .../exec/store/http/util/SimpleHttp.java | 62 ++++++++++--------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java index 5b11d60b0fd..cf3d080285a 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java @@ -371,44 +371,46 @@ public InputStream getInputStream() { // Build the request object Request request = requestBuilder.build(); + Response response = null; try { logger.debug("Executing request: {}", request); logger.debug("Headers: {}", request.headers()); // Execute the request - try (Response response = client.newCall(request).execute()) { - - // Preserve the response - responseMessage = response.message(); - responseCode = response.code(); - responseProtocol = response.protocol().toString(); - responseURL = response.request().url().toString(); - - // Case for pagination without limit - if (paginator != null && ( - response.code() != 200 || response.body() == null || - response.body().contentLength() == 0)) { - paginator.notifyPartialPage(); - } - - // If the request is unsuccessful clean up and throw a UserException - if (!isSuccessful(responseCode)) { - throw UserException - .dataReadError() - .message("HTTP request failed") - .addContext("Response code", response.code()) - .addContext("Response message", response.message()) - .addContext(errorContext) - .build(logger); - } - logger.debug("HTTP Request for {} successful.", url()); - logger.debug("Response Headers: {} ", response.headers()); + response = client.newCall(request).execute(); + + // Preserve the response + responseMessage = response.message(); + responseCode = response.code(); + responseProtocol = response.protocol().toString(); + responseURL = response.request().url().toString(); + + // Case for pagination without limit + if (paginator != null && ( + response.code() != 200 || response.body() == null || + response.body().contentLength() == 0)) { + paginator.notifyPartialPage(); + } - // Return the InputStream of the response. Note that it is necessary and - // and sufficient that the caller invokes close() on the returned stream. - return Objects.requireNonNull(response.body()).byteStream(); + // If the request is unsuccessful clean up and throw a UserException + if (!isSuccessful(responseCode)) { + AutoCloseables.closeSilently(response); + throw UserException + .dataReadError() + .message("HTTP request failed") + .addContext("Response code", response.code()) + .addContext("Response message", response.message()) + .addContext(errorContext) + .build(logger); } + logger.debug("HTTP Request for {} successful.", url()); + logger.debug("Response Headers: {} ", response.headers()); + + // Return the InputStream of the response. Note that it is necessary and + // and sufficient that the caller invokes close() on the returned stream. + return Objects.requireNonNull(response.body()).byteStream(); + } catch (IOException e) { // response can only be null at this location so we do not attempt to close it. throw UserException From 61236902cf6288ca8ed20d35083b1ad8cf52643d Mon Sep 17 00:00:00 2001 From: James Turton Date: Wed, 7 Sep 2022 16:56:38 +0200 Subject: [PATCH 6/6] Fix closing in getRequestAndStreamResponse in SimpleHttp. --- .../store/http/udfs/HttpHelperFunctions.java | 3 ++- .../exec/store/http/util/SimpleHttp.java | 22 +++++++++++++++---- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java index d227341acba..84a9262d3a3 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java @@ -78,7 +78,7 @@ public void eval() { return; } String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args); - // Make the API call + // Make the API call, we expect that results will be closed by the JsonLoader java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.getRequestAndStreamResponse(finalUrl); // If the result string is null or empty, return an empty map if (results == null) { @@ -175,6 +175,7 @@ public void eval() { if (args == null) { return; } + // we expect that results will be closed by the JsonLoader java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(plugin, endpointConfig, drillbitContext, args) .getInputStream(); // If the result string is null or empty, return an empty map diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java index cf3d080285a..05380d1e2e2 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java @@ -922,16 +922,25 @@ public static OkHttpClient getSimpleHttpClient() { } public static String getRequestAndStringResponse(String url) { + ResponseBody respBody = null; try { - return makeSimpleGetRequest(url).string(); + respBody = makeSimpleGetRequest(url); + return respBody.string(); } catch (IOException e) { throw UserException .dataReadError(e) .message("HTTP request failed") .build(logger); + } finally { + AutoCloseables.closeSilently(respBody); } } + /** + * + * @param url + * @return an input stream which the caller is responsible for closing. + */ public static InputStream getRequestAndStreamResponse(String url) { try { return makeSimpleGetRequest(url).byteStream(); @@ -943,6 +952,12 @@ public static InputStream getRequestAndStreamResponse(String url) { } } + /** + * + * @param url + * @return response body which the caller is responsible for closing. + * @throws IOException + */ public static ResponseBody makeSimpleGetRequest(String url) throws IOException { OkHttpClient client = getSimpleHttpClient(); Request.Builder requestBuilder = new Request.Builder() @@ -952,9 +967,8 @@ public static ResponseBody makeSimpleGetRequest(String url) throws IOException { Request request = requestBuilder.build(); // Execute the request - try (Response response = client.newCall(request).execute()) { - return response.body(); - } + Response response = client.newCall(request).execute(); + return response.body(); } /**