Adding Interactive API's for MSQ engine#14416
Conversation
|
|
||
| TaskStatusResponse taskResponse = overlordClient.getTaskStatus(queryId); | ||
| if (taskResponse == null) { | ||
| return Response.status(Response.Status.NOT_FOUND).build(); |
There was a problem hiding this comment.
It will be useful to add a message that includes queryId in it.
|
|
||
| TaskStatusPlus statusPlus = taskResponse.getStatus(); | ||
| if (statusPlus == null || !MSQControllerTask.TYPE.equals(statusPlus.getType())) { | ||
| return Response.status(Response.Status.NOT_FOUND).build(); |
There was a problem hiding this comment.
it will be useful to add an error message along with response code.
| ); | ||
| } else if (sqlStatementState == SqlStatementState.FAILED) { | ||
| return buildNonOkResponse( | ||
| Response.Status.NOT_FOUND.getStatusCode(), |
There was a problem hiding this comment.
it shouldn't be a 404 IMO. the status code should come from statusPlus itself.
| new QueryException(null, statusPlus.getErrorMsg(), null, null), | ||
| queryId | ||
| ); | ||
| } else { |
There was a problem hiding this comment.
you can avoid this nested code by using following structure
if (abc)
return xyz
if (abcd)
return something_else
code for default
| TaskStatusPlus statusPlus = taskResponse.getStatus(); | ||
| if (statusPlus == null || !MSQControllerTask.TYPE.equals(statusPlus.getType())) { | ||
| return Optional.empty(); | ||
| } |
There was a problem hiding this comment.
is this expected? If not, can we log some warning?
There was a problem hiding this comment.
I think we should not add a warning since its bad user input.
Adding more sanity post test
2. Adding rows and size to response. 3. Adding dataSource to response. 4. Adding exceptionDetails to response.
# Conflicts: # sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
| @GET | ||
| @Path("/enabled") | ||
| @Produces(MediaType.APPLICATION_JSON) | ||
| public Response doGetEnabled(@Context final HttpServletRequest request) |
There was a problem hiding this comment.
this context will be helpful in the javadocs.
| if (numRows != 1) { | ||
| throw new RE("Expected a single row but got [%d] rows. Please check broker logs for more information.", numRows); | ||
| } | ||
| String taskId = (String) rows.get(0)[0]; |
There was a problem hiding this comment.
please add a size check on rows.get(0)
| for (Map.Entry<String, Object> worker : counterMap.entrySet()) { | ||
| Object workerChannels = worker.getValue(); | ||
| if (workerChannels == null || !(workerChannels instanceof Map)) { | ||
| return Optional.empty(); |
There was a problem hiding this comment.
do you ever expect this though?
There was a problem hiding this comment.
No. Just defensive coding.
| return FutureUtils.getUnchecked(future, true); | ||
| } | ||
| catch (RuntimeException e) { | ||
| throw new QueryException(null, "Unable to contact overlord " + e.getMessage(), null, null, null); |
There was a problem hiding this comment.
whats the expected action associated with this error message?
There was a problem hiding this comment.
This can happen for n reasons. Changed it to a developer exception.
DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.UNCATEGORIZED)
.build("Unable to contact overlord " + e.getMessage());
}
| MSQControllerTask msqControllerTask = getMSQControllerTaskOrThrow(queryId, authenticationResult.getIdentity()); | ||
| Optional<List<ColNameAndType>> signature = getSignature(msqControllerTask); | ||
| if (!signature.isPresent()) { | ||
| return Response.ok().build(); |
There was a problem hiding this comment.
hmm. what does it mean for signature to be absent?
There was a problem hiding this comment.
Signature should not be empty but that's for the MSQ engine to decide.
We can't return results if the signature is empty. So if the results call get's an empty signature, we just return an empty result.
| if (!signature.isPresent()) { | ||
| return Response.ok().build(); | ||
| } | ||
| Optional<List<Object>> results = getResults(getPayload(overlordWork(overlordClient.taskReportAsMap(queryId)))); |
There was a problem hiding this comment.
so the results are in the task report?
2. Moving to DruidExceptions
adarshsanjeev
left a comment
There was a problem hiding this comment.
Partially reviewing this, still going through a few classes. Looks good to me overall. Had a few nits.
| } | ||
|
|
||
| private byte[] responseToByteArray(Response resp) throws IOException | ||
| public static byte[] responseToByteArray(Response resp) throws IOException |
There was a problem hiding this comment.
Minor nit: Could be pushed up to CalciteTestBase so that other classes can use it in the future
There was a problem hiding this comment.
I generally avoid pushing stuff to calcite test base since its already a big class until necessary. We can probably do that if this method is being used in other places in the future.
|
|
||
| ListenableFuture<Map<String, Object>> taskReportAsMap(String taskId); | ||
|
|
||
| ListenableFuture<TaskPayloadResponse> taskPayload(String taskId); |
There was a problem hiding this comment.
nit: This function might need a better name to avoid confusing it with the task payload containing results, since we have a payload() function in SqlStatementResource already. Maybe taskDefinition?
There was a problem hiding this comment.
This is borrowed for the existing indexingServiceClient hence I did not want to change the name.
Code coverage things. Review comments.
| final long last = SqlStatementResourceHelper.getLastIndex(numberOfRows, start); | ||
|
|
||
|
|
||
| getStatementStatus(queryId, authenticationResult.getIdentity(), false); |
There was a problem hiding this comment.
Is this call required? It seems to be checked after this already.
There was a problem hiding this comment.
Oh yes that needs to be removed. Thanks for the catch.
adarshsanjeev
left a comment
There was a problem hiding this comment.
Looks good to me overall. I don't see any blocking issues here.
| { | ||
|
|
||
| try { | ||
| Access authResult = AuthorizationUtils.authorizeAllResourceActions( |
There was a problem hiding this comment.
@abhishekagarwal87 I am following the pattern from
We might need to add a new method to AuthorizationUtilsto set the req header but that can be done as part of a follow up PR.
|
Thanks @abhishekagarwal87 @adarshsanjeev for the reviews. User Facing docs would be coming soon. |
…14527) One of the most requested features in druid is to have an ability to download big result sets. As part of #14416 , we added an ability for MSQ to be queried via a query friendly endpoint. This PR builds upon that work and adds the ability for MSQ to write select results to durable storage. We write the results to the durable storage location <prefix>/results/<queryId> in the druid frame format. This is exposed to users by /v2/sql/statements/:queryId/results.
This PR catches the console up to all the backend changes for Druid 27 Specifically: Add page information to SqlStatementResource API #14512 Allow empty tiered replicants map for load rules #14432 Adding Interactive API's for MSQ engine #14416 Add replication factor column to sys table #14403 Account for data format and compression in MSQ auto taskAssignment #14307 Errors take 3 #14004
This PR catches the console up to all the backend changes for Druid 27 Specifically: Add page information to SqlStatementResource API apache#14512 Allow empty tiered replicants map for load rules apache#14432 Adding Interactive API's for MSQ engine apache#14416 Add replication factor column to sys table apache#14403 Account for data format and compression in MSQ auto taskAssignment apache#14307 Errors take 3 apache#14004
This PR catches the console up to all the backend changes for Druid 27 Specifically: Add page information to SqlStatementResource API #14512 Allow empty tiered replicants map for load rules #14432 Adding Interactive API's for MSQ engine #14416 Add replication factor column to sys table #14403 Account for data format and compression in MSQ auto taskAssignment #14307 Errors take 3 #14004 Co-authored-by: Vadim Ogievetsky <vadim@ogievetsky.com>
This PR aims to expose a new API called "@path("/druid/v2/sql/statements/")" which takes the same payload as the current "/druid/v2/sql" endpoint and allows users to fetch results in an async manner.
…pache#14527) One of the most requested features in druid is to have an ability to download big result sets. As part of apache#14416 , we added an ability for MSQ to be queried via a query friendly endpoint. This PR builds upon that work and adds the ability for MSQ to write select results to durable storage. We write the results to the durable storage location <prefix>/results/<queryId> in the druid frame format. This is exposed to users by /v2/sql/statements/:queryId/results.
This PR catches the console up to all the backend changes for Druid 27 Specifically: Add page information to SqlStatementResource API apache#14512 Allow empty tiered replicants map for load rules apache#14432 Adding Interactive API's for MSQ engine apache#14416 Add replication factor column to sys table apache#14403 Account for data format and compression in MSQ auto taskAssignment apache#14307 Errors take 3 apache#14004
The current SQL endpoint is more suited toward hot queries where the client blocks for the results. The current task endpoint is more of a fire-and-forget API, primarily meant for ingestion.
This PR aims to expose a new API called
"@path("/druid/v2/sql/statements/")" which takes the same payload as the current "/druid/v2/sql" endpoint and allows users to fetch results in an async manner.
The statement execution API runs a single SQL statement which can be:
POST SQL statement
POST /sql/statementsExecutes a SQL statement.
so that users can easily switch to the new async apis.
Two additional context field:
Query execution failures
This sections is exactly similar as https://druid.apache.org/docs/latest/querying/querying.html#query-execution-failures
If a query fails, Druid returns a response with an HTTP response code and a JSON object with the following structure:
Status
GET /sql/statements/{id}Returns the same response as the post API.
Results
GET /sql/statements/{id}/results?offset=x&numRows=y&size=s&timeout=zReturns a page of results if they are available.
Close Query
DELETE /sql/statements/{id}Cancels a running/accepted query.
TODO
This PR has: