worker/cp-worker.py (40 changes: 29 additions & 11 deletions)
@@ -93,12 +93,6 @@ def printandlog(text,logger):
#################################

def runCellProfiler(message):
- #List the directories in the bucket- this prevents a strange s3fs error
- rootlist=os.listdir(DATA_ROOT)
- for eachSubDir in rootlist:
- subDirName=os.path.join(DATA_ROOT,eachSubDir)
- if os.path.isdir(subDirName):
- trashvar=os.system('ls '+subDirName)

# Configure the logs
logger = logging.getLogger(__name__)
@@ -169,6 +163,8 @@ def runCellProfiler(message):
pass

csv_name = os.path.join(DATA_ROOT,message['data_file'])
+ pipeline_name = os.path.join(DATA_ROOT,message['pipeline'])
+ input_name = os.path.join(DATA_ROOT,message['input'])
downloaded_files = []

# Optional- download files
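
Note (not part of the diff): the two added lines above read 'pipeline' and 'input' keys off the incoming job message, alongside the existing 'data_file' key. A minimal sketch of such a message and the derived paths; the values are purely hypothetical, and only the keys visible in this diff are assumed:

import os

DATA_ROOT = '/home/ubuntu/bucket'   # hypothetical mount point for the S3 bucket

message = {
    'data_file': 'projects/exp1/load_data.csv',   # CSV consumed by the pipeline
    'pipeline': 'projects/exp1/analysis.cppipe',  # CellProfiler pipeline to run
    'input': 'projects/exp1/images',              # input folder on the bucket
    'Metadata': {'Metadata_Plate': 'Plate1', 'Metadata_Well': 'A01'},
}

csv_name = os.path.join(DATA_ROOT, message['data_file'])
pipeline_name = os.path.join(DATA_ROOT, message['pipeline'])
input_name = os.path.join(DATA_ROOT, message['input'])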
@@ -179,7 +175,25 @@ def runCellProfiler(message):
s3 = boto3.resource('s3')
if not os.path.exists(localIn):
os.mkdir(localIn)
- csv_in = pandas.read_csv(os.path.join(DATA_ROOT,message['data_file']))
+ #download pipeline
+ old_pipeline_name = os.path.join(DATA_ROOT,message['pipeline'])
+ pipeline_on_bucket_name = old_pipeline_name.split(DATA_ROOT)[1][1:]
+ new_pipeline_name = os.path.join(localIn,pipeline_on_bucket_name)
+ if not os.path.exists(os.path.split(new_pipeline_name)[0]):
+ os.makedirs(os.path.split(new_pipeline_name)[0])
+ if os.path.exists(new_pipeline_name):
+ os.remove(new_pipeline_name)
+ s3.meta.client.download_file(AWS_BUCKET,pipeline_on_bucket_name,new_pipeline_name)
+ #download CSV
+ old_csv_name = os.path.join(DATA_ROOT,message['data_file'])
+ csv_on_bucket_name = old_csv_name.split(DATA_ROOT)[1][1:]
+ new_csv_name = os.path.join(localIn,csv_on_bucket_name)
+ if not os.path.exists(os.path.split(new_csv_name)[0]):
+ os.makedirs(os.path.split(new_csv_name)[0])
+ if os.path.exists(new_csv_name):
+ os.remove(new_csv_name)
+ s3.meta.client.download_file(AWS_BUCKET,csv_on_bucket_name,new_csv_name)
+ csv_in = pandas.read_csv(new_csv_name)
csv_in=csv_in.astype('str')
#Figure out what metadata fields we need in this experiment, as a dict
if type(message['Metadata'])==dict:
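
Note (not part of the diff): the pipeline and CSV downloads added above repeat the same four steps: strip the DATA_ROOT prefix to recover the bucket key, create the local parent directory, remove any stale copy, and fetch the object. A sketch of that pattern as a standalone helper; the helper name and constant values are illustrative only, not part of this PR:

import os
import boto3

AWS_BUCKET = 'my-bucket'              # illustrative; the worker reads this from its environment
DATA_ROOT = '/home/ubuntu/bucket'     # illustrative mount point
localIn = '/home/ubuntu/local_input'  # illustrative local staging directory

s3 = boto3.resource('s3')

def download_from_bucket(mounted_path):
    # Strip the mount prefix and its leading separator to get the S3 key
    key_on_bucket = mounted_path.split(DATA_ROOT)[1][1:]
    local_path = os.path.join(localIn, key_on_bucket)
    # Make sure the parent directory exists and no stale copy is left behind
    os.makedirs(os.path.split(local_path)[0], exist_ok=True)
    if os.path.exists(local_path):
        os.remove(local_path)
    s3.meta.client.download_file(AWS_BUCKET, key_on_bucket, local_path)
    return local_path

# e.g. new_pipeline_name = download_from_bucket(pipeline_name)
#      new_csv_name = download_from_bucket(csv_name)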
@@ -206,6 +220,7 @@ def runCellProfiler(message):
if not os.path.exists(new_file_name):
s3.meta.client.download_file(AWS_BUCKET,prefix_on_bucket,new_file_name)
downloaded_files.append(new_file_name)
+ printandlog(new_file_name,logger)
printandlog('Downloaded '+str(len(downloaded_files))+' files',logger)
import random
newtag = False
@@ -215,24 +230,27 @@ def runCellProfiler(message):
if not os.path.exists(local_csv_name):
if not os.path.exists(os.path.split(local_csv_name)[0]):
os.makedirs(os.path.split(local_csv_name)[0])
- csv_in = pandas.read_csv(os.path.join(DATA_ROOT,message['data_file']))
+ csv_in = pandas.read_csv(new_csv_name)
csv_in=csv_in.astype('str')
csv_in.replace(DATA_ROOT,localIn,regex=True, inplace=True)
csv_in.to_csv(local_csv_name,index=False)
- print('Wrote updated CSV')
+ printandlog('Wrote updated CSV',logger)
newtag = True
else:
newtag = False
csv_name = local_csv_name
+ pipeline_name = new_pipeline_name
+ input_name = localIn

# Build and run CellProfiler command
cpDone = localOut + '/cp.is.done'
cmdstem = 'cellprofiler -c -r '
if message['pipeline'][-3:]!='.h5':
- cmd = cmdstem + '-p %(DATA)s/%(PL)s -i %(DATA)s/%(IN)s -o %(OUT)s -d ' + cpDone
+ cmd = cmdstem + '-p '+pipeline_name+' -i '+input_name+' -o %(OUT)s -d ' + cpDone
cmd += ' --data-file='+csv_name+' '
cmd += '-g %(Metadata)s'
else:
- cmd = cmdstem + '-p %(DATA)s/%(PL)s -i %(DATA)s/%(IN)s -o %(OUT)s -d ' + cpDone + ' -g %(Metadata)s'
+ cmd = cmdstem + '-p '+pipeline_name+' -i '+input_name+' -o %(OUT)s -d ' + cpDone + ' -g %(Metadata)s'
if USE_PLUGINS.lower() == 'true':
cmd += ' --plugins-directory=%(PLUGINS)s'
cmd = cmd % replaceValues
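
Note (not part of the diff): with the changes above, the -p, -i, and --data-file arguments now point at the locally downloaded copies instead of %(DATA)s-templated bucket paths, while -o and -g are still filled in by the existing 'cmd % replaceValues' step. A sketch of the non-.h5 command this builds, using hypothetical example values:

# Illustrative values; in the worker these come from the message and the download step
pipeline_name = '/home/ubuntu/local_input/projects/exp1/analysis.cppipe'
input_name = '/home/ubuntu/local_input'
csv_name = '/home/ubuntu/local_input/projects/exp1/load_data.csv'
localOut = '/home/ubuntu/local_output'
cpDone = localOut + '/cp.is.done'

cmdstem = 'cellprofiler -c -r '
cmd = cmdstem + '-p '+pipeline_name+' -i '+input_name+' -o %(OUT)s -d ' + cpDone
cmd += ' --data-file='+csv_name+' '
cmd += '-g %(Metadata)s'

# The existing substitution step then fills in the output directory and grouping,
# e.g. cmd % {'OUT': localOut, 'Metadata': 'Metadata_Plate=Plate1,Metadata_Well=A01'}
print(cmd % {'OUT': localOut, 'Metadata': 'Metadata_Plate=Plate1,Metadata_Well=A01'})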