From 0417e7d2a0bc5d446f9af4418a96755964304a6a Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Thu, 16 Jun 2022 14:07:41 -0700 Subject: [PATCH 01/21] correct role handling --- run.py | 66 ++++++++++++++++++++++---------------------- worker/run-worker.sh | 9 ++---- 2 files changed, 36 insertions(+), 39 deletions(-) diff --git a/run.py b/run.py index 415aca9..418ce27 100644 --- a/run.py +++ b/run.py @@ -60,11 +60,6 @@ # AUXILIARY FUNCTIONS ################################# -def get_aws_credentials(AWS_PROFILE): - session = boto3.Session(profile_name=AWS_PROFILE) - credentials = session.get_credentials() - return credentials.access_key, credentials.secret_key - def generate_task_definition(AWS_PROFILE): task_definition = TASK_DEFINITION.copy() key, secret = get_aws_credentials(AWS_PROFILE) @@ -130,13 +125,18 @@ def generate_task_definition(AWS_PROFILE): { "name": "DOWNLOAD_FILES", "value": DOWNLOAD_FILES - } + } ] return task_definition def update_ecs_task_definition(ecs, ECS_TASK_NAME, AWS_PROFILE): - task_definition = generate_task_definition(AWS_PROFILE) - ecs.register_task_definition(family=ECS_TASK_NAME,containerDefinitions=task_definition['containerDefinitions']) + task_definition, taskRoleArn = generate_task_definition(AWS_PROFILE) + if not taskRoleArn: + ecs.register_task_definition(family=ECS_TASK_NAME,containerDefinitions=task_definition['containerDefinitions']) + elif taskRoleArn: + ecs.register_task_definition(family=ECS_TASK_NAME,containerDefinitions=task_definition['containerDefinitions'],taskRoleArn=taskRoleArn) + else: + print('Mistake in handling role for Task Definition.') print('Task definition registered') def get_or_create_cluster(ecs): @@ -195,14 +195,14 @@ def killdeadAlarms(fleetId,monitorapp,ec2,cloud): todel.append(eachevent['EventInformation']['InstanceId']) existing_alarms = [x['AlarmName'] for x in cloud.describe_alarms(AlarmNamePrefix=monitorapp)['MetricAlarms']] - + for eachmachine in todel: monitorname = monitorapp+'_'+eachmachine if 
monitorname in existing_alarms: cloud.delete_alarms(AlarmNames=[monitorname]) print('Deleted', monitorname, 'if it existed') time.sleep(3) - + print('Old alarms deleted') def generateECSconfig(ECS_CLUSTER,APP_NAME,AWS_BUCKET,s3client): @@ -248,7 +248,7 @@ def removequeue(queueName): for eachUrl in queueoutput["QueueUrls"]: if eachUrl.split('/')[-1] == queueName: queueUrl=eachUrl - + sqs.delete_queue(QueueUrl=queueUrl) def deregistertask(taskName, ecs): @@ -278,7 +278,7 @@ def downscaleSpotFleet(queue, spotFleetID, ec2, manual=False): def export_logs(logs, loggroupId, starttime, bucketId): result = logs.create_export_task(taskName = loggroupId, logGroupName = loggroupId, fromTime = int(starttime), to = int(time.time()*1000), destination = bucketId, destinationPrefix = 'exportedlogs/'+loggroupId) - + logExportId = result['taskId'] while True: @@ -301,7 +301,7 @@ def __init__(self,name=None): self.queue = self.sqs.get_queue_by_name(QueueName=SQS_QUEUE_NAME) else: self.queue = self.sqs.get_queue_by_name(QueueName=name) - self.inProcess = -1 + self.inProcess = -1 self.pending = -1 def scheduleBatch(self, data): @@ -360,7 +360,7 @@ def submitJob(): jobInfo = loadConfig(sys.argv[2]) if 'output_structure' not in jobInfo.keys(): #backwards compatibility for 1.0.0 jobInfo["output_structure"]='' - templateMessage = {'Metadata': '', + templateMessage = {'Metadata': '', 'pipeline': jobInfo["pipeline"], 'output': jobInfo["output"], 'input': jobInfo["input"], @@ -382,7 +382,7 @@ def submitJob(): print('Job submitted. 
Check your queue') ################################# -# SERVICE 3: START CLUSTER +# SERVICE 3: START CLUSTER ################################# def startCluster(): @@ -401,7 +401,7 @@ def startCluster(): spotfleetConfig['SpotPrice'] = '%.2f' %MACHINE_PRICE DOCKER_BASE_SIZE = int(round(float(EBS_VOL_SIZE)/int(TASKS_PER_MACHINE))) - 2 userData=generateUserData(ecsConfigFile,DOCKER_BASE_SIZE) - for LaunchSpecification in range(0,len(spotfleetConfig['LaunchSpecifications'])): + for LaunchSpecification in range(0,len(spotfleetConfig['LaunchSpecifications'])): spotfleetConfig['LaunchSpecifications'][LaunchSpecification]["UserData"]=userData spotfleetConfig['LaunchSpecifications'][LaunchSpecification]['BlockDeviceMappings'][1]['Ebs']["VolumeSize"]= EBS_VOL_SIZE spotfleetConfig['LaunchSpecifications'][LaunchSpecification]['InstanceType'] = MACHINE_TYPE[LaunchSpecification] @@ -424,7 +424,7 @@ def startCluster(): createMonitor.write('"MONITOR_LOG_GROUP_NAME" : "'+LOG_GROUP_NAME+'",\n') createMonitor.write('"MONITOR_START_TIME" : "'+ starttime+'"}\n') createMonitor.close() - + # Step 4: Create a log group for this app and date if one does not already exist logclient=boto3.client('logs') loggroupinfo=logclient.describe_log_groups(logGroupNamePrefix=LOG_GROUP_NAME) @@ -435,13 +435,13 @@ def startCluster(): if LOG_GROUP_NAME+'_perInstance' not in groupnames: logclient.create_log_group(logGroupName=LOG_GROUP_NAME+'_perInstance') logclient.put_retention_policy(logGroupName=LOG_GROUP_NAME+'_perInstance', retentionInDays=60) - + # Step 5: update the ECS service to be ready to inject docker containers in EC2 instances print('Updating service') ecs = boto3.client('ecs') ecs.update_service(cluster=ECS_CLUSTER, service=APP_NAME+'Service', desiredCount=CLUSTER_MACHINES*TASKS_PER_MACHINE) - print('Service updated.') - + print('Service updated.') + # Step 6: Monitor the creation of the instances until all are present status = 
ec2client.describe_spot_fleet_instances(SpotFleetRequestId=requestInfo['SpotFleetRequestId']) #time.sleep(15) # This is now too fast, so sometimes the spot fleet request history throws an error! @@ -461,7 +461,7 @@ def startCluster(): return ec2client.cancel_spot_fleet_requests(SpotFleetRequestIds=[requestInfo['SpotFleetRequestId']], TerminateInstances=True) return - + # If everything seems good, just bide your time until you're ready to go print('.') time.sleep(20) @@ -470,21 +470,21 @@ def startCluster(): print('Spot fleet successfully created. Your job should start in a few minutes.') ################################# -# SERVICE 4: MONITOR JOB +# SERVICE 4: MONITOR JOB ################################# def monitor(cheapest=False): if len(sys.argv) < 3: print('Use: run.py monitor spotFleetIdFile') sys.exit() - + if '.json' not in sys.argv[2]: print('Use: run.py monitor spotFleetIdFile') sys.exit() if len(sys.argv) == 4: cheapest = sys.argv[3] - + monitorInfo = loadConfig(sys.argv[2]) monitorcluster=monitorInfo["MONITOR_ECS_CLUSTER"] monitorapp=monitorInfo["MONITOR_APP_NAME"] @@ -492,17 +492,17 @@ def monitor(cheapest=False): queueId=monitorInfo["MONITOR_QUEUE_NAME"] ec2 = boto3.client('ec2') - cloud = boto3.client('cloudwatch') + cloud = boto3.client('cloudwatch') # Optional Step 0 - decide if you're going to be cheap rather than fast. This means that you'll get 15 minutes # from the start of the monitor to get as many machines as you get, and then it will set the requested number to 1. 
- # Benefit: this will always be the cheapest possible way to run, because if machines die they'll die fast, - # Potential downside- if machines are at low availability when you start to run, you'll only ever get a small number + # Benefit: this will always be the cheapest possible way to run, because if machines die they'll die fast, + # Potential downside- if machines are at low availability when you start to run, you'll only ever get a small number # of machines (as opposed to getting more later when they become available), so it might take VERY long to run if that happens. if cheapest: queue = JobQueue(name=queueId) startcountdown = time.time() - while queue.pendingLoad(): + while queue.pendingLoad(): if time.time() - startcountdown > 900: downscaleSpotFleet(queue, fleetId, ec2, manual=1) break @@ -511,7 +511,7 @@ def monitor(cheapest=False): # Step 1: Create job and count messages periodically queue = JobQueue(name=queueId) while queue.pendingLoad(): - #Once an hour (except at midnight) check for terminated machines and delete their alarms. + #Once an hour (except at midnight) check for terminated machines and delete their alarms. 
#This is slooooooow, which is why we don't just do it at the end curtime=datetime.datetime.now().strftime('%H%M') if curtime[-2:]=='00': @@ -524,7 +524,7 @@ def monitor(cheapest=False): if curtime[-1:]=='9': downscaleSpotFleet(queue, fleetId, ec2) time.sleep(MONITOR_TIME) - + # Step 2: When no messages are pending, stop service # Reload the monitor info, because for long jobs new fleets may have been started, etc monitorInfo = loadConfig(sys.argv[2]) @@ -534,7 +534,7 @@ def monitor(cheapest=False): queueId=monitorInfo["MONITOR_QUEUE_NAME"] bucketId=monitorInfo["MONITOR_BUCKET_NAME"] loggroupId=monitorInfo["MONITOR_LOG_GROUP_NAME"] - starttime=monitorInfo["MONITOR_START_TIME"] + starttime=monitorInfo["MONITOR_START_TIME"] ecs = boto3.client('ecs') ecs.update_service(cluster=monitorcluster, service=monitorapp+'Service', desiredCount=0) @@ -585,14 +585,14 @@ def monitor(cheapest=False): print('All export tasks done') ################################# -# MAIN USER INTERACTION +# MAIN USER INTERACTION ################################# if __name__ == '__main__': if len(sys.argv) < 2: print('Use: run.py setup | submitJob | startCluster | monitor') sys.exit() - + if sys.argv[1] == 'setup': setup() elif sys.argv[1] == 'submitJob': diff --git a/worker/run-worker.sh b/worker/run-worker.sh index 454c4b6..d932359 100644 --- a/worker/run-worker.sh +++ b/worker/run-worker.sh @@ -5,8 +5,6 @@ echo "Queue $SQS_QUEUE_URL" echo "Bucket $AWS_BUCKET" # 1. 
CONFIGURE AWS CLI -aws configure set aws_access_key_id $AWS_ACCESS_KEY_ID -aws configure set aws_secret_access_key $AWS_SECRET_ACCESS_KEY aws configure set default.region $AWS_REGION MY_INSTANCE_ID=$(curl http://169.254.169.254/latest/meta-data/instance-id) echo "Instance ID $MY_INSTANCE_ID" @@ -17,15 +15,15 @@ aws ec2 create-tags --resources $VOL_0_ID --tags Key=Name,Value=${APP_NAME}Worke VOL_1_ID=$(aws ec2 describe-instance-attribute --instance-id $MY_INSTANCE_ID --attribute blockDeviceMapping --output text --query BlockDeviceMappings[1].Ebs.[VolumeId]) aws ec2 create-tags --resources $VOL_1_ID --tags Key=Name,Value=${APP_NAME}Worker -# 2. MOUNT S3 +# 2. MOUNT S3 echo $AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY > /credentials.txt chmod 600 /credentials.txt mkdir -p /home/ubuntu/bucket mkdir -p /home/ubuntu/local_output -stdbuf -o0 s3fs $AWS_BUCKET /home/ubuntu/bucket -o passwd_file=/credentials.txt +stdbuf -o0 s3fs $AWS_BUCKET /home/ubuntu/bucket -o passwd_file=/credentials.txt # 3. SET UP ALARMS -aws cloudwatch put-metric-alarm --alarm-name ${APP_NAME}_${MY_INSTANCE_ID} --alarm-actions arn:aws:swf:${AWS_REGION}:${OWNER_ID}:action/actions/AWS_EC2.InstanceId.Terminate/1.0 --statistic Maximum --period 60 --threshold 1 --comparison-operator LessThanThreshold --metric-name CPUUtilization --namespace AWS/EC2 --evaluation-periods 15 --dimensions "Name=InstanceId,Value=${MY_INSTANCE_ID}" +aws cloudwatch put-metric-alarm --alarm-name ${APP_NAME}_${MY_INSTANCE_ID} --alarm-actions arn:aws:swf:${AWS_REGION}:${OWNER_ID}:action/actions/AWS_EC2.InstanceId.Terminate/1.0 --statistic Maximum --period 60 --threshold 1 --comparison-operator LessThanThreshold --metric-name CPUUtilization --namespace AWS/EC2 --evaluation-periods 15 --dimensions "Name=InstanceId,Value=${MY_INSTANCE_ID}" # 4. 
RUN VM STAT MONITOR @@ -37,4 +35,3 @@ for((k=0; k<$DOCKER_CORES; k++)); do sleep $SECONDS_TO_START done wait - From 526411d3e8979468ed3550757177f2e527a62c9f Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Tue, 2 Aug 2022 08:56:23 -0700 Subject: [PATCH 02/21] update generate_task_definition --- run.py | 80 ++++++++++++++++++++++++++++------------------------------ 1 file changed, 38 insertions(+), 42 deletions(-) diff --git a/run.py b/run.py index 418ce27..e01b184 100644 --- a/run.py +++ b/run.py @@ -61,8 +61,44 @@ ################################# def generate_task_definition(AWS_PROFILE): + taskRoleArn = False task_definition = TASK_DEFINITION.copy() - key, secret = get_aws_credentials(AWS_PROFILE) + + config = configparser.ConfigParser() + config.read(f"{os.environ['HOME']}/.aws/config") + + if config.has_section(AWS_PROFILE): + profile_name = AWS_PROFILE + elif config.has_section(f'profile {AWS_PROFILE}'): + profile_name = f'profile {AWS_PROFILE}' + else: + print ('Problem handling profile') + + if config.has_option(profile_name, 'role_arn'): + print ("Using role for credentials", config[profile_name]['role_arn']) + taskRoleArn = config[profile_name]['role_arn'] + else: + if config.has_option(profile_name, 'source_profile'): + creds = configparser.ConfigParser() + creds.read(f"{os.environ['HOME']}/.aws/credentials") + source_profile = config[profile_name]['source_profile'] + aws_access_key = creds[source_profile]['aws_access_key_id'] + aws_secret_key = creds[source_profile]['aws_secret_access_key'] + elif config.has_option(profile_name, 'aws_access_key_id'): + aws_access_key = config[profile_name]['aws_access_key_id'] + aws_secret_key = config[profile_name]['aws_secret_access_key'] + else: + print ("Problem getting credentials") + task_definition['containerDefinitions'][0]['environment'] += [ + { + "name": "AWS_ACCESS_KEY_ID", + "value": aws_access_key + }, + { + "name": "AWS_SECRET_ACCESS_KEY", + "value": aws_secret_key + }] + sqs = boto3.client('sqs') 
queue_name = get_queue_url(sqs) task_definition['containerDefinitions'][0]['environment'] += [ @@ -74,22 +110,6 @@ def generate_task_definition(AWS_PROFILE): 'name': 'SQS_QUEUE_URL', 'value': queue_name }, - { - "name": "AWS_ACCESS_KEY_ID", - "value": key - }, - { - "name": "AWS_SECRET_ACCESS_KEY", - "value": secret - }, - { - "name": "AWS_BUCKET", - "value": AWS_BUCKET - }, - { - "name": "DOCKER_CORES", - "value": str(DOCKER_CORES) - }, { "name": "LOG_GROUP_NAME", "value": LOG_GROUP_NAME @@ -98,36 +118,12 @@ def generate_task_definition(AWS_PROFILE): "name": "CHECK_IF_DONE_BOOL", "value": CHECK_IF_DONE_BOOL }, - { - "name": "EXPECTED_NUMBER_FILES", - "value": str(EXPECTED_NUMBER_FILES) - }, { "name": "ECS_CLUSTER", "value": ECS_CLUSTER }, - { - "name": "SECONDS_TO_START", - "value": str(SECONDS_TO_START) - }, - { - "name": "MIN_FILE_SIZE_BYTES", - "value": str(MIN_FILE_SIZE_BYTES) - }, - { - "name": "USE_PLUGINS", - "value": str(USE_PLUGINS) - }, - { - "name": "NECESSARY_STRING", - "value": NECESSARY_STRING - }, - { - "name": "DOWNLOAD_FILES", - "value": DOWNLOAD_FILES - } ] - return task_definition + return task_definition, taskRoleArn def update_ecs_task_definition(ecs, ECS_TASK_NAME, AWS_PROFILE): task_definition, taskRoleArn = generate_task_definition(AWS_PROFILE) From 38df4c92021bdf8e386e8aa3b16f1d14deba6be3 Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Tue, 2 Aug 2022 09:02:32 -0700 Subject: [PATCH 03/21] import configparser in run.py --- run.py | 1 + 1 file changed, 1 insertion(+) diff --git a/run.py b/run.py index e01b184..6bb4bd0 100644 --- a/run.py +++ b/run.py @@ -5,6 +5,7 @@ import json import time from base64 import b64encode +import configparser from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText From 80a8c5b2c0cacd87a04bd6e37928beaa3ada76ef Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Tue, 2 Aug 2022 10:45:29 -0700 Subject: [PATCH 04/21] enable separate source and destination buckets --- config.py | 4 +-- 
files/exampleJob.json | 7 +++-- run_batch_general.py | 66 ++++++++++++++++++++++++++++++------------- worker/cp-worker.py | 39 +++++++++++++++---------- 4 files changed, 76 insertions(+), 40 deletions(-) diff --git a/config.py b/config.py index 9b0c9b7..4a6836b 100644 --- a/config.py +++ b/config.py @@ -9,7 +9,7 @@ AWS_REGION = 'us-east-1' AWS_PROFILE = 'default' # The same profile used by your AWS CLI installation SSH_KEY_NAME = 'your-key-file.pem' # Expected to be in ~/.ssh -AWS_BUCKET = 'your-bucket-name' +AWS_BUCKET = 'your-bucket-name' # Bucket to use for logging # EC2 AND ECS INFORMATION: ECS_CLUSTER = 'default' @@ -32,7 +32,7 @@ SQS_DEAD_LETTER_QUEUE = 'arn:aws:sqs:some-region:111111100000:DeadMessages' # LOG GROUP INFORMATION: -LOG_GROUP_NAME = APP_NAME +LOG_GROUP_NAME = APP_NAME # REDUNDANCY CHECKS CHECK_IF_DONE_BOOL = 'False' #True or False- should it check if there are a certain number of non-empty files and delete the job if yes? diff --git a/files/exampleJob.json b/files/exampleJob.json index 64c57be..4e23c1b 100644 --- a/files/exampleJob.json +++ b/files/exampleJob.json @@ -1,8 +1,10 @@ { "_comment1": "Paths in this file are relative to the root of your S3 bucket", - "pipeline": "projects/analysis.cppipe", - "data_file": "projects/list_of_images.csv", + "pipeline": "projects/analysis.cppipe", + "data_file": "projects/list_of_images.csv", + "input_bucket": "SOURCE_BUCKET", "input": "projects/input/", + "output_bucket": "DESTINATION_BUCKET", "output": "projects/output/", "output_structure": "Metadata_Plate-Metadata_Well-Metadata_Site", "_comment2": "The following groups are tasks, and each will be run in parallel", @@ -11,4 +13,3 @@ {"Metadata": "Metadata_Plate=Plate1,Metadata_Well=A01,Metadata_Site=2"} ] } - diff --git a/run_batch_general.py b/run_batch_general.py index 2f4ba5d..9c899a3 100644 --- a/run_batch_general.py +++ b/run_batch_general.py @@ -17,13 +17,15 @@ def scheduleBatch(self, data): print('Batch sent. 
Message ID:',response.get('MessageId')) #project specific stuff -topdirname='' #Project name (should match the folder structure on S3) +topdirname='' #Project name (should match the folder structure on S3) appname='' #Must match config.py (except for step-specific part) batchsuffix='' #Batch name (should match the folder structure on S3) +input_bucket='' +output_bucket='' rows=list(string.ascii_uppercase)[0:16] columns=range(1,25) sites=range(1,10) -platelist=[] +platelist=[] zprojpipename='Zproj.cppipe' illumpipename='illum.cppipe' qcpipename='qc.cppipe' @@ -64,7 +66,9 @@ def MakeZprojJobs(batch=False): 'output': zprojoutpath, 'output_structure': zprojoutputstructure, 'input': inputpath, - 'data_file': posixpath.join(datafilepath,tozproj,csv_unprojected_name) + 'data_file': posixpath.join(datafilepath,tozproj,csv_unprojected_name), + 'input_bucket': input_bucket, + 'output_bucket': output_bucket, } else: templateMessage_zproj = {'Metadata': 'Metadata_Plate='+tozproj+',Metadata_Well='+eachrow+'%02d' %eachcol+',Metadata_Site='+str(eachsite), @@ -72,30 +76,37 @@ def MakeZprojJobs(batch=False): 'output': zprojoutpath, 'output_structure': zprojoutputstructure, 'input': inputpath, - 'data_file': posixpath.join(batchpath,batchpipenamezproj) + 'data_file': posixpath.join(batchpath,batchpipenamezproj), + 'input_bucket': input_bucket, + 'output_bucket': output_bucket, } zprojqueue.scheduleBatch(templateMessage_zproj) print('Z projection job submitted. 
Check your queue') -def MakeIllumJobs(batch=False): +def MakeIllumJobs(batch=False): illumqueue = JobQueue(appname+'_Illum') for toillum in platelist: if not batch: templateMessage_illum = {'Metadata': 'Metadata_Plate='+toillum, 'pipeline': posixpath.join(pipelinepath,illumpipename), 'output': illumoutpath, - 'input': inputpath, - 'data_file':posixpath.join(datafilepath,toillum,csvname)} + 'input': inputpath, + 'data_file':posixpath.join(datafilepath,toillum,csvname), + 'input_bucket': input_bucket, + 'output_bucket': output_bucket, + } else: templateMessage_illum = {'Metadata': 'Metadata_Plate='+toillum, 'pipeline': posixpath.join(batchpath,batchpipenameillum), 'output': illumoutpath, 'input':inputpath, - 'data_file': posixpath.join(batchpath,batchpipenameillum) + 'data_file': posixpath.join(batchpath,batchpipenameillum), + 'input_bucket': input_bucket, + 'output_bucket': output_bucket, } - + illumqueue.scheduleBatch(templateMessage_illum) print('Illum job submitted. Check your queue') @@ -110,14 +121,18 @@ def MakeQCJobs(batch=False): 'pipeline': posixpath.join(pipelinepath,qcpipename), 'output': QCoutpath, 'input': inputpath, - 'data_file': posixpath.join(datafilepath,toqc,csvname) + 'data_file': posixpath.join(datafilepath,toqc,csvname), + 'input_bucket': input_bucket, + 'output_bucket': output_bucket, } else: templateMessage_qc = {'Metadata': 'Metadata_Plate='+toqc+',Metadata_Well='+eachrow+'%02d' %eachcol, 'pipeline': posixpath.join(batchpath,batchpipenameqc), 'output': QCoutpath, 'input': inputpath, - 'data_file': posixpath.join(batchpath,batchpipenameqc) + 'data_file': posixpath.join(batchpath,batchpipenameqc), + 'input_bucket': input_bucket, + 'output_bucket': output_bucket, } qcqueue.scheduleBatch(templateMessage_qc) @@ -134,14 +149,18 @@ def MakeQCJobs_persite(batch=False): 'pipeline': posixpath.join(pipelinepath,qcpipename), 'output': QCoutpath, 'input': inputpath, - 'data_file': posixpath.join(datafilepath,toqc,csvname) + 'data_file': 
posixpath.join(datafilepath,toqc,csvname), + 'input_bucket': input_bucket, + 'output_bucket': output_bucket, } else: templateMessage_qc = {'Metadata': 'Metadata_Plate='+toqc+',Metadata_Well='+eachrow+'%02d' %eachcol+',Metadata_Site='+str(eachsite), 'pipeline': posixpath.join(batchpath,batchpipenameqc), 'output': QCoutpath, 'input': inputpath, - 'data_file': posixpath.join(batchpath,batchpipenameqc) + 'data_file': posixpath.join(batchpath,batchpipenameqc), + 'input_bucket': input_bucket, + 'output_bucket': output_bucket, } qcqueue.scheduleBatch(templateMessage_qc) @@ -158,14 +177,18 @@ def MakeAssayDevJobs(batch=False): 'pipeline': posixpath.join(pipelinepath,assaydevpipename), 'output': assaydevoutpath, 'input': inputpath, - 'data_file': posixpath.join(datafilepath,toad,csv_with_illumname) + 'data_file': posixpath.join(datafilepath,toad,csv_with_illumname), + 'input_bucket': input_bucket, + 'output_bucket': output_bucket, } else: templateMessage_ad = {'Metadata': 'Metadata_Plate='+toad+',Metadata_Well='+eachrow+'%02d' %eachcol, 'pipeline': posixpath.join(batchpath,batchpipenameassaydev), 'output': assaydevoutpath, 'input': inputpath, - 'data_file': posixpath.join(batchpath,batchpipenameassaydev) + 'data_file': posixpath.join(batchpath,batchpipenameassaydev), + 'input_bucket': input_bucket, + 'output_bucket': output_bucket, } assaydevqueue.scheduleBatch(templateMessage_ad) @@ -183,25 +206,28 @@ def MakeAnalysisJobs(batch=False): 'output': analysisoutpath, 'output_structure':anlysisoutputstructure, 'input':inputpath, - 'data_file': posixpath.join(datafilepath,toanalyze,csv_with_illumname) - } + 'data_file': posixpath.join(datafilepath,toanalyze,csv_with_illumname), + 'input_bucket': input_bucket, + 'output_bucket': output_bucket, + } else: templateMessage_analysis = {'Metadata': 'Metadata_Plate='+toanalyze+',Metadata_Well='+eachrow+'%02d' %eachcol+',Metadata_Site='+str(eachsite), 'pipeline': posixpath.join(batchpath,batchpipenameanalysis), 'output': analysisoutpath, 
'output_structure':anlysisoutputstructure, 'input':inputpath, - 'data_file': posixpath.join(batchpath,batchpipenameanalysis) + 'data_file': posixpath.join(batchpath,batchpipenameanalysis), + 'input_bucket': input_bucket, + 'output_bucket': output_bucket, } analysisqueue.scheduleBatch(templateMessage_analysis) print('Analysis job submitted. Check your queue') -#MakeZprojJobs(batch=False) +#MakeZprojJobs(batch=False) #MakeIllumJobs(batch=False) #MakeQCJobs(batch=False) #MakeQCJobs_persite(batch=False) #MakeAssayDevJobs(batch=False) #MakeAnalysisJobs(batch=False) - diff --git a/worker/cp-worker.py b/worker/cp-worker.py index fc6d6db..186a6dd 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -6,7 +6,7 @@ import os import re import subprocess -import sys +import sys import time import watchtower import string @@ -19,7 +19,8 @@ LOCAL_OUTPUT = '/home/ubuntu/local_output' PLUGIN_DIR = '/home/ubuntu/CellProfiler-plugins' QUEUE_URL = os.environ['SQS_QUEUE_URL'] -AWS_BUCKET = os.environ['AWS_BUCKET'] +if 'AWS_BUCKET' in os.environ: + AWS_BUCKET = os.environ['AWS_BUCKET'] LOG_GROUP_NAME= os.environ['LOG_GROUP_NAME'] CHECK_IF_DONE_BOOL= os.environ['CHECK_IF_DONE_BOOL'] EXPECTED_NUMBER_FILES= os.environ['EXPECTED_NUMBER_FILES'] @@ -52,7 +53,7 @@ class JobQueue(): def __init__(self, queueURL): self.client = boto3.client('sqs') self.queueURL = queueURL - + def readMessage(self): response = self.client.receive_message(QueueUrl=self.queueURL, WaitTimeSeconds=20) if 'Messages' in response.keys(): @@ -82,7 +83,7 @@ def monitorAndLog(process,logger): break if output: print(output.strip()) - logger.info(output) + logger.info(output) def printandlog(text,logger): print(text) @@ -144,18 +145,21 @@ def runCellProfiler(message): localOut = LOCAL_OUTPUT + '/%(MetadataID)s' % {'MetadataID': metadataID} remoteOut= os.path.join(message['output'],metadataID) replaceValues = {'PL':message['pipeline'], 'OUT':localOut, 'FL':message['data_file'], - 'DATA': DATA_ROOT, 'Metadata': 
message['Metadata'], 'IN': message['input'], + 'DATA': DATA_ROOT, 'Metadata': message['Metadata'], 'IN': message['input'], 'MetadataID':metadataID, 'PLUGINS':PLUGIN_DIR } # Start loggging now that we have a job we care about watchtowerlogger=watchtower.CloudWatchLogHandler(log_group=LOG_GROUP_NAME, stream_name=metadataID,create_log_group=False) - logger.addHandler(watchtowerlogger) + logger.addHandler(watchtowerlogger) # See if this is a message you've already handled, if you've so chosen if CHECK_IF_DONE_BOOL.upper() == 'TRUE': try: s3client=boto3.client('s3') - bucketlist=s3client.list_objects(Bucket=AWS_BUCKET,Prefix=remoteOut+'/') + if AWS_BUCKET: + bucketlist=s3client.list_objects(Bucket=AWS_BUCKET,Prefix=remoteOut+'/') + else: + bucketlist=s3client.list_objects(Bucket=message["output_bucket"],Prefix=remoteOut+'/') objectsizelist=[k['Size'] for k in bucketlist['Contents']] objectsizelist = [i for i in objectsizelist if i >= MIN_FILE_SIZE_BYTES] if NECESSARY_STRING: @@ -166,8 +170,8 @@ def runCellProfiler(message): logger.removeHandler(watchtowerlogger) return 'SUCCESS' except KeyError: #Returned if that folder does not exist - pass - + pass + csv_name = os.path.join(DATA_ROOT,message['data_file']) downloaded_files = [] @@ -204,7 +208,10 @@ def runCellProfiler(message): os.makedirs(os.path.split(new_file_name)[0]) printandlog('made directory '+os.path.split(new_file_name)[0],logger) if not os.path.exists(new_file_name): - s3.meta.client.download_file(AWS_BUCKET,prefix_on_bucket,new_file_name) + if AWS_BUCKET: + s3.meta.client.download_file(AWS_BUCKET,prefix_on_bucket,new_file_name) + else: + s3.meta.client.download_file(message['input_bucket'],prefix_on_bucket,new_file_name) downloaded_files.append(new_file_name) printandlog('Downloaded '+str(len(downloaded_files))+' files',logger) import random @@ -238,7 +245,7 @@ def runCellProfiler(message): cmd = cmd % replaceValues print('Running', cmd) logger.info(cmd) - + subp = subprocess.Popen(cmd.split(), 
stdout=subprocess.PIPE, stderr=subprocess.STDOUT) monitorAndLog(subp,logger) @@ -253,8 +260,11 @@ def runCellProfiler(message): while mvtries <3: try: printandlog('Move attempt #'+str(mvtries+1),logger) - cmd = 'aws s3 mv ' + localOut + ' s3://' + AWS_BUCKET + '/' + remoteOut + ' --recursive --exclude=cp.is.done' - subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if AWS_BUCKET: + cmd = 'aws s3 mv ' + localOut + ' s3://' + AWS_BUCKET + '/' + remoteOut + ' --recursive --exclude=cp.is.done' + else: + cmd = 'aws s3 mv ' + localOut + ' s3://' + message['output_bucket'] + '/' + remoteOut + ' --recursive --exclude=cp.is.done' + subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) out,err = subp.communicate() out=out.decode() err=err.decode() @@ -288,7 +298,7 @@ def runCellProfiler(message): import shutil shutil.rmtree(localOut, ignore_errors=True) return 'CP_PROBLEM' - + ################################# # MAIN WORKER LOOP @@ -320,4 +330,3 @@ def main(): print('Worker started') main() print('Worker finished') - From 549871062f2bad6a5ca1fe950f56767f74dbef20 Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Tue, 2 Aug 2022 14:37:23 -0700 Subject: [PATCH 05/21] oops shouldnt have deleted those --- run.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/run.py b/run.py index 6bb4bd0..366d481 100644 --- a/run.py +++ b/run.py @@ -111,6 +111,14 @@ def generate_task_definition(AWS_PROFILE): 'name': 'SQS_QUEUE_URL', 'value': queue_name }, + { + "name": "AWS_BUCKET", + "value": AWS_BUCKET + }, + { + "name": "DOCKER_CORES", + "value": str(DOCKER_CORES) + }, { "name": "LOG_GROUP_NAME", "value": LOG_GROUP_NAME @@ -119,10 +127,34 @@ def generate_task_definition(AWS_PROFILE): "name": "CHECK_IF_DONE_BOOL", "value": CHECK_IF_DONE_BOOL }, + { + "name": "EXPECTED_NUMBER_FILES", + "value": str(EXPECTED_NUMBER_FILES) + }, { "name": "ECS_CLUSTER", "value": ECS_CLUSTER }, + { + "name": 
"SECONDS_TO_START", + "value": str(SECONDS_TO_START) + }, + { + "name": "MIN_FILE_SIZE_BYTES", + "value": str(MIN_FILE_SIZE_BYTES) + }, + { + "name": "USE_PLUGINS", + "value": str(USE_PLUGINS) + }, + { + "name": "NECESSARY_STRING", + "value": NECESSARY_STRING + }, + { + "name": "DOWNLOAD_FILES", + "value": DOWNLOAD_FILES + } ] return task_definition, taskRoleArn From e657f11343a06f21d89ab6a4ffe67df80cb54a5a Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Wed, 3 Aug 2022 10:32:03 -0700 Subject: [PATCH 06/21] fix multibucket handling --- worker/cp-worker.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/worker/cp-worker.py b/worker/cp-worker.py index 186a6dd..eaf16d5 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -19,8 +19,7 @@ LOCAL_OUTPUT = '/home/ubuntu/local_output' PLUGIN_DIR = '/home/ubuntu/CellProfiler-plugins' QUEUE_URL = os.environ['SQS_QUEUE_URL'] -if 'AWS_BUCKET' in os.environ: - AWS_BUCKET = os.environ['AWS_BUCKET'] +AWS_BUCKET = os.environ['AWS_BUCKET'] LOG_GROUP_NAME= os.environ['LOG_GROUP_NAME'] CHECK_IF_DONE_BOOL= os.environ['CHECK_IF_DONE_BOOL'] EXPECTED_NUMBER_FILES= os.environ['EXPECTED_NUMBER_FILES'] @@ -156,10 +155,10 @@ def runCellProfiler(message): if CHECK_IF_DONE_BOOL.upper() == 'TRUE': try: s3client=boto3.client('s3') - if AWS_BUCKET: - bucketlist=s3client.list_objects(Bucket=AWS_BUCKET,Prefix=remoteOut+'/') - else: + if message["output_bucket"]: bucketlist=s3client.list_objects(Bucket=message["output_bucket"],Prefix=remoteOut+'/') + else: + bucketlist=s3client.list_objects(Bucket=AWS_BUCKET,Prefix=remoteOut+'/') objectsizelist=[k['Size'] for k in bucketlist['Contents']] objectsizelist = [i for i in objectsizelist if i >= MIN_FILE_SIZE_BYTES] if NECESSARY_STRING: @@ -208,10 +207,10 @@ def runCellProfiler(message): os.makedirs(os.path.split(new_file_name)[0]) printandlog('made directory '+os.path.split(new_file_name)[0],logger) if not os.path.exists(new_file_name): - if AWS_BUCKET: - 
s3.meta.client.download_file(AWS_BUCKET,prefix_on_bucket,new_file_name) - else: + if message['input_bucket']: s3.meta.client.download_file(message['input_bucket'],prefix_on_bucket,new_file_name) + else: + s3.meta.client.download_file(AWS_BUCKET,prefix_on_bucket,new_file_name) downloaded_files.append(new_file_name) printandlog('Downloaded '+str(len(downloaded_files))+' files',logger) import random @@ -260,10 +259,10 @@ def runCellProfiler(message): while mvtries <3: try: printandlog('Move attempt #'+str(mvtries+1),logger) - if AWS_BUCKET: - cmd = 'aws s3 mv ' + localOut + ' s3://' + AWS_BUCKET + '/' + remoteOut + ' --recursive --exclude=cp.is.done' - else: + if message['output_bucket']: cmd = 'aws s3 mv ' + localOut + ' s3://' + message['output_bucket'] + '/' + remoteOut + ' --recursive --exclude=cp.is.done' + else: + cmd = 'aws s3 mv ' + localOut + ' s3://' + AWS_BUCKET + '/' + remoteOut + ' --recursive --exclude=cp.is.done' subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) out,err = subp.communicate() out=out.decode() From cefd9e0a3db65b5a5868aa350002a2c569063385 Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Wed, 3 Aug 2022 14:16:20 -0700 Subject: [PATCH 07/21] move source destination buckets to config --- config.py | 2 ++ worker/cp-worker.py | 23 +++++++++++------------ worker/run-worker.sh | 20 +++++++++++++++++--- 3 files changed, 30 insertions(+), 15 deletions(-) diff --git a/config.py b/config.py index 4a6836b..0c2cb8c 100644 --- a/config.py +++ b/config.py @@ -10,6 +10,8 @@ AWS_PROFILE = 'default' # The same profile used by your AWS CLI installation SSH_KEY_NAME = 'your-key-file.pem' # Expected to be in ~/.ssh AWS_BUCKET = 'your-bucket-name' # Bucket to use for logging +SOURCE_BUCKET = 'bucket-name' # Bucket to download files from +DESTINATION_BUCKET = 'bucket-name' # Bucket to upload files to # EC2 AND ECS INFORMATION: ECS_CLUSTER = 'default' diff --git a/worker/cp-worker.py b/worker/cp-worker.py index eaf16d5..152cf49 
100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -20,6 +20,14 @@ PLUGIN_DIR = '/home/ubuntu/CellProfiler-plugins' QUEUE_URL = os.environ['SQS_QUEUE_URL'] AWS_BUCKET = os.environ['AWS_BUCKET'] +if 'SOURCE_BUCKET' not in os.environ: + SOURCE_BUCKET = os.environ['SOURCE_BUCKET'] +else: + SOURCE_BUCKET = os.environ['AWS_BUCKET'] +if 'DESTINATION_BUCKET' not in os.environ: + DESTINATION_BUCKET = os.environ['AWS_BUCKET'] +else: + DESTINATION_BUCKET = os.environ['DESTINATION_BUCKET'] LOG_GROUP_NAME= os.environ['LOG_GROUP_NAME'] CHECK_IF_DONE_BOOL= os.environ['CHECK_IF_DONE_BOOL'] EXPECTED_NUMBER_FILES= os.environ['EXPECTED_NUMBER_FILES'] @@ -155,10 +163,7 @@ def runCellProfiler(message): if CHECK_IF_DONE_BOOL.upper() == 'TRUE': try: s3client=boto3.client('s3') - if message["output_bucket"]: - bucketlist=s3client.list_objects(Bucket=message["output_bucket"],Prefix=remoteOut+'/') - else: - bucketlist=s3client.list_objects(Bucket=AWS_BUCKET,Prefix=remoteOut+'/') + bucketlist=s3client.list_objects(Bucket=OUTPUT_BUCKET,Prefix=remoteOut+'/') objectsizelist=[k['Size'] for k in bucketlist['Contents']] objectsizelist = [i for i in objectsizelist if i >= MIN_FILE_SIZE_BYTES] if NECESSARY_STRING: @@ -207,10 +212,7 @@ def runCellProfiler(message): os.makedirs(os.path.split(new_file_name)[0]) printandlog('made directory '+os.path.split(new_file_name)[0],logger) if not os.path.exists(new_file_name): - if message['input_bucket']: - s3.meta.client.download_file(message['input_bucket'],prefix_on_bucket,new_file_name) - else: - s3.meta.client.download_file(AWS_BUCKET,prefix_on_bucket,new_file_name) + s3.meta.client.download_file(INPUT_BUCKET,prefix_on_bucket,new_file_name) downloaded_files.append(new_file_name) printandlog('Downloaded '+str(len(downloaded_files))+' files',logger) import random @@ -259,10 +261,7 @@ def runCellProfiler(message): while mvtries <3: try: printandlog('Move attempt #'+str(mvtries+1),logger) - if message['output_bucket']: - cmd = 'aws s3 mv ' + 
localOut + ' s3://' + message['output_bucket'] + '/' + remoteOut + ' --recursive --exclude=cp.is.done' - else: - cmd = 'aws s3 mv ' + localOut + ' s3://' + AWS_BUCKET + '/' + remoteOut + ' --recursive --exclude=cp.is.done' + cmd = 'aws s3 mv ' + localOut + ' s3://' + OUTPUT_BUCKET + '/' + remoteOut + ' --recursive --exclude=cp.is.done' subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) out,err = subp.communicate() out=out.decode() diff --git a/worker/run-worker.sh b/worker/run-worker.sh index d932359..eb1156d 100644 --- a/worker/run-worker.sh +++ b/worker/run-worker.sh @@ -2,7 +2,11 @@ echo "Region $AWS_REGION" echo "Queue $SQS_QUEUE_URL" -echo "Bucket $AWS_BUCKET" +if ! [-v SOURCE_BUCKET] +then + SOURCE_BUCKET=$AWS_BUCKET +fi +echo "Source Bucket $SOURCE_BUCKET" # 1. CONFIGURE AWS CLI aws configure set default.region $AWS_REGION @@ -16,10 +20,20 @@ VOL_1_ID=$(aws ec2 describe-instance-attribute --instance-id $MY_INSTANCE_ID --a aws ec2 create-tags --resources $VOL_1_ID --tags Key=Name,Value=${APP_NAME}Worker # 2. MOUNT S3 -echo $AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY > /credentials.txt -chmod 600 /credentials.txt mkdir -p /home/ubuntu/bucket mkdir -p /home/ubuntu/local_output +if ! [ -v AWS_ACCESS_KEY_ID ] +then + declare -A CREDS=$(curl 169.254.170.2$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI) + echo "$CREDS" + echo "${CREDS[AccessKeyId]}" + AWS_ACCESS_KEY_ID=${CREDS[AccessKeyId]} + AWS_SECRET_ACCESS_KEY=${CREDS[SecretAccessKey]} + echo "$AWS_ACCESS_KEY_ID"declare -A + echo "$AWS_SECRET_ACCESS_KEY" +fi +echo $AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY > /credentials.txt +chmod 600 /credentials.txt stdbuf -o0 s3fs $AWS_BUCKET /home/ubuntu/bucket -o passwd_file=/credentials.txt # 3. 
SET UP ALARMS From 865d515c92e284ae616531b4571a882e25ad499b Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Wed, 3 Aug 2022 14:50:53 -0700 Subject: [PATCH 08/21] switched in and not in --- worker/cp-worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/cp-worker.py b/worker/cp-worker.py index 152cf49..2ef16c0 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -21,9 +21,9 @@ QUEUE_URL = os.environ['SQS_QUEUE_URL'] AWS_BUCKET = os.environ['AWS_BUCKET'] if 'SOURCE_BUCKET' not in os.environ: - SOURCE_BUCKET = os.environ['SOURCE_BUCKET'] -else: SOURCE_BUCKET = os.environ['AWS_BUCKET'] +else: + SOURCE_BUCKET = os.environ['SOURCE_BUCKET'] if 'DESTINATION_BUCKET' not in os.environ: DESTINATION_BUCKET = os.environ['AWS_BUCKET'] else: From f7919b87390523492d317125122b243dc054992b Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Thu, 4 Aug 2022 09:40:31 -0700 Subject: [PATCH 09/21] fix s3fs bybass --- worker/cp-worker.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/worker/cp-worker.py b/worker/cp-worker.py index 2ef16c0..7cf277f 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -184,10 +184,11 @@ def runCellProfiler(message): if DOWNLOAD_FILES.lower() == 'true': printandlog('Figuring which files to download', logger) import pandas - s3 = boto3.resource('s3') + s3client = boto3.client('s3') if not os.path.exists(localIn): os.mkdir(localIn) - csv_in = pandas.read_csv(os.path.join(DATA_ROOT,message['data_file'])) + s3client.download_file(SOURCE_BUCKET, message['data_file'], os.path.join(localIn,'load_data.csv')) + csv_in = pandas.read_csv(os.path.join(localIn,'load_data.csv')) csv_in=csv_in.astype('str') #Figure out what metadata fields we need in this experiment, as a dict if type(message['Metadata'])==dict: @@ -212,7 +213,9 @@ def runCellProfiler(message): os.makedirs(os.path.split(new_file_name)[0]) printandlog('made directory '+os.path.split(new_file_name)[0],logger) if not 
os.path.exists(new_file_name): - s3.meta.client.download_file(INPUT_BUCKET,prefix_on_bucket,new_file_name) + printandlog(prefix_on_bucket, logger) + printandlog(new_file_name, logger) + s3client.download_file(SOURCE_BUCKET,prefix_on_bucket,new_file_name) downloaded_files.append(new_file_name) printandlog('Downloaded '+str(len(downloaded_files))+' files',logger) import random From b9f89162c94a63f214ee7d34b6a9a5c5fbd1e67f Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Thu, 4 Aug 2022 09:52:10 -0700 Subject: [PATCH 10/21] remove multibucket from job file --- files/exampleJob.json | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/files/exampleJob.json b/files/exampleJob.json index 4e23c1b..0b477eb 100644 --- a/files/exampleJob.json +++ b/files/exampleJob.json @@ -1,10 +1,9 @@ { - "_comment1": "Paths in this file are relative to the root of your S3 bucket", + "_comment1": "Paths in this file are relative to the root of S3 buckets", + "_comment2": "pipeline, data_file, and input are relative to INPUT_BUCKET; output to OUTPUT_BUCKET", "pipeline": "projects/analysis.cppipe", "data_file": "projects/list_of_images.csv", - "input_bucket": "SOURCE_BUCKET", "input": "projects/input/", - "output_bucket": "DESTINATION_BUCKET", "output": "projects/output/", "output_structure": "Metadata_Plate-Metadata_Well-Metadata_Site", "_comment2": "The following groups are tasks, and each will be run in parallel", From 3d8cc82a121337877a7693379d0e666dd6d5ee1e Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Thu, 4 Aug 2022 09:56:48 -0700 Subject: [PATCH 11/21] remove changes to run_batch_general --- run_batch_general.py | 66 ++++++++++++++------------------------------ 1 file changed, 20 insertions(+), 46 deletions(-) diff --git a/run_batch_general.py b/run_batch_general.py index 9c899a3..2f4ba5d 100644 --- a/run_batch_general.py +++ b/run_batch_general.py @@ -17,15 +17,13 @@ def scheduleBatch(self, data): print('Batch sent. 
Message ID:',response.get('MessageId')) #project specific stuff -topdirname='' #Project name (should match the folder structure on S3) +topdirname='' #Project name (should match the folder structure on S3) appname='' #Must match config.py (except for step-specific part) batchsuffix='' #Batch name (should match the folder structure on S3) -input_bucket='' -output_bucket='' rows=list(string.ascii_uppercase)[0:16] columns=range(1,25) sites=range(1,10) -platelist=[] +platelist=[] zprojpipename='Zproj.cppipe' illumpipename='illum.cppipe' qcpipename='qc.cppipe' @@ -66,9 +64,7 @@ def MakeZprojJobs(batch=False): 'output': zprojoutpath, 'output_structure': zprojoutputstructure, 'input': inputpath, - 'data_file': posixpath.join(datafilepath,tozproj,csv_unprojected_name), - 'input_bucket': input_bucket, - 'output_bucket': output_bucket, + 'data_file': posixpath.join(datafilepath,tozproj,csv_unprojected_name) } else: templateMessage_zproj = {'Metadata': 'Metadata_Plate='+tozproj+',Metadata_Well='+eachrow+'%02d' %eachcol+',Metadata_Site='+str(eachsite), @@ -76,37 +72,30 @@ def MakeZprojJobs(batch=False): 'output': zprojoutpath, 'output_structure': zprojoutputstructure, 'input': inputpath, - 'data_file': posixpath.join(batchpath,batchpipenamezproj), - 'input_bucket': input_bucket, - 'output_bucket': output_bucket, + 'data_file': posixpath.join(batchpath,batchpipenamezproj) } zprojqueue.scheduleBatch(templateMessage_zproj) print('Z projection job submitted. 
Check your queue') -def MakeIllumJobs(batch=False): +def MakeIllumJobs(batch=False): illumqueue = JobQueue(appname+'_Illum') for toillum in platelist: if not batch: templateMessage_illum = {'Metadata': 'Metadata_Plate='+toillum, 'pipeline': posixpath.join(pipelinepath,illumpipename), 'output': illumoutpath, - 'input': inputpath, - 'data_file':posixpath.join(datafilepath,toillum,csvname), - 'input_bucket': input_bucket, - 'output_bucket': output_bucket, - } + 'input': inputpath, + 'data_file':posixpath.join(datafilepath,toillum,csvname)} else: templateMessage_illum = {'Metadata': 'Metadata_Plate='+toillum, 'pipeline': posixpath.join(batchpath,batchpipenameillum), 'output': illumoutpath, 'input':inputpath, - 'data_file': posixpath.join(batchpath,batchpipenameillum), - 'input_bucket': input_bucket, - 'output_bucket': output_bucket, + 'data_file': posixpath.join(batchpath,batchpipenameillum) } - + illumqueue.scheduleBatch(templateMessage_illum) print('Illum job submitted. Check your queue') @@ -121,18 +110,14 @@ def MakeQCJobs(batch=False): 'pipeline': posixpath.join(pipelinepath,qcpipename), 'output': QCoutpath, 'input': inputpath, - 'data_file': posixpath.join(datafilepath,toqc,csvname), - 'input_bucket': input_bucket, - 'output_bucket': output_bucket, + 'data_file': posixpath.join(datafilepath,toqc,csvname) } else: templateMessage_qc = {'Metadata': 'Metadata_Plate='+toqc+',Metadata_Well='+eachrow+'%02d' %eachcol, 'pipeline': posixpath.join(batchpath,batchpipenameqc), 'output': QCoutpath, 'input': inputpath, - 'data_file': posixpath.join(batchpath,batchpipenameqc), - 'input_bucket': input_bucket, - 'output_bucket': output_bucket, + 'data_file': posixpath.join(batchpath,batchpipenameqc) } qcqueue.scheduleBatch(templateMessage_qc) @@ -149,18 +134,14 @@ def MakeQCJobs_persite(batch=False): 'pipeline': posixpath.join(pipelinepath,qcpipename), 'output': QCoutpath, 'input': inputpath, - 'data_file': posixpath.join(datafilepath,toqc,csvname), - 'input_bucket': input_bucket, 
- 'output_bucket': output_bucket, + 'data_file': posixpath.join(datafilepath,toqc,csvname) } else: templateMessage_qc = {'Metadata': 'Metadata_Plate='+toqc+',Metadata_Well='+eachrow+'%02d' %eachcol+',Metadata_Site='+str(eachsite), 'pipeline': posixpath.join(batchpath,batchpipenameqc), 'output': QCoutpath, 'input': inputpath, - 'data_file': posixpath.join(batchpath,batchpipenameqc), - 'input_bucket': input_bucket, - 'output_bucket': output_bucket, + 'data_file': posixpath.join(batchpath,batchpipenameqc) } qcqueue.scheduleBatch(templateMessage_qc) @@ -177,18 +158,14 @@ def MakeAssayDevJobs(batch=False): 'pipeline': posixpath.join(pipelinepath,assaydevpipename), 'output': assaydevoutpath, 'input': inputpath, - 'data_file': posixpath.join(datafilepath,toad,csv_with_illumname), - 'input_bucket': input_bucket, - 'output_bucket': output_bucket, + 'data_file': posixpath.join(datafilepath,toad,csv_with_illumname) } else: templateMessage_ad = {'Metadata': 'Metadata_Plate='+toad+',Metadata_Well='+eachrow+'%02d' %eachcol, 'pipeline': posixpath.join(batchpath,batchpipenameassaydev), 'output': assaydevoutpath, 'input': inputpath, - 'data_file': posixpath.join(batchpath,batchpipenameassaydev), - 'input_bucket': input_bucket, - 'output_bucket': output_bucket, + 'data_file': posixpath.join(batchpath,batchpipenameassaydev) } assaydevqueue.scheduleBatch(templateMessage_ad) @@ -206,28 +183,25 @@ def MakeAnalysisJobs(batch=False): 'output': analysisoutpath, 'output_structure':anlysisoutputstructure, 'input':inputpath, - 'data_file': posixpath.join(datafilepath,toanalyze,csv_with_illumname), - 'input_bucket': input_bucket, - 'output_bucket': output_bucket, - } + 'data_file': posixpath.join(datafilepath,toanalyze,csv_with_illumname) + } else: templateMessage_analysis = {'Metadata': 'Metadata_Plate='+toanalyze+',Metadata_Well='+eachrow+'%02d' %eachcol+',Metadata_Site='+str(eachsite), 'pipeline': posixpath.join(batchpath,batchpipenameanalysis), 'output': analysisoutpath, 
'output_structure':anlysisoutputstructure, 'input':inputpath, - 'data_file': posixpath.join(batchpath,batchpipenameanalysis), - 'input_bucket': input_bucket, - 'output_bucket': output_bucket, + 'data_file': posixpath.join(batchpath,batchpipenameanalysis) } analysisqueue.scheduleBatch(templateMessage_analysis) print('Analysis job submitted. Check your queue') -#MakeZprojJobs(batch=False) +#MakeZprojJobs(batch=False) #MakeIllumJobs(batch=False) #MakeQCJobs(batch=False) #MakeQCJobs_persite(batch=False) #MakeAssayDevJobs(batch=False) #MakeAnalysisJobs(batch=False) + From fad0284233b07206a1575d23308162bcafdf31eb Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Thu, 4 Aug 2022 15:51:30 -0700 Subject: [PATCH 12/21] csv handling to keep structure --- worker/cp-worker.py | 34 +++++++++++++--------------------- 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/worker/cp-worker.py b/worker/cp-worker.py index 7cf277f..0d8fc15 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -187,8 +187,15 @@ def runCellProfiler(message): s3client = boto3.client('s3') if not os.path.exists(localIn): os.mkdir(localIn) - s3client.download_file(SOURCE_BUCKET, message['data_file'], os.path.join(localIn,'load_data.csv')) - csv_in = pandas.read_csv(os.path.join(localIn,'load_data.csv')) + printandlog('Downloading ' + message['data_file'] + ' from ' + SOURCE_BUCKET, logger) + csv_insubfolders = message['data_file'].split('/')[-3:] + subfolders = '/'.join((csv_insubfolders)[:-1]) + csv_insubfolders = '/'.join(csv_insubfolders) + csv_name = os.path.join(localIn, csv_insubfolders) + if not os.path.exists(os.path.join(localIn,subfolders)): + os.makedirs(os.path.join(localIn,subfolders), exist_ok=True) + s3client.download_file(SOURCE_BUCKET, message['data_file'], csv_name) + csv_in = pandas.read_csv(os.path.join(localIn,csv_name)) csv_in=csv_in.astype('str') #Figure out what metadata fields we need in this experiment, as a dict if type(message['Metadata'])==dict: @@ -213,28 
+220,13 @@ def runCellProfiler(message): os.makedirs(os.path.split(new_file_name)[0]) printandlog('made directory '+os.path.split(new_file_name)[0],logger) if not os.path.exists(new_file_name): - printandlog(prefix_on_bucket, logger) - printandlog(new_file_name, logger) s3client.download_file(SOURCE_BUCKET,prefix_on_bucket,new_file_name) downloaded_files.append(new_file_name) printandlog('Downloaded '+str(len(downloaded_files))+' files',logger) - import random - newtag = False - while newtag == False: - tag = str(random.randint(100000,999999)) #keep files from overwriting one another - local_csv_name = os.path.join(localIn,tag,os.path.split(csv_name)[1]) - if not os.path.exists(local_csv_name): - if not os.path.exists(os.path.split(local_csv_name)[0]): - os.makedirs(os.path.split(local_csv_name)[0]) - csv_in = pandas.read_csv(os.path.join(DATA_ROOT,message['data_file'])) - csv_in.replace(DATA_ROOT,localIn,regex=True, inplace=True) - csv_in.to_csv(local_csv_name,index=False) - print('Wrote updated CSV') - newtag = True - else: - newtag = False - csv_name = local_csv_name - + # Update paths in csv to local paths + csv_in.replace(DATA_ROOT,localIn,regex=True, inplace=True) + csv_in.to_csv(csv_name,index=False) + print('Updated load_data_csv to local paths') # Build and run CellProfiler command cpDone = localOut + '/cp.is.done' cmdstem = 'cellprofiler -c -r ' From c840b4448a35610a596957ec7c30a485f98f461d Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Thu, 4 Aug 2022 22:07:54 -0700 Subject: [PATCH 13/21] pipelines path and run command fixes --- worker/cp-worker.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/worker/cp-worker.py b/worker/cp-worker.py index 0d8fc15..cccd6e1 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -210,23 +210,31 @@ def runCellProfiler(message): csv_in = csv_in[csv_in[eachfilter] == filter_dict[eachfilter]] #Figure out the actual file names and get them channel_list = [x.split('FileName_')[1] for 
x in csv_in.columns if 'FileName' in x] - printandlog('Downloading files', logger) + printandlog(f'Downloading files for channels {channel_list}', logger) for channel in channel_list: for field in range(csv_in.shape[0]): full_old_file_name = os.path.join(list(csv_in['PathName_'+channel])[field],list(csv_in['FileName_'+channel])[field]) prefix_on_bucket = full_old_file_name.split(DATA_ROOT)[1][1:] new_file_name = os.path.join(localIn,prefix_on_bucket) if not os.path.exists(os.path.split(new_file_name)[0]): - os.makedirs(os.path.split(new_file_name)[0]) + os.makedirs(os.path.split(new_file_name)[0], exist_ok=True) printandlog('made directory '+os.path.split(new_file_name)[0],logger) if not os.path.exists(new_file_name): s3client.download_file(SOURCE_BUCKET,prefix_on_bucket,new_file_name) + printandlog('Downloading file '+prefix_on_bucket,logger) downloaded_files.append(new_file_name) printandlog('Downloaded '+str(len(downloaded_files))+' files',logger) # Update paths in csv to local paths csv_in.replace(DATA_ROOT,localIn,regex=True, inplace=True) csv_in.to_csv(csv_name,index=False) print('Updated load_data_csv to local paths') + # Download pipeline and update pipeline path in message + printandlog('Downloading ' + message['pipeline'] + ' from ' + SOURCE_BUCKET, logger) + localpipe = os.path.join(localIn, message['pipeline'].split('/')[-1]) + s3client.download_file(SOURCE_BUCKET, message['pipeline'], localpipe) + # Correct locations in CP run command + replaceValues['PL'] = message['pipeline'].split('/')[-1] + replaceValues['DATA'] = localIn # Build and run CellProfiler command cpDone = localOut + '/cp.is.done' cmdstem = 'cellprofiler -c -r ' From 8f5704769e7ab5dac6de3420ccff996a0e02fa70 Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Thu, 4 Aug 2022 22:28:46 -0700 Subject: [PATCH 14/21] output, destination, same difference --- worker/cp-worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/cp-worker.py b/worker/cp-worker.py index 
cccd6e1..cbf0661 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -163,7 +163,7 @@ def runCellProfiler(message): if CHECK_IF_DONE_BOOL.upper() == 'TRUE': try: s3client=boto3.client('s3') - bucketlist=s3client.list_objects(Bucket=OUTPUT_BUCKET,Prefix=remoteOut+'/') + bucketlist=s3client.list_objects(Bucket=DESTINATION_BUCKET,Prefix=remoteOut+'/') objectsizelist=[k['Size'] for k in bucketlist['Contents']] objectsizelist = [i for i in objectsizelist if i >= MIN_FILE_SIZE_BYTES] if NECESSARY_STRING: @@ -264,7 +264,7 @@ def runCellProfiler(message): while mvtries <3: try: printandlog('Move attempt #'+str(mvtries+1),logger) - cmd = 'aws s3 mv ' + localOut + ' s3://' + OUTPUT_BUCKET + '/' + remoteOut + ' --recursive --exclude=cp.is.done' + cmd = 'aws s3 mv ' + localOut + ' s3://' + DESTINATION_BUCKET + '/' + remoteOut + ' --recursive --exclude=cp.is.done' subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) out,err = subp.communicate() out=out.decode() From 4ff6463d2ccec65639b4ae89aef021ffc4ac0063 Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Fri, 5 Aug 2022 09:17:44 -0700 Subject: [PATCH 15/21] add upload flags --- config.py | 1 + worker/cp-worker.py | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/config.py b/config.py index 0c2cb8c..b7f96b5 100644 --- a/config.py +++ b/config.py @@ -12,6 +12,7 @@ AWS_BUCKET = 'your-bucket-name' # Bucket to use for logging SOURCE_BUCKET = 'bucket-name' # Bucket to download files from DESTINATION_BUCKET = 'bucket-name' # Bucket to upload files to +UPLOAD_FLAGS = '' # Any flags needed for upload to destination bucket # EC2 AND ECS INFORMATION: ECS_CLUSTER = 'default' diff --git a/worker/cp-worker.py b/worker/cp-worker.py index cbf0661..c45cbc8 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -28,6 +28,8 @@ DESTINATION_BUCKET = os.environ['AWS_BUCKET'] else: DESTINATION_BUCKET = os.environ['DESTINATION_BUCKET'] +if 'UPLOAD_FLAGS' in os.environ: + UPLOAD_FLAGS = 
os.environ['UPLOAD_FLAGS'] LOG_GROUP_NAME= os.environ['LOG_GROUP_NAME'] CHECK_IF_DONE_BOOL= os.environ['CHECK_IF_DONE_BOOL'] EXPECTED_NUMBER_FILES= os.environ['EXPECTED_NUMBER_FILES'] @@ -265,6 +267,9 @@ def runCellProfiler(message): try: printandlog('Move attempt #'+str(mvtries+1),logger) cmd = 'aws s3 mv ' + localOut + ' s3://' + DESTINATION_BUCKET + '/' + remoteOut + ' --recursive --exclude=cp.is.done' + if UPLOAD_FLAGS: + cmd += ' ' + UPLOAD_FLAGS + printandlog('Uploading with command ' + cmd, logger) subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) out,err = subp.communicate() out=out.decode() From 22abe35b765c1bf7b0d9070933f43fca42f78767 Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Fri, 5 Aug 2022 12:03:27 -0700 Subject: [PATCH 16/21] but what if it doesn't exist --- worker/cp-worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/worker/cp-worker.py b/worker/cp-worker.py index c45cbc8..71d7d1c 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -30,6 +30,8 @@ DESTINATION_BUCKET = os.environ['DESTINATION_BUCKET'] if 'UPLOAD_FLAGS' in os.environ: UPLOAD_FLAGS = os.environ['UPLOAD_FLAGS'] +else: + UPLOAD_FLAGS = False LOG_GROUP_NAME= os.environ['LOG_GROUP_NAME'] CHECK_IF_DONE_BOOL= os.environ['CHECK_IF_DONE_BOOL'] EXPECTED_NUMBER_FILES= os.environ['EXPECTED_NUMBER_FILES'] From e774d411defed750cd131bb1f37033cdca92f20f Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Fri, 5 Aug 2022 13:56:12 -0700 Subject: [PATCH 17/21] more things in task definition --- run.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/run.py b/run.py index 366d481..90a29a5 100644 --- a/run.py +++ b/run.py @@ -156,6 +156,22 @@ def generate_task_definition(AWS_PROFILE): "value": DOWNLOAD_FILES } ] + if SOURCE_BUCKET: + task_definition['containerDefinitions'][0]['environment'] += [ + { + 'name': 'SOURCE_BUCKET', + 'value': SOURCE_BUCKET + }, + { + 'name': 'DESTINATION_BUCKET', + 'value': DESTINATION_BUCKET + }] + 
if UPLOAD_FLAGS: + task_definition['containerDefinitions'][0]['environment'] += [ + { + 'name': 'UPLOAD_FLAGS', + 'value': UPLOAD_FLAGS + }] return task_definition, taskRoleArn def update_ecs_task_definition(ecs, ECS_TASK_NAME, AWS_PROFILE): From 7d8f4c84c72350ebf2dc148bfcc1d6df4edcbf39 Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Thu, 15 Sep 2022 14:06:01 -0700 Subject: [PATCH 18/21] stash changes --- worker/Dockerfile | 10 ++++++---- worker/run-worker.sh | 24 +++++++++++------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/worker/Dockerfile b/worker/Dockerfile index 5c76924..87b2d4d 100644 --- a/worker/Dockerfile +++ b/worker/Dockerfile @@ -6,9 +6,9 @@ # -FROM cellprofiler/cellprofiler:4.0.6 +FROM cellprofiler/cellprofiler:4.2.4 -# Install S3FS +# Install S3FS RUN apt-get -y update && \ apt-get -y upgrade && \ @@ -17,6 +17,7 @@ RUN apt-get -y update && \ autotools-dev \ g++ \ git \ + jq \ libcurl4-gnutls-dev \ libfuse-dev \ libssl-dev \ @@ -25,6 +26,8 @@ RUN apt-get -y update && \ sysstat \ curl +RUN apt-get install --only-upgrade bash + WORKDIR /usr/local/src RUN git clone https://github.com/s3fs-fuse/s3fs-fuse.git WORKDIR /usr/local/src/s3fs-fuse @@ -35,7 +38,7 @@ RUN make install # Install AWS CLI -RUN python3.8 -m pip install awscli +RUN python3.8 -m pip install awscli # Install boto3 @@ -65,4 +68,3 @@ WORKDIR /home/ubuntu/CellProfiler-plugins WORKDIR /home/ubuntu ENTRYPOINT ["./run-worker.sh"] CMD [""] - diff --git a/worker/run-worker.sh b/worker/run-worker.sh index eb1156d..3f49cdb 100644 --- a/worker/run-worker.sh +++ b/worker/run-worker.sh @@ -1,8 +1,8 @@ #!/bin/bash - +echo "${BASH_VERSION}" echo "Region $AWS_REGION" echo "Queue $SQS_QUEUE_URL" -if ! [-v SOURCE_BUCKET] +if [-n "$SOURCE_BUCKET"] then SOURCE_BUCKET=$AWS_BUCKET fi @@ -22,19 +22,17 @@ aws ec2 create-tags --resources $VOL_1_ID --tags Key=Name,Value=${APP_NAME}Worke # 2. MOUNT S3 mkdir -p /home/ubuntu/bucket mkdir -p /home/ubuntu/local_output -if ! 
[ -v AWS_ACCESS_KEY_ID ] +if [ -n "$AWS_ACCESS_KEY_ID" ] then - declare -A CREDS=$(curl 169.254.170.2$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI) - echo "$CREDS" - echo "${CREDS[AccessKeyId]}" - AWS_ACCESS_KEY_ID=${CREDS[AccessKeyId]} - AWS_SECRET_ACCESS_KEY=${CREDS[SecretAccessKey]} - echo "$AWS_ACCESS_KEY_ID"declare -A - echo "$AWS_SECRET_ACCESS_KEY" + AWS_ACCESS_KEY_ID=$(curl 169.254.170.2$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI | jq '.AccessKeyId') + AWS_SECRET_ACCESS_KEY=$(curl 169.254.170.2$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI | jq '.SecretAccessKey') + echo $AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY > /credentials.txt + chmod 600 /credentials.txt + stdbuf -o0 s3fs $AWS_BUCKET /home/ubuntu/bucket -o passwd_file=/credentials.txt -o dbglevel=info +else + stdbuf -o0 s3fs $AWS_BUCKET /home/ubuntu/bucket -o ecs -o dbglevel=info fi -echo $AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY > /credentials.txt -chmod 600 /credentials.txt -stdbuf -o0 s3fs $AWS_BUCKET /home/ubuntu/bucket -o passwd_file=/credentials.txt + # 3. 
SET UP ALARMS aws cloudwatch put-metric-alarm --alarm-name ${APP_NAME}_${MY_INSTANCE_ID} --alarm-actions arn:aws:swf:${AWS_REGION}:${OWNER_ID}:action/actions/AWS_EC2.InstanceId.Terminate/1.0 --statistic Maximum --period 60 --threshold 1 --comparison-operator LessThanThreshold --metric-name CPUUtilization --namespace AWS/EC2 --evaluation-periods 15 --dimensions "Name=InstanceId,Value=${MY_INSTANCE_ID}" From e907c6a438d99a5c5b899ad151cb9d9ca044edef Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Wed, 21 Sep 2022 13:20:50 -0700 Subject: [PATCH 19/21] update to AWS CLI2 --- worker/Dockerfile | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/worker/Dockerfile b/worker/Dockerfile index 87b2d4d..9870137 100644 --- a/worker/Dockerfile +++ b/worker/Dockerfile @@ -38,7 +38,10 @@ RUN make install # Install AWS CLI -RUN python3.8 -m pip install awscli +RUN apt install unzip +RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" +RUN unzip awscliv2.zip +RUN ./aws/install # Install boto3 From c66de9e9c7602484204a28cead1012c61d1b5eb3 Mon Sep 17 00:00:00 2001 From: ErinWeisbart Date: Wed, 5 Oct 2022 11:53:51 -0700 Subject: [PATCH 20/21] the right way to check for variable, finally --- worker/run-worker.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/run-worker.sh b/worker/run-worker.sh index 3f49cdb..21a0ac7 100644 --- a/worker/run-worker.sh +++ b/worker/run-worker.sh @@ -2,7 +2,7 @@ echo "${BASH_VERSION}" echo "Region $AWS_REGION" echo "Queue $SQS_QUEUE_URL" -if [-n "$SOURCE_BUCKET"] +if [[ -z "$SOURCE_BUCKET" ]] then SOURCE_BUCKET=$AWS_BUCKET fi @@ -22,7 +22,7 @@ aws ec2 create-tags --resources $VOL_1_ID --tags Key=Name,Value=${APP_NAME}Worke # 2.
MOUNT S3 mkdir -p /home/ubuntu/bucket mkdir -p /home/ubuntu/local_output -if [ -n "$AWS_ACCESS_KEY_ID" ] +if [[ -z "$AWS_ACCESS_KEY_ID" ]] then AWS_ACCESS_KEY_ID=$(curl 169.254.170.2$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI | jq '.AccessKeyId') AWS_SECRET_ACCESS_KEY=$(curl 169.254.170.2$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI | jq '.SecretAccessKey') From 2b95a3692e4a46ed9b9f82f00c1dc5410db55835 Mon Sep 17 00:00:00 2001 From: Erin Weisbart <54687786+ErinWeisbart@users.noreply.github.com> Date: Fri, 20 Jan 2023 16:55:11 -0800 Subject: [PATCH 21/21] Update exampleJob.json fix variable names --- files/exampleJob.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/files/exampleJob.json b/files/exampleJob.json index 0b477eb..55d5691 100644 --- a/files/exampleJob.json +++ b/files/exampleJob.json @@ -1,6 +1,6 @@ { "_comment1": "Paths in this file are relative to the root of S3 buckets", - "_comment2": "pipeline, data_file, and input are relative to INPUT_BUCKET; output to OUTPUT_BUCKET", + "_comment2": "pipeline, data_file, and input are relative to SOURCE_BUCKET; output to DESTINATION_BUCKET", "pipeline": "projects/analysis.cppipe", "data_file": "projects/list_of_images.csv", "input": "projects/input/",