21 changes: 13 additions & 8 deletions README.md
@@ -7,25 +7,29 @@ You can use it for whatever you want!

This code is an example of how to use AWS distributed infrastructure for running anything Dockerized.
The configuration of the AWS resources is done using boto3 and the AWS CLI.
The worker is written in Python and is encapsulated in a Docker container. There are four AWS components that are minimally needed to run distributed jobs:
The worker is written in Python and is encapsulated in a Docker container.
There are four AWS components that are minimally needed to run distributed jobs:


1. An SQS queue
2. An ECS cluster
3. An S3 bucket
4. A spot fleet of EC2 instances


All of them can be managed individually through the AWS Management Console.
However, this code helps you get started quickly and run a job autonomously once everything is configured correctly.
The code prepares the infrastructure to run a distributed job.
The code runs a script that links all these components and prepares the infrastructure to run a distributed job.
When the job is completed, the code is also able to stop resources and clean up components.
It also adds logging and alarms via CloudWatch, helping the user troubleshoot runs and destroy stuck machines.
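
For orientation, here is a minimal boto3 sketch of how those four pieces map onto API calls; the resource names are placeholders, and the real setup is driven by `run.py` and `config.py`:

    # Sketch only: illustrative names, not the values this repository uses.
    import boto3

    sqs = boto3.client('sqs')
    ecs = boto3.client('ecs')
    s3 = boto3.client('s3')
    ec2 = boto3.client('ec2')

    sqs.create_queue(QueueName='MyAppQueue')           # 1. SQS queue that holds the tasks
    ecs.create_cluster(clusterName='MyAppCluster')     # 2. ECS cluster that hosts the workers
    s3.create_bucket(Bucket='my-app-example-bucket')   # 3. S3 bucket for inputs and results
    # 4. The spot fleet is requested from a JSON template (see files/exampleFleet.json)
    #    via ec2.request_spot_fleet(SpotFleetRequestConfig=...).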

## Running the code

### Step 1
Edit the `config.py` file with all the relevant information for your job. Then, start creating the basic AWS resources by running the following script:
Edit the `config.py` file with all the relevant information for your job.
Then, start creating the basic AWS resources by running the following script:

$ python run.py setup
$ python3 run.py setup

This script initializes the resources in AWS.
Notice that the docker registry is built separately and you can modify the worker code to build your own.
@@ -34,9 +38,10 @@ Any time you modify the worker code, you need to update the docker registry using
### Step 2
After the first script runs successfully, the job can be submitted with the following command:

$ python run.py submitJob files/exampleJob.json
$ python3 run.py submitJob files/exampleJob.json

Running the script uploads the tasks that are configured in the json file. You have to customize the `exampleJob.json` file with information that makes sense for your project.
Running the script uploads the tasks that are configured in the JSON file.
You have to customize the `exampleJob.json` file with information that makes sense for your project.
You'll want to figure out which information is generic and which is unique to each job.
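
The exact schema of the job file is defined by this repository's worker, so the snippet below is only a hypothetical illustration of that generic-versus-unique split; every field name in it is invented for the example:

    import json

    # Hypothetical layout: shared settings once, plus one entry per task.
    # Follow the real schema in files/exampleJob.json for actual runs.
    job = {
        'output_bucket': 'my-app-example-bucket',  # generic: shared by all tasks
        'script_args': '--mode full',              # generic: shared by all tasks
        'groups': [                                # unique: one entry per task
            {'input_prefix': 'batch1/'},
            {'input_prefix': 'batch2/'},
        ],
    }

    with open('files/myJob.json', 'w') as f:
        json.dump(job, f, indent=4)

Such a file would then be submitted with `python3 run.py submitJob files/myJob.json`.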

### Step 3
@@ -45,14 +50,14 @@ This code starts a fleet of spot EC2 instances which will run the worker code.
The worker code is encapsulated in Docker containers, and the code uses ECS services to inject them into the EC2 instances.
All this is automated with the following command:

$ python run.py startCluster files/exampleFleet.json
$ python3 run.py startCluster files/exampleFleet.json

After the cluster is ready, the code informs you that everything is set up, and saves the spot fleet identifier in a file for future reference.
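
As a rough sketch of the idea (not the exact logic in `run.py`, and with placeholder names), the fleet request and the ECS service update rely on standard boto3 calls:

    import json
    import boto3

    ec2 = boto3.client('ec2')
    ecs = boto3.client('ecs')

    # Ask EC2 for spot capacity using the fleet template.
    with open('files/exampleFleet.json') as f:
        fleet_config = json.load(f)
    request = ec2.request_spot_fleet(SpotFleetRequestConfig=fleet_config)
    fleet_id = request['SpotFleetRequestId']

    # Raise the desired count so ECS places worker containers on the
    # spot instances as they join the cluster.
    ecs.update_service(cluster='MyAppCluster', service='MyAppService', desiredCount=4)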

### Step 4
When the cluster is up and running, you can monitor progress using the following command:

$ python run.py monitor files/APP_NAMESpotFleetRequestId.json
$ python3 run.py monitor files/APP_NAMESpotFleetRequestId.json

The file `APP_NAMESpotFleetRequestId.json` is created after the cluster is set up in Step 3.
It is important to keep this monitor running if you want to automatically shut down computing resources when there are no more tasks in the queue (recommended).
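
Conceptually, the monitor keeps polling the queue and only releases the compute resources once nothing is left to do; a stripped-down sketch of that loop (placeholder names, and omitting the alarm cleanup and log export that `run.py` also performs) looks like this:

    import time
    import boto3

    sqs = boto3.client('sqs')
    ec2 = boto3.client('ec2')
    queue_url = sqs.get_queue_url(QueueName='MyAppQueue')['QueueUrl']

    def pending_tasks():
        # Messages still waiting plus messages currently being worked on.
        attrs = sqs.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=['ApproximateNumberOfMessages',
                            'ApproximateNumberOfMessagesNotVisible'],
        )['Attributes']
        return int(attrs['ApproximateNumberOfMessages']) + int(attrs['ApproximateNumberOfMessagesNotVisible'])

    while pending_tasks() > 0:
        time.sleep(60)

    # Queue drained: cancel the spot fleet and terminate its instances.
    ec2.cancel_spot_fleet_requests(SpotFleetRequestIds=['sfr-EXAMPLE'], TerminateInstances=True)
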
112 changes: 70 additions & 42 deletions run.py
@@ -60,14 +60,45 @@
# AUXILIARY FUNCTIONS
#################################

def get_aws_credentials(AWS_PROFILE):
session = boto3.Session(profile_name=AWS_PROFILE)
credentials = session.get_credentials()
return credentials.access_key, credentials.secret_key

def generate_task_definition(AWS_PROFILE):
taskRoleArn = False
task_definition = TASK_DEFINITION.copy()
key, secret = get_aws_credentials(AWS_PROFILE)

config = configparser.ConfigParser()
config.read(f"{os.environ['HOME']}/.aws/config")

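# ~/.aws/config names non-default profiles as 'profile <name>', so check both section forms.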
if config.has_section(AWS_PROFILE):
profile_name = AWS_PROFILE
elif config.has_section(f'profile {AWS_PROFILE}'):
profile_name = f'profile {AWS_PROFILE}'
else:
print('Problem handling profile')

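# Prefer an IAM role if the profile defines one; otherwise fall back to static access keys.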
if config.has_option(profile_name, 'role_arn'):
print("Using role for credentials", config[profile_name]['role_arn'])
taskRoleArn = config[profile_name]['role_arn']
else:
if config.has_option(profile_name, 'source_profile'):
creds = configparser.ConfigParser()
creds.read(f"{os.environ['HOME']}/.aws/credentials")
source_profile = config[profile_name]['source_profile']
aws_access_key = creds[source_profile]['aws_access_key_id']
aws_secret_key = creds[source_profile]['aws_secret_access_key']
elif config.has_option(profile_name, 'aws_access_key_id'):
aws_access_key = config[profile_name]['aws_access_key_id']
aws_secret_key = config[profile_name]['aws_secret_access_key']
else:
print("Problem getting credentials")
task_definition['containerDefinitions'][0]['environment'] += [
{
"name": "AWS_ACCESS_KEY_ID",
"value": aws_access_key
},
{
"name": "AWS_SECRET_ACCESS_KEY",
"value": aws_secret_key
}]

sqs = boto3.client('sqs')
queue_name = get_queue_url(sqs)
task_definition['containerDefinitions'][0]['environment'] += [
@@ -79,14 +110,6 @@ def generate_task_definition(AWS_PROFILE):
'name': 'SQS_QUEUE_URL',
'value': queue_name
},
{
"name": "AWS_ACCESS_KEY_ID",
"value": key
},
{
"name": "AWS_SECRET_ACCESS_KEY",
"value": secret
},
{
"name": "AWS_BUCKET",
"value": AWS_BUCKET
@@ -122,13 +145,18 @@ def generate_task_definition(AWS_PROFILE):
{
"name": "NECESSARY_STRING",
"value": NECESSARY_STRING
}
}
]
return task_definition
return task_definition, taskRoleArn

def update_ecs_task_definition(ecs, ECS_TASK_NAME, AWS_PROFILE):
task_definition = generate_task_definition(AWS_PROFILE)
ecs.register_task_definition(family=ECS_TASK_NAME,containerDefinitions=task_definition['containerDefinitions'])
task_definition, taskRoleArn = generate_task_definition(AWS_PROFILE)
if taskRoleArn:
ecs.register_task_definition(family=ECS_TASK_NAME,containerDefinitions=task_definition['containerDefinitions'],taskRoleArn=taskRoleArn)
else:
ecs.register_task_definition(family=ECS_TASK_NAME,containerDefinitions=task_definition['containerDefinitions'])
print('Task definition registered')

def get_or_create_cluster(ecs):
@@ -187,14 +215,14 @@ def killdeadAlarms(fleetId,monitorapp,ec2,cloud):
todel.append(eachevent['EventInformation']['InstanceId'])

existing_alarms = [x['AlarmName'] for x in cloud.describe_alarms(AlarmNamePrefix=monitorapp)['MetricAlarms']]

for eachmachine in todel:
monitorname = monitorapp+'_'+eachmachine
if monitorname in existing_alarms:
cloud.delete_alarms(AlarmNames=[monitorname])
print('Deleted', monitorname, 'if it existed')
time.sleep(3)

print('Old alarms deleted')

def generateECSconfig(ECS_CLUSTER,APP_NAME,AWS_BUCKET,s3client):
@@ -240,7 +268,7 @@ def removequeue(queueName):
for eachUrl in queueoutput["QueueUrls"]:
if eachUrl.split('/')[-1] == queueName:
queueUrl=eachUrl

sqs.delete_queue(QueueUrl=queueUrl)

def deregistertask(taskName, ecs):
Expand Down Expand Up @@ -270,7 +298,7 @@ def downscaleSpotFleet(queue, spotFleetID, ec2, manual=False):

def export_logs(logs, loggroupId, starttime, bucketId):
result = logs.create_export_task(taskName = loggroupId, logGroupName = loggroupId, fromTime = int(starttime), to = int(time.time()*1000), destination = bucketId, destinationPrefix = 'exportedlogs/'+loggroupId)

logExportId = result['taskId']

while True:
@@ -293,7 +321,7 @@ def __init__(self,name=None):
self.queue = self.sqs.get_queue_by_name(QueueName=SQS_QUEUE_NAME)
else:
self.queue = self.sqs.get_queue_by_name(QueueName=name)
self.inProcess = -1
self.inProcess = -1
self.pending = -1

def scheduleBatch(self, data):
@@ -362,7 +390,7 @@ def submitJob():
print('Job submitted. Check your queue')

#################################
# SERVICE 3: START CLUSTER
# SERVICE 3: START CLUSTER
#################################

def startCluster():
@@ -381,7 +409,7 @@ def startCluster():
spotfleetConfig['SpotPrice'] = '%.2f' %MACHINE_PRICE
DOCKER_BASE_SIZE = int(round(float(EBS_VOL_SIZE)/int(TASKS_PER_MACHINE))) - 2
userData=generateUserData(ecsConfigFile,DOCKER_BASE_SIZE)
for LaunchSpecification in range(0,len(spotfleetConfig['LaunchSpecifications'])):
for LaunchSpecification in range(0,len(spotfleetConfig['LaunchSpecifications'])):
spotfleetConfig['LaunchSpecifications'][LaunchSpecification]["UserData"]=userData
spotfleetConfig['LaunchSpecifications'][LaunchSpecification]['BlockDeviceMappings'][1]['Ebs']["VolumeSize"]= EBS_VOL_SIZE
spotfleetConfig['LaunchSpecifications'][LaunchSpecification]['InstanceType'] = MACHINE_TYPE[LaunchSpecification]
@@ -404,7 +432,7 @@ def startCluster():
createMonitor.write('"MONITOR_LOG_GROUP_NAME" : "'+LOG_GROUP_NAME+'",\n')
createMonitor.write('"MONITOR_START_TIME" : "'+ starttime+'"}\n')
createMonitor.close()

# Step 4: Create a log group for this app and date if one does not already exist
logclient=boto3.client('logs')
loggroupinfo=logclient.describe_log_groups(logGroupNamePrefix=LOG_GROUP_NAME)
@@ -415,13 +443,13 @@ def startCluster():
if LOG_GROUP_NAME+'_perInstance' not in groupnames:
logclient.create_log_group(logGroupName=LOG_GROUP_NAME+'_perInstance')
logclient.put_retention_policy(logGroupName=LOG_GROUP_NAME+'_perInstance', retentionInDays=60)

# Step 5: update the ECS service to be ready to inject docker containers in EC2 instances
print('Updating service')
ecs = boto3.client('ecs')
ecs.update_service(cluster=ECS_CLUSTER, service=APP_NAME+'Service', desiredCount=CLUSTER_MACHINES*TASKS_PER_MACHINE)
print('Service updated.')
print('Service updated.')

# Step 6: Monitor the creation of the instances until all are present
status = ec2client.describe_spot_fleet_instances(SpotFleetRequestId=requestInfo['SpotFleetRequestId'])
#time.sleep(15) # This is now too fast, so sometimes the spot fleet request history throws an error!
@@ -441,7 +469,7 @@ def startCluster():
return
ec2client.cancel_spot_fleet_requests(SpotFleetRequestIds=[requestInfo['SpotFleetRequestId']], TerminateInstances=True)
return

# If everything seems good, just bide your time until you're ready to go
print('.')
time.sleep(20)
@@ -450,39 +478,39 @@ def startCluster():
print('Spot fleet successfully created. Your job should start in a few minutes.')

#################################
# SERVICE 4: MONITOR JOB
# SERVICE 4: MONITOR JOB
#################################

def monitor(cheapest=False):
if len(sys.argv) < 3:
print('Use: run.py monitor spotFleetIdFile')
sys.exit()

if '.json' not in sys.argv[2]:
print('Use: run.py monitor spotFleetIdFile')
sys.exit()

if len(sys.argv) == 4:
cheapest = sys.argv[3]

monitorInfo = loadConfig(sys.argv[2])
monitorcluster=monitorInfo["MONITOR_ECS_CLUSTER"]
monitorapp=monitorInfo["MONITOR_APP_NAME"]
fleetId=monitorInfo["MONITOR_FLEET_ID"]
queueId=monitorInfo["MONITOR_QUEUE_NAME"]

ec2 = boto3.client('ec2')
cloud = boto3.client('cloudwatch')
cloud = boto3.client('cloudwatch')

# Optional Step 0 - decide if you're going to be cheap rather than fast. This means that you'll get 15 minutes
# from the start of the monitor to get as many machines as you get, and then it will set the requested number to 1.
# Benefit: this will always be the cheapest possible way to run, because if machines die they'll die fast,
# Potential downside- if machines are at low availability when you start to run, you'll only ever get a small number
# Benefit: this will always be the cheapest possible way to run, because if machines die they'll die fast,
# Potential downside- if machines are at low availability when you start to run, you'll only ever get a small number
# of machines (as opposed to getting more later when they become available), so it might take VERY long to run if that happens.
if cheapest:
queue = JobQueue(name=queueId)
startcountdown = time.time()
while queue.pendingLoad():
while queue.pendingLoad():
if time.time() - startcountdown > 900:
downscaleSpotFleet(queue, fleetId, ec2, manual=1)
break
@@ -491,7 +519,7 @@ def monitor(cheapest=False):
# Step 1: Create job and count messages periodically
queue = JobQueue(name=queueId)
while queue.pendingLoad():
#Once an hour (except at midnight) check for terminated machines and delete their alarms.
#Once an hour (except at midnight) check for terminated machines and delete their alarms.
#This is slooooooow, which is why we don't just do it at the end
curtime=datetime.datetime.now().strftime('%H%M')
if curtime[-2:]=='00':
@@ -504,7 +532,7 @@ def monitor(cheapest=False):
if curtime[-1:]=='9':
downscaleSpotFleet(queue, fleetId, ec2)
time.sleep(MONITOR_TIME)

# Step 2: When no messages are pending, stop service
# Reload the monitor info, because for long jobs new fleets may have been started, etc
monitorInfo = loadConfig(sys.argv[2])
Expand All @@ -514,7 +542,7 @@ def monitor(cheapest=False):
queueId=monitorInfo["MONITOR_QUEUE_NAME"]
bucketId=monitorInfo["MONITOR_BUCKET_NAME"]
loggroupId=monitorInfo["MONITOR_LOG_GROUP_NAME"]
starttime=monitorInfo["MONITOR_START_TIME"]
starttime=monitorInfo["MONITOR_START_TIME"]

ecs = boto3.client('ecs')
ecs.update_service(cluster=monitorcluster, service=monitorapp+'Service', desiredCount=0)
@@ -565,14 +593,14 @@ def monitor(cheapest=False):
print('All export tasks done')

#################################
# MAIN USER INTERACTION
# MAIN USER INTERACTION
#################################

if __name__ == '__main__':
if len(sys.argv) < 2:
print('Use: run.py setup | submitJob | startCluster | monitor')
sys.exit()

if sys.argv[1] == 'setup':
setup()
elif sys.argv[1] == 'submitJob':
11 changes: 5 additions & 6 deletions worker/Dockerfile
@@ -2,14 +2,14 @@
# - [ BROAD'16 ] -
#
# A docker instance for accessing AWS resources
# This wraps the cellprofiler docker registry
# This wraps your docker registry
#


FROM someuser/somedocker:sometag

# Install S3FS

# Install S3FS
USER root
RUN apt-get -y update && \
apt-get -y upgrade && \
apt-get -y install \
@@ -36,11 +36,11 @@ RUN make install
# Install Python - not needed if you've already got it in your container
# If you have a non-3.8 version, you will need to change python3.8 calls where specified

RUN apt install -y python3.8-dev python3.8-distutils
RUN apt install -y python3.8-dev python3.8-distutils python3-pip

# Install AWS CLI

RUN python3.8 -m pip install awscli
RUN python3.8 -m pip install awscli

# Install boto3

Expand All @@ -62,4 +62,3 @@ RUN chmod 755 run-worker.sh
WORKDIR /home/ubuntu
ENTRYPOINT ["./run-worker.sh"]
CMD [""]
