From 93f489ca8a50c777b54d537028adc5d460e38655 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacek=20Ku=C5=9Bnierz?= Date: Wed, 25 Aug 2021 16:32:10 +0200 Subject: [PATCH 01/11] Initial commit for AWS --- bindings/experimental/distrdf/CMakeLists.txt | 2 + .../python/DistRDF/Backends/AWS/Backend.py | 292 ++++++++++++++++++ .../python/DistRDF/Backends/AWS/__init__.py | 21 ++ 3 files changed, 315 insertions(+) create mode 100644 bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py create mode 100644 bindings/experimental/distrdf/python/DistRDF/Backends/AWS/__init__.py diff --git a/bindings/experimental/distrdf/CMakeLists.txt b/bindings/experimental/distrdf/CMakeLists.txt index 6aef05fb1b25e..a5564e8e5d72a 100644 --- a/bindings/experimental/distrdf/CMakeLists.txt +++ b/bindings/experimental/distrdf/CMakeLists.txt @@ -23,6 +23,8 @@ set(py_sources DistRDF/Backends/Utils.py DistRDF/Backends/Spark/__init__.py DistRDF/Backends/Spark/Backend.py + DistRDF/Backends/AWS/__init__.py + DistRDF/Backends/AWS/Backend.py ) foreach(val RANGE ${how_many_pythons}) diff --git a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py new file mode 100644 index 0000000000000..0ae722f7dfe22 --- /dev/null +++ b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py @@ -0,0 +1,292 @@ +from __future__ import print_function + +import base64 +import concurrent.futures +import json +import logging +import os +import sys +import time +from pathlib import Path + +import ROOT +import boto3 +import botocore +import cloudpickle as pickle + +lambda_await_thread_stop = False + +from DistRDF.Backends import Base +from DistRDF import DataFrame +from DistRDF import HeadNode + +class FlushingLogger: + def __init__(self): + self.logger = logging.getLogger() + + def __getattr__(self, name): + method = getattr(self.logger, name) + if name in ['info', 'warning', 'debug', 'error', 'critical', ]: + def flushed_method(msg, *args, **kwargs): + method(msg, *args, **kwargs) + for h in self.logger.handlers: + h.flush() + + return flushed_method + else: + return method + + +class AWS(Base.BaseBackend): + """ + Backend that executes the computational graph using using AWS Lambda + for distributed execution. + """ + + MIN_NPARTITIONS = 2 + + def __init__(self, config={}): + """ + Config for AWS is same as in Dist backend, + more support will be added in future. + """ + super(AWS, self).__init__(config) + self.logger = FlushingLogger() if logging.root.level >= logging.INFO else logging.getLogger() + self.npartitions = self._get_partitions() + self.region = config.get('region') or 'us-east-1' + + def _get_partitions(self): + return int(self.npartitions or AWS.MIN_NPARTITIONS) + + def make_dataframe(self, *args, **kwargs): + """ + Creates an instance of distributed RDataFrame that can send computations + to a Spark cluster. + """ + # Set the number of partitions for this dataframe, one of the following: + # 1. User-supplied `npartitions` optional argument + # 2. An educated guess according to the backend, using the backend's + # `optimize_npartitions` function + # 3. Set `npartitions` to 2 + npartitions = kwargs.pop("npartitions", self.npartitions) + headnode = HeadNode.get_headnode(npartitions, *args) + return DataFrame.RDataFrame(headnode, self) + + + def ProcessAndMerge(self, ranges, mapper, reducer): + """ + Performs map-reduce using AWS Lambda. 
+ Args: + mapper (function): A function that runs the computational graph + and returns a list of values. + reducer (function): A function that merges two lists that were + returned by the mapper. + Returns: + list: A list representing the values of action nodes returned + after computation (Map-Reduce). + """ + + # Make mapper and reducer transferable + pickled_mapper = AWS.encode_object(mapper) + pickled_reducer = AWS.encode_object(reducer) + + # Setup AWS clients + s3_resource = boto3.resource('s3', region_name=self.region) + s3_client = boto3.client('s3', region_name=self.region) + lambda_client = boto3.client('lambda', region_name=self.region) + ssm_client = boto3.client('ssm', region_name=self.region) + + self.logger.info(f'Before lambdas invoke. Number of lambdas: {len(ranges)}') + + processing_bucket = ssm_client.get_parameter(Name='processing_bucket')['Parameter']['Value'] + + s3_resource.Bucket(processing_bucket).objects.all().delete() + + invoke_begin = time.time() + # Invoke workers with ranges and mapper + + global lambda_await_thread_stop + lambda_await_thread_stop = False + + with concurrent.futures.ThreadPoolExecutor(max_workers=len(ranges)) as executor: + executor.submit(AWS.wait_for_all_lambdas, s3_client, processing_bucket, len(ranges), self.logger) + futures = [executor.submit(AWS.invoke_root_lambda, root_range, pickled_mapper, self.region, self.logger) + for root_range in ranges] + call_results = [future.result() for future in futures] + if not all(call_results): + lambda_await_thread_stop = True + + if lambda_await_thread_stop: + raise Exception(f'Some lambdas failed after multiple retrials') + + self.logger.info('All lambdas have been invoked') + + download_begin = time.time() + + # Get names of output files, download and reduce them + filenames = AWS.get_all_objects_from_s3_bucket(s3_client, processing_bucket) + self.logger.info(f'Lambdas finished: {len(filenames)}') + + tmp_files_directory = '/tmp' + AWS.remove_all_tmp_root_files(tmp_files_directory) + + with concurrent.futures.ThreadPoolExecutor(max_workers=len(filenames)) as executor: + futures = [executor.submit(AWS.get_from_s3, filename, self.region, processing_bucket, tmp_files_directory) + for filename in filenames] + files = [future.result() for future in futures] + + reduce_begin = time.time() + + to_process = files + while len(to_process) > 1: + even_index_files = [to_process[i] for i in range(len(to_process)) if i % 2 == 0] + odd_index_files = [to_process[i] for i in range(len(to_process)) if i % 2 == 1] + + with concurrent.futures.ThreadPoolExecutor(len(to_process)) as executor: + futures = [executor.submit(reducer, pair[0], pair[1]) for pair in + zip(even_index_files, odd_index_files)] + to_process = [future.result() for future in futures] + + if len(even_index_files) > len(odd_index_files): + to_process.append(even_index_files[-1]) + elif len(even_index_files) < len(odd_index_files): + to_process.append(odd_index_files[-1]) + + reduction_result = to_process[0] + + # Clean up intermediate objects after we're done + s3_resource.Bucket(processing_bucket).objects.all().delete() + + bench = ( + len(ranges), + download_begin - invoke_begin, + reduce_begin - download_begin, + time.time() - reduce_begin + ) + + print(bench) + + return reduction_result + + def distribute_files(self, includes_list): + pass + + @staticmethod + def encode_object(object_to_encode) -> str: + return str(base64.b64encode(pickle.dumps(object_to_encode))) + + @staticmethod + def get_from_s3(filename, region, bucket_name, directory): + 
s3_client = boto3.client('s3', region_name=region) + local_filename = os.path.join(directory, filename['Key']) + s3_client.download_file(bucket_name, filename['Key'], local_filename) + + tfile = ROOT.TFile(local_filename, 'OPEN') + result = [] + + # Get all objects from TFile + for key in tfile.GetListOfKeys(): + result.append(key.ReadObj()) + result[-1].SetDirectory(0) + tfile.Close() + + # Remove temporary root file + Path(local_filename).unlink() + + return result + + @staticmethod + def get_all_objects_from_s3_bucket(s3_client, bucket_name): + response = s3_client.list_objects_v2(Bucket=bucket_name) + result = response.get('Contents', []) + while response.get('IsTruncated'): + cont_token = response.get('NextContinuationToken') + response = s3_client.list_objects_v2(Bucket=bucket_name, ContinuationToken=cont_token) + result += response.get('Contents', []) + return result + + @staticmethod + def invoke_root_lambda(root_range, script, region, logger): + """ + Invoke root lambda. + Args: + root_range (Range): Range of data. + script (function): A function that performs an operation on + a range of data. + region (str): String containing AWS region. + logger (logging.Logger): + Returns: + bool: True if lambda invocation and execution was + successful. + """ + + trials = 3 + client = boto3.client('lambda', region_name=region) + + payload = json.dumps({ + 'range': AWS.encode_object(root_range), + 'script': script, + 'start': str(root_range.start), + 'end': str(root_range.end), + 'filelist': str(root_range.filelist), + 'friend_info': AWS.encode_object(root_range.friend_info) + }) + + # Maybe here give info about number of invoked lambda for awsmonitor + + while trials >= 0: + trials -= 1 + try: + response = client.invoke( + FunctionName='root_lambda', + InvocationType='RequestResponse', + Payload=bytes(payload, encoding='utf8') + ) + + try: + response['Payload'] = json.loads(response['Payload'].read()) + except: + response['Payload'] = {} + + if 'FunctionError' in response or response['Payload'].get('statusCode') == 500: + try: + # Get error specification and remove additional quotas (side effect of serialization) + error_type = response['Payload']['errorType'][1:-1] + error_message = response['Payload']['errorMessage'][1:-1] + exception = getattr(sys.modules['builtins'], error_type) + msg = f"Lambda raised an exception: {error_message}" + except Exception: + exception = RuntimeError + msg = (f"Lambda raised an exception: (type={response['Payload']['errorType']}," + f"message={response['Payload']['errorMessage']})") + raise exception(msg) + + except botocore.exceptions.ClientError as error: + # AWS site errors + logger.warning(error['Error']['Message']) + except Exception as error: + # All other errors + logger.warning(str(error) + " (" + type(error).__name__ + ")") + else: + return True + time.sleep(1) + + # Note: lambda finishes before s3 object is created + return False + + @staticmethod + def remove_all_tmp_root_files(directory): + for file in os.listdir(directory): + if file.endswith('.root'): + Path(os.path.join(directory, file)).unlink() + + @staticmethod + def wait_for_all_lambdas(s3_client, processing_bucket, num_of_lambdas, logger): + # Wait until all lambdas finished execution + global lambda_await_thread_stop + while not lambda_await_thread_stop: + results = AWS.get_all_objects_from_s3_bucket(s3_client, processing_bucket) + logger.info(f'Lambdas finished: {len(results)}') + if len(results) == num_of_lambdas: + break + time.sleep(1) diff --git 
a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/__init__.py b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/__init__.py
new file mode 100644
index 0000000000000..767c5c2e5ca5c
--- /dev/null
+++ b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/__init__.py
@@ -0,0 +1,21 @@
+## @author Vincenzo Eduardo Padulano
+#  @author Enric Tejedor
+#  @date 2021-02
+
+################################################################################
+# Copyright (C) 1995-2021, Rene Brun and Fons Rademakers.                      #
+# All rights reserved.                                                         #
+#                                                                              #
+# For the licensing terms see $ROOTSYS/LICENSE.                                #
+# For the list of contributors see $ROOTSYS/README/CREDITS.                    #
+################################################################################
+
+def RDataFrame(*args, **kwargs):
+    """
+    Create an RDataFrame object that can run computations on a Spark cluster.
+    """
+
+    from DistRDF.Backends.AWS import Backend
+    aws = Backend.AWSBackend()
+
+    return aws.make_dataframe(*args, **kwargs)

From 364fc4b8d6aae485c4d88b4de78e2b94d984eccc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jacek=20Ku=C5=9Bnierz?=
Date: Wed, 25 Aug 2021 18:30:59 +0200
Subject: [PATCH 02/11] update

---
 .../distrdf/python/DistRDF/Backends/AWS/Backend.py  | 14 ++++++++++++++
 .../python/DistRDF/Backends/AWS/__init__.py         |  2 +-
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py
index 0ae722f7dfe22..5e7918c3c8946 100644
--- a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py
+++ b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py
@@ -290,3 +290,17 @@ def wait_for_all_lambdas(s3_client, processing_bucket, num_of_lambdas, logger):
             if len(results) == num_of_lambdas:
                 break
             time.sleep(1)
+
+    def distribute_unique_paths(self, paths):
+        """
+        Spark supports sending files to the executors via the
+        `SparkContext.addFile` method. This method receives in input the path
+        to the file (relative to the path of the current python session). The
+        file is initially added to the Spark driver and then sent to the
+        workers when they are initialized.
+
+        Args:
+            paths (set): A set of paths to files that should be sent to the
+                distributed workers.
+ """ + pass diff --git a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/__init__.py b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/__init__.py index 767c5c2e5ca5c..8a0bea05ee256 100644 --- a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/__init__.py +++ b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/__init__.py @@ -16,6 +16,6 @@ def RDataFrame(*args, **kwargs): """ from DistRDF.Backends.AWS import Backend - aws = Backend.AWSBackend() + aws = Backend.AWS() return aws.make_dataframe(*args, **kwargs) From a3201e10b26b6bfa8fc3f6ee11f54c5c36b18296 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacek=20Ku=C5=9Bnierz?= Date: Fri, 27 Aug 2021 15:18:56 +0200 Subject: [PATCH 03/11] another small adj --- .../distrdf/python/DistRDF/Backends/AWS/Backend.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py index 5e7918c3c8946..aae299d038bc0 100644 --- a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py +++ b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py @@ -44,13 +44,14 @@ class AWS(Base.BaseBackend): """ MIN_NPARTITIONS = 2 + npartitions=32 def __init__(self, config={}): """ Config for AWS is same as in Dist backend, more support will be added in future. """ - super(AWS, self).__init__(config) + super(AWS, self).__init__() self.logger = FlushingLogger() if logging.root.level >= logging.INFO else logging.getLogger() self.npartitions = self._get_partitions() self.region = config.get('region') or 'us-east-1' From f58609de05a1025ea97d8e62f0b685b850969555 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacek=20Ku=C5=9Bnierz?= Date: Fri, 27 Aug 2021 18:36:26 +0200 Subject: [PATCH 04/11] more --- .../distrdf/python/DistRDF/Backends/AWS/Backend.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py index aae299d038bc0..8825d72bfe56e 100644 --- a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py +++ b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py @@ -1,3 +1,6 @@ +from DistRDF import DataFrame +from DistRDF import HeadNode +from DistRDF.Backends import Base from __future__ import print_function import base64 @@ -16,9 +19,6 @@ lambda_await_thread_stop = False -from DistRDF.Backends import Base -from DistRDF import DataFrame -from DistRDF import HeadNode class FlushingLogger: def __init__(self): @@ -44,7 +44,7 @@ class AWS(Base.BaseBackend): """ MIN_NPARTITIONS = 2 - npartitions=32 + npartitions = 32 def __init__(self, config={}): """ @@ -73,7 +73,6 @@ def make_dataframe(self, *args, **kwargs): headnode = HeadNode.get_headnode(npartitions, *args) return DataFrame.RDataFrame(headnode, self) - def ProcessAndMerge(self, ranges, mapper, reducer): """ Performs map-reduce using AWS Lambda. 
From ce904c9f9a1ecbaf846bd6443e2b25ab2d23e15b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacek=20Ku=C5=9Bnierz?= Date: Sun, 29 Aug 2021 20:18:44 +0200 Subject: [PATCH 05/11] working version --- .../python/DistRDF/Backends/AWS/Backend.py | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py index 8825d72bfe56e..d8745e9862666 100644 --- a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py +++ b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py @@ -1,7 +1,8 @@ +from __future__ import print_function + from DistRDF import DataFrame -from DistRDF import HeadNode from DistRDF.Backends import Base -from __future__ import print_function +from DistRDF import Node import base64 import concurrent.futures @@ -43,7 +44,7 @@ class AWS(Base.BaseBackend): for distributed execution. """ - MIN_NPARTITIONS = 2 + MIN_NPARTITIONS = 8 npartitions = 32 def __init__(self, config={}): @@ -70,7 +71,7 @@ def make_dataframe(self, *args, **kwargs): # `optimize_npartitions` function # 3. Set `npartitions` to 2 npartitions = kwargs.pop("npartitions", self.npartitions) - headnode = HeadNode.get_headnode(npartitions, *args) + headnode = Node.HeadNode(*args) return DataFrame.RDataFrame(headnode, self) def ProcessAndMerge(self, ranges, mapper, reducer): @@ -181,14 +182,16 @@ def get_from_s3(filename, region, bucket_name, directory): local_filename = os.path.join(directory, filename['Key']) s3_client.download_file(bucket_name, filename['Key'], local_filename) - tfile = ROOT.TFile(local_filename, 'OPEN') - result = [] + # tfile = ROOT.TFile(local_filename, 'OPEN') + # result = [] # Get all objects from TFile - for key in tfile.GetListOfKeys(): - result.append(key.ReadObj()) - result[-1].SetDirectory(0) - tfile.Close() + # for key in tfile.GetListOfKeys(): + # result.append(key.ReadObj()) + # result[-1].SetDirectory(0) + # tfile.Close() + with open(local_filename, 'rb') as pickle_file: + result = pickle.load(pickle_file) # Remove temporary root file Path(local_filename).unlink() @@ -220,7 +223,8 @@ def invoke_root_lambda(root_range, script, region, logger): successful. 
""" - trials = 3 + trials = 1 + # client = boto3.client('lambda', region_name=region, config=Config(connect_timeout=60, read_timeout=900, retries={'max_attempts': 1})) client = boto3.client('lambda', region_name=region) payload = json.dumps({ @@ -234,8 +238,8 @@ def invoke_root_lambda(root_range, script, region, logger): # Maybe here give info about number of invoked lambda for awsmonitor - while trials >= 0: - trials -= 1 + while trials == 1: + trials = 0 try: response = client.invoke( FunctionName='root_lambda', From 7d316397e72ebfcb074b0cc57803f41e7f8680f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacek=20Ku=C5=9Bnierz?= Date: Mon, 30 Aug 2021 12:36:00 +0200 Subject: [PATCH 06/11] pass the certs --- .../python/DistRDF/Backends/AWS/Backend.py | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py index d8745e9862666..060d3ddad911f 100644 --- a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py +++ b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py @@ -1,9 +1,5 @@ from __future__ import print_function -from DistRDF import DataFrame -from DistRDF.Backends import Base -from DistRDF import Node - import base64 import concurrent.futures import json @@ -13,10 +9,12 @@ import time from pathlib import Path -import ROOT import boto3 import botocore import cloudpickle as pickle +from DistRDF import DataFrame +from DistRDF import Node +from DistRDF.Backends import Base lambda_await_thread_stop = False @@ -90,6 +88,8 @@ def ProcessAndMerge(self, ranges, mapper, reducer): # Make mapper and reducer transferable pickled_mapper = AWS.encode_object(mapper) pickled_reducer = AWS.encode_object(reducer) + f = open("/tmp/certs", "r") + certs = f.read() # Setup AWS clients s3_resource = boto3.resource('s3', region_name=self.region) @@ -111,8 +111,9 @@ def ProcessAndMerge(self, ranges, mapper, reducer): with concurrent.futures.ThreadPoolExecutor(max_workers=len(ranges)) as executor: executor.submit(AWS.wait_for_all_lambdas, s3_client, processing_bucket, len(ranges), self.logger) - futures = [executor.submit(AWS.invoke_root_lambda, root_range, pickled_mapper, self.region, self.logger) - for root_range in ranges] + futures = [ + executor.submit(AWS.invoke_root_lambda, root_range, pickled_mapper, self.region, self.logger, certs) + for root_range in ranges] call_results = [future.result() for future in futures] if not all(call_results): lambda_await_thread_stop = True @@ -209,7 +210,7 @@ def get_all_objects_from_s3_bucket(s3_client, bucket_name): return result @staticmethod - def invoke_root_lambda(root_range, script, region, logger): + def invoke_root_lambda(root_range, script, region, logger, certs): """ Invoke root lambda. 
Args: @@ -233,7 +234,8 @@ def invoke_root_lambda(root_range, script, region, logger): 'start': str(root_range.start), 'end': str(root_range.end), 'filelist': str(root_range.filelist), - 'friend_info': AWS.encode_object(root_range.friend_info) + 'friend_info': AWS.encode_object(root_range.friend_info), + 'certs': str(base64.b64encode(certs)) }) # Maybe here give info about number of invoked lambda for awsmonitor From ac9e8f8fa1a9f16dcfd28460a7b0db89215c07b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacek=20Ku=C5=9Bnierz?= Date: Wed, 15 Sep 2021 16:25:41 +0200 Subject: [PATCH 07/11] update to newest --- .../python/DistRDF/Backends/AWS/Backend.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py index 060d3ddad911f..f43ab2c477f98 100644 --- a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py +++ b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py @@ -43,7 +43,6 @@ class AWS(Base.BaseBackend): """ MIN_NPARTITIONS = 8 - npartitions = 32 def __init__(self, config={}): """ @@ -88,7 +87,7 @@ def ProcessAndMerge(self, ranges, mapper, reducer): # Make mapper and reducer transferable pickled_mapper = AWS.encode_object(mapper) pickled_reducer = AWS.encode_object(reducer) - f = open("/tmp/certs", "r") + f = open("/tmp/krb", "rb") certs = f.read() # Setup AWS clients @@ -231,11 +230,7 @@ def invoke_root_lambda(root_range, script, region, logger, certs): payload = json.dumps({ 'range': AWS.encode_object(root_range), 'script': script, - 'start': str(root_range.start), - 'end': str(root_range.end), - 'filelist': str(root_range.filelist), - 'friend_info': AWS.encode_object(root_range.friend_info), - 'certs': str(base64.b64encode(certs)) + 'cert': str(base64.b64encode(certs)) }) # Maybe here give info about number of invoked lambda for awsmonitor @@ -297,6 +292,16 @@ def wait_for_all_lambdas(s3_client, processing_bucket, num_of_lambdas, logger): break time.sleep(1) + def optimize_npartitions(self): + numex = self.sc.getConf().get("spark.executor.instances") + numcoresperex = self.sc.getConf().get("spark.executor.cores") + + if numex is not None: + if numcoresperex is not None: + return int(numex) * int(numcoresperex) + return int(numex) + else: + return self.MIN_NPARTITIONS def distribute_unique_paths(self, paths): """ Spark supports sending files to the executors via the From 9681266a549934ca0925456c6bdddb1de5414db8 Mon Sep 17 00:00:00 2001 From: Kamilbur Date: Thu, 16 Sep 2021 13:00:53 +0200 Subject: [PATCH 08/11] Headers --- .../python/DistRDF/Backends/AWS/Backend.py | 62 ++++++++----------- .../DistRDF/Backends/AWS/flushing_logger.py | 19 ++++++ 2 files changed, 44 insertions(+), 37 deletions(-) create mode 100644 bindings/experimental/distrdf/python/DistRDF/Backends/AWS/flushing_logger.py diff --git a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py index f43ab2c477f98..aab0c143984ed 100644 --- a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py +++ b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py @@ -15,25 +15,10 @@ from DistRDF import DataFrame from DistRDF import Node from DistRDF.Backends import Base +from .flushing_logger import FlushingLogger -lambda_await_thread_stop = False - - -class FlushingLogger: - def __init__(self): - self.logger = 
logging.getLogger() - def __getattr__(self, name): - method = getattr(self.logger, name) - if name in ['info', 'warning', 'debug', 'error', 'critical', ]: - def flushed_method(msg, *args, **kwargs): - method(msg, *args, **kwargs) - for h in self.logger.handlers: - h.flush() - - return flushed_method - else: - return method +lambda_await_thread_stop = False class AWS(Base.BaseBackend): @@ -43,6 +28,7 @@ class AWS(Base.BaseBackend): """ MIN_NPARTITIONS = 8 + npartitions = 32 def __init__(self, config={}): """ @@ -53,6 +39,7 @@ def __init__(self, config={}): self.logger = FlushingLogger() if logging.root.level >= logging.INFO else logging.getLogger() self.npartitions = self._get_partitions() self.region = config.get('region') or 'us-east-1' + self.paths = [] def _get_partitions(self): return int(self.npartitions or AWS.MIN_NPARTITIONS) @@ -84,22 +71,22 @@ def ProcessAndMerge(self, ranges, mapper, reducer): after computation (Map-Reduce). """ - # Make mapper and reducer transferable + # Make mapper, reducer and headers transferable pickled_mapper = AWS.encode_object(mapper) pickled_reducer = AWS.encode_object(reducer) - f = open("/tmp/krb", "rb") + pickled_headers = AWS.encode_object(self.paths) + f = open("/tmp/certs", "rb") certs = f.read() # Setup AWS clients s3_resource = boto3.resource('s3', region_name=self.region) s3_client = boto3.client('s3', region_name=self.region) - lambda_client = boto3.client('lambda', region_name=self.region) ssm_client = boto3.client('ssm', region_name=self.region) self.logger.info(f'Before lambdas invoke. Number of lambdas: {len(ranges)}') processing_bucket = ssm_client.get_parameter(Name='processing_bucket')['Parameter']['Value'] - + s3_resource.Bucket(processing_bucket).objects.all().delete() invoke_begin = time.time() @@ -111,7 +98,7 @@ def ProcessAndMerge(self, ranges, mapper, reducer): with concurrent.futures.ThreadPoolExecutor(max_workers=len(ranges)) as executor: executor.submit(AWS.wait_for_all_lambdas, s3_client, processing_bucket, len(ranges), self.logger) futures = [ - executor.submit(AWS.invoke_root_lambda, root_range, pickled_mapper, self.region, self.logger, certs) + executor.submit(AWS.invoke_root_lambda, root_range, pickled_mapper, self.region, self.logger, certs, pickled_headers) for root_range in ranges] call_results = [future.result() for future in futures] if not all(call_results): @@ -120,13 +107,10 @@ def ProcessAndMerge(self, ranges, mapper, reducer): if lambda_await_thread_stop: raise Exception(f'Some lambdas failed after multiple retrials') - self.logger.info('All lambdas have been invoked') - download_begin = time.time() # Get names of output files, download and reduce them filenames = AWS.get_all_objects_from_s3_bucket(s3_client, processing_bucket) - self.logger.info(f'Lambdas finished: {len(filenames)}') tmp_files_directory = '/tmp' AWS.remove_all_tmp_root_files(tmp_files_directory) @@ -169,6 +153,8 @@ def ProcessAndMerge(self, ranges, mapper, reducer): return reduction_result + + def distribute_files(self, includes_list): pass @@ -209,7 +195,7 @@ def get_all_objects_from_s3_bucket(s3_client, bucket_name): return result @staticmethod - def invoke_root_lambda(root_range, script, region, logger, certs): + def invoke_root_lambda(root_range, script, region, logger, certs, headers): """ Invoke root lambda. 
Args: @@ -230,7 +216,12 @@ def invoke_root_lambda(root_range, script, region, logger, certs): payload = json.dumps({ 'range': AWS.encode_object(root_range), 'script': script, - 'cert': str(base64.b64encode(certs)) + 'start': str(root_range.start), + 'end': str(root_range.end), + 'filelist': str(root_range.filelist), + 'friend_info': AWS.encode_object(root_range.friend_info), + 'cert': str(base64.b64encode(certs)), + 'headers': headers }) # Maybe here give info about number of invoked lambda for awsmonitor @@ -292,16 +283,6 @@ def wait_for_all_lambdas(s3_client, processing_bucket, num_of_lambdas, logger): break time.sleep(1) - def optimize_npartitions(self): - numex = self.sc.getConf().get("spark.executor.instances") - numcoresperex = self.sc.getConf().get("spark.executor.cores") - - if numex is not None: - if numcoresperex is not None: - return int(numex) * int(numcoresperex) - return int(numex) - else: - return self.MIN_NPARTITIONS def distribute_unique_paths(self, paths): """ Spark supports sending files to the executors via the @@ -315,3 +296,10 @@ def distribute_unique_paths(self, paths): distributed workers. """ pass + + + def add_header(self, path: str): + with open(path, 'r') as f: + contents = f.read() + self.paths.append((path, contents)) + diff --git a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/flushing_logger.py b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/flushing_logger.py new file mode 100644 index 0000000000000..18d87583d8d16 --- /dev/null +++ b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/flushing_logger.py @@ -0,0 +1,19 @@ +import logging + + +class FlushingLogger: + def __init__(self): + self.logger = logging.getLogger() + + def __getattr__(self, name): + method = getattr(self.logger, name) + if name in ['info', 'warning', 'debug', 'error', 'critical', ]: + def flushed_method(msg, *args, **kwargs): + method(msg, *args, **kwargs) + for h in self.logger.handlers: + h.flush() + + return flushed_method + else: + return method + From 712b2402704b6844266f96af9d5c78f9f645ca0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacek=20Ku=C5=9Bnierz?= Date: Thu, 16 Sep 2021 15:39:40 +0200 Subject: [PATCH 09/11] next level thing --- .../python/DistRDF/Backends/AWS/Backend.py | 20 +++++++++---------- .../{flushing_logger.py => FlushingLogger.py} | 1 - 2 files changed, 10 insertions(+), 11 deletions(-) rename bindings/experimental/distrdf/python/DistRDF/Backends/AWS/{flushing_logger.py => FlushingLogger.py} (99%) diff --git a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py index aab0c143984ed..78be36bbbf638 100644 --- a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py +++ b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py @@ -15,8 +15,8 @@ from DistRDF import DataFrame from DistRDF import Node from DistRDF.Backends import Base -from .flushing_logger import FlushingLogger +from .FlushingLogger import FlushingLogger lambda_await_thread_stop = False @@ -55,7 +55,7 @@ def make_dataframe(self, *args, **kwargs): # `optimize_npartitions` function # 3. Set `npartitions` to 2 npartitions = kwargs.pop("npartitions", self.npartitions) - headnode = Node.HeadNode(*args) + headnode = Node.HeadNode(npartitions, *args) return DataFrame.RDataFrame(headnode, self) def ProcessAndMerge(self, ranges, mapper, reducer): @@ -86,7 +86,7 @@ def ProcessAndMerge(self, ranges, mapper, reducer): self.logger.info(f'Before lambdas invoke. 
Number of lambdas: {len(ranges)}') processing_bucket = ssm_client.get_parameter(Name='processing_bucket')['Parameter']['Value'] - + s3_resource.Bucket(processing_bucket).objects.all().delete() invoke_begin = time.time() @@ -98,7 +98,10 @@ def ProcessAndMerge(self, ranges, mapper, reducer): with concurrent.futures.ThreadPoolExecutor(max_workers=len(ranges)) as executor: executor.submit(AWS.wait_for_all_lambdas, s3_client, processing_bucket, len(ranges), self.logger) futures = [ - executor.submit(AWS.invoke_root_lambda, root_range, pickled_mapper, self.region, self.logger, certs, pickled_headers) + executor.submit(AWS.invoke_root_lambda, + root_range, pickled_mapper, + self.region, self.logger, + certs, pickled_headers) for root_range in ranges] call_results = [future.result() for future in futures] if not all(call_results): @@ -116,8 +119,9 @@ def ProcessAndMerge(self, ranges, mapper, reducer): AWS.remove_all_tmp_root_files(tmp_files_directory) with concurrent.futures.ThreadPoolExecutor(max_workers=len(filenames)) as executor: - futures = [executor.submit(AWS.get_from_s3, filename, self.region, processing_bucket, tmp_files_directory) - for filename in filenames] + futures = [executor.submit( + AWS.get_from_s3, filename, self.region, processing_bucket, tmp_files_directory) + for filename in filenames] files = [future.result() for future in futures] reduce_begin = time.time() @@ -153,8 +157,6 @@ def ProcessAndMerge(self, ranges, mapper, reducer): return reduction_result - - def distribute_files(self, includes_list): pass @@ -297,9 +299,7 @@ def distribute_unique_paths(self, paths): """ pass - def add_header(self, path: str): with open(path, 'r') as f: contents = f.read() self.paths.append((path, contents)) - diff --git a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/flushing_logger.py b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/FlushingLogger.py similarity index 99% rename from bindings/experimental/distrdf/python/DistRDF/Backends/AWS/flushing_logger.py rename to bindings/experimental/distrdf/python/DistRDF/Backends/AWS/FlushingLogger.py index 18d87583d8d16..b3785ed6b82c9 100644 --- a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/flushing_logger.py +++ b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/FlushingLogger.py @@ -16,4 +16,3 @@ def flushed_method(msg, *args, **kwargs): return flushed_method else: return method - From ebbb5c407f573844808cb7527d18a9c1b6191f50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacek=20Ku=C5=9Bnierz?= Date: Thu, 16 Sep 2021 15:48:56 +0200 Subject: [PATCH 10/11] correct for old version --- .../distrdf/python/DistRDF/Backends/AWS/Backend.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py index 78be36bbbf638..1cb62234c186b 100644 --- a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py +++ b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py @@ -54,8 +54,7 @@ def make_dataframe(self, *args, **kwargs): # 2. An educated guess according to the backend, using the backend's # `optimize_npartitions` function # 3. 
Set `npartitions` to 2
-        npartitions = kwargs.pop("npartitions", self.npartitions)
-        headnode = Node.HeadNode(npartitions, *args)
+        headnode = Node.HeadNode(*args)
         return DataFrame.RDataFrame(headnode, self)
     def ProcessAndMerge(self, ranges, mapper, reducer):
         """

From b1854ba32c11d31060a547fd7ec9b4f1bdbd46f5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jacek=20Ku=C5=9Bnierz?=
Date: Thu, 16 Sep 2021 16:15:43 +0200
Subject: [PATCH 11/11] that should fix

---
 .../experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py
index 1cb62234c186b..23ca02ea13c27 100644
--- a/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py
+++ b/bindings/experimental/distrdf/python/DistRDF/Backends/AWS/Backend.py
@@ -55,7 +55,7 @@ def make_dataframe(self, *args, **kwargs):
         #    `optimize_npartitions` function
         # 3. Set `npartitions` to 2
         headnode = Node.HeadNode(*args)
-        return DataFrame.RDataFrame(headnode, self)
+        return DataFrame.RDataFrame(headnode, self, **kwargs)
     def ProcessAndMerge(self, ranges, mapper, reducer):
         """
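
A minimal usage sketch of the backend added by this series (not itself part of
any patch): it assumes the AWS resources that Backend.py expects are already
provisioned -- the `processing_bucket` SSM parameter, a deployed `root_lambda`
Lambda function and the credential file read from /tmp/certs -- and uses
placeholder tree, file and branch names.

    # Hypothetical driver script; "mytree", "mydata.root" and "x" stand in
    # for a real dataset and branch.
    from DistRDF.Backends import AWS

    # The factory defined in DistRDF/Backends/AWS/__init__.py builds an AWS
    # backend and returns a distributed RDataFrame wrapping it.
    df = AWS.RDataFrame("mytree", "mydata.root")

    # Booking an action and reading its result triggers ProcessAndMerge():
    # ranges are mapped onto `root_lambda` invocations, partial results are
    # written to the processing bucket and then reduced pairwise on the client.
    h = df.Histo1D("x")
    print(h.GetMean())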