From a4498b5682b0e5de12e2cebbc1c6203a48cbc201 Mon Sep 17 00:00:00 2001 From: Kamilbur Date: Wed, 14 Jul 2021 11:33:50 +0200 Subject: [PATCH 1/3] Parallelize invocation phase. --- PyRDF/backend/AWS.py | 130 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 111 insertions(+), 19 deletions(-) diff --git a/PyRDF/backend/AWS.py b/PyRDF/backend/AWS.py index 7445671..1824c77 100644 --- a/PyRDF/backend/AWS.py +++ b/PyRDF/backend/AWS.py @@ -4,11 +4,31 @@ import json import logging import time +import threading +import concurrent.futures +import os import boto3 +import botocore import cloudpickle as pickle -from PyRDF.backend.Dist import Dist +from .Dist import Dist + + +class FlushingLogger: + def __init__(self): + self.logger = logging.getLogger() + + def __getattr__(self, name): + method = getattr(self.logger, name) + if name in ['info', ]: + def flushed_method(msg, *args, **kwargs): + method(msg, *args, **kwargs) + for h in self.logger.handlers: + h.flush() + return flushed_method + else: + return method class AWS(Dist): @@ -25,7 +45,7 @@ def __init__(self, config={}): more support will be added in future. """ super(AWS, self).__init__(config) - self.logger = logging.getLogger() + self.logger = FlushingLogger() if logging.root.level >= logging.INFO else logging.getLogger() self.npartitions = self._get_partitions() self.region = config.get('region') or 'us-east-1' @@ -64,6 +84,7 @@ def encode_object(object_to_encode) -> str: ssm_client = boto3.client('ssm', region_name=self.region) # Check for existence of infrastructure + """ s3_output_bucket = ssm_client.get_parameter(Name='output_bucket')['Parameter']['Value'] if not s3_output_bucket: self.logger.info('AWS backend not initialized!') @@ -82,8 +103,24 @@ def encode_object(object_to_encode) -> str: Value=str(pickled_reducer), Overwrite=True ) + """ + + def invoke_root_lambda(root_range, script): + client = boto3.client('lambda', region_name=self.region) + + def process_response(response): + if response: + if 'Payload' in response: + payload = response['Payload'] + if isinstance(payload, botocore.response.StreamingBody): + payload_bytes = payload.read() + try: + response['Payload'] = json.loads(payload_bytes) + except json.JSONDecodeError: + response['Payload'] = {} + + return response - def invoke_root_lambda(client, root_range, script): payload = json.dumps({ 'range': encode_object(root_range), 'script': script, @@ -92,20 +129,73 @@ def invoke_root_lambda(client, root_range, script): 'filelist': str(root_range.filelist), 'friend_info': encode_object(root_range.friend_info) }) - return client.invoke( - FunctionName='root_lambda', - InvocationType='Event', - Payload=bytes(payload, encoding='utf8') - ) + + # Maybe here give info about number of invoked lambda for awsmonitor + #self.logger.info(f'New lambda - 31') + + while True: + try: + #my_before_time = time.time() + response = client.invoke( + FunctionName='root_lambda', + InvocationType= 'Event', #'RequestResponse', + Payload=bytes(payload, encoding='utf8') + ) + #my_after_time = time.time() + #self.logger.info(f'Invocation time: {my_after_time - my_before_time}') + + if 'FunctionError' in response: + raise RuntimeError('Lambda runtime error') + + response = process_response(response) + + if 'Payload' in response: + response_payload = response['Payload'] + lambda_status = response_payload['statusCode'] if 'statusCode' in response_payload else 200 + if lambda_status != 200: + raise RuntimeError(f'Lambda status code {lambda_status}') + except botocore.exceptions.ClientError as error: + 
# AWS site errors + self.logger.warn(error['Error']['Message']) + except RuntimeError as error: + # Lambda runtime errors and errors from + # lambdas that returned statusCode + # different than 200 + print(error) + except Exception as error: + # All other errors + print(error) + else: + break + time.sleep(1) + + return response + + + self.logger.info(f'Before lambdas invoke. Number of lambdas: {len(ranges)}') + + processing_bucket = ssm_client.get_parameter(Name='processing_bucket')['Parameter']['Value'] + + s3_resource.Bucket(processing_bucket).objects.all().delete() + invoke_begin = time.time() # Invoke workers with ranges and mapper call_results = [] - for root_range in ranges: + """ + for lambda_num, root_range in enumerate(ranges): call_result = invoke_root_lambda(lambda_client, root_range, pickled_mapper) call_results.append(call_result) + self.logger.info(f'New lambda - {lambda_num}') + """ + + with concurrent.futures.ThreadPoolExecutor(max_workers=len(ranges)) as executor: + executor.submit(AWS.process_execution, s3_client, processing_bucket, len(ranges), self.logger) + call_results = executor.map(lambda root_range: invoke_root_lambda(root_range, pickled_mapper), ranges) + executor.shutdown(wait=True) wait_begin = time.time() + self.logger.info('All lambdas have been invoked') # while True: # results = s3.list_objects_v2(Bucket=s3_output_bucket, Prefix='out.pickle') @@ -115,16 +205,6 @@ def invoke_root_lambda(client, root_range, script): # time.sleep(1) # result = s3.get_object(s3_output_bucket, 'out.pickle') - processing_bucket = ssm_client.get_parameter(Name='processing_bucket')['Parameter']['Value'] - - # Wait until all lambdas finished execution - while True: - results = s3_client.list_objects_v2(Bucket=processing_bucket) - if results['KeyCount'] == len(ranges): - break - self.logger.info(f'Lambdas finished: {results["KeyCount"]}') - time.sleep(1) - reduce_begin = time.time() # Get names of output files, download and reduce them filenames = s3_client.list_objects_v2(Bucket=processing_bucket)['Contents'] @@ -160,3 +240,15 @@ def invoke_root_lambda(client, root_range, script): def distribute_files(self, includes_list): pass + + @staticmethod + def process_execution(s3_client, processing_bucket, num_of_lambdas, logger): + # Wait until all lambdas finished execution + while True: + results = s3_client.list_objects_v2(Bucket=processing_bucket) + logger.info(f'Lambdas finished: {results["KeyCount"]}') + if results['KeyCount'] == num_of_lambdas: + break + time.sleep(1) + + From 5437a0c63d34eb5a71b11010395f92706870493d Mon Sep 17 00:00:00 2001 From: Kamilbur Date: Fri, 23 Jul 2021 23:09:09 +0200 Subject: [PATCH 2/3] Parallelize reduction phase. 
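This patch replaces the sequential merge of the partial results with a pairwise
(tree) reduction driven by a thread pool. For illustration only, a minimal
standalone sketch of that pairwise scheme, with a hypothetical merge() function
standing in for the pickled reducer:

    import concurrent.futures

    def tree_reduce(items, merge):
        """Merge a list of partial results pairwise, level by level."""
        while len(items) > 1:
            pairs = list(zip(items[0::2], items[1::2]))
            with concurrent.futures.ThreadPoolExecutor(max_workers=len(pairs)) as executor:
                futures = [executor.submit(merge, a, b) for a, b in pairs]
                merged = [future.result() for future in futures]
            if len(items) % 2 == 1:
                # The odd element has no partner on this level; carry it over.
                merged.append(items[-1])
            items = merged
        return items[0]

    # Example: tree_reduce([1, 2, 3, 4, 5], lambda a, b: a + b) == 15
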
--- PyRDF/backend/AWS.py | 67 +++++++++++++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 20 deletions(-) diff --git a/PyRDF/backend/AWS.py b/PyRDF/backend/AWS.py index 1824c77..c511bfd 100644 --- a/PyRDF/backend/AWS.py +++ b/PyRDF/backend/AWS.py @@ -106,8 +106,10 @@ def encode_object(object_to_encode) -> str: """ def invoke_root_lambda(root_range, script): - client = boto3.client('lambda', region_name=self.region) + trials = 3 + + client = boto3.client('lambda', region_name=self.region) def process_response(response): if response: if 'Payload' in response: @@ -133,7 +135,8 @@ def process_response(response): # Maybe here give info about number of invoked lambda for awsmonitor #self.logger.info(f'New lambda - 31') - while True: + while trials: + trials -= 1 try: #my_before_time = time.time() response = client.invoke( @@ -158,17 +161,20 @@ def process_response(response): # AWS site errors self.logger.warn(error['Error']['Message']) except RuntimeError as error: - # Lambda runtime errors and errors from - # lambdas that returned statusCode - # different than 200 - print(error) + # Lambda runtime errors and lambdas that + # returned statusCode different than 200 + # (maybe also response processing errors) + self.logger.warn(error) except Exception as error: # All other errors - print(error) + self.logger.warn(error) else: break time.sleep(1) + # Note: lambda finishes before s3 object is created + #self.logger.info('Lambda finished :)') + return response @@ -189,9 +195,10 @@ def process_response(response): self.logger.info(f'New lambda - {lambda_num}') """ + with concurrent.futures.ThreadPoolExecutor(max_workers=len(ranges)) as executor: executor.submit(AWS.process_execution, s3_client, processing_bucket, len(ranges), self.logger) - call_results = executor.map(lambda root_range: invoke_root_lambda(root_range, pickled_mapper), ranges) + futures = [executor.submit(invoke_root_lambda, root_range, pickled_mapper) for root_range in ranges] executor.shutdown(wait=True) wait_begin = time.time() @@ -205,22 +212,42 @@ def process_response(response): # time.sleep(1) # result = s3.get_object(s3_output_bucket, 'out.pickle') + reduce_begin = time.time() # Get names of output files, download and reduce them + results = s3_client.list_objects_v2(Bucket=processing_bucket) + self.logger.info(f'Lambdas finished: {results["KeyCount"]}') filenames = s3_client.list_objects_v2(Bucket=processing_bucket)['Contents'] - # need better way to do that - accumulator = pickle.loads(s3_client.get_object( - Bucket=processing_bucket, - Key=filenames[0]['Key'] - )['Body'].read()) + def get_from_s3(filename): + s3_client = boto3.client('s3', region_name=self.region) + return pickle.loads(s3_client.get_object( + Bucket=processing_bucket, + Key=filename['Key'] + )['Body'].read()) + + files = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=len(filenames)) as executor: + futures = [executor.submit(get_from_s3, filename) for filename in filenames] + executor.shutdown(wait=True) + files = [future.result() for future in futures] + + to_process = files + while len(to_process) > 1: + even_index_files = [to_process[i] for i in range(len(to_process)) if i % 2 == 0] + odd_index_files = [to_process[i] for i in range(len(to_process)) if i % 2 == 1] + + with concurrent.futures.ThreadPoolExecutor(len(to_process)) as executor: + futures = [executor.submit(reducer, pair[0], pair[1]) for pair in zip(even_index_files, odd_index_files)] + executor.shutdown(wait=True) + to_process = [future.result() for future in 
futures] + + if len(even_index_files) > len(odd_index_files): + to_process.append(even_index_files[-1]) + elif len(even_index_files) < len(odd_index_files): + to_process.append(odd_index_files[-1]) - for filename in filenames[1:]: - file = pickle.loads(s3_client.get_object( - Bucket=processing_bucket, - Key=filename['Key'] - )['Body'].read()) - accumulator = reducer(accumulator, file) + reduction_result = to_process[0] # Clean up intermediate objects after we're done s3_resource.Bucket(processing_bucket).objects.all().delete() @@ -234,7 +261,7 @@ def process_response(response): print(bench) - return accumulator + return reduction_result # reduced_output = pickle.loads(result) # return reduced_output From 335a0180182d76c30d68a2cab31955bbde8a20a4 Mon Sep 17 00:00:00 2001 From: Kamilbur Date: Wed, 4 Aug 2021 02:21:32 +0200 Subject: [PATCH 3/3] ROOT files compression and PR corrections --- PyRDF/backend/AWS.py | 274 +++++++++++++++++++++++-------------------- 1 file changed, 149 insertions(+), 125 deletions(-) diff --git a/PyRDF/backend/AWS.py b/PyRDF/backend/AWS.py index c511bfd..9b59049 100644 --- a/PyRDF/backend/AWS.py +++ b/PyRDF/backend/AWS.py @@ -4,9 +4,11 @@ import json import logging import time -import threading import concurrent.futures import os +import sys +import ROOT +from pathlib import Path import boto3 import botocore @@ -14,6 +16,8 @@ from .Dist import Dist +lambda_await_thread_stop = False + class FlushingLogger: def __init__(self): @@ -21,7 +25,7 @@ def __init__(self): def __getattr__(self, name): method = getattr(self.logger, name) - if name in ['info', ]: + if name in ['info', 'warning', 'debug', 'error', 'critical', ]: def flushed_method(msg, *args, **kwargs): method(msg, *args, **kwargs) for h in self.logger.handlers: @@ -55,14 +59,11 @@ def _get_partitions(self): def ProcessAndMerge(self, mapper, reducer): """ Performs map-reduce using AWS Lambda. - Args: mapper (function): A function that runs the computational graph and returns a list of values. - reducer (function): A function that merges two lists that were returned by the mapper. - Returns: list: A list representing the values of action nodes returned after computation (Map-Reduce). 
@@ -70,12 +71,9 @@ def ProcessAndMerge(self, mapper, reducer): ranges = self.build_ranges() - def encode_object(object_to_encode) -> str: - return str(base64.b64encode(pickle.dumps(object_to_encode))) - # Make mapper and reducer transferable - pickled_mapper = encode_object(mapper) - pickled_reducer = encode_object(reducer) + pickled_mapper = AWS.encode_object(mapper) + pickled_reducer = AWS.encode_object(reducer) # Setup AWS clients s3_resource = boto3.resource('s3', region_name=self.region) @@ -89,14 +87,12 @@ def encode_object(object_to_encode) -> str: if not s3_output_bucket: self.logger.info('AWS backend not initialized!') return False - ssm_client.put_parameter( Name='ranges_num', Type='String', Value=str(len(ranges)), Overwrite=True ) - ssm_client.put_parameter( Name='reducer', Type='String', @@ -105,103 +101,29 @@ def encode_object(object_to_encode) -> str: ) """ - def invoke_root_lambda(root_range, script): - - trials = 3 - - client = boto3.client('lambda', region_name=self.region) - def process_response(response): - if response: - if 'Payload' in response: - payload = response['Payload'] - if isinstance(payload, botocore.response.StreamingBody): - payload_bytes = payload.read() - try: - response['Payload'] = json.loads(payload_bytes) - except json.JSONDecodeError: - response['Payload'] = {} - - return response - - payload = json.dumps({ - 'range': encode_object(root_range), - 'script': script, - 'start': str(root_range.start), - 'end': str(root_range.end), - 'filelist': str(root_range.filelist), - 'friend_info': encode_object(root_range.friend_info) - }) - - # Maybe here give info about number of invoked lambda for awsmonitor - #self.logger.info(f'New lambda - 31') - - while trials: - trials -= 1 - try: - #my_before_time = time.time() - response = client.invoke( - FunctionName='root_lambda', - InvocationType= 'Event', #'RequestResponse', - Payload=bytes(payload, encoding='utf8') - ) - #my_after_time = time.time() - #self.logger.info(f'Invocation time: {my_after_time - my_before_time}') - - if 'FunctionError' in response: - raise RuntimeError('Lambda runtime error') - - response = process_response(response) - - if 'Payload' in response: - response_payload = response['Payload'] - lambda_status = response_payload['statusCode'] if 'statusCode' in response_payload else 200 - if lambda_status != 200: - raise RuntimeError(f'Lambda status code {lambda_status}') - except botocore.exceptions.ClientError as error: - # AWS site errors - self.logger.warn(error['Error']['Message']) - except RuntimeError as error: - # Lambda runtime errors and lambdas that - # returned statusCode different than 200 - # (maybe also response processing errors) - self.logger.warn(error) - except Exception as error: - # All other errors - self.logger.warn(error) - else: - break - time.sleep(1) - - # Note: lambda finishes before s3 object is created - #self.logger.info('Lambda finished :)') - - return response - - self.logger.info(f'Before lambdas invoke. 
Number of lambdas: {len(ranges)}') processing_bucket = ssm_client.get_parameter(Name='processing_bucket')['Parameter']['Value'] s3_resource.Bucket(processing_bucket).objects.all().delete() - invoke_begin = time.time() # Invoke workers with ranges and mapper - call_results = [] - """ - for lambda_num, root_range in enumerate(ranges): - call_result = invoke_root_lambda(lambda_client, root_range, pickled_mapper) - call_results.append(call_result) - self.logger.info(f'New lambda - {lambda_num}') - """ + global lambda_await_thread_stop + lambda_await_thread_stop = False with concurrent.futures.ThreadPoolExecutor(max_workers=len(ranges)) as executor: - executor.submit(AWS.process_execution, s3_client, processing_bucket, len(ranges), self.logger) - futures = [executor.submit(invoke_root_lambda, root_range, pickled_mapper) for root_range in ranges] - executor.shutdown(wait=True) + executor.submit(AWS.wait_for_all_lambdas, s3_client, processing_bucket, len(ranges), self.logger) + futures = [executor.submit(AWS.invoke_root_lambda, root_range, pickled_mapper, self.region, self.logger) + for root_range in ranges] + call_results = [future.result() for future in futures] + if not all(call_results): + lambda_await_thread_stop = True + + if lambda_await_thread_stop: + raise Exception(f'Some lambdas failed after multiple retrials') - wait_begin = time.time() self.logger.info('All lambdas have been invoked') # while True: @@ -212,34 +134,30 @@ def process_response(response): # time.sleep(1) # result = s3.get_object(s3_output_bucket, 'out.pickle') + download_begin = time.time() - reduce_begin = time.time() # Get names of output files, download and reduce them - results = s3_client.list_objects_v2(Bucket=processing_bucket) - self.logger.info(f'Lambdas finished: {results["KeyCount"]}') - filenames = s3_client.list_objects_v2(Bucket=processing_bucket)['Contents'] - - def get_from_s3(filename): - s3_client = boto3.client('s3', region_name=self.region) - return pickle.loads(s3_client.get_object( - Bucket=processing_bucket, - Key=filename['Key'] - )['Body'].read()) - - files = [] + filenames = AWS.get_all_objects_from_s3_bucket(s3_client, processing_bucket) + self.logger.info(f'Lambdas finished: {len(filenames)}') + + tmp_files_directory = '/tmp' + AWS.remove_all_tmp_root_files(tmp_files_directory) + with concurrent.futures.ThreadPoolExecutor(max_workers=len(filenames)) as executor: - futures = [executor.submit(get_from_s3, filename) for filename in filenames] - executor.shutdown(wait=True) + futures = [executor.submit(AWS.get_from_s3, filename, self.region, processing_bucket, tmp_files_directory) + for filename in filenames] files = [future.result() for future in futures] + reduce_begin = time.time() + to_process = files while len(to_process) > 1: even_index_files = [to_process[i] for i in range(len(to_process)) if i % 2 == 0] odd_index_files = [to_process[i] for i in range(len(to_process)) if i % 2 == 1] with concurrent.futures.ThreadPoolExecutor(len(to_process)) as executor: - futures = [executor.submit(reducer, pair[0], pair[1]) for pair in zip(even_index_files, odd_index_files)] - executor.shutdown(wait=True) + futures = [executor.submit(reducer, pair[0], pair[1]) for pair in + zip(even_index_files, odd_index_files)] to_process = [future.result() for future in futures] if len(even_index_files) > len(odd_index_files): @@ -254,28 +172,134 @@ def get_from_s3(filename): bench = ( len(ranges), - wait_begin-invoke_begin, - reduce_begin-wait_begin, - time.time()-reduce_begin + download_begin - invoke_begin, + 
reduce_begin - download_begin, + time.time() - reduce_begin ) print(bench) return reduction_result - # reduced_output = pickle.loads(result) - # return reduced_output def distribute_files(self, includes_list): pass @staticmethod - def process_execution(s3_client, processing_bucket, num_of_lambdas, logger): + def encode_object(object_to_encode) -> str: + return str(base64.b64encode(pickle.dumps(object_to_encode))) + + @staticmethod + def get_from_s3(filename, region, bucket_name, directory): + s3_client = boto3.client('s3', region_name=region) + local_filename = os.path.join(directory, filename['Key']) + s3_client.download_file(bucket_name, filename['Key'], local_filename) + + tfile = ROOT.TFile(local_filename, 'OPEN') + result = [] + # Get all objects from TFile + for key in tfile.GetListOfKeys(): + result.append(key.ReadObj()) + result[-1].SetDirectory(0) + tfile.Close() + + # Remove temporary root file + Path(local_filename).unlink() + + return result + + @staticmethod + def get_all_objects_from_s3_bucket(s3_client, bucket_name): + response = s3_client.list_objects_v2(Bucket=bucket_name) + result = response.get('Contents', []) + while response.get('IsTruncated'): + cont_token = response.get('NextContinuationToken') + response = s3_client.list_objects_v2(Bucket=bucket_name, ContinuationToken=cont_token) + result += response.get('Contents', []) + return result + + @staticmethod + def invoke_root_lambda(root_range, script, region, logger): + """ + Invoke root lambda. + Args: + root_range (Range): Range of data. + script (function): A function that performs an operation on + a range of data. + region (str): String containing AWS region. + logger (logging.Logger): + Returns: + bool: True if lambda invocation and execution was + successful. + """ + + trials = 3 + client = boto3.client('lambda', region_name=region) + + payload = json.dumps({ + 'range': AWS.encode_object(root_range), + 'script': script, + 'start': str(root_range.start), + 'end': str(root_range.end), + 'filelist': str(root_range.filelist), + 'friend_info': AWS.encode_object(root_range.friend_info) + }) + + # Maybe here give info about number of invoked lambda for awsmonitor + + while trials >= 0: + trials -= 1 + try: + response = client.invoke( + FunctionName='root_lambda', + InvocationType='RequestResponse', + Payload=bytes(payload, encoding='utf8') + ) + + try: + response['Payload'] = json.loads(response['Payload'].read()) + except Exception: + response['Payload'] = {} + + if 'FunctionError' in response or response['Payload'].get('statusCode') == 500: + try: + # Get error specification and remove additional quotas (side effect of serialization) + error_type = response['Payload']['errorType'][1:-1] + error_message = response['Payload']['errorMessage'][1:-1] + exception = getattr(sys.modules['builtins'], error_type) + msg = f"Lambda raised an exception: {error_message}" + except Exception: + exception = RuntimeError + msg = (f"Lambda raised an exception: (type={response['Payload']['errorType']}," + f"message={response['Payload']['errorMessage']})") + raise exception(msg) + + except botocore.exceptions.ClientError as error: + # AWS site errors + logger.warning(error['Error']['Message']) + except Exception as error: + # All other errors + logger.warning(str(error) + " (" + type(error).__name__ + ")") + else: + return True + time.sleep(1) + + # Note: lambda finishes before s3 object is created + return False + + @staticmethod + def remove_all_tmp_root_files(directory): + for file in os.listdir(directory): + if 
file.endswith('.root'): + Path(os.path.join(directory, file)).unlink() + + @staticmethod + def wait_for_all_lambdas(s3_client, processing_bucket, num_of_lambdas, logger): # Wait until all lambdas finished execution - while True: - results = s3_client.list_objects_v2(Bucket=processing_bucket) - logger.info(f'Lambdas finished: {results["KeyCount"]}') - if results['KeyCount'] == num_of_lambdas: + global lambda_await_thread_stop + while not lambda_await_thread_stop: + results = AWS.get_all_objects_from_s3_bucket(s3_client, processing_bucket) + logger.info(f'Lambdas finished: {len(results)}') + if len(results) == num_of_lambdas: break time.sleep(1) -
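
The invocation path in this series now calls the worker Lambda synchronously
('RequestResponse') and retries a bounded number of times, turning Lambda
runtime failures into exceptions. A minimal sketch of that retry pattern, for
illustration only: the function name 'root_lambda', the retry count and the
payload shape are assumptions here, not necessarily what the backend sends.

    import json
    import time

    import boto3
    import botocore

    def invoke_with_retries(payload, region, retries=3):
        """Invoke the worker Lambda synchronously, retrying on failure."""
        client = boto3.client('lambda', region_name=region)
        body = json.dumps(payload).encode('utf8')
        for attempt in range(retries):
            try:
                response = client.invoke(
                    FunctionName='root_lambda',        # assumed function name
                    InvocationType='RequestResponse',  # wait for the result instead of 'Event'
                    Payload=body,
                )
                result = json.loads(response['Payload'].read() or b'{}')
                if 'FunctionError' in response or result.get('statusCode') == 500:
                    raise RuntimeError(result.get('errorMessage', 'Lambda execution failed'))
                return True
            except botocore.exceptions.ClientError as error:
                # AWS-side error; ClientError exposes the message via .response
                print(error.response['Error']['Message'])
            except Exception as error:
                # Decoding or Lambda runtime error; wait and retry
                print(f'{error} ({type(error).__name__})')
            time.sleep(1)
        return False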