openlineage, aws: Add OpenLineage support for AthenaOperator. #35090
Force-pushed from c9b1ad0 to 81ffa3b
I have 2 issues:
- While this information is true, it's not in the language of AthenaOperator; it's in the language of boto3. AthenaOperator exposes output_location, not OutputLocation in ResultConfiguration. Also, I don't think you can run an Athena query without specifying output_location?
- This information seems to be user-facing, not developer-facing, so it should be in the docs, not in a comment in the code.
Also I don't think you can run Athena query without specifying output_location?
Technically, you could run an Athena query without specifying OutputLocation in ResultConfiguration by setting up an output location in the workgroup. However, in AthenaOperator, output_location is a mandatory field even though it is not mandatory in the boto3 / AWS API.
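To illustrate the difference, here is a minimal sketch, assuming a hypothetical helper named build_start_query_kwargs (it is not part of AthenaOperator or boto3). It adds ResultConfiguration only when an output location is given, so the workgroup's own setting can apply otherwise:

```python
from __future__ import annotations

from typing import Any


def build_start_query_kwargs(
    query: str,
    output_location: str | None = None,
    workgroup: str = "primary",
) -> dict[str, Any]:
    """Build keyword arguments for Athena's start_query_execution call.

    Hypothetical helper, for illustration only.
    """
    kwargs: dict[str, Any] = {"QueryString": query, "WorkGroup": workgroup}
    if output_location:
        # Send ResultConfiguration only when the caller provided a location;
        # otherwise Athena relies on the workgroup's result configuration
        # (and rejects the request if the workgroup has none).
        kwargs["ResultConfiguration"] = {"OutputLocation": output_location}
    return kwargs
```

The result could then be passed on as client.start_query_execution(**build_start_query_kwargs("SELECT 1")).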
So we should change it.
If the user doesn't set it right, I guess the boto3/Athena API will raise an exception.
If the user doesn't set it right, I guess the boto3/Athena API will raise an exception.
Yes, that is true. For example, consider two cases.
With these settings on the primary workgroup, which is used by default in boto3 / AthenaOperator:

This simple code would run without any issues through boto3:
import boto3
session = boto3.session.Session(...)
client = session.client(service_name="athena")
response = client.start_query_execution(QueryString="SELECT 1")
assert response

However if WorkGroup doesn't assign any output location, like this one:

Then it would raise an error
import boto3
session = boto3.session.Session(...)
client = session.client(service_name="athena")
response = client.start_query_execution(QueryString="SELECT 1", WorkGroup="test_workgroup")

botocore.errorfactory.InvalidRequestException: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: No output location provided. An output location is required either through the Workgroup result configuration setting or as an API input.
- This information seems to be user facing not developer facing thus it should be in the docs not as comment in the code.
I agree. I'll remove those comments and move them to docs.
Regarding the output location: whatever the change turns out to be (it seems output_location should not be required), is there a way to retrieve this information from the workgroup? Am I correct that this is the right information: https://docs.aws.amazon.com/athena/latest/APIReference/API_ResultConfiguration.html#athena-Type-ResultConfiguration-OutputLocation?
I guess you'd better grab this information from the GetQueryExecution API (Athena.Client.get_query_execution).
It is quite difficult (I think even impossible) to resolve it without querying this API endpoint, because a workgroup might have settings that always override the API settings for output locations.
import boto3
session = boto3.session.Session(...)
client = session.client(service_name="athena")
query_execution_id = client.start_query_execution(QueryString="SELECT 1")["QueryExecutionId"]
response = client.get_query_execution(QueryExecutionId=query_execution_id)
result_configuration = response["QueryExecution"]["ResultConfiguration"]

{
    "OutputLocation": "s3://foo-bar/70c5fe8e-eb6c-4d98-b584-23b0120d4671.csv",
    "EncryptionConfiguration": {
        "EncryptionOption": "SSE_S3"
    }
}
I did some more research. It seems there might be different cases, e.g. setting external_location in a CTAS query, which overrides the setting from the workgroup (or raises an error if enforcing the query location is enabled).
This all seems a bit too complicated, but on the other hand get_query_execution returns OutputLocation as well. Therefore I think it would be best to save the property when checking the query status. WDYT?
@Taragolis I missed your comment. The only question is whether, in order not to call the endpoint one more time, we could set output_location once the query is in the succeeded state.
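A rough sketch of that idea, assuming a hypothetical function wait_and_capture_output_location and a client object that behaves like the boto3 Athena client: the same get_query_execution response that reports the final state is reused to read OutputLocation, so no extra call is needed.

```python
from __future__ import annotations

import time
from typing import Any

# Terminal query states reported by Athena's GetQueryExecution.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_and_capture_output_location(
    client: Any,
    query_execution_id: str,
    sleep_seconds: float = 1.0,
    max_polls: int = 60,
) -> tuple[str, str | None]:
    """Poll the query status and return (final_state, output_location).

    Hypothetical helper, for illustration only. The output location is read
    from the same response that reported SUCCEEDED; it is None otherwise.
    """
    for _ in range(max_polls):
        response = client.get_query_execution(QueryExecutionId=query_execution_id)
        execution = response["QueryExecution"]
        state = execution["Status"]["State"]
        if state in TERMINAL_STATES:
            output_location = None
            if state == "SUCCEEDED":
                output_location = execution.get("ResultConfiguration", {}).get("OutputLocation")
            return state, output_location
        time.sleep(sleep_seconds)
    raise TimeoutError(f"Query {query_execution_id} did not finish after {max_polls} polls")
```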
@eladkal @Taragolis I've taken the liberty of adding caching to get_query_execution in a backwards-compatible way: the user needs to explicitly set the use_cached_response argument.
output_location is now set when the query succeeds and is reused later in OL.
Force-pushed from 2f5183f to cda1214
This looks a bit hacky to me; maybe we could define a private attribute in the constructor and store the information there. WDYT?
def __init__(...):
    ...
    self.__query_results: dict[str, Any] = {}

def get_query_execution(self, query_execution_id: str, use_cache: bool = False):
    if use_cache and query_execution_id in self.__query_results:
        return self.__query_results[query_execution_id]
    response = self.conn.get_query_execution(QueryExecutionId=query_execution_id)
    if use_cache:
        self.__query_results[query_execution_id] = response
    return response
It might look hacky, I agree.
The motivation was to use Python's built-in caching implementation, which is well documented imho, but your suggestion is too simple and readable not to adopt.
Changed.
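For reference, the dictionary-based approach can be exercised without AWS access. This is only a sketch: CachingHook and CountingConn are hypothetical stand-ins (not the actual hook or a boto3 client), used to show that repeated lookups with use_cache=True reach the API once.

```python
from __future__ import annotations

from typing import Any


class CachingHook:
    """Hypothetical stand-in for the hook, following the suggestion above."""

    def __init__(self, conn: Any) -> None:
        self.conn = conn
        # Cached responses, keyed by query execution id.
        self.__query_results: dict[str, Any] = {}

    def get_query_execution(self, query_execution_id: str, use_cache: bool = False) -> dict[str, Any]:
        if use_cache and query_execution_id in self.__query_results:
            return self.__query_results[query_execution_id]
        response = self.conn.get_query_execution(QueryExecutionId=query_execution_id)
        if use_cache:
            self.__query_results[query_execution_id] = response
        return response


class CountingConn:
    """Stub connection that counts how often the API would be called."""

    def __init__(self) -> None:
        self.calls = 0

    def get_query_execution(self, QueryExecutionId: str) -> dict[str, Any]:
        self.calls += 1
        return {"QueryExecution": {"QueryExecutionId": QueryExecutionId}}


conn = CountingConn()
hook = CachingHook(conn)
hook.get_query_execution("q1", use_cache=True)  # calls the stub API
hook.get_query_execution("q1", use_cache=True)  # served from the cache
hook.get_query_execution("q1")                  # use_cache=False bypasses the cache
```

One thing to keep in mind with this design: a response cached while the query is still running stays stale, so callers would presumably opt in to the cache only once the query has reached a final state.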
@Taragolis would you mind looking at the changes? Thanks in advance!
Force-pushed from cda1214 to e147330
Based on: OpenLineage/OpenLineage#1328. Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
Adjust code to catalog and output_location changes. Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
Rename `get_query_execution` to `get_query_info`. Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
Force-pushed from e147330 to 94eb315
Based on: OpenLineage/OpenLineage#1328.
This PR adds OpenLineage support for AthenaOperator. It also includes a symlink facet that points to the physical location.