diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java index 99159482f21..e2bfee91149 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java @@ -127,6 +127,7 @@ public void addContext(UserException.Builder builder) { buildImplicitColumns(); } + // inStream is expected to be closed by the JsonLoader. InputStream inStream = http.getInputStream(); populateImplicitFieldMap(http); diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java index da4f78c5db3..bb5d217e14c 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java @@ -143,7 +143,7 @@ public boolean next() { @Override public void close() { - AutoCloseables.closeSilently(inStream); AutoCloseables.closeSilently(xmlReader); + AutoCloseables.closeSilently(inStream); } } diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java index a95bc9ff60a..84a9262d3a3 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java @@ -78,7 +78,7 @@ public void eval() { return; } String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args); - // Make the API call + // Make the API call, we expect that results will be closed by the JsonLoader java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.getRequestAndStreamResponse(finalUrl); // If the result string is null or empty, return an empty map if (results == null) { @@ -93,6 +93,8 @@ public void eval() { rowWriter.start(); if (jsonLoader.parser().next()) { rowWriter.save(); + } else { + jsonLoader.close(); } } catch (Exception e) { throw org.apache.drill.common.exceptions.UserException.dataReadError(e) @@ -173,6 +175,7 @@ public void eval() { if (args == null) { return; } + // we expect that results will be closed by the JsonLoader java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(plugin, endpointConfig, drillbitContext, args) .getInputStream(); // If the result string is null or empty, return an empty map @@ -189,6 +192,8 @@ public void eval() { rowWriter.start(); if (jsonLoader.parser().next()) { rowWriter.save(); + } else { + jsonLoader.close(); } } catch (Exception e) { throw org.apache.drill.common.exceptions.UserException.dataReadError(e) diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java index ee5b285f1ef..05380d1e2e2 100644 --- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java +++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java @@ -35,6 +35,7 @@ import org.apache.drill.common.exceptions.EmptyErrorContext; import org.apache.drill.common.logical.OAuthConfig; import org.apache.drill.common.logical.StoragePluginConfig.AuthMode; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.CustomErrorContext; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.ExecConstants; @@ -321,7 +322,8 @@ public X509Certificate[] getAcceptedIssuers() { /** * Returns an InputStream based on the URL and config in the scanSpec. If anything goes wrong * the method throws a UserException. - * @return An Inputstream of the data from the URL call. + * @return An Inputstream of the data from the URL call. The caller is responsible for calling + * close() on the InputStream. */ public InputStream getInputStream() { @@ -369,15 +371,14 @@ public InputStream getInputStream() { // Build the request object Request request = requestBuilder.build(); + Response response = null; try { logger.debug("Executing request: {}", request); logger.debug("Headers: {}", request.headers()); // Execute the request - Response response = client - .newCall(request) - .execute(); + response = client.newCall(request).execute(); // Preserve the response responseMessage = response.message(); @@ -392,8 +393,9 @@ public InputStream getInputStream() { paginator.notifyPartialPage(); } - // If the request is unsuccessful, throw a UserException + // If the request is unsuccessful clean up and throw a UserException if (!isSuccessful(responseCode)) { + AutoCloseables.closeSilently(response); throw UserException .dataReadError() .message("HTTP request failed") @@ -405,9 +407,12 @@ public InputStream getInputStream() { logger.debug("HTTP Request for {} successful.", url()); logger.debug("Response Headers: {} ", response.headers()); - // Return the InputStream of the response + // Return the InputStream of the response. Note that it is necessary and + // and sufficient that the caller invokes close() on the returned stream. return Objects.requireNonNull(response.body()).byteStream(); + } catch (IOException e) { + // response can only be null at this location so we do not attempt to close it. throw UserException .dataReadError(e) .message("Failed to read the HTTP response body") @@ -419,10 +424,14 @@ public InputStream getInputStream() { public String getResultsFromApiCall() { InputStream inputStream = getInputStream(); - return new BufferedReader( - new InputStreamReader(inputStream, StandardCharsets.UTF_8)) - .lines() - .collect(Collectors.joining("\n")); + try { + return new BufferedReader( + new InputStreamReader(inputStream, StandardCharsets.UTF_8)) + .lines() + .collect(Collectors.joining("\n")); + } finally { + AutoCloseables.closeSilently(inputStream); + } } public static HttpProxyConfig getProxySettings(HttpStoragePluginConfig config, Config drillConfig, HttpUrl url) { @@ -913,16 +922,25 @@ public static OkHttpClient getSimpleHttpClient() { } public static String getRequestAndStringResponse(String url) { + ResponseBody respBody = null; try { - return makeSimpleGetRequest(url).string(); + respBody = makeSimpleGetRequest(url); + return respBody.string(); } catch (IOException e) { throw UserException .dataReadError(e) .message("HTTP request failed") .build(logger); + } finally { + AutoCloseables.closeSilently(respBody); } } + /** + * + * @param url + * @return an input stream which the caller is responsible for closing. + */ public static InputStream getRequestAndStreamResponse(String url) { try { return makeSimpleGetRequest(url).byteStream(); @@ -934,6 +952,12 @@ public static InputStream getRequestAndStreamResponse(String url) { } } + /** + * + * @param url + * @return response body which the caller is responsible for closing. + * @throws IOException + */ public static ResponseBody makeSimpleGetRequest(String url) throws IOException { OkHttpClient client = getSimpleHttpClient(); Request.Builder requestBuilder = new Request.Builder() @@ -943,8 +967,8 @@ public static ResponseBody makeSimpleGetRequest(String url) throws IOException { Request request = requestBuilder.build(); // Execute the request - Response response = client.newCall(request).execute(); - return response.body(); + Response response = client.newCall(request).execute(); + return response.body(); } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java index 573df78cba1..8b80499bb67 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/http/oauth/OAuthUtils.java @@ -38,13 +38,13 @@ public class OAuthUtils { private static final Logger logger = LoggerFactory.getLogger(OAuthUtils.class); /** - * Craft a GET request to obtain an access token. + * Crafts a POST request to obtain an access token. * @param credentialsProvider A credential provider containing the clientID, clientSecret and authorizationCode * @param authorizationCode The authorization code from the OAuth2.0 enabled API * @param callbackURL The callback URL. For our purposes this is obtained from the incoming Drill request as it all goes to the same place. * @return A Request Body to obtain an access token */ - public static RequestBody getPostResponse(CredentialsProvider credentialsProvider, String authorizationCode, String callbackURL) { + public static RequestBody getPostRequest(CredentialsProvider credentialsProvider, String authorizationCode, String callbackURL) { return new FormBody.Builder() .add("grant_type", "authorization_code") .add("client_id", credentialsProvider.getCredentials().get(OAuthTokenCredentials.CLIENT_ID)) @@ -55,12 +55,12 @@ public static RequestBody getPostResponse(CredentialsProvider credentialsProvide } /** - * Crafts a POST response for refreshing an access token when a refresh token is present. + * Crafts a POST request for refreshing an access token when a refresh token is present. * @param credentialsProvider A credential provider containing the clientID, clientSecret and refreshToken * @param refreshToken The refresh token * @return A Request Body with the correct parameters for obtaining an access token */ - public static RequestBody getPostResponseForTokenRefresh(CredentialsProvider credentialsProvider, String refreshToken) { + public static RequestBody getPostRequestForTokenRefresh(CredentialsProvider credentialsProvider, String refreshToken) { return new FormBody.Builder() .add("grant_type", "refresh_token") .add("client_id", credentialsProvider.getCredentials().get(OAuthTokenCredentials.CLIENT_ID)) @@ -90,7 +90,7 @@ public static Request getAccessTokenRequest(CredentialsProvider credentialsProvi .url(buildAccessTokenURL(credentialsProvider)) .header("Content-Type", "application/json") .addHeader("Accept", "application/json") - .post(getPostResponse(credentialsProvider, authenticationCode, callbackURL)) + .post(getPostRequest(credentialsProvider, authenticationCode, callbackURL)) .build(); } @@ -110,7 +110,7 @@ public static Request getAccessTokenRequestFromRefreshToken(CredentialsProvider .url(tokenURI) .header("Content-Type", "application/json") .addHeader("Accept", "application/json") - .post(getPostResponseForTokenRefresh(credentialsProvider, refreshToken)) + .post(getPostRequestForTokenRefresh(credentialsProvider, refreshToken)) .build(); } @@ -127,9 +127,10 @@ public static Map getOAuthTokens(OkHttpClient client, Request re String accessToken; String refreshToken; Map tokens = new HashMap<>(); + Response response = null; try { - Response response = client.newCall(request).execute(); + response = client.newCall(request).execute(); String responseBody = response.body().string(); if (!response.isSuccessful()) { @@ -164,13 +165,14 @@ public static Map getOAuthTokens(OkHttpClient client, Request re refreshToken = (String) parsedJson.get("refresh_token"); tokens.put(OAuthTokenCredentials.REFRESH_TOKEN, refreshToken); } - response.close(); return tokens; } catch (NullPointerException | IOException e) { throw UserException.connectionError() .message("Error refreshing access OAuth2 access token. " + e.getMessage()) .build(logger); + } finally { + response.close(); } } }