diff --git a/README.md b/README.md index 246dcc1..d833ab7 100644 --- a/README.md +++ b/README.md @@ -7,25 +7,29 @@ You can use it for whatever you want! This code is an example of how to use AWS distributed infrastructure for running anything Dockerized. The configuration of the AWS resources is done using boto3 and the AWS CLI. -The worker is written in Python and is encapsulated in a Docker container. There are four AWS components that are minimally needed to run distributed jobs: +The worker is written in Python and is encapsulated in a Docker container. +There are four AWS components that are minimally needed to run distributed jobs: + 1. An SQS queue 2. An ECS cluster 3. An S3 bucket 4. A spot fleet of EC2 instances + All of them can be managed individually through the AWS Management Console. However, this code helps to get started quickly and run a job autonomously if all the configuration is correct. -The code prepares the infrastructure to run a distributed job. +The code runs a script that links all these components and prepares the infrastructure to run a distributed job. When the job is completed, the code is also able to stop resources and clean up components. It also adds logging and alarms via CloudWatch, helping the user troubleshoot runs and destroy stuck machines. ## Running the code ### Step 1 -Edit the `config.py` file with all the relevant information for your job. Then, start creating the basic AWS resources by running the following script: +Edit the config.py file with all the relevant information for your job. +Then, start creating the basic AWS resources by running the following script: - $ python run.py setup + $ python3 run.py setup This script initializes the resources in AWS. Notice that the docker registry is built separately and you can modify the worker code to build your own. 
@@ -34,9 +38,10 @@ Any time you modify the worker code, you need to update the docker registry usin ### Step 2 After the first script runs successfully, the job can now be submitted to with the following command: - $ python run.py submitJob files/exampleJob.json + $ python3 run.py submitJob files/exampleJob.json -Running the script uploads the tasks that are configured in the json file. You have to customize the `exampleJob.json` file with information that makes sense for your project. +Running the script uploads the tasks that are configured in the json file. +You have to customize the exampleJob.json file with information that makes sense for your project. You'll want to figure out which information is generic and which is the information that makes each job unique. ### Step 3 @@ -45,14 +50,14 @@ This code starts a fleet of spot EC2 instances which will run the worker code. The worker code is encapsulated in Docker containers, and the code uses ECS services to inject them in EC2. All this is automated with the following command: - $ python run.py startCluster files/exampleFleet.json + $ python3 run.py startCluster files/exampleFleet.json After the cluster is ready, the code informs you that everything is setup, and saves the spot fleet identifier in a file for further reference. ### Step 4 When the cluster is up and running, you can monitor progress using the following command: - $ python run.py monitor files/APP_NAMESpotFleetRequestId.json + $ python3 run.py monitor files/APP_NAMESpotFleetRequestId.json The file APP_NAMESpotFleetRequestId.json is created after the cluster is setup in step 3. It is important to keep this monitor running if you want to automatically shutdown computing resources when there are no more tasks in the queue (recommended). 
diff --git a/run.py b/run.py index 32dc36f..c946061 100644 --- a/run.py +++ b/run.py @@ -60,14 +60,45 @@ # AUXILIARY FUNCTIONS ################################# -def get_aws_credentials(AWS_PROFILE): - session = boto3.Session(profile_name=AWS_PROFILE) - credentials = session.get_credentials() - return credentials.access_key, credentials.secret_key - def generate_task_definition(AWS_PROFILE): + taskRoleArn = False task_definition = TASK_DEFINITION.copy() - key, secret = get_aws_credentials(AWS_PROFILE) + + config = configparser.ConfigParser() + config.read(f"{os.environ['HOME']}/.aws/config") + + if config.has_section(AWS_PROFILE): + profile_name = AWS_PROFILE + elif config.has_section(f'profile {AWS_PROFILE}'): + profile_name = f'profile {AWS_PROFILE}' + else: + print ('Problem handling profile') + + if config.has_option(profile_name, 'role_arn'): + print ("Using role for credentials", config[profile_name]['role_arn']) + taskRoleArn = config[profile_name]['role_arn'] + else: + if config.has_option(profile_name, 'source_profile'): + creds = configparser.ConfigParser() + creds.read(f"{os.environ['HOME']}/.aws/credentials") + source_profile = config[profile_name]['source_profile'] + aws_access_key = creds[source_profile]['aws_access_key_id'] + aws_secret_key = creds[source_profile]['aws_secret_access_key'] + elif config.has_option(profile_name, 'aws_access_key_id'): + aws_access_key = config[profile_name]['aws_access_key_id'] + aws_secret_key = config[profile_name]['aws_secret_access_key'] + else: + print ("Problem getting credentials") + task_definition['containerDefinitions'][0]['environment'] += [ + { + "name": "AWS_ACCESS_KEY_ID", + "value": aws_access_key + }, + { + "name": "AWS_SECRET_ACCESS_KEY", + "value": aws_secret_key + }] + sqs = boto3.client('sqs') queue_name = get_queue_url(sqs) task_definition['containerDefinitions'][0]['environment'] += [ @@ -79,14 +110,6 @@ def generate_task_definition(AWS_PROFILE): 'name': 'SQS_QUEUE_URL', 'value': queue_name }, - { 
- "name": "AWS_ACCESS_KEY_ID", - "value": key - }, - { - "name": "AWS_SECRET_ACCESS_KEY", - "value": secret - }, { "name": "AWS_BUCKET", "value": AWS_BUCKET @@ -122,13 +145,18 @@ def generate_task_definition(AWS_PROFILE): { "name": "NECESSARY_STRING", "value": NECESSARY_STRING - } + } ] - return task_definition + return task_definition, taskRoleArn def update_ecs_task_definition(ecs, ECS_TASK_NAME, AWS_PROFILE): - task_definition = generate_task_definition(AWS_PROFILE) - ecs.register_task_definition(family=ECS_TASK_NAME,containerDefinitions=task_definition['containerDefinitions']) + task_definition, taskRoleArn = generate_task_definition(AWS_PROFILE) + if not taskRoleArn: + ecs.register_task_definition(family=ECS_TASK_NAME,containerDefinitions=task_definition['containerDefinitions']) + elif taskRoleArn: + ecs.register_task_definition(family=ECS_TASK_NAME,containerDefinitions=task_definition['containerDefinitions'],taskRoleArn=taskRoleArn) + else: + print('Mistake in handling role for Task Definition.') print('Task definition registered') def get_or_create_cluster(ecs): @@ -187,14 +215,14 @@ def killdeadAlarms(fleetId,monitorapp,ec2,cloud): todel.append(eachevent['EventInformation']['InstanceId']) existing_alarms = [x['AlarmName'] for x in cloud.describe_alarms(AlarmNamePrefix=monitorapp)['MetricAlarms']] - + for eachmachine in todel: monitorname = monitorapp+'_'+eachmachine if monitorname in existing_alarms: cloud.delete_alarms(AlarmNames=[monitorname]) print('Deleted', monitorname, 'if it existed') time.sleep(3) - + print('Old alarms deleted') def generateECSconfig(ECS_CLUSTER,APP_NAME,AWS_BUCKET,s3client): @@ -240,7 +268,7 @@ def removequeue(queueName): for eachUrl in queueoutput["QueueUrls"]: if eachUrl.split('/')[-1] == queueName: queueUrl=eachUrl - + sqs.delete_queue(QueueUrl=queueUrl) def deregistertask(taskName, ecs): @@ -270,7 +298,7 @@ def downscaleSpotFleet(queue, spotFleetID, ec2, manual=False): def export_logs(logs, loggroupId, starttime, bucketId): 
result = logs.create_export_task(taskName = loggroupId, logGroupName = loggroupId, fromTime = int(starttime), to = int(time.time()*1000), destination = bucketId, destinationPrefix = 'exportedlogs/'+loggroupId) - + logExportId = result['taskId'] while True: @@ -293,7 +321,7 @@ def __init__(self,name=None): self.queue = self.sqs.get_queue_by_name(QueueName=SQS_QUEUE_NAME) else: self.queue = self.sqs.get_queue_by_name(QueueName=name) - self.inProcess = -1 + self.inProcess = -1 self.pending = -1 def scheduleBatch(self, data): @@ -362,7 +390,7 @@ def submitJob(): print('Job submitted. Check your queue') ################################# -# SERVICE 3: START CLUSTER +# SERVICE 3: START CLUSTER ################################# def startCluster(): @@ -381,7 +409,7 @@ def startCluster(): spotfleetConfig['SpotPrice'] = '%.2f' %MACHINE_PRICE DOCKER_BASE_SIZE = int(round(float(EBS_VOL_SIZE)/int(TASKS_PER_MACHINE))) - 2 userData=generateUserData(ecsConfigFile,DOCKER_BASE_SIZE) - for LaunchSpecification in range(0,len(spotfleetConfig['LaunchSpecifications'])): + for LaunchSpecification in range(0,len(spotfleetConfig['LaunchSpecifications'])): spotfleetConfig['LaunchSpecifications'][LaunchSpecification]["UserData"]=userData spotfleetConfig['LaunchSpecifications'][LaunchSpecification]['BlockDeviceMappings'][1]['Ebs']["VolumeSize"]= EBS_VOL_SIZE spotfleetConfig['LaunchSpecifications'][LaunchSpecification]['InstanceType'] = MACHINE_TYPE[LaunchSpecification] @@ -404,7 +432,7 @@ def startCluster(): createMonitor.write('"MONITOR_LOG_GROUP_NAME" : "'+LOG_GROUP_NAME+'",\n') createMonitor.write('"MONITOR_START_TIME" : "'+ starttime+'"}\n') createMonitor.close() - + # Step 4: Create a log group for this app and date if one does not already exist logclient=boto3.client('logs') loggroupinfo=logclient.describe_log_groups(logGroupNamePrefix=LOG_GROUP_NAME) @@ -415,13 +443,13 @@ def startCluster(): if LOG_GROUP_NAME+'_perInstance' not in groupnames: 
logclient.create_log_group(logGroupName=LOG_GROUP_NAME+'_perInstance') logclient.put_retention_policy(logGroupName=LOG_GROUP_NAME+'_perInstance', retentionInDays=60) - + # Step 5: update the ECS service to be ready to inject docker containers in EC2 instances print('Updating service') ecs = boto3.client('ecs') ecs.update_service(cluster=ECS_CLUSTER, service=APP_NAME+'Service', desiredCount=CLUSTER_MACHINES*TASKS_PER_MACHINE) - print('Service updated.') - + print('Service updated.') + # Step 6: Monitor the creation of the instances until all are present status = ec2client.describe_spot_fleet_instances(SpotFleetRequestId=requestInfo['SpotFleetRequestId']) #time.sleep(15) # This is now too fast, so sometimes the spot fleet request history throws an error! @@ -441,7 +469,7 @@ def startCluster(): return ec2client.cancel_spot_fleet_requests(SpotFleetRequestIds=[requestInfo['SpotFleetRequestId']], TerminateInstances=True) return - + # If everything seems good, just bide your time until you're ready to go print('.') time.sleep(20) @@ -450,21 +478,21 @@ def startCluster(): print('Spot fleet successfully created. Your job should start in a few minutes.') ################################# -# SERVICE 4: MONITOR JOB +# SERVICE 4: MONITOR JOB ################################# def monitor(cheapest=False): if len(sys.argv) < 3: print('Use: run.py monitor spotFleetIdFile') sys.exit() - + if '.json' not in sys.argv[2]: print('Use: run.py monitor spotFleetIdFile') sys.exit() if len(sys.argv) == 4: cheapest = sys.argv[3] - + monitorInfo = loadConfig(sys.argv[2]) monitorcluster=monitorInfo["MONITOR_ECS_CLUSTER"] monitorapp=monitorInfo["MONITOR_APP_NAME"] @@ -472,17 +500,17 @@ def monitor(cheapest=False): queueId=monitorInfo["MONITOR_QUEUE_NAME"] ec2 = boto3.client('ec2') - cloud = boto3.client('cloudwatch') + cloud = boto3.client('cloudwatch') # Optional Step 0 - decide if you're going to be cheap rather than fast. 
This means that you'll get 15 minutes # from the start of the monitor to get as many machines as you get, and then it will set the requested number to 1. - # Benefit: this will always be the cheapest possible way to run, because if machines die they'll die fast, - # Potential downside- if machines are at low availability when you start to run, you'll only ever get a small number + # Benefit: this will always be the cheapest possible way to run, because if machines die they'll die fast, + # Potential downside- if machines are at low availability when you start to run, you'll only ever get a small number # of machines (as opposed to getting more later when they become available), so it might take VERY long to run if that happens. if cheapest: queue = JobQueue(name=queueId) startcountdown = time.time() - while queue.pendingLoad(): + while queue.pendingLoad(): if time.time() - startcountdown > 900: downscaleSpotFleet(queue, fleetId, ec2, manual=1) break @@ -491,7 +519,7 @@ def monitor(cheapest=False): # Step 1: Create job and count messages periodically queue = JobQueue(name=queueId) while queue.pendingLoad(): - #Once an hour (except at midnight) check for terminated machines and delete their alarms. + #Once an hour (except at midnight) check for terminated machines and delete their alarms. 
#This is slooooooow, which is why we don't just do it at the end curtime=datetime.datetime.now().strftime('%H%M') if curtime[-2:]=='00': @@ -504,7 +532,7 @@ def monitor(cheapest=False): if curtime[-1:]=='9': downscaleSpotFleet(queue, fleetId, ec2) time.sleep(MONITOR_TIME) - + # Step 2: When no messages are pending, stop service # Reload the monitor info, because for long jobs new fleets may have been started, etc monitorInfo = loadConfig(sys.argv[2]) @@ -514,7 +542,7 @@ def monitor(cheapest=False): queueId=monitorInfo["MONITOR_QUEUE_NAME"] bucketId=monitorInfo["MONITOR_BUCKET_NAME"] loggroupId=monitorInfo["MONITOR_LOG_GROUP_NAME"] - starttime=monitorInfo["MONITOR_START_TIME"] + starttime=monitorInfo["MONITOR_START_TIME"] ecs = boto3.client('ecs') ecs.update_service(cluster=monitorcluster, service=monitorapp+'Service', desiredCount=0) @@ -565,14 +593,14 @@ def monitor(cheapest=False): print('All export tasks done') ################################# -# MAIN USER INTERACTION +# MAIN USER INTERACTION ################################# if __name__ == '__main__': if len(sys.argv) < 2: print('Use: run.py setup | submitJob | startCluster | monitor') sys.exit() - + if sys.argv[1] == 'setup': setup() elif sys.argv[1] == 'submitJob': diff --git a/worker/Dockerfile b/worker/Dockerfile index da3ab4e..80708f4 100644 --- a/worker/Dockerfile +++ b/worker/Dockerfile @@ -2,14 +2,14 @@ # - [ BROAD'16 ] - # # A docker instance for accessing AWS resources -# This wraps the cellprofiler docker registry +# This wraps your docker registry # FROM someuser/somedocker:sometag -# Install S3FS - +# Install S3FS +USER root RUN apt-get -y update && \ apt-get -y upgrade && \ apt-get -y install \ @@ -36,11 +36,11 @@ RUN make install # Install Python - not needed if you've already got it in your container # If you have a non-3.8 version, you will need to change python3.8 calls where specified -RUN apt install -y python3.8-dev python3.8-distutils +RUN apt install -y python3.8-dev python3.8-distutils 
python3-pip # Install AWS CLI -RUN python3.8 -m pip install awscli +RUN python3.8 -m pip install awscli # Install boto3 @@ -62,4 +62,3 @@ RUN chmod 755 run-worker.sh WORKDIR /home/ubuntu ENTRYPOINT ["./run-worker.sh"] CMD [""] - diff --git a/worker/generic-worker.py b/worker/generic-worker.py index 63706f7..1f00228 100644 --- a/worker/generic-worker.py +++ b/worker/generic-worker.py @@ -6,7 +6,7 @@ import os import re import subprocess -import sys +import sys import time import watchtower import string @@ -20,8 +20,14 @@ QUEUE_URL = os.environ['SQS_QUEUE_URL'] AWS_BUCKET = os.environ['AWS_BUCKET'] LOG_GROUP_NAME= os.environ['LOG_GROUP_NAME'] -CHECK_IF_DONE_BOOL= os.environ['CHECK_IF_DONE_BOOL'] -EXPECTED_NUMBER_FILES= os.environ['EXPECTED_NUMBER_FILES'] +if 'CHECK_IF_DONE_BOOL' not in os.environ: + CHECK_IF_DONE_BOOL = False +else: + CHECK_IF_DONE_BOOL = os.environ['CHECK_IF_DONE_BOOL'] +if 'EXPECTED_NUMBER_FILES' not in os.environ: + EXPECTED_NUMBER_FILES = 1 +else: + EXPECTED_NUMBER_FILES = int(os.environ['EXPECTED_NUMBER_FILES']) if 'MIN_FILE_SIZE_BYTES' not in os.environ: MIN_FILE_SIZE_BYTES = 1 else: @@ -51,7 +57,7 @@ class JobQueue(): def __init__(self, queueURL): self.client = boto3.client('sqs') self.queueURL = queueURL - + def readMessage(self): response = self.client.receive_message(QueueUrl=self.queueURL, WaitTimeSeconds=20) if 'Messages' in response.keys(): @@ -81,7 +87,7 @@ def monitorAndLog(process,logger): break if output: print(output.strip()) - logger.info(output) + logger.info(output) def printandlog(text,logger): print(text) @@ -108,15 +114,15 @@ def runSomething(message): #groupkeys = list(group_to_run.keys()) #groupkeys.sort() #metadataID = '-'.join(groupkeys) - - # Add a handler with + + # Add a handler with # watchtowerlogger=watchtower.CloudWatchLogHandler(log_group=LOG_GROUP_NAME, stream_name=str(metadataID),create_log_group=False) # logger.addHandler(watchtowerlogger) - # See if this is a message you've already handled, if you've so chosen 
- # First, build a variable called remoteOut that equals your unique prefix of where your output should be + # See if this is a message you've already handled, if you've so chosen + # First, build a variable called remoteOut that equals your unique prefix of where your output should be # Then check if there are too many files - + if CHECK_IF_DONE_BOOL.upper() == 'TRUE': try: s3client=boto3.client('s3') @@ -131,7 +137,7 @@ def runSomething(message): logger.removeHandler(watchtowerlogger) return 'SUCCESS' except KeyError: #Returned if that folder does not exist - pass + pass # Build and run your program's command # ie cmd = my-program --my-flag-1 True --my-flag-2 VARIABLE @@ -142,18 +148,16 @@ def runSomething(message): subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) monitorAndLog(subp,logger) - # Figure out a done condition - a number of files being created, a particular file being created, an exit code, etc. - # Set its success to the boolean variable `done` - - # Get the outputs and move them to S3 - if done: + # Figure out a done condition - a number of files being created, a particular file being created, an exit code, etc. 
+ # If done, get the outputs and move them to S3 + if [ENTER DONE CONDITION HERE]: time.sleep(30) mvtries=0 while mvtries <3: try: printandlog('Move attempt #'+str(mvtries+1),logger) - cmd = 'aws s3 mv ' + localOut + ' s3://' + AWS_BUCKET + '/' + remoteOut + ' --recursive' - subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + cmd = 'aws s3 mv ' + localOut + ' s3://' + AWS_BUCKET + '/' + remoteOut + ' --recursive' + subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) out,err = subp.communicate() out=out.decode() err=err.decode() @@ -184,7 +188,7 @@ def runSomething(message): import shutil shutil.rmtree(localOut, ignore_errors=True) return 'PROBLEM' - + ################################# # MAIN WORKER LOOP @@ -216,4 +220,3 @@ def main(): print('Worker started') main() print('Worker finished') - diff --git a/worker/run-worker.sh b/worker/run-worker.sh index fb64ad8..ecab077 100644 --- a/worker/run-worker.sh +++ b/worker/run-worker.sh @@ -5,8 +5,6 @@ echo "Queue $SQS_QUEUE_URL" echo "Bucket $AWS_BUCKET" # 1. CONFIGURE AWS CLI -aws configure set aws_access_key_id $AWS_ACCESS_KEY_ID -aws configure set aws_secret_access_key $AWS_SECRET_ACCESS_KEY aws configure set default.region $AWS_REGION MY_INSTANCE_ID=$(curl http://169.254.169.254/latest/meta-data/instance-id) echo "Instance ID $MY_INSTANCE_ID" @@ -17,15 +15,15 @@ aws ec2 create-tags --resources $VOL_0_ID --tags Key=Name,Value=${APP_NAME}Worke VOL_1_ID=$(aws ec2 describe-instance-attribute --instance-id $MY_INSTANCE_ID --attribute blockDeviceMapping --output text --query BlockDeviceMappings[1].Ebs.[VolumeId]) aws ec2 create-tags --resources $VOL_1_ID --tags Key=Name,Value=${APP_NAME}Worker -# 2. MOUNT S3 +# 2. 
MOUNT S3 (OPTIONAL, DEPENDING ON WORKFLOW) echo $AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY > /credentials.txt chmod 600 /credentials.txt mkdir -p /home/ubuntu/bucket mkdir -p /home/ubuntu/local_output -stdbuf -o0 s3fs $AWS_BUCKET /home/ubuntu/bucket -o passwd_file=/credentials.txt +stdbuf -o0 s3fs $AWS_BUCKET /home/ubuntu/bucket -o passwd_file=/credentials.txt # 3. SET UP ALARMS -aws cloudwatch put-metric-alarm --alarm-name ${APP_NAME}_${MY_INSTANCE_ID} --alarm-actions arn:aws:swf:${AWS_REGION}:${OWNER_ID}:action/actions/AWS_EC2.InstanceId.Terminate/1.0 --statistic Maximum --period 60 --threshold 1 --comparison-operator LessThanThreshold --metric-name CPUUtilization --namespace AWS/EC2 --evaluation-periods 15 --dimensions "Name=InstanceId,Value=${MY_INSTANCE_ID}" +aws cloudwatch put-metric-alarm --alarm-name ${APP_NAME}_${MY_INSTANCE_ID} --alarm-actions arn:aws:swf:${AWS_REGION}:${OWNER_ID}:action/actions/AWS_EC2.InstanceId.Terminate/1.0 --statistic Maximum --period 60 --threshold 1 --comparison-operator LessThanThreshold --metric-name CPUUtilization --namespace AWS/EC2 --evaluation-periods 15 --dimensions "Name=InstanceId,Value=${MY_INSTANCE_ID}" # 4. RUN VM STAT MONITOR @@ -37,4 +35,3 @@ for((k=0; k<$DOCKER_CORES; k++)); do sleep $SECONDS_TO_START done wait -