diff --git a/documentation/DCP-documentation/AWS_hygiene_scripts.md b/documentation/DCP-documentation/AWS_hygiene_scripts.md index cd49e07..31c777c 100644 --- a/documentation/DCP-documentation/AWS_hygiene_scripts.md +++ b/documentation/DCP-documentation/AWS_hygiene_scripts.md @@ -1,6 +1,6 @@ # AWS Hygiene Scripts -See also (AUSPICES)[https://github.com/broadinstitute/AuSPICES] for setting up various hygiene scripts to automatically run in your AWS account. +See also [AUSPICES](https://github.com/broadinstitute/AuSPICES) for setting up various hygiene scripts to automatically run in your AWS account. ## Clean out old alarms diff --git a/documentation/DCP-documentation/_toc.yml b/documentation/DCP-documentation/_toc.yml index a029b59..8dd1776 100644 --- a/documentation/DCP-documentation/_toc.yml +++ b/documentation/DCP-documentation/_toc.yml @@ -11,6 +11,8 @@ parts: - file: config_examples - file: SQS_QUEUE_information - file: step_2_submit_jobs + sections: + - file: passing_files_to_DCP - file: step_3_start_cluster - file: step_4_monitor - caption: diff --git a/documentation/DCP-documentation/images/LoadDataCSV.png b/documentation/DCP-documentation/images/LoadDataCSV.png new file mode 100644 index 0000000..c050d96 Binary files /dev/null and b/documentation/DCP-documentation/images/LoadDataCSV.png differ diff --git a/documentation/DCP-documentation/passing_files_to_DCP.md b/documentation/DCP-documentation/passing_files_to_DCP.md new file mode 100644 index 0000000..7a0ee8f --- /dev/null +++ b/documentation/DCP-documentation/passing_files_to_DCP.md @@ -0,0 +1,72 @@ +# Passing Files to DCP + +Distributed-CellProfiler can be told what files to use through LoadData.csv, Batch Files, or file lists. + +## Load Data + +![LoadData.csv](images/LoadDataCSV.png) + +A LoadData.csv is a CSV file that tells CellProfiler how the images should be parsed. +At a minimum, this CSV should contain PathName_{NameOfChannel} and FileName_{NameOfChannel} columns for each of your channels, as well as Metadata_{PieceOfMetadata} for each kind of metadata being used to group your image sets. +It can contain any other metadata you would like to track. +Some users have reported issues with using relative paths in the PathName columns; using absolute paths beginning with `/home/ubuntu/bucket/{relativepath}` may increase your odds of success. + +### Creating LoadData.csv + +You can create this CSV yourself via your favorite scripting language. +We maintain [pe2loaddata](https://github.com/broadinstitute/pe2loaddata), a script that creates a LoadData.csv from Phenix metadata XML files. + +You can also create the LoadData.csv in a local copy of CellProfiler using the standard input modules of Images, Metadata, NamesAndTypes, and Groups. +More written and video information about using the input modules can be found [here](https://broad.io/CellProfilerInput). +After loading in your images, use the Export->Image Set Listing command. +You will then need to replace the local paths with the paths where the files can be found in the cloud. +If your files are in the same structure, this can be done with a simple find and replace in any text editing software. +(e.g. Find '/Users/eweisbar/Desktop' and replace with '/home/ubuntu/bucket') + +### Using LoadData.csv + +To use a LoadData.csv with submitJobs, put the path to the LoadData.csv in **data_file:**. + +To use a LoadData.csv with run_batch_general.py, enter the name of the LoadData.csv under **#project specific stuff** in `{STEP}name`.
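+For illustration, the two edits described here and just below might look like the following sketch (the variable name `analysisname` is only an assumption following the `{STEP}name` convention; check the actual names in your copy of run_batch_general.py):
+
+```python
+# Hypothetical excerpt from run_batch_general.py -- names are illustrative, not definitive
+
+# project specific stuff
+analysisname = 'load_data.csv'  # {STEP}name: the LoadData.csv used for the analysis step
+
+# at the bottom of the file
+MakeAnalysisJobs()  # no batch argument (or batch=False) when running from a LoadData.csv
+```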
+At the bottom of the file, make sure there are no arguments or `batch=False` in the command for the step you are running. +(e.g. `MakeAnalysisJobs()` or `MakeAnalysisJobs(batch=False)`) +Note that if you do not follow our standard file organization, under **#not project specific, unless you deviate from the structure** you will also need to edit `datafilepath`. + +## Batch Files + +Batch files are an easy way to transition from running locally to distributed. +A batch file is an `.h5` file created by CellProfiler which captures all the data needed to run your workflow - pipeline and file information are packaged together. +To use a batch file, your data needs to have the same structure in the cloud as on your local machine. + +### Creating batch files + +To create a batch file, load all your images into a local copy of CellProfiler using the standard input modules of Images, Metadata, NamesAndTypes and Groups. +More written and video information about using the input modules can be found [here](broad.io/CellProfilerInput). +Put the `CreateBatchFiles` module at the end of your pipeline and ensure that it is selected. +Add a path mapping and edit the `Local root path` and `Cluster root path`. +Run the CellProfiler pipeline by pressing the `Analyze Images` button; note that it won't actually run your pipeline but will instead create a batch file. +More information on the `CreateBatchFiles` module can be found [here](https://cellprofiler-manual.s3.amazonaws.com/CellProfiler-4.2.4/modules/fileprocessing.html). + +### Using batch files + +To use a batch file with submitJobs, put the path to the `.h5` file in **data_file:** and **pipeline:**. + +To use a batch file with run_batch_general.py, enter the name of the batch file under **#project specific stuff** in `batchpipename{STEP}`. +At the bottom of the file, set `batch=True` in the command for the step you are running. +(e.g. `MakeAnalysisJobs(batch=True)`) +Note that if you do not follow our standard file organization, under **#not project specific, unless you deviate from the structure** you will also need to edit `batchpath`. + +## File lists + +You can also simply pass a list of absolute file paths (not relative paths) with one file per row in `.txt` format. +Note that file lists themselves do not associate metadata with file paths (in contrast to LoadData.csv files where you can enter any metadata columns you desire.) +Therefore, you need to extract metadata for Distributed-CellProfiler to use for grouping by extracting metadata from file and folder names in the Metadata module in your CellProfiler pipeline. +You can pass additional metadata to CellProfiler by `Add another extraction method`, setting the method to `Import from file` and setting Metadata file location to `Default Input Folder`. + +### Creating File Lists + +Use any text editing software to create a `.txt` file where each line of the file is a path to a single image that you want to process. + +### Using File Lists + +To use a file list with submitJobs, put the path to the `.txt` file in **data_file:**. \ No newline at end of file diff --git a/documentation/DCP-documentation/step_2_submit_jobs.md b/documentation/DCP-documentation/step_2_submit_jobs.md index 13be949..0f413e8 100644 --- a/documentation/DCP-documentation/step_2_submit_jobs.md +++ b/documentation/DCP-documentation/step_2_submit_jobs.md @@ -25,12 +25,9 @@ If using LoadData, make sure your "Base image location" is set to "None". ## Configuring your job file * **pipeline:** The path to your pipeline file. 
-* **data_file:** The path to your CSV. -At a minimum, this CSV should contain PathName_{NameOfChannel} and FileName_{NameOfChannel} columns for each of your channels, as well as Metadata_{PieceOfMetadata} for each kind of metadata being used to group your image sets. -You can create this CSV yourself via your favorite scripting language or by using the Images, Metadata, and NamesAndTypes modules in CellProfiler to generate image sets then using the Export->Image Set Listing command. -Some users have reported issues with using relative paths in the PathName columns; using absolute paths beginning with `/home/ubuntu/bucket/{relativepath}` may increase your odds of success. +* **data_file:** The path to your LoadData.csv, batch file, or file list file. * **input:** The path to your default input directory. -This is not necessary for every pipeline but can be helpful when non-image files are needed in the pipeline (such as a text file containing quality control rules for the FlagImage module). +This is not necessary for every pipeline but can be helpful when non-image files are needed in the pipeline (such as a text file containing quality control rules for the FlagImage module or a metadata file for use with file lists). DO NOT set this to a large directory, or CellProfiler will try to scan the entire thing before running your pipeline. * **output:** The top output directory you'd like your files placed in. * **output_structure:** By default, Distributed-CellProfiler will put your output in subfolders created by hyphenating all your Metadata entries (see below) in order (e.g. if the individual group being processed was `{"Metadata": "Metadata_Plate=Plate1,Metadata_Well=A01"}`, the output would be placed in `output_top_directory/Plate1-A01`.) diff --git a/documentation/DCP-documentation/troubleshooting_runs.md b/documentation/DCP-documentation/troubleshooting_runs.md index 8cfffd5..01724c4 100644 --- a/documentation/DCP-documentation/troubleshooting_runs.md +++ b/documentation/DCP-documentation/troubleshooting_runs.md @@ -15,7 +15,7 @@ | Jobs completing(total messages decreasing) much more quickly than expected. |"==OUT, SUCCESS"| No outcome/saved files on s3 | | There is a mismatch in your metadata somewhere. |Check the Metadata_ columns in your LoadData.csv for typos or a mismatch with your jobs file. The most common sources of mismatch are case and zero padding (e.g. A01 vs a01 vs A1). Check for these mismatches and edit the job file accordingly. If you use pe2loaddata to create your csvs and the plate was imaged multiple times, pay particular attention to the Metadata_Plate column as numbering reflecting this will be automatically passed into the Load_data.csv | | | Your specified output structure does not match the Metadata passed. |Expected output is seen.| | This is not necessarily an error. If the input grouping is different than the output grouping (e.g. jobs are run by Plate-Well-Site but are all output to a single Plate folder) then this will print in the Cloudwatch log that matches the input structure but actual job progress will print in the Cloudwatch log that matches the output structure. | | | | Your perinstance logs have an IOError indicating that an .h5 batchfile does not exist | No outcome/saved files on s3 | | No batchfiles exist for your project. 
| Either you need to create the batch files and make sure that they are in the appropriate directory OR re-start and use MakeAnalysisJobs() instead of MakeAnalysisJobs(mode=‘batch’) in run_batch_general.py | -| | | | Machines made in EC2 and dockers are made in ECS but the dockers are not placed on the machines | There is a mismatch in your DCP config file. | Confirm that the MEMORY matches the MACHINE_TYPE set in your config. | +| | | | Machines made in EC2 and dockers are made in ECS but the dockers are not placed on the machines | There is a mismatch in your DCP config file. | Confirm that the MEMORY matches the MACHINE_TYPE set in your config. Confirm that there are no typos in your DOCKERHUB_TAG set in your config. | | | Your perinstance logs have an IOError indicating that CellProfiler cannot open your pipeline | | | You have a corrupted pipeline. | Check if you can open your pipeline locally. It may have been corrupted on upload or it may have an error within the pipeline itself. | | |"== ERR move failed:An error occurred (SlowDown) when calling the PutObject operation (reached max retries: 4): Please reduce your request rate." Error may not show initially and may become more prevalent with time. | | | Too many jobs are finishing too quickly creating a backlog of jobs waiting to upload to S3. | You can 1) check out fewer machines at a time, 2) check out smaller machines and run fewer copies of DCP at the same time, or 3) group jobs in larger groupings (e.g. by Plate instead of Well or Site). If this happens because you have many jobs finishing at the same time (but not finishing very rapidly such that it's not creating an increasing backlog) you can increase SECONDS_TO_START in config.py so there is more separation between jobs finishing.| | | "/home/ubuntu/bucket: Transport endpoint is not connected" | Cannot be accessed by fleet. | | S3FS has stochastically dropped/failed to connect. | Perform your run without using S3FS by setting DOWNLOAD_FILES = TRUE in your config.py. Note that, depending upon your job and machine setup, you may need to increase the size of your EBS volume to account for the files being downloaded. | diff --git a/example_project/README.md b/example_project/README.md new file mode 100644 index 0000000..3d064d1 --- /dev/null +++ b/example_project/README.md @@ -0,0 +1,54 @@ +Included in this folder are all of the resources for running a complete mini-example of Distributed-CellProfiler. +It includes 3 sample image sets and a CellProfiler pipeline that identifies cells within the images and makes measurements. +It also includes the Distributed-CellProfiler files pre-configured to create a queue of all 3 jobs and spin up a spot fleet of 3 instances, each of which will process a single image set. + +## Running example project + +### Step 0 + +Before running this mini-example, you will need to set up your AWS infrastructure as described in our [online documentation](https://distributedscience.github.io/Distributed-CellProfiler/step_0_prep.html). +This includes creating the fleet file that you will use in Step 3. + +Upload the example's `project_folder` to the top level of your bucket.
+While in the `Distributed-CellProfiler` folder, use the following command, replacing `yourbucket` with your bucket name: + +```bash +# Copy example files to S3 +BUCKET=yourbucket +aws s3 sync example_project/project_folder s3://${BUCKET}/project_folder + +# Replace the default config with the example config +cp example_project/config.py config.py +``` + +### Step 1 +In config.py, you will need to update the following fields specific to your AWS configuration: +``` +# AWS GENERAL SETTINGS: +AWS_REGION = 'us-east-1' +AWS_PROFILE = 'default' # The same profile used by your AWS CLI installation +SSH_KEY_NAME = 'your-key-file.pem' # Expected to be in ~/.ssh +AWS_BUCKET = 'your-bucket-name' +SOURCE_BUCKET = 'your-bucket-name' # Only differs from AWS_BUCKET with advanced configuration +DESTINATION_BUCKET = 'your-bucket-name' # Only differs from AWS_BUCKET with advanced configuration +``` +Then run `python3 run.py setup` + +### Step 2 +This command points to the job file created for this demonstration and should be run as-is. +`python3 run.py submitJob example_project/files/exampleJob.json` + +### Step 3 +This command should point to whatever fleet file you created in Step 0, so you may need to update the `exampleFleet.json` file name. +`python3 run.py startCluster files/exampleFleet.json` + +### Step 4 +This command points to the monitor file that is automatically created with your run and should be run as-is. +`python3 run.py monitor files/FlyExampleSpotFleetRequestId.json` + +## Results + +While the run is happening, you can watch real-time metrics in your Cloudwatch Dashboard by navigating to the [Cloudwatch console](https://console.aws.amazon.com/cloudwatch). +Note that the metrics update at intervals that may not be helpful with this fast, minimal example. + +After the run is done, you should see your CellProfiler output files in S3 at s3://${BUCKET}/project_folder/output in per-image folders. \ No newline at end of file diff --git a/example_project/config.py b/example_project/config.py new file mode 100644 index 0000000..f589db6 --- /dev/null +++ b/example_project/config.py @@ -0,0 +1,54 @@ +# Constants (User configurable) + +APP_NAME = 'FlyExample' # Used to generate derivative names unique to the application. + +# DOCKER REGISTRY INFORMATION: +DOCKERHUB_TAG = 'cellprofiler/distributed-cellprofiler:2.0.0_4.2.4' + +# AWS GENERAL SETTINGS: +AWS_REGION = 'us-east-1' +AWS_PROFILE = 'default' # The same profile used by your AWS CLI installation +SSH_KEY_NAME = 'your-key-file.pem' # Expected to be in ~/.ssh +AWS_BUCKET = 'your-bucket-name' # Bucket to use for logging (likely all three buckets the same for this example) +SOURCE_BUCKET = 'your-bucket-name' # Bucket to download files from (likely all three buckets the same for this example) +DESTINATION_BUCKET = 'your-bucket-name' # Bucket to upload files to (likely all three buckets the same for this example) + +# EC2 AND ECS INFORMATION: +ECS_CLUSTER = 'default' +CLUSTER_MACHINES = 3 +TASKS_PER_MACHINE = 1 +MACHINE_TYPE = ['c4.xlarge'] +MACHINE_PRICE = 0.10 +EBS_VOL_SIZE = 22 # In GB. Minimum allowed is 22.
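+# 'True' downloads the needed input files onto the instance's EBS volume; 'False' reads them through the S3FS bucket mount instead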
+DOWNLOAD_FILES = 'False' + +# DOCKER INSTANCE RUNNING ENVIRONMENT: +DOCKER_CORES = 1 # Number of CellProfiler processes to run inside a docker container +CPU_SHARES = DOCKER_CORES * 1024 # ECS computing units assigned to each docker container (1024 units = 1 core) +MEMORY = 7500 # Memory assigned to the docker container in MB +SECONDS_TO_START = 3*60 # Wait before the next CP process is initiated to avoid memory collisions + +# SQS QUEUE INFORMATION: +SQS_QUEUE_NAME = APP_NAME + 'Queue' +SQS_MESSAGE_VISIBILITY = 10*60 # Timeout (secs) for messages in flight (average time to be processed) +SQS_DEAD_LETTER_QUEUE = 'ExampleProject_DeadMessages' + +# LOG GROUP INFORMATION: +LOG_GROUP_NAME = APP_NAME + +# CLOUDWATCH DASHBOARD CREATION +CREATE_DASHBOARD = 'True' # Create a dashboard in Cloudwatch for run +CLEAN_DASHBOARD = 'True' # Automatically remove dashboard at end of run with Monitor + +# REDUNDANCY CHECKS +CHECK_IF_DONE_BOOL = 'False' #True or False- should it check if there are a certain number of non-empty files and delete the job if yes? +EXPECTED_NUMBER_FILES = 7 #What is the number of files that trigger skipping a job? +MIN_FILE_SIZE_BYTES = 1 #What is the minimal number of bytes an object should be to "count"? +NECESSARY_STRING = '' #Is there any string that should be in the file name to "count"? + +# PLUGINS +USE_PLUGINS = 'False' +UPDATE_PLUGINS = 'False' +PLUGINS_COMMIT = '' # What commit or version tag do you want to check out? +INSTALL_REQUIREMENTS = 'False' +REQUIREMENTS_FILE = '' # Path within the plugins repo to a requirements file diff --git a/example_project/files/exampleJob.json b/example_project/files/exampleJob.json new file mode 100644 index 0000000..c1ad5ee --- /dev/null +++ b/example_project/files/exampleJob.json @@ -0,0 +1,15 @@ +{ + "_comment1": "Paths in this file are relative to the root of your S3 bucket", + "pipeline": "project_folder/workspace/ExampleFly.cppipe", + "data_file": "project_folder/workspace/load_data.csv", + "input": "project_folder/workspace/", + "output": "project_folder/output", + "output_structure": "Metadata_Position", + "_comment2": "The following groups are tasks, and each will be run in parallel", + "groups": [ + {"Metadata": "Metadata_Position=2"}, + {"Metadata": "Metadata_Position=76"}, + {"Metadata": "Metadata_Position=218"} + ] +} + diff --git a/example_project/project_folder/images/01_POS002_D.TIF b/example_project/project_folder/images/01_POS002_D.TIF new file mode 100644 index 0000000..9cae667 Binary files /dev/null and b/example_project/project_folder/images/01_POS002_D.TIF differ diff --git a/example_project/project_folder/images/01_POS002_F.TIF b/example_project/project_folder/images/01_POS002_F.TIF new file mode 100644 index 0000000..e08d1fb Binary files /dev/null and b/example_project/project_folder/images/01_POS002_F.TIF differ diff --git a/example_project/project_folder/images/01_POS002_R.TIF b/example_project/project_folder/images/01_POS002_R.TIF new file mode 100644 index 0000000..8180090 Binary files /dev/null and b/example_project/project_folder/images/01_POS002_R.TIF differ diff --git a/example_project/project_folder/images/01_POS076_D.TIF b/example_project/project_folder/images/01_POS076_D.TIF new file mode 100644 index 0000000..1169c81 Binary files /dev/null and b/example_project/project_folder/images/01_POS076_D.TIF differ diff --git a/example_project/project_folder/images/01_POS076_F.TIF b/example_project/project_folder/images/01_POS076_F.TIF new file mode 100644 index 0000000..688b5db Binary files /dev/null and 
b/example_project/project_folder/images/01_POS076_F.TIF differ diff --git a/example_project/project_folder/images/01_POS076_R.TIF b/example_project/project_folder/images/01_POS076_R.TIF new file mode 100644 index 0000000..0609830 Binary files /dev/null and b/example_project/project_folder/images/01_POS076_R.TIF differ diff --git a/example_project/project_folder/images/01_POS218_D.TIF b/example_project/project_folder/images/01_POS218_D.TIF new file mode 100644 index 0000000..90d6981 Binary files /dev/null and b/example_project/project_folder/images/01_POS218_D.TIF differ diff --git a/example_project/project_folder/images/01_POS218_F.TIF b/example_project/project_folder/images/01_POS218_F.TIF new file mode 100644 index 0000000..4ad625e Binary files /dev/null and b/example_project/project_folder/images/01_POS218_F.TIF differ diff --git a/example_project/project_folder/images/01_POS218_R.TIF b/example_project/project_folder/images/01_POS218_R.TIF new file mode 100644 index 0000000..237807d Binary files /dev/null and b/example_project/project_folder/images/01_POS218_R.TIF differ diff --git a/example_project/project_folder/workspace/ExampleFly.cppipe b/example_project/project_folder/workspace/ExampleFly.cppipe new file mode 100644 index 0000000..e4366c1 --- /dev/null +++ b/example_project/project_folder/workspace/ExampleFly.cppipe @@ -0,0 +1,286 @@ +CellProfiler Pipeline: http://www.cellprofiler.org +Version:5 +DateRevision:500 +GitHash: +ModuleCount:14 +HasImagePlaneDetails:False + +LoadData:[module_num:1|svn_version:'Unknown'|variable_revision_number:6|show_window:True|notes:[]|batch_state:array([], dtype=uint8)|enabled:True|wants_pause:False] + Input data file location:Default Input Folder|workspace + Name of the file:load_data.csv + Load images based on this data?:Yes + Base image location:Default Input Folder| + Process just a range of rows?:No + Rows to process:1,100000 + Group images by metadata?:Yes + Select metadata tags for grouping:Position + Rescale intensities?:Yes + +IdentifyPrimaryObjects:[module_num:2|svn_version:'Unknown'|variable_revision_number:15|show_window:True|notes:['Identify the nuclei from the DAPI image. 
Three-class thresholding performs better than the default two-class thresholding in this case.']|batch_state:array([], dtype=uint8)|enabled:True|wants_pause:False] + Select the input image:OrigBlue + Name the primary objects to be identified:Nuclei + Typical diameter of objects, in pixel units (Min,Max):10,40 + Discard objects outside the diameter range?:Yes + Discard objects touching the border of the image?:Yes + Method to distinguish clumped objects:Shape + Method to draw dividing lines between clumped objects:Shape + Size of smoothing filter:10 + Suppress local maxima that are closer than this minimum allowed distance:5 + Speed up by using lower-resolution image to find local maxima?:Yes + Fill holes in identified objects?:After both thresholding and declumping + Automatically calculate size of smoothing filter for declumping?:Yes + Automatically calculate minimum allowed distance between local maxima?:Yes + Handling of objects if excessive number of objects identified:Continue + Maximum number of objects:500 + Use advanced settings?:Yes + Threshold setting version:12 + Threshold strategy:Global + Thresholding method:Minimum Cross-Entropy + Threshold smoothing scale:1.3488 + Threshold correction factor:1.0 + Lower and upper bounds on threshold:0,1 + Manual threshold:0.0 + Select the measurement to threshold with:None + Two-class or three-class thresholding?:Three classes + Log transform before thresholding?:No + Assign pixels in the middle intensity class to the foreground or the background?:Background + Size of adaptive window:10 + Lower outlier fraction:0.05 + Upper outlier fraction:0.05 + Averaging method:Mean + Variance method:Standard deviation + # of deviations:2 + Thresholding method:Otsu + +IdentifySecondaryObjects:[module_num:3|svn_version:'Unknown'|variable_revision_number:10|show_window:True|notes:['Identify the cells by using the nuclei as a "seed" region, then growing outwards until stopped by the image threshold or by a neighbor. 
The Propagation method is used to delineate the boundary between neighboring cells.']|batch_state:array([], dtype=uint8)|enabled:True|wants_pause:False] + Select the input objects:Nuclei + Name the objects to be identified:Cells + Select the method to identify the secondary objects:Propagation + Select the input image:OrigGreen + Number of pixels by which to expand the primary objects:10 + Regularization factor:0.05 + Discard secondary objects touching the border of the image?:No + Discard the associated primary objects?:No + Name the new primary objects:FilteredNuclei + Fill holes in identified objects?:Yes + Threshold setting version:12 + Threshold strategy:Global + Thresholding method:Minimum Cross-Entropy + Threshold smoothing scale:0 + Threshold correction factor:1 + Lower and upper bounds on threshold:0,1 + Manual threshold:0 + Select the measurement to threshold with:None + Two-class or three-class thresholding?:Two classes + Log transform before thresholding?:No + Assign pixels in the middle intensity class to the foreground or the background?:Foreground + Size of adaptive window:10 + Lower outlier fraction:0.05 + Upper outlier fraction:0.05 + Averaging method:Mean + Variance method:Standard deviation + # of deviations:2 + Thresholding method:Otsu + +IdentifyTertiaryObjects:[module_num:4|svn_version:'Unknown'|variable_revision_number:3|show_window:True|notes:['Identify the cytoplasm by "subtracting" the nuclei objects from the cell objects.']|batch_state:array([], dtype=uint8)|enabled:True|wants_pause:False] + Select the larger identified objects:Cells + Select the smaller identified objects:Nuclei + Name the tertiary objects to be identified:Cytoplasm + Shrink smaller object prior to subtraction?:Yes + +MeasureObjectSizeShape:[module_num:5|svn_version:'Unknown'|variable_revision_number:3|show_window:True|notes:['Measure morphological features from the cell, nuclei and cytoplasm objects.']|batch_state:array([], dtype=uint8)|enabled:True|wants_pause:False] + Select object sets to measure:Cells, Nuclei, Cytoplasm + Calculate the Zernike features?:Yes + Calculate the advanced features?:No + +MeasureObjectIntensity:[module_num:6|svn_version:'Unknown'|variable_revision_number:4|show_window:True|notes:['Measure intensity features from nuclei and cell objects against the DAPI image.']|batch_state:array([], dtype=uint8)|enabled:True|wants_pause:False] + Select images to measure:OrigBlue + Select objects to measure:Nuclei, Cells, Cytoplasm + +MeasureTexture:[module_num:7|svn_version:'Unknown'|variable_revision_number:7|show_window:True|notes:['Measure texture features of the nuclei, cells and cytoplasm from the DAPI image.']|batch_state:array([], dtype=uint8)|enabled:True|wants_pause:False] + Select images to measure:OrigBlue + Select objects to measure:Nuclei, Cytoplasm, Cells + Enter how many gray levels to measure the texture at:256 + Hidden:1 + Measure whole images or objects?:Both + Texture scale to measure:3 + +MeasureObjectNeighbors:[module_num:8|svn_version:'Unknown'|variable_revision_number:3|show_window:True|notes:['Obtain the nuclei neighborhood measures, considering nuclei within 4 pixels in any direction as a neighbor.']|batch_state:array([], dtype=uint8)|enabled:True|wants_pause:False] + Select objects to measure:Nuclei + Select neighboring objects to measure:Nuclei + Method to determine neighbors:Within a specified distance + Neighbor distance:4 + Consider objects discarded for touching image border?:Yes + Retain the image of objects colored by numbers of neighbors?:No + Name 
the output image:Do not use + Select colormap:Default + Retain the image of objects colored by percent of touching pixels?:No + Name the output image:PercentTouching + Select colormap:Default + +MeasureColocalization:[module_num:9|svn_version:'Unknown'|variable_revision_number:5|show_window:True|notes:['Measure the pixel intensity correlation between the pixels in the nuclei objects in the DAPI and FITC images, as well as the entire image.']|batch_state:array([], dtype=uint8)|enabled:True|wants_pause:False] + Select images to measure:OrigBlue, OrigGreen + Set threshold as percentage of maximum intensity for the images:15.0 + Select where to measure correlation:Both + Select objects to measure:Nuclei + Run all metrics?:Accurate + Calculate correlation and slope metrics?:Yes + Calculate the Manders coefficients?:Yes + Calculate the Rank Weighted Colocalization coefficients?:Yes + Calculate the Overlap coefficients?:Yes + Calculate the Manders coefficients using Costes auto threshold?:Yes + Method for Costes thresholding:Fast + +MeasureImageIntensity:[module_num:10|svn_version:'Unknown'|variable_revision_number:4|show_window:True|notes:['Measure the image intensity from the DAPI image.']|batch_state:array([], dtype=uint8)|enabled:True|wants_pause:False] + Select images to measure:OrigBlue + Measure the intensity only from areas enclosed by objects?:No + Select input object sets: + Calculate custom percentiles:No + Specify percentiles to measure:10,90 + +MeasureImageQuality:[module_num:11|svn_version:'Unknown'|variable_revision_number:6|show_window:True|notes:['Obtain some measurements for quality control purposes.']|batch_state:array([], dtype=uint8)|enabled:True|wants_pause:False] + Calculate metrics for which images?:Select... + Image count:3 + Scale count:1 + Threshold count:1 + Scale count:1 + Threshold count:1 + Scale count:1 + Threshold count:1 + Select the images to measure:OrigBlue + Include the image rescaling value?:Yes + Calculate blur metrics?:Yes + Spatial scale for blur measurements:20 + Calculate saturation metrics?:Yes + Calculate intensity metrics?:Yes + Calculate thresholds?:Yes + Use all thresholding methods?:No + Select a thresholding method:Otsu + Typical fraction of the image covered by objects:0.1 + Two-class or three-class thresholding?:Two classes + Minimize the weighted variance or the entropy?:Weighted variance + Assign pixels in the middle intensity class to the foreground or the background?:Foreground + Select the images to measure:OrigGreen + Include the image rescaling value?:Yes + Calculate blur metrics?:Yes + Spatial scale for blur measurements:20 + Calculate saturation metrics?:Yes + Calculate intensity metrics?:Yes + Calculate thresholds?:Yes + Use all thresholding methods?:No + Select a thresholding method:Otsu + Typical fraction of the image covered by objects:0.1 + Two-class or three-class thresholding?:Two classes + Minimize the weighted variance or the entropy?:Weighted variance + Assign pixels in the middle intensity class to the foreground or the background?:Foreground + Select the images to measure:OrigRed + Include the image rescaling value?:Yes + Calculate blur metrics?:Yes + Spatial scale for blur measurements:20 + Calculate saturation metrics?:Yes + Calculate intensity metrics?:Yes + Calculate thresholds?:Yes + Use all thresholding methods?:No + Select a thresholding method:Otsu + Typical fraction of the image covered by objects:0.1 + Two-class or three-class thresholding?:Two classes + Minimize the weighted variance or the entropy?:Weighted 
variance + Assign pixels in the middle intensity class to the foreground or the background?:Foreground + +CalculateMath:[module_num:12|svn_version:'Unknown'|variable_revision_number:3|show_window:True|notes:['Compute a ratio of nuclei mean intensity to nuclear area for each nucleus.']|batch_state:array([], dtype=uint8)|enabled:True|wants_pause:False] + Name the output measurement:Ratio + Operation:Divide + Select the numerator measurement type:Object + Select the numerator objects:Nuclei + Select the numerator measurement:Intensity_MeanIntensity_OrigBlue + Multiply the above operand by:1.0 + Raise the power of above operand by:1.0 + Select the denominator measurement type:Object + Select the denominator objects:Nuclei + Select the denominator measurement:AreaShape_Area + Multiply the above operand by:1.0 + Raise the power of above operand by:1.0 + Take log10 of result?:No + Multiply the result by:1.0 + Raise the power of result by:1.0 + Add to the result:0 + How should the output value be rounded?:Not rounded + Enter how many decimal places the value should be rounded to:0 + Constrain the result to a lower bound?:No + Enter the lower bound:0 + Constrain the result to an upper bound?:No + Enter the upper bound:1 + +ClassifyObjects:[module_num:13|svn_version:'Unknown'|variable_revision_number:4|show_window:True|notes:['Classify the nuclei on the basis of area. Divide the areas into 3 bins and give each bin a name.']|batch_state:array([], dtype=uint8)|enabled:True|wants_pause:False] + Make classification decision based on:Single measurement + Hidden:1 + Hidden:1 + Select the object to be classified:Nuclei + Select the measurement to classify by:AreaShape_Area + Select bin spacing:Evenly spaced bins + Number of bins:3 + Lower threshold:350 + Use a bin for objects below the threshold?:No + Upper threshold:700 + Use a bin for objects above the threshold?:No + Enter the custom thresholds separating the values between bins:0,1 + Give each bin a name?:Yes + Enter the bin names separated by commas:Small,Medium,Large + Retain an image of the classified objects?:No + Name the output image:Do not use + Select a class:None + Name the output objects:ClassifiedObjects + Select the object name:Nuclei + Select the first measurement:None + Method to select the cutoff:Mean + Enter the cutoff value:.5 + Select the second measurement:None + Method to select the cutoff:Mean + Enter the cutoff value:.5 + Use custom names for the bins?:No + Enter the low-low bin name:LowLow + Enter the low-high bin name:HighLow + Enter the high-low bin name:LowHigh + Enter the high-high bin name:HighHigh + Retain an image of the classified objects?:No + Enter the image name:ClassifiedNuclei + Save classes as new object sets?:No + Select the location of the classifier model file:Default Output Folder|None + Rules or classifier file name:None + Allow fuzzy feature matching?:No + +ExportToSpreadsheet:[module_num:14|svn_version:'Unknown'|variable_revision_number:13|show_window:True|notes:['Export any measurements to a comma-delimited file (.csv). 
The measurements made for the nuclei, cell and cytoplasm objects will be saved to separate .csv files, in addition to the per-image .csv’s.']|batch_state:array([], dtype=uint8)|enabled:True|wants_pause:False] + Select the column delimiter:Comma (",") + Add image metadata columns to your object data file?:No + Add image file and folder names to your object data file?:No + Select the measurements to export:No + Calculate the per-image mean values for object measurements?:Yes + Calculate the per-image median values for object measurements?:No + Calculate the per-image standard deviation values for object measurements?:No + Output file location:Default Output Folder|. + Create a GenePattern GCT file?:No + Select source of sample row name:Metadata + Select the image to use as the identifier:None + Select the metadata to use as the identifier:None + Export all measurement types?:No + Press button to select measurements:None|None + Representation of Nan/Inf:NaN + Add a prefix to file names?:No + Filename prefix:MyExpt_ + Overwrite existing files without warning?:Yes + Data to export:Image + Combine these object measurements with those of the previous object?:No + File name:Image.csv + Use the object name for the file name?:No + Data to export:Nuclei + Combine these object measurements with those of the previous object?:No + File name:Nuclei.csv + Use the object name for the file name?:No + Data to export:Cells + Combine these object measurements with those of the previous object?:No + File name:Cells.csv + Use the object name for the file name?:No + Data to export:Cytoplasm + Combine these object measurements with those of the previous object?:No + File name:Cytoplasm.csv + Use the object name for the file name?:No diff --git a/example_project/project_folder/workspace/load_data.csv b/example_project/project_folder/workspace/load_data.csv new file mode 100644 index 0000000..146390b --- /dev/null +++ b/example_project/project_folder/workspace/load_data.csv @@ -0,0 +1,4 @@ +FileName_OrigBlue,FileName_OrigGreen,FileName_OrigRed,Metadata_Position,PathName_OrigBlue,PathName_OrigGreen,PathName_OrigRed +01_POS002_D.TIF,01_POS002_F.TIF,01_POS002_R.TIF,002,/home/ubuntu/bucket/project_folder/images,/home/ubuntu/bucket/project_folder/images,/home/ubuntu/bucket/project_folder/images +01_POS076_D.TIF,01_POS076_F.TIF,01_POS076_R.TIF,076,/home/ubuntu/bucket/project_folder/images,/home/ubuntu/bucket/project_folder/images,/home/ubuntu/bucket/project_folder/images +01_POS218_D.TIF,01_POS218_F.TIF,01_POS218_R.TIF,218,/home/ubuntu/bucket/project_folder/images,/home/ubuntu/bucket/project_folder/images,/home/ubuntu/bucket/project_folder/images \ No newline at end of file diff --git a/run.py b/run.py index e956536..f143ff1 100644 --- a/run.py +++ b/run.py @@ -1,4 +1,3 @@ -from __future__ import print_function import os, sys import boto3 import datetime @@ -10,7 +9,19 @@ from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText +# Back compatability with old config versions +SOURCE_BUCKET = False +UPLOAD_FLAGS = False +UPDATE_PLUGINS = False +CREATE_DASHBOARD = False +CLEAN_DASHBOARD = False + from config import * + +# Back compatability with old config requirements +if ':' in SQS_DEAD_LETTER_QUEUE: + SQS_DEAD_LETTER_QUEUE = SQS_DEAD_LETTER_QUEUE.rsplit(':',1)[1] + WAIT_TIME = 60 MONITOR_TIME = 60 @@ -47,22 +58,12 @@ ] } -SQS_DEFINITION = { - "DelaySeconds": "0", - "MaximumMessageSize": "262144", - "MessageRetentionPeriod": "1209600", - "ReceiveMessageWaitTimeSeconds": "0", - "RedrivePolicy": 
"{\"deadLetterTargetArn\":\"" + SQS_DEAD_LETTER_QUEUE + "\",\"maxReceiveCount\":\"10\"}", - "VisibilityTimeout": str(SQS_MESSAGE_VISIBILITY) -} - ################################# # AUXILIARY FUNCTIONS ################################# def generate_task_definition(AWS_PROFILE): - taskRoleArn = False task_definition = TASK_DEFINITION.copy() config = configparser.ConfigParser() @@ -79,6 +80,7 @@ def generate_task_definition(AWS_PROFILE): print ("Using role for credentials", config[profile_name]['role_arn']) taskRoleArn = config[profile_name]['role_arn'] else: + taskRoleArn = False if config.has_option(profile_name, 'source_profile'): creds = configparser.ConfigParser() creds.read(f"{os.environ['HOME']}/.aws/credentials") @@ -88,8 +90,13 @@ def generate_task_definition(AWS_PROFILE): elif config.has_option(profile_name, 'aws_access_key_id'): aws_access_key = config[profile_name]['aws_access_key_id'] aws_secret_key = config[profile_name]['aws_secret_access_key'] + elif profile_name == 'default': + creds = configparser.ConfigParser() + creds.read(f"{os.environ['HOME']}/.aws/credentials") + aws_access_key = creds['default']['aws_access_key_id'] + aws_secret_key = creds['default']['aws_secret_access_key'] else: - print ("Problem getting credentials") + print (f"Problem getting credentials.") task_definition['containerDefinitions'][0]['environment'] += [ { "name": "AWS_ACCESS_KEY_ID", @@ -99,62 +106,22 @@ def generate_task_definition(AWS_PROFILE): "name": "AWS_SECRET_ACCESS_KEY", "value": aws_secret_key }] - - sqs = boto3.client('sqs') - queue_name = get_queue_url(sqs) - task_definition['containerDefinitions'][0]['environment'] += [ - { - 'name': 'APP_NAME', - 'value': APP_NAME - }, - { - 'name': 'SQS_QUEUE_URL', - 'value': queue_name - }, - { - "name": "AWS_BUCKET", - "value": AWS_BUCKET - }, - { - "name": "DOCKER_CORES", - "value": str(DOCKER_CORES) - }, - { - "name": "LOG_GROUP_NAME", - "value": LOG_GROUP_NAME - }, - { - "name": "CHECK_IF_DONE_BOOL", - "value": CHECK_IF_DONE_BOOL - }, - { - "name": "EXPECTED_NUMBER_FILES", - "value": str(EXPECTED_NUMBER_FILES) - }, - { - "name": "ECS_CLUSTER", - "value": ECS_CLUSTER - }, - { - "name": "SECONDS_TO_START", - "value": str(SECONDS_TO_START) - }, - { - "name": "MIN_FILE_SIZE_BYTES", - "value": str(MIN_FILE_SIZE_BYTES) - }, - { - "name": "USE_PLUGINS", - "value": str(USE_PLUGINS) - }, - { - "name": "NECESSARY_STRING", - "value": NECESSARY_STRING - }, - { - "name": "DOWNLOAD_FILES", - "value": DOWNLOAD_FILES - } + sqs = boto3.client("sqs") + queue_url = get_queue_url(sqs, SQS_QUEUE_NAME) + task_definition["containerDefinitions"][0]["environment"] += [ + {"name": "APP_NAME", "value": APP_NAME}, + {"name": "SQS_QUEUE_URL", "value": queue_url}, + {"name": "AWS_BUCKET", "value": AWS_BUCKET}, + {"name": "DOCKER_CORES", "value": str(DOCKER_CORES)}, + {"name": "LOG_GROUP_NAME", "value": LOG_GROUP_NAME}, + {"name": "CHECK_IF_DONE_BOOL", "value": CHECK_IF_DONE_BOOL}, + {"name": "EXPECTED_NUMBER_FILES", "value": str(EXPECTED_NUMBER_FILES)}, + {"name": "ECS_CLUSTER", "value": ECS_CLUSTER}, + {"name": "SECONDS_TO_START", "value": str(SECONDS_TO_START)}, + {"name": "MIN_FILE_SIZE_BYTES", "value": str(MIN_FILE_SIZE_BYTES)}, + {"name": "USE_PLUGINS", "value": str(USE_PLUGINS)}, + {"name": "NECESSARY_STRING", "value": NECESSARY_STRING}, + {"name": "DOWNLOAD_FILES", "value": DOWNLOAD_FILES}, ] if SOURCE_BUCKET: task_definition['containerDefinitions'][0]['environment'] += [ @@ -172,6 +139,13 @@ def generate_task_definition(AWS_PROFILE): 'name': 'UPLOAD_FLAGS', 'value': 
UPLOAD_FLAGS }] + if UPDATE_PLUGINS: + task_definition["containerDefinitions"][0]["environment"] += [ + {"name": "UPDATE_PLUGINS", "value": str(UPDATE_PLUGINS)}, + {"name": "PLUGINS_COMMIT", "value": str(PLUGINS_COMMIT)}, + {"name": "INSTALL_REQUIREMENTS", "value": str(INSTALL_REQUIREMENTS)}, + {"name": "REQUIREMENTS_FILE", "value": str(REQUIREMENTS_FILE)}, + ] return task_definition, taskRoleArn def update_ecs_task_definition(ecs, ECS_TASK_NAME, AWS_PROFILE): @@ -208,18 +182,39 @@ def create_or_update_ecs_service(ecs, ECS_SERVICE_NAME, ECS_TASK_NAME): ecs.create_service(cluster=ECS_CLUSTER, serviceName=ECS_SERVICE_NAME, taskDefinition=ECS_TASK_NAME, desiredCount=0) print('Service created') -def get_queue_url(sqs): +def get_queue_url(sqs, queue_name): result = sqs.list_queues() + queue_url = None if 'QueueUrls' in result.keys(): for u in result['QueueUrls']: - if u.split('/')[-1] == SQS_QUEUE_NAME: - return u - return None + if u.split('/')[-1] == queue_name: + queue_url = u + return queue_url def get_or_create_queue(sqs): - u = get_queue_url(sqs) - if u is None: + queue_url = get_queue_url(sqs, SQS_QUEUE_NAME) + dead_url = get_queue_url(sqs, SQS_DEAD_LETTER_QUEUE) + if dead_url is None: + print("Creating DeadLetter queue") + sqs.create_queue(QueueName=SQS_DEAD_LETTER_QUEUE) + time.sleep(WAIT_TIME) + dead_url = get_queue_url(sqs, SQS_DEAD_LETTER_QUEUE) + else: + print (f'DeadLetter queue {SQS_DEAD_LETTER_QUEUE} already exists.') + if queue_url is None: print('Creating queue') + response = sqs.get_queue_attributes(QueueUrl=dead_url, AttributeNames=["QueueArn"]) + dead_arn = response["Attributes"]["QueueArn"] + SQS_DEFINITION = { + "DelaySeconds": "0", + "MaximumMessageSize": "262144", + "MessageRetentionPeriod": "1209600", + "ReceiveMessageWaitTimeSeconds": "0", + "RedrivePolicy": '{"deadLetterTargetArn":"' + + dead_arn + + '","maxReceiveCount":"10"}', + "VisibilityTimeout": str(SQS_MESSAGE_VISIBILITY), + } sqs.create_queue(QueueName=SQS_QUEUE_NAME, Attributes=SQS_DEFINITION) time.sleep(WAIT_TIME) else: @@ -334,6 +329,143 @@ def export_logs(logs, loggroupId, starttime, bucketId): break time.sleep(30) +def create_dashboard(requestInfo): + cloudwatch = boto3.client("cloudwatch") + DashboardMessage = { + "widgets": [ + { + "height": 6, + "width": 6, + "y": 0, + "x": 18, + "type": "metric", + "properties": { + "metrics": [ + [ "AWS/SQS", "NumberOfMessagesReceived", "QueueName", f'{APP_NAME}Queue' ], + [ ".", "NumberOfMessagesDeleted", ".", "." 
], + ], + "view": "timeSeries", + "stacked": False, + "region": AWS_REGION, + "period": 300, + "stat": "Average" + } + }, + { + "height": 6, + "width": 6, + "y": 0, + "x": 6, + "type": "metric", + "properties": { + "view": "timeSeries", + "stacked": False, + "metrics": [ + [ "AWS/ECS", "MemoryUtilization", "ClusterName", ECS_CLUSTER ] + ], + "region": AWS_REGION, + "period": 300, + "yAxis": { + "left": { + "min": 0 + } + } + } + }, + { + "height": 6, + "width": 6, + "y": 0, + "x": 12, + "type": "metric", + "properties": { + "metrics": [ + [ "AWS/SQS", "ApproximateNumberOfMessagesVisible", "QueueName", f'{APP_NAME}Queue' ], + [ ".", "ApproximateNumberOfMessagesNotVisible", ".", "."], + ], + "view": "timeSeries", + "stacked": True, + "region": AWS_REGION, + "period": 300, + "stat": "Average" + } + }, + { + "height": 6, + "width": 12, + "y": 6, + "x": 12, + "type": "log", + "properties": { + "query": f"SOURCE {APP_NAME} | fields @message| filter @message like 'cellprofiler -c'| stats count_distinct(@message)\n", + "region": AWS_REGION, + "stacked": False, + "title": "Distinct Logs with \"cellprofiler -c\"", + "view": "table" + } + }, + { + "height": 6, + "width": 12, + "y": 6, + "x": 0, + "type": "log", + "properties": { + "query": f"SOURCE {APP_NAME} | fields @message| filter @message like 'cellprofiler -c'| stats count(@message)", + "region": AWS_REGION, + "stacked": False, + "title": "All Logs \"cellprofiler -c\"", + "view": "table" + } + }, + { + "height": 6, + "width": 24, + "y": 12, + "x": 0, + "type": "log", + "properties": { + "query": f"SOURCE {APP_NAME} | fields @message | filter @message like \"Error\" | display @message", + "region": AWS_REGION, + "stacked": False, + "title": "Errors", + "view": "table" + } + }, + { + "height": 6, + "width": 6, + "y": 0, + "x": 0, + "type": "metric", + "properties": { + "metrics": [ + [ "AWS/EC2Spot", "FulfilledCapacity", "FleetRequestId", requestInfo["SpotFleetRequestId"]], + [ ".", "TargetCapacity", ".", "."], + ], + "view": "timeSeries", + "stacked": False, + "region": AWS_REGION, + "period": 300, + "stat": "Average" + } + } + ] + } + DashboardMessage_json = json.dumps(DashboardMessage, indent = 4) + response = cloudwatch.put_dashboard(DashboardName=APP_NAME, DashboardBody=DashboardMessage_json) + if response['DashboardValidationMessages']: + print ('Likely error in Dashboard creation') + print (response['DashboardValidationMessages']) + + +def clean_dashboard(monitorapp): + cloudwatch = boto3.client("cloudwatch") + dashboard_list = cloudwatch.list_dashboards() + for entry in dashboard_list["DashboardEntries"]: + if monitorapp in entry["DashboardName"]: + cloudwatch.delete_dashboards(DashboardNames=[entry["DashboardName"]]) + ################################# # CLASS TO HANDLE SQS QUEUE ################################# @@ -382,9 +514,6 @@ def returnLoad(self): def setup(): ECS_TASK_NAME = APP_NAME + 'Task' ECS_SERVICE_NAME = APP_NAME + 'Service' - USER = os.environ['HOME'].split('/')[-1] - AWS_CONFIG_FILE_NAME = os.environ['HOME'] + '/.aws/config' - AWS_CREDENTIAL_FILE_NAME = os.environ['HOME'] + '/.aws/credentials' sqs = boto3.client('sqs') get_or_create_queue(sqs) ecs = boto3.client('ecs') @@ -510,9 +639,14 @@ def startCluster(): # If everything seems good, just bide your time until you're ready to go print('.') time.sleep(20) - status = ec2client.describe_spot_fleet_instances(SpotFleetRequestId=requestInfo['SpotFleetRequestId']) - - print('Spot fleet successfully created. 
Your job should start in a few minutes.') + status = ec2client.describe_spot_fleet_instances( + SpotFleetRequestId=requestInfo["SpotFleetRequestId"] + ) + print("Spot fleet successfully created. Your job should start in a few minutes.") + + if CREATE_DASHBOARD: + print ("Creating CloudWatch dashboard for run metrics") + create_dashboard(requestInfo) ################################# # SERVICE 4: MONITOR JOB @@ -617,6 +751,9 @@ def monitor(cheapest=False): deregistertask(ECS_TASK_NAME,ecs) print("Removing cluster if it's not the default and not otherwise in use") removeClusterIfUnused(monitorcluster, ecs) + # Remove Cloudwatch dashboard if created and cleanup desired + if CREATE_DASHBOARD and CLEAN_DASHBOARD: + clean_dashboard(monitorapp) #Step 6: Export the logs to S3 logs=boto3.client('logs') diff --git a/worker/cp-worker.py b/worker/cp-worker.py index 71d7d1c..f93424e 100644 --- a/worker/cp-worker.py +++ b/worker/cp-worker.py @@ -1,15 +1,10 @@ -from __future__ import print_function import boto3 -import glob import json import logging import os -import re import subprocess -import sys import time import watchtower -import string ################################# # CONSTANT PATHS IN THE CONTAINER @@ -110,7 +105,7 @@ def runCellProfiler(message): for eachSubDir in rootlist: subDirName=os.path.join(DATA_ROOT,eachSubDir) if os.path.isdir(subDirName): - trashvar=os.system('ls '+subDirName) + trashvar=os.system(f'ls {subDirName}') # Configure the logs logger = logging.getLogger(__name__) @@ -130,11 +125,11 @@ def runCellProfiler(message): if eachMetadata not in metadataID: watchtowerlogger=watchtower.CloudWatchLogHandler(log_group=LOG_GROUP_NAME, stream_name=str(message['Metadata'].values()),create_log_group=False) logger.addHandler(watchtowerlogger) - printandlog('Your specified output structure does not match the Metadata passed',logger) + printandlog('Your specified output structure does not match the Metadata passed. If your CellProfiler-pipeline-grouping is different than your output-file-location-grouping (typically because you are using the output_structure job parameter), then this is expected and NOT an error. Cloudwatch logs will be stored under the output-file-location-grouping, rather than the CellProfiler-pipeline-grouping.',logger) logger.removeHandler(watchtowerlogger) else: metadataID = str.replace(metadataID,eachMetadata,message['Metadata'][eachMetadata]) - metadataForCall+=eachMetadata+'='+message['Metadata'][eachMetadata]+',' + metadataForCall+=f"{eachMetadata}={message['Metadata'][eachMetadata]}," message['Metadata']=metadataForCall[:-1] elif 'output_structure' in message.keys(): if message['output_structure']!='': #support for explicit output structuring @@ -143,21 +138,18 @@ def runCellProfiler(message): metadataID = message['output_structure'] for eachMetadata in message['Metadata'].split(','): if eachMetadata.split('=')[0] not in metadataID: - printandlog('Your specified output structure does not match the Metadata passed',logger) + printandlog('Your specified output structure does not match the Metadata passed. If your CellProfiler-pipeline-grouping is different than your output-file-location-grouping (typically because you are using the output_structure job parameter), then this is expected and NOT an error. 
Cloudwatch logs will be stored under the output-file-location-grouping, rather than the CellProfiler-pipeline-grouping.',logger) else: metadataID = str.replace(metadataID,eachMetadata.split('=')[0],eachMetadata.split('=')[1]) - printandlog('metadataID ='+metadataID, logger) + printandlog(f'metadataID={metadataID}', logger) logger.removeHandler(watchtowerlogger) else: #backwards compatability with 1.0.0 and/or no desire to structure output metadataID = '-'.join([x.split('=')[1] for x in message['Metadata'].split(',')]) # Strip equal signs from the metadata else: #backwards compatability with 1.0.0 and/or no desire to structure output metadataID = '-'.join([x.split('=')[1] for x in message['Metadata'].split(',')]) # Strip equal signs from the metadata - localOut = LOCAL_OUTPUT + '/%(MetadataID)s' % {'MetadataID': metadataID} + localOut = f'{LOCAL_OUTPUT}/{metadataID}' remoteOut= os.path.join(message['output'],metadataID) - replaceValues = {'PL':message['pipeline'], 'OUT':localOut, 'FL':message['data_file'], - 'DATA': DATA_ROOT, 'Metadata': message['Metadata'], 'IN': message['input'], - 'MetadataID':metadataID, 'PLUGINS':PLUGIN_DIR } # Start loggging now that we have a job we care about watchtowerlogger=watchtower.CloudWatchLogHandler(log_group=LOG_GROUP_NAME, stream_name=metadataID,create_log_group=False) @@ -167,7 +159,7 @@ def runCellProfiler(message): if CHECK_IF_DONE_BOOL.upper() == 'TRUE': try: s3client=boto3.client('s3') - bucketlist=s3client.list_objects(Bucket=DESTINATION_BUCKET,Prefix=remoteOut+'/') + bucketlist=s3client.list_objects(Bucket=DESTINATION_BUCKET,Prefix=f'{remoteOut}/') objectsizelist=[k['Size'] for k in bucketlist['Contents']] objectsizelist = [i for i in objectsizelist if i >= MIN_FILE_SIZE_BYTES] if NECESSARY_STRING: @@ -178,80 +170,94 @@ def runCellProfiler(message): logger.removeHandler(watchtowerlogger) return 'SUCCESS' except KeyError: #Returned if that folder does not exist - pass + pass + + data_file_path = os.path.join(DATA_ROOT,message['data_file']) - csv_name = os.path.join(DATA_ROOT,message['data_file']) downloaded_files = [] # Optional- download files if DOWNLOAD_FILES: if DOWNLOAD_FILES.lower() == 'true': - printandlog('Figuring which files to download', logger) - import pandas - s3client = boto3.client('s3') if not os.path.exists(localIn): os.mkdir(localIn) - printandlog('Downloading ' + message['data_file'] + ' from ' + SOURCE_BUCKET, logger) - csv_insubfolders = message['data_file'].split('/')[-3:] - subfolders = '/'.join((csv_insubfolders)[:-1]) - csv_insubfolders = '/'.join(csv_insubfolders) - csv_name = os.path.join(localIn, csv_insubfolders) - if not os.path.exists(os.path.join(localIn,subfolders)): - os.makedirs(os.path.join(localIn,subfolders), exist_ok=True) - s3client.download_file(SOURCE_BUCKET, message['data_file'], csv_name) - csv_in = pandas.read_csv(os.path.join(localIn,csv_name)) - csv_in=csv_in.astype('str') - #Figure out what metadata fields we need in this experiment, as a dict - if type(message['Metadata'])==dict: - filter_dict = message['Metadata'] - else: - filter_dict = {} - for eachMetadata in message['Metadata'].split(','): - filterkey, filterval = eachMetadata.split('=') - filter_dict[filterkey] = filterval - #Filter our CSV to just the rows CellProfiler will process, so that we can download only what we need - for eachfilter in filter_dict.keys(): - csv_in = csv_in[csv_in[eachfilter] == filter_dict[eachfilter]] - #Figure out the actual file names and get them - channel_list = [x.split('FileName_')[1] for x in csv_in.columns if 
'FileName' in x] - printandlog(f'Downloading files for channels {channel_list}', logger) - for channel in channel_list: - for field in range(csv_in.shape[0]): - full_old_file_name = os.path.join(list(csv_in['PathName_'+channel])[field],list(csv_in['FileName_'+channel])[field]) - prefix_on_bucket = full_old_file_name.split(DATA_ROOT)[1][1:] - new_file_name = os.path.join(localIn,prefix_on_bucket) - if not os.path.exists(os.path.split(new_file_name)[0]): - os.makedirs(os.path.split(new_file_name)[0], exist_ok=True) - printandlog('made directory '+os.path.split(new_file_name)[0],logger) - if not os.path.exists(new_file_name): - s3client.download_file(SOURCE_BUCKET,prefix_on_bucket,new_file_name) - printandlog('Downloading file '+prefix_on_bucket,logger) - downloaded_files.append(new_file_name) - printandlog('Downloaded '+str(len(downloaded_files))+' files',logger) - # Update paths in csv to local paths - csv_in.replace(DATA_ROOT,localIn,regex=True, inplace=True) - csv_in.to_csv(csv_name,index=False) - print('Updated load_data_csv to local paths') - # Download pipeline and update pipeline path in message - printandlog('Downloading ' + message['pipeline'] + ' from ' + SOURCE_BUCKET, logger) - localpipe = os.path.join(localIn, message['pipeline'].split('/')[-1]) - s3client.download_file(SOURCE_BUCKET, message['pipeline'], localpipe) - # Correct locations in CP run command - replaceValues['PL'] = message['pipeline'].split('/')[-1] - replaceValues['DATA'] = localIn + s3 = boto3.resource('s3') + if message['data_file'][-3:]=='.csv': + printandlog('Figuring which files to download', logger) + import pandas + csv_in = pandas.read_csv(data_file_path) + csv_in=csv_in.astype('str') + #Figure out what metadata fields we need in this experiment, as a dict + if type(message['Metadata'])==dict: + filter_dict = message['Metadata'] + else: + filter_dict = {} + for eachMetadata in message['Metadata'].split(','): + filterkey, filterval = eachMetadata.split('=') + filter_dict[filterkey] = filterval + #Filter our CSV to just the rows CellProfiler will process, so that we can download only what we need + for eachfilter in filter_dict.keys(): + csv_in = csv_in[csv_in[eachfilter] == filter_dict[eachfilter]] + if len(csv_in) <= 1: + printandlog('WARNING: All rows filtered out of csv before download. 
Check your Metadata.') + #Figure out the actual file names and get them + channel_list = [x.split('FileName_')[1] for x in csv_in.columns if 'FileName' in x] + printandlog('Downloading files', logger) + for channel in channel_list: + for field in range(csv_in.shape[0]): + full_old_file_name = os.path.join(list(csv_in[f'PathName_{channel}'])[field],list(csv_in[f'FileName_{channel}'])[field]) + prefix_on_bucket = full_old_file_name.split(DATA_ROOT)[1][1:] + new_file_name = os.path.join(localIn,prefix_on_bucket) + if not os.path.exists(os.path.split(new_file_name)[0]): + os.makedirs(os.path.split(new_file_name)[0]) + printandlog(f'made directory {os.path.split(new_file_name)[0]}',logger) + if not os.path.exists(new_file_name): + s3.meta.client.download_file(AWS_BUCKET,prefix_on_bucket,new_file_name) + downloaded_files.append(new_file_name) + printandlog(f'Downloaded {str(len(downloaded_files))} files',logger) + import random + newtag = False + while newtag == False: + tag = str(random.randint(100000,999999)) #keep files from overwriting one another + local_data_file_path = os.path.join(localIn,tag,os.path.split(data_file_path)[1]) + if not os.path.exists(local_data_file_path): + if not os.path.exists(os.path.split(local_data_file_path)[0]): + os.makedirs(os.path.split(local_data_file_path)[0]) + csv_in = pandas.read_csv(data_file_path) + csv_in.replace(DATA_ROOT,localIn,regex=True, inplace=True) + csv_in.to_csv(local_data_file_path,index=False) + print('Wrote updated CSV') + newtag = True + else: + newtag = False + data_file_path = local_data_file_path + elif message['data_file'][-3:]=='.txt': + printandlog('Downloading files', logger) + with open(data_file_path, 'r') as f: + for file_path in f: + prefix_on_bucket = file_path.split(DATA_ROOT)[1][1:] + new_file_name = os.path.join(localIn,prefix_on_bucket) + if not os.path.exists(os.path.split(new_file_name)[0]): + os.makedirs(os.path.split(new_file_name)[0]) + printandlog(f'made directory {os.path.split(new_file_name)[0]}',logger) + if not os.path.exists(new_file_name): + s3.meta.client.download_file(AWS_BUCKET,prefix_on_bucket,new_file_name) + downloaded_files.append(new_file_name) + printandlog(f'Downloaded {str(len(downloaded_files))} files',logger) + # Build and run CellProfiler command - cpDone = localOut + '/cp.is.done' - cmdstem = 'cellprofiler -c -r ' - if message['pipeline'][-3:]!='.h5': - cmd = cmdstem + '-p %(DATA)s/%(PL)s -i %(DATA)s/%(IN)s -o %(OUT)s -d ' + cpDone - cmd += ' --data-file='+csv_name+' ' - cmd += '-g %(Metadata)s' + cpDone = f'{localOut}/cp.is.done' + if message['data_file'][-4:]=='.csv': + cmd = f'cellprofiler -c -r -p {DATA_ROOT}/{message["pipeline"]} -i {DATA_ROOT}/{message["input"]} -o {localOut} -d {cpDone} --data-file={data_file_path} -g {message["Metadata"]}' + elif message['data_file'][-3:]=='.h5': + cmd = f'cellprofiler -c -r -p {DATA_ROOT}/{message["pipeline"]} -i {DATA_ROOT}/{message["input"]} -o {localOut} -d {cpDone} -g {message["Metadata"]}' + elif message['data_file'][-4:]=='.txt': + cmd = f'cellprofiler -c -r -p {DATA_ROOT}/{message["pipeline"]} -i {DATA_ROOT}/{message["input"]} -o {localOut} -d {cpDone} --file-list={data_file_path} -g {message["Metadata"]}' else: - cmd = cmdstem + '-p %(DATA)s/%(PL)s -i %(DATA)s/%(IN)s -o %(OUT)s -d ' + cpDone + ' -g %(Metadata)s' + printandlog("Didn't recognize input file",logger) if USE_PLUGINS.lower() == 'true': - cmd += ' --plugins-directory=%(PLUGINS)s' - cmd = cmd % replaceValues - print('Running', cmd) + cmd += f' --plugins-directory={PLUGIN_DIR}' + 
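+    # cmd now holds the complete CellProfiler CLI call: pipeline, input dir, output dir, done-file, data file or file list, and grouping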
print(f'Running {cmd}') logger.info(cmd) subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) @@ -267,24 +273,24 @@ def runCellProfiler(message): mvtries=0 while mvtries <3: try: - printandlog('Move attempt #'+str(mvtries+1),logger) - cmd = 'aws s3 mv ' + localOut + ' s3://' + DESTINATION_BUCKET + '/' + remoteOut + ' --recursive --exclude=cp.is.done' - if UPLOAD_FLAGS: - cmd += ' ' + UPLOAD_FLAGS - printandlog('Uploading with command ' + cmd, logger) - subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - out,err = subp.communicate() - out=out.decode() - err=err.decode() - printandlog('== OUT \n'+out, logger) - if err == '': - break - else: - printandlog('== ERR \n'+err,logger) - mvtries+=1 + printandlog(f'Move attempt #{mvtries+1}',logger) + cmd = f'aws s3 mv {localOut} s3://{DESTINATION_BUCKET}/{remoteOut} --recursive --exclude=cp.is.done' + if UPLOAD_FLAGS: + cmd += f' {UPLOAD_FLAGS}' + printandlog(f'Uploading with command {cmd}', logger) + subp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out,err = subp.communicate() + out=out.decode() + err=err.decode() + printandlog(f'== OUT {out}', logger) + if err == '': + break + else: + printandlog(f'== ERR {err}',logger) + mvtries+=1 except: printandlog('Move failed',logger) - printandlog('== ERR \n'+err,logger) + printandlog(f'== ERR {err}',logger) time.sleep(30) mvtries+=1 if next(open(cpDone))=='Complete\n': @@ -293,7 +299,7 @@ def runCellProfiler(message): logger.removeHandler(watchtowerlogger) return 'SUCCESS' else: - printandlog('OUTPUT PROBLEM. Giving up on '+metadataID,logger) + printandlog(f'OUTPUT PROBLEM. Giving up on {metadataID}',logger) logger.removeHandler(watchtowerlogger) return 'OUTPUT_PROBLEM' else: