Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public void addContext(UserException.Builder builder) {
buildImplicitColumns();
}

// inStream is expected to be closed by the JsonLoader.
InputStream inStream = http.getInputStream();
populateImplicitFieldMap(http);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public boolean next() {

@Override
public void close() {
AutoCloseables.closeSilently(inStream);
AutoCloseables.closeSilently(xmlReader);
AutoCloseables.closeSilently(inStream);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void eval() {
return;
}
String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args);
// Make the API call
// Make the API call, we expect that results will be closed by the JsonLoader
java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.getRequestAndStreamResponse(finalUrl);
// If the result string is null or empty, return an empty map
if (results == null) {
Expand All @@ -93,6 +93,8 @@ public void eval() {
rowWriter.start();
if (jsonLoader.parser().next()) {
rowWriter.save();
} else {
jsonLoader.close();
}
} catch (Exception e) {
throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
Expand Down Expand Up @@ -173,6 +175,7 @@ public void eval() {
if (args == null) {
return;
}
// we expect that results will be closed by the JsonLoader
java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(plugin, endpointConfig, drillbitContext, args)
.getInputStream();
// If the result string is null or empty, return an empty map
Expand All @@ -189,6 +192,8 @@ public void eval() {
rowWriter.start();
if (jsonLoader.parser().next()) {
rowWriter.save();
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we explicitly close the results InputStream here as well? Would you mind testing this on a query that produces multiple batches?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cgivre

  1. The JsonLoader closes the input streams it's been working off of when it is closed so I don't think so.
  2. Multiple batch datasets do not work with these UDFs yet from what I recall? I think @vdiravka continues to work on that, perhaps he can comment on the closing of the JsonLoader here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my recollection, this function does handle the multiple batches. It was the convert_fromJSON that @vdiravka was working on.

Copy link
Contributor Author

@jnturton jnturton Sep 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cgivre yes, you're right. I tried a couple of things. First I provided a JSON response that would normally produce 64k+1 rows if queried to http_get but it looked to me like it was being handled in a single batch since, I guess, the row count of a query based on VALUES(1) is still 1. I then wrote a query to SELECT http_get(some simple JSON) from a mock table containing 64k+1 rows. This overwhelms the okhttp3 mock server and fails with a timeout. I'm not sure if there is some other test to try here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fed the http_get function a string containing 50 million little JSON objects from the sequence {"foo": 1} {"foo": 2} {"foo": 3}... and it got through it (took about 45s). I just don't know if that answers the right question.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That works for me :-)

jsonLoader.close();
}
} catch (Exception e) {
throw org.apache.drill.common.exceptions.UserException.dataReadError(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.logical.OAuthConfig;
import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
Expand Down Expand Up @@ -321,7 +322,8 @@ public X509Certificate[] getAcceptedIssuers() {
/**
* Returns an InputStream based on the URL and config in the scanSpec. If anything goes wrong
* the method throws a UserException.
* @return An Inputstream of the data from the URL call.
* @return An InputStream of the data from the URL call. The caller is responsible for calling
* close() on the InputStream.
*/
public InputStream getInputStream() {

Expand Down Expand Up @@ -369,15 +371,14 @@ public InputStream getInputStream() {

// Build the request object
Request request = requestBuilder.build();
Response response = null;

try {
logger.debug("Executing request: {}", request);
logger.debug("Headers: {}", request.headers());

// Execute the request
Response response = client
.newCall(request)
.execute();
response = client.newCall(request).execute();

// Preserve the response
responseMessage = response.message();
Expand All @@ -392,8 +393,9 @@ public InputStream getInputStream() {
paginator.notifyPartialPage();
}

// If the request is unsuccessful, throw a UserException
// If the request is unsuccessful clean up and throw a UserException
if (!isSuccessful(responseCode)) {
AutoCloseables.closeSilently(response);
throw UserException
.dataReadError()
.message("HTTP request failed")
Expand All @@ -405,9 +407,12 @@ public InputStream getInputStream() {
logger.debug("HTTP Request for {} successful.", url());
logger.debug("Response Headers: {} ", response.headers());

// Return the InputStream of the response
// Return the InputStream of the response. Note that it is necessary and
// sufficient that the caller invokes close() on the returned stream.
return Objects.requireNonNull(response.body()).byteStream();

} catch (IOException e) {
// response can only be null at this location so we do not attempt to close it.
throw UserException
.dataReadError(e)
.message("Failed to read the HTTP response body")
Expand All @@ -419,10 +424,14 @@ public InputStream getInputStream() {

public String getResultsFromApiCall() {
InputStream inputStream = getInputStream();
return new BufferedReader(
new InputStreamReader(inputStream, StandardCharsets.UTF_8))
.lines()
.collect(Collectors.joining("\n"));
try {
return new BufferedReader(
new InputStreamReader(inputStream, StandardCharsets.UTF_8))
.lines()
.collect(Collectors.joining("\n"));
} finally {
AutoCloseables.closeSilently(inputStream);
}
}

public static HttpProxyConfig getProxySettings(HttpStoragePluginConfig config, Config drillConfig, HttpUrl url) {
Expand Down Expand Up @@ -913,16 +922,25 @@ public static OkHttpClient getSimpleHttpClient() {
}

public static String getRequestAndStringResponse(String url) {
ResponseBody respBody = null;
try {
return makeSimpleGetRequest(url).string();
respBody = makeSimpleGetRequest(url);
return respBody.string();
} catch (IOException e) {
throw UserException
.dataReadError(e)
.message("HTTP request failed")
.build(logger);
} finally {
AutoCloseables.closeSilently(respBody);
}
}

/**
*
* @param url
* @return an input stream which the caller is responsible for closing.
*/
public static InputStream getRequestAndStreamResponse(String url) {
try {
return makeSimpleGetRequest(url).byteStream();
Expand All @@ -934,6 +952,12 @@ public static InputStream getRequestAndStreamResponse(String url) {
}
}

/**
*
* @param url
* @return response body which the caller is responsible for closing.
* @throws IOException
*/
public static ResponseBody makeSimpleGetRequest(String url) throws IOException {
OkHttpClient client = getSimpleHttpClient();
Request.Builder requestBuilder = new Request.Builder()
Expand All @@ -943,8 +967,8 @@ public static ResponseBody makeSimpleGetRequest(String url) throws IOException {
Request request = requestBuilder.build();

// Execute the request
Response response = client.newCall(request).execute();
return response.body();
Response response = client.newCall(request).execute();
return response.body();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ public class OAuthUtils {
private static final Logger logger = LoggerFactory.getLogger(OAuthUtils.class);

/**
* Craft a GET request to obtain an access token.
* Crafts a POST request to obtain an access token.
* @param credentialsProvider A credential provider containing the clientID, clientSecret and authorizationCode
* @param authorizationCode The authorization code from the OAuth2.0 enabled API
* @param callbackURL The callback URL. For our purposes this is obtained from the incoming Drill request as it all goes to the same place.
* @return A Request Body to obtain an access token
*/
public static RequestBody getPostResponse(CredentialsProvider credentialsProvider, String authorizationCode, String callbackURL) {
public static RequestBody getPostRequest(CredentialsProvider credentialsProvider, String authorizationCode, String callbackURL) {
return new FormBody.Builder()
.add("grant_type", "authorization_code")
.add("client_id", credentialsProvider.getCredentials().get(OAuthTokenCredentials.CLIENT_ID))
Expand All @@ -55,12 +55,12 @@ public static RequestBody getPostResponse(CredentialsProvider credentialsProvide
}

/**
* Crafts a POST response for refreshing an access token when a refresh token is present.
* Crafts a POST request for refreshing an access token when a refresh token is present.
* @param credentialsProvider A credential provider containing the clientID, clientSecret and refreshToken
* @param refreshToken The refresh token
* @return A Request Body with the correct parameters for obtaining an access token
*/
public static RequestBody getPostResponseForTokenRefresh(CredentialsProvider credentialsProvider, String refreshToken) {
public static RequestBody getPostRequestForTokenRefresh(CredentialsProvider credentialsProvider, String refreshToken) {
return new FormBody.Builder()
.add("grant_type", "refresh_token")
.add("client_id", credentialsProvider.getCredentials().get(OAuthTokenCredentials.CLIENT_ID))
Expand Down Expand Up @@ -90,7 +90,7 @@ public static Request getAccessTokenRequest(CredentialsProvider credentialsProvi
.url(buildAccessTokenURL(credentialsProvider))
.header("Content-Type", "application/json")
.addHeader("Accept", "application/json")
.post(getPostResponse(credentialsProvider, authenticationCode, callbackURL))
.post(getPostRequest(credentialsProvider, authenticationCode, callbackURL))
.build();
}

Expand All @@ -110,7 +110,7 @@ public static Request getAccessTokenRequestFromRefreshToken(CredentialsProvider
.url(tokenURI)
.header("Content-Type", "application/json")
.addHeader("Accept", "application/json")
.post(getPostResponseForTokenRefresh(credentialsProvider, refreshToken))
.post(getPostRequestForTokenRefresh(credentialsProvider, refreshToken))
.build();
}

Expand All @@ -127,9 +127,10 @@ public static Map<String, String> getOAuthTokens(OkHttpClient client, Request re
String accessToken;
String refreshToken;
Map<String, String> tokens = new HashMap<>();
Response response = null;

try {
Response response = client.newCall(request).execute();
response = client.newCall(request).execute();
String responseBody = response.body().string();

if (!response.isSuccessful()) {
Expand Down Expand Up @@ -164,13 +165,14 @@ public static Map<String, String> getOAuthTokens(OkHttpClient client, Request re
refreshToken = (String) parsedJson.get("refresh_token");
tokens.put(OAuthTokenCredentials.REFRESH_TOKEN, refreshToken);
}
response.close();
return tokens;

} catch (NullPointerException | IOException e) {
throw UserException.connectionError()
.message("Error refreshing access OAuth2 access token. " + e.getMessage())
.build(logger);
} finally {
response.close();
}
}
}