diff --git a/worker/cp-worker.py b/worker/cp-worker.py
index fc6d6db..ea1d483 100644
--- a/worker/cp-worker.py
+++ b/worker/cp-worker.py
@@ -93,12 +93,6 @@ def printandlog(text,logger):
 #################################
 
 def runCellProfiler(message):
-    #List the directories in the bucket- this prevents a strange s3fs error
-    rootlist=os.listdir(DATA_ROOT)
-    for eachSubDir in rootlist:
-        subDirName=os.path.join(DATA_ROOT,eachSubDir)
-        if os.path.isdir(subDirName):
-            trashvar=os.system('ls '+subDirName)
 
     # Configure the logs
     logger = logging.getLogger(__name__)
@@ -169,6 +163,8 @@ def runCellProfiler(message):
         pass
 
     csv_name = os.path.join(DATA_ROOT,message['data_file'])
+    pipeline_name = os.path.join(DATA_ROOT,message['pipeline'])
+    input_name = os.path.join(DATA_ROOT,message['input'])
     downloaded_files = []
 
     # Optional- download files
@@ -179,7 +175,25 @@ def runCellProfiler(message):
         s3 = boto3.resource('s3')
         if not os.path.exists(localIn):
             os.mkdir(localIn)
-        csv_in = pandas.read_csv(os.path.join(DATA_ROOT,message['data_file']))
+        #download pipeline
+        old_pipeline_name = os.path.join(DATA_ROOT,message['pipeline'])
+        pipeline_on_bucket_name = old_pipeline_name.split(DATA_ROOT)[1][1:]
+        new_pipeline_name = os.path.join(localIn,pipeline_on_bucket_name)
+        if not os.path.exists(os.path.split(new_pipeline_name)[0]):
+            os.makedirs(os.path.split(new_pipeline_name)[0])
+        if os.path.exists(new_pipeline_name):
+            os.remove(new_pipeline_name)
+        s3.meta.client.download_file(AWS_BUCKET,pipeline_on_bucket_name,new_pipeline_name)
+        #download CSV
+        old_csv_name = os.path.join(DATA_ROOT,message['data_file'])
+        csv_on_bucket_name = old_csv_name.split(DATA_ROOT)[1][1:]
+        new_csv_name = os.path.join(localIn,csv_on_bucket_name)
+        if not os.path.exists(os.path.split(new_csv_name)[0]):
+            os.makedirs(os.path.split(new_csv_name)[0])
+        if os.path.exists(new_csv_name):
+            os.remove(new_csv_name)
+        s3.meta.client.download_file(AWS_BUCKET,csv_on_bucket_name,new_csv_name)
+        csv_in = pandas.read_csv(new_csv_name)
         csv_in=csv_in.astype('str')
         #Figure out what metadata fields we need in this experiment, as a dict
         if type(message['Metadata'])==dict:
@@ -206,6 +220,7 @@ def runCellProfiler(message):
                     if not os.path.exists(new_file_name):
                         s3.meta.client.download_file(AWS_BUCKET,prefix_on_bucket,new_file_name)
                         downloaded_files.append(new_file_name)
+                        printandlog(new_file_name,logger)
         printandlog('Downloaded '+str(len(downloaded_files))+' files',logger)
         import random
         newtag = False
@@ -215,24 +230,27 @@ def runCellProfiler(message):
         if not os.path.exists(local_csv_name):
             if not os.path.exists(os.path.split(local_csv_name)[0]):
                 os.makedirs(os.path.split(local_csv_name)[0])
-            csv_in = pandas.read_csv(os.path.join(DATA_ROOT,message['data_file']))
+            csv_in = pandas.read_csv(new_csv_name)
+            csv_in=csv_in.astype('str')
             csv_in.replace(DATA_ROOT,localIn,regex=True, inplace=True)
             csv_in.to_csv(local_csv_name,index=False)
-            print('Wrote updated CSV')
+            printandlog('Wrote updated CSV',logger)
             newtag = True
         else:
             newtag = False
         csv_name = local_csv_name
+        pipeline_name = new_pipeline_name
+        input_name = localIn
 
     # Build and run CellProfiler command
     cpDone = localOut + '/cp.is.done'
     cmdstem = 'cellprofiler -c -r '
     if message['pipeline'][-3:]!='.h5':
-        cmd = cmdstem + '-p %(DATA)s/%(PL)s -i %(DATA)s/%(IN)s -o %(OUT)s -d ' + cpDone
+        cmd = cmdstem + '-p '+pipeline_name+' -i '+input_name+' -o %(OUT)s -d ' + cpDone
         cmd += ' --data-file='+csv_name+' '
         cmd += '-g %(Metadata)s'
     else:
-        cmd = cmdstem + '-p %(DATA)s/%(PL)s -i %(DATA)s/%(IN)s -o %(OUT)s -d ' + cpDone + ' -g %(Metadata)s'
+        cmd = cmdstem + '-p '+pipeline_name+' -i '+input_name+' -o %(OUT)s -d ' + cpDone + ' -g %(Metadata)s'
     if USE_PLUGINS.lower() == 'true':
         cmd += ' --plugins-directory=%(PLUGINS)s'
     cmd = cmd % replaceValues
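
Illustrative sketch (not part of the patch): the pipeline and CSV downloads added above repeat the same pattern -- strip the DATA_ROOT mount prefix to recover the object's key on the bucket, recreate the directory tree under localIn, drop any stale local copy, and fetch the object with boto3. A hypothetical helper capturing that pattern, assuming the same DATA_ROOT, AWS_BUCKET, and localIn values used in cp-worker.py, might look like:

    import os
    import boto3

    def download_to_local(remote_path, data_root, bucket, local_root):
        # Hypothetical helper, illustrative only; mirrors the steps the patch adds inline.
        key_on_bucket = remote_path.split(data_root)[1][1:]    # drop the mount prefix and leading '/'
        local_path = os.path.join(local_root, key_on_bucket)
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        if os.path.exists(local_path):                          # remove any stale copy first
            os.remove(local_path)
        boto3.resource('s3').meta.client.download_file(bucket, key_on_bucket, local_path)
        return local_path

    # e.g. new_pipeline_name = download_to_local(pipeline_name, DATA_ROOT, AWS_BUCKET, localIn)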