diff --git a/config.py b/config.py index f6af671..38d489f 100644 --- a/config.py +++ b/config.py @@ -9,7 +9,10 @@ AWS_REGION = 'us-east-1' AWS_PROFILE = 'default' # The same profile used by your AWS CLI installation SSH_KEY_NAME = 'your-key-file.pem' # Expected to be in ~/.ssh -AWS_BUCKET = 'your-bucket-name' +AWS_BUCKET = 'your-bucket-name' # Bucket to use for logging +SOURCE_BUCKET = 'bucket-name' # Bucket to download files from +DESTINATION_BUCKET = 'bucket-name' # Bucket to upload files to +UPLOAD_FLAGS = '' # Any flags needed for upload to destination bucket # EC2 AND ECS INFORMATION: ECS_CLUSTER = 'default' @@ -32,7 +35,7 @@ SQS_DEAD_LETTER_QUEUE = 'arn:aws:sqs:some-region:111111100000:DeadMessages' # LOG GROUP INFORMATION: -LOG_GROUP_NAME = APP_NAME +LOG_GROUP_NAME = APP_NAME # REDUNDANCY CHECKS CHECK_IF_DONE_BOOL = 'False' #True or False- should it check if there are a certain number of non-empty files and delete the job if yes? diff --git a/files/exampleJob.json b/files/exampleJob.json index 64c57be..55d5691 100644 --- a/files/exampleJob.json +++ b/files/exampleJob.json @@ -1,7 +1,8 @@ { - "_comment1": "Paths in this file are relative to the root of your S3 bucket", - "pipeline": "projects/analysis.cppipe", - "data_file": "projects/list_of_images.csv", + "_comment1": "Paths in this file are relative to the root of S3 buckets", + "_comment2": "pipeline, data_file, and input are relative to SOURCE_BUCKET; output to DESTINATION_BUCKET", + "pipeline": "projects/analysis.cppipe", + "data_file": "projects/list_of_images.csv", "input": "projects/input/", "output": "projects/output/", "output_structure": "Metadata_Plate-Metadata_Well-Metadata_Site", @@ -11,4 +12,3 @@ {"Metadata": "Metadata_Plate=Plate1,Metadata_Well=A01,Metadata_Site=2"} ] } - diff --git a/run.py b/run.py index 0d77f8d..e956536 100644 --- a/run.py +++ b/run.py @@ -5,12 +5,12 @@ import json import time from base64 import b64encode +import configparser from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from config import * - WAIT_TIME = 60 MONITOR_TIME = 60 @@ -23,7 +23,12 @@ "family": APP_NAME, "containerDefinitions": [ { - "environment": [{"name": "AWS_REGION", "value": AWS_REGION}], + "environment": [ + { + "name": "AWS_REGION", + "value": AWS_REGION + } + ], "name": APP_NAME, "image": DOCKERHUB_TAG, "cpu": CPU_SHARES, @@ -33,13 +38,13 @@ "logConfiguration": { "logDriver": "awslogs", "options": { - "awslogs-group": LOG_GROUP_NAME + "_perInstance", + "awslogs-group": LOG_GROUP_NAME+"_perInstance", "awslogs-region": AWS_REGION, - "awslogs-stream-prefix": APP_NAME, - }, - }, + "awslogs-stream-prefix": APP_NAME + } + } } - ], + ] } SQS_DEFINITION = { @@ -47,10 +52,8 @@ "MaximumMessageSize": "262144", "MessageRetentionPeriod": "1209600", "ReceiveMessageWaitTimeSeconds": "0", - "RedrivePolicy": '{"deadLetterTargetArn":"' - + SQS_DEAD_LETTER_QUEUE - + '","maxReceiveCount":"10"}', - "VisibilityTimeout": str(SQS_MESSAGE_VISIBILITY), + "RedrivePolicy": "{\"deadLetterTargetArn\":\"" + SQS_DEAD_LETTER_QUEUE + "\",\"maxReceiveCount\":\"10\"}", + "VisibilityTimeout": str(SQS_MESSAGE_VISIBILITY) } @@ -58,279 +61,288 @@ # AUXILIARY FUNCTIONS ################################# +def generate_task_definition(AWS_PROFILE): + taskRoleArn = False + task_definition = TASK_DEFINITION.copy() -def get_aws_credentials(AWS_PROFILE): - session = boto3.Session(profile_name=AWS_PROFILE) - credentials = session.get_credentials() - return credentials.access_key, credentials.secret_key + config = 
configparser.ConfigParser() + config.read(f"{os.environ['HOME']}/.aws/config") + if config.has_section(AWS_PROFILE): + profile_name = AWS_PROFILE + elif config.has_section(f'profile {AWS_PROFILE}'): + profile_name = f'profile {AWS_PROFILE}' + else: + print ('Problem handling profile') -def generate_task_definition(AWS_PROFILE): - task_definition = TASK_DEFINITION.copy() - key, secret = get_aws_credentials(AWS_PROFILE) - sqs = boto3.client("sqs") - queue_name = get_queue_url(sqs) - task_definition["containerDefinitions"][0]["environment"] += [ - {"name": "APP_NAME", "value": APP_NAME}, - {"name": "SQS_QUEUE_URL", "value": queue_name}, - {"name": "AWS_ACCESS_KEY_ID", "value": key}, - {"name": "AWS_SECRET_ACCESS_KEY", "value": secret}, - {"name": "AWS_BUCKET", "value": AWS_BUCKET}, - {"name": "DOCKER_CORES", "value": str(DOCKER_CORES)}, - {"name": "LOG_GROUP_NAME", "value": LOG_GROUP_NAME}, - {"name": "CHECK_IF_DONE_BOOL", "value": CHECK_IF_DONE_BOOL}, - {"name": "EXPECTED_NUMBER_FILES", "value": str(EXPECTED_NUMBER_FILES)}, - {"name": "ECS_CLUSTER", "value": ECS_CLUSTER}, - {"name": "SECONDS_TO_START", "value": str(SECONDS_TO_START)}, - {"name": "MIN_FILE_SIZE_BYTES", "value": str(MIN_FILE_SIZE_BYTES)}, - {"name": "USE_PLUGINS", "value": str(USE_PLUGINS)}, - {"name": "NECESSARY_STRING", "value": NECESSARY_STRING}, - {"name": "DOWNLOAD_FILES", "value": DOWNLOAD_FILES}, - ] - if UPDATE_PLUGINS: - task_definition["containerDefinitions"][0]["environment"] += [ - {"name": "UPDATE_PLUGINS", "value": str(UPDATE_PLUGINS)}, - {"name": "PLUGINS_COMMIT", "value": str(PLUGINS_COMMIT)}, - {"name": "INSTALL_REQUIREMENTS", "value": str(INSTALL_REQUIREMENTS)}, - {"name": "REQUIREMENTS_FILE", "value": str(REQUIREMENTS_FILE)}, - ] + if config.has_option(profile_name, 'role_arn'): + print ("Using role for credentials", config[profile_name]['role_arn']) + taskRoleArn = config[profile_name]['role_arn'] else: - task_definition["containerDefinitions"][0]["environment"] += [ - {"name": "UPDATE_PLUGINS", "value": "False"}, - {"name": "INSTALL_REQUIREMENTS", "value": "False"}, - ] - return task_definition + if config.has_option(profile_name, 'source_profile'): + creds = configparser.ConfigParser() + creds.read(f"{os.environ['HOME']}/.aws/credentials") + source_profile = config[profile_name]['source_profile'] + aws_access_key = creds[source_profile]['aws_access_key_id'] + aws_secret_key = creds[source_profile]['aws_secret_access_key'] + elif config.has_option(profile_name, 'aws_access_key_id'): + aws_access_key = config[profile_name]['aws_access_key_id'] + aws_secret_key = config[profile_name]['aws_secret_access_key'] + else: + print ("Problem getting credentials") + task_definition['containerDefinitions'][0]['environment'] += [ + { + "name": "AWS_ACCESS_KEY_ID", + "value": aws_access_key + }, + { + "name": "AWS_SECRET_ACCESS_KEY", + "value": aws_secret_key + }] + sqs = boto3.client('sqs') + queue_name = get_queue_url(sqs) + task_definition['containerDefinitions'][0]['environment'] += [ + { + 'name': 'APP_NAME', + 'value': APP_NAME + }, + { + 'name': 'SQS_QUEUE_URL', + 'value': queue_name + }, + { + "name": "AWS_BUCKET", + "value": AWS_BUCKET + }, + { + "name": "DOCKER_CORES", + "value": str(DOCKER_CORES) + }, + { + "name": "LOG_GROUP_NAME", + "value": LOG_GROUP_NAME + }, + { + "name": "CHECK_IF_DONE_BOOL", + "value": CHECK_IF_DONE_BOOL + }, + { + "name": "EXPECTED_NUMBER_FILES", + "value": str(EXPECTED_NUMBER_FILES) + }, + { + "name": "ECS_CLUSTER", + "value": ECS_CLUSTER + }, + { + "name": "SECONDS_TO_START", + 
"value": str(SECONDS_TO_START) + }, + { + "name": "MIN_FILE_SIZE_BYTES", + "value": str(MIN_FILE_SIZE_BYTES) + }, + { + "name": "USE_PLUGINS", + "value": str(USE_PLUGINS) + }, + { + "name": "NECESSARY_STRING", + "value": NECESSARY_STRING + }, + { + "name": "DOWNLOAD_FILES", + "value": DOWNLOAD_FILES + } + ] + if SOURCE_BUCKET: + task_definition['containerDefinitions'][0]['environment'] += [ + { + 'name': 'SOURCE_BUCKET', + 'value': SOURCE_BUCKET + }, + { + 'name': 'DESTINATION_BUCKET', + 'value': DESTINATION_BUCKET + }] + if UPLOAD_FLAGS: + task_definition['containerDefinitions'][0]['environment'] += [ + { + 'name': 'UPLOAD_FLAGS', + 'value': UPLOAD_FLAGS + }] + return task_definition, taskRoleArn def update_ecs_task_definition(ecs, ECS_TASK_NAME, AWS_PROFILE): - task_definition = generate_task_definition(AWS_PROFILE) - ecs.register_task_definition( - family=ECS_TASK_NAME, - containerDefinitions=task_definition["containerDefinitions"], - ) - print("Task definition registered") - + task_definition, taskRoleArn = generate_task_definition(AWS_PROFILE) + if not taskRoleArn: + ecs.register_task_definition(family=ECS_TASK_NAME,containerDefinitions=task_definition['containerDefinitions']) + elif taskRoleArn: + ecs.register_task_definition(family=ECS_TASK_NAME,containerDefinitions=task_definition['containerDefinitions'],taskRoleArn=taskRoleArn) + else: + print('Mistake in handling role for Task Definition.') + print('Task definition registered') def get_or_create_cluster(ecs): data = ecs.list_clusters() - cluster = [clu for clu in data["clusterArns"] if clu.endswith(ECS_CLUSTER)] + cluster = [clu for clu in data['clusterArns'] if clu.endswith(ECS_CLUSTER)] if len(cluster) == 0: ecs.create_cluster(clusterName=ECS_CLUSTER) time.sleep(WAIT_TIME) - print("Cluster " + ECS_CLUSTER + " created") + print('Cluster '+ECS_CLUSTER+' created') else: - print("Cluster " + ECS_CLUSTER + " exists") - + print('Cluster '+ECS_CLUSTER+' exists') def create_or_update_ecs_service(ecs, ECS_SERVICE_NAME, ECS_TASK_NAME): # Create the service with no workers (0 desired count) data = ecs.list_services(cluster=ECS_CLUSTER) - service = [srv for srv in data["serviceArns"] if srv.endswith(ECS_SERVICE_NAME)] + service = [srv for srv in data['serviceArns'] if srv.endswith(ECS_SERVICE_NAME)] if len(service) > 0: - print("Service exists. Removing") + print('Service exists. 
Removing') ecs.delete_service(cluster=ECS_CLUSTER, service=ECS_SERVICE_NAME) - print("Removed service " + ECS_SERVICE_NAME) + print('Removed service '+ECS_SERVICE_NAME) time.sleep(WAIT_TIME) - print("Creating new service") - ecs.create_service( - cluster=ECS_CLUSTER, - serviceName=ECS_SERVICE_NAME, - taskDefinition=ECS_TASK_NAME, - desiredCount=0, - ) - print("Service created") - + print('Creating new service') + ecs.create_service(cluster=ECS_CLUSTER, serviceName=ECS_SERVICE_NAME, taskDefinition=ECS_TASK_NAME, desiredCount=0) + print('Service created') def get_queue_url(sqs): result = sqs.list_queues() - if "QueueUrls" in result.keys(): - for u in result["QueueUrls"]: - if u.split("/")[-1] == SQS_QUEUE_NAME: + if 'QueueUrls' in result.keys(): + for u in result['QueueUrls']: + if u.split('/')[-1] == SQS_QUEUE_NAME: return u return None - def get_or_create_queue(sqs): u = get_queue_url(sqs) if u is None: - print("Creating queue") + print('Creating queue') sqs.create_queue(QueueName=SQS_QUEUE_NAME, Attributes=SQS_DEFINITION) time.sleep(WAIT_TIME) else: - print("Queue exists") - + print('Queue exists') def loadConfig(configFile): data = None - with open(configFile, "r") as conf: + with open(configFile, 'r') as conf: data = json.load(conf) return data +def killdeadAlarms(fleetId,monitorapp,ec2,cloud): + todel=[] + changes = ec2.describe_spot_fleet_request_history(SpotFleetRequestId=fleetId,StartTime=(datetime.datetime.now()-datetime.timedelta(hours=2)).replace(microsecond=0)) + for eachevent in changes['HistoryRecords']: + if eachevent['EventType']=='instanceChange': + if eachevent['EventInformation']['EventSubType']=='terminated': + todel.append(eachevent['EventInformation']['InstanceId']) -def killdeadAlarms(fleetId, monitorapp, ec2, cloud): - todel = [] - changes = ec2.describe_spot_fleet_request_history( - SpotFleetRequestId=fleetId, - StartTime=(datetime.datetime.now() - datetime.timedelta(hours=2)).replace( - microsecond=0 - ), - ) - for eachevent in changes["HistoryRecords"]: - if eachevent["EventType"] == "instanceChange": - if eachevent["EventInformation"]["EventSubType"] == "terminated": - todel.append(eachevent["EventInformation"]["InstanceId"]) - - existing_alarms = [ - x["AlarmName"] - for x in cloud.describe_alarms(AlarmNamePrefix=monitorapp)["MetricAlarms"] - ] + existing_alarms = [x['AlarmName'] for x in cloud.describe_alarms(AlarmNamePrefix=monitorapp)['MetricAlarms']] for eachmachine in todel: - monitorname = monitorapp + "_" + eachmachine + monitorname = monitorapp+'_'+eachmachine if monitorname in existing_alarms: cloud.delete_alarms(AlarmNames=[monitorname]) - print("Deleted", monitorname, "if it existed") + print('Deleted', monitorname, 'if it existed') time.sleep(3) - print("Old alarms deleted") + print('Old alarms deleted') - -def generateECSconfig(ECS_CLUSTER, APP_NAME, AWS_BUCKET, s3client): - configfile = open("configtemp.config", "w") - configfile.write("ECS_CLUSTER=" + ECS_CLUSTER + "\n") +def generateECSconfig(ECS_CLUSTER,APP_NAME,AWS_BUCKET,s3client): + configfile=open('configtemp.config','w') + configfile.write('ECS_CLUSTER='+ECS_CLUSTER+'\n') configfile.write('ECS_AVAILABLE_LOGGING_DRIVERS=["json-file","awslogs"]') configfile.close() - s3client.upload_file( - "configtemp.config", AWS_BUCKET, "ecsconfigs/" + APP_NAME + "_ecs.config" - ) - os.remove("configtemp.config") - return "s3://" + AWS_BUCKET + "/ecsconfigs/" + APP_NAME + "_ecs.config" - - -def generateUserData(ecsConfigFile, dockerBaseSize): - config_str = "#!/bin/bash \n" - config_str += "sudo yum 
install -y aws-cli \n" - config_str += "sudo yum install -y awslogs \n" - config_str += "aws s3 cp " + ecsConfigFile + " /etc/ecs/ecs.config" - - boothook_str = "#!/bin/bash \n" - boothook_str += ( - "echo 'OPTIONS=" - + '"${OPTIONS} --storage-opt dm.basesize=' - + str(dockerBaseSize) - + 'G"' - + "' >> /etc/sysconfig/docker" - ) - - config = MIMEText(config_str, _subtype="x-shellscript") - config.add_header("Content-Disposition", "attachment", filename="config_temp.txt") - - boothook = MIMEText(boothook_str, _subtype="cloud-boothook") - boothook.add_header( - "Content-Disposition", "attachment", filename="boothook_temp.txt" - ) + s3client.upload_file('configtemp.config',AWS_BUCKET,'ecsconfigs/'+APP_NAME+'_ecs.config') + os.remove('configtemp.config') + return 's3://'+AWS_BUCKET+'/ecsconfigs/'+APP_NAME+'_ecs.config' + +def generateUserData(ecsConfigFile,dockerBaseSize): + config_str = '#!/bin/bash \n' + config_str += 'sudo yum install -y aws-cli \n' + config_str += 'sudo yum install -y awslogs \n' + config_str += 'aws s3 cp '+ecsConfigFile+' /etc/ecs/ecs.config' + + boothook_str = '#!/bin/bash \n' + boothook_str += "echo 'OPTIONS="+'"${OPTIONS} --storage-opt dm.basesize='+str(dockerBaseSize)+'G"'+"' >> /etc/sysconfig/docker" + + config = MIMEText(config_str, _subtype='x-shellscript') + config.add_header('Content-Disposition', 'attachment',filename='config_temp.txt') + + boothook = MIMEText(boothook_str, _subtype='cloud-boothook') + boothook.add_header('Content-Disposition', 'attachment',filename='boothook_temp.txt') pre_user_data = MIMEMultipart() pre_user_data.attach(boothook) pre_user_data.attach(config) - try: # Python2 + try: #Python2 return b64encode(pre_user_data.as_string()) - except TypeError: # Python3 + except TypeError: #Python3 pre_user_data_string = pre_user_data.as_string() - return b64encode(pre_user_data_string.encode("utf-8")).decode("utf-8") - + return b64encode(pre_user_data_string.encode('utf-8')).decode('utf-8') def removequeue(queueName): - sqs = boto3.client("sqs") - queueoutput = sqs.list_queues(QueueNamePrefix=queueName) - if len(queueoutput["QueueUrls"]) == 1: - queueUrl = queueoutput["QueueUrls"][0] - else: # In case we have "AnalysisQueue" and "AnalysisQueue1" and only want to delete the first of those + sqs = boto3.client('sqs') + queueoutput= sqs.list_queues(QueueNamePrefix=queueName) + if len(queueoutput["QueueUrls"])==1: + queueUrl=queueoutput["QueueUrls"][0] + else: #In case we have "AnalysisQueue" and "AnalysisQueue1" and only want to delete the first of those for eachUrl in queueoutput["QueueUrls"]: - if eachUrl.split("/")[-1] == queueName: - queueUrl = eachUrl + if eachUrl.split('/')[-1] == queueName: + queueUrl=eachUrl sqs.delete_queue(QueueUrl=queueUrl) - def deregistertask(taskName, ecs): - taskArns = ecs.list_task_definitions(familyPrefix=taskName, status="ACTIVE") - for eachtask in taskArns["taskDefinitionArns"]: - fulltaskname = eachtask.split("/")[-1] + taskArns = ecs.list_task_definitions(familyPrefix=taskName, status='ACTIVE') + for eachtask in taskArns['taskDefinitionArns']: + fulltaskname=eachtask.split('/')[-1] ecs.deregister_task_definition(taskDefinition=fulltaskname) - def removeClusterIfUnused(clusterName, ecs): - if clusterName != "default": - # never delete the default cluster + if clusterName != 'default': + #never delete the default cluster result = ecs.describe_clusters(clusters=[clusterName]) - if ( - sum( - [ - result["clusters"][0]["pendingTasksCount"], - result["clusters"][0]["runningTasksCount"], - 
result["clusters"][0]["activeServicesCount"], - result["clusters"][0]["registeredContainerInstancesCount"], - ] - ) - == 0 - ): + if sum([result['clusters'][0]['pendingTasksCount'],result['clusters'][0]['runningTasksCount'],result['clusters'][0]['activeServicesCount'],result['clusters'][0]['registeredContainerInstancesCount']])==0: ecs.delete_cluster(cluster=clusterName) - def downscaleSpotFleet(queue, spotFleetID, ec2, manual=False): visible, nonvisible = queue.returnLoad() if manual: - ec2.modify_spot_fleet_request( - ExcessCapacityTerminationPolicy="noTermination", - SpotFleetRequestId=spotFleetID, - TargetCapacity=int(manual), - ) + ec2.modify_spot_fleet_request(ExcessCapacityTerminationPolicy='noTermination', SpotFleetRequestId=spotFleetID, TargetCapacity = int(manual)) return elif visible > 0: return else: status = ec2.describe_spot_fleet_instances(SpotFleetRequestId=spotFleetID) - if nonvisible < len(status["ActiveInstances"]): - ec2.modify_spot_fleet_request( - ExcessCapacityTerminationPolicy="noTermination", - SpotFleetRequestId=spotFleetID, - TargetCapacity=nonvisible, - ) - + if nonvisible < len(status['ActiveInstances']): + ec2.modify_spot_fleet_request(ExcessCapacityTerminationPolicy='noTermination', SpotFleetRequestId=spotFleetID, TargetCapacity = nonvisible) def export_logs(logs, loggroupId, starttime, bucketId): - result = logs.create_export_task( - taskName=loggroupId, - logGroupName=loggroupId, - fromTime=int(starttime), - to=int(time.time() * 1000), - destination=bucketId, - destinationPrefix="exportedlogs/" + loggroupId, - ) + result = logs.create_export_task(taskName = loggroupId, logGroupName = loggroupId, fromTime = int(starttime), to = int(time.time()*1000), destination = bucketId, destinationPrefix = 'exportedlogs/'+loggroupId) - logExportId = result["taskId"] + logExportId = result['taskId'] while True: - result = logs.describe_export_tasks(taskId=logExportId) - if result["exportTasks"][0]["status"]["code"] != "PENDING": - if result["exportTasks"][0]["status"]["code"] != "RUNNING": - print(result["exportTasks"][0]["status"]["code"]) + result = logs.describe_export_tasks(taskId = logExportId) + if result['exportTasks'][0]['status']['code']!='PENDING': + if result['exportTasks'][0]['status']['code']!='RUNNING': + print(result['exportTasks'][0]['status']['code']) break time.sleep(30) - ################################# # CLASS TO HANDLE SQS QUEUE ################################# +class JobQueue(): -class JobQueue: - def __init__(self, name=None): - self.sqs = boto3.resource("sqs") - if name == None: + def __init__(self,name=None): + self.sqs = boto3.resource('sqs') + if name==None: self.queue = self.sqs.get_queue_by_name(QueueName=SQS_QUEUE_NAME) else: self.queue = self.sqs.get_queue_by_name(QueueName=name) @@ -340,17 +352,17 @@ def __init__(self, name=None): def scheduleBatch(self, data): msg = json.dumps(data) response = self.queue.send_message(MessageBody=msg) - print("Batch sent. Message ID:", response.get("MessageId")) + print('Batch sent. 
Message ID:',response.get('MessageId')) def pendingLoad(self): self.queue.load() - visible = int(self.queue.attributes["ApproximateNumberOfMessages"]) - nonVis = int(self.queue.attributes["ApproximateNumberOfMessagesNotVisible"]) - if [visible, nonVis] != [self.pending, self.inProcess]: + visible = int( self.queue.attributes['ApproximateNumberOfMessages'] ) + nonVis = int( self.queue.attributes['ApproximateNumberOfMessagesNotVisible'] ) + if [visible, nonVis] != [self.pending,self.inProcess]: self.pending = visible self.inProcess = nonVis d = datetime.datetime.now() - print(d, "In process:", nonVis, "Pending", visible) + print(d,'In process:',nonVis,'Pending',visible) if visible + nonVis > 0: return True else: @@ -358,8 +370,8 @@ def pendingLoad(self): def returnLoad(self): self.queue.load() - visible = int(self.queue.attributes["ApproximateNumberOfMessages"]) - nonVis = int(self.queue.attributes["ApproximateNumberOfMessagesNotVisible"]) + visible = int( self.queue.attributes['ApproximateNumberOfMessages'] ) + nonVis = int( self.queue.attributes['ApproximateNumberOfMessagesNotVisible'] ) return visible, nonVis @@ -367,210 +379,165 @@ def returnLoad(self): # SERVICE 1: SETUP (formerly fab) ################################# - def setup(): - ECS_TASK_NAME = APP_NAME + "Task" - ECS_SERVICE_NAME = APP_NAME + "Service" - USER = os.environ["HOME"].split("/")[-1] - AWS_CONFIG_FILE_NAME = os.environ["HOME"] + "/.aws/config" - AWS_CREDENTIAL_FILE_NAME = os.environ["HOME"] + "/.aws/credentials" - sqs = boto3.client("sqs") + ECS_TASK_NAME = APP_NAME + 'Task' + ECS_SERVICE_NAME = APP_NAME + 'Service' + USER = os.environ['HOME'].split('/')[-1] + AWS_CONFIG_FILE_NAME = os.environ['HOME'] + '/.aws/config' + AWS_CREDENTIAL_FILE_NAME = os.environ['HOME'] + '/.aws/credentials' + sqs = boto3.client('sqs') get_or_create_queue(sqs) - ecs = boto3.client("ecs") + ecs = boto3.client('ecs') get_or_create_cluster(ecs) update_ecs_task_definition(ecs, ECS_TASK_NAME, AWS_PROFILE) create_or_update_ecs_service(ecs, ECS_SERVICE_NAME, ECS_TASK_NAME) - ################################# # SERVICE 2: SUBMIT JOB ################################# - def submitJob(): if len(sys.argv) < 3: - print("Use: run.py submitJob jobfile") + print('Use: run.py submitJob jobfile') sys.exit() # Step 1: Read the job configuration file jobInfo = loadConfig(sys.argv[2]) - if "output_structure" not in jobInfo.keys(): # backwards compatibility for 1.0.0 - jobInfo["output_structure"] = "" - templateMessage = { - "Metadata": "", - "pipeline": jobInfo["pipeline"], - "output": jobInfo["output"], - "input": jobInfo["input"], - "data_file": jobInfo["data_file"], - "output_structure": jobInfo["output_structure"], - } + if 'output_structure' not in jobInfo.keys(): #backwards compatibility for 1.0.0 + jobInfo["output_structure"]='' + templateMessage = {'Metadata': '', + 'pipeline': jobInfo["pipeline"], + 'output': jobInfo["output"], + 'input': jobInfo["input"], + 'data_file': jobInfo["data_file"], + 'output_structure':jobInfo["output_structure"] + } # Step 2: Reach the queue and schedule tasks - print("Contacting queue") + print('Contacting queue') queue = JobQueue() - print("Scheduling tasks") + print('Scheduling tasks') for batch in jobInfo["groups"]: - # support Metadata passed as either a single string or as a list - try: # single string ('canonical' DCP) + #support Metadata passed as either a single string or as a list + try: #single string ('canonical' DCP) templateMessage["Metadata"] = batch["Metadata"] - except KeyError: # list of parameters 
(cellprofiler --print-groups) + except KeyError: #list of parameters (cellprofiler --print-groups) templateMessage["Metadata"] = batch queue.scheduleBatch(templateMessage) - print("Job submitted. Check your queue") - + print('Job submitted. Check your queue') ################################# # SERVICE 3: START CLUSTER ################################# - def startCluster(): if len(sys.argv) < 3: - print("Use: run.py startCluster configFile") + print('Use: run.py startCluster configFile') sys.exit() thistime = datetime.datetime.now().replace(microsecond=0) - # Step 1: set up the configuration files - s3client = boto3.client("s3") - ecsConfigFile = generateECSconfig(ECS_CLUSTER, APP_NAME, AWS_BUCKET, s3client) - spotfleetConfig = loadConfig(sys.argv[2]) - spotfleetConfig["ValidFrom"] = thistime - spotfleetConfig["ValidUntil"] = (thistime + datetime.timedelta(days=365)).replace( - microsecond=0 - ) - spotfleetConfig["TargetCapacity"] = CLUSTER_MACHINES - spotfleetConfig["SpotPrice"] = "%.2f" % MACHINE_PRICE - DOCKER_BASE_SIZE = int(round(float(EBS_VOL_SIZE) / int(TASKS_PER_MACHINE))) - 2 - userData = generateUserData(ecsConfigFile, DOCKER_BASE_SIZE) - for LaunchSpecification in range(0, len(spotfleetConfig["LaunchSpecifications"])): - spotfleetConfig["LaunchSpecifications"][LaunchSpecification][ - "UserData" - ] = userData - spotfleetConfig["LaunchSpecifications"][LaunchSpecification][ - "BlockDeviceMappings" - ][1]["Ebs"]["VolumeSize"] = EBS_VOL_SIZE - spotfleetConfig["LaunchSpecifications"][LaunchSpecification][ - "InstanceType" - ] = MACHINE_TYPE[LaunchSpecification] + #Step 1: set up the configuration files + s3client = boto3.client('s3') + ecsConfigFile=generateECSconfig(ECS_CLUSTER,APP_NAME,AWS_BUCKET,s3client) + spotfleetConfig=loadConfig(sys.argv[2]) + spotfleetConfig['ValidFrom']=thistime + spotfleetConfig['ValidUntil']=(thistime+datetime.timedelta(days=365)).replace(microsecond=0) + spotfleetConfig['TargetCapacity']= CLUSTER_MACHINES + spotfleetConfig['SpotPrice'] = '%.2f' %MACHINE_PRICE + DOCKER_BASE_SIZE = int(round(float(EBS_VOL_SIZE)/int(TASKS_PER_MACHINE))) - 2 + userData=generateUserData(ecsConfigFile,DOCKER_BASE_SIZE) + for LaunchSpecification in range(0,len(spotfleetConfig['LaunchSpecifications'])): + spotfleetConfig['LaunchSpecifications'][LaunchSpecification]["UserData"]=userData + spotfleetConfig['LaunchSpecifications'][LaunchSpecification]['BlockDeviceMappings'][1]['Ebs']["VolumeSize"]= EBS_VOL_SIZE + spotfleetConfig['LaunchSpecifications'][LaunchSpecification]['InstanceType'] = MACHINE_TYPE[LaunchSpecification] + # Step 2: make the spot fleet request - ec2client = boto3.client("ec2") + ec2client=boto3.client('ec2') requestInfo = ec2client.request_spot_fleet(SpotFleetRequestConfig=spotfleetConfig) - print("Request in process. Wait until your machines are available in the cluster.") - print("SpotFleetRequestId", requestInfo["SpotFleetRequestId"]) + print('Request in process. 
Wait until your machines are available in the cluster.') + print('SpotFleetRequestId',requestInfo['SpotFleetRequestId']) # Step 3: Make the monitor - starttime = str(int(time.time() * 1000)) - createMonitor = open("files/" + APP_NAME + "SpotFleetRequestId.json", "w") - createMonitor.write( - '{"MONITOR_FLEET_ID" : "' + requestInfo["SpotFleetRequestId"] + '",\n' - ) - createMonitor.write('"MONITOR_APP_NAME" : "' + APP_NAME + '",\n') - createMonitor.write('"MONITOR_ECS_CLUSTER" : "' + ECS_CLUSTER + '",\n') - createMonitor.write('"MONITOR_QUEUE_NAME" : "' + SQS_QUEUE_NAME + '",\n') - createMonitor.write('"MONITOR_BUCKET_NAME" : "' + AWS_BUCKET + '",\n') - createMonitor.write('"MONITOR_LOG_GROUP_NAME" : "' + LOG_GROUP_NAME + '",\n') - createMonitor.write('"MONITOR_START_TIME" : "' + starttime + '"}\n') + starttime=str(int(time.time()*1000)) + createMonitor=open('files/' + APP_NAME + 'SpotFleetRequestId.json','w') + createMonitor.write('{"MONITOR_FLEET_ID" : "'+requestInfo['SpotFleetRequestId']+'",\n') + createMonitor.write('"MONITOR_APP_NAME" : "'+APP_NAME+'",\n') + createMonitor.write('"MONITOR_ECS_CLUSTER" : "'+ECS_CLUSTER+'",\n') + createMonitor.write('"MONITOR_QUEUE_NAME" : "'+SQS_QUEUE_NAME+'",\n') + createMonitor.write('"MONITOR_BUCKET_NAME" : "'+AWS_BUCKET+'",\n') + createMonitor.write('"MONITOR_LOG_GROUP_NAME" : "'+LOG_GROUP_NAME+'",\n') + createMonitor.write('"MONITOR_START_TIME" : "'+ starttime+'"}\n') createMonitor.close() # Step 4: Create a log group for this app and date if one does not already exist - logclient = boto3.client("logs") - loggroupinfo = logclient.describe_log_groups(logGroupNamePrefix=LOG_GROUP_NAME) - groupnames = [d["logGroupName"] for d in loggroupinfo["logGroups"]] + logclient=boto3.client('logs') + loggroupinfo=logclient.describe_log_groups(logGroupNamePrefix=LOG_GROUP_NAME) + groupnames=[d['logGroupName'] for d in loggroupinfo['logGroups']] if LOG_GROUP_NAME not in groupnames: logclient.create_log_group(logGroupName=LOG_GROUP_NAME) logclient.put_retention_policy(logGroupName=LOG_GROUP_NAME, retentionInDays=60) - if LOG_GROUP_NAME + "_perInstance" not in groupnames: - logclient.create_log_group(logGroupName=LOG_GROUP_NAME + "_perInstance") - logclient.put_retention_policy( - logGroupName=LOG_GROUP_NAME + "_perInstance", retentionInDays=60 - ) + if LOG_GROUP_NAME+'_perInstance' not in groupnames: + logclient.create_log_group(logGroupName=LOG_GROUP_NAME+'_perInstance') + logclient.put_retention_policy(logGroupName=LOG_GROUP_NAME+'_perInstance', retentionInDays=60) # Step 5: update the ECS service to be ready to inject docker containers in EC2 instances - print("Updating service") - ecs = boto3.client("ecs") - ecs.update_service( - cluster=ECS_CLUSTER, - service=APP_NAME + "Service", - desiredCount=CLUSTER_MACHINES * TASKS_PER_MACHINE, - ) - print("Service updated.") + print('Updating service') + ecs = boto3.client('ecs') + ecs.update_service(cluster=ECS_CLUSTER, service=APP_NAME+'Service', desiredCount=CLUSTER_MACHINES*TASKS_PER_MACHINE) + print('Service updated.') # Step 6: Monitor the creation of the instances until all are present - status = ec2client.describe_spot_fleet_instances( - SpotFleetRequestId=requestInfo["SpotFleetRequestId"] - ) - # time.sleep(15) # This is now too fast, so sometimes the spot fleet request history throws an error! 
- while len(status["ActiveInstances"]) < CLUSTER_MACHINES: + status = ec2client.describe_spot_fleet_instances(SpotFleetRequestId=requestInfo['SpotFleetRequestId']) + #time.sleep(15) # This is now too fast, so sometimes the spot fleet request history throws an error! + while len(status['ActiveInstances']) < CLUSTER_MACHINES: # First check to make sure there's not a problem - errorcheck = ec2client.describe_spot_fleet_request_history( - SpotFleetRequestId=requestInfo["SpotFleetRequestId"], - EventType="error", - StartTime=thistime - datetime.timedelta(minutes=1), - ) - if len(errorcheck["HistoryRecords"]) != 0: - print( - "Your spot fleet request is causing an error and is now being cancelled. Please check your configuration and try again" - ) - for eacherror in errorcheck["HistoryRecords"]: - print( - eacherror["EventInformation"]["EventSubType"] - + " : " - + eacherror["EventInformation"]["EventDescription"] - ) - # If there's only one error, and it's the type we see for insufficient capacity (but also other types) - # AND if there are some machines on, indicating that other than capacity the spec is otherwise good, don't cancel - if len(errorcheck["HistoryRecords"]) == 1: - if ( - errorcheck["HistoryRecords"][0]["EventInformation"]["EventSubType"] - == "allLaunchSpecsTemporarilyBlacklisted" - ): - if len(status["ActiveInstances"]) >= 1: - print( - "I think, but am not sure, that this is an insufficient capacity error. You should check the console for more information." - ) + errorcheck = ec2client.describe_spot_fleet_request_history(SpotFleetRequestId=requestInfo['SpotFleetRequestId'], EventType='error', StartTime=thistime - datetime.timedelta(minutes=1)) + if len(errorcheck['HistoryRecords']) != 0: + print('Your spot fleet request is causing an error and is now being cancelled. Please check your configuration and try again') + for eacherror in errorcheck['HistoryRecords']: + print(eacherror['EventInformation']['EventSubType'] + ' : ' + eacherror['EventInformation']['EventDescription']) + #If there's only one error, and it's the type we see for insufficient capacity (but also other types) + #AND if there are some machines on, indicating that other than capacity the spec is otherwise good, don't cancel + if len(errorcheck['HistoryRecords']) == 1: + if errorcheck['HistoryRecords'][0]['EventInformation']['EventSubType'] == 'allLaunchSpecsTemporarilyBlacklisted': + if len(status['ActiveInstances']) >= 1: + print("I think, but am not sure, that this is an insufficient capacity error. You should check the console for more information.") return - ec2client.cancel_spot_fleet_requests( - SpotFleetRequestIds=[requestInfo["SpotFleetRequestId"]], - TerminateInstances=True, - ) + ec2client.cancel_spot_fleet_requests(SpotFleetRequestIds=[requestInfo['SpotFleetRequestId']], TerminateInstances=True) return # If everything seems good, just bide your time until you're ready to go - print(".") + print('.') time.sleep(20) - status = ec2client.describe_spot_fleet_instances( - SpotFleetRequestId=requestInfo["SpotFleetRequestId"] - ) - - print("Spot fleet successfully created. Your job should start in a few minutes.") + status = ec2client.describe_spot_fleet_instances(SpotFleetRequestId=requestInfo['SpotFleetRequestId']) + print('Spot fleet successfully created. 
Your job should start in a few minutes.') ################################# # SERVICE 4: MONITOR JOB ################################# - def monitor(cheapest=False): if len(sys.argv) < 3: - print("Use: run.py monitor spotFleetIdFile") + print('Use: run.py monitor spotFleetIdFile') sys.exit() - if ".json" not in sys.argv[2]: - print("Use: run.py monitor spotFleetIdFile") + if '.json' not in sys.argv[2]: + print('Use: run.py monitor spotFleetIdFile') sys.exit() if len(sys.argv) == 4: cheapest = sys.argv[3] monitorInfo = loadConfig(sys.argv[2]) - monitorcluster = monitorInfo["MONITOR_ECS_CLUSTER"] - monitorapp = monitorInfo["MONITOR_APP_NAME"] - fleetId = monitorInfo["MONITOR_FLEET_ID"] - queueId = monitorInfo["MONITOR_QUEUE_NAME"] + monitorcluster=monitorInfo["MONITOR_ECS_CLUSTER"] + monitorapp=monitorInfo["MONITOR_APP_NAME"] + fleetId=monitorInfo["MONITOR_FLEET_ID"] + queueId=monitorInfo["MONITOR_QUEUE_NAME"] - ec2 = boto3.client("ec2") - cloud = boto3.client("cloudwatch") + ec2 = boto3.client('ec2') + cloud = boto3.client('cloudwatch') # Optional Step 0 - decide if you're going to be cheap rather than fast. This means that you'll get 15 minutes # from the start of the monitor to get as many machines as you get, and then it will set the requested number to 1. @@ -589,98 +556,93 @@ def monitor(cheapest=False): # Step 1: Create job and count messages periodically queue = JobQueue(name=queueId) while queue.pendingLoad(): - # Once an hour (except at midnight) check for terminated machines and delete their alarms. - # This is slooooooow, which is why we don't just do it at the end - curtime = datetime.datetime.now().strftime("%H%M") - if curtime[-2:] == "00": - if curtime[:2] != "00": - killdeadAlarms(fleetId, monitorapp, ec2, cloud) - # Once every 10 minutes, check if all jobs are in process, and if so scale the spot fleet size to match - # the number of jobs still in process WITHOUT force terminating them. - # This can help keep costs down if, for example, you start up 100+ machines to run a large job, and - # 1-10 jobs with errors are keeping it rattling around for hours. - if curtime[-1:] == "9": + #Once an hour (except at midnight) check for terminated machines and delete their alarms. + #This is slooooooow, which is why we don't just do it at the end + curtime=datetime.datetime.now().strftime('%H%M') + if curtime[-2:]=='00': + if curtime[:2]!='00': + killdeadAlarms(fleetId,monitorapp,ec2,cloud) + #Once every 10 minutes, check if all jobs are in process, and if so scale the spot fleet size to match + #the number of jobs still in process WITHOUT force terminating them. + #This can help keep costs down if, for example, you start up 100+ machines to run a large job, and + #1-10 jobs with errors are keeping it rattling around for hours. 
+ if curtime[-1:]=='9': downscaleSpotFleet(queue, fleetId, ec2) time.sleep(MONITOR_TIME) # Step 2: When no messages are pending, stop service # Reload the monitor info, because for long jobs new fleets may have been started, etc monitorInfo = loadConfig(sys.argv[2]) - monitorcluster = monitorInfo["MONITOR_ECS_CLUSTER"] - monitorapp = monitorInfo["MONITOR_APP_NAME"] - fleetId = monitorInfo["MONITOR_FLEET_ID"] - queueId = monitorInfo["MONITOR_QUEUE_NAME"] - bucketId = monitorInfo["MONITOR_BUCKET_NAME"] - loggroupId = monitorInfo["MONITOR_LOG_GROUP_NAME"] - starttime = monitorInfo["MONITOR_START_TIME"] - - ecs = boto3.client("ecs") - ecs.update_service( - cluster=monitorcluster, service=monitorapp + "Service", desiredCount=0 - ) - print("Service has been downscaled") + monitorcluster=monitorInfo["MONITOR_ECS_CLUSTER"] + monitorapp=monitorInfo["MONITOR_APP_NAME"] + fleetId=monitorInfo["MONITOR_FLEET_ID"] + queueId=monitorInfo["MONITOR_QUEUE_NAME"] + bucketId=monitorInfo["MONITOR_BUCKET_NAME"] + loggroupId=monitorInfo["MONITOR_LOG_GROUP_NAME"] + starttime=monitorInfo["MONITOR_START_TIME"] + + ecs = boto3.client('ecs') + ecs.update_service(cluster=monitorcluster, service=monitorapp+'Service', desiredCount=0) + print('Service has been downscaled') # Step3: Delete the alarms from active machines and machines that have died since the last sweep # This is in a try loop, because while it is important, we don't want to not stop the spot fleet try: result = ec2.describe_spot_fleet_instances(SpotFleetRequestId=fleetId) - instancelist = result["ActiveInstances"] + instancelist = result['ActiveInstances'] while len(instancelist) > 0: to_del = instancelist[:100] - del_alarms = [monitorapp + "_" + x["InstanceId"] for x in to_del] + del_alarms = [monitorapp+'_'+x['InstanceId'] for x in to_del] cloud.delete_alarms(AlarmNames=del_alarms) time.sleep(10) instancelist = instancelist[100:] - killdeadAlarms(fleetId, monitorapp) + killdeadAlarms(fleetId,monitorapp) except: pass # Step 4: Read spot fleet id and terminate all EC2 instances - print("Shutting down spot fleet", fleetId) - ec2.cancel_spot_fleet_requests( - SpotFleetRequestIds=[fleetId], TerminateInstances=True - ) - print("Job done.") + print('Shutting down spot fleet',fleetId) + ec2.cancel_spot_fleet_requests(SpotFleetRequestIds=[fleetId], TerminateInstances=True) + print('Job done.') # Step 5. 
Release other resources # Remove SQS queue, ECS Task Definition, ECS Service - ECS_TASK_NAME = monitorapp + "Task" - ECS_SERVICE_NAME = monitorapp + "Service" - print("Deleting existing queue.") + ECS_TASK_NAME = monitorapp + 'Task' + ECS_SERVICE_NAME = monitorapp + 'Service' + print('Deleting existing queue.') removequeue(queueId) - print("Deleting service") - ecs.delete_service(cluster=monitorcluster, service=ECS_SERVICE_NAME) - print("De-registering task") - deregistertask(ECS_TASK_NAME, ecs) + print('Deleting service') + ecs.delete_service(cluster=monitorcluster, service = ECS_SERVICE_NAME) + print('De-registering task') + deregistertask(ECS_TASK_NAME,ecs) print("Removing cluster if it's not the default and not otherwise in use") removeClusterIfUnused(monitorcluster, ecs) - # Step 6: Export the logs to S3 - logs = boto3.client("logs") + #Step 6: Export the logs to S3 + logs=boto3.client('logs') - print("Transfer of CellProfiler logs to S3 initiated") + print('Transfer of CellProfiler logs to S3 initiated') export_logs(logs, loggroupId, starttime, bucketId) - print("Transfer of per-instance to S3 initiated") - export_logs(logs, loggroupId + "_perInstance", starttime, bucketId) - - print("All export tasks done") + print('Transfer of per-instance to S3 initiated') + export_logs(logs, loggroupId+'_perInstance', starttime, bucketId) + print('All export tasks done') ################################# # MAIN USER INTERACTION ################################# -if __name__ == "__main__": +if __name__ == '__main__': if len(sys.argv) < 2: - print("Use: run.py setup | submitJob | startCluster | monitor") + print('Use: run.py setup | submitJob | startCluster | monitor') sys.exit() - if sys.argv[1] == "setup": + if sys.argv[1] == 'setup': setup() - elif sys.argv[1] == "submitJob": + elif sys.argv[1] == 'submitJob': submitJob() - elif sys.argv[1] == "startCluster": + elif sys.argv[1] == 'startCluster': startCluster() - elif sys.argv[1] == "monitor": - monitor() + elif sys.argv[1] == 'monitor': + monitor() \ No newline at end of file diff --git a/worker/Dockerfile b/worker/Dockerfile index 3644ee0..9870137 100644 --- a/worker/Dockerfile +++ b/worker/Dockerfile @@ -8,7 +8,7 @@ FROM cellprofiler/cellprofiler:4.2.4 -# Install S3FS +# Install S3FS RUN apt-get -y update && \ apt-get -y upgrade && \ @@ -17,6 +17,7 @@ RUN apt-get -y update && \ autotools-dev \ g++ \ git \ + jq \ libcurl4-gnutls-dev \ libfuse-dev \ libssl-dev \ @@ -25,6 +26,8 @@ RUN apt-get -y update && \ sysstat \ curl +RUN apt-get install --only-upgrade bash + WORKDIR /usr/local/src RUN git clone https://github.com/s3fs-fuse/s3fs-fuse.git WORKDIR /usr/local/src/s3fs-fuse @@ -35,7 +38,10 @@ RUN make install # Install AWS CLI -RUN python3.8 -m pip install awscli +RUN apt install unzip +RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" +RUN unzip awscliv2.zip +RUN ./aws/install # Install boto3 @@ -65,4 +71,3 @@ WORKDIR /home/ubuntu/CellProfiler-plugins WORKDIR /home/ubuntu ENTRYPOINT ["./run-worker.sh"] CMD [""] - diff --git a/worker/cp-worker.py b/worker/cp-worker.py index fc6d6db..71d7d1c 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -6,7 +6,7 @@ import os import re import subprocess -import sys +import sys import time import watchtower import string @@ -20,6 +20,18 @@ PLUGIN_DIR = '/home/ubuntu/CellProfiler-plugins' QUEUE_URL = os.environ['SQS_QUEUE_URL'] AWS_BUCKET = os.environ['AWS_BUCKET'] +if 'SOURCE_BUCKET' not in os.environ: + SOURCE_BUCKET = os.environ['AWS_BUCKET'] +else: + 
SOURCE_BUCKET = os.environ['SOURCE_BUCKET'] +if 'DESTINATION_BUCKET' not in os.environ: + DESTINATION_BUCKET = os.environ['AWS_BUCKET'] +else: + DESTINATION_BUCKET = os.environ['DESTINATION_BUCKET'] +if 'UPLOAD_FLAGS' in os.environ: + UPLOAD_FLAGS = os.environ['UPLOAD_FLAGS'] +else: + UPLOAD_FLAGS = False LOG_GROUP_NAME= os.environ['LOG_GROUP_NAME'] CHECK_IF_DONE_BOOL= os.environ['CHECK_IF_DONE_BOOL'] EXPECTED_NUMBER_FILES= os.environ['EXPECTED_NUMBER_FILES'] @@ -52,7 +64,7 @@ class JobQueue(): def __init__(self, queueURL): self.client = boto3.client('sqs') self.queueURL = queueURL - + def readMessage(self): response = self.client.receive_message(QueueUrl=self.queueURL, WaitTimeSeconds=20) if 'Messages' in response.keys(): @@ -82,7 +94,7 @@ def monitorAndLog(process,logger): break if output: print(output.strip()) - logger.info(output) + logger.info(output) def printandlog(text,logger): print(text) @@ -144,18 +156,18 @@ def runCellProfiler(message): localOut = LOCAL_OUTPUT + '/%(MetadataID)s' % {'MetadataID': metadataID} remoteOut= os.path.join(message['output'],metadataID) replaceValues = {'PL':message['pipeline'], 'OUT':localOut, 'FL':message['data_file'], - 'DATA': DATA_ROOT, 'Metadata': message['Metadata'], 'IN': message['input'], + 'DATA': DATA_ROOT, 'Metadata': message['Metadata'], 'IN': message['input'], 'MetadataID':metadataID, 'PLUGINS':PLUGIN_DIR } # Start loggging now that we have a job we care about watchtowerlogger=watchtower.CloudWatchLogHandler(log_group=LOG_GROUP_NAME, stream_name=metadataID,create_log_group=False) - logger.addHandler(watchtowerlogger) + logger.addHandler(watchtowerlogger) # See if this is a message you've already handled, if you've so chosen if CHECK_IF_DONE_BOOL.upper() == 'TRUE': try: s3client=boto3.client('s3') - bucketlist=s3client.list_objects(Bucket=AWS_BUCKET,Prefix=remoteOut+'/') + bucketlist=s3client.list_objects(Bucket=DESTINATION_BUCKET,Prefix=remoteOut+'/') objectsizelist=[k['Size'] for k in bucketlist['Contents']] objectsizelist = [i for i in objectsizelist if i >= MIN_FILE_SIZE_BYTES] if NECESSARY_STRING: @@ -166,8 +178,8 @@ def runCellProfiler(message): logger.removeHandler(watchtowerlogger) return 'SUCCESS' except KeyError: #Returned if that folder does not exist - pass - + pass + csv_name = os.path.join(DATA_ROOT,message['data_file']) downloaded_files = [] @@ -176,10 +188,18 @@ def runCellProfiler(message): if DOWNLOAD_FILES.lower() == 'true': printandlog('Figuring which files to download', logger) import pandas - s3 = boto3.resource('s3') + s3client = boto3.client('s3') if not os.path.exists(localIn): os.mkdir(localIn) - csv_in = pandas.read_csv(os.path.join(DATA_ROOT,message['data_file'])) + printandlog('Downloading ' + message['data_file'] + ' from ' + SOURCE_BUCKET, logger) + csv_insubfolders = message['data_file'].split('/')[-3:] + subfolders = '/'.join((csv_insubfolders)[:-1]) + csv_insubfolders = '/'.join(csv_insubfolders) + csv_name = os.path.join(localIn, csv_insubfolders) + if not os.path.exists(os.path.join(localIn,subfolders)): + os.makedirs(os.path.join(localIn,subfolders), exist_ok=True) + s3client.download_file(SOURCE_BUCKET, message['data_file'], csv_name) + csv_in = pandas.read_csv(os.path.join(localIn,csv_name)) csv_in=csv_in.astype('str') #Figure out what metadata fields we need in this experiment, as a dict if type(message['Metadata'])==dict: @@ -194,36 +214,31 @@ def runCellProfiler(message): csv_in = csv_in[csv_in[eachfilter] == filter_dict[eachfilter]] #Figure out the actual file names and get them channel_list = 
[x.split('FileName_')[1] for x in csv_in.columns if 'FileName' in x] - printandlog('Downloading files', logger) + printandlog(f'Downloading files for channels {channel_list}', logger) for channel in channel_list: for field in range(csv_in.shape[0]): full_old_file_name = os.path.join(list(csv_in['PathName_'+channel])[field],list(csv_in['FileName_'+channel])[field]) prefix_on_bucket = full_old_file_name.split(DATA_ROOT)[1][1:] new_file_name = os.path.join(localIn,prefix_on_bucket) if not os.path.exists(os.path.split(new_file_name)[0]): - os.makedirs(os.path.split(new_file_name)[0]) + os.makedirs(os.path.split(new_file_name)[0], exist_ok=True) printandlog('made directory '+os.path.split(new_file_name)[0],logger) if not os.path.exists(new_file_name): - s3.meta.client.download_file(AWS_BUCKET,prefix_on_bucket,new_file_name) + s3client.download_file(SOURCE_BUCKET,prefix_on_bucket,new_file_name) + printandlog('Downloading file '+prefix_on_bucket,logger) downloaded_files.append(new_file_name) printandlog('Downloaded '+str(len(downloaded_files))+' files',logger) - import random - newtag = False - while newtag == False: - tag = str(random.randint(100000,999999)) #keep files from overwriting one another - local_csv_name = os.path.join(localIn,tag,os.path.split(csv_name)[1]) - if not os.path.exists(local_csv_name): - if not os.path.exists(os.path.split(local_csv_name)[0]): - os.makedirs(os.path.split(local_csv_name)[0]) - csv_in = pandas.read_csv(os.path.join(DATA_ROOT,message['data_file'])) - csv_in.replace(DATA_ROOT,localIn,regex=True, inplace=True) - csv_in.to_csv(local_csv_name,index=False) - print('Wrote updated CSV') - newtag = True - else: - newtag = False - csv_name = local_csv_name - + # Update paths in csv to local paths + csv_in.replace(DATA_ROOT,localIn,regex=True, inplace=True) + csv_in.to_csv(csv_name,index=False) + print('Updated load_data_csv to local paths') + # Download pipeline and update pipeline path in message + printandlog('Downloading ' + message['pipeline'] + ' from ' + SOURCE_BUCKET, logger) + localpipe = os.path.join(localIn, message['pipeline'].split('/')[-1]) + s3client.download_file(SOURCE_BUCKET, message['pipeline'], localpipe) + # Correct locations in CP run command + replaceValues['PL'] = message['pipeline'].split('/')[-1] + replaceValues['DATA'] = localIn # Build and run CellProfiler command cpDone = localOut + '/cp.is.done' cmdstem = 'cellprofiler -c -r ' @@ -238,7 +253,7 @@ def runCellProfiler(message): cmd = cmd % replaceValues print('Running', cmd) logger.info(cmd) - + subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) monitorAndLog(subp,logger) @@ -253,8 +268,11 @@ def runCellProfiler(message): while mvtries <3: try: printandlog('Move attempt #'+str(mvtries+1),logger) - cmd = 'aws s3 mv ' + localOut + ' s3://' + AWS_BUCKET + '/' + remoteOut + ' --recursive --exclude=cp.is.done' - subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + cmd = 'aws s3 mv ' + localOut + ' s3://' + DESTINATION_BUCKET + '/' + remoteOut + ' --recursive --exclude=cp.is.done' + if UPLOAD_FLAGS: + cmd += ' ' + UPLOAD_FLAGS + printandlog('Uploading with command ' + cmd, logger) + subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) out,err = subp.communicate() out=out.decode() err=err.decode() @@ -288,7 +306,7 @@ def runCellProfiler(message): import shutil shutil.rmtree(localOut, ignore_errors=True) return 'CP_PROBLEM' - + ################################# # MAIN WORKER LOOP @@ -320,4 +338,3 
@@ def main():
     print('Worker started')
     main()
     print('Worker finished')
-
diff --git a/worker/run-worker.sh b/worker/run-worker.sh
index b10815f..7efa848 100644
--- a/worker/run-worker.sh
+++ b/worker/run-worker.sh
@@ -1,12 +1,14 @@
 #!/bin/bash
-
+echo "${BASH_VERSION}"
 echo "Region $AWS_REGION"
 echo "Queue $SQS_QUEUE_URL"
-echo "Bucket $AWS_BUCKET"
+if [[ -z "$SOURCE_BUCKET" ]]
+then
+  SOURCE_BUCKET=$AWS_BUCKET
+fi
+echo "Source Bucket $SOURCE_BUCKET"
 
 # 1. CONFIGURE AWS CLI
-aws configure set aws_access_key_id $AWS_ACCESS_KEY_ID
-aws configure set aws_secret_access_key $AWS_SECRET_ACCESS_KEY
 aws configure set default.region $AWS_REGION
 MY_INSTANCE_ID=$(curl http://169.254.169.254/latest/meta-data/instance-id)
 echo "Instance ID $MY_INSTANCE_ID"
@@ -17,15 +19,23 @@ aws ec2 create-tags --resources $VOL_0_ID --tags Key=Name,Value=${APP_NAME}Worke
 VOL_1_ID=$(aws ec2 describe-instance-attribute --instance-id $MY_INSTANCE_ID --attribute blockDeviceMapping --output text --query BlockDeviceMappings[1].Ebs.[VolumeId])
 aws ec2 create-tags --resources $VOL_1_ID --tags Key=Name,Value=${APP_NAME}Worker
 
-# 2. MOUNT S3
-echo $AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY > /credentials.txt
-chmod 600 /credentials.txt
+# 2. MOUNT S3
 mkdir -p /home/ubuntu/bucket
 mkdir -p /home/ubuntu/local_output
-stdbuf -o0 s3fs $AWS_BUCKET /home/ubuntu/bucket -o passwd_file=/credentials.txt
+if [[ -z "$AWS_ACCESS_KEY_ID" ]]
+then
+  AWS_ACCESS_KEY_ID=$(curl 169.254.170.2$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI | jq -r '.AccessKeyId')
+  AWS_SECRET_ACCESS_KEY=$(curl 169.254.170.2$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI | jq -r '.SecretAccessKey')
+  echo $AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY > /credentials.txt
+  chmod 600 /credentials.txt
+  stdbuf -o0 s3fs $AWS_BUCKET /home/ubuntu/bucket -o passwd_file=/credentials.txt -o dbglevel=info
+else
+  stdbuf -o0 s3fs $AWS_BUCKET /home/ubuntu/bucket -o ecs -o dbglevel=info
+fi
+
 
 # 3. SET UP ALARMS
-aws cloudwatch put-metric-alarm --alarm-name ${APP_NAME}_${MY_INSTANCE_ID} --alarm-actions arn:aws:swf:${AWS_REGION}:${OWNER_ID}:action/actions/AWS_EC2.InstanceId.Terminate/1.0 --statistic Maximum --period 60 --threshold 1 --comparison-operator LessThanThreshold --metric-name CPUUtilization --namespace AWS/EC2 --evaluation-periods 15 --dimensions "Name=InstanceId,Value=${MY_INSTANCE_ID}"
+aws cloudwatch put-metric-alarm --alarm-name ${APP_NAME}_${MY_INSTANCE_ID} --alarm-actions arn:aws:swf:${AWS_REGION}:${OWNER_ID}:action/actions/AWS_EC2.InstanceId.Terminate/1.0 --statistic Maximum --period 60 --threshold 1 --comparison-operator LessThanThreshold --metric-name CPUUtilization --namespace AWS/EC2 --evaluation-periods 15 --dimensions "Name=InstanceId,Value=${MY_INSTANCE_ID}"
 
 
 # 4. RUN VM STAT MONITOR
@@ -50,4 +60,3 @@ for((k=0; k<$DOCKER_CORES; k++)); do
     sleep $SECONDS_TO_START
 done
 wait
-