From e7ba3b00b599495c8f2f87f87286d4590ff5b0a3 Mon Sep 17 00:00:00 2001 From: Beth Cimini Date: Sat, 14 Nov 2020 22:13:27 -0500 Subject: [PATCH 1/8] download pipeline and CSV, never use S3FS --- worker/cp-worker.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/worker/cp-worker.py b/worker/cp-worker.py index fc6d6db..e988b22 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -169,6 +169,7 @@ def runCellProfiler(message): pass csv_name = os.path.join(DATA_ROOT,message['data_file']) + pipeline_name = os.path.join(DATA_ROOT,message['pipeline']) downloaded_files = [] # Optional- download files @@ -179,7 +180,19 @@ def runCellProfiler(message): s3 = boto3.resource('s3') if not os.path.exists(localIn): os.mkdir(localIn) - csv_in = pandas.read_csv(os.path.join(DATA_ROOT,message['data_file'])) + #download pipeline + old_pipeline_name = os.path.join(DATA_ROOT,message['pipeline']) + pipeline_on_bucket_name = old_pipeline_name.split(DATA_ROOT)[1][1:] + new_pipeline_name = os.path.join(localIn,pipeline_on_bucket_name) + s3.meta.client.download_file(AWS_BUCKET,pipeline_on_bucket_name,new_pipeline_name) + #download CSV + old_csv_name = os.path.join(DATA_ROOT,message['data_file']) + csv_on_bucket_name = old_csv_name.split(DATA_ROOT)[1][1:] + new_csv_name = os.path.join(localIn,csv_on_bucket_name) + if os.path.exists(new_csv_name): + os.remove(new_csv_name) + s3.meta.client.download_file(AWS_BUCKET,csv_on_bucket_name,new_csv_name) + csv_in = pandas.read_csv(new_csv_name) csv_in=csv_in.astype('str') #Figure out what metadata fields we need in this experiment, as a dict if type(message['Metadata'])==dict: @@ -215,7 +228,7 @@ def runCellProfiler(message): if not os.path.exists(local_csv_name): if not os.path.exists(os.path.split(local_csv_name)[0]): os.makedirs(os.path.split(local_csv_name)[0]) - csv_in = pandas.read_csv(os.path.join(DATA_ROOT,message['data_file'])) + csv_in = pandas.read_csv(new_csv_name) csv_in.replace(DATA_ROOT,localIn,regex=True, inplace=True) csv_in.to_csv(local_csv_name,index=False) print('Wrote updated CSV') @@ -223,16 +236,17 @@ def runCellProfiler(message): else: newtag = False csv_name = local_csv_name + pipeline_name = new_pipeline_name # Build and run CellProfiler command cpDone = localOut + '/cp.is.done' cmdstem = 'cellprofiler -c -r ' if message['pipeline'][-3:]!='.h5': - cmd = cmdstem + '-p %(DATA)s/%(PL)s -i %(DATA)s/%(IN)s -o %(OUT)s -d ' + cpDone + cmd = cmdstem + '-p '+pipeline_name+' -i %(DATA)s/%(IN)s -o %(OUT)s -d ' + cpDone cmd += ' --data-file='+csv_name+' ' cmd += '-g %(Metadata)s' else: - cmd = cmdstem + '-p %(DATA)s/%(PL)s -i %(DATA)s/%(IN)s -o %(OUT)s -d ' + cpDone + ' -g %(Metadata)s' + cmd = cmdstem + '-p '+pipeline_name+' -i %(DATA)s/%(IN)s -o %(OUT)s -d ' + cpDone + ' -g %(Metadata)s' if USE_PLUGINS.lower() == 'true': cmd += ' --plugins-directory=%(PLUGINS)s' cmd = cmd % replaceValues From a16d25cd3e364e327b73927b212892fd7ea60f01 Mon Sep 17 00:00:00 2001 From: Beth Cimini Date: Sat, 14 Nov 2020 22:17:14 -0500 Subject: [PATCH 2/8] download pipeline and CSV, never use S3FS --- worker/cp-worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/worker/cp-worker.py b/worker/cp-worker.py index e988b22..2a9ea7e 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -184,6 +184,8 @@ def runCellProfiler(message): old_pipeline_name = os.path.join(DATA_ROOT,message['pipeline']) pipeline_on_bucket_name = old_pipeline_name.split(DATA_ROOT)[1][1:] new_pipeline_name = os.path.join(localIn,pipeline_on_bucket_name) + if os.path.exists(new_pipeline_name): + os.remove(new_pipeline_name) s3.meta.client.download_file(AWS_BUCKET,pipeline_on_bucket_name,new_pipeline_name) #download CSV old_csv_name = os.path.join(DATA_ROOT,message['data_file']) From 71b4959ac57e9049b337c6dd3886ddd78272bad2 Mon Sep 17 00:00:00 2001 From: Beth Cimini Date: Sat, 14 Nov 2020 22:23:14 -0500 Subject: [PATCH 3/8] don't list --- worker/cp-worker.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/worker/cp-worker.py b/worker/cp-worker.py index 2a9ea7e..3f99c7a 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -93,12 +93,6 @@ def printandlog(text,logger): ################################# def runCellProfiler(message): - #List the directories in the bucket- this prevents a strange s3fs error - rootlist=os.listdir(DATA_ROOT) - for eachSubDir in rootlist: - subDirName=os.path.join(DATA_ROOT,eachSubDir) - if os.path.isdir(subDirName): - trashvar=os.system('ls '+subDirName) # Configure the logs logger = logging.getLogger(__name__) From 8159684df1efb0520b54db729247f0ab82669cab Mon Sep 17 00:00:00 2001 From: Beth Cimini Date: Sat, 14 Nov 2020 22:27:26 -0500 Subject: [PATCH 4/8] make folders --- worker/cp-worker.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/worker/cp-worker.py b/worker/cp-worker.py index 3f99c7a..f823430 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -178,6 +178,8 @@ def runCellProfiler(message): old_pipeline_name = os.path.join(DATA_ROOT,message['pipeline']) pipeline_on_bucket_name = old_pipeline_name.split(DATA_ROOT)[1][1:] new_pipeline_name = os.path.join(localIn,pipeline_on_bucket_name) + if not os.path.exists(os.path.split(new_pipeline_name)[0]): + os.makedirs(os.path.split(new_pipeline_name)[0]) if os.path.exists(new_pipeline_name): os.remove(new_pipeline_name) s3.meta.client.download_file(AWS_BUCKET,pipeline_on_bucket_name,new_pipeline_name) @@ -185,6 +187,8 @@ def runCellProfiler(message): old_csv_name = os.path.join(DATA_ROOT,message['data_file']) csv_on_bucket_name = old_csv_name.split(DATA_ROOT)[1][1:] new_csv_name = os.path.join(localIn,csv_on_bucket_name) + if not os.path.exists(os.path.split(new_csv_name)[0]): + os.makedirs(os.path.split(new_csv_name)[0]) if os.path.exists(new_csv_name): os.remove(new_csv_name) s3.meta.client.download_file(AWS_BUCKET,csv_on_bucket_name,new_csv_name) From a1c672ec83879722d1e9e411a7a015912794594d Mon Sep 17 00:00:00 2001 From: Beth Cimini Date: Sat, 14 Nov 2020 22:33:10 -0500 Subject: [PATCH 5/8] can't use the input folder right now --- worker/cp-worker.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/worker/cp-worker.py b/worker/cp-worker.py index f823430..0a93aa2 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -164,6 +164,7 @@ def runCellProfiler(message): csv_name = os.path.join(DATA_ROOT,message['data_file']) pipeline_name = os.path.join(DATA_ROOT,message['pipeline']) + input_name = os.path.join(DATA_ROOT,message['input']) downloaded_files = [] # Optional- download files @@ -237,16 +238,17 @@ def runCellProfiler(message): newtag = False csv_name = local_csv_name pipeline_name = new_pipeline_name + input_name = localIn # Build and run CellProfiler command cpDone = localOut + '/cp.is.done' cmdstem = 'cellprofiler -c -r ' if message['pipeline'][-3:]!='.h5': - cmd = cmdstem + '-p '+pipeline_name+' -i %(DATA)s/%(IN)s -o %(OUT)s -d ' + cpDone + cmd = cmdstem + '-p '+pipeline_name+' -i '+input_name+' -o %(OUT)s -d ' + cpDone cmd += ' --data-file='+csv_name+' ' cmd += '-g %(Metadata)s' else: - cmd = cmdstem + '-p '+pipeline_name+' -i %(DATA)s/%(IN)s -o %(OUT)s -d ' + cpDone + ' -g %(Metadata)s' + cmd = cmdstem + '-p '+pipeline_name+' -i '+input_name+' -o %(OUT)s -d ' + cpDone + ' -g %(Metadata)s' if USE_PLUGINS.lower() == 'true': cmd += ' --plugins-directory=%(PLUGINS)s' cmd = cmd % replaceValues From 8c2ca70925ead2c61c1c63f840c3ae53b8da254b Mon Sep 17 00:00:00 2001 From: Beth Cimini Date: Sat, 14 Nov 2020 23:00:23 -0500 Subject: [PATCH 6/8] logging --- worker/cp-worker.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/worker/cp-worker.py b/worker/cp-worker.py index 0a93aa2..289240c 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -230,9 +230,12 @@ def runCellProfiler(message): if not os.path.exists(os.path.split(local_csv_name)[0]): os.makedirs(os.path.split(local_csv_name)[0]) csv_in = pandas.read_csv(new_csv_name) + csv_in=csv_in.astype('str') csv_in.replace(DATA_ROOT,localIn,regex=True, inplace=True) csv_in.to_csv(local_csv_name,index=False) - print('Wrote updated CSV') + printandlog('Wrote updated CSV',logger) + csvhead = csv_in.head() + printandlog(csvhead,logger) newtag = True else: newtag = False From a2de0e9a3ad48d42351697525670830ff41ec63a Mon Sep 17 00:00:00 2001 From: Beth Cimini Date: Sat, 14 Nov 2020 23:33:29 -0500 Subject: [PATCH 7/8] hell if I know --- worker/cp-worker.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/worker/cp-worker.py b/worker/cp-worker.py index 289240c..42226c7 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -220,6 +220,7 @@ def runCellProfiler(message): if not os.path.exists(new_file_name): s3.meta.client.download_file(AWS_BUCKET,prefix_on_bucket,new_file_name) downloaded_files.append(new_file_name) + printandlog(new_file_name) printandlog('Downloaded '+str(len(downloaded_files))+' files',logger) import random newtag = False @@ -234,8 +235,6 @@ def runCellProfiler(message): csv_in.replace(DATA_ROOT,localIn,regex=True, inplace=True) csv_in.to_csv(local_csv_name,index=False) printandlog('Wrote updated CSV',logger) - csvhead = csv_in.head() - printandlog(csvhead,logger) newtag = True else: newtag = False From 95341e33edf928ab15be17bfc0ea231aa50c3cbf Mon Sep 17 00:00:00 2001 From: Beth Cimini Date: Sat, 14 Nov 2020 23:38:39 -0500 Subject: [PATCH 8/8] hell if I know --- worker/cp-worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/cp-worker.py b/worker/cp-worker.py index 42226c7..ea1d483 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -220,7 +220,7 @@ def runCellProfiler(message): if not os.path.exists(new_file_name): s3.meta.client.download_file(AWS_BUCKET,prefix_on_bucket,new_file_name) downloaded_files.append(new_file_name) - printandlog(new_file_name) + printandlog(new_file_name,logger) printandlog('Downloaded '+str(len(downloaded_files))+' files',logger) import random newtag = False