Aws backend - parallelize invocation and reduction #1
base: AWS-backend
Vetchu
left a comment
Another general thought: move most function declarations in this file to static methods instead of defining them inline (or move them to a separate utils file).
PyRDF/backend/AWS.py
Outdated
    trials = 3

    client = boto3.client('lambda', region_name=self.region)
    def process_response(response):
This whole function could be simplified to: try json.loads(response['Payload'].read()); on any exception, log it at debug level and fall back to an empty dict.
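A minimal sketch of that simplification, assuming `response` is the dict returned by the boto3 Lambda `invoke` call and that its `'Payload'` entry is a file-like object with a `read()` method. The `logger` argument and the `io.BytesIO` stand-in responses are illustrative and not taken from the PR.

```python
import io
import json
import logging

logger = logging.getLogger(__name__)


def process_response(response, logger=logger):
    """Decode the JSON payload of a Lambda response, or return {} on failure."""
    try:
        return json.loads(response['Payload'].read())
    except Exception as error:
        # Any failure (missing key, unreadable stream, invalid JSON) ends here.
        logger.debug('Could not decode Lambda payload: %s', error)
        return {}


# Stand-ins for real boto3 responses (assumption: 'Payload' is file-like).
ok_result = process_response({'Payload': io.BytesIO(b'{"statusCode": 200}')})
bad_result = process_response({'Payload': io.BytesIO(b'not json')})
missing_result = process_response({})
```

Catching `Exception` this broadly hides the failure reason from the caller, so the debug log is the only place where it remains visible.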
PyRDF/backend/AWS.py
Outdated
    # Maybe here give info about number of invoked lambda for awsmonitor
    #self.logger.info(f'New lambda - 31')

    while trials:
This could be more explicit by stating "trials != 0", as that is what ends this while loop.
PyRDF/backend/AWS.py
Outdated
    #self.logger.info(f'Invocation time: {my_after_time - my_before_time}')

    if 'FunctionError' in response:
        raise RuntimeError('Lambda runtime error')
The error from the response should be passed along to RuntimeError.
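One way this could look, as a sketch only: it assumes that a failed invocation carries a `'FunctionError'` string in the response and a JSON error description in the `'Payload'` stream. The helper name `raise_on_function_error` and the sample response are hypothetical.

```python
import io
import json


def raise_on_function_error(response):
    """Raise RuntimeError carrying the Lambda error details, if there are any."""
    if 'FunctionError' not in response:
        return
    # Assumption: on failure the payload holds a JSON error description.
    raw = response['Payload'].read()
    try:
        details = json.loads(raw)
    except ValueError:
        # Not valid JSON: fall back to the raw text so nothing is lost.
        details = raw.decode('utf-8', errors='replace')
    raise RuntimeError(
        f"Lambda runtime error ({response['FunctionError']}): {details}"
    )


# Hypothetical failed response used only to exercise the helper.
failed = {
    'FunctionError': 'Unhandled',
    'Payload': io.BytesIO(b'{"errorMessage": "division by zero"}'),
}
try:
    raise_on_function_error(failed)
    message = None
except RuntimeError as error:
    message = str(error)
```

With this, the retry loop's log output would show the reason reported by the lambda rather than a fixed string.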
    )['Body'].read())

    files = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(filenames)) as executor:
This construction repeats multiple times; it could be extracted into a separate generic function.
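A possible shape for such a helper, sketched under the assumption that each call site maps one function over a list of items with one thread per item, as the `max_workers=len(filenames)` snippet suggests. The name `run_in_threads` is hypothetical.

```python
import concurrent.futures


def run_in_threads(function, items):
    """Apply `function` to every item in its own thread, keeping input order."""
    items = list(items)
    if not items:
        # ThreadPoolExecutor rejects max_workers=0, so handle the empty case.
        return []
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(items)) as executor:
        # executor.map yields results in the order of the inputs.
        return list(executor.map(function, items))


lengths = run_in_threads(len, ['a.root', 'bb.root'])
empty = run_in_threads(len, [])
```

Any exception raised inside `function` is re-raised when the results are collected, so call sites can keep their existing error handling.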
PyRDF/backend/AWS.py
Outdated
    except Exception as error:
        # All other errors
        self.logger.warn(error)
    else:
Is this else even needed at this point?
The code under this else is executed only if no exception was raised, which indicates that the loop should end right away because the lambda invocation was successful. I don't know, I can change it.
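For reference, the try/except/else retry pattern under discussion can be sketched as follows. This is not the code from the PR: `invoke_with_retries`, its parameters, and the final `RuntimeError` after the last failed attempt are assumptions made for the example.

```python
def invoke_with_retries(invoke, trials=3, on_error=print):
    """Call `invoke` until it succeeds or `trials` attempts are used up."""
    while trials > 0:  # explicit condition instead of `while trials:`
        trials -= 1
        try:
            result = invoke()
        except Exception as error:
            # Failed attempt: report it and go around the loop again.
            on_error(error)
        else:
            # Reached only when invoke() raised nothing: stop retrying.
            return result
    raise RuntimeError('All invocation attempts failed')


# Hypothetical callable that fails once and then succeeds.
attempts = []


def flaky():
    attempts.append(1)
    if len(attempts) < 2:
        raise ConnectionError('temporary failure')
    return 'ok'


errors = []
outcome = invoke_with_retries(flaky, trials=3, on_error=errors.append)
```

The `else` branch keeps the success path out of the `try` body, so an exception raised while handling a successful result is not mistaken for a failed invocation.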
PyRDF/backend/AWS.py
Outdated
    response_payload = response['Payload']
    lambda_status = response_payload['statusCode'] if 'statusCode' in response_payload else 200
    if lambda_status != 200:
        raise RuntimeError(f'Lambda status code {lambda_status}')
Again, the exact reason is not passed on.
PyRDF/backend/AWS.py
Outdated
    if 'Payload' in response:
        response_payload = response['Payload']
        lambda_status = response_payload['statusCode'] if 'statusCode' in response_payload else 200
Can it ever not be in the payload?
Payload is the serialized value that the lambda returns. By default it is a dict with some values, and one of its keys is statusCode, but that key can be omitted. I wrote this without prior knowledge of the lambda's internals, on the assumption that some lambda logic might rely on an early return with a different statusCode, e.g. to indicate an error. That is not the case here, so I can remove this whole payload handling for a potential speedup.
PyRDF/backend/AWS.py
Outdated
    pass

    @staticmethod
    def process_execution(s3_client, processing_bucket, num_of_lambdas, logger):
This name is a bit misleading; maybe something more like "wait_for_all_lambdas" or "await_for_completion".
No description provided.